# Message Routing in Dotnet 6

# Background
This post builds on the work developed in this post; [Background Message Processing in Dotnet 6](https://concurrentflows.hashnode.dev/background-message-processing-in-dotnet-6). We extend the `Messenger<T>` and `BackgroundMessenger<T>` concepts into a new `MessageRouter` that takes a single `InternalMessage` and transforms and routes it to the corresponding `BackgroundMessenger` that publishes the specified message to its intended audience.
# Challenge
Let's say we have a basic CRUD api consisting of `Commands` `Queries` and `Handlers`. When ever we `Create`, `Update` or `Delete` an entity in our domain we want a corresponding message to be sent but we recognize that the `Handlers` of these `Commands` shouldn't be concerned with the messaging aspects of the event. Our producers, the `Handlers`, should only have to be concerned with their intended operation, `Create`, `Update` or `Delete` and with the contextual knowledge each of those operations has they should be able to emit a generic event with a payload and a type. We can let another system worry about where the individual type of message gets formed and routed and keep our producers free of all but the simplest declarative logic to create an `InternalMessage` assigned the correct type. 
# Solution
We'll build out a pattern for solving this challenge based on several interrelated components.
## The Internal Message
The first component is the `InternalMessage`, this message captures the data and the context the event handler already has and submits it to our internal messaging infrastructure. We define an `abstract` `InternalMessage` record that takes a type enum and a payload.
```
    public abstract record InternalMessage<TEnum, TPayload>(TEnum Type, TPayload Payload)
        where TEnum : Enum
        where TPayload : class;
```
Our `InternalMessage` is comprised of an enum defining our message types and the payload representing our entity that's been acted upon. For our example we define a `SampleEntity` and its related `InternalMessage` and `Types`.
```
public record SampleEntity(int Id, string Name);

public record SampleHubInternalMessage(SampleHubMessageType Type, SampleEntity Payload)
    : InternalMessage<SampleHubMessageType, SampleEntity>(Type, Payload);

public enum SampleHubMessageType
{
    Created = 1,
    Updated = 2,
    Deleted = 3
}
```
## The Messages We'll Produce
For our CRUD example we'll use an Azure SignalR `Hub` as our messaging platform but any other medium would work. We define a `HubClient` with strongly typed events for each of our operations.
```
public interface ISampleHubClient
{
    Task EntityCreated(EntityCreatedMessage message);
    Task EntityUpdated(EntityUpdatedMessage message);
    Task EntityDeleted(EntityDeletedMessage message);
}
```
For each of our `HubClient` methods we define a strongly typed message representing the event that took place.
```
public record EntityCreatedMessage(SampleEntity Entity, string Metadata);

public record EntityUpdatedMessage(SampleEntity Entity, string Metadata);

public record EntityDeletedMessage(int Id);
```
Note how we only return the `Id` for the `EntityDeleted` event since in this example the whole deleted entity is not necessary, nor is the any `Metadata` that might be associated with entity.
## The Message Factory
Next we have the `MessageFactory`. The `MessageFactory` is what produces our resultant messages for a given type. We define the `MessageFactory` with a simple interface.
```
public interface IMessageFactory<TEnum, TPayload, TInternalMessage>    
    where TEnum : Enum
    where TPayload : class
    where TInternalMessage : InternalMessage<TEnum, TPayload>
{
    ImmutableDictionary<TEnum, Func<TInternalMessage, IAsyncEnumerable<object>>> MessageFactoryMap { get; }
}
```
With this interface we expose a map between our message type enum and a function that takes our `InternalMessage` and produces an asynchronous stream of messages. But why define the result as an asynchronous stream? This choice gives some benefits but in summary it's an asynchronous covariant collection of our resultant messages. This provides us with:

- **Async:** Allows us to produce our messages asynchronously in the case we need some external resource to fully form the resultant messages.
- **Collection:** Allows us to produce multiple output messages for a single `InternalMessage`.
- **Covariant:** Allows us to assign our more derived message type to the underlying `object` collection.

So, our example implementation of a `MessageFactory` for our `SampleHub` looks like this.
```
public class SampleHubMessageFactory : IMessageFactory<SampleHubMessageType, SampleEntity, SampleHubInternalMessage>
{
    private readonly IMetadataRepository metadataRepository;

    public SampleHubMessageFactory(IMetadataRepository metadataRepository)
    {
        this.metadataRepository = metadataRepository ?? throw new ArgumentNullException(nameof(metadataRepository));

        MessageFactoryMap = new Dictionary<SampleHubMessageType, Func<SampleHubInternalMessage, IAsyncEnumerable<object>>>()
        {
            { SampleHubMessageType.Created, msg => GetCreatedMessage(msg) },
            { SampleHubMessageType.Updated, msg => GetUpdatedMessage(msg) },
            { SampleHubMessageType.Deleted, msg => GetDeletedMessage(msg) }
        }.ToImmutableDictionary();
    }

    public ImmutableDictionary<SampleHubMessageType, Func<SampleHubInternalMessage, IAsyncEnumerable<object>>> MessageFactoryMap { get; }

    public async IAsyncEnumerable<EntityCreatedMessage> GetCreatedMessage(SampleHubInternalMessage internalMessage)
    {
        var metadata = await metadataRepository.GetMetadadataAsync(internalMessage.Payload.Id);
        yield return new EntityCreatedMessage(internalMessage.Payload, metadata);
    }

    public async IAsyncEnumerable<EntityUpdatedMessage> GetUpdatedMessage(SampleHubInternalMessage internalMessage)
    {
        var metadata = await metadataRepository.GetMetadadataAsync(internalMessage.Payload.Id);
        yield return new EntityUpdatedMessage(internalMessage.Payload, metadata);
    }

    public IAsyncEnumerable<EntityDeletedMessage> GetDeletedMessage(SampleHubInternalMessage internalMessage)
        => new[] { new EntityDeletedMessage(internalMessage.Payload.Id) }.ToAsyncEnumerable();
}
```
In this example we showcase the ability to inject dependencies, that would have otherwise been injected into our message producers, directly into our `MessageFactory`. Remember that the goal of all this is to keep our message producers, the `Handlers`, `Services`, `Controllers` etc. free of anything but the most fundamental knowledge of messaging. They simply package up the data they have, given the context of  a message `Type` and send it on it's way. In this example we use our `MessageFactory` to gather some extra metadata about the entity before packaging it up and returning to our `MessageRouter`.
## The Message Router
The `MessageRouter` reads from an `IMessengerReader<TInternalMessage>` and invokes the corresponding `MessageFactory` method before writing the result to a `IMessengerWriter<TMessage>`.
```
public class MessageRouter<TEnum, TPayload, TInternalMessage>
    : BackgroundService
    where TEnum : Enum
    where TPayload : class
    where TInternalMessage : InternalMessage<TEnum, TPayload>
{
    private readonly ILogger<MessageRouter<TEnum, TPayload, TInternalMessage>> logger;
    private readonly IMessengerReader<TInternalMessage> messenger;
    private readonly ImmutableDictionary<TEnum, Func<TInternalMessage, IAsyncEnumerable<object>>> messageFactoryMap;
    private readonly IServiceProvider serviceProvider;

    private ConcurrentDictionary<Type, dynamic> writerCache = new ConcurrentDictionary<Type, dynamic>();

    public MessageRouter(
        ILogger<MessageRouter<TEnum, TPayload, TInternalMessage>> logger,
        IMessengerReader<TInternalMessage> messenger,
        IMessageFactory<TEnum, TPayload, TInternalMessage> messageFactory,
        IServiceProvider serviceProvider)
    {
        this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
        this.messenger = messenger ?? throw new ArgumentNullException(nameof(messenger));
        messageFactoryMap = messageFactory?.MessageFactoryMap ?? throw new ArgumentNullException(nameof(messageFactoryMap));
        this.serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            await MultiplexInternalMessagesAsync(stoppingToken);
        }
        catch (OperationCanceledException)
        {
        }
        finally
        {
            await messenger.Shutdown();
        }
    }

    private async ValueTask MultiplexInternalMessagesAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            stoppingToken.ThrowIfCancellationRequested();
            if (await messenger.WaitToReadAsync(stoppingToken))
            {
                stoppingToken.ThrowIfCancellationRequested();
                await foreach (var internalMessage in messenger.ReadAllAsync(stoppingToken))
                {
                    stoppingToken.ThrowIfCancellationRequested();
                    await GenerateMessagesAndWriteAsync(internalMessage);
                }
            }
        }
    }

    private async ValueTask GenerateMessagesAndWriteAsync(TInternalMessage internalMessage)
    {
        if (messageFactoryMap.ContainsKey(internalMessage.Type))
        {
            var externalMessages = messageFactoryMap[internalMessage.Type](internalMessage);
            await foreach (var externalMessage in externalMessages)
            {
                var writerType = typeof(IMessengerWriter<>).MakeGenericType(externalMessage.GetType());
                if (!writerCache.TryGetValue(writerType, out dynamic writer))
                {
                    writer = serviceProvider.GetRequiredService(writerType);
                    writerCache.TryAdd(writerType, writer);
                }
                writer.WriteAsync((dynamic)externalMessage);
                logger.LogInformation($"Sent {JsonSerializer.Serialize(externalMessage)} to {typeof(IMessengerWriter<>).Name}<{writerType.GenericTypeArguments[0].Name}>");
            }
        }
    }
}
```
The `MessageRouter` listens for any `InternalMessages` as defined by it's type parameters through an `IMessengerReader<TInternalMessage>` interface. Then, using the injected `MessageFactory` transforms the `InternalMessage` into a message meant for consumers. Finally, a registered `IMessengerWriter<TMessage>` is procured either from the cache of instances already generated or from the `ServiceProvider` and the message is dispatched to all registered `IPublisher<TMessage>` through a `BackgroundMessenger` as discussed in [this previous post](https://concurrentflows.hashnode.dev/background-message-processing-in-dotnet-6).
## Wiring it all up
To wire this all up, first, we register a `Messenger` for each of our external message types, using the extension method provided in [the previous post](https://concurrentflows.hashnode.dev/background-message-processing-in-dotnet-6)
```
services.AddMessenger<EntityCreatedMessage>(new[] { typeof(SampleHubPublisher) });
services.AddMessenger<EntityUpdatedMessage>(new[] { typeof(SampleHubPublisher) });
services.AddMessenger<EntityDeletedMessage>(new[] { typeof(SampleHubPublisher) });
```
Then we add our `MessageRouter` and provide our `MessageFactory`
```
services.AddMessageRouter<SampleHubMessageType, SampleEntity, SampleHubInternalMessage>(typeof(SampleHubMessageFactory));
```
`AddMessageRouter` is another extensions method defined to wire up all the necessary components of our infrastructure.
```
public static void AddMessageRouter<TEnum, TPayload, TInternalMessage>(this IServiceCollection services,
    Type messageFactory = null,
    Func<IServiceProvider, IMessageFactory<TEnum, TPayload, TInternalMessage>> factoryFactory = null)
    where TEnum : Enum
    where TPayload : class
    where TInternalMessage : InternalMessage<TEnum, TPayload>
{
    if (messageFactory is null && factoryFactory is null)
        throw new ArgumentException($"Must provide a {nameof(messageFactory)}.");
    if (messageFactory is not null && factoryFactory is not null)
        throw new ArgumentException($"Must only provide one {nameof(messageFactory)}.");
    if ((messageFactory is not null) &&
        !messageFactory.GetInterfaces().Contains(typeof(IMessageFactory<TEnum, TPayload, TInternalMessage>)))
        throw new ArgumentException($"{nameof(messageFactory)} must of type {typeof(IMessageFactory<,,>).Name}<{typeof(TEnum).Name},{typeof(TPayload).Name},{typeof(TInternalMessage).Name}>");

    if (messageFactory is not null)
        services.AddSingleton(typeof(IMessageFactory<TEnum, TPayload, TInternalMessage>), messageFactory);
    else
        services.AddSingleton(factoryFactory);

    services.AddSingleton<Messenger<TInternalMessage>>();
    services.AddSingleton<IMessengerWriter<TInternalMessage>>(sp => sp.GetRequiredService<Messenger<TInternalMessage>>());
    services.AddSingleton<IMessengerReader<TInternalMessage>>(sp => sp.GetRequiredService<Messenger<TInternalMessage>>());
    services.AddHostedService<MessageRouter<TEnum, TPayload, TInternalMessage>>();
}
```
This extension method allows us to either: provide a type for our `MessageFactory` or a factory function for the `MessageFactory`. Then registers all the infrastructure we need for our `MessageRouter`
# Conclusion
Now we can see the full life-cycle of an event. From a user interaction that, say, `Updated` an entity to the `InternalMessage` that captured the data and context associated with that event, to the `Factory` and `Router` that published a fully formed message intended for consumers.
Instead of tangling messaging concerns with our `Command Handlers` we have moved those responsibilities to dedicated components and kept our `Handlers` free to execute only those tasks for which they are responsible. You can find all the code for this post on my [GitHub Project ConcurrentFlows.HashNode](https://github.com/ptsteward/ConcurrentFlows.HashNode)
