Event Streaming in Microsoft Orleans

Setting up streaming in Orleans was quite a headache, so I’ll share my experience and explain why the code has to be just so for it to work reliably.

What is event streaming?

Traditional messaging platforms broadcast messages onto a bus, where they are picked up by any number of backend services for processing (usually only one at a time, but fan-out is also supported). The streaming model instead uses publish-subscribe between any two actors. For example, in the application I’ve been demonstrating, there are two entities that store different data, Elections and Competitions. When a competition is added, it affects both the Election and the Competition: the Competition itself must be created, and it must also be added to the Competitions collection in the Election.

We do this by having one actor handle the incoming event and resend it to any additional actors involved in the transaction. In this case, we want the CompetitionGrain to handle the initial event and then publish it to the parent actor, like so:

public async Task Add(ElectionId electionId, Name name, Description? description)
{
    var @event = new Vote.Competition.Added
    {
        Id = this.GetPrimaryKey(),
        ElectionId = electionId,
        Name = name,
        Description = description
    };
    // Record the event against this grain's own journal.
    RaiseEvent(@event);
    // Open a stream addressed to the parent Election and forward the same event to it.
    _publishToElection = this.GetStreamProvider("StreamProvider").GetStream<object>("Election", electionId);
    await _publishToElection.OnNextAsync(@event);
    // Wait until the raised event has been written to storage.
    await ConfirmEvents();
}

Knowing that we can create a stream to any actor within the application space, we create a stream to the parent actor, whose ID and type we know. In the parent actor, we need to create a subscription to receive the event. Subscriptions persist for the entire lifetime of the application, so you only need to subscribe once, in OnActivateAsync. This method is called multiple times through the lifetime of the actor, however, so you need to be sure to subscribe only once:

public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
    var streamProvider = this.GetStreamProvider("StreamProvider");
    _publishToApplication = streamProvider.GetStream<object>("Application", "OlympiaVotes");
    _subscribeToElections = streamProvider.GetStream<object>("Election", this.GetPrimaryKey());
    // Re-attach the handler to every subscription that already exists for this grain.
    var handles = await _subscribeToElections.GetAllSubscriptionHandles();
    foreach (var handle in handles)
        await handle.ResumeAsync(EventHandler);
    // Create a subscription only if none exists yet.
    if (handles.Count == 0)
        await _subscribeToElections.SubscribeAsync(EventHandler);
    await base.OnActivateAsync(cancellationToken);
}

*Do not subscribe every time you activate!* The subscriptions are persisted in the PubSubStore (Redis in the production configuration below) and exist for the lifetime of both actors. Here, we get the existing subscription handles, resume each one, and only subscribe if there are none.
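The EventHandler passed to SubscribeAsync and ResumeAsync above is not shown in this post. Because the stream is typed as object, the handler has to dispatch on the runtime type of each event. Here is a minimal sketch of that dispatch with no Orleans dependency: CompetitionAdded and ElectionModel are hypothetical stand-ins for Vote.Competition.Added and the Election grain, and the object? token parameter stands in for the StreamSequenceToken that the real callback receives.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for Vote.Competition.Added.
public sealed record CompetitionAdded(Guid Id, Guid ElectionId, string Name);

// Hypothetical stand-in for the Election grain's state and handler.
public sealed class ElectionModel
{
    public HashSet<Guid> Competitions { get; } = new();

    // Same shape as a stream callback: the event, then a sequence token.
    public Task EventHandler(object @event, object? token)
    {
        switch (@event)
        {
            case CompetitionAdded added:
                // Add the new competition to the election's collection.
                Competitions.Add(added.Id);
                break;
            default:
                // Event types this actor does not care about are ignored.
                break;
        }
        return Task.CompletedTask;
    }
}

public static class HandlerDemo
{
    public static async Task Main()
    {
        var election = new ElectionModel();
        var competitionId = Guid.NewGuid();
        await election.EventHandler(
            new CompetitionAdded(competitionId, Guid.NewGuid(), "Best in Show"), null);
        Console.WriteLine(election.Competitions.Contains(competitionId));
    }
}
```

In the real grain, the matching case would presumably raise the event on the Election's own journal rather than mutate a collection directly.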

We only unsubscribe when the actor is removed from its parent:

public async Task Remove()
{
    // Drop every persisted subscription before the actor is orphaned.
    if (_subscribeToElections != null)
    {
        var handles = await _subscribeToElections.GetAllSubscriptionHandles();
        foreach (var handle in handles)
            await handle.UnsubscribeAsync();
    }
    var @event = new Vote.Election.Deleted {Id = this.GetPrimaryKey()};
    RaiseEvent(@event);
    // Tell the application-level stream that this election is gone.
    if (_publishToApplication != null) await _publishToApplication.OnNextAsync(@event);
}

There’s no point in receiving events once we’ve orphaned the actor.

Streaming is easy enough to configure. I’ve used Event Hub for event streaming, but you can also use Azure Storage queues:

var host = Host.CreateDefaultBuilder(args)
    .UseOrleans(silo =>
    {
        #if DEBUG
        silo.UseLocalhostClustering()
            .AddMemoryGrainStorageAsDefault()
            .AddMemoryGrainStorage("PubSubStore")
            .AddMemoryStreams("StreamProvider")
        #else
        var ehConnection =
            new EventHubConnection("xxx", "default");
        silo.UseKubernetesHosting()
            .UseKubeMembership()
            .AddStreaming()
            .AddEventHubStreams("StreamProvider", configurator =>
            {
                configurator.UseAzureTableCheckpointer(options => options.Configure(cp =>
                {
                    cp.ConfigureTableServiceClient(new Uri("xxx"),
                        new DefaultAzureCredential());
                }));
                configurator.ConfigureEventHub(ob =>
                    ob.Configure(eh => eh.ConfigureEventHubConnection(ehConnection, "$Default")));
            })
            .AddAzureTableGrainStorageAsDefault(tables => tables.Configure(options =>
            {
                options.ConfigureTableServiceClient(new Uri("xxx"), new DefaultAzureCredential());
            }))
            .AddRedisGrainStorage("PubSubStore", options => options.Configure(redis =>
            {
                redis.ConnectionString = "xxx";
            }))
            .ConfigureEndpoints(11111, 30000, listenOnAnyHostAddress: true)
        #endif
            .AddLogStorageBasedLogConsistencyProviderAsDefault()
            .ConfigureLogging(logging => logging.AddConsole());
        silo.Services.AddSerializer(serializer =>
            serializer.AddNewtonsoftJsonSerializer(
                isSupported: type => type.Namespace?.StartsWith("OlympiaVotes") == true));
    })
    .Build();

host.Run();

I’ve used Event Hub to actually send the events, and Redis to store the current subscription state. Redis is a good choice here because there is no need for persistence beyond the lifetime of the application. In-memory storage is not an option in production because there are multiple pods running, and they would not share subscription state. In addition to the PubSub store, we also need to store checkpoint data so that a restarted silo does not have to go all the way back to the beginning of the event stream. This is stored in Azure Tables.

This configuration works, and allows me to restart one or both components of the application without disrupting the subscriptions. If you haven’t got this right, you will find error messages about not finding a current subscriber, and that the silo is dropping the message on the floor. If you run into this, ensure that a) you have subscribed at least once, b) you have subscribed only once, c) you have resumed the subscription on every activation and d) you do not unsubscribe until you are finished with the actor.
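To make the checklist concrete, here is a toy model of why both rules matter. It is a deliberately simplified sketch, not Orleans code and not how Orleans delivers messages internally: SubscriptionStore, Activation, ActivateSafely, ActivateNaively and Deliver are all hypothetical names. The only assumption is that subscription handles outlive an activation while handlers do not.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the PubSubStore: subscription ids that outlive any one activation.
public sealed class SubscriptionStore
{
    public List<Guid> Handles { get; } = new();
}

// Stand-in for a single activation of the subscribing grain.
public sealed class Activation
{
    // Handles that have a handler attached in this activation.
    private readonly HashSet<Guid> _attached = new();
    public int Delivered { get; private set; }
    public int Dropped { get; private set; }

    // Mirrors the OnActivateAsync pattern: resume what exists, subscribe only if nothing does.
    public static Activation ActivateSafely(SubscriptionStore store)
    {
        var activation = new Activation();
        foreach (var handle in store.Handles)
            activation._attached.Add(handle);   // plays the role of ResumeAsync
        if (store.Handles.Count == 0)
        {
            var handle = Guid.NewGuid();        // plays the role of SubscribeAsync
            store.Handles.Add(handle);
            activation._attached.Add(handle);
        }
        return activation;
    }

    // The mistake: a fresh subscription on every activation, old ones never resumed.
    public static Activation ActivateNaively(SubscriptionStore store)
    {
        var activation = new Activation();
        var handle = Guid.NewGuid();
        store.Handles.Add(handle);
        activation._attached.Add(handle);
        return activation;
    }

    // One published event: every persisted subscription gets one delivery attempt.
    public void Deliver(SubscriptionStore store)
    {
        foreach (var handle in store.Handles)
        {
            if (_attached.Contains(handle)) Delivered++;
            else Dropped++;
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var safeStore = new SubscriptionStore();
        Activation.ActivateSafely(safeStore);              // first activation subscribes
        var safe = Activation.ActivateSafely(safeStore);   // reactivation only resumes
        safe.Deliver(safeStore);
        Console.WriteLine($"safe:  handles={safeStore.Handles.Count} delivered={safe.Delivered} dropped={safe.Dropped}");

        var naiveStore = new SubscriptionStore();
        Activation.ActivateNaively(naiveStore);
        var naive = Activation.ActivateNaively(naiveStore);
        naive.Deliver(naiveStore);
        Console.WriteLine($"naive: handles={naiveStore.Handles.Count} delivered={naive.Delivered} dropped={naive.Dropped}");
    }
}
```

In this model the safe path ends with one handle, one delivery and nothing dropped. The naive path ends with two handles, one of which has no handler attached, so one of the two delivery attempts is dropped.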

Orleans lifetimes are tricky things. The actors are effectively immortal, but can be activated multiple times. Upon deactivation, a grain’s state is persisted, and so are its active subscriptions. If a message is sent to an inactive actor, it will be activated (whereupon you will resume the subscription). This may take some effort to fully understand, but it is the essence of event-driven architecture.
