AsyncMediator Series - Part 1

Decoupling of your Business Logic via Flow

Joshua Steward
·Oct 22, 2022·

Inspiration

Origination executes distinctly from Consumption

From the beginning 🤔

A Mediator is generally the go-to when we want to decouple a Request from its Processing. There are already some well-established and widely used mediator packages, e.g. MediatR; J. Bogard, Mediator.NET; E. Ma, etc. These all implement variations on the Mediator Pattern quite well...👌. Largely we see these operating in-process and not across the wire, with the exception of MassTransit - Mediator of course, but we won't go there right now.😎 Additionally, we can see many of the fundamental ideas presented within the ...Query Side and ... Command Side of my architecture; S. Deursen, also of Simple Injector Rock Stardom 🎸

On the market today

Right up front; I really enjoy both these libraries but ultimately I'd like to go another direction 🤔

First, let's take a closer look; all these options are quite feature rich, and support similar architectural patterns/styles.

What about the Mediator?

Traditionally, a Mediator represents a focal point between an Originator and one or many Consumers. This allows the Originator to avoid explicitly referring to or knowing about the Consumers.

[Figure: Async Mediator - Typical Mediator]

The Consumers are typically conceptualized as Handlers of particular messages and can form Pipelines or chains, each Handler playing a specific role in the choreography. Additional and complementary concepts can be added with associated Middleware or Behaviors, etc., promoting some aspects of loose coupling. But the call from Originator to Handler is still synchronous with regard to its Registration. The Registration couples, albeit in the background, the Originator and the Consumer, and both processes work tightly together via the Mediator.

For instance, MediatR holds a reference to all of its Handlers and dispatches associated Requests and/or Notifications. Mediator.NET has a nice abstraction in place; Pipes form its central core and you can even stream responses back via the receive pipeline. 💘IAsyncEnumerable. However, all Handlers are registered and well known. Both of these methodologies yield a static topology.

These structures have the advantage when you want a response back from a particular request. But I'm not interested in call and response; receiving a message should yield another subsequent message, and we'll leave any correlation back to the origination to simply be implicit.

Schema Driven Mediator

Conceptually

Conceptually, what we're after is much the same as you'd find in a distributed messaging broker: Kafka, Pulsar, RabbitMQ, etc. With this in mind we'll split our message Origination entirely from its Consumption.

This is the key change I want: Origination executes distinctly from Consumption

Architectural Objectives

Primary Goals

Fundamentally we want an entirely decoupled architecture; the only shared knowledge should be the message Schema.

  1. Decoupling between processing units
    • Schema Driven communication
    • The Originating Process should remain separate from the Consuming Process
  2. Consumer Declared Consumption - this is where we decouple
    • No concept of registering Handlers
    • Consumers and their associated process may come and go
  3. Broadcast & Multicast Origination
    • One or Many Originators can produce a message
    • One or Many Consumers can listen

[Figure: Async Mediator - Outbound]

Above, we can see the type of outbound Broadcast/Multicast style brokerage we're after. A decoupled fanout of Messages is relatively easy to achieve.

Putting it together

Foundations

Our first step is to split the Originator from Consumer and further avoid any implicit coupling. All Requests, Commands, Notifications, Responses, etc. will travel asynchronously. The building blocks of this system will be the abstract concepts of Schema, Message, and ultimately Flow.

  • Schema - Defines the shape of data/payload/behavior to be implemented by a Message
  • Message - Takes the form defined by the Schema and represents an immutable and unique representation of that Schema at a point in time
  • Flow - Is the unidirectional transmission of Messages. A single Flow only permits a single Schema

Async Message Fanout

Abstractions

A Flow is produced by our two fundamental building blocks: FlowSource & FlowSink. These complementary components define either end of a Flow constrained to a single Schema. The Messages will be transmitted in the background of our application via a Source/Sink set of abstractions. Each is defined below:

public interface IFlowSource<TSchema>
    : IAsyncDisposable, IDisposable
    where TSchema : Envelope
{
    ValueTask<bool> EmitAsync(TSchema message, CancellationToken cancelToken = default);
}

public interface IFlowSink<TSchema>
    : IAsyncDisposable, IDisposable
    where TSchema : Envelope
{
    IAsyncEnumerable<TSchema> ConsumeAsync(CancellationToken cancelToken = default);
}

Note, both ends of our unidirectional Flow are (Async)Disposable. The unidirectional nature of the Flow leads directly to two implications.

  1. Disposing a Sink, disposes only that recipient endpoint
  2. Disposing a Source, however, closes the entirety of the Flow

Here we've also defined a constraint of Envelope. This constraint, i.e. our base Schema, ensures our Messages always have a CurrentId and a CausationId. All this means is that we know where we are now and where we came from.

public abstract record Envelope(
    string CurrentId,
    string CausationId);
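
To make the CurrentId/CausationId relationship concrete, here is a minimal sketch. The OrderPlaced and OrderShipped schemas, their properties, and the use of Guid strings as ids are assumptions made up for illustration; they are not part of the package.

```csharp
using System;

// The first message in a chain has nothing that caused it.
var placed = new OrderPlaced(
    CurrentId: Guid.NewGuid().ToString(),
    CausationId: string.Empty,
    OrderNumber: 42);

// A follow-up message records where it came from via CausationId.
var shipped = new OrderShipped(
    CurrentId: Guid.NewGuid().ToString(),
    CausationId: placed.CurrentId,
    Carrier: "ACME");

Console.WriteLine(shipped.CausationId == placed.CurrentId); // prints True

// Base Schema from the article, repeated so the sketch compiles on its own.
public abstract record Envelope(string CurrentId, string CausationId);

// Hypothetical schemas: each adds its own payload on top of the Envelope.
public sealed record OrderPlaced(string CurrentId, string CausationId, int OrderNumber)
    : Envelope(CurrentId, CausationId);

public sealed record OrderShipped(string CurrentId, string CausationId, string Carrier)
    : Envelope(CurrentId, CausationId);
```

Because records are immutable by default, each Message stays a fixed snapshot of its Schema once created.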

Simple Message passing via the twin concepts of Source/Sink allows the Origination and Consumption to be decoupled. Our Source, via an Originator, produces messages at one end and the Sink receives them into a Consumer. This takes advantage of much the same concepts as any Message Bus but applies an asynchronous distributed behavior within a single application.

[Figure: Async Mediator - Source-Sink]

Implementations

Our implementations are built upon the now standard System...Dataflow namespace containing a wide variety of Dataflow primitives. This library is quite powerful in terms of message passing and parallel processing, complementing the typical async/await paradigm. Other implementations are of course possible.

Each has tradeoffs, of course; the main driver toward Dataflow was the simplicity of separating Message Passing and Message Consuming. I don't want to force any Consumer into participating in any semantic Observable Pipeline, especially if it is instead better represented as a discrete step. Flow remains opt-in and can be as simple as injecting the Sink and reading from it. Channels of course could underpin Dataflow, but natively Channels are not intended to support any kind of Broadcast/Multicast.

FlowSink

To build our Flow, first we'll look at the Sink. The purpose and intention of this component is to be injected via dependency injection into any Consumer that has an interest in the defined Schema. That is, any defined Consumer that must take action upon receiving a Message with the defined Schema. I've collapsed most of the disposal for brevity; the key piece is disposing of the link back to the Source.

internal sealed class FlowSink<TSchema>
    : IFlowSink<TSchema>
    where TSchema : Envelope
{
    private BufferBlock<TSchema>? Buffer { get; set; }

    private readonly IDisposable link;
    private volatile bool isDisposed;


    public FlowSink(ILinkableSource<TSchema> source)
    {
        Buffer = new(new()
        {
            EnsureOrdered = true,
            BoundedCapacity = DataflowBlockOptions.Unbounded
        });
        link = source.LinkTo(Buffer);
    }

    public IAsyncEnumerable<TSchema> ConsumeAsync(CancellationToken cancelToken = default) 
        => Buffer.ThrowIfDisposed(isDisposed)
            .EnumerateSource(cancelToken)
            .Attempt(onError: ex =>
            {
                this.Dispose();
                return AsyncEnumerable.Empty<TSchema>();
            });

    public void Dispose() {/*...*/}
    public ValueTask DisposeAsync() {/*...*/}
    private void DisposeCore()
    {
        isDisposed = true;
        link?.Dispose();
        Buffer = null;
        GC.SuppressFinalize(this);
    }
}

Our FlowSink is built on top of an internal BufferBlock bound to our Schema. We maintain a concrete implementation of BufferBlock with the intention of later using its Count, but this could well be represented as an IPropagatorBlock until specifics are necessary. Next, the only item we're dependent on for construction is an ILinkableSource<TSchema>, defined as follows:

LinkableSource

internal interface ILinkableSource<TSchema>
    where TSchema : Envelope
{
    IDisposable LinkTo(ITargetBlock<TSchema> sink);
}

Keeping this interface internal allows us to keep this concept scoped within our package. It narrowly exposes the internal mechanism of Linking two Dataflow Blocks together. Once linked, each block will process messages as defined by its implementation and operate independently of any other, within the bounds set by the creation and linking.
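
The linking mechanism itself can be seen in isolation with plain Dataflow blocks. The following is a minimal sketch, not the package's code: it links one BroadcastBlock to two BufferBlocks, then disposes one link to show that only that target leaves the topology. The variable names are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// One source and two independently linked targets.
var source = new BroadcastBlock<string>(msg => msg);
var sinkA = new BufferBlock<string>();
var sinkB = new BufferBlock<string>();

// LinkTo hands back an IDisposable that represents the link itself.
IDisposable linkA = source.LinkTo(sinkA);
IDisposable linkB = source.LinkTo(sinkB);

source.Post("first");
var a1 = await sinkA.ReceiveAsync();   // both targets receive the first message
var b1 = await sinkB.ReceiveAsync();

linkB.Dispose();                       // unlink sinkB only; sinkA stays connected
source.Post("second");
var a2 = await sinkA.ReceiveAsync();

Console.WriteLine($"A received: {a1}, {a2}");
Console.WriteLine($"B received: {b1}; buffered after unlink: {sinkB.Count}");

linkA.Dispose();
```

This is the same IDisposable that FlowSink stores as its link field and disposes during cleanup.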

Lastly, we can see that the Buffer is consumed via disposal-protected, exception-protected, and cancellable extensions to IAsyncEnumerable. While not necessary for this, we've decorated the CancellationToken as the target of [EnumeratorCancellation] for broader use cases. To keep things brief I'll leave the full implementations for review via GitHub.

[return: NotNull]
public static T ThrowIfDisposed<T>(
        this T? target,
        bool isDisposed)

internal static Func<IAsyncEnumerable<TSchema>> EnumerateSource<TSchema>(
        this ISourceBlock<TSchema> source,
        CancellationToken cancelToken)

public static IAsyncEnumerable<T> Attempt<T>(
        this Func<IAsyncEnumerable<T>> iterator,
        Func<Exception, IAsyncEnumerable<T>> onError,
        Func<Exception, bool>? canHandle = default)
        where T : class

These are relatively simple extensions, although Attempt has quite the signature 🤯 But each does just what it says on the tin: ThrowIfDisposed; EnumerateSource, i.e. Orchestrate the Enumerator; and finally Attempt, which manages the execution.
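
For a feel of what an Attempt-style extension has to do, here is one possible sketch. It is an assumption about the general shape, not the implementation from the repository: it omits the canHandle filter and the class constraint, and the AttemptSketch, Faulty, and Fallback names are hypothetical. Since C# does not allow yield return inside a try block that has a catch clause, the enumerator is advanced by hand inside the try and the item is yielded outside of it.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Usage: the iterator fails after one item and the fallback supplies the rest.
var results = new List<string>();
Func<IAsyncEnumerable<string>> failing = Faulty;
await foreach (var item in failing.Attempt(onError: _ => Fallback()))
    results.Add(item);
Console.WriteLine(string.Join(", ", results)); // prints "one, recovered"

static async IAsyncEnumerable<string> Faulty()
{
    yield return "one";
    await Task.Yield();
    throw new InvalidOperationException("boom");
}

static async IAsyncEnumerable<string> Fallback()
{
    await Task.Yield();
    yield return "recovered";
}

public static class AttemptSketch
{
    public static async IAsyncEnumerable<T> Attempt<T>(
        this Func<IAsyncEnumerable<T>> iterator,
        Func<Exception, IAsyncEnumerable<T>> onError)
    {
        IAsyncEnumerable<T>? fallback = null;
        await using (var enumerator = iterator().GetAsyncEnumerator())
        {
            while (true)
            {
                T current;
                try
                {
                    // Only the advance is guarded; the yield happens below.
                    if (!await enumerator.MoveNextAsync()) break;
                    current = enumerator.Current;
                }
                catch (Exception ex)
                {
                    fallback = onError(ex);
                    break;
                }
                yield return current;
            }
        }
        if (fallback is null) yield break;
        // Continue with whatever the error handler chose to return.
        await foreach (var item in fallback) yield return item;
    }
}
```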

FlowSource

Next is our Broadcast/Multicast enabled Source. This is accomplished by exposing the capabilities of a BroadcastBlock. This block clones, or in our case simply returns, each message received and Offers it to each Linked block. The importance of Offer is that if a Linked block cannot take the Message, that Message is then dropped for that block, i.e. lost forever and for good. This leads into Backpressure, another high point for choosing Dataflow yet out of scope here; to begin, we set all Blocks with an Unbounded capacity for simplicity. So Source can be implemented as such; again with collapsed disposal, where we're both Completing and then awaiting Completion of the Source during cleanup.

internal sealed class FlowSource<TSchema>
    : IFlowSource<TSchema>,
    ILinkableSource<TSchema>
    where TSchema : Envelope
{
    private BroadcastBlock<TSchema>? Source { get; set; }

    private volatile bool isDisposed;    

    public FlowSource()
        => Source = new(msg => msg,
            new()
            {
                EnsureOrdered = true,
                BoundedCapacity = DataflowBlockOptions.Unbounded
            });

    public ValueTask<bool> EmitAsync(TSchema message, CancellationToken cancelToken = default)
        => Source.ThrowIfDisposed(isDisposed)
            .OfferAsync(message, TimeSpan.FromMilliseconds(300), cancelToken)
            .Attempt(onError: ex => ValueTask.FromResult(false));



    IDisposable ILinkableSource<TSchema>.LinkTo(ITargetBlock<TSchema> sink)
        => Source.ThrowIfDisposed(isDisposed)
            .LinkTo(sink, new()
            {
                PropagateCompletion = true,
            });

    public void Dispose() { /*...*/}
    public async ValueTask DisposeAsync() { /*...*/}
    private async ValueTask DisposeAsyncCore()
    {
        isDisposed = true;
        Source?.Complete();
        await (Source?.Completion ?? Task.CompletedTask);
        Source = null;
        GC.SuppressFinalize(this);
    }
}

This implementation exposes EmitAsync and transforms the standard Task<bool> of the Block into a disposal-protected and orchestrated Attempt yielding a ValueTask<bool> via simple extensions. Additionally, we separately implement the internal ILinkableSource<TSchema> interface to connect any downstream Sinks. This exposes a disposal-protected call into .LinkTo, ensuring that Completion is propagated. With this configuration set, if the Source is disposed and thus Completed, this information will flow down to all Sinks, which will then exit any ongoing or new iteration upon the Sink.
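
The effect of PropagateCompletion can be observed with plain Dataflow blocks. This is a minimal sketch under the same settings as above, not the package's code; the consumer task and variable names are made up for illustration. Completing the source ends the consumer's await foreach once the buffered messages are drained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var source = new BroadcastBlock<int>(n => n);
var sink = new BufferBlock<int>();
source.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

// The consumer iterates until the sink completes.
var consumer = Task.Run(async () =>
{
    var seen = new List<int>();
    await foreach (var n in sink.ReceiveAllAsync())
        seen.Add(n);
    return seen; // reached only after completion has flowed down from the source
});

await source.SendAsync(1);
await source.SendAsync(2);

// Mirrors the disposal path: Complete, then await Completion.
source.Complete();
await source.Completion;

var received = await consumer;
Console.WriteLine($"Consumer exited after {received.Count} messages");
```

Without PropagateCompletion the sink would never complete and the consumer loop would wait indefinitely.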

Registration

With just these two components we can achieve the first two Primary Goals, albeit with a little DI trickery

  1. Decoupling between processing units
  2. Consumer Declared Consumption

public static IServiceCollection AddFlow<TSchema>(this IServiceCollection services)
    where TSchema : Envelope
{
    services.TryAddSingleton<IFlowSource<TSchema>, FlowSource<TSchema>>();
    services.TryAddTransient<IFlowSink<TSchema>>(sp =>
    {
        var source = sp.GetRequiredService<IFlowSource<TSchema>>();
        var linkable = source as ILinkableSource<TSchema>
            ?? throw new ArgumentException(
                $"Invalid FlowSource Registration. Source Type, {source.GetType().Name}, is not linkable");
        return new FlowSink<TSchema>(linkable);
    });
    return services;
}

Broadcast/Multicast

Registering the FlowSource as a singleton ensures any consumer of this interface is Emitting to the same BroadcastBlock. This allows one or many Originators to enter Messages into the Flow.

Declared Consumption

Thanks to the leniency of the default Dependency Injection of .NET, i.e. IServiceCollection/IServiceProvider, we can register our Sink as a Transient. Doing so may yield a captive dependency if the consuming service has a longer lifetime than our Sink. However, the advantage is that we ensure each Consumer receives a unique and linked instance of our Sink. In this variation we're leveraging the DI container to dynamically construct our topology. Assuming, 🤞, our Consumers dispose of the Sink properly, it will be appropriately removed from the Flow topology.
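
From the Consumer's point of view, declared consumption might look like the sketch below. Everything except Envelope and IFlowSink is hypothetical: the Greeting schema, the GreetingConsumer, and especially StubSink, which merely stands in for the linked Sink the container would normally inject so the example runs on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for what the container would hand to the Consumer.
await using var consumer = new GreetingConsumer(new StubSink(
    new Greeting("1", "", "hello"),
    new Greeting("2", "1", "world")));
await consumer.RunAsync();
Console.WriteLine(string.Join(" ", consumer.Seen)); // prints "hello world"

// Abstractions from the article, repeated so the sketch compiles on its own.
public abstract record Envelope(string CurrentId, string CausationId);

public interface IFlowSink<TSchema> : IAsyncDisposable, IDisposable
    where TSchema : Envelope
{
    IAsyncEnumerable<TSchema> ConsumeAsync(CancellationToken cancelToken = default);
}

// Hypothetical schema.
public sealed record Greeting(string CurrentId, string CausationId, string Text)
    : Envelope(CurrentId, CausationId);

// Hypothetical Consumer: declares interest simply by asking for the Sink.
public sealed class GreetingConsumer : IAsyncDisposable
{
    private readonly IFlowSink<Greeting> sink;
    public List<string> Seen { get; } = new();

    public GreetingConsumer(IFlowSink<Greeting> sink) => this.sink = sink;

    public async Task RunAsync(CancellationToken cancelToken = default)
    {
        await foreach (var message in sink.ConsumeAsync(cancelToken))
            Seen.Add(message.Text);
    }

    // Disposing the Sink is what removes this Consumer from the topology.
    public ValueTask DisposeAsync() => sink.DisposeAsync();
}

// Demo-only Sink that replays a fixed set of messages.
public sealed class StubSink : IFlowSink<Greeting>
{
    private readonly Greeting[] messages;
    public StubSink(params Greeting[] messages) => this.messages = messages;

    public async IAsyncEnumerable<Greeting> ConsumeAsync(
        [EnumeratorCancellation] CancellationToken cancelToken = default)
    {
        foreach (var message in messages)
        {
            cancelToken.ThrowIfCancellationRequested();
            await Task.Yield();
            yield return message;
        }
    }

    public void Dispose() { }
    public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}
```

The Consumer never references a Source, a Handler registration, or an Originator; the Schema is the only shared knowledge.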

Closure

With just these two primary components, FlowSource & FlowSink, we have achieved a basic decoupling and fanout of Messages. We really don't even need a specific Mediator to do this.

The next evolution is going to be tying things together via the Envelope. This will allow us to put backpressure on a Source while maintaining independence. Additionally, it will provide a vector to ack/nack our Source.

Of course, all the code for this post can be found on GitHub: ConcurrentFlows.AsyncMediator

 