Apache Camel has been solving enterprise integration on the JVM since 2007, with 300+ transports and production deployments at banks, telcos, and governments. The .NET ecosystem never got a real equivalent. MassTransit and Wolverine cover message-bus and saga scenarios well, but they aren't pipeline engines and they don't pretend to be.
redb.Route is the missing piece: a fluent C# DSL that wires Kafka, RabbitMQ, Redis, SQL, HTTP, gRPC, SFTP, MQTT, S3 and 13 more transports through From → Process → To pipelines, with 30+ Enterprise Integration Patterns and a compiled expression engine. Apache 2.0, .NET 8 / 9 / 10. This post is a technical walkthrough.
The shape of every pipeline
Every redb.Route pipeline is From → [processors] → To. Messages flow as IExchange instances carrying a body, headers, and properties.
From("kafka://orders?groupId=svc&brokers=localhost:9092")
.Filter(Header("type").isEqualTo("new"))
.To("rabbitmq://events?host=localhost");
For complex routing, group routes into a RouteBuilder:
public class OrderRoutes : RouteBuilder
{
protected override void Configure()
{
From("kafka://orders?groupId=svc&brokers=localhost:9092")
.RouteId("order-pipeline")
.Choice()
.When(Header("priority").isEqualTo("high"))
.Log("High priority order")
.To("direct://fast-lane")
.When(Header("priority").isEqualTo("low"))
.To("seda://batch-queue")
.Otherwise()
.To("direct://standard")
.EndChoice();
From("direct://fast-lane")
.Retry(3)
.SetHeader("processed-at", e => DateTimeOffset.UtcNow)
.Process(async (exchange, ct) =>
{
var body = exchange.In.Body as string;
exchange.In.Body = $"PROCESSED: {body}";
})
.To("rabbitmq://processed?host=localhost");
}
}
Registration:
builder.Services.AddRedbRoute(route => route.AddRouteBuilder<OrderRoutes>());
builder.Services.AddRedbRouteKafka();
builder.Services.AddRedbRouteRabbitMQ();
22 transports as first-class URI schemes
kafka:// rabbitmq:// redis:// sql://
http:// grpc:// sftp:// ftp://
mqtt:// s3:// ibmmq:// amqp://
azuresb:// elasticsearch:// firebase:// ldap://
mail:// tcp:// websocket:// signalr://
cron:// file://
+ built-in: direct:// seda:// timer:// log:// mock://
Every transport uses the same IExchange contract. Swapping Kafka for RabbitMQ means changing the From URI — the pipeline logic is unchanged.
Type-safe fluent builders are available as an alternative to URI strings:
var source = Kafka.Topic("orders")
.Brokers("broker1:9092,broker2:9092")
.GroupId("order-svc")
.Acks("All");
From(source).To("seda://internal");
EIP patterns as DSL steps
Content-Based Router — route by message content:
From("kafka://events")
.Choice()
.When(Header("type").isEqualTo("order")) .To("direct://orders")
.When(Header("type").isEqualTo("invoice")) .To("direct://invoices")
.Otherwise() .To("seda://unclassified")
.EndChoice();
Splitter + Aggregator — split a batch, process each item, re-aggregate by correlation key. Sequential by default; one method turns the split into a bounded-parallel pipeline:
From("seda://batch")
.Split(Body()) // Body() helper — split the IEnumerable body
.ParallelProcessing() // process items concurrently
.MaxDegreeOfParallelism(8) // bounded fan-out
.Process(async (ex, ct) => await EnrichOrder(ex))
.To("direct://enriched")
.EndSplit();
From("direct://enriched")
.Aggregate(Header("batch-id"), new ListAggregationStrategy())
.To("rabbitmq://processed-batches");
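The semantics of .ParallelProcessing() plus .MaxDegreeOfParallelism(8) — concurrent item processing under a hard concurrency cap — are the standard bounded fan-out pattern in .NET. A stand-alone sketch of just that guarantee using plain BCL (Parallel.ForEachAsync; no redb.Route types — BoundedFanOut and the delay stand-in are mine):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedFanOut
{
    // Concurrent processing capped at maxDegree — the guarantee a bounded
    // split pipeline gives you. Returns the peak concurrency actually observed.
    public static async Task<int> RunAsync(IEnumerable<int> batch, int maxDegree)
    {
        int inFlight = 0, peak = 0;
        await Parallel.ForEachAsync(
            batch,
            new ParallelOptions { MaxDegreeOfParallelism = maxDegree },
            async (item, ct) =>
            {
                int now = Interlocked.Increment(ref inFlight);
                int seen;
                while (now > (seen = Volatile.Read(ref peak)))
                    Interlocked.CompareExchange(ref peak, now, seen);
                await Task.Delay(5, ct);   // stand-in for EnrichOrder(ex)
                Interlocked.Decrement(ref inFlight);
            });
        return peak; // never exceeds maxDegree
    }
}
```

Whatever the engine uses internally, the observable contract is the same: no more than N items in flight, and the split completes only when every item has been processed.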
For true fan-out to multiple endpoints in parallel use Multicast:
From("kafka://orders")
.Multicast("direct://pricing", "direct://inventory", "direct://fraud-check");
WireTap — copy every message to an audit sink without interrupting the main flow:
From("kafka://transactions")
.WireTap("direct://audit-log")
.Process(async (ex, ct) => await ProcessTransaction(ex))
.To("rabbitmq://completed");
Idempotent Consumer — deduplicate across a cluster. Plug in the repository (in-memory, SQL, or redb.Core EAV) and the same message ID is rejected on every node:
From("kafka://payments")
.IdempotentConsumer(
e => e.In.GetHeader<string>("payment-id"),
new RedbIdempotentRepository(redbService)) // cluster-wide, two-phase commit
.Process(async (ex, ct) => await ProcessPayment(ex));
Saga — pipeline-level choreography with compensating steps. If ChargePayment throws, ReleaseInventory runs automatically. No external state-machine framework required:
From("direct://checkout")
.Saga(s => s
.Step(
action: async (ex, ct) => await inventory.Reserve(ex, ct),
compensate: async (ex, ct) => await inventory.Release(ex, ct))
.Step(
action: async (ex, ct) => await payments.Charge(ex, ct),
compensate: async (ex, ct) => await payments.Refund(ex, ct))
.Step(
action: async (ex, ct) => await shipments.Create(ex, ct),
compensate: async (ex, ct) => await shipments.Cancel(ex, ct))
.OnCompletion(async (ex, ct) => await PublishOrderCompleted(ex, ct)))
.To("rabbitmq://order-confirmed");
Full catalogue: Filter, Choice, Splitter, Aggregator, Multicast, WireTap, Recipient List, Dynamic Router, Resequencer, Scatter-Gather, Claim Check, Idempotent Consumer, Saga, Circuit Breaker, Throttle, Retry, Dead Letter, Loop, Delay, Debounce, Enrich, Timeout, TryCatch, Transacted, Process, Validate. 30+ patterns, all first-class DSL.
Error handling — four composable layers
Most .NET libraries give you one mechanism for failures: a retry policy on the consumer. redb.Route exposes four, designed to compose: per-step Retry, scoped DoTry/DoCatch, route-local OnException, and global OnException declared at the RouteBuilder level. Plus DeadLetterChannel for messages that exhaust retries.
Dead Letter Channel + a DLQ sub-route that knows why it failed
The failing exception travels with the exchange. The DLQ sub-route can read it, branch on the type, log structured info, archive, retry later:
From("kafka://orders")
.DeadLetterChannel("seda://orders-dlq")
.Retry(3)
.Process(async (ex, ct) => await ProcessOrder(ex, ct))
.To("rabbitmq://processed");
// The DLQ is a real route — inspect, branch, react
From("seda://orders-dlq")
.Log("DLQ: ${header.correlationId} — ${exception.message}")
.Choice()
.When(e => e.GetException() is TimeoutException)
.Delay(TimeSpan.FromMinutes(5))
.To("seda://retry-later")
.When(e => e.GetException() is HttpRequestException)
.To("sftp://archive/http-failures/")
.Otherwise()
.To("sftp://archive/poison/")
.EndChoice();
OnException — per-exception redelivery with exponential backoff
Declared globally at RouteBuilder level (applies to every route in the builder) or scoped to a single route. Each block configures attempts, delay, backoff, and what to do when the handler succeeds:
public class OrderRoutes : RouteBuilder
{
protected override void Configure()
{
// Global — applies to every From(...) below
OnException<HttpRequestException>()
.MaximumRedeliveries(5)
.RedeliveryDelay(TimeSpan.FromSeconds(1))
.UseExponentialBackOff()
.BackOffMultiplier(2.0)
.Handled() // mark as handled — exchange continues normally
.To("seda://http-failures")
.EndOnException();
OnException<DbException>()
.MaximumRedeliveries(2)
.UseOriginalMessage() // restore original body before sending to handler
.OnWhen(e => !((DbException)e.GetException()).Message.Contains("deadlock"))
.To("seda://db-failures")
.EndOnException();
// Multiple exception types in one block
OnException(typeof(TimeoutException), typeof(SocketException))
.MaximumRedeliveries(3)
.RedeliveryDelay(TimeSpan.FromSeconds(2))
.To("seda://network-failures")
.EndOnException();
From("kafka://orders")
.To("http://payments-svc/charge");
From("kafka://shipments")
.To("http://logistics-svc/dispatch");
}
}
Handled(), Continued(), OnWhen(predicate), RetryWhile(predicate), UseOriginalMessage() — all standard Camel error-handling primitives, and none of the .NET alternatives ship them as DSL.
TryCatch — scoped try/catch/finally inside a pipeline
When only one section of a route needs special handling:
.DoTry()
.To("http://external-api/submit")
.Process(async (e, ct) => await PostProcess(e, ct))
.DoCatch<HttpRequestException>()
.Log("HTTP failure: ${exception.message}")
.To("seda://retry-queue")
.DoCatch<TimeoutException>()
.To("sftp://archive/timeouts/")
.DoFinally()
.Log("Attempt complete")
.End()
Compiled expression engine
This is the one feature that distinguishes redb.Route from both Apache Camel and every .NET alternative. Inline expressions — string templates, arithmetic, comparisons, JSONPath, XPath — are translated to real Func<IExchange, T> delegates via System.Linq.Expressions at route-build time. No interpreter, no per-message parsing.
// String templates
.SetBody(Expr("${header.orderId}-${body}"))
.SetHeader("trace", Expr("${header.source}-${header.correlationId}"))
// Pre/post-increment
.SetHeader("attempt", Expr("${header.attempt++}")) // returns old value
.SetHeader("attempt", Expr("${++header.attempt}")) // returns new value
// Arithmetic
.SetHeader("total", Expr("${header.qty * header.price}"))
.SetHeader("net", Expr("${header.gross - header.tax}"))
// Predicates — fluent on top of compiled expressions
.Filter(Expr("header.amount").isGreaterThan(1000))
.When(Header("status").isEqualTo("active"))
Apache Camel's Simple Language is interpreted at every message dispatch. MassTransit, Wolverine and NServiceBus have no expression engine at all — every conditional is a hand-written C# lambda. With redb.Route you get both: terse string DSL for configuration-driven rules and strongly typed lambdas where you want them, with the same zero-overhead delegate at the bottom.
The engine supports 9 value types and 17 predicates, and the result of every Expr(...) is cached per route. You pay for parsing once at startup.
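For intuition, the compile-once idea is visible with System.Linq.Expressions directly. The sketch below is a hand-rolled toy, not redb.Route's actual parser: it builds the tree for an expression like ${header.qty * header.price} once, compiles it to a delegate, and from then on each message pays only a delegate invocation — no string parsing on the hot path:

```csharp
using System;
using System.Collections.Generic;
using System.Linq.Expressions;

public static class CompiledExpr
{
    // Toy equivalent of Expr("${header.qty * header.price}"): an expression
    // tree compiled once at "route build" time, then invoked per message.
    public static Func<IDictionary<string, object>, decimal> BuildQtyTimesPrice()
    {
        var headers = Expression.Parameter(typeof(IDictionary<string, object>), "headers");

        // headers["name"] unboxed to decimal
        Expression Header(string name) =>
            Expression.Convert(
                Expression.Property(headers, "Item", Expression.Constant(name)),
                typeof(decimal));

        var body = Expression.Multiply(Header("qty"), Header("price"));
        return Expression.Lambda<Func<IDictionary<string, object>, decimal>>(body, headers)
                         .Compile(); // parsing/compilation cost paid exactly once
    }
}
```

The compiled delegate is indistinguishable at call time from a hand-written lambda, which is the whole point of the caching claim above.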
Transactional pipelines
A pipeline can wrap several steps in a single transaction. .Transacted() opens a TransactionScope; transports that implement ITransactedAction enlist into it. That means the Kafka commit, the RabbitMQ publisher confirm, and the SQL UPDATE all succeed or fail together — no half-processed messages, no manual two-phase coordination:
From(Kafka.Topic("orders")
.Brokers("broker:9092")
.GroupId("order-svc")
.IsolationLevel("ReadCommitted")
.EnableAutoCommit(false)) // commit driven by .Transacted()
.Transacted()
.Process(async (ex, ct) => await Validate(ex, ct))
.To(Sql.Execute("INSERT INTO orders (...) VALUES (...)")
.DataSource("main")
.Transacted()) // enlists into the same scope
.To(Rabbit.Queue("order-events")
.Confirms(true)); // publisher confirm before commit
MassTransit, NServiceBus and Wolverine all solve this for their own bus, but only for the bus. redb.Route makes it work across any combination of transports that implement ITransactedAction — Kafka EOS, RabbitMQ transactional channels, IBM MQ, AMQP 1.0, SQL.
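Since .Transacted() opens an ordinary System.Transactions ambient scope, the enlistment mechanics underneath are standard .NET. This stand-alone sketch shows the commit/rollback hooks a transport-side adapter ultimately maps onto — it is illustrative, not the library's actual code (FakeTransportResource is mine):

```csharp
using System;
using System.Transactions;

// Minimal volatile resource: the shape a transport adapter takes to join
// the ambient TransactionScope that a transacted pipeline opens.
public sealed class FakeTransportResource : IEnlistmentNotification
{
    public string State { get; private set; } = "pending";

    public void Prepare(PreparingEnlistment e) => e.Prepared();          // vote yes
    public void Commit(Enlistment e)   { State = "committed";   e.Done(); }
    public void Rollback(Enlistment e) { State = "rolled-back"; e.Done(); }
    public void InDoubt(Enlistment e)  => e.Done();

    public static string Run(bool fail)
    {
        var resource = new FakeTransportResource();
        try
        {
            using var scope = new TransactionScope();
            Transaction.Current!.EnlistVolatile(resource, EnlistmentOptions.None);
            if (fail) throw new InvalidOperationException("step failed");
            scope.Complete(); // every enlisted resource gets Commit on dispose
        }
        catch (InvalidOperationException)
        {
            // scope disposed without Complete() → every resource gets Rollback
        }
        return resource.State;
    }
}
```

Every resource enlisted in the same scope sees the same outcome, which is what lets a Kafka commit, a SQL write, and a RabbitMQ confirm succeed or fail as a unit.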
Outbox without an outbox framework
The transactional outbox pattern is usually presented as a feature you opt into via a framework (MassTransit, NServiceBus, Wolverine all bundle one). In redb.Route it's just four lines composed from existing primitives — SQL polling, transactional sink, idempotent consumer:
From(Sql.Poll("SELECT id, payload FROM outbox WHERE processed = 0 LIMIT 100")
.DataSource("main")
.OnSuccess("UPDATE outbox SET processed = 1 WHERE id = ANY(@ids)")
.Transacted()) // atomic claim + publish
.Split(Body())
.IdempotentConsumer(
e => e.In.GetHeader<string>("eventId"),
new RedbIdempotentRepository(redbService)) // dedup on republish
.To(Kafka.Topic("events")
.EnableTransactionalProducer(true)
.Acks("All")); // exactly-once on the broker
No magic table conventions, no separate IOutbox interface to register, no required ORM. It's pipelines all the way down.
Request-Response — HTTP and gRPC as first-class endpoints
The same DSL that handles fire-and-forget Kafka also handles synchronous RPC. Mark the listener InOut(), set In.Body to the response anywhere in the pipeline — the HTTP transport sends it back to the caller:
public class OrderApi : RouteBuilder
{
protected override void Configure()
{
From(Http.Listen("/api/orders").Port(8080).InOut())
.Unmarshal(typeof(JsonMessageSerializer), typeof(CreateOrderRequest))
.Validate(e => (e.In.Body as CreateOrderRequest)?.Amount > 0,
"Amount must be positive")
.Process(async (e, ct) =>
{
var req = (CreateOrderRequest)e.In.Body!;
var resp = await orderService.CreateAsync(req, ct);
e.In.Body = resp; // HTTP transport returns In.Body to the caller
})
.WireTap("kafka://order-created") // audit — fire-and-forget, non-blocking
.Marshal(typeof(JsonMessageSerializer));
}
}
Replace Http.Listen with Grpc.Listen or Ws.Listen — same pipeline. RPC, validation, business logic, audit, and serialization in one declarative route. No separate controller layer, no separate consumer layer.
Testing without a broker — mock://
The mock:// transport records every message it receives so unit tests can assert against an in-memory endpoint. No Kafka container, no RabbitMQ container, no Testcontainers — a plain Host and a few lines of xUnit:
[Fact]
public async Task Filter_only_forwards_new_orders()
{
var host = Host.CreateDefaultBuilder()
.ConfigureServices(s => s.AddRedbRoute(route => route.AddRoutes(r =>
{
r.From("direct://input")
.Filter(Header("type").isEqualTo("new"))
.To("mock://received");
})))
.Build();
await host.StartAsync();
var producer = host.Services.GetRequiredService<IRouteProducer>();
var mock = host.Services.GetRequiredService<MockComponent>().GetEndpoint("received")!;
await producer.SendAsync("direct://input", "payload-1",
new Dictionary<string, object> { ["type"] = "new" });
await producer.SendAsync("direct://input", "payload-2",
new Dictionary<string, object> { ["type"] = "old" });
Assert.Equal(1, mock.ReceivedCount);
Assert.Equal("payload-1", mock.ReceivedExchanges[0].In.Body as string);
}
For async routes use MockDsl.Endpoint("name").ExpectedMessageCount(n) — it awaits the expected count with a timeout. This is the Camel testing idiom and one of the reasons unit tests on Camel routes are pleasant. .NET integration libraries usually leave you to Testcontainers.
Telemetry — OpenTelemetry built in, on by default
Every step in every route emits an Activity and a Meter sample. No AddInstrumentation, no per-step manual spans, no decorator wrapping. EnableTelemetry and EnableMetrics are true out of the box — point your collector at the process and you immediately see per-route traces and per-step latency.
For named sections that should show up as their own span or counter in Grafana / Jaeger / Tempo, the DSL has Traced and Metered:
From("kafka://orders")
.Traced("order-processing") // one Activity for the whole block
.SetBody(JPath("$.order"))
.Process(async (e, ct) => await Enrich(e, ct))
.EndTraced()
.Metered("order-throughput") // counter + histogram for the block
.To("rabbitmq://processed")
.EndMetered();
// Inline form when the named span wraps a single step
From("kafka://orders")
.Traced("validate", async (e, ct) => await ValidateOrder(e, ct))
.Metered("transform", e => { e.In.Body = Transform(e); })
.To("rabbitmq://processed");
Standard metric names: redb.route.messages.processed, redb.route.messages.failed, redb.route.processing.duration — emitted per route and per named step. They drop straight into any OTel-compatible backend.
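Because these are plain System.Diagnostics.Metrics instruments, anything in-process can tap them with a MeterListener, not just an OTel exporter. A stand-alone sketch — the emission side is simulated here (the Meter and counter are mine); only the instrument name redb.route.messages.processed comes from the list above:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

public static class MetricsTapDemo
{
    public static long Run()
    {
        long processed = 0;
        using var listener = new MeterListener();
        listener.InstrumentPublished = (instrument, l) =>
        {
            // Subscribe only to the counter incremented per processed message
            if (instrument.Name == "redb.route.messages.processed")
                l.EnableMeasurementEvents(instrument);
        };
        listener.SetMeasurementEventCallback<long>(
            (instrument, value, tags, state) => processed += value);
        listener.Start();

        // Simulated emission — in a real process the engine records this itself
        using var meter = new Meter("demo.meter");
        var counter = meter.CreateCounter<long>("redb.route.messages.processed");
        for (int i = 0; i < 5; i++) counter.Add(1);

        return processed;
    }
}
```

The same listener pattern works for quick smoke tests: run a route, assert the processed counter moved, no collector required.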
What a real production route looks like
The examples above are deliberately short. Real routes nest: HTTP listener → auth → permission check → method dispatch → business handler → audit tap. Indentation is the route hierarchy. The whole shape of the request is visible top-down:
From http://0.0.0.0:5090/api/.../settings (inOut, cors)
Process Auth.ProcessAsync
ConvertBody<string>
Choice on Header redbHttp.Method
├─ POST
│ RequirePermission EditSettingsTables
│ ProcessWithRedb HandlePost
│ WireTap → direct://audit (onPrepare + newBodyFactory)
├─ DELETE
│ RequirePermission EditSettingsTables
│ ProcessWithRedb HandleDelete
│ WireTap → direct://audit
└─ else
ProcessWithRedb HandleGet
EndChoice
Respond → HTTP caller
This is one route from the production system mentioned at the bottom of the post — settings API for a logistics admin panel:
protected override void Configure()
{
From("http://0.0.0.0:5090/api/tsum/special-rc-settings?inOut=true&cors=true&corsOrigins=*")
.RouteId("tsum-api-special-rc-settings")
.Process(Auth.ProcessAsync) // attach IPrincipal to exchange
.ConvertBody<string>()
.Choice()
.When(Header("redbHttp.Method").isEqualTo("POST"))
.Process(TsumAuthProcessor.RequirePermission(TsumPermission.EditSettingsTables))
.ProcessWithRedb((redb, ex, ct) => HandlePost(redb, ex))
.WireTap("direct://tsum-audit",
onPrepare: e => TsumAuditHelper.SetHeaders(e, "DATA_CHANGE"),
newBodyFactory: TsumAuditHelper.BuildDetailsJson)
.When(Header("redbHttp.Method").isEqualTo("DELETE"))
.Process(TsumAuthProcessor.RequirePermission(TsumPermission.EditSettingsTables))
.ProcessWithRedb((redb, ex, ct) => HandleDelete(redb, ex))
.WireTap("direct://tsum-audit",
onPrepare: e => TsumAuditHelper.SetHeaders(e, "DATA_CHANGE"),
newBodyFactory: TsumAuditHelper.BuildDetailsJson)
.Otherwise()
.ProcessWithRedb((redb, ex, ct) => HandleGet(redb, ex))
.EndChoice();
}
private async Task HandleGet(IRedbService redb, IExchange exchange)
{
var shippingPointId = ParseQueryLong(exchange, "shippingPointId")
?? ParseQueryLong(exchange, "rcId");
if (!shippingPointId.HasValue)
{
BadRequest(exchange, "shippingPointId required");
return;
}
var items = await redb.Query<SpecialRcSettings>()
.Where(s => s.ShippingPoint == shippingPointId)
.ToListAsync();
JsonRouteHelper.SetJsonBody(exchange, items
.Select(MapToDto)
.OrderBy(x => x.CustomName)
.ToList());
}
Two features worth pointing out:
- ProcessWithRedb((redb, ex, ct) => ...) — typed access to a named redb.Core service inside the pipeline. redb.Query<SpecialRcSettings>().Where(...).ToListAsync() is just LINQ over a typed EAV scheme. No DbContext, no migrations, no separate repository class.
- WireTap("direct://tsum-audit", onPrepare: ..., newBodyFactory: ...) — the audit hop runs in parallel with the main response, gets its own headers and its own body built fresh from the exchange. The HTTP client doesn't wait for audit, but audit sees the exact state of the exchange at that step.
This is the actual shape of a real route. Method dispatch, auth, permission, business handler, audit — in one declarative block that reads top-down.
How it compares
| | Apache Camel | MassTransit | NServiceBus | Wolverine | redb.Route |
|---|---|---|---|---|---|
| Language | Java/JVM | C# | C# | C# | C# |
| Transports | 300+ | 5 | 7 | 4 | 22 |
| EIP patterns (DSL) | 80+ | ~5 | ~5 | ~5 | 30+ |
| Expression engine | Interpreted | — | — | — | Compiled |
| Transactional pipelines across transports | Yes | Bus only | Bus only | Bus only | Yes |
| Runtime container | Karaf / Camel K | Worker Service | Worker Service | Worker Service | redb.Tsak |
| License | Apache 2.0 | Apache 2.0 | Commercial (>2 endpoints) | MIT | Apache 2.0 |
MassTransit, NServiceBus and Wolverine solve a different problem — they are message-bus frameworks with handler discovery, durable sagas and managed outbox. redb.Route is a pipeline and transport integration engine. They are not mutually exclusive: use redb.Route for cross-protocol routing and transformation, MassTransit for handler-style messaging and long-running saga state.
Deploying to production — redb.Tsak
Writing RouteBuilder classes is one thing. Running them in production across multiple nodes is another.
redb.Tsak is the runtime container built for redb.Route:
- Drop a .dll or .tpkg (ZIP + manifest) into Libs/ — Tsak loads it without restart
- Hot-reload — update the file while running, zero downtime for other routes
- Cluster mode — leader election, automatic context redistribution across nodes. No ZooKeeper, no etcd. Coordination uses row locks in redb.Core.
- REST API (32 endpoints), CLI (30 commands), Blazor dashboard with per-route metrics, logs, watchdog, cluster view
You do not change a single line of RouteBuilder code to go from dotnet run to a 3-node production cluster.
Full writeup on Tsak is coming in the next article. For now: github.com/redbase-app/redb-tsak.
Current state
Running at EWS — a 30-year-old national HoReCa food distributor:
- 3-node cluster (4 cores / 8 GB / 50 GB SSD per node)
- ~150k orders/month, ~3 months stable, 10–15% CPU under full load
- Active transports: SAP, Kafka, RabbitMQ, GPS feeds, Mercury / EGAIS / Chestny Znak / FGIS Grain
27 NuGet packages (core engine + 22 transports + 5 support libraries). Apache 2.0.
It's not Apache Camel — not in transport count, not in maturity, not in ecosystem. But for the kind of integration work most .NET teams actually ship, it covers the ground a single library reasonably can. Honest feedback, missing transports, and breaking-case bug reports are all welcome.
Links
| redb.Route | github.com/redbase-app/redb-route |
| redb.Tsak | github.com/redbase-app/redb-tsak |
| Architecture | redbase.app/architecture |
| Discussions | github.com/redbase-app/redb-route/discussions |
Over to you
This is exactly the moment when honest outside input is worth more than another internal sprint. A few specific things I'd love to hear:
- Transports. 22 cover most stacks I've seen, but obvious gaps remain. NATS / NATS JetStream? Pulsar? Service Bus topics with sessions? Google Pub/Sub? SQS+SNS as a pair? Salesforce Streaming API? OPC UA? Tell me what would make redb.Route a real fit for your stack.
- EIP patterns. 30+ ship today. Anything from the Hohpe/Woolf catalogue you'd actually use and can't easily get elsewhere in .NET? Message Store with replay? Routing Slip? Process Manager? Normalizer? Format Indicator?
- DSL ergonomics. Anything in the examples above that reads awkwardly in C#? Where would [Source] / [Sink] attributes, source generators, a minimal-API-style MapRoute("/orders"), or a top-level-statement DSL feel better than RouteBuilder classes?
- Observability. OpenTelemetry is built in and on by default; Grafana dashboards and the live metrics / logs UI live in redb.Tsak (full writeup in the next article). What's still missing from your point of view — opinionated dashboard JSON, a turnkey OTel collector recipe, exemplars wired to trace IDs?
- Migration. If you have an existing Apache Camel route in production, would a side-by-side translation walkthrough (Camel Java → redb.Route C#) be useful? Which patterns hurt most?
- What stops you from trying it. "License is good, but…" / "I like the DSL, but…" — the but is the most valuable feedback I can get right now.
Drop a comment, open an issue, or start a thread in Discussions. Critical responses get the same priority as kind ones — both move the project forward.