Let’s build a lightweight in-memory bus


When you’re building a modular monolith, reducing coupling between modules is always a challenge. Asynchronous messaging is one of the best ways to achieve this. It enables modules to communicate without being directly dependent on each other.

In distributed systems we use a message broker to provide communication between processes. But in modular monoliths, messages can simply flow in memory. This gives you asynchronous communication with no extra infrastructure. And when the system eventually grows into a distributed one, you can refactor towards an external broker without rewriting your domain logic.

In this article I’ll cover:

  • What channels are
  • How to implement simple asynchronous processing with channels
  • How to build a small library to abstract channel usage and expose a clean API

Understanding channels

Channels are an implementation of the producer/consumer model for in-memory asynchronous data processing. They allow you to pass messages between components through a thread-safe FIFO (first-in, first-out) queue.
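
To make the producer/consumer idea concrete, here is a minimal sketch using only System.Threading.Channels. The integer payload and the variable names are illustrative assumptions, not part of the order example that follows.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Unbounded channel carrying plain integers (illustrative payload).
var channel = Channel.CreateUnbounded<int>();

// Producer: writes three items, then signals that no more will follow.
var producer = Task.Run(async () =>
{
    for (var i = 1; i <= 3; i++)
    {
        await channel.Writer.WriteAsync(i);
    }
    channel.Writer.Complete();
});

// Consumer: ReadAllAsync yields items in the order they were written
// and finishes once the writer has completed and the queue is drained.
var received = new List<int>();
await foreach (var item in channel.Reader.ReadAllAsync())
{
    received.Add(item);
}

await producer;
Console.WriteLine(string.Join(", ", received)); // prints "1, 2, 3"
```

The producer and the consumer never reference each other; the channel is the only thing they share.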

Why channels?

  • Safe: handles concurrency for you
  • Fast: designed for high throughput
  • Simple: great fit for in-process asynchronous workflows

Let’s start with a simple example.

Using a channel directly

Suppose we have an API endpoint to register orders. After saving the order we want to publish an event so another component can send a confirmation email.

Register a channel

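services.AddSingleton<Channel<OrderRegistered>>(
    _ => Channel.CreateUnbounded<OrderRegistered>(
        new UnboundedChannelOptions
        {
            // Concurrent HTTP requests can write at the same time,
            // so the channel must not assume a single writer.
            SingleWriter = false,
            // Exactly one background service reads from the channel.
            SingleReader = true,
            AllowSynchronousContinuations = false
        }));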

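Channels can be bounded (limited capacity; by default, writers wait asynchronously for free space when the channel is full) or unbounded (no capacity limit, so writes complete immediately, but memory use can grow if consumers fall behind). Here we use an unbounded channel for simplicity.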
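
For comparison, here is a small sketch of how a bounded channel behaves when it fills up. The capacity of 2 and the string payload are arbitrary assumptions chosen to keep the demo short.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Capacity of 2 is an arbitrary number chosen for the demo.
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(2)
{
    // Wait is the default mode: WriteAsync does not complete until space is available.
    FullMode = BoundedChannelFullMode.Wait
});

// TryWrite never waits; it reports whether the item was accepted.
var first = channel.Writer.TryWrite("a");   // accepted
var second = channel.Writer.TryWrite("b");  // accepted
var third = channel.Writer.TryWrite("c");   // rejected, the channel is full

// WriteAsync stays pending while the channel is full...
var pendingWrite = channel.Writer.WriteAsync("c").AsTask();
var completedBeforeRead = pendingWrite.IsCompleted;

// ...and completes once a reader frees a slot.
var item = await channel.Reader.ReadAsync();
await pendingWrite;

Console.WriteLine($"{first} {second} {third} {completedBeforeRead} {item}");
// prints "True True False False a"
```

Other BoundedChannelFullMode values (DropWrite, DropOldest, DropNewest) trade this back-pressure for dropped messages, which is rarely what you want for domain events.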

Publishing an event

public async Task<Guid> RegisterOrderAsync(AddOrderRequest request)
{
    var order = new Order
    {
        Id = Guid.NewGuid(),
        UserId = request.UserId
    };

    dbContext.Orders.Add(order);
    await dbContext.SaveChangesAsync();

    await channel.Writer.WriteAsync(new OrderRegistered(order.Id));
    
    return order.Id;
}

Consuming an event

public class OrderRegisteredHandler(
    Channel<OrderRegistered> channel, 
    IServiceScopeFactory scopeFactory, 
    ILogger<OrderRegisteredHandler> logger) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var message = await channel.Reader.ReadAsync(stoppingToken);
                using var scope = scopeFactory.CreateScope();
                var orderService = scope.ServiceProvider.GetRequiredService<IOrdersService>();
                var order = await orderService.GetOrderAsync(message.Id);
                await SendEmail(order);
            }
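            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Normal shutdown: stop reading instead of logging an error.
                break;
            }
            catch (Exception e)
            {
                logger.LogError(e, "Exception thrown in OrderRegisteredHandler");
            }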
        }
    }

    private Task SendEmail(Order order)
    {
        // Sending email...
        return Task.CompletedTask;
    }
}

This works, but doesn’t scale. With multiple events, you’d either need:

  • one channel + background service per event, or
  • a single channel with switch/case logic to handle event types.

Both approaches get messy. Let’s fix that by building an abstraction: an in-memory bus.
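
To see why the second option gets messy, here is a rough sketch of a single channel with switch/case dispatch. OrderShipped is a hypothetical second event added only for this illustration, and the "handlers" are reduced to log entries.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<object>();
var log = new List<string>();

await channel.Writer.WriteAsync(new OrderRegistered(Guid.NewGuid()));
await channel.Writer.WriteAsync(new OrderShipped(Guid.NewGuid()));
channel.Writer.Complete();

await foreach (var message in channel.Reader.ReadAllAsync())
{
    // Every new event type means another case in this switch,
    // and the consumer has to know about every module's events.
    switch (message)
    {
        case OrderRegistered _:
            log.Add("send confirmation email");
            break;
        case OrderShipped _:
            log.Add("send shipping notification");
            break;
        default:
            log.Add($"no handler for {message.GetType().Name}");
            break;
    }
}

Console.WriteLine(string.Join("; ", log));
// prints "send confirmation email; send shipping notification"

// Both records are simplified stand-ins for real event types.
record OrderRegistered(Guid Id);
record OrderShipped(Guid Id);
```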

Abstracting the bus

We’ll create a small library with these components:

  • Bus - publishes events into the channel
  • Handler contract - every handler implements IHandler<T>
  • Message dispatcher - finds the correct handler for an event and invokes it
  • Message processor - background service that pulls messages and passes them to the dispatcher

Bus

The bus exposes a single Publish<T> method that writes the message to the shared channel:

public interface IBus
{
    Task Publish<T>(T message) where T : class;
}

public class Bus(Channel<object> channel, ILogger<Bus> logger) : IBus
{
    public async Task Publish<T>(T message) where T : class
    {
        logger.LogTrace("Publishing event of type {MessageType} to channel", typeof(T).Name);
        await channel.Writer.WriteAsync(message);
        logger.LogTrace("Published event of type {MessageType} to channel", typeof(T).Name);
    }
}

Handler contract

A simple contract that handlers implement in order to process messages:

public interface IHandler<T>
{
    /// <summary>
    /// Handles the specified message.
    /// </summary>
    /// <param name="message">The message to handle.</param>
    Task Handle(T message, CancellationToken stoppingToken);
}

Message dispatcher

As mentioned before, the message dispatcher should grab an event, find the correct handler, and invoke it.

public class MessageDispatcher
{
    private readonly IServiceProvider _serviceProvider;
    
    public MessageDispatcher(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    }
    
    public async Task DispatchAsync(object message, CancellationToken token)
    {
        if (message == null)
        {
            throw new ArgumentNullException(nameof(message));
        }
        
        var messageType = message.GetType();
        var handlerType = typeof(IHandler<>).MakeGenericType(messageType);
        
        using var scope = _serviceProvider.CreateScope();
        var handler = scope.ServiceProvider.GetService(handlerType);

        if (handler == null)
        {
            throw new InvalidOperationException($"No handler registered for message type {messageType.Name}");
        }

        var handleMethod = handlerType.GetMethod("Handle");
        if (handleMethod == null)
        {
            throw new InvalidOperationException($"Handler for {messageType.Name} does not implement Handle method.");
        }

        await (Task)handleMethod.Invoke(handler, new object[] { message, token });
    }
}
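
The reflection step is the least obvious part of the dispatcher, so here is a stripped-down sketch of it without a DI container. The dictionary stands in for the service provider, and Ping and PingHandler are hypothetical types used only for this demo.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the DI container: closed handler interface -> handler instance.
var handler = new PingHandler();
var registry = new Dictionary<Type, object>
{
    [typeof(IHandler<Ping>)] = handler
};

// The dispatcher only sees the message as object.
object message = new Ping("hello");

// Close IHandler<> over the runtime type of the message.
var handlerType = typeof(IHandler<>).MakeGenericType(message.GetType());
var resolved = registry[handlerType];

// Look Handle up on the interface type and invoke it with boxed arguments.
var handleMethod = handlerType.GetMethod(nameof(IHandler<Ping>.Handle))
    ?? throw new InvalidOperationException("Handle method not found");

if (handleMethod.Invoke(resolved, new object[] { message, CancellationToken.None }) is Task task)
{
    await task;
}

Console.WriteLine(handler.LastText); // prints "hello"

public interface IHandler<T>
{
    Task Handle(T message, CancellationToken stoppingToken);
}

public record Ping(string Text);

public class PingHandler : IHandler<Ping>
{
    public string LastText { get; private set; } = "";

    public Task Handle(Ping message, CancellationToken stoppingToken)
    {
        LastText = message.Text;
        return Task.CompletedTask;
    }
}
```

Reflection on every message has a cost. If it ever shows up in a profile, caching the MethodInfo (or a compiled delegate) per message type is a common next step.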

Message processor

Finally, the last part: the message processor, a background service that reads events from the channel and passes them to the message dispatcher:

public class MessageProcessor(
    Channel<object> channel, 
    MessageDispatcher messageDispatcher, 
    ILogger<MessageProcessor> logger) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // Wait for the next message and take it off the channel.
                var message = await channel.Reader.ReadAsync(stoppingToken);
                try
                {
                    await messageDispatcher.DispatchAsync(message, stoppingToken);
                }
                catch (Exception ex)
                {
                    logger.LogError(ex, "Error processing message of type {MessageType}", message.GetType().Name);
                }
            }
            
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            logger.LogError(ex, "Exception occurred in MessageProcessor");
        }
    }
}
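
An alternative shape for the read loop is await foreach over ReadAllAsync. The sketch below is a standalone console demo, not the processor itself: the string messages and the simulated failure are assumptions, and the inner try/catch plays the role of the dispatcher call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<object>();
var processed = new List<string>();
using var cts = new CancellationTokenSource();

// Consumer loop: one failing message must not stop the loop.
var consumer = Task.Run(async () =>
{
    await foreach (var message in channel.Reader.ReadAllAsync(cts.Token))
    {
        try
        {
            if (message is string text && text == "bad")
            {
                throw new InvalidOperationException("simulated handler failure");
            }
            processed.Add($"{message}");
        }
        catch (Exception ex)
        {
            processed.Add($"error: {ex.Message}");
        }
    }
});

await channel.Writer.WriteAsync("first");
await channel.Writer.WriteAsync("bad");
await channel.Writer.WriteAsync("second");

// Completing the writer lets the loop drain what is queued and then exit.
channel.Writer.Complete();
await consumer;

Console.WriteLine(string.Join(" | ", processed));
// prints "first | error: simulated handler failure | second"
```

With this shape the loop ends either when the token is cancelled or when the writer is completed, which gives you a way to drain queued messages on shutdown.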

Registering the library in DI

As the final step we need to wire these components together, so let’s declare service collection extensions that register the bus components in DI. First, let’s add a BusConfigurator class with a RegisterHandlers method that should:

  • Find the classes in an assembly that implement IHandler<> and register them as transient services
  • Register the channel
  • Register the Bus instance
  • Register the message dispatcher
  • Register the message processor

public class BusConfigurator(IServiceCollection services)
{
    public void RegisterHandlers(Assembly assembly)
    {
        var handlers = assembly.FindHandlers();
        foreach (var handler in handlers)
        {
            var interfaces = handler.GetInterfaces();
            foreach (var @interface in interfaces)
            {
                if (@interface.IsGenericType && @interface.GetGenericTypeDefinition() == typeof(IHandler<>))
                {
                    services.AddTransient(@interface, handler);
                }
            }
        }
        
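        services.AddSingleton<Channel<object>>(_ => Channel.CreateUnbounded<object>(new UnboundedChannelOptions
        {
            // Any module may publish concurrently, so do not assume a single writer.
            SingleWriter = false,
            // MessageProcessor is the only reader.
            SingleReader = true,
            AllowSynchronousContinuations = false
        }));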

        services.AddSingleton<IBus, Bus>();
        services.AddSingleton<MessageDispatcher>();
        services.AddHostedService<MessageProcessor>();
    }
}

I’ve added a simple assembly extension method to hide the boilerplate code that scans an assembly for handlers:

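// Extension methods must live in a static class; the class name is arbitrary.
public static class AssemblyExtensions
{
    public static Type[] FindHandlers(this Assembly assembly)
    {
        // Only concrete classes can be instantiated by the container.
        return assembly
            .GetTypes()
            .Where(t => t is { IsClass: true, IsAbstract: false } && t.GetInterfaces()
                .Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandler<>)))
            .ToArray();
    }
}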
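
To check what such a scan actually finds, here is a small standalone sketch that prints the message-to-handler pairs discovered in the executing assembly. OrderCancelled, LoggingHandler<T> and the handler classes are hypothetical types for the demo, and skipping abstract classes is an assumption about the behavior you’d want.

```csharp
using System;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

// Scan the assembly this code is compiled into.
var assembly = Assembly.GetExecutingAssembly();

var registrations = assembly.GetTypes()
    // Abstract base classes cannot be instantiated, so leave them out.
    .Where(t => t.IsClass && !t.IsAbstract)
    .SelectMany(t => t.GetInterfaces()
        .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandler<>))
        .Select(i => (Message: i.GetGenericArguments()[0].Name, Handler: t.Name)))
    .OrderBy(r => r.Message)
    .ToList();

foreach (var (message, handlerName) in registrations)
{
    Console.WriteLine($"{message} -> {handlerName}");
}
// prints:
// OrderCancelled -> OrderCancelledHandler
// OrderRegistered -> OrderRegisteredHandler

public interface IHandler<T>
{
    Task Handle(T message, CancellationToken stoppingToken);
}

public record OrderRegistered(Guid Id);
public record OrderCancelled(Guid Id);

// Abstract base class: implements IHandler<T> but must not be registered itself.
public abstract class LoggingHandler<T> : IHandler<T>
{
    public abstract Task Handle(T message, CancellationToken stoppingToken);
}

public class OrderRegisteredHandler : IHandler<OrderRegistered>
{
    public Task Handle(OrderRegistered message, CancellationToken stoppingToken) => Task.CompletedTask;
}

public class OrderCancelledHandler : LoggingHandler<OrderCancelled>
{
    public override Task Handle(OrderCancelled message, CancellationToken stoppingToken) => Task.CompletedTask;
}
```

Note that a handler inheriting the interface through a base class is still picked up, because GetInterfaces includes inherited interfaces.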

Extension for DI:

public static class BusCollectionExtensions
{
    public static void AddBus(this IServiceCollection services, Action<BusConfigurator> configure)
    {
        var configurator = new BusConfigurator(services);
        configure.Invoke(configurator);
    }
}

It looks like we have all the components implemented. Now let’s see how to use the library in our example.

Using the bus in the example

Register the Bus:

builder.Services.AddBus(configurator =>
{
    configurator.RegisterHandlers(typeof(Program).Assembly);
});

Publish an event after saving an order:

public async Task<Guid> RegisterOrderAsync(AddOrderRequest request)
{
    var order = new Order
    {
        Id = Guid.NewGuid(),
        UserId = request.UserId
    };

    dbContext.Orders.Add(order);
    await dbContext.SaveChangesAsync();

    await bus.Publish(new OrderRegistered(order.Id));

    return order.Id;
}

Create a handler:

public class OrderRegisteredHandler(IOrdersService service, ILogger<OrderRegisteredHandler> logger) : IHandler<OrderRegistered>
{
    public Task Handle(OrderRegistered message, CancellationToken stoppingToken)
    {
        logger.LogDebug("Handling OrderRegistered event for OrderId: {OrderId}", message.Id);
        return Task.CompletedTask;
    }
}

Final thoughts

Channels are a great way to add asynchronous in-memory communication to a modular monolith. They reduce coupling without external dependencies and give you an easy path to evolve into a distributed system later.

This example is intentionally simple and only demonstrates the idea of in-memory message transport. To make it production-ready you’d likely add:

  • Startup validation - verify that every event has a registered handler, so configuration errors surface at startup instead of at runtime
  • Durability - prevent message loss if the process crashes
  • Retries - handle transient failures
  • Outbox pattern - make sure that saving data and publishing the event happen atomically
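
As one example from that list, retries could be sketched roughly like this. The Retry helper, its parameters and the exponential backoff policy are all assumptions made for illustration; in a real project you might reach for an existing resilience library instead.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var attempts = 0;

// Simulated handler: fails twice with a transient error, then succeeds.
Task FlakyHandler(CancellationToken token)
{
    attempts++;
    return attempts < 3
        ? Task.FromException(new TimeoutException("transient failure"))
        : Task.CompletedTask;
}

await Retry.ExecuteAsync(
    action: FlakyHandler,
    maxAttempts: 5,
    baseDelay: TimeSpan.FromMilliseconds(10),
    token: CancellationToken.None);

Console.WriteLine($"Succeeded after {attempts} attempts"); // prints "Succeeded after 3 attempts"

// Hypothetical helper, not part of the bus library built in this article.
public static class Retry
{
    public static async Task ExecuteAsync(
        Func<CancellationToken, Task> action,
        int maxAttempts,
        TimeSpan baseDelay,
        CancellationToken token)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await action(token);
                return;
            }
            // Cancellation is never retried; the last failed attempt rethrows.
            catch (Exception ex) when (ex is not OperationCanceledException && attempt < maxAttempts)
            {
                // Exponential backoff: baseDelay, then 2x, 4x, ...
                var delay = TimeSpan.FromMilliseconds(baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1));
                await Task.Delay(delay, token);
            }
        }
    }
}
```

In the bus, a helper like this would wrap the dispatcher call inside the message processor. Keep in mind that retrying in memory does not help with durability: if the process crashes between attempts, the message is still lost.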

Thanks for reading. I hope this gave you some inspiration to try channels in your projects.