Background
This post builds on the work developed in a previous post, Background Message Processing in Dotnet 6. We extend the Messenger<T> and BackgroundMessenger<T> concepts into a new MessageRouter that takes a single InternalMessage, transforms it, and routes it to the corresponding BackgroundMessenger, which publishes the resulting message to its intended audience.
Challenge
Let's say we have a basic CRUD API consisting of Commands, Queries and Handlers. Whenever we Create, Update or Delete an entity in our domain we want a corresponding message to be sent, but we recognize that the Handlers of these Commands shouldn't be concerned with the messaging aspects of the event. Our producers, the Handlers, should only have to be concerned with their intended operation (Create, Update or Delete). With the contextual knowledge each of those operations has, they should be able to emit a generic event with a payload and a type. We can let another system worry about how each type of message gets formed and routed, and keep our producers free of all but the simplest declarative logic: creating an InternalMessage assigned the correct type.
Solution
We'll build out a pattern for solving this challenge based on several interrelated components.
The Internal Message
The first component is the InternalMessage. This message captures the data and the context the event handler already has and submits it to our internal messaging infrastructure. We define an abstract InternalMessage record that takes a type enum and a payload.
public abstract record InternalMessage<TEnum, TPayload>(TEnum Type, TPayload Payload)
    where TEnum : Enum
    where TPayload : class;
Our InternalMessage is composed of an enum defining our message types and a payload representing the entity that has been acted upon. For our example we define a SampleEntity and its related InternalMessage and Types.
public record SampleEntity(int Id, string Name);

public record SampleHubInternalMessage(SampleHubMessageType Type, SampleEntity Payload)
    : InternalMessage<SampleHubMessageType, SampleEntity>(Type, Payload);

public enum SampleHubMessageType
{
    Created = 1,
    Updated = 2,
    Deleted = 3
}
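To make the producer side concrete, here is a minimal sketch of a handler that emits an InternalMessage and nothing more. The IMessengerWriter<T> interface below is a hypothetical stand-in for the writer from the previous post (only a WriteAsync call is assumed), and RecordingWriter<T> and UpdateSampleEntityHandler are illustrative names that do not come from the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Types as defined in this post.
public abstract record InternalMessage<TEnum, TPayload>(TEnum Type, TPayload Payload)
    where TEnum : Enum
    where TPayload : class;

public record SampleEntity(int Id, string Name);

public enum SampleHubMessageType { Created = 1, Updated = 2, Deleted = 3 }

public record SampleHubInternalMessage(SampleHubMessageType Type, SampleEntity Payload)
    : InternalMessage<SampleHubMessageType, SampleEntity>(Type, Payload);

// ASSUMPTION: stand-in for the writer interface from the previous post.
public interface IMessengerWriter<T>
{
    ValueTask WriteAsync(T message);
}

// Hypothetical in-memory writer, used only so the sketch can run on its own.
public sealed class RecordingWriter<T> : IMessengerWriter<T>
{
    public List<T> Written { get; } = new();

    public ValueTask WriteAsync(T message)
    {
        Written.Add(message);
        return ValueTask.CompletedTask;
    }
}

// Hypothetical command handler: it knows the operation (Updated) and the
// entity, and nothing about hubs, publishers or outgoing message shapes.
public sealed class UpdateSampleEntityHandler
{
    private readonly IMessengerWriter<SampleHubInternalMessage> writer;

    public UpdateSampleEntityHandler(IMessengerWriter<SampleHubInternalMessage> writer)
        => this.writer = writer ?? throw new ArgumentNullException(nameof(writer));

    public async Task<SampleEntity> HandleAsync(int id, string newName)
    {
        var updated = new SampleEntity(id, newName); // persistence omitted in this sketch
        await writer.WriteAsync(new SampleHubInternalMessage(SampleHubMessageType.Updated, updated));
        return updated;
    }
}
```

The handler's only messaging decision is which enum value to attach, which is the "simplest declarative logic" the Challenge section asks for.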
The Messages We'll Produce
For our CRUD example we'll use an Azure SignalR Hub as our messaging platform, but any other medium would work. We define a HubClient with strongly typed events for each of our operations.
public interface ISampleHubClient
{
    Task EntityCreated(EntityCreatedMessage message);
    Task EntityUpdated(EntityUpdatedMessage message);
    Task EntityDeleted(EntityDeletedMessage message);
}
For each of our HubClient methods we define a strongly typed message representing the event that took place.
public record EntityCreatedMessage(SampleEntity Entity, string Metadata);
public record EntityUpdatedMessage(SampleEntity Entity, string Metadata);
public record EntityDeletedMessage(int Id);
Note how we only return the Id for the EntityDeleted event, since in this example the whole deleted entity is not necessary, nor is any Metadata that might be associated with the entity.
The Message Factory
Next we have the MessageFactory. The MessageFactory is what produces our resultant messages for a given type. We define the MessageFactory with a simple interface.
public interface IMessageFactory<TEnum, TPayload, TInternalMessage>
    where TEnum : Enum
    where TPayload : class
    where TInternalMessage : InternalMessage<TEnum, TPayload>
{
    ImmutableDictionary<TEnum, Func<TInternalMessage, IAsyncEnumerable<object>>> MessageFactoryMap { get; }
}
With this interface we expose a map between our message type enum and a function that takes our InternalMessage and produces an asynchronous stream of messages. But why define the result as an asynchronous stream? In short, IAsyncEnumerable<object> is an asynchronous, covariant collection of our resultant messages. This provides us with:
- Async: Allows us to produce our messages asynchronously in case we need some external resource to fully form the resultant messages.
- Collection: Allows us to produce multiple output messages for a single InternalMessage.
- Covariant: Allows us to assign our more derived message type to the underlying object collection.
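The three properties above can be seen together in a small, standalone sketch. The Greeting record and the CovarianceSketch class are hypothetical names used purely for illustration; the only thing taken from the post is the shape of the delegate, a Func returning IAsyncEnumerable<object>.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical message type for this sketch only.
public record Greeting(string Text);

public static class CovarianceSketch
{
    // Async + Collection: an async iterator that can await while producing
    // more than one output message for a single input.
    public static async IAsyncEnumerable<Greeting> ProduceAsync(string name)
    {
        await Task.Yield(); // stands in for an awaited lookup of extra data
        yield return new Greeting($"Hello, {name}");
        yield return new Greeting($"Goodbye, {name}");
    }

    public static async Task<List<object>> CollectAsync(string name)
    {
        // Covariant: IAsyncEnumerable<Greeting> converts implicitly to
        // IAsyncEnumerable<object> because Greeting is a reference type.
        Func<string, IAsyncEnumerable<object>> factory = n => ProduceAsync(n);

        var results = new List<object>();
        await foreach (var message in factory(name))
        {
            results.Add(message);
        }
        return results;
    }
}
```

Calling CovarianceSketch.CollectAsync with a name yields two Greeting instances typed as object, in the order they were produced. Note that this conversion relies on the message being a reference type; a struct message would not convert this way.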
So, our example implementation of a MessageFactory for our SampleHub looks like this.
public class SampleHubMessageFactory : IMessageFactory<SampleHubMessageType, SampleEntity, SampleHubInternalMessage>
{
    private readonly IMetadataRepository metadataRepository;

    public SampleHubMessageFactory(IMetadataRepository metadataRepository)
    {
        this.metadataRepository = metadataRepository ?? throw new ArgumentNullException(nameof(metadataRepository));

        // Map each message type to the method that builds its outgoing messages.
        MessageFactoryMap = new Dictionary<SampleHubMessageType, Func<SampleHubInternalMessage, IAsyncEnumerable<object>>>()
        {
            { SampleHubMessageType.Created, msg => GetCreatedMessage(msg) },
            { SampleHubMessageType.Updated, msg => GetUpdatedMessage(msg) },
            { SampleHubMessageType.Deleted, msg => GetDeletedMessage(msg) }
        }.ToImmutableDictionary();
    }

    public ImmutableDictionary<SampleHubMessageType, Func<SampleHubInternalMessage, IAsyncEnumerable<object>>> MessageFactoryMap { get; }

    public async IAsyncEnumerable<EntityCreatedMessage> GetCreatedMessage(SampleHubInternalMessage internalMessage)
    {
        var metadata = await metadataRepository.GetMetadadataAsync(internalMessage.Payload.Id);
        yield return new EntityCreatedMessage(internalMessage.Payload, metadata);
    }

    public async IAsyncEnumerable<EntityUpdatedMessage> GetUpdatedMessage(SampleHubInternalMessage internalMessage)
    {
        var metadata = await metadataRepository.GetMetadadataAsync(internalMessage.Payload.Id);
        yield return new EntityUpdatedMessage(internalMessage.Payload, metadata);
    }

    // Nothing to await here; ToAsyncEnumerable() comes from the System.Linq.Async package.
    public IAsyncEnumerable<EntityDeletedMessage> GetDeletedMessage(SampleHubInternalMessage internalMessage)
        => new[] { new EntityDeletedMessage(internalMessage.Payload.Id) }.ToAsyncEnumerable();
}
In this example we showcase the ability to inject dependencies, which would otherwise have been injected into our message producers, directly into our MessageFactory. Remember that the goal of all this is to keep our message producers (the Handlers, Services, Controllers, etc.) free of anything but the most fundamental knowledge of messaging. They simply package up the data they have, given the context of a message Type, and send it on its way. In this example we use our MessageFactory to gather some extra metadata about the entity before packaging it up and returning it to our MessageRouter.
The Message Router
The MessageRouter reads from an IMessengerReader<TInternalMessage> and invokes the corresponding MessageFactory method before writing the result to an IMessengerWriter<TMessage>.
public class MessageRouter<TEnum, TPayload, TInternalMessage>
    : BackgroundService
    where TEnum : Enum
    where TPayload : class
    where TInternalMessage : InternalMessage<TEnum, TPayload>
{
    private readonly ILogger<MessageRouter<TEnum, TPayload, TInternalMessage>> logger;
    private readonly IMessengerReader<TInternalMessage> messenger;
    private readonly ImmutableDictionary<TEnum, Func<TInternalMessage, IAsyncEnumerable<object>>> messageFactoryMap;
    private readonly IServiceProvider serviceProvider;
    // Caches the resolved IMessengerWriter<T> for each external message type.
    private readonly ConcurrentDictionary<Type, dynamic> writerCache = new ConcurrentDictionary<Type, dynamic>();

    public MessageRouter(
        ILogger<MessageRouter<TEnum, TPayload, TInternalMessage>> logger,
        IMessengerReader<TInternalMessage> messenger,
        IMessageFactory<TEnum, TPayload, TInternalMessage> messageFactory,
        IServiceProvider serviceProvider)
    {
        this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
        this.messenger = messenger ?? throw new ArgumentNullException(nameof(messenger));
        messageFactoryMap = messageFactory?.MessageFactoryMap ?? throw new ArgumentNullException(nameof(messageFactory));
        this.serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            await MultiplexInternalMessagesAsync(stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected when the host is stopping.
        }
        finally
        {
            await messenger.Shutdown();
        }
    }

    private async ValueTask MultiplexInternalMessagesAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            stoppingToken.ThrowIfCancellationRequested();
            if (await messenger.WaitToReadAsync(stoppingToken))
            {
                stoppingToken.ThrowIfCancellationRequested();
                await foreach (var internalMessage in messenger.ReadAllAsync(stoppingToken))
                {
                    stoppingToken.ThrowIfCancellationRequested();
                    await GenerateMessagesAndWriteAsync(internalMessage);
                }
            }
        }
    }

    private async ValueTask GenerateMessagesAndWriteAsync(TInternalMessage internalMessage)
    {
        // Messages whose type has no registered factory are skipped.
        if (!messageFactoryMap.TryGetValue(internalMessage.Type, out var createMessages))
            return;

        await foreach (var externalMessage in createMessages(internalMessage))
        {
            // Resolve the writer for the runtime type of the external message.
            var writerType = typeof(IMessengerWriter<>).MakeGenericType(externalMessage.GetType());
            if (!writerCache.TryGetValue(writerType, out dynamic writer))
            {
                writer = serviceProvider.GetRequiredService(writerType);
                writerCache.TryAdd(writerType, writer);
            }
            // Await the write so failures surface here instead of being dropped.
            await writer.WriteAsync((dynamic)externalMessage);
            logger.LogInformation($"Sent {JsonSerializer.Serialize(externalMessage)} to {typeof(IMessengerWriter<>).Name}<{writerType.GenericTypeArguments[0].Name}>");
        }
    }
}
The MessageRouter listens for any InternalMessages, as defined by its type parameters, through an IMessengerReader<TInternalMessage> interface. Then, using the injected MessageFactory, it transforms the InternalMessage into a message meant for consumers. Finally, a registered IMessengerWriter<TMessage> is procured, either from the cache of instances already resolved or from the ServiceProvider, and the message is dispatched to all registered IPublisher<TMessage> instances through a BackgroundMessenger, as discussed in the previous post.
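The writer lookup is the least obvious step, so here it is isolated in a small sketch: build the closed IMessengerWriter<T> type from the message's runtime type, resolve it once, cache it, and dispatch through dynamic. Everything here is illustrative rather than the post's implementation. IMessengerWriter<T> is a hypothetical stand-in with an assumed WriteAsync signature, DictionaryServiceProvider replaces the real DI container, and WriterDispatcher and ListWriter<T> are invented names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// ASSUMPTION: stand-in for the writer interface from the previous post.
public interface IMessengerWriter<T>
{
    ValueTask WriteAsync(T message);
}

// Hypothetical writer that just records what it receives.
public sealed class ListWriter<T> : IMessengerWriter<T>
{
    public List<T> Written { get; } = new();

    public ValueTask WriteAsync(T message)
    {
        Written.Add(message);
        return ValueTask.CompletedTask;
    }
}

// Minimal IServiceProvider backed by a dictionary, replacing the DI container.
public sealed class DictionaryServiceProvider : IServiceProvider
{
    private readonly Dictionary<Type, object> services = new();

    public DictionaryServiceProvider Add<TService>(TService instance) where TService : class
    {
        services[typeof(TService)] = instance;
        return this;
    }

    public object? GetService(Type serviceType)
        => services.TryGetValue(serviceType, out var service) ? service : null;
}

public sealed class WriterDispatcher
{
    private readonly IServiceProvider provider;
    private readonly ConcurrentDictionary<Type, object> cache = new();

    public WriterDispatcher(IServiceProvider provider)
        => this.provider = provider ?? throw new ArgumentNullException(nameof(provider));

    public async ValueTask DispatchAsync(object message)
    {
        // Close IMessengerWriter<> over the message's runtime type.
        var writerType = typeof(IMessengerWriter<>).MakeGenericType(message.GetType());

        // Resolve on first use, then reuse the cached instance.
        var writer = cache.GetOrAdd(writerType, type =>
            provider.GetService(type)
            ?? throw new InvalidOperationException($"No writer registered for {type}."));

        // dynamic defers overload binding to runtime, where T is known.
        await ((dynamic)writer).WriteAsync((dynamic)message);
    }
}
```

One design note: dynamic binding only sees public members of the writer's runtime type, so a writer that implements WriteAsync explicitly, or that is not publicly accessible, would fail at runtime with this approach.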
Wiring it all up
To wire this all up, we first register a Messenger for each of our external message types, using the extension method provided in the previous post:
services.AddMessenger<EntityCreatedMessage>(new[] { typeof(SampleHubPublisher) });
services.AddMessenger<EntityUpdatedMessage>(new[] { typeof(SampleHubPublisher) });
services.AddMessenger<EntityDeletedMessage>(new[] { typeof(SampleHubPublisher) });
Then we add our MessageRouter and provide our MessageFactory:
services.AddMessageRouter<SampleHubMessageType, SampleEntity, SampleHubInternalMessage>(typeof(SampleHubMessageFactory));
AddMessageRouter is another extension method, defined to wire up all the necessary components of our infrastructure.
public static void AddMessageRouter<TEnum, TPayload, TInternalMessage>(this IServiceCollection services,
    Type messageFactory = null,
    Func<IServiceProvider, IMessageFactory<TEnum, TPayload, TInternalMessage>> factoryFactory = null)
    where TEnum : Enum
    where TPayload : class
    where TInternalMessage : InternalMessage<TEnum, TPayload>
{
    // Exactly one of messageFactory or factoryFactory must be supplied.
    if (messageFactory is null && factoryFactory is null)
        throw new ArgumentException($"Must provide a {nameof(messageFactory)}.");
    if (messageFactory is not null && factoryFactory is not null)
        throw new ArgumentException($"Must only provide one {nameof(messageFactory)}.");
    if ((messageFactory is not null) &&
        !messageFactory.GetInterfaces().Contains(typeof(IMessageFactory<TEnum, TPayload, TInternalMessage>)))
        throw new ArgumentException($"{nameof(messageFactory)} must be of type {typeof(IMessageFactory<,,>).Name}<{typeof(TEnum).Name},{typeof(TPayload).Name},{typeof(TInternalMessage).Name}>");

    if (messageFactory is not null)
        services.AddSingleton(typeof(IMessageFactory<TEnum, TPayload, TInternalMessage>), messageFactory);
    else
        services.AddSingleton(factoryFactory);

    // One Messenger instance backs both the writer and the reader interfaces.
    services.AddSingleton<Messenger<TInternalMessage>>();
    services.AddSingleton<IMessengerWriter<TInternalMessage>>(sp => sp.GetRequiredService<Messenger<TInternalMessage>>());
    services.AddSingleton<IMessengerReader<TInternalMessage>>(sp => sp.GetRequiredService<Messenger<TInternalMessage>>());
    services.AddHostedService<MessageRouter<TEnum, TPayload, TInternalMessage>>();
}
This extension method allows us to provide either a type for our MessageFactory or a factory function for the MessageFactory. It then registers all the infrastructure we need for our MessageRouter.
Conclusion
Now we can see the full life-cycle of an event: from a user interaction that, say, Updated an entity, to the InternalMessage that captured the data and context associated with that event, to the Factory and Router that published a fully formed message intended for consumers.
Instead of tangling messaging concerns with our Command Handlers, we have moved those responsibilities to dedicated components and kept our Handlers free to execute only those tasks for which they are responsible. You can find all the code for this post on my GitHub Project ConcurrentFlows.HashNode