AsyncMediator Series - Part 2

Channeling Completion toward Decoupling

Joshua Steward
Nov 15, 2022

Welcome Back! 👋

So glad you made it back for Part 2. In Part 1: Broadcast Messaging - In Memory, we created an implementation of something called Flow: the broadcast of messages from many Originators to many Consumers, leveraging TPL Dataflow internally.

Towards our AsyncMediator

Our next step is to introduce the concept of Message Completion. This means the Consumer indicates whether it has Completed processing or Failed to process the received message. We want to maintain the broadcast capabilities of the Flow and that feature will drive us away from our TPL Dataflow implementation.

What we'll gain is the ability to dynamically onboard Consumers and finer-grained control over all of them. Additionally, this will let us support Queries rather than limiting us to Commands.

Building a new flow

First, let's lay out some requirements of our new implementation:

  • Must allow many Originators
  • Must allow many Consumers
  • Must allow Sinks to detach from Source
  • Must only Complete an Envelope when all Consumers finish

Our Library API

We want this new flow to be easy to consume. Ideally we should have a simple registration and a small set of well-known interfaces. To start with, our registration should look something like this, along with a sample Message.

public static IServiceCollection AddMsgChannel<TPayload>(this IServiceCollection services)
    where TPayload : notnull

public sealed record Message(int Id);

We intend to provide consumers of the library a simple interface to interact with. For example, the Originator of Messages would look something like this:

public sealed class Originator
{
    private readonly IChannelSource<Message> source;

    public Originator(IChannelSource<Message> source)
        => this.source = source;

    public async ValueTask ProduceManyAsync(int count, CancellationToken cancelToken)
    {
        var messages = Enumerable.Range(0, count)
            .Select(i => new Message(i).ToEnvelope());
        var sending = messages.Select(async msg => await source.SendAsync(msg, cancelToken));
        await Task.WhenAll(sending);
    }
}

And, a simple Consumer of Messages would look like this:

public sealed class Consumer
{
    private readonly IChannelSink<Message> sink;

    public Consumer(IChannelSink<Message> sink) 
        => this.sink = sink;

    public async Task<IEnumerable<Message>> CollectAllAsync(CancellationToken cancelToken)
    {
        var set = new List<Message>();
        try
        {

            await foreach (var envelope in sink.ConsumeAsync(cancelToken))
                set.Add(envelope.Payload);
        }
        catch (OperationCanceledException)
        { /*We're Done*/ }
        return set;
    }
}

Guiding Test

All that we need to expose are two simple, opposing interfaces: Sink and Source. With these and our requirements in mind, we can write a test and let that drive and affirm our implementation.

[Fact]
public async Task Source_BroadcastsTo_AllConsumers()
{
    var provider = new ServiceCollection()
        .AddMsgChannel<Message>()
        .AddTransient<Originator>()
        .AddTransient<Consumer>()
        .BuildServiceProvider();

    var originator1 = provider.GetRequiredService<Originator>();
    var originator2 = provider.GetRequiredService<Originator>();
    var consumer1 = provider.GetRequiredService<Consumer>();
    var consumer2 = provider.GetRequiredService<Consumer>();
    originator1.Should().NotBe(originator2);
    consumer1.Should().NotBe(consumer2);        

    var count = 100;
    using var cts = new CancellationTokenSource();
    var consume1 = consumer1.CollectAllAsync(cts.Token);
    var consume2 = consumer2.CollectAllAsync(cts.Token);
    var sending1 = originator1.ProduceManyAsync(count, cts.Token);
    var sending2 = originator2.ProduceManyAsync(count, cts.Token);

    await Task.WhenAll(sending1, sending2);
    cts.Cancel();
    var set1 = await consume1;
    var set2 = await consume2;

    set1.Should().HaveCount(count * 2);
    set2.Should().HaveCount(count * 2);
    set1.Should().BeEquivalentTo(set2);
}

Our test sets up the dependency registration, then creates two Originators and two Consumers. We assert that each instance is unique and then kick off our action. We trigger the Consumers to start listening and the Originators to start sending a preset count of Messages. Once our Originators complete, we trigger the CancellationToken to stop our Consumers. After awaiting the consumption Tasks we can assert that each Consumer has received all Messages: double the original count, since we have two Originators. Finally, we assert that both sets of Messages are equivalent, indicating that we did broadcast from each Originator to every Consumer.

To begin our implementation we look to the humble Envelope. This will be the container and carrier of our Payload and serve as the conduit for various utilities.

The Envelope

The first piece we need to extend is the Envelope. The Envelope will be the item that allows us to communicate an Ack/Nack back to the Originator.

public abstract record Envelope
{
    public virtual string EnvelopeId => $"{GetHashCode()}";
}

This provides our base and gives us an overridable EnvelopeId defaulting to the hash code of the record itself.

public sealed record Envelope<TPayload>(
    TPayload Payload,
    TaskCompletionSource TaskSource,
    Task Execution)
    : Envelope
    where TPayload : notnull
{
    private TaskCompletionSource TaskSource { get; } = TaskSource;
    private Task Execution { get; } = Execution;

    public Exception? Failure { get; private set; }

    public void Complete()
        => TaskSource.TrySetResult();

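    public void Fail(Exception exception)
    {
        // Record the failure before signalling: continuations may run inline
        // inside TrySetException and read Failure immediately.
        Failure = exception;
        TaskSource.TrySetException(exception);
    }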

    public TaskAwaiter GetAwaiter()
        => Execution.GetAwaiter();
}

The first derivation of Envelope carries a Payload, that is, any object of type TPayload. It exposes two methods: Complete (ACK) and Fail (NACK). It privately holds a TaskCompletionSource that is used to manage the completion of the Envelope. Finally, it holds a private Task whose awaiter is exposed, which allows the Envelope to be awaited by an Originator.

Task & TaskCompletionSource

The execution Task represents the abstract idea of any work necessary to process the Envelope. Note that the work to process the Message is not tied to the Task directly. We're not awaiting any Consumer. Instead, we're maintaining an outstanding future that can be finalized by a Complete or Fail method.

The TaskCompletionSource is used to signal that the Task has finished. Complete sets the result as successful. Fail takes an Exception indicating that processing failed. However, the Exception won't be propagated via an await. We don't want the Originator, who may be awaiting the Envelope, to pop an Exception off the Task. Instead, we use the pattern of a nullable Exception to expose failure and encourage null propagation.
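To make the "outstanding future" idea concrete, here is a minimal sketch. PendingWork and PendingWorkDemo are hypothetical names that are not part of the library, and the sketch leaves out the timeout and callbacks. It only shows how a failure can surface through a nullable property instead of a thrown Exception.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for the Envelope: it owns a future that only
// Complete or Fail can resolve, and it records failure instead of throwing.
public sealed class PendingWork
{
    private readonly TaskCompletionSource source = new();

    public Exception? Failure { get; private set; }

    // Finishes once Complete or Fail has been called; it never faults.
    public Task Finished { get; }

    public PendingWork()
        => Finished = ObserveAsync(source.Task);

    public void Complete()
        => source.TrySetResult();

    public void Fail(Exception exception)
    {
        Failure = exception;
        source.TrySetException(exception);
    }

    // WhenAny completes when the inner task does, whatever its outcome,
    // so the exception is not rethrown to whoever awaits Finished.
    private static async Task ObserveAsync(Task completion)
        => await Task.WhenAny(completion);
}

public static class PendingWorkDemo
{
    public static async Task<string> RunAsync(bool succeed)
    {
        var work = new PendingWork();

        if (succeed) work.Complete();
        else work.Fail(new InvalidOperationException("nack"));

        await work.Finished; // does not throw in either case
        return work.Failure?.Message ?? "ack";
    }
}
```

The caller checks Failure after the await, which is the null propagation style described above.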

Packaging an Envelope

To pack this all up we first stuff in the Payload and create our TaskCompletionSource. We'll wrap this together with a timeout and async callbacks for onCompleted and onFailure.

public static Envelope<TPayload> ToEnvelope<TPayload>(
    this TPayload payload,
    TimeSpan timeout,
    Func<Task> onCompleted,
    Func<Task> onFailure)
    where TPayload : notnull
    => new TaskCompletionSource()
        .CreateEnvelope(payload, timeout, onCompleted, onFailure);

Next, our TaskCompletionSource is used to form an execution monitor. The job of the monitor is to maintain the timeout and execute either onCompleted or onFailure.

private static Envelope<TPayload> CreateEnvelope<TPayload>(
    this TaskCompletionSource taskSource,
    TPayload payload,
    TimeSpan timeout,
    Func<Task> onCompleted,
    Func<Task> onFailure)
    where TPayload : notnull
    => new Envelope<TPayload>(
        Payload: payload,
        TaskSource: taskSource,
        Execution: taskSource.CreateExecutionMonitor(timeout, onCompleted, onFailure));

private static Task CreateExecutionMonitor(this TaskCompletionSource source,
    TimeSpan timeout,
    Func<Task> onCompleted,
    Func<Task> onFailure)
{
    return AsyncExecutionMonitor(source.Task, timeout, onCompleted, onFailure);

    async Task AsyncExecutionMonitor(
        Task completion,
        TimeSpan timeout,
        Func<Task> onCompleted,
        Func<Task> onFailure)
    {
        if (await completion.TryWaitAsync(timeout))
            await onCompleted();
        else
            await onFailure();
    }
}

public static async Task<bool> TryWaitAsync(this Task task, TimeSpan timeout)
{
    await Task.WhenAny(task, Task.Delay(timeout));
    return task.IsCompletedSuccessfully;
}

The async monitor applies a global timeout to the future we're maintaining for the processing work. We cannot Cancel the work when the timeout has expired. This is a direct consequence of not having direct control over, or coupling to, the work being performed. The best we can do in this position is to Fail on our side and move on.

Within TryWaitAsync we can see how the TrySetException from within the Envelope is rerouted into a bool result. It simply leverages WhenAny and only considers whether the Task was successful. We know that any exception captured by the call to Fail(ex) sets the failure on the Envelope itself.
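A small sketch can show the three outcomes side by side. The helper is repeated so the example stands alone; TryWaitDemo and the 50 ms timeout are assumptions made purely for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class TryWaitDemo
{
    // Same shape as the helper above, repeated so the sketch is self-contained.
    public static async Task<bool> TryWaitAsync(this Task task, TimeSpan timeout)
    {
        await Task.WhenAny(task, Task.Delay(timeout));
        return task.IsCompletedSuccessfully;
    }

    public static async Task<(bool OnSuccess, bool OnFailure, bool OnTimeout)> RunAsync()
    {
        var timeout = TimeSpan.FromMilliseconds(50);

        var succeeded = new TaskCompletionSource();
        succeeded.TrySetResult();

        var failed = new TaskCompletionSource();
        failed.TrySetException(new InvalidOperationException("nack"));

        var pending = new TaskCompletionSource(); // nobody ever resolves this one

        return (
            await succeeded.Task.TryWaitAsync(timeout),
            await failed.Task.TryWaitAsync(timeout),   // false, and nothing is thrown
            await pending.Task.TryWaitAsync(timeout)); // false once the delay elapses
    }
}
```

Only the first case yields true. A faulted Task and a timed out Task are indistinguishable to the monitor, which is why the Envelope keeps the Exception itself.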

Completion

Now, with Envelope<TPayload> in hand, think for a moment: how, within our broadcast framework, do we determine if a Message is Complete? We can't simply say the Originating message is Complete when only one Consumer finishes. And what happens if 1 out of 20 Consumers fails? A reasonable default would be to wait for all Consumers and assume failure if any single Message fails. This follows the pattern of at-most-once delivery, although we could extend it with dedicated retrying of failures and more complex strategies. Now, recall we have a dynamic Consumer set that we don't have direct knowledge of, so how do we manage the broadcast?

The BroadcastBlock we used for our original implementation in Part 1 doesn't expose its Consumers in any way. We don't see how many there are, nor even whether all Consumers have been offered the Envelope. We'll need more control for this, and that's where we'll leverage Channels.

ChannelSource

To control our broadcast we need knowledge and control over "Subscribers"/Consumers of our Source. The first step is to create our own variant of the TPL Dataflow Link. This is a structure representing the connection of a Source and Sink.

public record struct SinkLink(Guid LinkId) : IDisposable
{
    private Action<Guid>? unlink = default;
    private bool disposed = false;

    public SinkLink(Guid linkId, Action<Guid> unlink)
        : this(linkId)
        => this.unlink = unlink;

    public void Dispose()
        // A struct has no finalizer, so there is nothing to suppress here.
        => Dispose(disposing: true);

    private void Dispose(bool disposing)
    {
        if (disposing && !disposed)
            unlink?.Invoke(LinkId);
        disposed = true;
        unlink = null;
    }
}

Our SinkLink is identifiable by its LinkId and it holds a reference that allows it to Unlink from the source via its LinkId.
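As a rough illustration of the unlink idea, here is a sketch that uses a class instead of a record struct. Link, LinkDemo and the string "sink-1" are hypothetical and only exist for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical class-based link: disposing it removes its own entry
// from whatever collection the callback closes over.
public sealed class Link : IDisposable
{
    private Action<Guid>? unlink;

    public Guid LinkId { get; }

    public Link(Guid linkId, Action<Guid> unlink)
    {
        LinkId = linkId;
        this.unlink = unlink;
    }

    public void Dispose()
        // Swapping in null first means the callback runs at most once.
        => Interlocked.Exchange(ref unlink, null)?.Invoke(LinkId);
}

public static class LinkDemo
{
    public static (int Before, int After) Run()
    {
        var targets = new ConcurrentDictionary<Guid, string>();
        var id = Guid.NewGuid();
        targets.TryAdd(id, "sink-1");

        var link = new Link(id, key => targets.TryRemove(key, out _));
        var before = targets.Count;

        link.Dispose();
        link.Dispose(); // second call is a no-op

        return (before, targets.Count);
    }
}
```

The owner of the collection never hands the collection out; it only hands out the ability to remove one entry.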

Next, we'll capture the concept of many outgoing Messages in the form of an Outbox. This allows us to maintain a record of all Messages broadcast and of when they have Completed.

public interface IOutbox<TPayload> where TPayload : notnull
{
    void Complete();
    Envelope<TPayload> GetEnvelope();
}

This Outbox is intended to be created, have outgoing Messages requested from it, and finally be Completed.

internal sealed class EnvelopeOutbox<TPayload> : IOutbox<TPayload>
    where TPayload : notnull
{
    private readonly Envelope<TPayload> originator;
    private readonly Action<Guid> destructor;
    private readonly ConcurrentDictionary<Guid, Envelope<TPayload>> pool = new();
    private readonly ConcurrentBag<Exception> failures = new();

    private volatile int leaseCount = 0;
    private volatile bool complete = false;

    public EnvelopeOutbox(
        Envelope<TPayload> originator,
        Action<Guid> destructor)
    {
        this.originator = originator;
        this.destructor = destructor;
    }

    public void Complete()
        => complete = true;

    public Envelope<TPayload> GetEnvelope()
    {
        Interlocked.Increment(ref leaseCount);

        var id = Guid.NewGuid();
        var envelope = originator.Payload.ToEnvelope(
            TimeSpan.FromSeconds(30),
            () => EnvelopeDestructorAsync(id),
            () => EnvelopeDestructorAsync(id));

        pool[id] = envelope;
        return envelope;
    }

    private async Task EnvelopeDestructorAsync(Guid id)
    {
        Interlocked.Decrement(ref leaseCount);

        var envelope = pool[id];
        failures.MaybeAdd(envelope?.Failure);

        if (!ReadyForClosure) return;

        await CloseOutPool(id);
    }

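    private async Task CloseOutPool(Guid id)
    {
        // Skip the envelope whose destructor is running right now: its Execution
        // cannot finish until this method returns, so awaiting it would never complete.
        await Task.WhenAll(pool
            .Where(e => e.Key != id)
            .Select(async e => await e.Value));

        if (failures.Any())
            originator.Fail(new AggregateException(failures));
        else
            originator.Complete();

        destructor(id);
    }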

    private bool ReadyForClosure
        => leaseCount <= 0 && complete;
}

From this implementation we can see several things:

  1. Creation is dependent on an Originating Message and a Destructor to handle the finalization of the Outbox
  2. Each time a Message is requested we increment the internal leaseCount
  3. Each outgoing Message is wired to EnvelopeDestructorAsync, which decrements the leaseCount when the Message finishes and gives us an opportunity to close the Outbox
  4. Once the Outbox is Complete and the leaseCount reaches zero we aggregate any Exceptions, calling Fail or Complete on the Originator's Message
  5. Finally, the Source provided Outbox Destructor can be executed.
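The bookkeeping in the steps above can be reduced to a small synchronous sketch. LeaseTracker, Seal and Release are hypothetical names. Unlike the Outbox, this version also tries to close when it is sealed and guards against closing twice, which is an assumption of the sketch rather than a description of the implementation above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical reduction of the Outbox: count leases, collect failures,
// and report exactly once when sealed and every lease has been released.
public sealed class LeaseTracker
{
    private readonly ConcurrentBag<Exception> failures = new();
    private readonly Action<AggregateException?> onClosed;
    private int leaseCount;
    private int isSealed; // 0 = still handing out leases, 1 = sealed
    private int isClosed; // 0 = open, 1 = onClosed already invoked

    public LeaseTracker(Action<AggregateException?> onClosed)
        => this.onClosed = onClosed;

    public void Lease()
        => Interlocked.Increment(ref leaseCount);

    public void Seal()
    {
        Volatile.Write(ref isSealed, 1);
        TryClose();
    }

    public void Release(Exception? failure = null)
    {
        if (failure is not null) failures.Add(failure);
        Interlocked.Decrement(ref leaseCount);
        TryClose();
    }

    private void TryClose()
    {
        if (Volatile.Read(ref isSealed) == 0 || Volatile.Read(ref leaseCount) > 0) return;
        if (Interlocked.Exchange(ref isClosed, 1) == 1) return; // close only once
        onClosed(failures.IsEmpty ? null : new AggregateException(failures));
    }
}

public static class LeaseTrackerDemo
{
    public static (int Closures, int FailureCount) Run()
    {
        var closures = 0;
        var failureCount = 0;
        var tracker = new LeaseTracker(error =>
        {
            closures++;
            failureCount = error?.InnerExceptions.Count ?? 0;
        });

        tracker.Lease();
        tracker.Lease();
        tracker.Seal();                          // two leases still outstanding
        tracker.Release();                       // first consumer acks
        tracker.Release(new TimeoutException()); // second consumer nacks

        return (closures, failureCount);
    }
}
```

One failed lease out of two is enough for the closing callback to receive an AggregateException, mirroring the "fail if any Consumer fails" default.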

All of this allows our Source to broadcast and maintain knowledge of all outgoing Messages. From the above we can see we need to define two factory delegates.

delegate IChannelSink<TPayload> SinkFactory<TPayload>()
    where TPayload : notnull;

delegate IOutbox<TPayload> OutboxFactory<TPayload>(
    Envelope<TPayload> originator,
    Action<Guid> destructor)
    where TPayload : notnull;

First, the SinkFactory will be responsible for creating a new ChannelSink<TPayload>. We can register this delegate and inject it if we ever find ourselves dynamically creating new Consumers. But for now, we'll implement it within our Source. Secondly, we have the OutboxFactory that creates an Outbox each time we need to broadcast a Message.

The advantage of having these delegates is that it allows us to place their implementation close to the source of their use. The SinkFactory will be held within the ChannelSource, allowing us to place the new Sink within the collection of targets held by the Source. The OutboxFactory, while not entirely necessary, is a simple abstraction of the Outbox constructor that allows us to create one on demand and mock its implementation for testing.

Diving in

Now let's dive right into the implementation of the Source, we have:

internal sealed class ChannelSource<TPayload>
    : IChannelSource<TPayload>,
    ILinkableSource<TPayload>
    where TPayload : notnull
{
    private readonly OutboxFactory<TPayload> factory;
    private readonly ConcurrentDictionary<Guid, ChannelWriter<Envelope<TPayload>>> channels = new();
    private readonly ConcurrentDictionary<Guid, IOutbox<TPayload>> outboundMsgs = new();

    public ChannelSource(OutboxFactory<TPayload> outboxFactory)
        => this.factory = outboxFactory;

    public async ValueTask<bool> SendAsync(Envelope<TPayload> envelope, CancellationToken cancelToken = default)
    {
        cancelToken.ThrowIfCancellationRequested();
        var outboxId = Guid.NewGuid();
        // The Outbox calls back with an envelope id, so remove by the captured outboxId.
        var outbox = factory(envelope, envelopeId => outboundMsgs.TryRemove(outboxId, out _));
        outboundMsgs.TryAdd(outboxId, outbox);
        var writers = channels.Values;

        var writing = writers.Select(async writer =>
        {
            await Task.Yield();
            var outbound = outbox.GetEnvelope();
            return writer.TryWrite(outbound);
        });
        outbox.Complete();
        var results = await Task.WhenAll(writing);

        return results.All(s => s);
    }

    public SinkFactory<TPayload> SinkFactory
        => () =>
        {
            var linkId = Guid.NewGuid();
            var sinkLink = new SinkLink(linkId, id => channels.Remove(id, out _));
            var channel = Channel.CreateUnbounded<Envelope<TPayload>>();
            channels.TryAdd(linkId, channel);
            var channelSink = new ChannelSink<TPayload>(channel, sinkLink);
            return channelSink;
        };
}

Breaking this down, we have two concurrent collections, each keyed with a UUID. The first, channels, is dedicated to holding all the targets our Source is going to broadcast to, while outboundMsgs maintains an Outbox for every Message in flight.

private readonly ConcurrentDictionary<Guid, ChannelWriter<Envelope<TPayload>>> channels = new();
private readonly ConcurrentDictionary<Guid, IOutbox<TPayload>> outboundMsgs = new();

Then we have an implementation of the factory delegate SinkFactory<TPayload>. This delegate implementation lives within the Source in order to link the new Sink to the Source via the SinkLink. Additionally, the destructor we pass enables removal of the target from the private set.

public SinkFactory<TPayload> SinkFactory
    => () =>
    {
        var linkId = Guid.NewGuid();
        var sinkLink = new SinkLink(linkId, id => channels.Remove(id, out _));
        var channel = Channel.CreateUnbounded<Envelope<TPayload>>();
        channels.TryAdd(linkId, channel);
        var channelSink = new ChannelSink<TPayload>(channel, sinkLink);
        return channelSink;
    };

And finally, the real work is done within SendAsync(..). First, we check the cancelToken. Then, we create an Outbox for the Envelope and iterate through all targets, requesting an outgoing Envelope for each. The synchronous TryWrite is used since we know these targets are unbounded Sinks and we've sidestepped any idea of Channel completion. However, there's no need to wait for any single TryWrite, so the operation yields to the iteration and we asynchronously wait for all writes to complete. Finally, we return success or failure simply based on whether all writes were successful or not.

public async ValueTask<bool> SendAsync(
    Envelope<TPayload> envelope, 
    CancellationToken cancelToken = default)
{
    cancelToken.ThrowIfCancellationRequested();
    var outboxId = Guid.NewGuid();
    // The Outbox calls back with an envelope id, so remove by the captured outboxId.
    var outbox = factory(envelope, envelopeId => outboundMsgs.TryRemove(outboxId, out _));
    outboundMsgs.TryAdd(outboxId, outbox);
    var writers = channels.Values;

    var writing = writers.Select(async writer =>
    {
        await Task.Yield();
        var outbound = outbox.GetEnvelope();
        return writer.TryWrite(outbound);
    });
    outbox.Complete();
    var results = await Task.WhenAll(writing);

    return results.All(s => s);
}

💥Well that was easy 🤣
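Stripped of Envelopes and Outboxes, the broadcast itself is just one unbounded Channel per target. The sketch below is an assumption-laden reduction: BroadcastDemo is a hypothetical name, the payload is a plain int, and it completes the writers so the readers can finish, whereas the Source above deliberately avoids Channel completion.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BroadcastDemo
{
    // Writes every message to every channel, then reports how many
    // messages each reader received.
    public static async Task<int[]> RunAsync(int sinkCount, int messageCount)
    {
        var channels = new ConcurrentDictionary<Guid, Channel<int>>();
        for (var i = 0; i < sinkCount; i++)
            channels.TryAdd(Guid.NewGuid(), Channel.CreateUnbounded<int>());

        for (var msg = 0; msg < messageCount; msg++)
            foreach (var channel in channels.Values)
                // TryWrite on an unbounded channel succeeds until the writer is completed.
                if (!channel.Writer.TryWrite(msg))
                    throw new InvalidOperationException("Write was rejected");

        // Only for this sketch: completing the writers ends each ReadAllAsync loop.
        foreach (var channel in channels.Values)
            channel.Writer.Complete();

        return await Task.WhenAll(channels.Values.Select(c => CountAsync(c)));
    }

    private static async Task<int> CountAsync(Channel<int> channel)
    {
        var count = 0;
        await foreach (var item in channel.Reader.ReadAllAsync())
            count++;
        return count;
    }
}
```

Every reader sees every message because each one owns its own Channel; nothing is shared between targets except the payload.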

ChannelSink

The last core piece of Part 2 is the ChannelSink. It's a much simpler implementation: all we need to do is continuously read from the Channel and handle a standard call to Dispose.

internal sealed class ChannelSink<TPayload>
    : IChannelSink<TPayload>,
    IDisposable
    where TPayload : notnull
{
    private readonly ChannelReader<Envelope<TPayload>> reader;
    private readonly IDisposable sinkLink;
    private bool disposed = false;

    public ChannelSink(
        ChannelReader<Envelope<TPayload>> reader,
        IDisposable sinkLink)
    {
        this.reader = reader;
        this.sinkLink = sinkLink;
    }

    public IAsyncEnumerable<Envelope<TPayload>> ConsumeAsync(CancellationToken cancelToken = default)
        => reader.ReadAllAsync(cancelToken);

    public void Dispose()
    {
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (!disposed && disposing)
            sinkLink.Dispose();
        disposed = true;
    }
}
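Since the Sink never sees the Channel complete, a Consumer stops by cancelling its token. A minimal sketch of that consumption loop, assuming a bare Channel<int> and a hypothetical ConsumeDemo helper in place of the real Sink:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ConsumeDemo
{
    // Reads until the token is cancelled; the writer is never completed.
    public static async Task<List<int>> RunAsync(int messageCount)
    {
        var received = new List<int>();
        if (messageCount <= 0) return received;

        var channel = Channel.CreateUnbounded<int>();
        for (var i = 0; i < messageCount; i++)
            channel.Writer.TryWrite(i);

        using var cts = new CancellationTokenSource();
        try
        {
            await foreach (var item in channel.Reader.ReadAllAsync(cts.Token))
            {
                received.Add(item);
                // Stop listening once everything we expect has arrived.
                if (received.Count == messageCount) cts.Cancel();
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the normal way out of the loop here.
        }

        return received;
    }
}
```

Cancelling only after the expected count has been read keeps the sketch deterministic; cancelling earlier can leave buffered items unread.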

Registration - I didn't forget

Of course we can't forget the full implementation of our IServiceCollection extension.

public static IServiceCollection AddMsgChannel<TPayload>(this IServiceCollection services)
    where TPayload : notnull
    => services
        .SetSingleton<OutboxFactory<TPayload>>(_
        => (originator, destructor)
            => new EnvelopeOutbox<TPayload>(originator, destructor))
        .SetSingleton<SinkFactory<TPayload>>(sp =>
        {
            var source = sp.GetRequiredService<IChannelSource<TPayload>>();
            var linkable = source as ILinkableSource<TPayload>
                ?? throw new ArgumentException($"Source {source.GetType().Name} must be linkable", nameof(source));
            return linkable.SinkFactory;
        })
        .SetSingleton<IChannelSource<TPayload>, ChannelSource<TPayload>>()
        .AddTransient<IChannelSink<TPayload>>(sp => sp.GetRequiredService<SinkFactory<TPayload>>().Invoke());

Closing Up

Part 1 saw the most basic broadcast implementation. That got us on the right track, splitting Origination and Consumption. Now, while maintaining separation, we've allowed the concept of Completion and Acknowledgement to be introduced. At the end of Part 2 we have succeeded in passing our test! But more than that, we can now broadcast a message from multiple Producers to multiple Consumers and allow a positive or negative acknowledgement to flow back to the Producer via the Originating Message.

[Image: Blog - Async Mediator - SinkSource.png]

🤯Soo much cooler visually! But seriously, with this in place, the next step towards our Async Mediator is a small jump and we're well on the right path.

 