# AsyncMediator Series - Part 2

# Welcome Back! 👋

So glad you made it back for Part 2! In Part 1, [Broadcast Messaging - In Memory](https://concurrentflows.hashnode.dev/broadcast-messaging-in-memory), we created the implementation of something called *Flow*, which is simply the broadcast of messages across many *Consumers* from many *Originators*, leveraging TPL Dataflow internally.

## Towards our AsyncMediator

Our next step is to introduce the concept of *Message Completion*. This means the *Consumer* indicates whether it has `Completed` processing or `Failed` to process the received message. We want to maintain the broadcast capabilities of the *Flow* and that feature will drive us away from our TPL Dataflow implementation.

What we'll gain is the ability to dynamically onboard *Consumers* and have finer-grained control over all of them. Additionally, this will let us support `Queries` rather than limiting us to `Commands`.

# Building a new flow

First, let's lay out some requirements for our new implementation:

* **Must** allow many *Originators*
    
* **Must** allow many *Consumers*
    
* **Must** allow *Sinks* to detach from *Source*
    
* **Must** only *Complete* an `Envelope` when all *Consumers* finish
    

## Our Library Api

We want this new flow to be easy and simple to consume. Ideally we should have a simple registration and well known interfaces to consume. To start with our registration should look something like this, along with a sample *Message*.

```csharp
public static IServiceCollection AddMsgChannel<TPayload>(this IServiceCollection services)
    where TPayload : notnull

public sealed record Message(int Id);
```

We intend to provide consumers of the library a simple interface to interact with. For example, the *Originator* of *Messages* would look something like this:

```csharp
public sealed class Originator
{
    private readonly IChannelSource<Message> source;

    public Originator(IChannelSource<Message> source)
        => this.source = source;

    // Returns Task (not ValueTask) so callers can combine sends with Task.WhenAll.
    public async Task ProduceManyAsync(int count, CancellationToken cancelToken)
    {
        var messages = Enumerable.Range(0, count)
            .Select(i => new Message(i).ToEnvelope());
        var sending = messages.Select(async msg => await source.SendAsync(msg));
        await Task.WhenAll(sending);
    }
}
```

And, a simple *Consumer* of *Messages* would look like this:

```csharp
public sealed class Consumer
{
    private readonly IChannelSink<Message> sink;

    public Consumer(IChannelSink<Message> sink) 
        => this.sink = sink;

    public async Task<IEnumerable<Message>> CollectAllAsync(CancellationToken cancelToken)
    {
        var set = new List<Message>();
        try
        {
            
            await foreach (var envelope in sink.ConsumeAsync(cancelToken))
                set.Add(envelope.Payload);
        }
        catch (OperationCanceledException)
        { /*We're Done*/ }
        return set;
    }
}
```

## Guiding Test

All that we need to expose are two simple, opposing interfaces: `Sink` and `Source`. With these and our requirements in mind, we can write a test and let that drive and affirm our implementation.

```csharp
[Fact]
public async Task Source_BroadcastsTo_AllConsumers()
{
    var provider = new ServiceCollection()
        .AddMsgChannel<Message>()
        .AddTransient<Originator>()
        .AddTransient<Consumer>()
        .BuildServiceProvider();

    var originator1 = provider.GetRequiredService<Originator>();
    var originator2 = provider.GetRequiredService<Originator>();
    var consumer1 = provider.GetRequiredService<Consumer>();
    var consumer2 = provider.GetRequiredService<Consumer>();
    originator1.Should().NotBe(originator2);
    consumer1.Should().NotBe(consumer2);        
    
    var count = 100;
    using var cts = new CancellationTokenSource();
    var consume1 = consumer1.CollectAllAsync(cts.Token);
    var consume2 = consumer2.CollectAllAsync(cts.Token);
    var sending1 = originator1.ProduceManyAsync(count, cts.Token);
    var sending2 = originator2.ProduceManyAsync(count, cts.Token);
    
    await Task.WhenAll(sending1, sending2);
    cts.Cancel();
    var set1 = await consume1;
    var set2 = await consume2;

    set1.Should().HaveCount(count * 2);
    set2.Should().HaveCount(count * 2);
    set1.Should().BeEquivalentTo(set2);
}
```

Our test sets up the Dependency Registration, then creates two *Producers* and two *Consumers*. We assert that each instance is unique and then kick off our action. We trigger the *Consumers* to start listening and trigger the *Producers* to start sending a preset count of *Messages*. Once our *Producers* complete, we trigger the `CancellationToken` to stop our *Consumers*. After awaiting the consumption `Tasks` we can assert that each *Consumer* has received all *Messages*, double the original count, since we have two *Producers*. Finally, we assert that both sets of *Messages* are equal, indicating that we did multicast from each *Producer* to every *Consumer*.

To begin our implementation we look to the humble `Envelope`. This will be the container and carrier of our `Payload` and serve as the conduit for various utilities.

# The Envelope

The first piece we need to extend is the `Envelope`. The `Envelope` will be the item that allows us to communicate an Ack/Nack back to the *Originator*.

```csharp
public abstract record Envelope
{
    public virtual string EnvelopeId => $"{GetHashCode()}";
}
```

This provides our base and gives us an overridable `EnvelopeId` defaulting to the hash code of the `record` itself.

```csharp
public sealed record Envelope<TPayload>(
    TPayload Payload,
    TaskCompletionSource TaskSource,
    Task Execution)
    : Envelope
    where TPayload : notnull
{
    private TaskCompletionSource TaskSource { get; } = TaskSource;
    private Task Execution { get; } = Execution;

    public Exception? Failure { get; private set; }

    public void Complete()
        => TaskSource.TrySetResult();

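    public void Fail(Exception exception)
    {
        // Publish the failure before signalling: continuations on the
        // task may run synchronously and read Failure right away.
        Failure = exception;
        TaskSource.TrySetException(exception);
    }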

    public TaskAwaiter GetAwaiter()
        => Execution.GetAwaiter();
}
```

The first derivation, `Envelope<TPayload>`, carries a *Payload*: any object of type `TPayload`. It exposes two methods, `Complete` (**ACK**) and `Fail` (**NACK**). It privately holds a `TaskCompletionSource` that is used to manage the completion of the `Envelope`. Finally, it holds a private `Task` whose awaiter is exposed through `GetAwaiter`, which allows the `Envelope` to be awaited by an *Originator*.

## Task & TaskCompletionSource

The execution `Task` represents the abstract idea of any work necessary to process the `Envelope`. Note that the work to process the *Message* is not tied to the `Task` directly. We're ***not awaiting any Consumer***. Instead, we're maintaining an outstanding future that can be finalized by a `Complete` or `Fail` method.

The `TaskCompletionSource` is used to signal that the `Task` has finished. `Complete` sets the result as successful. `Fail` takes an `Exception` indicating that processing failed. However, the `Exception` won't be propagated via an `await`. We don't want the *Originator*, who may be awaiting the `Envelope`, to pop an `Exception` off the `Task`. Instead, we use the pattern of a nullable `Exception` to expose failure and encourage null propagation.
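
To make the nullable-`Exception` pattern concrete, here is a minimal, self-contained sketch. `MiniEnvelope` and its `Completion` property are hypothetical stand-ins invented for this example, not the article's `Envelope<TPayload>`; the sketch only assumes that awaiting should finish quietly and that the failure is read from a property afterwards.

```csharp
using System;
using System.Threading.Tasks;

// Awaiting finishes without throwing; the failure is read from the property instead.
var envelope = new MiniEnvelope();
envelope.Fail(new InvalidOperationException("consumer failed"));
await envelope.Completion;
Console.WriteLine(envelope.Failure?.Message ?? "completed"); // consumer failed

// Hypothetical stand-in for Envelope<TPayload>, reduced to the ACK/NACK mechanics.
public sealed class MiniEnvelope
{
    private readonly TaskCompletionSource source = new();

    public MiniEnvelope() => Completion = ObserveAsync(source.Task);

    // Finishes once Complete or Fail is called, and never faults.
    public Task Completion { get; }

    public Exception? Failure { get; private set; }

    public void Complete() => source.TrySetResult();

    public void Fail(Exception exception)
    {
        Failure = exception;                 // publish the failure first
        source.TrySetException(exception);   // then signal the waiting side
    }

    private static async Task ObserveAsync(Task task)
    {
        // WhenAny yields the finished task rather than rethrowing its exception.
        await Task.WhenAny(task);
    }
}
```

The *Originator* side then reduces to a null check on `Failure` rather than a `try`/`catch` around the `await`.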

## Packaging an Envelope

To pack this all up we first stuff in the `Payload` and create our `TaskCompletionSource`. We'll wrap this together with a timeout and async callbacks for `onComplete` and `onFailure`.

```csharp
public static Envelope<TPayload> ToEnvelope<TPayload>(
    this TPayload payload,
    TimeSpan timeout,
    Func<Task> onCompleted,
    Func<Task> onFailure)
    where TPayload : notnull
    => new TaskCompletionSource()
        .CreateEnvelope(payload, timeout, onCompleted, onFailure);
```

Next, our `TaskCompletionSource` is used to form an `ExecutionMonitor`. The job of the monitor is to maintain the timeout and execute either `onCompleted` or `onFailure`.

```csharp
private static Envelope<TPayload> CreateEnvelope<TPayload>(
    this TaskCompletionSource taskSource,
    TPayload payload,
    TimeSpan timeout,
    Func<Task> onCompleted,
    Func<Task> onFailure)
    where TPayload : notnull
    => new Envelope<TPayload>(
        Payload: payload,
        TaskSource: taskSource,
        Execution: taskSource.CreateExecutionMonitor(timeout, onCompleted, onFailure));

private static Task CreateExecutionMonitor(this TaskCompletionSource source,
    TimeSpan timeout,
    Func<Task> onCompleted,
    Func<Task> onFailure)
{
    return AsyncExecutionMonitor(source.Task, timeout, onCompleted, onFailure);

    async Task AsyncExecutionMonitor(
        Task completion,
        TimeSpan timeout,
        Func<Task> onCompleted,
        Func<Task> onFailure)
    {
        if (await completion.TryWaitAsync(timeout))
            await onCompleted();
        else
            await onFailure();
    }
}

public static async Task<bool> TryWaitAsync(this Task task, TimeSpan timeout)
{
    await Task.WhenAny(task, Task.Delay(timeout));
    return task.IsCompletedSuccessfully;
}
```

The async monitor applies a global timeout to the future we're maintaining for the processing work. We cannot *Cancel* the work when the timeout has expired. This is a direct consequence of not having direct control, coupling, to the work being performed. The best we can do in this position is to *Fail* on our side and move on.

Within `TryWaitAsync` we can see how the `TrySetException` from within the `Envelope` is rerouted into a `bool` result. It simply leverages `WhenAny`, which does not rethrow, and only considers whether the `Task` completed successfully. The exception is not lost, because the call to `Fail(ex)` also stores it on the `Envelope` itself.
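
The three possible outcomes can be seen side by side in a small sketch. The `Waiting` class is a hypothetical wrapper that repeats the same `TryWaitAsync` body so the example runs on its own, and the 50 ms timeout is an arbitrary value chosen for the demo.

```csharp
using System;
using System.Threading.Tasks;

var completed = new TaskCompletionSource();
completed.TrySetResult();

var failed = new TaskCompletionSource();
failed.TrySetException(new InvalidOperationException("nack"));

var pending = new TaskCompletionSource();   // never signalled

var timeout = TimeSpan.FromMilliseconds(50);
Console.WriteLine(await Waiting.TryWaitAsync(completed.Task, timeout)); // True
Console.WriteLine(await Waiting.TryWaitAsync(failed.Task, timeout));    // False, nothing is thrown
Console.WriteLine(await Waiting.TryWaitAsync(pending.Task, timeout));   // False, timed out

public static class Waiting
{
    // Same shape as TryWaitAsync above, repeated so the sketch is self-contained.
    public static async Task<bool> TryWaitAsync(Task task, TimeSpan timeout)
    {
        await Task.WhenAny(task, Task.Delay(timeout));
        return task.IsCompletedSuccessfully;
    }
}
```

Note that a failure and a timeout are indistinguishable from the return value alone, which is why the `Envelope` keeps the exception separately.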

## Completion

Now, with `Envelope<TPayload>` in hand, we can turn to what *Completion* means.

Think for a moment: how, within our broadcast framework, do we determine if a *Message* is *Complete*? We can't simply say the *Originating* message is *Complete* when only one *Consumer* finishes. And what happens if 1 out of 20 *Consumers* fails? A reasonable default would be to wait for all *Consumers* and assume failure if any single *Message* fails. This follows the pattern of at-most-once delivery, although we could extend to dedicated retrying of failures and more complex strategies. Now, recall we have a dynamic *Consumer Set* that we don't have direct knowledge of, so how do we manage the broadcast?
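
As a rough illustration of that default policy (wait for everyone, fail if anyone failed), the decision itself can be sketched in a few lines. `CompletionPolicy` and `Combine` are hypothetical names used only here; each consumer's outcome is assumed to be represented by a nullable `Exception`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Three consumers report back; one of them failed.
var outcomes = new Exception?[] { null, new TimeoutException("consumer 2 timed out"), null };
var failure = CompletionPolicy.Combine(outcomes);
Console.WriteLine(failure is null
    ? "Completed"
    : $"Failed: {failure.InnerExceptions.Count} of {outcomes.Length}"); // Failed: 1 of 3

public static class CompletionPolicy
{
    // Null means every consumer succeeded; otherwise all failures are bundled together.
    public static AggregateException? Combine(IEnumerable<Exception?> consumerFailures)
    {
        var failures = consumerFailures.OfType<Exception>().ToList();
        return failures.Count == 0 ? null : new AggregateException(failures);
    }
}
```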

The [`BroadcastBlock`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.broadcastblock-1?view=net-6.0) we used for our original implementation in [Part 1](https://concurrentflows.hashnode.dev/broadcast-messaging-in-memory) doesn't expose its *Consumers* in any way. We don't see how many there are, nor even if all *Consumers* have been offered the `Envelope`. We'll need more control for this and that's where we'll leverage [`Channels`](https://learn.microsoft.com/en-us/dotnet/core/extensions/channels).

# ChannelSource

To control our broadcast we need knowledge and control over "Subscribers"/*Consumers* of our `Source`. The first step is to create our own variant of the TPL Dataflow `Link`. This is a structure representing the connection of a `Source` and `Sink`.

```csharp
public record struct SinkLink(Guid LinkId) : IDisposable
{
    private Action<Guid>? unlink = default;
    private bool disposed = false;

    public SinkLink(Guid linkId, Action<Guid> unlink)
        : this(linkId)
        => this.unlink = unlink;

    public void Dispose()
    {
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (disposing && !disposed)
            unlink?.Invoke(LinkId);
        disposed = true;
        unlink = null;
    }
}
```

Our `SinkLink` is identifiable by its `LinkId` and it holds a reference that allows it to `Unlink` from the source via its `LinkId`.
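
The unlink-on-dispose idea can be tried in isolation with a small sketch. `Link` is a hypothetical class-based equivalent written for this example (not the article's `SinkLink` struct), and the dictionary of strings stands in for the `Source`'s collection of targets.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

var targets = new ConcurrentDictionary<Guid, string>();
var id = Guid.NewGuid();
targets.TryAdd(id, "consumer-1");

// The link only knows its id and how to remove itself.
var link = new Link(id, linkId => targets.TryRemove(linkId, out _));
Console.WriteLine(targets.Count); // 1

link.Dispose();
link.Dispose();                   // second call is a no-op
Console.WriteLine(targets.Count); // 0

// Hypothetical class-based equivalent of SinkLink.
public sealed class Link : IDisposable
{
    private Action<Guid>? unlink;

    public Link(Guid linkId, Action<Guid> unlink)
    {
        LinkId = linkId;
        this.unlink = unlink;
    }

    public Guid LinkId { get; }

    public void Dispose()
    {
        // Swap the callback out so it runs at most once, even under concurrent calls.
        var callback = Interlocked.Exchange(ref unlink, null);
        callback?.Invoke(LinkId);
    }
}
```

The sketch uses a class so that copies cannot carry separate disposal state, which is a design choice of the example rather than something the original requires.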

Next, we'll capture the concept of many outgoing *Messages* in the form of an `Outbox`. This allows us to maintain a record of all *Messages* broadcast and of when they have *Completed*.

```csharp
public interface IOutbox<TPayload> where TPayload : notnull
{
    void Complete();
    Envelope<TPayload> GetEnvelope();
}
```

This `Outbox` is intended to be created, then *Messages* submitted and finally `Completed`.

```csharp
internal sealed class EnvelopeOutbox<TPayload> : IOutbox<TPayload>
    where TPayload : notnull
{
    private readonly Envelope<TPayload> originator;
    private readonly Action<Guid> destructor;
    private readonly ConcurrentDictionary<Guid, Envelope<TPayload>> pool = new();
    private readonly ConcurrentBag<Exception> failures = new();

    private volatile int leaseCount = 0;
    private volatile bool complete = false;

    public EnvelopeOutbox(
        Envelope<TPayload> originator,
        Action<Guid> destructor)
    {
        this.originator = originator;
        this.destructor = destructor;
    }

    public void Complete()
        => complete = true;

    public Envelope<TPayload> GetEnvelope()
    {
        Interlocked.Increment(ref leaseCount);

        var id = Guid.NewGuid();
        var envelope = originator.Payload.ToEnvelope(
            TimeSpan.FromSeconds(30),
            () => EnvelopeDestructorAsync(id),
            () => EnvelopeDestructorAsync(id));

        pool[id] = envelope;
        return envelope;
    }

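    private Task EnvelopeDestructorAsync(Guid id)
    {
        // Record any failure before returning the lease, so whichever
        // callback closes the outbox sees every failure.
        if (pool.TryGetValue(id, out var envelope))
            failures.MaybeAdd(envelope.Failure);

        Interlocked.Decrement(ref leaseCount);

        if (ReadyForClosure)
            CloseOutPool(id);

        return Task.CompletedTask;
    }

    private void CloseOutPool(Guid id)
    {
        // No awaiting of pooled envelopes here: this runs inside an envelope's
        // own execution monitor, so awaiting that envelope would never finish.
        if (failures.Any())
            originator.Fail(new AggregateException(failures));
        else
            originator.Complete();

        destructor(id);
    }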

    private bool ReadyForClosure
        => leaseCount <= 0 && complete;
}
```

From this implementation we can see several things:

1. Creation is dependent on an *Originating Message* and a `Destructor` to handle the finalization of the `Outbox`
    
2. Each time a *Message* is requested we increment the internal `leaseCount`
    
3. The outgoing *Messages* are connected such that they are destructed, decrement the `leaseCount` and give us an opportunity to *Close* the `Outbox` via `EnvelopeDestructorAsync`
    
4. Once the `Outbox` is *Complete* and the `leaseCount` reaches zero we aggregate any `Exceptions` calling `Complete` or `Fail` on the *Originator's Message*
    
5. Finally, the `Source` provided `Outbox Destructor` can be executed.
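
The lease-counting steps above can be sketched on their own, without envelopes or channels. `LeaseTracker` is a hypothetical type for illustration only; unlike the `Outbox` above, it also checks for closure inside `Complete`, which is an assumption added here so that a tracker with no outstanding leases still closes.

```csharp
using System;
using System.Threading;

var tracker = new LeaseTracker(() => Console.WriteLine("outbox closed"));
tracker.Lease();
tracker.Lease();
tracker.Complete();   // no more leases will be taken
tracker.Release();    // one consumer done, one still outstanding
tracker.Release();    // prints "outbox closed"

public sealed class LeaseTracker
{
    private readonly Action onClosed;
    private int leaseCount;
    private volatile bool complete;
    private int closed;

    public LeaseTracker(Action onClosed) => this.onClosed = onClosed;

    public void Lease() => Interlocked.Increment(ref leaseCount);

    public void Complete()
    {
        complete = true;
        TryClose();
    }

    public void Release()
    {
        Interlocked.Decrement(ref leaseCount);
        TryClose();
    }

    private void TryClose()
    {
        // Close exactly once, after Complete and after the last lease is returned.
        if (complete
            && Volatile.Read(ref leaseCount) <= 0
            && Interlocked.Exchange(ref closed, 1) == 0)
            onClosed();
    }
}
```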
    

All of this allows our `Source` to broadcast and maintain knowledge of all outgoing *Messages*. From the previous code we can see that we need to define two *Factory Delegates*:

```csharp
delegate IChannelSink<TPayload> SinkFactory<TPayload>()
    where TPayload : notnull;

delegate IOutbox<TPayload> OutboxFactory<TPayload>(
    Envelope<TPayload> originator,
    Action<Guid> destructor)
    where TPayload : notnull;
```

First, the `SinkFactory` will be responsible for creating a new `ChannelSink<TPayload>`. We can register this delegate and inject it if we ever find ourselves dynamically creating new *Consumers*. But for now, we'll implement it within our `Source`. Secondly, we have the `OutboxFactory` that creates an `Outbox` each time we need to broadcast a *Message*.

The advantage of having these delegates is that it allows us to place their implementation close to the source of their use. The `SinkFactory` will be held within the `ChannelSource`, allowing us to place the new `Sink` within the collection of targets held by the `Source`. The `OutboxFactory`, while not entirely necessary, is a simple abstraction of the `Outbox` constructor that allows us to create one on demand and mock its implementation for testing.

## Diving in

Now let's dive right into the implementation of the `Source`, we have:

```csharp
internal sealed class ChannelSource<TPayload>
    : IChannelSource<TPayload>,
    ILinkableSource<TPayload>
    where TPayload : notnull
{
    private readonly OutboxFactory<TPayload> factory;
    private readonly ConcurrentDictionary<Guid, ChannelWriter<Envelope<TPayload>>> channels = new();
    private readonly ConcurrentDictionary<Guid, IOutbox<TPayload>> outboundMsgs = new();

    public ChannelSource(OutboxFactory<TPayload> outboxFactory)
        => this.factory = outboxFactory;

    public async ValueTask<bool> SendAsync(Envelope<TPayload> envelope, CancellationToken cancelToken = default)
    {
        cancelToken.ThrowIfCancellationRequested();
        var outboxId = Guid.NewGuid();
        // Remove this outbox by its own id once it closes.
        var outbox = factory(envelope, _ => outboundMsgs.Remove(outboxId, out _));
        outboundMsgs.TryAdd(outboxId, outbox);
        var writers = channels.Values;

        var writing = writers.Select(async writer =>
        {
            await Task.Yield();
            var outbound = outbox.GetEnvelope();
            return writer.TryWrite(outbound);
        });
        outbox.Complete();
        var results = await Task.WhenAll(writing);

        return results.All(s => s);
    }

    public SinkFactory<TPayload> SinkFactory
        => () =>
        {
            var linkId = Guid.NewGuid();
            var sinkLink = new SinkLink(linkId, id => channels.Remove(id, out _));
            var channel = Channel.CreateUnbounded<Envelope<TPayload>>();
            channels.TryAdd(linkId, channel);
            var channelSink = new ChannelSink<TPayload>(channel, sinkLink);
            return channelSink;
        };
}
```

Breaking this down, we have two concurrent collections, each keyed with a UUID. The first, `channels`, is dedicated to holding all the targets our `Source` is going to broadcast to, while `outboundMsgs` maintains an `Outbox` for every *Message* in flight.

```csharp
private readonly ConcurrentDictionary<Guid, ChannelWriter<Envelope<TPayload>>> channels = new();
private readonly ConcurrentDictionary<Guid, IOutbox<TPayload>> outboundMsgs = new();
```

Then we have an implementation of the factory delegate `SinkFactory<TPayload>`. This delegate implementation lives within the `Source` in order to link the new `Sink` to the `Source` via the `SinkLink`. Additionally, the destructor we pass enables removal of the target from the private set.

```csharp
public SinkFactory<TPayload> SinkFactory
    => () =>
    {
        var linkId = Guid.NewGuid();
        var sinkLink = new SinkLink(linkId, id => channels.Remove(id, out _));
        var channel = Channel.CreateUnbounded<Envelope<TPayload>>();
        channels.TryAdd(linkId, channel);
        var channelSink = new ChannelSink<TPayload>(channel, sinkLink);
        return channelSink;
    };
```

And finally, the real work is done within `SendAsync(..)`. First, we check the `cancelToken`. Next, we create an `Outbox` for the incoming `Envelope` and track it in `outboundMsgs`. Then, we iterate through all targets, leasing a fresh `Envelope` from the `Outbox` for each one. The synchronous `TryWrite` is used since we know these targets are unbounded `Sinks` and we've sidestepped any idea of `Channel` completion. However, there's no need to wait for any single `TryWrite`, so each operation yields to the iteration and we asynchronously wait for all writes to complete. Finally, we return success or failure based simply on whether all writes were successful.

```csharp
public async ValueTask<bool> SendAsync(
    Envelope<TPayload> envelope, 
    CancellationToken cancelToken = default)
{
    cancelToken.ThrowIfCancellationRequested();
    var outboxId = Guid.NewGuid();
    // Remove this outbox by its own id once it closes.
    var outbox = factory(envelope, _ => outboundMsgs.Remove(outboxId, out _));
    outboundMsgs.TryAdd(outboxId, outbox);
    var writers = channels.Values;

    var writing = writers.Select(async writer =>
    {
        await Task.Yield();
        var outbound = outbox.GetEnvelope();
        return writer.TryWrite(outbound);
    });
    outbox.Complete();
    var results = await Task.WhenAll(writing);

    return results.All(s => s);
}
```
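
Stripped of the `Outbox` bookkeeping, the fan-out itself can be sketched with plain channels. `Broadcast`, `Subscribe`, and the `string` payload are hypothetical names chosen for this example; only `Channel.CreateUnbounded`, `TryWrite`, and `ReadAsync` come from `System.Threading.Channels`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channels = new ConcurrentDictionary<Guid, Channel<string>>();
var first = Broadcast.Subscribe(channels);
var second = Broadcast.Subscribe(channels);

var delivered = await Broadcast.SendAsync(channels, "hello");
Console.WriteLine(delivered);                 // True
Console.WriteLine(await first.ReadAsync());   // hello
Console.WriteLine(await second.ReadAsync());  // hello

public static class Broadcast
{
    // Registers a new unbounded channel and hands back its reading side.
    public static ChannelReader<T> Subscribe<T>(ConcurrentDictionary<Guid, Channel<T>> channels)
    {
        var channel = Channel.CreateUnbounded<T>();
        channels.TryAdd(Guid.NewGuid(), channel);
        return channel.Reader;
    }

    // Writes the message to every registered channel and reports whether all writes succeeded.
    public static async Task<bool> SendAsync<T>(ConcurrentDictionary<Guid, Channel<T>> channels, T message)
    {
        var writing = channels.Values.Select(async channel =>
        {
            await Task.Yield();   // do not hold up the iteration on any single write
            return channel.Writer.TryWrite(message);
        });
        var results = await Task.WhenAll(writing);
        return results.All(written => written);
    }
}
```

With no subscribers the sketch reports `True`, since `All` over an empty sequence is true; whether that should count as a successful send is a policy decision left open here.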

### 💥Well that was easy 🤣

# ChannelSink

The last core piece of Part 2 is the `ChannelSink`. It's a much simpler implementation: all we need to do is continuously read from the `Channel` and handle a standard call to `Dispose`.

```csharp
internal sealed class ChannelSink<TPayload>
    : IChannelSink<TPayload>,
    IDisposable
    where TPayload : notnull
{
    private readonly ChannelReader<Envelope<TPayload>> reader;
    private readonly IDisposable sinkLink;
    private bool disposed = false;

    public ChannelSink(
        ChannelReader<Envelope<TPayload>> reader,
        IDisposable sinkLink)
    {
        this.reader = reader;
        this.sinkLink = sinkLink;
    }

    public IAsyncEnumerable<Envelope<TPayload>> ConsumeAsync(CancellationToken cancelToken = default)
        => reader.ReadAllAsync(cancelToken);

    public void Dispose()
    {
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (!disposed && disposing)
            sinkLink.Dispose();
        disposed = true;
    }
}
```
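
Because the `Source` never completes its channels, a read through `ReadAllAsync` only ends when the token is cancelled. The following sketch shows that behaviour with a bare channel; `Reading.CollectAsync` is a hypothetical helper, and the 100 ms delay is an arbitrary value for the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
for (var i = 0; i < 3; i++)
    channel.Writer.TryWrite(i);

using var cts = new CancellationTokenSource();
var collecting = Reading.CollectAsync(channel.Reader, cts.Token);
cts.CancelAfter(TimeSpan.FromMilliseconds(100));   // nobody completes the channel, so cancel instead
var items = await collecting;
Console.WriteLine(string.Join(", ", items));        // 0, 1, 2

public static class Reading
{
    // Drains the reader until the token is cancelled, then returns what was collected.
    public static async Task<List<T>> CollectAsync<T>(ChannelReader<T> reader, CancellationToken cancelToken)
    {
        var items = new List<T>();
        try
        {
            await foreach (var item in reader.ReadAllAsync(cancelToken))
                items.Add(item);
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the expected way out of an open-ended read.
        }
        return items;
    }
}
```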

# Registration - I didn't forget

Of course, we can't forget the full implementation of our `IServiceCollection` extension:

```csharp
public static IServiceCollection AddMsgChannel<TPayload>(this IServiceCollection services)
    where TPayload : notnull
    => services
        .SetSingleton<OutboxFactory<TPayload>>(_
        => (originator, destructor)
            => new EnvelopeOutbox<TPayload>(originator, destructor))
        .SetSingleton<SinkFactory<TPayload>>(sp =>
        {
            var source = sp.GetRequiredService<IChannelSource<TPayload>>();
            var linkable = source as ILinkableSource<TPayload>
                ?? throw new ArgumentException($"Source {source.GetType().Name} must be linkable", nameof(source));
            return linkable.SinkFactory;
        })
        .SetSingleton<IChannelSource<TPayload>, ChannelSource<TPayload>>()
        .AddTransient<IChannelSink<TPayload>>(sp => sp.GetRequiredService<SinkFactory<TPayload>>().Invoke());
```

# Closing Up

[Part 1](https://concurrentflows.hashnode.dev/broadcast-messaging-in-memory) saw the most basic broadcast implementation. That got us on the right track, splitting *Origination* and *Consumption*. Now, while maintaining that separation, we've introduced the concepts of **Completion** and **Acknowledgement**. At the end of Part 2 we have succeeded in ***passing our test!*** But more than that, we can now broadcast a message from multiple *Producers* to multiple *Consumers* and allow a positive or negative acknowledgement to flow back to the *Producer* via the *Originating Message*.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1746728350241/81c68156-d072-4ea7-a961-9cc52440b3ba.png align="center")

🤯Soo much cooler visually! But seriously with this in place, the next step towards our Async Mediator is a small jump and we're well on the right path.
