Event Sourcing with Microsoft Orleans

After much research, I’ve created the beginnings of an application using Microsoft Orleans. As I’ve resolved a lot of issues in the process, and figured out a couple of neat tricks, I’m going to record the results here. The application is a simple ranked choice voting system for the organization. It will use Azure Active Directory to identify voters, and allow an Election to be organized into Competitions with Candidates.

The solution is structured as follows:

Each of these projects is small and has a purpose. It’s typical of larger solutions to have many solution folders and projects. Let’s discuss each of them in turn.

OlympiaVotes.Domain.Vote

This is the state of our JournaledGrains. It is very similar to the domain model presented in my previous article on Event Sourcing with Apache Pulsar, but we use overloads of Apply instead of named updates, e.g. UpdateTitle. This project must be made of Serializable classes (see Serialization, below).

OlympiaVotes.Orleans.GrainInterfaces

We store the interfaces for our grains here so that we can reference them independently of the application code found in the grains.

OlympiaVotes.Orleans.Grains

This is the bulk of our application code. The implementations of all grain interfaces, as well as the necessary annotations, are here.

OlympiaVotes.Service.Vote

This is a Worker Service that hosts the Orleans silo.

OlympiaVotes.Test.Vote

A test project for the domain objects to ensure that domain objects process events correctly.

OlympiaVotes.Event

A project containing all events to be produced and consumed by the application.

OlympiaVotes

A three-project hosted Blazor WASM application.

I won’t focus much on the domain classes, since there are many good resources on how to do DDD.

Configuring Orleans

Orleans does a lot of work under the hood, but that requires that everything be set up just so before launching the application. There are two applications that use Orleans code: OlympiaVotes.Server and OlympiaVotes.Service.Vote. The silo is hosted in OlympiaVotes.Service.Vote and the Blazor server project is the Orleans client. Let’s look at how each of these are configured:

using Orleans.Serialization;

var host = Host.CreateDefaultBuilder(args)
    .UseOrleans(silo =>
    {
        silo.UseLocalhostClustering()
            .AddMemoryGrainStorage("MemoryStorage")
            .AddLogStorageBasedLogConsistencyProvider()
            .AddMemoryStreams("StreamProvider")
            .AddMemoryGrainStorage("PubSubStore")
            .ConfigureLogging(logging => logging.AddConsole());
        silo.Services.AddSerializer(serializer =>
            serializer.AddNewtonsoftJsonSerializer(
                isSupported: type => type.Namespace?.StartsWith("OlympiaVotes") == true));
    })
    .Build();

host.Run();

The Worker Service is relatively straightforward in that it only hosts the Orleans silo. This is the entire Worker Service displayed above. As this is only a development project, I have not configured the infrastructure, and will post on that later. So, we use the localhost cluster. We add the required storage and log providers, as well as the necessary stores for setting up event streaming.

The client code is setup in OlympiaVotes.Server as follows:

builder.Host.UseOrleansClient(client =>
{
    client.UseLocalhostClustering();
    client.AddMemoryStreams("StreamProvider");
    client.Services.AddSerializer(serializer =>
        serializer.AddNewtonsoftJsonSerializer(
            isSupported: type => type.Namespace?.StartsWith("OlympiaVotes") == true));
});

Once again, we specify localhost clustering, and we add the other end of the streams provider. We use Newtonsoft JSON as it is a more capable serializer/deserializer. We can specify that Orleans use it for both client and silo. I’ll talk a bit more about the choice of Newtonsoft under Serialization, below.

Code Generation

Orleans will need to generate the code that finds your grain classes and instantiates them. To do this, you will need to add the package Microsoft.Orleans.CodeGeneration to your Grains and GrainInterfaces projects. Additionally add the Microsoft.Orleans.Serialization.NewtonsoftJson package as well to allow configuration of Newtonsoft serializers.

Building the project should generate the necessary code for Orleans. Starting up the Worker Service should provide a console log of the Orlean silo startup with no errors.

Serialization

Serialization is by far the trickiest aspect of making the Orleans framework work. All projects containing classes that need to be serialized must reference the Microsoft.Orleans.Serialization package. You can then either annotate your message classes with GenerateSerializerAttribute and annotate each property with IdAttribute, or you can just annotate the class with SerializableAttribute and mark the properties and constructors with JsonPropertyAttribute and JsonConstructorAttribute respectively. I much prefer the latter as it requires fewer attributes and Newtonsoft is a much better serializer in my opinion.

LogConsistencyProvider

The LogConsistencyProvider is used to aggregate the events that lead to the current state.

StorageProvider

The StorageProvider is used to store the current state of the object.

Let’s look at the Election grain object to see how these annotations are used:

[LogConsistencyProvider(ProviderName = "LogStorage")]
[StorageProvider(ProviderName = "MemoryStorage")]
public class ElectionGrain : JournaledGrain<Election>, IElectionGrain
{
    private IAsyncStream<object>? _stream;
    public override Task OnActivateAsync(CancellationToken cancellationToken)
    {
        var streamProvider = this.GetStreamProvider("StreamProvider");
        _stream = streamProvider.GetStream<object>("Election", "OlympiaVotes");
        return Task.CompletedTask;
    }

    public async Task Create(ElectionId id, Title title, Description? description)
    {
        var @event = new Vote.Election.Created
        {
            Id = this.GetPrimaryKey(),
            Title = title,
            Description = description
        };
        RaiseEvent(@event);
        if (_stream != null) await _stream.OnNextAsync(@event);
        await ConfirmEvents();
    }

    public async Task UpdateTitle(Title title)
    {
        RaiseEvent(new Vote.Election.TitleUpdated
        {
            Id = this.GetPrimaryKey(),
            Title = title
        });
        await ConfirmEvents();
    }

    public async Task UpdateDescription(Description? description)
    {
        RaiseEvent(new Vote.Election.DescriptionUpdated
        {
            Id = this.GetPrimaryKey(),
            Description = description
        });
        await ConfirmEvents();
    }

    public Task<IList<Competition>> GetCompetitions(CompetitionId? competitionId = null)
    {
        var competitions = State.Competitions ?? new List<Competition>();
        if (competitionId != null)
            competitions = competitions.Where(c => c.Id == competitionId).ToList();
        return Task.FromResult(competitions);
    }
}

Note the call to _stream.OnNextAsync. This will cause the object being sent to the stream to appear in a different grain, as follows:

[LogConsistencyProvider(ProviderName = "LogStorage")]
[StorageProvider(ProviderName = "MemoryStorage")]
public class ApplicationGrain : JournaledGrain<Application>, IApplicationGrain
{
    private IAsyncStream<object>? _electionEvents;
    
    public override async Task OnActivateAsync(CancellationToken cancellationToken)
    {
        var streamProvider = this.GetStreamProvider("StreamProvider");
        _electionEvents = streamProvider.GetStream<object>("Election", "OlympiaVotes");
        await _electionEvents.SubscribeAsync(async events =>
            {
                foreach (var @event in events)
                    RaiseEvent(@event.Item);
                await ConfirmEvents();
            });
        await base.OnActivateAsync(cancellationToken);
    }
    
    public Task<IList<Election>> GetElections(ElectionId? electionId = null)
    {
        var elections =  electionId == null ? State.Elections : State.Elections.Where(x => x.Id == electionId).ToList();
        return Task.FromResult(elections);
    }
}

Some notes about streams:
* Ensure that your producer and consumer both use the same namespace and id.
* Streams are typed. You should type them to the base class of your event/domain classes. You can simply use object if you don’t have a common base class.
* You must subscribe to a stream and provide a message handler
* A simple stream consumer can simply call RaiseEvent with the same argument

This is all the pieces. It will probably take a fair bit of fiddling to get it all connected, but once it is, your productivity will soar. We can now do event sourcing and projection as easily as calling some methods on our grains. All of the log consistency and storage is taken care of for us.

,

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: