A Simple Hosted Actor System
Turning a Web of Handlers into a Mini Actor System
Overview
Take an AspNet Core Api, for example, we have chosen for this; a pattern of Queries/Commands and Handlers. We're going to explore this pattern and see what we can add to the body of knowledge. When implementing the Command/Query/Handler pattern we often end up with many small classes and specialized handlers. This is great for testing and can lead to excellent code organization and separation of concerns. But when integrating with Controllers we find the many handlers need to be injected and we tend towards the mediator pattern. One great implementation of this is MediatR, or you can go a similar route taken by one of the great maintainers of SimpleInjector, who proposes a simple query dispatcher in the great post Meanwhile... on the query side of my architecture. While that post is dated all the way back to 2011, it's still a fantastic reference for what I refer to as Application Scale CQRS
.
CQRS At Different Levels
The Command Query Responsibility Segregation principles can be applied at various levels of your architecture. At the Systems Level
we see Event Sourcing and a separation of read/write data models. But at the Application Scale
we see a breakdown of classes. From monolithic Services
and Repositories
to single purpose Handlers
. This provides a great separation of concerns and extensibility while keeping code cohesive within a single domain.
The Dispatch Issue
One common problem we find in the Mediator Pattern is the need to dynamically invoke un-instantiated handlers at runtime. Even with the super fast SimpleInjector this can take time especially if we're invoking a complex web of handlers to accomplish a given task. So the question becomes how can we retain the benefits of Application Scale CQRS
while avoiding the runtime costs of instantiation of handlers.
The Actor Pattern
The Actor Pattern represents a shift in thinking. Instead of request scoped handlers I propose a pool of interconnected communicating handlers. While Matt Ferderer
defines the properties of an actor to include:
- Store Data
- Receive messages from other actors
- Pass messages to other actors
- Create additional child actors
I'm going to take a different angle.
Our Goals: A simple hosted actor system
I'm going to redefine an Actor
a little a differently for our api specific use case, an Actor
should:
- Live outside the request stream
- Be able to receive messages asynchronously
- Return a result asynchronously
- Communicate with other actors in the system without instantiating them
- Finally, be decoupled from the calling code
In order to achieve these goals we're going to combine elements of the Mediator Pattern
and the Actor Pattern
. And just for examples sake we're going to focus on the Query side of CQRS, the Command side is much simpler and maybe the topic of a future post.
Our Solution
The Basic Infrastrucuture
First we start with the basic infrastructure that will make up our system. You surely have seen many iterations of IQuery
, IQueryHandler
, etc. However, we're going to change them slightly to fit with our Actor
approach. For starters we define an ActorQuery
as such:
namespace ConcurrentFlows.HostedActorSystem.ActorSystem.Infrastructure
{
public record ActorQuery<TPayload, TAnswer>(TPayload Payload);
}
Note in this record we define the payload that we will submit to the actor system and we define the type of answer we expect back. We intentionally use the term Answer
to distinguish it form a typical Task
Result
. This aligns more so with the concept of; Every query has an answer.
Next we define our base hosted QueryActor
this actor will be resoponsible for reciveing the input payload and emitting the answer to the query
namespace ConcurrentFlows.HostedActorSystem.ActorSystem.Infrastructure
{
public abstract class QueryActor<TQuery>
: BackgroundService
{
protected readonly ChannelReader<KeyValuePair<Guid, TQuery>> queryReader;
protected readonly ChannelWriter<KeyValuePair<Guid, dynamic>> answerWriter;
public QueryActor(
ChannelReader<KeyValuePair<Guid, TQuery>> queryReader,
ChannelWriter<KeyValuePair<Guid, dynamic>> answerWriter)
{
this.queryReader = queryReader ?? throw new ArgumentNullException(nameof(queryReader));
this.answerWriter = answerWriter ?? throw new ArgumentNullException(nameof(answerWriter));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested && !queryReader.Completion.IsCompleted)
{
var messageReady = await queryReader.WaitToReadAsync(stoppingToken);
if (messageReady)
{
await foreach (var query in queryReader.ReadAllAsync(stoppingToken))
{
await HandleAsync(query, stoppingToken);
}
}
}
}
public abstract Task HandleAsync(KeyValuePair<Guid, TQuery> query, CancellationToken stoppingToken);
}
}
Our base QueryActor
takes a ChannelReader
that accepts the input query keyed with a unique identifier. This unique identifier will be used to connect Answers
with Queries
. We also make use of the dynamic type and an unconstrained query type parameter to allow this base Actor to handle any query and any answer. This approach is shown now in the what connects our Actor System together: the AnswerStream
.
namespace ConcurrentFlows.HostedActorSystem.ActorSystem.Infrastructure
{
public interface IAnswerStream : IHostedService
{
ValueTask<dynamic> SubmitQuery<TQuery>(TQuery query);
}
}
First we've defined a simple interface to interact with our Actor System. This interface allows us to submit a query and await a result from Actor System. Notice also that we e're extending the IHostedService
interface to allow this stream to live outside the request scope and serve it's primary purpose of matching Answers
to Queries
namespace ConcurrentFlows.HostedActorSystem.ActorSystem.Infrastructure
{
public class AnswerStream : BackgroundService, IAnswerStream
{
private readonly IServiceProvider serviceProvider;
private readonly ChannelReader<KeyValuePair<Guid, dynamic>> answerReader;
public AnswerStream(
IServiceProvider serviceProvider,
ChannelReader<KeyValuePair<Guid, dynamic>> answerReader)
{
this.serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
this.answerReader = answerReader ?? throw new ArgumentNullException(nameof(answerReader));
QueryResults = new ConcurrentDictionary<Guid, TaskCompletionSource<dynamic>>();
}
private ConcurrentDictionary<Guid, TaskCompletionSource<dynamic>> QueryResults { get; }
public async ValueTask<dynamic> SubmitQuery<TQuery>(TQuery query)
{
var writer = serviceProvider.GetRequiredService<ChannelWriter<KeyValuePair<Guid, TQuery>>>();
var queryId = Guid.NewGuid();
var resultSource = new TaskCompletionSource<dynamic>();
QueryResults.TryAdd(queryId, resultSource);
await writer.WriteAsync(new KeyValuePair<Guid, TQuery>(queryId, query));
return await resultSource.Task;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested && !answerReader.Completion.IsCompleted)
{
var resultsReady = await answerReader.WaitToReadAsync(stoppingToken);
if (resultsReady)
await foreach (var result in answerReader.ReadAllAsync(stoppingToken))
if (QueryResults.TryRemove(result.Key, out var resultSource))
resultSource.TrySetResult(result.Value);
}
}
}
}
We see in this implementation the only use of runtime resolution is the instantiation of a singleton ChannelWriter
to input the supplied query into our Actor System. This is a low cost operation since the ChannelWriter
will be held and made available as a singleton in our ServiceProvider
. We see also, that when a query is submitted it is asigned an unquie identifer that will later be used to match it up with its results, it is also associated with a TaskCompletionSource<TAnswer>
and this associated Task
is returned to the caller to await the results from our AnswerStream
. The ExecuteAsync
method continuously waits for results to match up with assigned queries and manages the ConcurrentDictionary
that stores the associated TaskCompletionSources
. And again, we use dynamic for the answer type to allow this AnswerStream
to handle any query and any Answer
. More could be done here to better manage the ConcurrentDictionary
but this is just basic working sample.
The Last piece of infrastructure we need is way to register our actor system with the ServiceProvider
. To accomplish that we create an extension method to add all the necessary bit and pieces to our DI.
namespace ConcurrentFlows.HostedActorSystem.ActorSystem.Infrastructure
{
public static class RegistrationExtensions
{
public static IServiceCollection AddActor<TActor, TQuery>(this IServiceCollection services)
where TActor : QueryActor<TQuery>
{
services.AddHostedService<TActor>();
var inputChannel = Channel.CreateUnbounded<KeyValuePair<Guid, TQuery>>();
services.AddSingleton(inputChannel.Writer);
services.AddSingleton(inputChannel.Reader);
return services;
}
public static IServiceCollection AddAnswerStream(this IServiceCollection services)
{
services.AddSingleton<IAnswerStream, AnswerStream>();
services.AddHostedService(sp => sp.GetRequiredService<IAnswerStream>());
var answerChannel = Channel.CreateUnbounded<KeyValuePair<Guid, dynamic>>();
services.AddSingleton(answerChannel.Writer);
services.AddSingleton(answerChannel.Reader);
return services;
}
}
}
This extension registers:
- Our
AnswerStream
- Our input
Channels
- Our answer
Channels
- And of course our
Actor
implementation.
Sample Demo
The Input Side
For demonstration of this system we are going to query for the factorial of a given number. And as an example of utilizing multiple types we'll return an input message. We start with the api queries that is received through our RESTish interface.
namespace ConcurrentFlows.HostedActorSystem.Queries.Api
{
public record GetFactorialApiQuery([FromQuery] int Message);
public record GetMessageApiQuery([FromQuery]string Message);
}
Notice how we're not only making use of the new Record
types here but also Record
model binding in our controller.
namespace ConcurrentFlows.HostedActorSystem.Controllers
{
[ApiController]
[Route("[controller]")]
public class SampleController : ControllerBase
{
private readonly IAnswerStream answerStream;
public SampleController(IAnswerStream answerStream)
=> this.answerStream = answerStream ?? throw new ArgumentNullException(nameof(answerStream));
[HttpGet]
public async Task<IActionResult> GetFactorial([FromQuery] GetFactorialApiQuery query)
{
var actorQuery = new GetFactorialActorQuery(query.Message);
var result = await answerStream.SubmitQuery(actorQuery);
return Ok(result);
}
[HttpGet("message")]
public async Task<IActionResult> ReturnMessage([FromQuery] GetMessageApiQuery query)
{
var actorQuery = new GetMessageActorQuery(query.Message);
var result = await answerStream.SubmitQuery(actorQuery);
return Ok(result);
}
}
}
Our controller takes the input api query, translates it to an ActorQuery
and submits it to our system. It also gets injected with an AnswerStream
that will provide access to the Actor System and the result from the query.
Interconnected Actors
The factorial query in particular is anActorQuery
that is a system of two interconnected actors that are invoked with these two queries.
public record GetFactorialActorQuery(int Payload) : ActorQuery<int, int>(Payload);
public record GetReverseRangeActorQuery(int Payload) : ActorQuery<int, IAsyncEnumerable<int>>(Payload);
These queries invoke two interconnected actors which cooperate to produce a result. The deepest Actor is the ReverseRangeQueryActor
. This Actor has the responsibility of producing a range of int's in reverse order.
namespace ConcurrentFlows.HostedActorSystem.ActorSystem.Actors
{
public class GetReverseRangeQueryActor : QueryActor<GetReverseRangeActorQuery>
{
public GetReverseRangeQueryActor(
ChannelReader<KeyValuePair<Guid, GetReverseRangeActorQuery>> queryReader,
ChannelWriter<KeyValuePair<Guid, dynamic>> answerWriter)
: base(queryReader, answerWriter)
{
}
public override async Task HandleAsync(KeyValuePair<Guid, GetReverseRangeActorQuery> query, CancellationToken stoppingToken)
{
var range = Enumerable.Range(1, query.Value.Payload).Reverse().ToAsyncEnumerable();
var answer = new KeyValuePair<Guid, dynamic>(query.Key, range);
await answerWriter.WriteAsync(answer);
}
}
}
Notice how this Actor keys the results with the same key provided in the input.
The next Actor up is the FactorialQueryActor
. This Actor takes the input value from the query, the range provided by the ReverseRangeQueryActor
and computes the factorial.
namespace ConcurrentFlows.HostedActorSystem.ActorSystem.Actors
{
public class GetFactorialQueryActor : QueryActor<GetFactorialActorQuery>
{
private readonly IAnswerStream actorStream;
public GetFactorialQueryActor(
ChannelReader<KeyValuePair<Guid, GetFactorialActorQuery>> queryReader,
ChannelWriter<KeyValuePair<Guid, dynamic>> answerWriter,
IAnswerStream actorStream)
: base(queryReader, answerWriter)
{
this.actorStream = actorStream ?? throw new ArgumentNullException(nameof(actorStream));
}
public override async Task HandleAsync(KeyValuePair<Guid, GetFactorialActorQuery> query, CancellationToken stoppingToken)
{
var rangeQuery = new GetReverseRangeActorQuery(query.Value.Payload - 1);
IAsyncEnumerable<int> result = await actorStream.SubmitQuery(rangeQuery);
var factorial = await result.AggregateAsync(query.Value.Payload, (x, y) => x * y);
var answer = new KeyValuePair<Guid, dynamic>(query.Key, factorial);
await answerWriter.WriteAsync(answer);
}
}
}
Notice how these Actors are connected using the very same AnswerStream
setup that's used to communicate from the controller to the Actor System. The AnswerStream
integrates the Actor System so that all Actors can communicate asynchronously without any knowledge of each other except for the defined Query types, just like our standard Application Level CQRS. This leaves us completely decoupled by the queries.
Our Other Actor
The GetMessageActorQuery
simply returns the input message.
namespace ConcurrentFlows.HostedActorSystem.ActorSystem.Actors
{
public class GetMessageQueryActor : QueryActor<GetMessageActorQuery>
{
public GetMessageQueryActor(
ChannelReader<KeyValuePair<Guid, GetMessageActorQuery>> queryReader,
ChannelWriter<KeyValuePair<Guid, dynamic>> answerWriter)
: base(queryReader, answerWriter)
{
}
public override async Task HandleAsync(KeyValuePair<Guid, GetMessageActorQuery> query, CancellationToken stoppingToken)
{
await answerWriter.WriteAsync(new KeyValuePair<Guid, dynamic>(query.Key, query.Value.Payload));
}
}
}
ConfigureServices: The last piece of the puzzle
The last final piece to show is the simple registration of all our actors.
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo { Title = "ConcurrentFlows.HostedActorSystem", Version = "v1" });
});
services.AddAnswerStream();
services.AddActor<GetFactorialQueryActor, GetFactorialActorQuery>();
services.AddActor<GetReverseRangeQueryActor, GetReverseRangeActorQuery>();
services.AddActor<GetMessageQueryActor, GetMessageActorQuery>();
}
And now with everything in place we can test our simple Hosted Actor System and get a result!
Summary
How cool was that! I don't know about you but that simple answer is pretty satisfying! We've created a simple Hosted Actor System to replace our request scoped Handlers
. Our Actors communicate completely asynchronously and without knowledge of one another through our AnswerStream
. These Actors live outside the request scope along with our AnswerStream
. Always ready and able to process incoming requests in an asynchronous decoupled manner. Kinda beautiful in a way! Now I'm not saying go out and replace your architecture with this style but only to consider this simple Hosted Actor pattern when the problem space warrants it.
You can find all code GitHub ConcurrentFlows.HashNode