Let's build lightweight in-memory bus
When you’re building a modular monolith, reducing coupling between modules is always a challenge. Asynchronous messaging is one of the best ways to achieve this. It enables modules to communicate without being directly dependent on each other.
In distributed systems we use a message broker to provide communication between processes. But in modular monoliths, messages can simply flow in memory. This gives you asynchronous communication with no extra infrastructure. And when the system eventually grows into a distributed one, you can refactor towards an external broker without rewriting your domain logic.
In this article I’ll cover:
- What channels are
- How to implement simple asynchronous processing with channels
- How to build a small library to abstract channel usage and expose a clean API
Understanding channels
Channels are an implementation of the producer/consumer model for in-memory asynchronous data processing. They allow you to pass messages between components through a thread-safe FIFO (first-in, first-out) queue.
Why channels?
- Safe: handles concurrency for you
- Fast: designed for high throughput
- Simple: great fit for in-process asynchronous workflows
Let’s start with a simple example.
Using a channel directly
Suppose we have an API endpoint to register orders. After saving the order we want to publish an event so another component can send a confirmation email.
Register a channel
services.AddSingleton<Channel<OrderRegistered>>(
_ => Channel.CreateUnbounded<OrderRegistered>(
new UnboundedChannelOptions
{
SingleWriter = true,
SingleReader = true,
AllowSynchronousContinuations = false
}));
Channels can be bounded (with limited capacity, producers block when full) or unbounded (no limit). Here we use unbounded for simplicity.
Publishing an event
public async Task<Guid> RegisterOrderAsync(AddOrderRequest request)
{
var order = new Order
{
Id = Guid.NewGuid(),
UserId = request.UserId
};
dbContext.Orders.Add(order);
await dbContext.SaveChangesAsync();
await channel.Writer.WriteAsync(new OrderRegistered(order.Id));
return order.Id;
}
Consuming an event
public class OrderRegisteredHandler(
Channel<OrderRegistered> channel,
IServiceScopeFactory scopeFactory,
ILogger<OrderRegisteredHandler> logger) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
var message = await channel.Reader.ReadAsync(stoppingToken);
using var scope = scopeFactory.CreateScope();
var orderService = scope.ServiceProvider.GetRequiredService<IOrdersService>();
var order = await orderService.GetOrderAsync(message.Id);
await SendEmail(order);
}
catch (Exception e)
{
logger.LogError(e, "Exception thrown in OrderRegisteredHandler");
}
}
}
private async Task SendEmail(Order order)
{
// Sending email...
}
}
This works, but doesn’t scale. With multiple events, you’d either need:
- one channel + background service per event, or
- a single channel with switch/case logic to handle event types.
Both approaches get messy. Let’s fix that by building an abstraction: an in-memory bus.
Abstracting the bus
We’ll create a small library with these components:
- Bus - publishes events into the channel
- Handler contract - every handler implements
IHandler<T> - Message dispatcher - finds the correct handler for an event and invokes it
- Message processor - background service that pulls messages and passes them to the dispatcher
Bus
Bus with Publish<T> method implementation
public interface IBus
{
Task Publish<T>(T message) where T : class;
}
public class Bus(Channel<object> channel, ILogger<Bus> logger) : IBus
{
public async Task Publish<T>(T message) where T : class
{
logger.LogTrace("Publishing event of type {MessageType} to channel", typeof(T).Name);
await channel.Writer.WriteAsync(message);
logger.LogTrace("Published event of type {MessageType} to channel", typeof(T).Name);
}
}
Handler contract
Simple contract that our handlers will implement in order to process messages.
public interface IHandler<T>
{
/// <summary>
/// Handles the specified message.
/// </summary>
/// <param name="message">The message to handle.</param>
Task Handle(T message, CancellationToken stoppingToken);
}
Message dispatcher
As mentioned before message dispatcher should grap event, find correct handler and invoke it.
public class MessageDispatcher
{
private readonly IServiceProvider _serviceProvider;
public MessageDispatcher(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}
public async Task DispatchAsync(object message, CancellationToken token)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
var messageType = message.GetType();
var handlerType = typeof(IHandler<>).MakeGenericType(messageType);
using var scope = _serviceProvider.CreateScope();
var handler = scope.ServiceProvider.GetService(handlerType);
if (handler == null)
{
throw new InvalidOperationException($"No handler registered for message type {messageType.Name}");
}
var handleMethod = handlerType.GetMethod("Handle");
if (handleMethod == null)
{
throw new InvalidOperationException($"Handler for {messageType.Name} does not implement Handle method.");
}
await (Task) handleMethod.Invoke(handler, new[] { message, token });
}
}
Message processor
Finally the last part - Message Processor which is background service that reads events from the channel and passes it to Message Dispatcher:
public class MessageProcessor(
Channel<object> channel,
MessageDispatcher messageDispatcher,
ILogger<MessageProcessor> logger) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
while (!stoppingToken.IsCancellationRequested)
{
var message = await channel.Reader.WaitToReadAsync(stoppingToken);
try
{
await messageDispatcher.DispatchAsync(message, stoppingToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Error processing message of type {MessageType}", message.GetType().Name);
}
}
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
logger.LogError(ex, "Exception occured in MessageProcessor");
}
}
}
Registering the library in DI
As final part we have to add setup for all of those components so let’s declare Service Collection extensions to register the bus components in DI. First let’s add Configurator class that implements method RegisterHandlers that should:
- Find classes that implements
IHandler<>in assembly and add those handlers as transient services - Register channel
- Register Bus instance
- Register Message Dispatcher
- Register Message Processor
public class BusConfigurator(IServiceCollection services)
{
public void RegisterHandlers(Assembly assembly)
{
var handlers = assembly.FindHandlers();
foreach (var handler in handlers)
{
var interfaces = handler.GetInterfaces();
foreach (var @interface in interfaces)
{
if (@interface.IsGenericType && @interface.GetGenericTypeDefinition() == typeof(IHandler<>))
{
services.AddTransient(@interface, handler);
}
}
}
services.AddSingleton<Channel<object>>(_ => Channel.CreateUnbounded<object>(new UnboundedChannelOptions
{
SingleWriter = true,
SingleReader = true,
AllowSynchronousContinuations = false
}));
services.AddSingleton<IBus, Bus>();
services.AddSingleton<MessageDispatcher>();
services.AddHostedService<MessageProcessor>();
}
}
I’ve added simple assembly extension method to hide the boilerplate code for scanning the assemblies for handlers:
public static Type[] FindHandlers(this Assembly assembly)
{
var handlers = assembly
.GetTypes()
.Where(t => t.GetInterfaces()
.Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandler<>)))
.ToArray();
return handlers;
}
Extension for DI:
public static class BusCollectionExtensions
{
public static void AddBus(this IServiceCollection services, Action<BusConfigurator> configure)
{
var configurator = new BusConfigurator(services);
configure.Invoke(configurator);
}
}
It looks we have all the components implemented. Now let’s see how to use library in our example.
Using the bus in the example
Register the Bus:
builder.Services.AddBus(configurator =>
{
configurator.RegisterHandlers(typeof(Program).Assembly);
});
Publish an event after saving an order:
public async Task<Guid> RegisterOrderAsync(AddOrderRequest request)
{
var order = new Order
{
Id = Guid.NewGuid(),
UserId = request.UserId
};
dbContext.Orders.Add(order);
await dbContext.SaveChangesAsync();
await bus.Publish(new OrderRegistered(order.Id));
return order.Id;
}
Create a handler:
public class OrderRegisteredHandler(IOrdersService service, ILogger<OrderRegisteredHandler> logger) : IHandler<OrderRegistered>
{
public Task Handle(OrderRegistered message, CancellationToken stoppingToken)
{
logger.LogDebug("Handling OrderRegistered event for OrderId: {OrderId}", message.Id);
return Task.CompletedTask;
}
}
Final thoughts
Channels are a great way to add asynchronous in-memory communication to a modular monolith. They reduce coupling without external dependencies and give you an easy path to evolve into a distributed system later.
This example is intentionally simple just to show the idea of in-memory message transport. To make it production-ready you’d likely add:
- Prevalidation if all consumers and events are registered - avoid throwing configuration-related exceptions during runtime
- Durability - prevent message loss in case of crashes
- Retryability - handle transient failures
- Outbox pattern - making sure saving data and publishing event is atomic operation
Thanks for reading. I hope this gave you some inspiration to try channels in your projects.