Decoupled Process Management
Toward a Message Driven Architecture
Background
Imagine, if you will, that in the course of designing a new API you are required to implement a complex process. The approach presented here produces a completely message-driven architecture that can be implemented on and integrated with any messaging infrastructure you choose. This post covers one way to break a complex process into a sequence of phases that are decoupled from the calling code by a messaging system. The system is similar to the one in a previous post about a simple actor system. We'll also extend the concept presented in that post and marry it with the ideas from the post on message routing.
Challenge
The challenge here is to derive a generic Process Management System. This system should be capable of driving a user-defined process, and of doing so in a completely decoupled, message-driven way.
Solution
Infrastructure
First we'll cover the infrastructure of our solution. These components lay the groundwork for our sample.
The Messages
This implementation is going to be completely message driven and independent of the request context. The Process Handlers will live as BackgroundServices and always be alive and ready to process messages. For this system to work, we first need to define some base Message records.
public record ProcessStartMessage<TInput>(TInput Input);
public record ProcessStartedMessage(Guid ProcessId);
public record ProcessEndedMessage(Guid ProcessId, ProcessPhase Phase);
public record ProcessActivity;
public record ProcessMessage<TInput>(Guid ProcessId, ProcessPhase Phase, TInput Input);
public record ProcessMessage<TInput, TActivity>(Guid ProcessId, ProcessPhase Phase, TInput Input, TActivity Activity)
: ProcessMessage<TInput>(ProcessId, Phase, Input)
where TActivity : ProcessActivity
where TInput : ProcessInput;
public record ProcessInput;
public record ProcessPhase(string Phase)
{
    // readonly so the shared terminal phases cannot be reassigned at runtime
    public static readonly ProcessPhase Completed = new ProcessPhase(nameof(Completed));
    public static readonly ProcessPhase Failed = new ProcessPhase(nameof(Failed));
}
Let's go through each message and base type.
ProcessStartMessage: This message signals the ProcessStartHandler to begin the defined process. It is used as a type argument constraint and should be the base record of a derived, process-specific message, as we'll see in a moment.
ProcessStartedMessage: This is the message that will be published when the process begins. It provides the id assigned to the process, so that calling code can use the process id to query information about the state of the process. In this post we'll leave most of the calling code out and focus on the Process Management System.
ProcessEndedMessage: This message signals the calling code that the process has ended. It provides the process id and the phase the process was in when it stopped. Including the Phase tells the client whether the process aborted or completed successfully.
ProcessMessage<TInput>: This is the base of all process messages. It is what we'll use to constrain our PhaseTransitionHandler so that it has access to the properties we need.
ProcessMessage<TInput, TActivity>: This message, along with an Activity type, will be used throughout the process lifetime. The Activity parameter associates the ProcessMessage with the particular activity that's being carried out. This message also carries the full input object, the current phase of the process and the id of the process to which it belongs.
ProcessActivity and ProcessInput: These are just constraint records to be derived from for an Activity or an Input respectively.
ProcessPhase: ProcessPhase defines the high-level, general state of the process. For example, a derived ProcessPhase could define Validation, StageRollback and StageInsert. We can derive and define our process in terms of phases. Each phase may generate N messages with N activities. In the base record we define two general phases, Completed and Failed, which are common to all derived phase sets. I've chosen this style of defining phases instead of something like an enumeration because, personally, I don't like enums, and this record gives us a better constraint since we cannot derive from a base enum.
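To make the record-based phase idea concrete, here is a minimal sketch of a derived phase set. ImportPhases and PhaseDemo are hypothetical names used only for illustration; ProcessPhase is repeated so the snippet stands on its own.

```csharp
using System;

public record ProcessPhase(string Phase)
{
    public static readonly ProcessPhase Completed = new(nameof(Completed));
    public static readonly ProcessPhase Failed = new(nameof(Failed));
}

// Hypothetical phase set for an import process; it only adds working phases,
// Completed and Failed are inherited from the base record.
public record ImportPhases(string Phase) : ProcessPhase(Phase)
{
    public static readonly ProcessPhase Validation = new(nameof(Validation));
    public static readonly ProcessPhase StageInsert = new(nameof(StageInsert));
}

public static class PhaseDemo
{
    // Records compare by value, so == works without any custom equality code.
    public static bool IsTerminal(ProcessPhase phase)
        => phase == ProcessPhase.Completed || phase == ProcessPhase.Failed;
}
```

Because the static members are all plain ProcessPhase instances, two phases with the same name compare equal and can be used interchangeably as dictionary keys. Note that record equality also takes the runtime type into account, so an instance created as new ImportPhases("Validation") would not equal ImportPhases.Validation.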
A process is a set of messages that are initiated by phase transitions. A high-level view of a process can be seen in this figure.
As you can see, each phase emits N messages to be collected by the Process Phase Transition Handler. This handler then determines if a phase transition is necessary and, if so, generates the next phase of messages. If the Process Phase Transition Handler determines the process is Completed, an ended message is emitted and the process is over.
The Message System
Let's now look at the communication stream. This stream abstracts our underlying messaging system into a simple interface with just the methods we need. We could implement this interface with Channels, Observables, Kafka, Azure Service Bus, SignalR, Azure Pub/Sub or any other messaging system we choose, so long as this interface is met.
public interface IMessageSystemReader<TMessage>
{
public ValueTask<bool> MessageReady(CancellationToken token = default);
public IAsyncEnumerable<TMessage> ReadAllAsync(CancellationToken token = default);
public Task Completion { get; }
}
public interface IMessageSystemWriter<TMessage>
{
public ValueTask WriteAsync(TMessage message, CancellationToken token = default);
public Task Shutdown();
}
Using the Interface Segregation principle we break this interface down into two, one reader and one writer. These interfaces also define a way to shut down the stream and notify the client that the stream is now closed.
In this post we'll use channels as our underlying transport.
public class MessageSystem<TMessage>
: IMessageSystemReader<TMessage>,
IMessageSystemWriter<TMessage>
{
private ChannelReader<TMessage> reader;
private ChannelWriter<TMessage> writer;
public MessageSystem()
{
var channel = Channel.CreateUnbounded<TMessage>();
reader = channel.Reader;
writer = channel.Writer;
}
public Task Completion => reader.Completion;
public ValueTask<bool> MessageReady(CancellationToken token = default)
=> reader.WaitToReadAsync(token);
public IAsyncEnumerable<TMessage> ReadAllAsync(CancellationToken token = default)
=> reader.ReadAllAsync(token);
public ValueTask WriteAsync(TMessage message, CancellationToken token = default)
=> writer.WriteAsync(message, token);
public Task Shutdown()
{
writer.Complete();
return reader.Completion;
}
}
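The write, shutdown and read-all behaviour that the wrapper exposes comes straight from System.Threading.Channels. As a rough sketch of that underlying behaviour, using the channel directly rather than the MessageSystem class (ChannelDemo is a made-up name for illustration):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelDemo
{
    public static async Task<List<string>> RoundTripAsync()
    {
        var channel = Channel.CreateUnbounded<string>();

        // Writes to an unbounded channel complete immediately.
        await channel.Writer.WriteAsync("first");
        await channel.Writer.WriteAsync("second");

        // Completing the writer is what Shutdown() does in the wrapper.
        channel.Writer.Complete();

        // ReadAllAsync drains the buffered messages and ends once the
        // writer has completed and the channel is empty.
        var received = new List<string>();
        await foreach (var message in channel.Reader.ReadAllAsync())
            received.Add(message);

        // Completion finishes only after everything has been read.
        await channel.Reader.Completion;
        return received;
    }
}
```

Messages come out in the order they were written, and nothing written before the shutdown is lost.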
As you can see, the implementation is just a simple wrapper around a Channel reader/writer pair.
Message System Extensions
Next let's add some helper methods to this interface, so that all we have to worry about in our Handler code is the logic necessary to handle our defined process.
public static class MessagingExtensions
{
public static async IAsyncEnumerable<TMessage> ContinuousWaitAndReadAllAsync<TMessage>(
this IMessageSystemReader<TMessage> reader,
[EnumeratorCancellation] CancellationToken token = default)
{
while (!reader.Completion.IsCompleted && !token.IsCancellationRequested)
{
var readerReady = await reader.MessageReady(token);
if (readerReady)
await foreach (var message in reader.ReadAllAsync(token))
yield return message;
}
}
public static async ValueTask RouteMessageByTypeAsync(this IWriterProvider provider, object message)
{
var writer = provider.RequestWriter(message);
await writer.WriteAsync((dynamic)message);
}
    // Sends the ended message when the phase is terminal; returns true if it did.
    public static async ValueTask<bool> SendIfEnding(this ProcessPhase phase, IWriterProvider provider, Func<ProcessEndedMessage> endedMessageFactory)
    {
        if (phase == ProcessPhase.Completed || phase == ProcessPhase.Failed)
        {
            await provider.RouteMessageByTypeAsync(endedMessageFactory());
            return true;
        }
        return false;
    }
public static bool ContainsActivity<TActivity>(this IEnumerable<dynamic> messages)
where TActivity : ProcessActivity
=> messages.Any(msg => msg.Activity.GetType() == typeof(TActivity));
}
We have one method that wraps the reader's waiting and receiving into one easy async enumerator. Then we have the writer method that dispatches a message by its type. We also have one extension to determine if the process has completed or failed and send out the ended message, and a method to determine which activities exist in the current set of messages. This raises the question: OK, so what's the WriterProvider?
The Writer Provider
// The interface is not shown in the post; this shape is inferred from how
// WriterProvider and RouteMessageByTypeAsync use it.
public interface IWriterProvider
{
    dynamic RequestWriter(object message);
}
public class WriterProvider : IWriterProvider
{
    private readonly IServiceProvider serviceProvider;
    public WriterProvider(IServiceProvider serviceProvider)
    {
        this.serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    }
    public dynamic RequestWriter(object message)
    {
        // Close the generic writer type over the runtime type of the message
        var writerType = typeof(IMessageSystemWriter<>).MakeGenericType(message.GetType());
        dynamic writer = serviceProvider.GetService(writerType);
        if (writer is null)
            throw new ArgumentException($"Message Writer not registered for {message.GetType().Name}");
        return writer;
    }
}
You may ask why we don't just use a TMessage type argument instead of message.GetType(). This is because TMessage could be a base type, and we want the Writer we get to be strongly typed to the actual message that's being sent. Our WriterProvider simply hides the IServiceProvider and uses it to retrieve the correct writer. Remember that the writer type can be implemented with nearly any transport that can at least satisfy the basic interface. So if we change transports, the rest of our code doesn't need to change; only the implementation behind IMessageSystemReader<TMessage> and IMessageSystemWriter<TMessage> does.
dynamic is chosen for the return type because, remember, we used message.GetType() to build the generic type we requested from the IServiceProvider. We want to preserve that type and not constrain it to a TMessage, which may be a base type.
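The difference between the compile-time TMessage and message.GetType() is easy to see in isolation. The following is a small sketch with stand-in types (IWriter<T>, BaseMessage, DerivedMessage and WriterTypeDemo are hypothetical and not part of the infrastructure above):

```csharp
using System;

public interface IWriter<T> { }

public record BaseMessage;
public record DerivedMessage : BaseMessage;

public static class WriterTypeDemo
{
    // Resolves the writer type from the static (compile-time) type argument.
    public static Type CompileTimeWriterType<TMessage>(TMessage message)
        => typeof(IWriter<TMessage>);

    // Resolves the writer type from the runtime type, as WriterProvider does.
    public static Type RuntimeWriterType(object message)
        => typeof(IWriter<>).MakeGenericType(message.GetType());
}
```

For a variable declared as BaseMessage that actually holds a DerivedMessage, the first method yields IWriter<BaseMessage> while the second yields IWriter<DerivedMessage>, which is the writer we actually registered for that message.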
Process Start
Next let's see how we start a process. This is an abstract class that implements the vast majority of the functionality any derived handler would need. The responsibilities of this handler are quite simple. It lives as a BackgroundService and listens for a particular TStartMessage. Once a message is received, it responds with the result of the overridden StartedMessageFactory. Then it emits the first Phase messages down the line to the next part of the system.
public abstract class ProcessStartHandler<TStartMessage, TInput>
: BackgroundService
where TStartMessage : ProcessStartMessage<TInput>
where TInput : ProcessInput
{
private readonly IMessageSystemReader<TStartMessage> startReader;
private readonly IWriterProvider writerProvider;
private readonly IMessageFactory<TInput> messageFactory;
private readonly ProcessPhase startPhase;
public ProcessStartHandler(
IWriterProvider writerProvider,
IMessageFactory<TInput> messageFactory,
IMessageSystemReader<TStartMessage> startReader,
ProcessPhase startPhase)
{
this.startReader = startReader ?? throw new ArgumentNullException(nameof(startReader));
this.writerProvider = writerProvider ?? throw new ArgumentNullException(nameof(writerProvider));
this.messageFactory = messageFactory ?? throw new ArgumentNullException(nameof(messageFactory));
this.startPhase = startPhase ?? throw new ArgumentNullException(nameof(startPhase));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var message in startReader.ContinuousWaitAndReadAllAsync(stoppingToken))
{
var processId = Guid.NewGuid();
await SendStartedMessage(processId);
await SendFirstPhaseMessages(processId, message);
}
}
private async ValueTask SendStartedMessage(Guid processId)
{
var message = StartedMessageFactory(processId);
await writerProvider.RouteMessageByTypeAsync(message);
}
private async ValueTask SendFirstPhaseMessages(Guid processId, TStartMessage message)
{
await foreach (var newMessage in messageFactory[startPhase](processId, startPhase, message.Input, Enumerable.Empty<object>()))
{
await writerProvider.RouteMessageByTypeAsync(newMessage);
}
}
protected abstract ProcessStartedMessage StartedMessageFactory(Guid processId);
}
With our handy extension methods in use for both reading and writing, the code here is straight to the point: listen for start, emit started with the ProcessId, and emit the first phase messages. Now you may be asking: what's the message factory and why is it needed?
Well, we don't want our handlers to have too much responsibility. The start handler already covers a lot of functionality, and it shouldn't be responsible for knowing how to format, create or generate the first phase messages. If it were responsible for that, we could end up with a huge list of dependencies and even new type parameters to deal with. This would be unsatisfactory, so we delegate to a single-purpose object: the message factory.
The Message Factory
public interface IMessageFactory<TInput>
    : IReadOnlyDictionary<ProcessPhase, Func<Guid, ProcessPhase, TInput, IEnumerable<object>, IAsyncEnumerable<object>>>
    where TInput : ProcessInput
{
}
// Dictionary<TKey, TValue> already implements IReadOnlyDictionary<TKey, TValue>,
// so the base class satisfies the interface as long as the Func signatures match.
public abstract class MessageFactory<TInput>
    : Dictionary<ProcessPhase, Func<Guid, ProcessPhase, TInput, IEnumerable<object>, IAsyncEnumerable<object>>>,
      IMessageFactory<TInput>
    where TInput : ProcessInput
{
}
As you can see, this is a generic interface that simply needs the type of TInput, and the rest is quite simple. Based on a ProcessPhase key we get a Func that takes the process id, the current phase, the process input and a collection describing the messages received so far for this process. With these inputs it generates a batch of messages to be sent out. When this interface is implemented, each working phase of the implementing process must be accounted for.
We also have a handy abstract class for implementations to inherit from, so that all they are concerned with is defining which messages come from specific keys.
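As a rough sketch of the dictionary-of-functions idea, here is a stripped-down factory using stand-in types (DemoPhase, DemoInput, DemoMessageFactory and FactoryDemo are hypothetical; the real factory uses ProcessPhase and a ProcessInput-derived record):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public record DemoPhase(string Name);
public record DemoInput(string Name);

public class DemoMessageFactory
    : Dictionary<DemoPhase, Func<Guid, DemoPhase, DemoInput, IEnumerable<object>, IAsyncEnumerable<object>>>
{
    public static readonly DemoPhase Greet = new("Greet");

    public DemoMessageFactory()
    {
        // One entry per phase; the value builds that phase's batch of messages.
        this[Greet] = GreetMessages;
    }

    private static async IAsyncEnumerable<object> GreetMessages(
        Guid processId, DemoPhase phase, DemoInput input, IEnumerable<object> received)
    {
        await Task.Yield(); // stands in for any async work needed to build messages
        yield return $"{phase.Name}: hello {input.Name}";
    }
}

public static class FactoryDemo
{
    // Looks up the generator for a phase and collects the batch it produces.
    public static async Task<List<object>> BuildAsync(DemoPhase phase, DemoInput input)
    {
        var factory = new DemoMessageFactory();
        var batch = new List<object>();
        await foreach (var message in factory[phase](Guid.NewGuid(), phase, input, Array.Empty<object>()))
            batch.Add(message);
        return batch;
    }
}
```

Because the key is a record, any phase value with the same name finds the same entry, so callers don't need to hold on to the exact instance used when the factory was filled.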
Phase Transitioning
Next, we're going to look at the other abstract BackgroundService; this is the only other operating service in this system. This handler listens for any of the predefined TMessage types, collects them and uses both the MessageFactory we've seen and the upcoming PhaseTransitions dictionary.
public abstract class PhaseTransitionHandler<TMessage, TInput>
: BackgroundService
where TMessage : ProcessMessage<TInput>
where TInput : ProcessInput
{
private readonly IWriterProvider writerProvider;
private readonly IPhaseTransitions<TInput> phaseTransitions;
private readonly IMessageFactory<TInput> messageFactory;
private readonly IMessageSystemReader<TMessage> reader;
private ConcurrentDictionary<Guid, ICollection<object>> MessagesReceived = new ConcurrentDictionary<Guid, ICollection<object>>();
public PhaseTransitionHandler(
IWriterProvider writerProvider,
IPhaseTransitions<TInput> phaseTransitions,
IMessageFactory<TInput> messageFactory,
IMessageSystemReader<TMessage> reader)
{
this.writerProvider = writerProvider ?? throw new ArgumentNullException(nameof(writerProvider));
this.phaseTransitions = phaseTransitions ?? throw new ArgumentNullException(nameof(phaseTransitions));
this.messageFactory = messageFactory ?? throw new ArgumentNullException(nameof(messageFactory));
this.reader = reader ?? throw new ArgumentNullException(nameof(reader));
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var message in reader.ContinuousWaitAndReadAllAsync(stoppingToken))
{
if (MessagesReceived.TryGetValue(message.ProcessId, out var messages))
messages.Add(message);
else
MessagesReceived[message.ProcessId] = new List<object>() { message };
var nextPhase = phaseTransitions[message.Phase](message.Phase, message.Input, MessagesReceived[message.ProcessId]);
if (await nextPhase.SendIfEnding(writerProvider, () => EndedMessageFactory(message.ProcessId, nextPhase)))
{
                // TryRemove is ConcurrentDictionary's own atomic removal method
                MessagesReceived.TryRemove(message.ProcessId, out _);
await CompletedEvent(nextPhase, message);
}
else if (ShouldTransition(message.Phase, nextPhase))
{
await PhaseTransitionEvent(nextPhase, message);
await foreach (var newMessage in messageFactory[nextPhase](message.ProcessId, nextPhase, message.Input, MessagesReceived[message.ProcessId]))
{
await writerProvider.RouteMessageByTypeAsync(newMessage);
}
}
}
}
protected abstract ProcessEndedMessage EndedMessageFactory(Guid processId, ProcessPhase phase);
protected virtual ValueTask CompletedEvent(ProcessPhase phase, TMessage message)
=> ValueTask.CompletedTask;
protected virtual ValueTask PhaseTransitionEvent(ProcessPhase phase, TMessage message)
=> ValueTask.CompletedTask;
private bool ShouldTransition(ProcessPhase currentPhase, ProcessPhase newPhase)
=> !currentPhase.Equals(newPhase);
}
This handler listens for any of the defined TMessage types and collects them by ProcessId. Each time a message is received and stored, the PhaseTransitions component is used to check whether or not to transition to a new phase. If a phase transition is required, the handler then uses the MessageFactory to produce and send the next batch of messages. This handler also provides hooks into the process for both the CompletedEvent and the PhaseTransitionEvent, so that inheritors can perform any extra logic they might want on those two events.
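The collect-by-ProcessId step can be looked at on its own. Below is one possible sketch of it, not the handler's actual code: MessageCollector is a hypothetical helper, and it uses GetOrAdd plus a lock so that it would also be safe if several readers called it at once (the handler above only has a single read loop, so that extra care is an assumption on my part).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public class MessageCollector
{
    private readonly ConcurrentDictionary<Guid, List<object>> received = new();

    // Adds the message to its process's bucket and returns the bucket size.
    public int Collect(Guid processId, object message)
    {
        // GetOrAdd creates the bucket on first use for a given process id.
        var bucket = received.GetOrAdd(processId, _ => new List<object>());
        lock (bucket) // List<T> itself is not thread safe
        {
            bucket.Add(message);
            return bucket.Count;
        }
    }

    // Drops the bucket once the process has ended; false if it was already gone.
    public bool Complete(Guid processId)
        => received.TryRemove(processId, out _);
}
```

Removing the bucket when the process ends matters for a long-lived BackgroundService, since otherwise the dictionary would keep growing with every process that has ever run.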
The Phase Transition Dictionary
Similar to the MessageFactory, our PhaseTransitions dictionary is keyed by ProcessPhase and produces a function that takes the current phase, the original input and the collection of received messages, and returns a ProcessPhase. If the phase returned is different from the current phase, the PhaseTransitionHandler responds appropriately by moving on to the next batch of messages.
public interface IPhaseTransitions<TInput>
: IReadOnlyDictionary<ProcessPhase, Func<ProcessPhase, TInput, IEnumerable<object>, ProcessPhase>>
where TInput : ProcessInput
{
}
public abstract class PhaseTransitions<TInput>
: Dictionary<ProcessPhase, Func<ProcessPhase, TInput, IEnumerable<object>, ProcessPhase>>,
IPhaseTransitions<TInput>
where TInput : ProcessInput
{
}
Again we have a handy abstract base class that inherits from Dictionary, so all the implementation has to do is fill itself with the correct phase transitions.
The Worker
So now we have a whole system intended to produce messages based on the phase of a process. But we still need something to process those messages and do the actual work they entail. Enter the Command Handler. We could call this an Actor, but to fit the terminology used here we'll just refer to it as a Handler. I already posted about Query Actors, which take a query and then return a response. Since our process is completely async, we won't be using the request/response architecture described in that post. Instead, our command handler will process the incoming message or command and send out a response.
public abstract class CommandHandler<TMessage> : BackgroundService
{
    protected readonly IMessageSystemReader<TMessage> reader;
    public CommandHandler(IMessageSystemReader<TMessage> reader)
    {
        this.reader = reader ?? throw new ArgumentNullException(nameof(reader));
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Handle each command as it arrives until the host shuts down
        await foreach (var command in reader.ContinuousWaitAndReadAllAsync(stoppingToken))
        {
            await HandleAsync(command, stoppingToken);
        }
    }
    public abstract ValueTask HandleAsync(TMessage command, CancellationToken stoppingToken);
}
The Registrations
Finally, we need to register all the components with our DI container. This is done through a set of extension methods.
public static class RegistrationExtensions
{
public static IServiceCollection AddMessageSystem<TMessage>(this IServiceCollection services)
{
var messageSystem = new MessageSystem<TMessage>();
services.AddSingleton<IMessageSystemWriter<TMessage>>(messageSystem);
services.AddSingleton<IMessageSystemReader<TMessage>>(messageSystem);
return services;
}
public static IServiceCollection AddCommandHandler<TActor, TCommand>(this IServiceCollection services)
where TActor : CommandHandler<TCommand>
=> services
.AddHostedService<TActor>()
.AddMessageSystem<TCommand>();
public static IServiceCollection AddWriterProvider(this IServiceCollection services)
=> services.AddSingleton<IWriterProvider, WriterProvider>();
public static IServiceCollection AddProcessStartHandler<THandler, TMessageFactory, TStartMessage, TEndMessage, TInput>(this IServiceCollection services)
where THandler : ProcessStartHandler<TStartMessage, TInput>
where TMessageFactory : MessageFactory<TInput>
where TStartMessage : ProcessStartMessage<TInput>
where TInput : ProcessInput
=> services
.AddHostedService<THandler>()
.AddSingleton<IMessageFactory<TInput>, TMessageFactory>()
.AddMessageSystem<TStartMessage>()
.AddMessageSystem<TEndMessage>();
public static IServiceCollection AddPhaseTransitionHandler<THandler, TPhaseTransitions, TMessage, TInput>(this IServiceCollection services)
where THandler : PhaseTransitionHandler<TMessage, TInput>
where TPhaseTransitions : PhaseTransitions<TInput>
where TMessage : ProcessMessage<TInput>
where TInput : ProcessInput
=> services
.AddHostedService<THandler>()
.AddSingleton<IPhaseTransitions<TInput>, TPhaseTransitions>()
.AddMessageSystem<TMessage>();
}
These extension methods are all we need to register our entire Process Management System.
The first registers the implementation of our Message System based on a defined message type parameter.
The next registers our Command Handler and its associated Message System.
The next just registers our Writer Provider.
Then we register the Process Start Handler along with the Message Factory and the appropriate input and output channels.
And finally we register the Phase Transition Handler, the Phase Transitions Dictionary and its input channel.
Sample Demo
Overview
Now that we have all the infrastructure in place, it's time to make use of it. For this example we'll build a basic "Hello World" type process. We'll define all our messages, phases and handlers. The goal is to have a process with two steps: validate the input, then respond with a message.
The Messages
The messages for our sample define the ways in which downstream consumers will respond and how we expect them to act. We define our process as a series of messages.
This sample process will first validate our input with some simple validation, then write out the message to consumers. The process extends ProcessPhase by defining two additional phases, Validation and SayHello.
public record SayHelloInput(string Name)
: ProcessInput;
public record SayHelloProcessStartMessage(SayHelloInput Input)
: ProcessStartMessage<SayHelloInput>(Input);
public record SayHelloProcessStartedMessage(Guid ProcessId)
: ProcessStartedMessage(ProcessId);
public record SayHelloEndedMessage(Guid ProcessId, ProcessPhase Phase)
: ProcessEndedMessage(ProcessId, Phase);
public record SayHelloProcessMessage<TActivity>(Guid ProcessId, ProcessPhase Phase, SayHelloInput Input, TActivity Activity)
: ProcessMessage<SayHelloInput, TActivity>(ProcessId, Phase, Input, Activity)
where TActivity : ProcessActivity;
public record SayHelloValidation(SayHelloInput Input)
: ProcessActivity;
public record SayHelloValidationSuccess
: ProcessActivity;
public record SayHelloValidationFailure
: ProcessActivity;
public record SayHelloResponseActivity(string Input)
: ProcessActivity;
public record SayHelloCompletedActivity
: ProcessActivity;
public record SayHelloResponseMessage(string message);
public record SayHelloPhases(string Phase)
: ProcessPhase(Phase)
{
public static ProcessPhase Validation = new ProcessPhase(nameof(Validation));
public static ProcessPhase SayHello = new ProcessPhase(nameof(SayHello));
}
These messages, activities and inputs define our Say Hello process. We begin with the start message that carries the user input into our process stream. We have a derived started message to signal the client that the process has started, and a derived ended message to signal that the process has ended in a particular phase. Next we have a derived process message that will be used to carry out our process by doing specified activities. We have defined five activities for this process: SayHelloValidation, SayHelloValidationSuccess, SayHelloValidationFailure, SayHelloResponseActivity and SayHelloCompletedActivity. These activities will be coordinated by our PhaseTransition handler and the message factory we'll see in a moment. And finally we defined the phases of our process as Validation and SayHello. Remember that our base record ProcessPhase already defines Completed and Failed for us, so all we need to do is define the working phases of our process.
The Process Handlers
Next we define our derived handlers for our specific process, first the Start Handler
public class SayHelloStartHandler : ProcessStartHandler<SayHelloProcessStartMessage, SayHelloInput>
{
public SayHelloStartHandler(
IWriterProvider writerProvider,
IMessageFactory<SayHelloInput> messageFactory,
IMessageSystemReader<SayHelloProcessStartMessage> startReader)
: base(writerProvider, messageFactory, startReader, SayHelloPhases.Validation)
{
}
protected override ProcessStartedMessage StartedMessageFactory(Guid processId)
=> new SayHelloProcessStartedMessage(processId);
}
Since we don't need any additional functionality, all we do is specify the working types and implement the constructor and the started message factory, and we're done with this!
Then for our concrete Message Factory:
public class SayHelloMessageFactory : MessageFactory<SayHelloInput>
{
public SayHelloMessageFactory()
{
this[SayHelloPhases.Validation] = (processId, phase, input, messages) => ValidationPhaseMessages(processId, phase, input, messages);
this[SayHelloPhases.SayHello] = (processId, phase, input, messages) => SayHelloPhaseMessages(processId, phase, input, messages);
}
public IAsyncEnumerable<object> ValidationPhaseMessages(Guid processId, ProcessPhase phase, SayHelloInput input, IEnumerable<dynamic> currentMessages)
=> new[]
{
new SayHelloProcessMessage<SayHelloValidation>(processId, phase, input, new SayHelloValidation(input))
}.ToAsyncEnumerable();
public IAsyncEnumerable<object> SayHelloPhaseMessages(Guid processId, ProcessPhase phase, SayHelloInput input, IEnumerable<dynamic> currentMessages)
=> new[]
{
new SayHelloProcessMessage<SayHelloResponseActivity>(processId, phase, input, new SayHelloResponseActivity(input.Name))
}.ToAsyncEnumerable();
}
Since we only have two phases and each phase only produces one message, our Message Factory is quite simple in this case.
Next we define our Phase Transition Handler:
public class SayHelloPhaseTransistionsHandler : PhaseTransitionHandler<SayHelloProcessMessage<ProcessActivity>, SayHelloInput>
{
public SayHelloPhaseTransistionsHandler(
IWriterProvider writerProvider,
IPhaseTransitions<SayHelloInput> phaseTransitions,
IMessageFactory<SayHelloInput> messageFactory,
IMessageSystemReader<SayHelloProcessMessage<ProcessActivity>> reader)
: base(writerProvider, phaseTransitions, messageFactory, reader)
{
}
protected override ProcessEndedMessage EndedMessageFactory(Guid processId, ProcessPhase phase)
=> new SayHelloEndedMessage(processId, phase);
}
And again no special behavior is needed, so all we're doing is setting type arguments and implementing the constructor and the ended message factory.
Finally, our Phase Transitions dictionary gets defined.
public class SayHelloPhaseTransitions : PhaseTransitions<SayHelloInput>
{
public SayHelloPhaseTransitions()
{
this[SayHelloPhases.Validation] = (phase, input, messages) => ValidationPhaseTransition(phase, input, messages);
this[SayHelloPhases.SayHello] = (phase, input, messages) => SayHelloPhaseTransition(phase, input, messages);
}
public ProcessPhase ValidationPhaseTransition(ProcessPhase phase, SayHelloInput input, IEnumerable<dynamic> currentMessages)
{
if (currentMessages.ContainsActivity<SayHelloValidationSuccess>())
return SayHelloPhases.SayHello;
else if (currentMessages.ContainsActivity<SayHelloValidationFailure>())
return SayHelloPhases.Failed;
return phase;
}
public ProcessPhase SayHelloPhaseTransition(ProcessPhase phase, SayHelloInput input, IEnumerable<dynamic> currentMessages)
{
if (currentMessages.ContainsActivity<SayHelloCompletedActivity>())
return SayHelloPhases.Completed;
return phase;
}
}
Note how we use the state provided to compute the next phase of the process. If validation fails we move directly to Failed; if validation succeeds we continue on to the next phase. Once the SayHelloCompletedActivity has been emitted we mark the process as Completed.
The Workers
First we implement our validation handler. The validation we're doing here is to simply check that the string is not too long.
public class SayHelloValidationHandler
: CommandHandler<SayHelloProcessMessage<SayHelloValidation>>
{
private readonly IMessageSystemWriter<SayHelloProcessMessage<ProcessActivity>> successWriter;
private readonly IMessageSystemWriter<SayHelloProcessMessage<ProcessActivity>> failureWriter;
public SayHelloValidationHandler(
IMessageSystemReader<SayHelloProcessMessage<SayHelloValidation>> reader,
IMessageSystemWriter<SayHelloProcessMessage<ProcessActivity>> successWriter,
IMessageSystemWriter<SayHelloProcessMessage<ProcessActivity>> failureWriter)
: base(reader)
{
this.successWriter = successWriter ?? throw new ArgumentNullException(nameof(successWriter));
this.failureWriter = failureWriter ?? throw new ArgumentNullException(nameof(failureWriter));
}
public override async ValueTask HandleAsync(SayHelloProcessMessage<SayHelloValidation> command, CancellationToken stoppingToken)
{
var input = command.Input.Name;
if (input.Length > 10)
await failureWriter.WriteAsync(
new SayHelloProcessMessage<ProcessActivity>(command.ProcessId, command.Phase, command.Input, new SayHelloValidationFailure()));
else
await successWriter.WriteAsync(
new SayHelloProcessMessage<ProcessActivity>(command.ProcessId, command.Phase, command.Input, new SayHelloValidationSuccess()));
}
}
Notice how we only need to implement the HandleAsync method, and are handed each command as it is read from our Message System. If validation passes we emit a success activity; if validation fails we emit a failure activity, and the Phase Transition Dictionary and Handler will take it from there.
Next we define our Say Hello Handler:
public class SayHelloResponseHandler
: CommandHandler<SayHelloProcessMessage<SayHelloResponseActivity>>
{
private readonly IMessageSystemWriter<SayHelloResponseMessage> responseWriter;
private readonly IMessageSystemWriter<SayHelloProcessMessage<ProcessActivity>> completedWriter;
public SayHelloResponseHandler(
IMessageSystemReader<SayHelloProcessMessage<SayHelloResponseActivity>> reader,
IMessageSystemWriter<SayHelloResponseMessage> responseWriter,
IMessageSystemWriter<SayHelloProcessMessage<ProcessActivity>> completedWriter)
: base(reader)
{
this.responseWriter = responseWriter ?? throw new ArgumentNullException(nameof(responseWriter));
this.completedWriter = completedWriter ?? throw new ArgumentNullException(nameof(completedWriter));
}
public override async ValueTask HandleAsync(SayHelloProcessMessage<SayHelloResponseActivity> command, CancellationToken stoppingToken)
{
var message = $"Hello there, {command.Input.Name}";
await responseWriter.WriteAsync(new SayHelloResponseMessage(message));
await completedWriter.WriteAsync(new SayHelloProcessMessage<ProcessActivity>(command.ProcessId, command.Phase, command.Input, new SayHelloCompletedActivity()));
}
}
Again, all we're doing is implementing the HandleAsync method with our logic, which is simply to emit a response message and then emit a completed message back to our infrastructure.
The Registrations
Next we register all our process's components with a simple extension method.
public static class RegistrationExtensions
{
public static IServiceCollection AddSayHelloProcess(this IServiceCollection services)
=> services
.AddWriterProvider()
.AddProcessStartHandler<SayHelloStartHandler, SayHelloMessageFactory, SayHelloProcessStartMessage, SayHelloEndedMessage, SayHelloInput>()
.AddPhaseTransitionHandler<SayHelloPhaseTransistionsHandler, SayHelloPhaseTransitions, SayHelloProcessMessage<ProcessActivity>, SayHelloInput>()
.AddCommandHandler<SayHelloValidationHandler, SayHelloProcessMessage<SayHelloValidation>>()
.AddCommandHandler<SayHelloResponseHandler, SayHelloProcessMessage<SayHelloResponseActivity>>()
.AddMessageSystem<SayHelloResponseMessage>()
.AddMessageSystem<SayHelloProcessStartedMessage>();
}
Note how we add dedicated Message Systems for our Started Message and our Response Message.
The Client Code
We'll keep our client code simple: it won't respond to any failure and will simply time out if a successful message is not received. In this post we're not interested in elaborate clients. We're simply proving that the orchestration of handlers works.
[ApiController]
[Route("[controller]")]
public class SampleController : ControllerBase
{
private readonly IMessageSystemWriter<SayHelloProcessStartMessage> startWriter;
private readonly IMessageSystemReader<SayHelloResponseMessage> responseReader;
public SampleController(
IMessageSystemWriter<SayHelloProcessStartMessage> startWriter,
IMessageSystemReader<SayHelloResponseMessage> responseReader)
{
this.startWriter = startWriter ?? throw new ArgumentNullException(nameof(startWriter));
this.responseReader = responseReader ?? throw new ArgumentNullException(nameof(responseReader));
}
[HttpGet]
public async ValueTask<SayHelloResponseMessage> Get(string name)
{
var startMessage = new SayHelloProcessStartMessage(new SayHelloInput(name));
await startWriter.WriteAsync(startMessage);
using var readTokenSource = new CancellationTokenSource();
readTokenSource.CancelAfter(TimeSpan.FromSeconds(1));
return await responseReader.ContinuousWaitAndReadAllAsync(readTokenSource.Token).FirstAsync();
}
}
Here we take a writer for the Start Message and pass our input. We also take a reader for the output and provide a CancellationToken to time out the request if anything fails. We also don't distinguish between Response Messages for individual requests. We could identify them by Process Id, but we'll leave that for next time.
And finally we can try everything out and receive a response!
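The controller relies on the cancellation token to end the wait, which means a timeout surfaces as an exception. If you would rather turn that into a value, something along these lines could work. This is only a sketch against a raw ChannelReader, and TimeoutDemo and TryReadAsync are made-up names, not part of the sample:

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class TimeoutDemo
{
    // Waits for one message, giving up once the timeout elapses.
    public static async Task<(bool Received, string Message)> TryReadAsync(
        ChannelReader<string> reader, TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource();
        cts.CancelAfter(timeout);
        try
        {
            var message = await reader.ReadAsync(cts.Token);
            return (true, message);
        }
        catch (OperationCanceledException)
        {
            // The token fired before any message arrived.
            return (false, string.Empty);
        }
    }
}
```

A caller can then decide what a timeout means for the request, for example returning an error status instead of letting the exception bubble up.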
Summary
Here we've defined a generic infrastructure to orchestrate complex processes based on two handlers, two dictionaries and a set of messages. The beautiful thing about this system is that it can fundamentally be used with any underlying transport. Complex processes are broken down into simple phases, messages and transitions.
Well, I hope you enjoyed my latest. As always, the full code can be found at ConcurrentFlows.HashNode GitHub