Finally, the long-awaited second post on Apache Pulsar! I’ve spent some time modifying my code to work with StreamNative.io managed Pulsar, and this solution is quite elegant I think. I can’t take full credit for it, though, as the original source material (written in .NET Core 3.1) was written by Alexey Zimarev in his excellent book, [Hands-on Domain-Driven Design with .NET Core](https://www.amazon.com/Hands-Domain-Driven-Design-NET-ebook/dp/B07C5WSR9B). I have modified much of the original code to use .NET 6 features and to use databases other than RavenDB and Event Store. I won’t rewrite the book here, but I’d like to hit on some important parts of the solution.
Event Sourcing could be viewed as an “extreme” form of DDD, where our aggregates are truly aggregates of the events that built them. At first glance, this seems to be a pointless endeavour. After all, Pulsar doesn’t even have KSQL, so using it as a query source seems like it would have poor performance without all of the features like indexing and partitioning. In a sense, you would be right, but you’re only seeing half the picture.
In an Event-Sourced system, we can subscribe to the events using a second listener, whose job it is to process those events and “project” the results back to a document database like Mongo, Raven, or Cosmos. This is as good a place as any to start, so let’s start with the code for the SubscriptionManager
:
using EventFramework.EventSourcing;
using EventFramework.SharedKernel;
using Microsoft.Extensions.Hosting;
using Newtonsoft.Json;
using Pulsar.Client.Api;
using Pulsar.Client.Common;
namespace EventFramework.Pulsar;
public class SubscriptionManager : BackgroundService
{
private readonly string _subscriptionName;
private readonly PulsarClient _pulsarClient;
private readonly string _tenant;
private readonly string _namespace;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly ISubscription[] _subscriptions;
public SubscriptionManager(
string subscriptionName,
PulsarClient pulsarClient,
string tenant,
string @namespace,
CancellationTokenSource cancellationTokenSource,
params ISubscription[] subscriptions)
{
_subscriptionName = subscriptionName;
_pulsarClient = pulsarClient;
_tenant = tenant;
_namespace = @namespace;
_cancellationTokenSource = cancellationTokenSource;
_subscriptions = subscriptions;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
foreach (var subscription in _subscriptions)
{
await Task.Factory.StartNew(async () =>
{
var consumer = await _pulsarClient.NewConsumer(Schema.STRING())
.TopicsPattern($"persistent://{_tenant}/{_namespace}/.*")
.SubscriptionName($"{_subscriptionName}-{subscription.Name}")
.SubscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.SubscribeAsync();
while (!stoppingToken.IsCancellationRequested)
{
try
{
var message = await consumer.ReceiveAsync(_cancellationTokenSource.Token);
var clrType = message.Properties["ClrType"];
var type = Type.GetType(clrType) ?? throw new Exception($"Couldn't load CLR type '{clrType}'");
var json = message.GetValue();
var @event = (IId?) JsonConvert.DeserializeObject(json, type) ??
throw new Exception("Couldn't deserialize event");
await subscription.Project(@event);
await consumer.AcknowledgeAsync(message.MessageId);
}
catch (Exception)
{
// TODO: handle exception
}
}
}, _cancellationTokenSource.Token);
}
}
}
SubscriptionManager
is a BackgroundService
that connects to Pulsar and subscribes to all topics in a particular namespace. It should be noted that you must use the F# community Pulsar driver, and not the official Apache one to get this functionality. I have designed the system to have an analogous namespace for each C# namespace. Each aggregate instance will have its own topic, of the form <EntityType>.<EntityId>
, e.g., Election.ABDJCHDJ111
. As you can see, this code is rather straightforward. It creates the subscription, deserializes incoming JSON messages, and projects them into MongoDB.
The Projection
class is as follows:
using EventFramework.EventSourcing;
using EventFramework.SharedKernel;
using MongoDB.Driver;
namespace EventFramework.MongoDB;
public class Projection : ISubscription
{
private readonly IClientSession _session;
public string Name { get; }
private readonly Projector _projector;
public Projection(IClientSession session, Projector projector, string? name = null)
{
_session = session;
_projector = projector;
// you really should provide a name, as the subscription will reset every time the application restarts otherwise
Name = name ?? Guid.NewGuid().ToString();
}
public async Task Project(IId id)
{
var handler = _projector.Invoke(_session, id);
if (handler == null) return;
await handler();
}
}
public delegate Func<Task>? Projector(IClientSession session, IId id);
As you can see, this is simply a flyweight object instance in front of a Func<Task>
to make it easier to call the Project()
method. Where do these Func<Task>
s come from? Here’s a simple one from a simple voting application I am working on:
using EventFramework.MongoDB;
using EventFramework.SharedKernel;
using MongoDB.Driver;
namespace Service.Vote.Voter.Projections;
public static class VoterProjection
{
private const string DatabaseName = "votes";
private const string CollectionName = "voters";
public static Func<Task> GetHandler(IClientSession session, IId @event)
{
return @event switch
{
Event.Vote.Voter.Registered registered => () => HandleRegistered(session, registered),
Event.Vote.Voter.NameUpdated nameUpdated => () => HandleNameUpdated(session, nameUpdated),
Event.Vote.Voter.EmailUpdated emailUpdated => () => HandleEmailUpdated(session, emailUpdated),
_ => () => Task.CompletedTask
};
}
static Task HandleRegistered(IClientSession session, Event.Vote.Voter.Registered @event)
{
var voters = session.Client.GetDatabase(DatabaseName)
.GetCollection<ReadModel.Vote.Voter>(CollectionName);
return voters.UpsertItem(
@event.Id ?? throw new Exception("Voter Id is required"),
_ => Task.CompletedTask,
() => Task.FromResult(new ReadModel.Vote.Voter
{
Id = @event.Id,
Name = @event.VoterEmail,
Email = @event.VoterEmail
}));
}
static Task HandleNameUpdated(IClientSession session, Event.Vote.Voter.NameUpdated @event)
{
var voters = session.Client.GetDatabase(DatabaseName)
.GetCollection<ReadModel.Vote.Voter>(CollectionName);
return voters.Update(
@event.Id ?? throw new Exception("Voter Id is required"),
voter =>
{
voter.Name = @event.Name;
return Task.CompletedTask;
});
}
static Task HandleEmailUpdated(IClientSession session, Event.Vote.Voter.EmailUpdated @event)
{
var voters = session.Client.GetDatabase(DatabaseName)
.GetCollection<ReadModel.Vote.Voter>(CollectionName);
return voters.Update(
@event.Id ?? throw new Exception("Voter Id is required"),
voter =>
{
voter.Email = @event.Email;
return Task.CompletedTask;
});
}
}
Aha! Now we are into something that looks like real work. This class writes data into MongoDB for us to use. This is accomplished by the use of the extension methods UpsertItem()
, Update()
, and Delete()
. Here they are:
namespace EventFramework.MongoDB;
public static class Extensions
{
public static async Task Update<T>(
this IMongoCollection<T> collection,
string id,
Func<T, Task> update) where T : IId, new()
{
var item = await collection.AsQueryable().SingleOrDefaultAsync(x => x.Id == id);
if (item == null)
throw new Exception($"Unable to update {typeof(T).Name}: not found");
await update(item);
await collection.ReplaceOneAsync(Builders<T>.Filter.Where(x => x.Id == id), item);
}
public static async Task UpsertItem<T>(
this IMongoCollection<T> collection,
string id,
Func<T, Task> update,
Func<Task<T>> create) where T : IId, new()
{
var item = await collection.AsQueryable().SingleOrDefaultAsync(x => x.Id == id);
if (item == null)
{
item = await create();
await collection.InsertOneAsync(item);
}
else
{
await update(item);
await collection.ReplaceOneAsync(Builders<T>.Filter.Where(x => x.Id == id), item,
new ReplaceOptions { IsUpsert = true });
}
}
public static async Task Delete<T>(
this IMongoCollection<T> collection,
string id) where T : IId
{
await collection.DeleteOneAsync(Builders<T>.Filter.Where(x => x.Id == id));
}
}
And there’s the generic CRUD, the other half of the picture. By registering this SubscriptionManager
as a HostedService
within your application, you can easily keep the “read side” in sync with the “write side”. To be more precise, though, I will call the “read side” the “query store”, and the write site the “aggregate store”. The query store is obviously what is used by the web application to query and render data. The aggregate store provides long-term storage for the events that form our aggregates. Pulsar allows us to set the TTL and maximum size of a given namespace’s persistent storage. By setting both of these to -1, you can make a namespace persist the messages (events) permanently.
OK, let’s look at the aggregate store in more detail:
using System.Reflection;
using System.Text;
using EventFramework.EventSourcing;
using EventFramework.SharedKernel;
using Newtonsoft.Json;
using Pulsar.Client.Api;
using Pulsar.Client.Common;
namespace EventFramework.Pulsar;
public class PulsarAggregateStore : IAggregateStore
{
private readonly PulsarClient _pulsarClient;
private readonly string _tenant;
private readonly string _namespace;
public PulsarAggregateStore(PulsarClient client, string tenant, string @namespace)
{
_pulsarClient = client;
_tenant = tenant;
_namespace = @namespace;
}
private string GetTopicName<T>(AggregateId<T> aggregateId) where T : AggregateRoot<T>
{
var @namespace = typeof(T).Namespace?.ToLower() ?? "default";
var entityType = typeof(T).FullName?.ToLower().Replace($"{@namespace}.", "")
?? throw new Exception("Unable to determine entity type");
return $"persistent://{_tenant}/{_namespace}/{entityType}.{aggregateId}";
}
public async Task<bool> Exists<T>(AggregateId<T> aggregateId) where T : AggregateRoot<T>
{
try
{
var topic = GetTopicName(aggregateId);
await using var reader = await _pulsarClient.NewReader(Schema.STRING())
.Topic(topic)
.StartMessageId(MessageId.Earliest)
.CreateAsync();
return await reader.HasMessageAvailableAsync();
}
catch (Exception)
{
return false;
}
}
public async Task Save<T>(T aggregate) where T : AggregateRoot<T>
{
var topic = GetTopicName(aggregate.Id);
await using var producer = await _pulsarClient.NewProducer(Schema.STRING())
.Topic(topic)
.ProducerName(Guid.NewGuid().ToString())
.CreateAsync();
foreach (var change in aggregate.GetChanges())
{
var json = JsonConvert.SerializeObject(change,
new JsonSerializerSettings {TypeNameHandling = TypeNameHandling.Objects});
var message = producer.NewMessage(json)
.WithProperties(new Dictionary<string, string>
{
{
"ClrType",
change.GetType().AssemblyQualifiedName ?? throw new Exception("Unable to determine ClrType")
}
});
await producer.SendAsync(message);
}
aggregate.ClearChanges();
}
public async Task<T?> Load<T>(AggregateId<T> aggregateId) where T : AggregateRoot<T>
{
var cts = new CancellationTokenSource();
var ctor = typeof(T).GetConstructor(BindingFlags.Instance | BindingFlags.NonPublic,
new[] {typeof(AggregateId<T>)});
if (ctor == null)
throw new Exception("No load constructor found");
var loaded = (T?) ctor.Invoke(new object?[] {aggregateId}) ??
throw new Exception("Failed to construct aggregate");
var topic = GetTopicName(aggregateId);
var reader = await _pulsarClient.NewReader(Schema.STRING())
.Topic(topic)
.StartMessageId(MessageId.Earliest)
.CreateAsync();
var changes = new List<IId>();
while (await reader.HasMessageAvailableAsync())
{
var message = await reader.ReadNextAsync(cts.Token);
var type = Type.GetType(message.Properties["ClrType"])
?? throw new Exception($"Failed to load CLR type '{message.Properties["ClrType"]}'");
var change = (IId?) JsonConvert.DeserializeObject(Encoding.UTF8.GetString(message.Data), type);
if (change == null)
throw new Exception("Failed to deserialize change");
changes.Add(change);
}
loaded.Load(changes);
return loaded;
}
If you read this code, you will find it is pretty straight-forward. It provides the basic methods Exists()
, Load()
, and Store()
. We can only load or store a single object at a time, by its primary key. As mentioned earlier, we cannot query the aggregate store as we can using KSQL, but this architecture doesn’t need to. About the only important point about this code is the existence of a “load” constructor. We need to initialize the object instance with some default values prior to loading, and we need to accept an AggregateId
of the correct type. Here’s an example of a “load constructor”:
// Used by PulsarService.Load()
private Election(AggregateId<Election> id) : base(id)
{
Name = Name.FromString("Loading...");
Description = Description.FromString("Loading...");
StartDate = StartDate.FromDateTimeOffset(DateTimeOffset.Now);
EndDate = EndDate.FromDateTimeOffset(DateTimeOffset.Now);
}
The constructor is located using reflection, so it can be made private. We simply give some sane default values that will not cause the system to crash if loading fails, and we make it easy for both the end-user and the developer to see that loading has failed. Note the use of the Value Object types Name
and Description
. These value objects are patterns from DDD. Here’s an example of Name
:
using EventFramework.EventSourcing;
namespace Domain.Vote.Election;
public class Name : Value<Name>
{
private string Value { get; }
private Name(string? value)
{
if (string.IsNullOrWhiteSpace(value))
throw new ArgumentNullException(nameof(value));
Value = value;
}
public static Name FromString(string? value) => new(value);
public static implicit operator string?(Name? name) => name?.Value;
public override string ToString() => Value;
}
I’m going to include the code for Value<T>
as well, because it does a lot of “magical” things that make working with value objects really easy. This one is 100% Alexey 🙂
namespace EventFramework.EventSourcing;
public class Value<T> where T : Value<T>
{
private static readonly Member[] Members = GetMembers().ToArray();
public override bool Equals(object? obj)
{
if (obj is null) return false;
if (ReferenceEquals(this, obj)) return true;
return obj.GetType() == typeof(T) && Members.All(
m =>
{
var objValue = m.GetValue(obj);
var thisValue = m.GetValue(this);
return m.IsNonStringEnumerable
? GetEnumerableValues(objValue).SequenceEqual(GetEnumerableValues(thisValue))
: objValue?.Equals(thisValue) ?? thisValue == null;
});
}
private static IEnumerable<object?> GetEnumerableValues(object? obj)
{
if (obj == null) throw new ArgumentNullException(nameof(obj));
var enumerator = ((IEnumerable) obj).GetEnumerator();
while (enumerator.MoveNext()) yield return enumerator.Current;
throw new ArgumentNullException(nameof(obj));
}
private static IEnumerable<Member> GetMembers()
{
var t = typeof(T);
const BindingFlags flags = BindingFlags.Instance | BindingFlags.Public;
while (t != typeof(object))
{
if (t == null) continue;
foreach (var p in t.GetProperties(flags)) yield return new Member(p);
foreach (var f in t.GetFields(flags)) yield return new Member(f);
t = t.BaseType;
}
}
public override int GetHashCode()
{
return CombineHashCodes(Members.Select(m => m.IsNonStringEnumerable
? CombineHashCodes(GetEnumerableValues(m.GetValue(this)))
: m.GetValue(this)));
}
private static int CombineHashCodes(IEnumerable<object?> values)
{
return values.Aggregate(17, (current, value) => current * 59 + (value?.GetHashCode() ?? 0));
}
public static bool operator ==(Value<T>? left, Value<T>? right)
{
return Equals(left, right);
}
public static bool operator !=(Value<T>? left, Value<T>? right)
{
return !Equals(left, right);
}
public override string? ToString()
{
if (Members.Length == 1)
{
var m = Members[0];
var value = m.GetValue(this);
return m.IsNonStringEnumerable
? $"{string.Join("|", GetEnumerableValues(value))}"
: value?.ToString();
}
var values = Members.Select(
m =>
{
var value = m.GetValue(this);
return m.IsNonStringEnumerable
? $"{m.Name}:{string.Join("|", GetEnumerableValues(value))}"
: m.Type != typeof(string)
? $"{m.Name}:{value}"
: value == null
? $"{m.Name}:null"
: $"{m.Name}:\"{value}\"";
});
return $"{typeof(T).Name}[{string.Join("|", values)}]";
}
}
internal struct Member
{
public readonly string Name;
public readonly Func<object, object?> GetValue;
public readonly bool IsNonStringEnumerable;
public readonly Type Type;
public Member(MemberInfo info)
{
switch (info)
{
case FieldInfo field:
Name = field.Name;
GetValue = obj => field.GetValue(obj);
IsNonStringEnumerable = typeof(IEnumerable).IsAssignableFrom(field.FieldType) &&
field.FieldType != typeof(string);
Type = field.FieldType;
break;
case PropertyInfo prop:
Name = prop.Name;
GetValue = obj => prop.GetValue(obj);
IsNonStringEnumerable = typeof(IEnumerable).IsAssignableFrom(prop.PropertyType) &&
prop.PropertyType != typeof(string);
Type = prop.PropertyType;
break;
default:
throw new ArgumentException("Member is not a field or property?", info.Name);
}
}
}
There’s a lot of code here, but it’s easy enough to summarize. This code simply makes value objects behave as you’d expect. When used in string context, they behave like strings. They are strongly typed so that you can’t just use any old string, you have to use one that’s gone through the type’s constructor, thus giving another extension point to add business rules. You can cut and paste this code and forget about it.
Here’s a complete example of a domain class that extends AggregateRoot<T>
:
using EventFramework.EventSourcing;
using EventFramework.SharedKernel;
using Domain.Vote.Ballot;
using Domain.Vote.Competition;
using Domain.Vote.Voter;
namespace Domain.Vote.Election;
public class Election : AggregateRoot<Election>
{
public Name? Name { get; private set; }
public Description? Description { get; private set; }
public StartDate? StartDate { get; private set; }
public EndDate? EndDate { get; private set; }
public IDictionary<Competition.Competition, IList<Candidate.Candidate>> Competitions { get; }
= new Dictionary<Competition.Competition, IList<Candidate.Candidate>>();
public IList<Ballot.Ballot> Ballots { get; } = new List<Ballot.Ballot>();
public IList<Voter.Voter> Voters { get; } = new List<Voter.Voter>();
public Election(
AggregateId<Election> id,
Name name,
Description description,
StartDate startDate,
EndDate? endDate = null) : base(id)
{
Apply(new Event.Vote.Election.Created
{
Id = id.ToString(),
Name = name,
Description = description,
StartDate = startDate,
EndDate = endDate
});
}
// Used by PulsarService.Load()
private Election(AggregateId<Election> id) : base(id)
{
Name = Name.FromString("Loading...");
Description = Description.FromString("Loading...");
StartDate = StartDate.FromDateTimeOffset(DateTimeOffset.Now);
EndDate = EndDate.FromDateTimeOffset(DateTimeOffset.Now);
}
protected override void EnsureValidState()
{
if (string.IsNullOrWhiteSpace(Name))
throw new InvalidElectionStateException("Name is required");
if (string.IsNullOrWhiteSpace(Description))
throw new InvalidElectionStateException("Description is required");
if (StartDate == null)
throw new InvalidElectionStateException("Start date is required");
}
public void SetName(Name name)
{
Apply(new Event.Vote.Election.NameUpdated
{
Id = Id.ToString(),
Name = name
});
}
public void SetDescription(Description description)
{
Apply(new Event.Vote.Election.DescriptionUpdated
{
Id = Id.ToString(),
Description = description
});
}
public void SetStartDate(StartDate startDate)
{
Apply(new Event.Vote.Election.StartDateUpdated
{
Id = Id.ToString(),
StartDate = startDate
});
}
public void SetEndDate(EndDate? endDate)
{
Apply(new Event.Vote.Election.EndDateUpdated
{
Id = Id.ToString(),
EndDate = endDate
});
}
public string RegisterVoter(AggregateId<Voter.Voter> voterId, Voter.Name name, Email email)
{
var ballotId = Ulid.NewUlid();
if (Voters.Any(v => v.Email == email))
throw new InvalidVoterStateException($"Voter with email {email} has already been registered");
Apply(new Event.Vote.Voter.Registered
{
Id = voterId.ToString(),
ElectionId = Id.ToString(),
VoterName = name,
VoterEmail = email
});
Apply(new Event.Vote.Ballot.Issued
{
Id = ballotId.ToString(),
ElectionId = Id.ToString()
});
return ballotId.ToString();
}
public void AddCompetition(AggregateId<Competition.Competition> competitionId, Competition.Name name)
{
Apply(new Event.Vote.Competition.Added
{
Id = competitionId.ToString(),
Name = name
});
}
public void RemoveCompetition(AggregateId<Competition.Competition> competitionId)
{
Apply(new Event.Vote.Competition.Removed
{
Id = competitionId.ToString(),
ElectionId = Id.ToString()
});
}
protected override void When(IId? @event)
{
AggregateId<Competition.Competition> competitionId;
AggregateId<Election> electionId;
switch (@event)
{
case Event.Vote.Election.Created ev:
Id = ElectionId.FromString(ev.Id);
Name = Name.FromString(ev.Name);
Description = Description.FromString(ev.Description);
StartDate = StartDate.FromDateTimeOffset(ev.StartDate);
EndDate = ev.EndDate != null ? EndDate.FromDateTimeOffset(ev.EndDate) : null;
break;
case Event.Vote.Competition.Added ev:
competitionId = CompetitionId.FromString(ev.Id);
electionId = ElectionId.FromString(ev.ElectionId);
var name = Competition.Name.FromString(ev.Name);
var competition = new Competition.Competition(electionId, competitionId, name);
Competitions.Add(competition, new List<Candidate.Candidate>());
break;
case Event.Vote.Competition.Removed ev:
competitionId = CompetitionId.FromString(ev.Id);
Competitions.Remove(Competitions.Keys.First(c => c.Id == competitionId));
break;
case Event.Vote.Voter.Registered ev:
var voter = new Voter.Voter(
VoterId.FromString(ev.Id),
Voter.Name.FromString(ev.VoterName),
Email.FromString(ev.VoterEmail),
Id);
Voters.Add(voter);
break;
case Event.Vote.Ballot.Issued ev:
electionId = ElectionId.FromString(ev.ElectionId);
if (Id == electionId)
{
var ballot = new Ballot.Ballot(BallotId.FromString(ev.Id), Id);
Ballots.Add(ballot);
}
break;
case Event.Vote.Election.NameUpdated ev:
Name = Name.FromString(ev.Name);
break;
case Event.Vote.Election.DescriptionUpdated ev:
Description = Description.FromString(ev.Description);
break;
case Event.Vote.Election.StartDateUpdated ev:
StartDate = StartDate.FromDateTimeOffset(ev.StartDate);
break;
case Event.Vote.Election.EndDateUpdated ev:
EndDate = ev.EndDate == null ? null : EndDate.FromDateTimeOffset(ev.EndDate);
break;
}
}
}
Here is your event-sourced aggregate root. It responds to the events it is interested in, and calls the appropriate methods on the domain object itself. It has no dependencies other than the SharedKernel
library, and other domain projects.
Here’s the code for AggregateRoot<T>
:
namespace EventFramework.EventSourcing;
public abstract class AggregateRoot<T> : IInternalEventHandler where T : AggregateRoot<T>
{
private readonly List<IId> _changes = new();
public AggregateId<T> Id { get; protected set; }
public int Version { get; protected set; } = -1;
protected AggregateRoot(AggregateId<T> id)
{
Id = id;
}
protected abstract void EnsureValidState();
protected abstract void When(IId? @event);
public void Handle(IId @event, bool addChange = false)
{
When(@event);
if (addChange)
_changes.Add(@event);
}
protected void Apply(IId @event)
{
When(@event);
EnsureValidState();
_changes.Add(@event);
}
public IEnumerable<IId> GetChanges()
{
return _changes.AsEnumerable();
}
public void Load(IEnumerable<IId?> history)
{
foreach (var ev in history)
{
When(ev);
Version++;
}
}
public void ClearChanges()
{
_changes.Clear();
}
protected static void ApplyToEntity(IInternalEventHandler? entity, IId @event)
{
entity?.Handle(@event);
}
}
Once again, this is 100% Alexey other than the formatting update to .NET 6. The EventSourcing.EventFramework
package was taken almost verbatim other than updates to .NET 6. It provides convenience methods to get unpersisted changes, to load a set of changes from the aggregate store, and to clear the changes in memory.
And herein lies the elegance. None of the code I have posted is particularly complex (with the exception of the Value Object code), yet the sum is so much greater than the parts. Here’s an example of an ApplicationService<T>
that provides instructions to the framework on how to respond to events for a particular aggregate root:
using EventFramework.EventSourcing;
using Domain.Vote.Election;
namespace OlympiaVotes.Service.Vote.Election;
public class ElectionService : ApplicationService<Domain.Vote.Election.Election>
{
public ElectionService(IAggregateStore store) : base(store)
{
CreateWhen<Event.Vote.Election.Created>(
ev => ElectionId.FromString(ev.Id),
(ev, id) => Task.FromResult(new Domain.Vote.Election.Election
(
id,
Name.FromString(ev.Name),
Description.FromString(ev.Description),
StartDate.FromDateTimeOffset(ev.StartDate),
EndDate.FromDateTimeOffset(ev.EndDate)
)));
UpdateWhen<Event.Vote.Election.NameUpdated>(
ev => ElectionId.FromString(ev.Id),
(election, ev) =>
{
election.SetName(Name.FromString(ev.Name));
return Task.CompletedTask;
});
UpdateWhen<Event.Vote.Election.DescriptionUpdated>(
ev => ElectionId.FromString(ev.Id),
(election, ev) =>
{
election.SetDescription(Description.FromString(ev.Description));
return Task.CompletedTask;
});
UpdateWhen<Event.Vote.Election.StartDateUpdated>(
ev => ElectionId.FromString(ev.Id),
(election, ev) =>
{
election.SetStartDate(StartDate.FromDateTimeOffset(ev.StartDate));
return Task.CompletedTask;
});
UpdateWhen<Event.Vote.Election.EndDateUpdated>(
ev => ElectionId.FromString(ev.Id),
(election, ev) =>
{
election.SetEndDate(EndDate.FromDateTimeOffset(ev.EndDate));
return Task.CompletedTask;
});
}
}
I like this declarative style. It’s easy to see what this constructor accomplishes. The ApplicationService<T>
is a dependency of PulsarService<T>
.
About the only other major code point that should be looked at to gain a full understanding is the DI container setup:
using System.Security.Cryptography.X509Certificates;
using Azure.Identity;
using Azure.Security.KeyVault.Certificates;
using Azure.Security.KeyVault.Secrets;
using EventFramework.EventSourcing;
using EventFramework.MongoDB;
using EventFramework.Pulsar;
using MongoDB.Driver;
using Newtonsoft.Json.Linq;
using Domain.Vote.Ballot;
using Domain.Vote.Candidate;
using Domain.Vote.Competition;
using Domain.Vote.Election;
using Domain.Vote.Voter;
using Service.Vote.Ballot;
using Service.Vote.Ballot.Projections;
using Service.Vote.Candidate;
using Service.Vote.Candidate.Projections;
using Service.Vote.Competition;
using Service.Vote.Competition.Projections;
using Service.Vote.Election;
using Service.Vote.Election.Projections;
using Service.Vote.Voter;
using Service.Vote.Voter.Projections;
using Pulsar.Client.Api;
var host = Host.CreateDefaultBuilder(args);
host.ConfigureServices((context, services) =>
{
// the root cancellation token source
var cancellationTokenSource = new CancellationTokenSource();
services.AddSingleton(cancellationTokenSource);
// secrets and certificates from Azure Key Vault
// requires configuration Azure.KeyVaultUrl in appsettings.{Environment}.json
var secrets = new SecretClient(new Uri(context.Configuration.GetSection("Azure")["KeyVaultUrl"]),
new DefaultAzureCredential());
services.AddSingleton(secrets);
var certificates = new CertificateClient(new Uri(context.Configuration.GetSection("Azure")["KeyVaultUrl"]),
new DefaultAzureCredential());
services.AddSingleton(certificates);
// fetch the StreamNative OAuth token using client_credentials grant extracted from key file
// requires the secrets Pulsar-ClientId and Pulsar-Client-Secret in Azure Key Vault
// find the values for these secrets in the service account key JSON file
var httpClient = new HttpClient();
var response = httpClient.PostAsync(new Uri(context.Configuration.GetSection("Pulsar")["TokenUrl"]),
new FormUrlEncodedContent(new Dictionary<string, string>
{
{"grant_type", "client_credentials"},
{"client_id", secrets.GetSecret("Pulsar-ClientId-OlympiaVotes").Value.Value},
{"client_secret", secrets.GetSecret("Pulsar-ClientSecret-OlympiaVotes").Value.Value},
{"audience", context.Configuration.GetSection("Pulsar")["Audience"]}
}))
.GetAwaiter()
.GetResult();
var json = response.Content.ReadAsStringAsync().GetAwaiter().GetResult();
var token = JObject.Parse(json)["access_token"]?.Value<string>();
// Pulsar dependencies
// requires creation of the Pulsar tenant and namespaces in StreamNative console
var pulsar = new PulsarClientBuilder()
.ServiceUrl(context.Configuration.GetSection("Pulsar")["ServiceUrl"])
.Authentication(AuthenticationFactory.Token(token))
.BuildAsync()
.GetAwaiter()
.GetResult();
services.AddSingleton(pulsar);
services.AddSingleton<IAggregateStore>(sp =>
new PulsarAggregateStore(pulsar, "votes", "domain.vote"));
// microservices
services.AddSingleton<ApplicationService<Ballot>, BallotService>();
services.AddSingleton<ApplicationService<Candidate>, CandidateService>();
services.AddSingleton<ApplicationService<Competition>, CompetitionService>();
services.AddSingleton<ApplicationService<Election>, ElectionService>();
services.AddSingleton<ApplicationService<Voter>, VoterService>();
services.AddHostedService(sp => new PulsarService<Ballot>(sp.GetRequiredService<PulsarClient>(),
sp.GetRequiredService<ApplicationService<Ballot>>(),
"votes"));
services.AddHostedService(sp => new PulsarService<Candidate>(sp.GetRequiredService<PulsarClient>(),
sp.GetRequiredService<ApplicationService<Candidate>>(),
"votes"));
services.AddHostedService(sp => new PulsarService<Competition>(sp.GetRequiredService<PulsarClient>(),
sp.GetRequiredService<ApplicationService<Competition>>(),
"votes"));
services.AddHostedService(sp => new PulsarService<Election>(sp.GetRequiredService<PulsarClient>(),
sp.GetRequiredService<ApplicationService<Election>>(),
"votes"));
services.AddHostedService(sp => new PulsarService<Voter>(sp.GetRequiredService<PulsarClient>(),
sp.GetRequiredService<ApplicationService<Voter>>(),
"votes"));
// MongoDB dependencies
// requires the secret MongoDb-Connection-String, found in the Atlas console
// requires the certificate MongoDb-Client-Credential, downloaded from Atlas console, converted to PFX and uploaded
// to Azure Key Vault
var mongoSettings =
MongoClientSettings.FromConnectionString(secrets.GetSecret("MongoDb-Connection-String").Value.Value);
mongoSettings.SslSettings = new SslSettings
{
ClientCertificates = new List<X509Certificate2>
{
certificates.DownloadCertificate("MongoDb-Client-Credential").Value
}
};
var mongoClient = new MongoClient(mongoSettings);
services.AddSingleton<IMongoClient>(mongoClient);
services.AddSingleton<IClientSession>(sp => sp.GetRequiredService<IMongoClient>().StartSession());
services.AddScoped(sp => sp.GetRequiredService<IMongoClient>().StartSession());
// subscription manager
services.AddHostedService(sp => new SubscriptionManager(
"votes", sp.GetRequiredService<PulsarClient>(),
"votes", "domain.vote",
sp.GetRequiredService<CancellationTokenSource>(),
new Projection(sp.GetRequiredService<IClientSession>(), BallotProjection.GetHandler),
new Projection(sp.GetRequiredService<IClientSession>(), CandidateProjection.GetHandler),
new Projection(sp.GetRequiredService<IClientSession>(), CompetitionProjection.GetHandler),
new Projection(sp.GetRequiredService<IClientSession>(), ElectionProjection.GetHandler),
new Projection(sp.GetRequiredService<IClientSession>(), VoterProjection.GetHandler)));
});
await host.Build().RunAsync();
One last thing I wanted to demonstrate is the use of the WebApiController
that Alexey provided:
using EventFramework.Pulsar;
using EventFramework.SharedKernel;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json;
using Pulsar.Client.Api;
namespace EventFramework.WebApi;
public abstract class WebApiController : ControllerBase
{
private readonly string _topic;
protected PulsarClient PulsarClient { get; }
protected WebApiController(PulsarClient pulsarClient, PulsarTopic topic)
{
_topic = topic;
PulsarClient = pulsarClient;
}
protected async Task<IActionResult> HandleEvent(IId @event)
{
var producer = await PulsarClient.NewProducer(Schema.STRING())
.Topic(_topic)
.CreateAsync();
var json = JsonConvert.SerializeObject(@event,
new JsonSerializerSettings {TypeNameHandling = TypeNameHandling.Objects});
var message = producer.NewMessage(json)
.WithProperties(new Dictionary<string, string>
{
// TODO: Access token
{"ClrType", @event.GetType().AssemblyQualifiedName ?? throw new Exception("Unknown CLR type")}
});
await producer.SendAsync(message);
return Ok();
}
}
The WebApiController
is a simple controller base that provides the HandleEvent()
method, which simply publishes the incoming message to the PulsarService
input queue. Here’s a class that extends WebApiController
. It’s wonderfully simple:
using EventFramework.Pulsar;
using EventFramework.WebApi;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Event;
using Pulsar.Client.Api;
namespace OlympiaVotes.Server.Controllers;
[Authorize, ApiController, Route("api/[controller]")]
public class ElectionController : WebApiController
{
public ElectionController(PulsarClient pulsarClient, PulsarTopic topic) : base(pulsarClient, topic)
{
}
[HttpPost]
public Task<IActionResult> Post([FromBody] Vote.Election.Created @event) => HandleEvent(@event);
[HttpPut("name")]
public Task<IActionResult> PutName([FromBody] Vote.Election.NameUpdated @event) => HandleEvent(@event);
[HttpPut("description")]
public Task<IActionResult> PutDescription([FromBody] Vote.Election.DescriptionUpdated @event) => HandleEvent(@event);
[HttpPut("startDate")]
public Task<IActionResult> PutStartDate([FromBody] Vote.Election.StartDateUpdated @event) => HandleEvent(@event);
[HttpPut("endDate")]
public Task<IActionResult> PutEndDate([FromBody] Vote.Election.EndDateUpdated @event) => HandleEvent(@event);
}
Those are all the components of an Event Sourced system I can think of right now. I hope this code helps you on your Pulsar journey and starts you thinking about Event Sourced systems. I believe that it is a robust architecture that provides elegant solutions to many common application concerns such as logging and auditing. The separation of query store and aggregate store allows a high-performance system without sacrificing visibility. If you haven’t read Alexey’s book yet, I think it’s an important primer to understand this content. I have barely done justice to what he has written. If you aren’t a DDD expert, I think that’s the book for you, even if you don’t know C#.