
Brian Richardson's Blog

  • Event Streaming in Microsoft Orleans

    March 20th, 2023

    Setting up streaming in Orleans was quite a headache, so I’ll share my experience and explain why the code has to be just so for it to work reliably.

    What is event streaming?

    Traditional messaging platforms broadcast messages onto a bus, where they are picked up by any number of backend services for processing (usually only one at a time, though fan-out is also supported). The streaming model instead uses publish-subscribe between any two actors. For example, in the application I’ve been demonstrating, there are two entities that store different data: Elections and Competitions. When a competition is added, it affects both the Election and the Competition: the Competition itself must be created, but it must also be added to the Competitions collection in the Election.

    The way we do this is to have one actor handle the incoming event and republish it to any additional actors involved in the transaction. In this case, we want the CompetitionGrain to handle the initial event and then publish the relevant events to our parent actor, like so:

    public async Task Add(ElectionId electionId, Name name, Description? description)
    {
        var @event = new Vote.Competition.Added
        {
            Id = this.GetPrimaryKey(),
            ElectionId = electionId,
            Name = name,
            Description = description
        };
        RaiseEvent(@event);
        _publishToElection = this.GetStreamProvider("StreamProvider")
            .GetStream<object>("Election", electionId);
        await _publishToElection.OnNextAsync(@event);
        await ConfirmEvents();
    }
    

    Knowing that we can create a stream to any actor within the application space, we create a stream to the parent actor, whose ID and type we know. In the parent grain, we need to create a subscription to receive the event. Subscriptions persist for the entire lifetime of the application, so you only need to subscribe once, in OnActivateAsync. This method is called multiple times through the lifetime of the actor, however, so you need to be sure to subscribe only once:

    public override async Task OnActivateAsync(CancellationToken cancellationToken)
    {
        var streamProvider = this.GetStreamProvider("StreamProvider");
        _publishToApplication = streamProvider.GetStream<object>("Application", "OlympiaVotes");
        _subscribeToElections = streamProvider.GetStream<object>("Election", this.GetPrimaryKey());
        var handles = await _subscribeToElections.GetAllSubscriptionHandles();
        foreach (var handle in handles)
            await handle.ResumeAsync(EventHandler);
        if (handles.Count == 0)
            await _subscribeToElections.SubscribeAsync(EventHandler);
        await base.OnActivateAsync(cancellationToken);
    }
    

    *Do not subscribe every time you activate!* Subscriptions are persisted to the PubSub store and exist for the lifetime of both actors. Here, we get the existing subscription handles and only subscribe if none exist.

    We only unsubscribe when the actor is removed from its parent:

    public async Task Remove()
    {
        if (_subscribeToElections != null)
        {
            var handles = await _subscribeToElections.GetAllSubscriptionHandles();
            foreach (var handle in handles)
                await handle.UnsubscribeAsync();
        }
        var @event = new Vote.Election.Deleted {Id = this.GetPrimaryKey()};
        RaiseEvent(@event);
        if (_publishToApplication != null) await _publishToApplication.OnNextAsync(@event);
        await ConfirmEvents();
    }
    

    There’s no point in receiving events once we’ve orphaned the actor.

    Streaming is easy enough to configure. I’ve used Event Hub for event streaming, but you can also use Azure Storage:

    var host = Host.CreateDefaultBuilder(args)
        .UseOrleans(silo =>
        {
            #if DEBUG
            silo.UseLocalhostClustering()
                .AddMemoryGrainStorageAsDefault()
                .AddMemoryGrainStorage("PubSubStore")
                .AddMemoryStreams("StreamProvider")
            #else
            var ehConnection =
                new EventHubConnection("xxx", "default");
            silo.UseKubernetesHosting()
                .UseKubeMembership()
                .AddStreaming()
                .AddEventHubStreams("StreamProvider", configurator =>
                {
                    configurator.UseAzureTableCheckpointer(options => options.Configure(cp =>
                    {
                        cp.ConfigureTableServiceClient(new Uri("xxx"),
                            new DefaultAzureCredential());
                    }));
                    configurator.ConfigureEventHub(ob =>
                        ob.Configure(eh => eh.ConfigureEventHubConnection(ehConnection, "$Default")));
                })
                .AddAzureTableGrainStorageAsDefault(tables => tables.Configure(options =>
                {
                    options.ConfigureTableServiceClient(new Uri("xxx"), new DefaultAzureCredential());
                }))
                .AddRedisGrainStorage("PubSubStore", options => options.Configure(redis =>
                {
                    redis.ConnectionString = "xxx";
                }))
                .ConfigureEndpoints(11111, 30000, listenOnAnyHostAddress: true)
            #endif
                .AddLogStorageBasedLogConsistencyProviderAsDefault()
                .ConfigureLogging(logging => logging.AddConsole());
            silo.Services.AddSerializer(serializer =>
                serializer.AddNewtonsoftJsonSerializer(
                    isSupported: type => type.Namespace?.StartsWith("OlympiaVotes") == true));
        })
        .Build();
    
    host.Run();
    

    I’ve used Event Hub to actually send the events, and Redis to store the current subscription state. Redis is a good choice here because there is no need for persistence beyond the lifetime of the application, and memory is not an option because there are multiple pods running. In addition to the PubSub store, we also need to store checkpoint data to avoid going all the way back to the beginning of the event stream; this is stored in Azure Tables.

    This configuration works, and allows me to restart one or both components of the application without disrupting the subscriptions. If you haven’t got this right, you will see error messages about a current subscriber not being found, and about the silo dropping the message on the floor. If you run into this, ensure that a) you have subscribed at least once, b) you have subscribed only once, c) you have resumed the subscription on every activation and d) you do not unsubscribe until you are finished with the actor.
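
    To keep those rules in one place, the resume-or-subscribe logic can be pulled into a small extension method. This is a minimal sketch under my own naming; Orleans does not ship a helper like this:

    using Orleans.Streams;

    // A resume-or-subscribe helper (my own invention, not an Orleans API):
    // reattach the handler to any persisted handles, and only create a new
    // subscription if none exist yet.
    public static class StreamSubscriptionExtensions
    {
        public static async Task ResumeOrSubscribeAsync<T>(
            this IAsyncStream<T> stream,
            Func<T, StreamSequenceToken, Task> onNextAsync)
        {
            var handles = await stream.GetAllSubscriptionHandles();
            if (handles.Count == 0)
            {
                // first-ever activation: create the persistent subscription
                await stream.SubscribeAsync(onNextAsync);
                return;
            }
            // every later activation: resume the existing handles
            foreach (var handle in handles)
                await handle.ResumeAsync(onNextAsync);
        }
    }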

    Orleans lifetimes are tricky things. The actors are effectively immortal, but can be activated multiple times. Upon deactivation, a grain’s state is persisted, along with any active subscriptions. If a message is sent to an inactive actor, it will be activated (whereupon you will resume the subscription). This may take some effort to fully understand, but it is the essence of event-driven architecture.

  • Event Sourcing with Microsoft Orleans

    March 16th, 2023

    After much research, I’ve created the beginnings of an application using Microsoft Orleans. As I’ve resolved a lot of issues in the process, and figured out a couple of neat tricks, I’m going to record the results here. The application is a simple ranked choice voting system for the organization. It will use Azure Active Directory to identify voters, and allow an Election to be organized into Competitions with Candidates.

    The solution is structured into the following projects, each small and with a single purpose; it’s typical of larger solutions to have many solution folders and projects. Let’s discuss each of them in turn.

    OlympiaVotes.Domain.Vote

    This is the state of our JournaledGrains. It is very similar to the domain model presented in my previous article on Event Sourcing with Apache Pulsar, but we use overloads of Apply instead of named updates, e.g. UpdateTitle. This project must be made of Serializable classes (see Serialization, below).

    OlympiaVotes.Orleans.GrainInterfaces

    We store the interfaces for our grains here so that we can reference them independently of the application code found in the grains.
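
    For example, the interface for the Election grain lives here. This is a sketch inferred from the ElectionGrain implementation shown later in this post, not the verbatim source:

    using Orleans;

    // Sketch of a grain interface in OlympiaVotes.Orleans.GrainInterfaces,
    // inferred from the ElectionGrain shown below.
    public interface IElectionGrain : IGrainWithGuidKey
    {
        Task Create(ElectionId id, Title title, Description? description);
        Task UpdateTitle(Title title);
        Task UpdateDescription(Description? description);
        Task<IList<Competition>> GetCompetitions(CompetitionId? competitionId = null);
    }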

    OlympiaVotes.Orleans.Grains

    This is the bulk of our application code. The implementations of all grain interfaces, as well as the necessary annotations, are here.

    OlympiaVotes.Service.Vote

    This is a Worker Service that hosts the Orleans silo.

    OlympiaVotes.Test.Vote

    A test project for the domain objects to ensure that domain objects process events correctly.

    OlympiaVotes.Event

    A project containing all events to be produced and consumed by the application.

    OlympiaVotes

    A three-project hosted Blazor WASM application.

    I won’t focus much on the domain classes, since there are many good resources on how to do DDD.

    Configuring Orleans

    Orleans does a lot of work under the hood, but that requires that everything be set up just so before launching the application. There are two applications that use Orleans code: OlympiaVotes.Server and OlympiaVotes.Service.Vote. The silo is hosted in OlympiaVotes.Service.Vote, and the Blazor server project is the Orleans client. Let’s look at how each of these is configured:

    using Orleans.Serialization;
    
    var host = Host.CreateDefaultBuilder(args)
        .UseOrleans(silo =>
        {
            silo.UseLocalhostClustering()
                .AddMemoryGrainStorage("MemoryStorage")
                .AddLogStorageBasedLogConsistencyProvider()
                .AddMemoryStreams("StreamProvider")
                .AddMemoryGrainStorage("PubSubStore")
                .ConfigureLogging(logging => logging.AddConsole());
            silo.Services.AddSerializer(serializer =>
                serializer.AddNewtonsoftJsonSerializer(
                    isSupported: type => type.Namespace?.StartsWith("OlympiaVotes") == true));
        })
        .Build();
    
    host.Run();
    

    The Worker Service is relatively straightforward in that it only hosts the Orleans silo; the entire service is displayed above. As this is only a development project, I have not configured the infrastructure (I’ll post on that later), so we use localhost clustering. We add the required storage and log-consistency providers, as well as the stores needed for event streaming.

    The client code is set up in OlympiaVotes.Server as follows:

    builder.Host.UseOrleansClient(client =>
    {
        client.UseLocalhostClustering();
        client.AddMemoryStreams("StreamProvider");
        client.Services.AddSerializer(serializer =>
            serializer.AddNewtonsoftJsonSerializer(
                isSupported: type => type.Namespace?.StartsWith("OlympiaVotes") == true));
    });
    

    Once again, we specify localhost clustering, and we add the other end of the stream provider. We use Newtonsoft JSON as it is a more capable serializer/deserializer, and we can tell Orleans to use it for both client and silo. I’ll talk a bit more about the choice of Newtonsoft under Serialization, below.

    Code Generation

    Orleans needs to generate the code that finds your grain classes and instantiates them. To do this, add the Microsoft.Orleans.CodeGenerator package to your Grains and GrainInterfaces projects. Additionally, add the Microsoft.Orleans.Serialization.NewtonsoftJson package to allow configuration of the Newtonsoft serializer.

    Building the project should generate the necessary code for Orleans. Starting up the Worker Service should produce a console log of the Orleans silo startup with no errors.

    Serialization

    Serialization is by far the trickiest aspect of making the Orleans framework work. All projects containing classes that need to be serialized must reference the Microsoft.Orleans.Serialization package. You can then either annotate your message classes with GenerateSerializerAttribute and each property with IdAttribute, or simply annotate the class with SerializableAttribute and mark the properties and constructors with JsonPropertyAttribute and JsonConstructorAttribute respectively. I much prefer the latter, as it requires fewer attributes and Newtonsoft is, in my opinion, a much better serializer.
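
    Here is a quick sketch of the two styles side by side; the event shapes are illustrative, not taken from the project:

    using Newtonsoft.Json;
    using Orleans;

    // Option 1: Orleans' own serializer, driven by attributes.
    [GenerateSerializer]
    public class TitleUpdated
    {
        [Id(0)] public Guid Id { get; set; }
        [Id(1)] public string? Title { get; set; }
    }

    // Option 2: mark the class Serializable and let the Newtonsoft serializer
    // (configured via AddNewtonsoftJsonSerializer earlier) handle it.
    [Serializable]
    public class DescriptionUpdated
    {
        [JsonConstructor]
        public DescriptionUpdated(Guid id, string? description) =>
            (Id, Description) = (id, description);

        [JsonProperty] public Guid Id { get; }
        [JsonProperty] public string? Description { get; }
    }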

    LogConsistencyProvider

    The LogConsistencyProvider is used to aggregate the events that lead to the current state.

    StorageProvider

    The StorageProvider is used to store the current state of the object.

    Let’s look at the Election grain object to see how these annotations are used:

    [LogConsistencyProvider(ProviderName = "LogStorage")]
    [StorageProvider(ProviderName = "MemoryStorage")]
    public class ElectionGrain : JournaledGrain<Election>, IElectionGrain
    {
        private IAsyncStream<object>? _stream;
        public override Task OnActivateAsync(CancellationToken cancellationToken)
        {
            var streamProvider = this.GetStreamProvider("StreamProvider");
            _stream = streamProvider.GetStream<object>("Election", "OlympiaVotes");
        return base.OnActivateAsync(cancellationToken);
        }
    
        public async Task Create(ElectionId id, Title title, Description? description)
        {
            var @event = new Vote.Election.Created
            {
                Id = this.GetPrimaryKey(),
                Title = title,
                Description = description
            };
            RaiseEvent(@event);
            if (_stream != null) await _stream.OnNextAsync(@event);
            await ConfirmEvents();
        }
    
        public async Task UpdateTitle(Title title)
        {
            RaiseEvent(new Vote.Election.TitleUpdated
            {
                Id = this.GetPrimaryKey(),
                Title = title
            });
            await ConfirmEvents();
        }
    
        public async Task UpdateDescription(Description? description)
        {
            RaiseEvent(new Vote.Election.DescriptionUpdated
            {
                Id = this.GetPrimaryKey(),
                Description = description
            });
            await ConfirmEvents();
        }
    
        public Task<IList<Competition>> GetCompetitions(CompetitionId? competitionId = null)
        {
            var competitions = State.Competitions ?? new List<Competition>();
            if (competitionId != null)
                competitions = competitions.Where(c => c.Id == competitionId).ToList();
            return Task.FromResult(competitions);
        }
    }
    

    Note the call to _stream.OnNextAsync. This causes the event sent to the stream to be received by a different grain, as follows:

    [LogConsistencyProvider(ProviderName = "LogStorage")]
    [StorageProvider(ProviderName = "MemoryStorage")]
    public class ApplicationGrain : JournaledGrain<Application>, IApplicationGrain
    {
        private IAsyncStream<object>? _electionEvents;
        
        public override async Task OnActivateAsync(CancellationToken cancellationToken)
        {
            var streamProvider = this.GetStreamProvider("StreamProvider");
            _electionEvents = streamProvider.GetStream<object>("Election", "OlympiaVotes");
            await _electionEvents.SubscribeAsync(async events =>
                {
                    foreach (var @event in events)
                        RaiseEvent(@event.Item);
                    await ConfirmEvents();
                });
            await base.OnActivateAsync(cancellationToken);
        }
        
        public Task<IList<Election>> GetElections(ElectionId? electionId = null)
        {
            var elections =  electionId == null ? State.Elections : State.Elections.Where(x => x.Id == electionId).ToList();
            return Task.FromResult(elections);
        }
    }
    

    Some notes about streams:
    * Ensure that your producer and consumer both use the same namespace and id.
    * Streams are typed. You should type them to the base class of your event/domain classes, or simply use object if you don’t have a common base class.
    * You must subscribe to a stream and provide a message handler.
    * A simple stream consumer can simply call RaiseEvent with the same argument.

    Those are all the pieces. It will probably take a fair bit of fiddling to get everything connected, but once it is, your productivity will soar. We can now do event sourcing and projection as easily as calling some methods on our grains. All of the log consistency and storage is taken care of for us.

  • Intro to Microsoft Orleans

    March 15th, 2023

    Things are constantly evolving in application architecture. No sooner had I completed my blog post about Event Sourcing with Apache Pulsar than Microsoft dropped their own Event-Driven Architecture solution, Microsoft Orleans, into my lap. I’m still in the process of migrating my existing code from the framework I’d adapted from Mr. Zimarev’s book, but all of Alexey’s code is gone now, replaced completely by the Orleans framework. (I even replaced the ValueObject implementation with the recommended implementation from Microsoft.)

    Surprisingly, very little of the application had to be rewritten. The domain objects remained almost verbatim; they will serve as the GrainState for our JournaledGrains. The worker service disappeared entirely. The web application will now both serve the Blazor client application, and host the Orleans runtime. We will continue to use HTTP client/server semantics for the Blazor application, which allows us to simply replace our web API calls to our custom Pulsar framework with calls to the Orleans framework:

    [HttpPost]
    public async Task<IActionResult> Issued(Event.Vote.Ballot.Issued @event) => await HandleEvent(@event);

    to

    [HttpPost]
    public async Task<IActionResult> Issued(Event.Vote.Ballot.Issued @event)
    {
        var grain = _grainFactory.GetGrain<IBallotGrain>(Guid.NewGuid());
        await grain.Issue(ElectionId.FromGuid(@event.ElectionId));
        return Ok(grain.GetPrimaryKey());
    }


    This could probably be abstracted into a one-liner like the previous code with a bit of effort, but I’m OK with three lines for now. Those three lines are quite powerful: with some configuration and attributes, I can replace all of my projection code and the PulsarService with the Orleans framework, and it will handle all the persistence, both the read models and the event log.
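
    For instance, such an abstraction might look like this; HandleEvent here is my own hypothetical helper, not framework code:

    // A hypothetical generic helper: create a grain, forward the event to it,
    // and return the new grain's key.
    private async Task<IActionResult> HandleEvent<TGrain>(Func<TGrain, Task> action)
        where TGrain : IGrainWithGuidKey
    {
        var grain = _grainFactory.GetGrain<TGrain>(Guid.NewGuid());
        await action(grain);
        return Ok(grain.GetPrimaryKey());
    }

    // which reduces the controller action back to a single expression:
    [HttpPost]
    public Task<IActionResult> Issued(Event.Vote.Ballot.Issued @event) =>
        HandleEvent<IBallotGrain>(g => g.Issue(ElectionId.FromGuid(@event.ElectionId)));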

    The actual Grain itself isn’t very complicated, and it should be very reminiscent of the original EventFramework code:

    public class BallotGrain : JournaledGrain<Ballot>, IBallotGrain
    {
        public async Task Vote(CompetitionId competitionId, CandidateId candidateId, int? rank)
        {
            RaiseEvent(new Vote.Ballot.Voted
            {
                Id = this.GetPrimaryKey(),
                CompetitionId = competitionId,
                CandidateId = candidateId,
                Rank = rank
            });
            await ConfirmEvents();
        }
    
        public async Task Issue(ElectionId electionId)
        {
            RaiseEvent(new Vote.Ballot.Issued
            {
                Id = this.GetPrimaryKey(),
                ElectionId = electionId,
                IssuedAt = DateTimeOffset.Now
            });
            await ConfirmEvents();
        }
    
        public async Task Cast(ElectionId electionId)
        {
            RaiseEvent(new Vote.Ballot.Cast
            {
                Id = this.GetPrimaryKey(),
                ElectionId = electionId,
                CastAt = DateTimeOffset.Now
            });
            await ConfirmEvents();
        }
    }
    

    Finally, our domain objects now have an overloaded Apply method, which allows us to respond to events:

    public class Ballot
    {
        public BallotId? Id { get; private set; }
        public ElectionId? ElectionId { get; private set; }
        public IssuedAt? IssuedAt { get; private set; }
        public CastAt? CastAt { get; private set; }
        public Votes? Votes { get; private set; } 
        
        public void Apply(Event.Vote.Ballot.Issued @event)
        {
            Id = BallotId.FromGuid(@event.Id);
            ElectionId = ElectionId.FromGuid(@event.ElectionId);
            IssuedAt = IssuedAt.FromDateTimeOffset(@event.IssuedAt);
            Votes = Votes.New();
        }
    
        public void Apply(Event.Vote.Ballot.Voted @event)
        {
            if (CastAt != null)
                throw new Exception("This ballot has already been cast");
            Votes ??= Votes.New();
            var competitionId = CompetitionId.FromGuid(@event.CompetitionId);
            var candidateId = CandidateId.FromGuid(@event.CandidateId);
            Votes.Add(competitionId, candidateId);
            Votes[competitionId][candidateId] = @event.Rank;
        }
    
        public void Apply(Event.Vote.Ballot.Cast @event)
        {
            if (CastAt != null)
                throw new Exception("This ballot has already been cast");
            CastAt = CastAt.FromDateTimeOffset(@event.CastAt);
        }
    }
    

    Looking at them again, it’s hard to call them domain objects any more. Still, the original code from the EventFramework is there under all of the Apply overloads; all of our work from Event Storming and DDD has been salvaged. What we are not doing is maintaining child collections, though with some effort we could do that too: we would just respond to the appropriate events with code that updates a local collection, as sketched below. Provided the collection is serializable, it will be persisted complete with its child objects. I’m not convinced that this does us any good, however.
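
    A sketch of what that might look like in the grain state (illustrative, not code from the project):

    // Hypothetical: maintain a child collection by responding to events.
    public class Election
    {
        public List<Competition> Competitions { get; } = new();

        public void Apply(Event.Vote.Competition.Added @event) =>
            Competitions.Add(new Competition { Id = @event.Id, Name = @event.Name });

        public void Apply(Event.Vote.Competition.Removed @event) =>
            Competitions.RemoveAll(c => c.Id == @event.Id);
    }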

    This is my initial foray into Microsoft Orleans. My main concern is how tied to Microsoft technologies it is: while persistence is automagic, it is also tied to Azure Tables and Queues. It is not costly, but it is not trivially replaced with something else, either. I’ve barely scratched the surface here, but I wanted to share my experience of migrating an existing event-sourced system to the Orleans framework. I look forward to completing and testing this application, then exploring deployment to Kubernetes, which is a supported configuration of the Orleans cluster. I’ll post those results later on.

  • Event Sourcing with Apache Pulsar

    February 14th, 2023

    Finally, the long-awaited second post on Apache Pulsar! I’ve spent some time modifying my code to work with StreamNative.io managed Pulsar, and I think the solution is quite elegant. I can’t take full credit for it, though: the original source material (written in .NET Core 3.1) comes from Alexey Zimarev’s excellent book, [Hands-on Domain-Driven Design with .NET Core](https://www.amazon.com/Hands-Domain-Driven-Design-NET-ebook/dp/B07C5WSR9B). I have modified much of the original code to use .NET 6 features and to use databases other than RavenDB and Event Store. I won’t rewrite the book here, but I’d like to hit on some important parts of the solution.

    Event Sourcing could be viewed as an “extreme” form of DDD, where our aggregates are truly aggregates of the events that built them. At first glance, this seems a pointless endeavour: Pulsar doesn’t even have KSQL, so using it as a query source would perform poorly without features like indexing and partitioning. In a sense, that’s right, but it’s only half the picture.

    In an Event-Sourced system, we can subscribe to the events using a second listener, whose job is to process those events and “project” the results into a document database like Mongo, Raven, or Cosmos. This is as good a place as any to begin, so let’s look at the code for the SubscriptionManager:

    using EventFramework.EventSourcing;
    using EventFramework.SharedKernel;
    using Microsoft.Extensions.Hosting;
    using Newtonsoft.Json;
    using Pulsar.Client.Api;
    using Pulsar.Client.Common;
    
    namespace EventFramework.Pulsar;
    
    public class SubscriptionManager : BackgroundService
    {
        private readonly string _subscriptionName;
        private readonly PulsarClient _pulsarClient;
        private readonly string _tenant;
        private readonly string _namespace;
        private readonly CancellationTokenSource _cancellationTokenSource;
        private readonly ISubscription[] _subscriptions;
        
        public SubscriptionManager(
            string subscriptionName,
            PulsarClient pulsarClient,
            string tenant,
            string @namespace,
            CancellationTokenSource cancellationTokenSource,
            params ISubscription[] subscriptions)
        {
            _subscriptionName = subscriptionName;
            _pulsarClient = pulsarClient;
            _tenant = tenant;
            _namespace = @namespace;
            _cancellationTokenSource = cancellationTokenSource;
            _subscriptions = subscriptions; 
        }
    
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            foreach (var subscription in _subscriptions)
            {
    
                await Task.Factory.StartNew(async () =>
                {
                    var consumer = await _pulsarClient.NewConsumer(Schema.STRING())
                        .TopicsPattern($"persistent://{_tenant}/{_namespace}/.*")
                        .SubscriptionName($"{_subscriptionName}-{subscription.Name}")
                        .SubscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                        .SubscribeAsync();
                    while (!stoppingToken.IsCancellationRequested)
                    {
                        try
                        {
                            var message = await consumer.ReceiveAsync(_cancellationTokenSource.Token);
                            var clrType = message.Properties["ClrType"];
                            var type = Type.GetType(clrType) ?? throw new Exception($"Couldn't load CLR type '{clrType}'");
                            var json = message.GetValue();
                            var @event = (IId?) JsonConvert.DeserializeObject(json, type) ??
                                         throw new Exception("Couldn't deserialize event");
                            await subscription.Project(@event);
                            await consumer.AcknowledgeAsync(message.MessageId);
                        }
                        catch (Exception)
                        {
                            // TODO: handle exception
                        }
                    }
                }, _cancellationTokenSource.Token);
            }
        }
    }
    

    SubscriptionManager is a BackgroundService that connects to Pulsar and subscribes to all topics in a particular namespace. Note that you must use the F# community Pulsar driver, not the official Apache one, to get this functionality. I have designed the system to have an analogous Pulsar namespace for each C# namespace. Each aggregate instance has its own topic, of the form <EntityType>.<EntityId>, e.g., Election.ABDJCHDJ111. As you can see, this code is rather straightforward: it creates the subscription, deserializes incoming JSON messages, and projects them into MongoDB.

    The Projection class is as follows:

    using EventFramework.EventSourcing;
    using EventFramework.SharedKernel;
    using MongoDB.Driver;
    
    namespace EventFramework.MongoDB;
    
    public class Projection : ISubscription
    {
        private readonly IClientSession _session;
        public string Name { get; }
        private readonly Projector _projector;
        
        public Projection(IClientSession session, Projector projector, string? name = null)
        {
            _session = session;
            _projector = projector;
            // you really should provide a name, as the subscription will reset every time the application restarts otherwise
            Name = name ?? Guid.NewGuid().ToString();
        }
        
        public async Task Project(IId id)
        {
            var handler = _projector.Invoke(_session, id);
            if (handler == null) return;
            await handler();
        }
    }
    
    public delegate Func<Task>? Projector(IClientSession session, IId id);
    

    As you can see, this is simply a flyweight object instance in front of a Func<Task> to make it easier to call the Project() method. Where do these Func<Task>s come from? Here’s a simple one from the voting application I am working on:

    using EventFramework.MongoDB;
    using EventFramework.SharedKernel;
    using MongoDB.Driver;
    
    namespace Service.Vote.Voter.Projections;
    
    public static class VoterProjection
    {
        private const string DatabaseName = "votes";
        private const string CollectionName = "voters";
    
        public static Func<Task> GetHandler(IClientSession session, IId @event)
        {
            return @event switch
            {
                Event.Vote.Voter.Registered registered => () => HandleRegistered(session, registered),
                Event.Vote.Voter.NameUpdated nameUpdated => () => HandleNameUpdated(session, nameUpdated),
                Event.Vote.Voter.EmailUpdated emailUpdated => () => HandleEmailUpdated(session, emailUpdated),
                _ => () => Task.CompletedTask
            };
        }
    
        static Task HandleRegistered(IClientSession session, Event.Vote.Voter.Registered @event)
        {
            var voters = session.Client.GetDatabase(DatabaseName)
                .GetCollection<ReadModel.Vote.Voter>(CollectionName);
            return voters.UpsertItem(
                @event.Id ?? throw new Exception("Voter Id is required"),
                _ => Task.CompletedTask,
                () => Task.FromResult(new ReadModel.Vote.Voter
                {
                    Id = @event.Id,
                Name = @event.VoterName,
                    Email = @event.VoterEmail
                }));
        }
    
        static Task HandleNameUpdated(IClientSession session, Event.Vote.Voter.NameUpdated @event)
        {
            var voters = session.Client.GetDatabase(DatabaseName)
                .GetCollection<ReadModel.Vote.Voter>(CollectionName);
            return voters.Update(
                @event.Id ?? throw new Exception("Voter Id is required"),
                voter =>
                {
                    voter.Name = @event.Name;
                    return Task.CompletedTask;
                });
        }
    
        static Task HandleEmailUpdated(IClientSession session, Event.Vote.Voter.EmailUpdated @event)
        {
            var voters = session.Client.GetDatabase(DatabaseName)
                .GetCollection<ReadModel.Vote.Voter>(CollectionName);
            return voters.Update(
                @event.Id ?? throw new Exception("Voter Id is required"),
                voter =>
                {
                    voter.Email = @event.Email;
                    return Task.CompletedTask;
                });
        }
    }
    

    Aha! Now we are into something that looks like real work. This class writes data into MongoDB for us to use. This is accomplished by the use of the extension methods UpsertItem(), Update(), and Delete(). Here they are:

    using EventFramework.SharedKernel;
    using MongoDB.Driver;
    using MongoDB.Driver.Linq;

    namespace EventFramework.MongoDB;
    
    public static class Extensions
    {
        public static async Task Update<T>(
            this IMongoCollection<T> collection,
            string id,
            Func<T, Task> update) where T : IId, new()
        { 
            var item = await collection.AsQueryable().SingleOrDefaultAsync(x => x.Id == id);
            if (item == null)
                throw new Exception($"Unable to update {typeof(T).Name}: not found");
            await update(item);
            await collection.ReplaceOneAsync(Builders<T>.Filter.Where(x => x.Id == id), item);
        }
    
        public static async Task UpsertItem<T>(
            this IMongoCollection<T> collection,
            string id,
            Func<T, Task> update,
            Func<Task<T>> create) where T : IId, new()
        {
            var item = await collection.AsQueryable().SingleOrDefaultAsync(x => x.Id == id);
            if (item == null)
            {
                item = await create();
                await collection.InsertOneAsync(item);
            }
            else
            {
                await update(item);
                await collection.ReplaceOneAsync(Builders<T>.Filter.Where(x => x.Id == id), item, 
                    new ReplaceOptions { IsUpsert = true });
            }
        }
    
        public static async Task Delete<T>(
            this IMongoCollection<T> collection,
            string id) where T : IId
        {
            await collection.DeleteOneAsync(Builders<T>.Filter.Where(x => x.Id == id));
        }
    }
    

    And there’s the generic CRUD, the other half of the picture. By registering this SubscriptionManager as a HostedService within your application, you can easily keep the “read side” in sync with the “write side”. To be more precise, though, I will call the “read side” the “query store”, and the write side the “aggregate store”. The query store is obviously what is used by the web application to query and render data. The aggregate store provides long-term storage for the events that form our aggregates. Pulsar allows us to set the TTL and maximum size of a given namespace’s persistent storage; by setting both of these to -1, you can make a namespace persist the messages (events) permanently.
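
    Registration might look something like this; the names and configuration values are illustrative, and it assumes the PulsarClient, Mongo session, and CancellationTokenSource are already registered in the container:

    // Wire the SubscriptionManager into the host as a hosted service.
    services.AddSingleton<IHostedService>(provider => new SubscriptionManager(
        "olympiavotes",                                  // subscription name prefix
        provider.GetRequiredService<PulsarClient>(),
        "olympiavotes",                                  // Pulsar tenant
        "service.vote",                                  // Pulsar namespace
        provider.GetRequiredService<CancellationTokenSource>(),
        new Projection(
            provider.GetRequiredService<IClientSession>(),
            VoterProjection.GetHandler,
            name: "voter-projection")));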

    OK, let’s look at the aggregate store in more detail:

    using System.Reflection;
    using System.Text;
    using EventFramework.EventSourcing;
    using EventFramework.SharedKernel;
    using Newtonsoft.Json;
    using Pulsar.Client.Api;
    using Pulsar.Client.Common;
    
    namespace EventFramework.Pulsar;
    
    public class PulsarAggregateStore : IAggregateStore
    {
        private readonly PulsarClient _pulsarClient;
        private readonly string _tenant;
        private readonly string _namespace;

        public PulsarAggregateStore(PulsarClient client, string tenant, string @namespace)
        {
            _pulsarClient = client;
            _tenant = tenant;
            _namespace = @namespace;
        }

        private string GetTopicName<T>(AggregateId<T> aggregateId) where T : AggregateRoot<T>
        {
            var @namespace = typeof(T).Namespace?.ToLower() ?? "default";
            var entityType = typeof(T).FullName?.ToLower().Replace($"{@namespace}.", "")
                             ?? throw new Exception("Unable to determine entity type");
            return $"persistent://{_tenant}/{_namespace}/{entityType}.{aggregateId}";
        }

        public async Task<bool> Exists<T>(AggregateId<T> aggregateId) where T : AggregateRoot<T>
        {
            try
            {
                var topic = GetTopicName(aggregateId);
                await using var reader = await _pulsarClient.NewReader(Schema.STRING())
                    .Topic(topic)
                    .StartMessageId(MessageId.Earliest)
                    .CreateAsync();
                return await reader.HasMessageAvailableAsync();
            }
            catch (Exception)
            {
                return false;
            }
        }

        public async Task Save<T>(T aggregate) where T : AggregateRoot<T>
        {
            var topic = GetTopicName(aggregate.Id);
            await using var producer = await _pulsarClient.NewProducer(Schema.STRING())
                .Topic(topic)
                .ProducerName(Guid.NewGuid().ToString())
                .CreateAsync();
            foreach (var change in aggregate.GetChanges())
            {
                var json = JsonConvert.SerializeObject(change,
                    new JsonSerializerSettings {TypeNameHandling = TypeNameHandling.Objects});
                var message = producer.NewMessage(json)
                    .WithProperties(new Dictionary<string, string>
                    {
                        {
                            "ClrType",
                            change.GetType().AssemblyQualifiedName ?? throw new Exception("Unable to determine ClrType")
                        }
                    });
                await producer.SendAsync(message);
            }
            aggregate.ClearChanges();
        }

        public async Task<T?> Load<T>(AggregateId<T> aggregateId) where T : AggregateRoot<T>
        {
            var cts = new CancellationTokenSource();
            var ctor = typeof(T).GetConstructor(BindingFlags.Instance | BindingFlags.NonPublic,
                new[] {typeof(AggregateId<T>)});
            if (ctor == null)
                throw new Exception("No load constructor found");
            var loaded = (T?) ctor.Invoke(new object?[] {aggregateId}) ??
                         throw new Exception("Failed to construct aggregate");
            var topic = GetTopicName(aggregateId);
            await using var reader = await _pulsarClient.NewReader(Schema.STRING())
                .Topic(topic)
                .StartMessageId(MessageId.Earliest)
                .CreateAsync();
            var changes = new List<IId>();
            while (await reader.HasMessageAvailableAsync())
            {
                var message = await reader.ReadNextAsync(cts.Token);
                var type = Type.GetType(message.Properties["ClrType"])
                           ?? throw new Exception($"Failed to load CLR type '{message.Properties["ClrType"]}'");
                var change = (IId?) JsonConvert.DeserializeObject(Encoding.UTF8.GetString(message.Data), type);
                if (change == null)
                    throw new Exception("Failed to deserialize change");
                changes.Add(change);
            }
            loaded.Load(changes);
            return loaded;
        }
    }

    If you read this code, you will find it is pretty straightforward. It provides the basic methods Exists(), Load(), and Save(). We can only load or store a single object at a time, by its primary key. As mentioned earlier, we cannot query the aggregate store the way we could with KSQL, but this architecture doesn’t need to. About the only important point in this code is the existence of a “load” constructor. We need to initialize the object instance with some default values prior to loading, and we need to accept an AggregateId of the correct type. Here’s an example of a “load constructor”:

    // Used by PulsarService.Load()
        private Election(AggregateId<Election> id) : base(id)
        {
            Name = Name.FromString("Loading...");
            Description = Description.FromString("Loading...");
            StartDate = StartDate.FromDateTimeOffset(DateTimeOffset.Now);
            EndDate = EndDate.FromDateTimeOffset(DateTimeOffset.Now);
        }
    

    The constructor is located using reflection, so it can be made private. We simply give some sane default values that will not cause the system to crash if loading fails, and we make it easy for both the end-user and the developer to see that loading has failed. Note the use of the Value Object types Name and Description. These value objects are patterns from DDD. Here’s an example of Name:

    using EventFramework.EventSourcing;
    
    namespace Domain.Vote.Election;
    
    public class Name : Value<Name>
    {
        private string Value { get; }
    
        private Name(string? value)
        {
            if (string.IsNullOrWhiteSpace(value))
                throw new ArgumentNullException(nameof(value));
            Value = value;
        }
    
        public static Name FromString(string? value) => new(value);
    
        public static implicit operator string?(Name? name) => name?.Value;
    
        public override string ToString() => Value;
    }
    

    I’m going to include the code for Value<T> as well, because it does a lot of “magical” things that make working with value objects really easy. This one is 100% Alexey 🙂

    using System.Collections;
    using System.Reflection;

    namespace EventFramework.EventSourcing;
    
    public class Value<T> where T : Value<T>
    {
        private static readonly Member[] Members = GetMembers().ToArray();
    
        public override bool Equals(object? obj)
        {
            if (obj is null) return false;
            if (ReferenceEquals(this, obj)) return true;
            return obj.GetType() == typeof(T) && Members.All(
                m =>
                {
                    var objValue = m.GetValue(obj);
                    var thisValue = m.GetValue(this);
                    return m.IsNonStringEnumerable
                        ? GetEnumerableValues(objValue).SequenceEqual(GetEnumerableValues(thisValue))
                        : objValue?.Equals(thisValue) ?? thisValue == null;
                });
        }
    
    private static IEnumerable<object?> GetEnumerableValues(object? obj)
    {
        if (obj == null) throw new ArgumentNullException(nameof(obj));
        var enumerator = ((IEnumerable) obj).GetEnumerator();
        while (enumerator.MoveNext()) yield return enumerator.Current;
    }
    
    private static IEnumerable<Member> GetMembers()
    {
        var t = typeof(T);
        const BindingFlags flags = BindingFlags.Instance | BindingFlags.Public;
        while (t != null && t != typeof(object))
        {
            foreach (var p in t.GetProperties(flags)) yield return new Member(p);
            foreach (var f in t.GetFields(flags)) yield return new Member(f);
            t = t.BaseType;
        }
    }
    
        public override int GetHashCode()
        {
            return CombineHashCodes(Members.Select(m => m.IsNonStringEnumerable
                        ? CombineHashCodes(GetEnumerableValues(m.GetValue(this)))
                        : m.GetValue(this)));
        }
    
        private static int CombineHashCodes(IEnumerable<object?> values)
        {
            return values.Aggregate(17, (current, value) => current * 59 + (value?.GetHashCode() ?? 0));
        }
    
        public static bool operator ==(Value<T>? left, Value<T>? right)
        {
            return Equals(left, right);
        }
    
        public static bool operator !=(Value<T>? left, Value<T>? right)
        {
            return !Equals(left, right);
        }
    
        public override string? ToString()
        {
            if (Members.Length == 1)
            {
                var m = Members[0];
                var value = m.GetValue(this);
                return m.IsNonStringEnumerable
                    ? $"{string.Join("|", GetEnumerableValues(value))}"
                    : value?.ToString();
            }
    
            var values = Members.Select(
                m =>
                {
                    var value = m.GetValue(this);
                    return m.IsNonStringEnumerable
                        ? $"{m.Name}:{string.Join("|", GetEnumerableValues(value))}"
                        : m.Type != typeof(string)
                            ? $"{m.Name}:{value}"
                            : value == null
                                ? $"{m.Name}:null"
                                : $"{m.Name}:\"{value}\"";
                });
            return $"{typeof(T).Name}[{string.Join("|", values)}]";
        }
    }
    
    internal struct Member
    {
        public readonly string Name;
        public readonly Func<object, object?> GetValue;
        public readonly bool IsNonStringEnumerable;
        public readonly Type Type;
    
        public Member(MemberInfo info)
        {
            switch (info)
            {
                case FieldInfo field:
                    Name = field.Name;
                    GetValue = obj => field.GetValue(obj);
                    IsNonStringEnumerable = typeof(IEnumerable).IsAssignableFrom(field.FieldType) &&
                        field.FieldType != typeof(string);
                    Type = field.FieldType;
                    break;
                case PropertyInfo prop:
                    Name = prop.Name;
                    GetValue = obj => prop.GetValue(obj);
                    IsNonStringEnumerable = typeof(IEnumerable).IsAssignableFrom(prop.PropertyType) &&
                        prop.PropertyType != typeof(string);
                    Type = prop.PropertyType;
                    break;
                default:
                    throw new ArgumentException("Member is not a field or property?", info.Name);
            }
        }
    }
    

    There’s a lot of code here, but it’s easy enough to summarize: this code simply makes value objects behave as you’d expect. When used in a string context, they behave like strings. They are strongly typed, so you can’t use any old string; you have to use one that has gone through the type’s constructor, which gives another extension point for business rules. You can cut and paste this code and forget about it.
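
    A quick usage sketch, with illustrative values:

    // Value objects compare by their members, not by reference, and convert
    // implicitly to string where the type defines that operator.
    var a = Name.FromString("Spring Election");
    var b = Name.FromString("Spring Election");
    Console.WriteLine(a == b);           // True: member-by-member equality
    string? s = a;                       // implicit conversion to string
    Console.WriteLine($"Name is {a}");   // ToString() yields the wrapped value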

    Here’s a complete example of a domain class that extends AggregateRoot<T>:

    using EventFramework.EventSourcing;
    using EventFramework.SharedKernel;
    using Domain.Vote.Ballot;
    using Domain.Vote.Competition;
    using Domain.Vote.Voter;
    
    namespace Domain.Vote.Election;
    
    public class Election : AggregateRoot<Election>
    {
        public Name? Name { get; private set; }
        public Description? Description { get; private set; }
        public StartDate? StartDate { get; private set; }
        public EndDate? EndDate { get; private set; }
    
        public IDictionary<Competition.Competition, IList<Candidate.Candidate>> Competitions { get; }
            = new Dictionary<Competition.Competition, IList<Candidate.Candidate>>();
    
        public IList<Ballot.Ballot> Ballots { get; } = new List<Ballot.Ballot>();
        public IList<Voter.Voter> Voters { get; } = new List<Voter.Voter>();
    
        public Election(
            AggregateId<Election> id,
            Name name,
            Description description,
            StartDate startDate,
            EndDate? endDate = null) : base(id)
        {
            Apply(new Event.Vote.Election.Created
            {
                Id = id.ToString(),
                Name = name,
                Description = description,
                StartDate = startDate,
                EndDate = endDate
            });
        }
        
        // Used by PulsarService.Load()
        private Election(AggregateId<Election> id) : base(id)
        {
            Name = Name.FromString("Loading...");
            Description = Description.FromString("Loading...");
            StartDate = StartDate.FromDateTimeOffset(DateTimeOffset.Now);
            EndDate = EndDate.FromDateTimeOffset(DateTimeOffset.Now);
        }
    
        protected override void EnsureValidState()
        {
            if (string.IsNullOrWhiteSpace(Name))
                throw new InvalidElectionStateException("Name is required");
            if (string.IsNullOrWhiteSpace(Description))
                throw new InvalidElectionStateException("Description is required");
            if (StartDate == null)
                throw new InvalidElectionStateException("Start date is required");
        }
    
        public void SetName(Name name)
        {
            Apply(new Event.Vote.Election.NameUpdated
            {
                Id = Id.ToString(),
                Name = name
            });
        }
    
        public void SetDescription(Description description)
        {
            Apply(new Event.Vote.Election.DescriptionUpdated
            {
                Id = Id.ToString(),
                Description = description
            });
        }
    
        public void SetStartDate(StartDate startDate)
        {
            Apply(new Event.Vote.Election.StartDateUpdated
            {
                Id = Id.ToString(),
                StartDate = startDate
            });
        }
    
        public void SetEndDate(EndDate? endDate)
        {
            Apply(new Event.Vote.Election.EndDateUpdated
            {
                Id = Id.ToString(),
                EndDate = endDate
            });
        }
    
        public string RegisterVoter(AggregateId<Voter.Voter> voterId, Voter.Name name, Email email)
        {
            var ballotId = Ulid.NewUlid();
            if (Voters.Any(v => v.Email == email))
                throw new InvalidVoterStateException($"Voter with email {email} has already been registered");
            Apply(new Event.Vote.Voter.Registered
            {
                Id = voterId.ToString(),
                ElectionId = Id.ToString(),
                VoterName = name,
                VoterEmail = email
            });
            Apply(new Event.Vote.Ballot.Issued
            {
                Id = ballotId.ToString(),
                ElectionId = Id.ToString()
            });
            return ballotId.ToString();
        }
    
    public void AddCompetition(AggregateId<Competition.Competition> competitionId, Competition.Name name)
    {
        Apply(new Event.Vote.Competition.Added
        {
            Id = competitionId.ToString(),
            ElectionId = Id.ToString(),
            Name = name
        });
    }
    
        public void RemoveCompetition(AggregateId<Competition.Competition> competitionId)
        {
            Apply(new Event.Vote.Competition.Removed
            {
                Id = competitionId.ToString(),
                ElectionId = Id.ToString()
            });
        }
    
        protected override void When(IId? @event)
        {
            AggregateId<Competition.Competition> competitionId;
            AggregateId<Election> electionId;
            switch (@event)
            {
                case Event.Vote.Election.Created ev:
                    Id = ElectionId.FromString(ev.Id);
                    Name = Name.FromString(ev.Name);
                    Description = Description.FromString(ev.Description);
                    StartDate = StartDate.FromDateTimeOffset(ev.StartDate);
                    EndDate = ev.EndDate != null ? EndDate.FromDateTimeOffset(ev.EndDate) : null;
                    break;
                case Event.Vote.Competition.Added ev:
                    competitionId = CompetitionId.FromString(ev.Id);
                    electionId = ElectionId.FromString(ev.ElectionId);
                    var name = Competition.Name.FromString(ev.Name);
                    var competition = new Competition.Competition(electionId, competitionId, name);
                    Competitions.Add(competition, new List<Candidate.Candidate>());
                    break;
                case Event.Vote.Competition.Removed ev:
                    competitionId = CompetitionId.FromString(ev.Id);
                    Competitions.Remove(Competitions.Keys.First(c => c.Id == competitionId));
                    break;
                case Event.Vote.Voter.Registered ev:
                    var voter = new Voter.Voter(
                        VoterId.FromString(ev.Id),
                        Voter.Name.FromString(ev.VoterName),
                        Email.FromString(ev.VoterEmail),
                        Id);
                    Voters.Add(voter);
                    break;
                case Event.Vote.Ballot.Issued ev:
                    electionId = ElectionId.FromString(ev.ElectionId);
                    if (Id == electionId)
                    {
                        var ballot = new Ballot.Ballot(BallotId.FromString(ev.Id), Id);
                        Ballots.Add(ballot);
                    }
    
                    break;
                case Event.Vote.Election.NameUpdated ev:
                    Name = Name.FromString(ev.Name);
                    break;
                case Event.Vote.Election.DescriptionUpdated ev:
                    Description = Description.FromString(ev.Description);
                    break;
                case Event.Vote.Election.StartDateUpdated ev:
                    StartDate = StartDate.FromDateTimeOffset(ev.StartDate);
                    break;
                case Event.Vote.Election.EndDateUpdated ev:
                    EndDate = ev.EndDate == null ? null : EndDate.FromDateTimeOffset(ev.EndDate);
                    break;
            }
        }
    }
    

    Here is your event-sourced aggregate root. It responds to the events it is interested in, and calls the appropriate methods on the domain object itself. It has no dependencies other than the SharedKernel library, and other domain projects.

    Here’s the code for AggregateRoot<T>:

    namespace EventFramework.EventSourcing;
    
    public abstract class AggregateRoot<T> : IInternalEventHandler where T : AggregateRoot<T>
    {
        private readonly List<IId> _changes = new();
    
        public AggregateId<T> Id { get; protected set; }
        public int Version { get; protected set; } = -1;
    
        protected AggregateRoot(AggregateId<T> id)
        {
            Id = id;
        }
    
        protected abstract void EnsureValidState();
        protected abstract void When(IId? @event);
    
        public void Handle(IId @event, bool addChange = false)
        {
            When(@event);
            if (addChange)
                _changes.Add(@event);
        }
    
        protected void Apply(IId @event)
        {
            When(@event);
            EnsureValidState();
            _changes.Add(@event);
        }
    
        public IEnumerable<IId> GetChanges()
        {
            return _changes.AsEnumerable();
        }
    
        public void Load(IEnumerable<IId?> history)
        {
            foreach (var ev in history)
            {
                When(ev);
                Version++;
            }
        }
    
        public void ClearChanges()
        {
            _changes.Clear();
        }
    
        protected static void ApplyToEntity(IInternalEventHandler? entity, IId @event)
        {
            entity?.Handle(@event);
        }
    }
    

    Once again, this is 100% Alexey, other than formatting updates for .NET 6; the EventFramework.EventSourcing code was taken almost verbatim. It provides convenience methods to get unpersisted changes, to load a set of changes from the aggregate store, and to clear the changes in memory.
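
    Here is a usage sketch of that lifecycle, where store is the PulsarAggregateStore from earlier and the identifiers are illustrative:

    // Change-tracking lifecycle of an aggregate, end to end.
    var id = ElectionId.FromString(Ulid.NewUlid().ToString());
    var election = new Election(id,
        Name.FromString("Board Election"),
        Description.FromString("Annual board election"),
        StartDate.FromDateTimeOffset(DateTimeOffset.Now));
    election.SetName(Name.FromString("Board Election 2023"));

    var pending = election.GetChanges().ToList();  // Created + NameUpdated, unpersisted
    await store.Save(election);                    // writes each change, then clears
    var reloaded = await store.Load<Election>(id); // replays history through When()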

    And herein lies the elegance. None of the code I have posted is particularly complex (with the exception of the Value Object code), yet the sum is so much greater than the parts. Here’s an example of an ApplicationService<T> that provides instructions to the framework on how to respond to events for a particular aggregate root:

    using EventFramework.EventSourcing;
    using Domain.Vote.Election;
    
    namespace OlympiaVotes.Service.Vote.Election;
    
    public class ElectionService : ApplicationService<Domain.Vote.Election.Election>
    {
        public ElectionService(IAggregateStore store) : base(store)
        {
            CreateWhen<Event.Vote.Election.Created>(
                ev => ElectionId.FromString(ev.Id),
                (ev, id) => Task.FromResult(new Domain.Vote.Election.Election
                (
                    id,
                    Name.FromString(ev.Name),
                    Description.FromString(ev.Description),
                    StartDate.FromDateTimeOffset(ev.StartDate),
                    EndDate.FromDateTimeOffset(ev.EndDate)
                )));
    
            UpdateWhen<Event.Vote.Election.NameUpdated>(
                ev => ElectionId.FromString(ev.Id),
                (election, ev) =>
                {
                    election.SetName(Name.FromString(ev.Name));
                    return Task.CompletedTask;
                });
    
            UpdateWhen<Event.Vote.Election.DescriptionUpdated>(
                ev => ElectionId.FromString(ev.Id),
                (election, ev) =>
                {
                    election.SetDescription(Description.FromString(ev.Description));
                    return Task.CompletedTask;
                });
    
            UpdateWhen<Event.Vote.Election.StartDateUpdated>(
                ev => ElectionId.FromString(ev.Id),
                (election, ev) =>
                {
                    election.SetStartDate(StartDate.FromDateTimeOffset(ev.StartDate));
                    return Task.CompletedTask;
                });
    
            UpdateWhen<Event.Vote.Election.EndDateUpdated>(
                ev => ElectionId.FromString(ev.Id),
                (election, ev) =>
                {
                    election.SetEndDate(EndDate.FromDateTimeOffset(ev.EndDate));
                    return Task.CompletedTask;
                });
        }
    }
    

    I like this declarative style. It’s easy to see what this constructor accomplishes. The ApplicationService<T> is a dependency of PulsarService<T>.
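
    The registration methods themselves live in the framework, and I haven’t posted them, but a minimal sketch shows the flavour of how CreateWhen/UpdateWhen could work. To be clear, this is my reconstruction, not the EventFramework source: the IAggregateStore Load/Save calls, the AggregateId<T> parameter types and the Handle entry point are assumptions about the shape of the code.

    // A sketch only: the method names match the framework's public surface,
    // but the internals and the IAggregateStore signatures are assumed.
    public abstract class ApplicationService<T> where T : class
    {
        private readonly IAggregateStore _store;
        private readonly Dictionary<Type, Func<object, Task>> _handlers = new();

        protected ApplicationService(IAggregateStore store) => _store = store;

        // Register a handler that creates a brand-new aggregate from an event.
        protected void CreateWhen<TEvent>(
            Func<TEvent, AggregateId<T>> getId,
            Func<TEvent, AggregateId<T>, Task<T>> create) where TEvent : class =>
            _handlers[typeof(TEvent)] = async ev =>
            {
                var aggregate = await create((TEvent)ev, getId((TEvent)ev));
                await _store.Save(aggregate);                   // assumed store method
            };

        // Register a handler that loads an existing aggregate and mutates it.
        protected void UpdateWhen<TEvent>(
            Func<TEvent, AggregateId<T>> getId,
            Func<T, TEvent, Task> update) where TEvent : class =>
            _handlers[typeof(TEvent)] = async ev =>
            {
                var aggregate = await _store.Load<T>(getId((TEvent)ev));  // assumed
                await update(aggregate, (TEvent)ev);
                await _store.Save(aggregate);
            };

        // The hosted service calls this with each deserialized event.
        public Task Handle(object @event) =>
            _handlers.TryGetValue(@event.GetType(), out var handler)
                ? handler(@event)
                : Task.CompletedTask;
    }

    Whatever the real internals look like, the important point is the shape: a dictionary from event type to handler is what lets the constructor above read like a declaration.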

    About the only other major piece of code you need to see to gain a full understanding is the DI container setup:

    using System.Security.Cryptography.X509Certificates;
    using Azure.Identity;
    using Azure.Security.KeyVault.Certificates;
    using Azure.Security.KeyVault.Secrets;
    using EventFramework.EventSourcing;
    using EventFramework.MongoDB;
    using EventFramework.Pulsar;
    using MongoDB.Driver;
    using Newtonsoft.Json.Linq;
    using Domain.Vote.Ballot;
    using Domain.Vote.Candidate;
    using Domain.Vote.Competition;
    using Domain.Vote.Election;
    using Domain.Vote.Voter;
    using Service.Vote.Ballot;
    using Service.Vote.Ballot.Projections;
    using Service.Vote.Candidate;
    using Service.Vote.Candidate.Projections;
    using Service.Vote.Competition;
    using Service.Vote.Competition.Projections;
    using Service.Vote.Election;
    using Service.Vote.Election.Projections;
    using Service.Vote.Voter;
    using Service.Vote.Voter.Projections;
    using Pulsar.Client.Api;
    
    var host = Host.CreateDefaultBuilder(args);
    
    host.ConfigureServices((context, services) =>
    {
        // the root cancellation token source
        var cancellationTokenSource = new CancellationTokenSource();
        services.AddSingleton(cancellationTokenSource);
    
        // secrets and certificates from Azure Key Vault
        // requires configuration Azure.KeyVaultUrl in appsettings.{Environment}.json
        var secrets = new SecretClient(new Uri(context.Configuration.GetSection("Azure")["KeyVaultUrl"]),
            new DefaultAzureCredential());
        services.AddSingleton(secrets);
        var certificates = new CertificateClient(new Uri(context.Configuration.GetSection("Azure")["KeyVaultUrl"]),
            new DefaultAzureCredential());
        services.AddSingleton(certificates);
    
        // fetch the StreamNative OAuth token using client_credentials grant extracted from key file
        // requires the secrets Pulsar-ClientId and Pulsar-Client-Secret in Azure Key Vault
        // find the values for these secrets in the service account key JSON file
        var httpClient = new HttpClient();
        var response = httpClient.PostAsync(new Uri(context.Configuration.GetSection("Pulsar")["TokenUrl"]),
                new FormUrlEncodedContent(new Dictionary<string, string>
                {
                    {"grant_type", "client_credentials"},
                    {"client_id", secrets.GetSecret("Pulsar-ClientId-OlympiaVotes").Value.Value},
                    {"client_secret", secrets.GetSecret("Pulsar-ClientSecret-OlympiaVotes").Value.Value},
                    {"audience", context.Configuration.GetSection("Pulsar")["Audience"]}
                }))
            .GetAwaiter()
            .GetResult();
        var json = response.Content.ReadAsStringAsync().GetAwaiter().GetResult();
        var token = JObject.Parse(json)["access_token"]?.Value<string>();
    
        // Pulsar dependencies
        // requires creation of the Pulsar tenant and namespaces in StreamNative console
        var pulsar = new PulsarClientBuilder()
            .ServiceUrl(context.Configuration.GetSection("Pulsar")["ServiceUrl"])
            .Authentication(AuthenticationFactory.Token(token))
            .BuildAsync()
            .GetAwaiter()
            .GetResult();
        services.AddSingleton(pulsar);
        services.AddSingleton<IAggregateStore>(sp =>
            new PulsarAggregateStore(pulsar, "votes", "domain.vote"));
    
        // microservices
        services.AddSingleton<ApplicationService<Ballot>, BallotService>();
        services.AddSingleton<ApplicationService<Candidate>, CandidateService>();
        services.AddSingleton<ApplicationService<Competition>, CompetitionService>();
        services.AddSingleton<ApplicationService<Election>, ElectionService>();
        services.AddSingleton<ApplicationService<Voter>, VoterService>();
        services.AddHostedService(sp => new PulsarService<Ballot>(sp.GetRequiredService<PulsarClient>(),
            sp.GetRequiredService<ApplicationService<Ballot>>(),
            "votes"));
        services.AddHostedService(sp => new PulsarService<Candidate>(sp.GetRequiredService<PulsarClient>(),
            sp.GetRequiredService<ApplicationService<Candidate>>(),
            "votes"));
        services.AddHostedService(sp => new PulsarService<Competition>(sp.GetRequiredService<PulsarClient>(),
            sp.GetRequiredService<ApplicationService<Competition>>(),
            "votes"));
        services.AddHostedService(sp => new PulsarService<Election>(sp.GetRequiredService<PulsarClient>(),
            sp.GetRequiredService<ApplicationService<Election>>(),
            "votes"));
        services.AddHostedService(sp => new PulsarService<Voter>(sp.GetRequiredService<PulsarClient>(),
            sp.GetRequiredService<ApplicationService<Voter>>(),
            "votes"));
    
        // MongoDB dependencies
        // requires the secret MongoDb-Connection-String, found in the Atlas console
        // requires the certificate MongoDb-Client-Credential, downloaded from Atlas console, converted to PFX and uploaded
        // to Azure Key Vault
        var mongoSettings =
            MongoClientSettings.FromConnectionString(secrets.GetSecret("MongoDb-Connection-String").Value.Value);
        mongoSettings.SslSettings = new SslSettings
        {
            ClientCertificates = new List<X509Certificate2>
            {
                certificates.DownloadCertificate("MongoDb-Client-Credential").Value
            }
        };
        var mongoClient = new MongoClient(mongoSettings);
        services.AddSingleton<IMongoClient>(mongoClient);
        services.AddSingleton<IClientSession>(sp => sp.GetRequiredService<IMongoClient>().StartSession());
        services.AddScoped(sp => sp.GetRequiredService<IMongoClient>().StartSession());
    
        // subscription manager
        services.AddHostedService(sp => new SubscriptionManager(
            "votes", sp.GetRequiredService<PulsarClient>(),
            "votes", "domain.vote",
            sp.GetRequiredService<CancellationTokenSource>(),
            new Projection(sp.GetRequiredService<IClientSession>(), BallotProjection.GetHandler),
            new Projection(sp.GetRequiredService<IClientSession>(), CandidateProjection.GetHandler),
            new Projection(sp.GetRequiredService<IClientSession>(), CompetitionProjection.GetHandler),
            new Projection(sp.GetRequiredService<IClientSession>(), ElectionProjection.GetHandler),
            new Projection(sp.GetRequiredService<IClientSession>(), VoterProjection.GetHandler)));
    });
    
    await host.Build().RunAsync();
    

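    PulsarService<T> appears throughout this setup but I haven’t posted it, so here is a rough sketch of what the hosted service does: subscribe to the input topic, deserialize each message to its concrete event type, and hand it to the ApplicationService<T>. Again, this is my reconstruction, not the actual EventFramework.Pulsar source; the subscription naming, the ClrType property convention (which matches the WebApiController below) and the Handle call are assumptions.

    using EventFramework.EventSourcing;
    using Microsoft.Extensions.Hosting;
    using Newtonsoft.Json;
    using Pulsar.Client.Api;

    // Sketch of the hosted service feeding events to ApplicationService<T>;
    // names and details are assumed, not taken from the actual framework.
    public class PulsarService<T> : BackgroundService where T : class
    {
        private readonly PulsarClient _client;
        private readonly ApplicationService<T> _service;
        private readonly string _topic;

        public PulsarService(PulsarClient client, ApplicationService<T> service, string topic)
        {
            _client = client;
            _service = service;
            _topic = topic;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var consumer = await _client.NewConsumer(Schema.STRING())
                .Topic(_topic)
                .SubscriptionName(typeof(T).Name)          // assumed convention
                .SubscribeAsync();

            while (!stoppingToken.IsCancellationRequested)
            {
                var message = await consumer.ReceiveAsync();
                // The producer stored the event's CLR type in a message property,
                // so the JSON payload can be deserialized to the concrete type.
                var clrType = Type.GetType(message.Properties["ClrType"]);
                if (clrType != null &&
                    JsonConvert.DeserializeObject(message.GetValue(), clrType) is { } @event)
                    await _service.Handle(@event);
                await consumer.AcknowledgeAsync(message.MessageId);
            }
        }
    }
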
    One last thing I wanted to demonstrate is the use of the WebApiController that Alexey provided:

    using EventFramework.Pulsar;
    using EventFramework.SharedKernel;
    using Microsoft.AspNetCore.Mvc;
    using Newtonsoft.Json;
    using Pulsar.Client.Api;
    
    namespace EventFramework.WebApi;
    
    public abstract class WebApiController : ControllerBase
    {
        private readonly string _topic;
        protected PulsarClient PulsarClient { get; }
        
        protected WebApiController(PulsarClient pulsarClient, PulsarTopic topic)
        {
            _topic = topic;
            PulsarClient = pulsarClient;
        }
        
        protected async Task<IActionResult> HandleEvent(IId @event)
        {
            var producer = await PulsarClient.NewProducer(Schema.STRING())
                .Topic(_topic)
                .CreateAsync();
            var json = JsonConvert.SerializeObject(@event,
                new JsonSerializerSettings {TypeNameHandling = TypeNameHandling.Objects});
            var message = producer.NewMessage(json)
                .WithProperties(new Dictionary<string, string>
                {
                    // TODO: Access token
                    {"ClrType", @event.GetType().AssemblyQualifiedName ?? throw new Exception("Unknown CLR type")}
                });
            await producer.SendAsync(message);
            return Ok();
        }
    }
    

    The WebApiController is a small controller base that provides the HandleEvent() method, which publishes the incoming event to the PulsarService input topic. Here’s a class that extends WebApiController. It’s wonderfully concise:

    using EventFramework.Pulsar;
    using EventFramework.WebApi;
    using Microsoft.AspNetCore.Authorization;
    using Microsoft.AspNetCore.Mvc;
    using Event;
    using Pulsar.Client.Api;
    
    namespace OlympiaVotes.Server.Controllers;
    
    [Authorize, ApiController, Route("api/[controller]")]
    public class ElectionController : WebApiController
    {
        public ElectionController(PulsarClient pulsarClient, PulsarTopic topic) : base(pulsarClient, topic)
        {
        }
    
        [HttpPost]
        public Task<IActionResult> Post([FromBody] Vote.Election.Created @event) => HandleEvent(@event);
        
        [HttpPut("name")]
        public Task<IActionResult> PutName([FromBody] Vote.Election.NameUpdated @event) => HandleEvent(@event);
        
        [HttpPut("description")]
        public Task<IActionResult> PutDescription([FromBody] Vote.Election.DescriptionUpdated @event) => HandleEvent(@event);
        
        [HttpPut("startDate")]
        public Task<IActionResult> PutStartDate([FromBody] Vote.Election.StartDateUpdated @event) => HandleEvent(@event);
        
        [HttpPut("endDate")]
        public Task<IActionResult> PutEndDate([FromBody] Vote.Election.EndDateUpdated @event) => HandleEvent(@event);
    }
    
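    For completeness, calling one of these endpoints is just an HTTP request with a JSON body. Here’s a hedged sketch from a .NET client; the base address and payload shape are assumptions based on the routes above, and since the controller is marked [Authorize], a real call would also attach a bearer token:

    using System.Net.Http.Json;

    var http = new HttpClient { BaseAddress = new Uri("https://localhost:5001") };
    // hypothetical payload matching the Vote.Election.NameUpdated event bound by PutName
    var response = await http.PutAsJsonAsync("api/election/name", new
    {
        Id = "7e3c1f8a-2b4d-4c6e-9f0a-1d2e3f4a5b6c",   // the election's aggregate ID
        Name = "Spring Board Election"
    });
    response.EnsureSuccessStatusCode();
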

    Those are all the components of an Event Sourced system I can think of right now. I hope this code helps you on your Pulsar journey and starts you thinking about Event Sourced systems. I believe that it is a robust architecture that provides elegant solutions to many common application concerns such as logging and auditing. The separation of query store and aggregate store allows a high-performance system without sacrificing visibility. If you haven’t read Alexey’s book yet, I think it’s an important primer to understand this content. I have barely done justice to what he has written. If you aren’t a DDD expert, I think that’s the book for you, even if you don’t know C#.

  • cloud-init on Azure VMs

    February 3rd, 2023

    I’ve run into a bit of frustration in Azure when working in multi-domain environments. Our organization uses 4 different TLDs, only one of which is the Active Directory domain. This makes it kind of annoying to configure domain-joined Linux VMs that aren’t in the primary Active Directory domain. This is a little unclear, so let me give an example.

    In Contoso Organization, there are several business units. Among them are business unit A (which uses domain businessA.net) and business unit B (which uses domain businessB.net). But there is a single Active Directory domain, businessA.net. Joining a Linux machine to businessA.net is no problem. But joining the Linux machine to businessB.net is a little more involved when the machine is built in Azure, because of how cloud-init works. Luckily, it’s possible to tell cloud-init to do what we want, but it has to be done _at the time of provisioning_ or it won’t work.

    Under normal circumstances, you’d update your netplan file to include the correct search domain. But in Azure, your netplan is overwritten on every boot by cloud-init, so unless cloud-init knows what you want in your netplan file, your changes will just keep getting wiped out on every reboot. You’d also have to set your hostname to the FQDN (e.g. hostname.businessB.net) that you are registering in DNS.
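
    For reference, this is the kind of netplan change that keeps getting overwritten (the interface name is illustrative):

    network:
      version: 2
      ethernets:
        eth0:
          dhcp4: true
          nameservers:
            search: [businessb.net]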

    So, how to accomplish this? First, you will need to set the config data in your Azure VM deployment (under the Advanced tab) to a valid cloud-init config file (see https://cloud-init.readthedocs.io). Ensure it starts with #cloud-config or it won’t be processed. The following config file works for businessB.net:

    #cloud-config
    preserve_hostname: false
    fqdn: hostname.businessb.net
    prefer_fqdn_over_hostname: true
    manage_resolv_conf: true
    resolv_conf:
      nameservers:
        - 127.0.0.53
      domain: businessb.net
      searchdomains:
        - businessb.net
    
    

    Some quick notes about this. First, note the use of preserve_hostname: false and prefer_fqdn_over_hostname: true. Together, these cause cloud-init to use the FQDN as the hostname and allow the existing hostname (i.e. the Azure VM name) to be overwritten.

    If only it were so simple! After booting the machine for the first time, you will see that the hostname is indeed hostname.businessb.net, but resolv.conf is still the default resolv.conf created by Azure. It’s like it didn’t read our resolv_conf configuration at all! In fact, that is exactly what happened: the resolv_conf module is not enabled by default in /etc/cloud/cloud.cfg, and must be added to the cloud_config_modules section.

    But even that is not enough. On Ubuntu, the resolv_conf module is not verified, so you also have to tell cloud-init that you’re OK with that (it seems to do what I’d expect…). You can do so by adding an unverified_modules section below cloud_final_modules, with a single entry, resolv_conf. Ensure it’s valid YAML! Both changes are sketched below.
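
    Putting the two changes together, the relevant parts of /etc/cloud/cloud.cfg end up looking something like this (abridged; the real module lists are much longer):

    cloud_config_modules:
      # ... existing entries ...
      - resolv_conf

    cloud_final_modules:
      # ... existing entries ...

    unverified_modules:
      - resolv_conf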

    At this point, the system should be ready to accept the complete configuration from cloud-init, but the initial setup has already run. We need to tell cloud-init to rerun the initial configuration by executing the following commands in order:

    # cloud-init clean --logs
    # cloud-init init --local
    # cloud-init init
    # cloud-init modules
    

    WARNING! This will regenerate your system SSH keys, so make sure you’re prepared for that. Do this immediately after you boot for the first time.

    At this point, resolv.conf should have the correct contents:

    nameserver 127.0.0.53
    domain businessb.net
    search businessb.net
    

    It will also contain a comment noting that cloud-init will not update the file again. Note that we’re using 127.0.0.53, the systemd-resolved stub resolver; feel free to change this and disable systemd-resolved.

    That’s a bit about cloud-config, and a problem common enough for me to write about.

  • SOS From Canada

    January 28th, 2023

    I promised myself I wouldn’t use my blog for political reasons, so I’m sorry for this article. If you’re just here for my tech musings, feel free to skip this one. But if you have the time, please read. I promise it won’t be too extreme unless you’ve got a weird definition of “extreme”.

    This weekend marks the one year anniversary of the Freedom Convoy protest that provoked the dictator Justin Trudeau into invoking the Emergencies Act, disarming the population and heavily censoring the media. Now, the Canadian media publishes COVID “hit pieces” daily without the ability to comment. Hence, the broken promise. It is simply not enough to “dislike” corrupt media. We must be allowed to speak, and if Trudeau has his way, I will be silent.

    But as long as I have breath and an Internet connection, there will be no silencing me. The tyranny has gone way too far. No more police thugs. No more media censorship and lies. No more neurotic Ottawa elites wagging the dog. The world needs to shame Justin Trudeau for his love of the Chinese “basic dictatorship”. And certain Canadians need to stop bowing to Ottawa elites, regardless of how neurotic and how much gaslighting and emotional blackmail they use. There are a great many Canadians who think the way I do – hardly a “fringe minority” as Trudeau derides us.

    Every day I endure Trudeau’s hatred of my love of Freedom. And every day I see the hatred burning him away. I will endure this, but not without help. If not people, then God. Deride me, dismiss me and ignore me. But you will not silence me.

  • Packing up the Perl – it’s Groovy Time!

    January 28th, 2023

    A little while ago, I had a simple glue script to write. It wasn’t complicated, but it needed to handle a number of things: a YAML config file, command-line switches, connections to multiple database types, SFTP connectivity and PGP encryption. I don’t often think of myself as old, but after writing that script in Perl, it was clear to me that my sysadmin scripting needed some updating. I’ve dabbled in PowerShell and have a solid foundation in bash, but I’ve always felt most comfortable in Perl for sysadmin tasks. I’ve written one or two scripts in C#, but C# isn’t a scripting language, and scriptability is essential for sysadmin work.

    So I put together some requirements for SysAdmin scripting:

    1. Ubiquitous – it can’t be difficult to find or install on any system, regardless of OS or hardware
    2. Human readable – a decent developer should be able to grok sysadmin scripts, even if it isn’t their strength
    3. Versatile – good availability of reliable, maintained packages
    4. Scriptable – easy to integrate into system shell, regardless of OS

    There’s probably some other intangibles, but I think this is a fair list of the features that draw me to Perl when I think of sysadmin and glue scripts.

    Let’s be a little critical of Perl for a moment, and see how well it actually succeeds at the above. It’s ubiquitous enough, easily installed on Linux, MacOS and Windows (WSL). It is arguably human-readable given a good Perl developer, though it can easily be anything but. To be fair, though, I think most languages are like that. It’s most important that the syntactic sugar exist, not that every developer use it. It is definitely quite versatile. I’ve used Perl for everything from simple GUI applications to database scripts to parsers. And it is scriptable. Put #!/usr/bin/perl at the top of your script and chmod +x. Voila, an executable, human-readable file.

    So fine, it does what it’s supposed to do. Still. 20 years later. There’s a reason it’s called the Camel book, though. As the book itself notes, camels smell a bit. And Perl is starting to smell. The CPAN archive is full of unmaintained projects by anonymous authors. No one knows it anymore; I mentioned Perl to a coworker and got a quizzical look in return. It’s time to move on to a better-supported and better-maintained ecosystem.

    By a happy coincidence, I am studying Groovy as I need to write some load testing scripts for JMeter. And as I go through the learning material, it’s more and more clear to me that Groovy is not only a viable sysadmin language, it’s almost a natural successor to Perl. Let’s quickly assess the requirements:

    1. Ubiquity – yes, Java is everywhere, and anywhere Java can be installed, Groovy can be installed
    2. Human-readable – I’d argue much more so than Perl
    3. Versatile – Using Grapes, the entire Maven repository is available. Maven seems to be far better managed than CPAN.
    4. Scriptable – #!/usr/bin/perl becomes #!/usr/bin/env groovy and just like that, you have a new sysadmin language

    Add to that good multi-platform IDE support (groovyConsole, IntelliJ, Neovim, VS Code) and I think we have a winner. There are many language features that could make Groovy your new favourite scripting language. I know it is likely to be mine.

  • Review – TIDAL Music

    January 24th, 2023

    I’ve become disillusioned with the big-name music distributors, and I wanted to find a good replacement without sacrificing quality or selection. Price was not a concern provided it was competitive. Enter TIDAL Music, a high-quality music service for $29 CAD/mo for a family. This is slightly higher than other services, but a premium subscription assures that more money is being paid to the artist than other services.

    In terms of offerings, I was able to find all of my (somewhat) obscure music and far more. A large percentage of the content is available in “Master” quality, a treat if you have a nice sound system or a good pair of headphones. There’s also a smattering of Dolby Atmos and Spatial audio. If you’re used to the high-quality offerings on Prime and Apple, you won’t be disappointed by the fidelity of TIDAL tracks.

    For me, perhaps the most important feature of a music service is its ability to function as a DJ throughout the day. TIDAL makes this really easy. It provides the standard “artist” radios with non-stop streams based on a particular artist’s style. It also provides a “track” radio that does the same thing based on a specific track. But perhaps the most important feature for me is the daily curated music. TIDAL seems to be really smart. I spend my mornings going through the Discovery mix. From the input I provide through the discovery mix (liking tracks, albums and artists), I get a curated set of mixes of the distinct styles of music within my collection.

    The day it separated my music into dance music and industrial/EBM, I was happy. It seemed intelligent to keep the distinct styles separate. Two days later, there were 2 new categories: 80’s Synthpop and German Metal. Truly impressive, in my opinion. I thought Apple was pretty good at choosing music, but I find that TIDAL is the best DJ of them all.

    I’d definitely recommend thinking about switching if you’re tired of your current music provider, or just want to support the little guy and put a few more bucks in the artists’ pockets.

  • Stellaris – The first 40 years

    January 11th, 2023

    Happy New Year! I hope you had a relaxing and enjoyable holiday season. My second Pulsar post is still in the works, but I haven’t written anything in a while, so I’m going to put up what I’ve been doing lately: working on my Stellaris opening. I’ve got a few hours under my belt, and this strategy is highly adaptable and resilient.

    2200-2210

    The goal by the end of 2210 is to have both of your guaranteed habitable planets founded. You’ll need a second science ship to ensure you find both in time, and the second science ship will need a leader. On console (where I’m playing) you still pay energy for leaders, so selling 100 food will give you a leg up in saving for the scientist. Selling off your excess monthly food will take care of the rest and ensure you have two science ships immediately. If you are playing a derivative of the Earth democracy, as I am, you will find your habitable planets in Sirius and Alpha Centauri. Sirius is a binary system and Alpha Centauri is a trinary system. Usually you can pick them out pretty easily. During this time, you will want to take the Discovery tradition, and also the To Boldly Go tradition. Once you have taken those two in Discovery, proceed to take Expansion, Colonization Fever, and A New Life. If you have enough unity (and you should make it a priority unless you start with a ton of it), you can have Colonization Fever taken before your first planet is complete, giving you an extra pop.

    2210-2220

    The goal by the end of 2220 is to have 20 corvettes and 100 monthly science. It’s tempting to reduce or delay expansion until you have these corvettes, because if you don’t build them, the AI will eat you for breakfast. However, generous use of the market should allow you to buy enough alloys to continue expanding at the pace of your influence, and still buy the 17 ships you need prior to 2220. To get to 100 monthly science, you will need one more research lab. This itself requires more goods, which in turn require more minerals. You’ll need planetside minerals from your first two planets, and one more farm. I build only as many cities as I need in the new colony (housing is the key thing here) until the planetary administration is in place.

    Now that the two production centres have been added, it’s extremely important to improve both goods and alloys production. At the first opportunity, build another industrial district, preferably on one of the new planets. Industrial districts really suck up the minerals, so ensure you have enough of a surplus before building it. Once your goods production is high enough, you’ll want the additional research lab. The second building slot is unfortunately going to have to go to an administrative building. If you are not playing a megacorp, you’ll need to build the administrative building before the research lab.

    Use the market to manage your surpluses and deficits. While you can’t overcome huge deficits, selling off some surplus goods and food can give you a little extra energy. Every little bit helps here. The 2220 deadline for your 20 corvettes is pretty hard if anyone else is nearby. Even the least aggressive empires will attack you for being so weak. I once had a pacifist empire offer to protect me, and then in 2220, offer me forced vassalization. So, get those ships up, and don’t stop expanding to do so.

    2220-2240

    So, awesome. You got your ships in time, you have 2 new planets well underway, and you’re sciencing away, looking for cool new systems to expand to, looking for your precursor empire and other goodies. Guess what? You have to do it again. Our goal by the end of 2240 is 200 science and 40 corvettes. If you neglect to build the second fleet of corvettes, you will also suffer, because odds are, someone else will. Thankfully it’s a little easier to get the additional naval capacity by taking the Superiority tradition, but you can still get there the old-fashioned way by building a Stronghold and 2 anchorages. In fact, I do both. I avoid taking Superiority early on, though, because there’s bigger fish to fry.

    After taking Colonization Fever and A New Life from the Expansion tree, I move to the Prosperity tree, because Earth is quickly going to run out of jobs without Interstellar Franchising. As a megacorp, I don’t have the early unity to afford taking the additional tradition – Interstellar Franchising must be taken with haste. That said, early unity is critical. The timing on some of these traditions in this opening cuts very close – Colonization Fever is often taken just barely before the colony is finished. Earth is on the verge of running out of districts by the time I take Interstellar Franchising. Losing unity to unemployment and unhappiness is simply not going to be tolerated.

    To get to 200 science, you’ll need a research lab on both new planets. Again, make sure you’re building planetside minerals and industrial districts to build up your goods production. Make sure you keep buying alloys and push hard for the second fleet of corvettes. You can’t really breathe until you have them.

    General Advice

    Stellaris is a game of patience. Planets take a long time to grow, and it’s very easy to overbuild. I try not to build new districts and buildings until there’s only 1 job available. This way I know that all of the districts I’ve built to-date have been filled, and I won’t get weird shifting of workers between all the possible resources they could be working. Additionally, this will keep the sprawl to a minimum, always a good thing, because every administrative building you build takes up a valuable building slot and consumes valuable goods.

    The market is your best friend. You’ll never get a perfect distribution of resources, so you’ll need to trade for the optimal solution. I try to keep my food surplus between 0-10 and my goods surplus between 0-5. Anything more can be sold to buy alloys. I also try not to have much more than 500 goods or food on hand unless I’m saving for colonies. It really doesn’t serve any purpose other than backing up your income. If you have 15k food and goods at the end of the game, you’re doing it wrong. Again, you can sell off food in quantities of 500 to keep your reserves low. I’d add that you can really make bank off the market if you get the Galactic Market (market fee -10%) and take the market fee reduction in the Diplomacy tradition (market fee -10%). At only 10% market fee, you trade with great efficiency, and with a global market, being a net exporter is awesome. You can also have some fun by messing with prices. If you have the cash and no need to buy (say) alloys, you can push the price of alloys through the roof and make it painful for empires that must buy it.

    Kick the AI’s ass with science. If you can hold them off early on by being an unattractive target (and make nice with diplomacy), you will get a ton of early game bonuses by researching anomalies and excavating archaeology sites. However, you still need to survey all of your nearby systems, so you’ll need to balance the need to keep moving with the need to research some of the higher-level anomalies. I usually use 300 days as a cut-off. If it’s more than 300 days, I leave it for later (or when I have very little to do). By the end of 40 years, you should start pulling ahead of the AI, and be well-positioned to transition into the middle game with a lot of options available to you. At very least, you will win your fight for survival. There’s a good chance you will be able to target 300 science and a fleet of destroyers for 2260.

    Starbase placement is key. You don’t have many starbases, so you need to make the best use of them. You won’t get starholds for a while, so you’ll need a combination of starbases and trade hubs that capture all of your planets. Be extremely mindful of gaps: if there are gaps in your trade protection, you will invite pirates, and that will hurt because you can’t afford the ships or the loss of income. So make sure there are no gaps. You can put starbases beside each other, use trade hubs to expand your trade range, and use guns/missiles/hangar bays to extend trade protection range. There’s usually a configuration for your starting planet that will provide 100% trade protection without the need for patrols. I make it a policy not to expand to a planet until the trade network is in place.

    That’s about all the concrete tips I can offer about the early game. It’s not hard, but it’s important that you have 20 ships in 20 years and 40 ships in 40 years to stave off AI aggression. It’s also important to ensure you can afford each and every research lab you build. You actually don’t need that many to win the game, so be patient and make sure all of your districts are fully staffed before adding the research labs. I hope these help! My latest game has me with 9 planets and 25 destroyers before 2260 (in addition to the 40 corvettes), with 330 science to boot. I’d kick my neighbour’s ass except that we like each other and I’m making good money off of him. Good strategy leads to good RNG, and truly enjoyable games.

  • Apache Pulsar – NextGen Kafka?

    December 10th, 2022

    Some buzz came my way recently about Apache Pulsar, specifically Apache Pulsar hosted by StreamNative. Given that it has a free offering with no CC requirement, I decided to have a look. I’ll start by saying this blog post can’t possibly do it justice, but I’ll try to cover the basics at least. The documentation is reasonably good, and enough time spent going through it should get you the answers you need. I’ll walk through a simple application using Pulsar, and add some commentary at the end.

    I chose to connect a Blazor WASM application to Pulsar, since that’s what most of my client apps will be. There’s a bit of disappointment there, since it’s not possible to connect the WASM client directly to Pulsar and thus avoid the delays associated with proxying. Still, that’s a lot to ask for, so it’s not a huge deal. Proxying through the server API is more secure anyway, since credentials don’t have to be sent to the client side.

    Create the application using dotnet new blazorwasm --hosted -n HelloPulsar -o HelloPulsar. Open the resulting solution file in your favourite IDE and tidy it up by removing the Weather Forecast page, fixing the navigation, etc. In the server project, add the Pulsar.Client NuGet package. This package is produced by the F# community and attempts to mimic the Java API verbatim, as opposed to the official .NET client, DotPulsar, which doesn’t. Add the following initialization to Program.cs:

    var pulsarClient = await new PulsarClientBuilder()
        .ServiceUrl("pulsar+ssl://<my-cluster>.<my-org-namespace>.snio.cloud:6651")
        .Authentication(AuthenticationFactoryOAuth2.ClientCredentials(
            new Uri("https://auth.streamnative.cloud/"),
            "urn:sn:pulsar:<my-namespace>:<my-instance>",
            new Uri("file:///path/to/my/keyfile.json")))
        .BuildAsync();
    builder.Services.AddSingleton(pulsarClient);
    

    This will add the PulsarClient singleton to the application to allow us to create producers. Note that the service account used for authentication must have producer access to the tenant, namespace, or topic.

    Now, in a controller within the server API (say, PulsarController), create a method that says hello. The PulsarClient singleton is injected through the controller’s constructor and stored in _pulsarClient:

        [HttpPost("Hello")]
        public async Task<IActionResult> Hello([FromBody] string? name = "world")
        {
            var greeting = $"Hello, {name}!";
            var producer = await _pulsarClient.NewProducer(Schema.STRING())
                .Topic("persistent://public/default/hello")
                .CreateAsync(); 
            await producer.SendAsync(greeting);
            return Ok();
        }
    

    This will send a greeting using the posted string to Pulsar. You will need to create the topic first in the StreamNative portal. You can verify that the message was received in the StreamNative portal by creating a subscription and peeking the message.
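
    You can also verify it in code using the same Pulsar.Client package; a minimal sketch (the subscription name is arbitrary):

    var consumer = await pulsarClient.NewConsumer(Schema.STRING())
        .Topic("persistent://public/default/hello")
        .SubscriptionName("hello-reader")
        .SubscribeAsync();

    var message = await consumer.ReceiveAsync();
    Console.WriteLine(message.GetValue());        // "Hello, world!"
    await consumer.AcknowledgeAsync(message.MessageId);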

    Ok, this works. But why would I want to choose it over Kafka? I can see some glimmers of why, and I’ll be keeping an eye on things to make a choice. The killer feature for me, I think, would be to use my own OAuth server for authentication. There’s some indication this is possible, though not yet with the StreamNative offering.

    The community seems kind of small, and the StreamNative product rather minimal at present. On the other hand, if Pulsar offers more features (especially the custom OAuth server) it could be worth migrating new projects from Kafka. More to come…
