Some buzz came my way recently about Apache Pulsar, specifically Apache Pulsar hosted by StreamNative. Given that it has a free offering with no credit-card requirement, I decided to have a look. I’ll start by saying this blog post can’t possibly do it justice, but I’ll try to cover the basics at least. The documentation is reasonably good, and enough time spent going through it should get you the answers you need. I’ll walk through a simple application using Pulsar, and add some commentary at the end.
I chose to connect a Blazor WASM application to Pulsar, since that’s what most of my client apps will be. There’s a bit of disappointment there, since it’s not possible to connect the WASM client directly to Pulsar and thus avoid the delays associated with proxying. Still, that’s a lot to ask for, so it’s not a huge deal. Proxying through the server API is more secure anyway, since credentials don’t have to be sent to the client side.
Create the application using dotnet new blazorwasm --hosted -n HelloPulsar -o HelloPulsar. Open the resulting solution in your favourite IDE and clean it up by removing the Weather Forecast page, tidying up the navigation, and so on. In the server project, add the Pulsar.Client NuGet package. This package is produced by the F# community and attempts to mimic the Java API closely, unlike the official .NET client, DotPulsar, which takes its own approach. Add the following initialization to Program.cs:
var pulsarClient = await new PulsarClientBuilder()
    .ServiceUrl("pulsar+ssl://<my-cluster>.<my-org-namespace>.snio.cloud:6651")
    .Authentication(AuthenticationFactoryOAuth2.ClientCredentials(
        new Uri("https://auth.streamnative.cloud/"),
        "urn:sn:pulsar:<my-namespace>:<my-instance>",
        new Uri("file:///path/to/my/keyfile.json")))
    .BuildAsync();
builder.Services.AddSingleton(pulsarClient);
This will add the PulsarClient singleton to the application to allow us to create producers. Note that the service account used for authentication must have producer access to the tenant, namespace, or topic.
Now, in a controller within the server API (say, PulsarController), create a method that says hello:
[ApiController]
[Route("[controller]")]
public class PulsarController : ControllerBase
{
    private readonly PulsarClient _pulsarClient;

    public PulsarController(PulsarClient pulsarClient) => _pulsarClient = pulsarClient;

    [HttpPost("Hello")]
    public async Task<IActionResult> Hello([FromBody] string? name = "world")
    {
        var greeting = $"Hello, {name}!";
        var producer = await _pulsarClient.NewProducer(Schema.STRING())
            .Topic("persistent://public/default/hello")
            .CreateAsync();
        await producer.SendAsync(greeting);
        return Ok();
    }
}
This will send a greeting using the posted string to Pulsar. You will need to create the topic first in the StreamNative portal. You can verify that the message was received in the StreamNative portal by creating a subscription and peeking the message.
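If you’d rather verify from code, here’s a rough sketch of a consumer using the same Pulsar.Client package – the subscription name is arbitrary and this isn’t part of the sample application:

// Minimal sketch: subscribe to the topic and read one message back.
var consumer = await pulsarClient.NewConsumer(Schema.STRING())
    .Topic("persistent://public/default/hello")
    .SubscriptionName("hello-subscription")
    .SubscribeAsync();

var message = await consumer.ReceiveAsync();
Console.WriteLine(message.GetValue());           // "Hello, world!"
await consumer.AcknowledgeAsync(message.MessageId);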
Ok, this works. But why would I want to choose it over Kafka? I can see some glimmers of an answer, and I’ll be keeping an eye on things before making a choice. The killer feature for me, I think, would be the ability to use my own OAuth server for authentication. There’s some indication this is possible, though not yet with the StreamNative offering.
The community seems kind of small, and the StreamNative product rather minimal at present. On the other hand, if Pulsar offers more features (especially the custom OAuth server) it could be worth migrating new projects from Kafka. More to come…
I’ve spent a bit of time working with KeyCloak lately. It’s been some time since I looked in the open source world for an OIDC/OAuth2 solution, and when I found KeyCloak, I thought, “How did I miss this?”. I’ve been working with an ancient OIDC framework available for .NET, whose name escapes me right now. Later on, I came across IdentityServer4, since succeeded by Duende IdentityServer (formerly IdentityServer5), which is available under a commercial license.
But KeyCloak was developed quietly by Red Hat, and seems to have gained some traction. Indeed, it is a highly capable authentication server, supporting the OIDC protocol, SSO complete with user provisioning via SCIM, and a complete OAuth2 implementation for more advanced scenarios. For this article, I’ll discuss the more basic approach of RBAC using KeyCloak and the built-in authn/authz available in .NET 6.
Installation amounts to two steps: unzip/untar the binary package in a system directory somewhere, then create a service that launches the KeyCloak server from the distribution.
I’ll give instructions for Linux, but I imagine it should work equally well on any machine with a reasonably recent version of Java. I untarred under /opt, which creates a directory such as /opt/keycloak-20.0.1. I symlink this to /opt/keycloak.
We have to bootstrap the service before we can install it. We will need an initial username and password, and those will be set in the environment variables KEYCLOAK_ADMIN and KEYCLOAK_ADMIN_PASSWORD respectively. Before we start that, though, we need to install and configure Java and MySQL. I’ll leave this for the reader, as it’s usually just a simple matter of installing the openjdk-18-jdk and mysql-server packages.
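KeyCloak also needs its database and user to exist before it starts. A minimal sketch of the MySQL side, matching the example credentials used in the configuration below (use stronger ones in practice):

-- Create the KeyCloak database and user (example credentials; substitute your own)
CREATE DATABASE keycloak CHARACTER SET utf8mb4;
CREATE USER 'keycloak'@'localhost' IDENTIFIED BY 'keycloak';
GRANT ALL PRIVILEGES ON keycloak.* TO 'keycloak'@'localhost';
FLUSH PRIVILEGES;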
Next, we need to modify /opt/keycloak/conf/keycloak.conf as follows:
# postgres is supported too if you prefer
db=mysql
# Create this user
db-username=keycloak
db-password=keycloak
# The full database JDBC URL. If not provided, a default URL is set based on the selected database vendor.
db-url=jdbc:mysql://localhost:3306/keycloak
# HTTPS - requires root privileges or change of port
https-protocols=TLSv1.3,TLSv1.2
https-port=443
# The file path to a server certificate or certificate chain in PEM format.
https-certificate-file=${kc.home.dir}/conf/server.crt.pem
# The file path to a private key in PEM format.
https-certificate-key-file=${kc.home.dir}/conf/server.key.pem
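With the configuration in place, bootstrap the server once from a terminal, supplying the initial admin credentials in the environment variables mentioned above. A sketch, assuming the /opt/keycloak layout (the password and hostname are placeholders):

export KEYCLOAK_ADMIN=admin
export KEYCLOAK_ADMIN_PASSWORD='choose-a-strong-password'
# First run: dev mode listens on http://localhost:8080 and skips strict hostname checks.
/opt/keycloak/bin/kc.sh start-dev
# Production mode uses the HTTPS settings above, but wants an explicit hostname:
# /opt/keycloak/bin/kc.sh start --hostname=auth.example.com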
This will enable you to log in to the service at http://localhost:8080 with the username and password set in the environment. Your first order of business should be to create a new administrative user with a secure password, and then disable the built-in admin user.
You can now stop the running service (Ctrl-C in the terminal in which you ran the kc.sh command). It is time to replace it with a proper service file so that KeyCloak starts automatically on boot.
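Here is a minimal sketch of such a unit, assuming the /opt/keycloak symlink above and a dedicated keycloak user; the file name and paths are illustrative. Save it as /etc/systemd/system/keycloak.service, then run systemctl daemon-reload and systemctl enable --now keycloak.

[Unit]
Description=KeyCloak authentication server
After=network.target mysql.service

[Service]
Type=simple
User=keycloak
ExecStart=/opt/keycloak/bin/kc.sh start
Restart=on-failure

[Install]
WantedBy=multi-user.target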
The first thing we will need is a new realm to manage our application’s users. Create a realm by logging into the server (hopefully you took the time to configure HTTPS!). In the top left corner, there is a dropdown that lists all the current realms.
You can add a new realm from the button at the bottom. Give it a name and save it. This will bring you to the main configuration screen for your new realm.
There’s a lot here to configure, but don’t worry about most of it for now. A lot of the options are related to security policies and automatic enforcement of scopes and permissions using OAuth2 Resource Server flow. This is an advanced topic that this article will not cover.
For our purposes, we will configure just the name. We will use the Client settings to configure our RBAC. So, create a new Client by selecting Clients on the left navigation, and clicking Create. Fill in a name, leave the protocol on openid-connect. You don’t need to fill in the Root URL, but you can if you like.
Now you are at the main configuration screen for your new client.
We are only interested in roles. Go to the Roles tab and add any roles you might need (I used Administrator and User, making Administrator a composite role that contained User as well). You can then assign these roles to individual users in their details screen.
So adding users with roles is easy enough, but how do we inform our application of those roles? We need to put a claim in the access token that will declare our roles to the application. KeyCloak’s built-in mapper for User Client Role will put those roles in a JSON block within the token.
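A sketch of the shape, assuming the client is named bookstore and the Administrator/User roles created above:

"resource_access": {
  "bookstore": {
    "roles": [ "Administrator", "User" ]
  }
}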
Unfortunately, .NET 6 won’t interpret these roles out-of-the-box, so we need to give it a little help. Help comes in the form of a class extending AccountClaimsPrincipalFactory&lt;RemoteUserAccount&gt;. The base class provides a virtual method, CreateUserAsync(), that constructs the ClaimsIdentity given an access token (well, more specifically a token accessor – more on that below). The entire class looks like this:
using System.IdentityModel.Tokens.Jwt;
using System.Security.Claims;
using Microsoft.AspNetCore.Components.WebAssembly.Authentication;
using Microsoft.AspNetCore.Components.WebAssembly.Authentication.Internal;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class KeycloakClaimsPrincipalFactory : AccountClaimsPrincipalFactory<RemoteUserAccount>
{
    public KeycloakClaimsPrincipalFactory(IAccessTokenProviderAccessor accessor) : base(accessor)
    {
    }

    public override async ValueTask<ClaimsPrincipal> CreateUserAsync(RemoteUserAccount account, RemoteAuthenticationUserOptions options)
    {
        var user = await base.CreateUserAsync(account, options);
        if (user.Identity is ClaimsIdentity identity)
        {
            var tokenRequest = await TokenProvider.RequestAccessToken();
            if (tokenRequest.Status == AccessTokenResultStatus.Success)
            {
                if (tokenRequest.TryGetToken(out var token))
                {
                    // Parse the raw JWT so we can read the resource_access claim KeyCloak adds.
                    var handler = new JwtSecurityTokenHandler();
                    var parsedToken = handler.ReadJwtToken(token.Value);
                    var json = parsedToken.Claims.SingleOrDefault(c => c.Type == "resource_access");
                    if (json?.Value != null)
                    {
                        // "bookstore" is the KeyCloak client id; substitute your own.
                        var obj = JsonConvert.DeserializeObject<dynamic>(json.Value);
                        var roles = (JArray?) obj?["bookstore"]["roles"];
                        if (roles != null)
                            foreach (var role in roles)
                                identity.AddClaim(new Claim(ClaimTypes.Role, role.ToString()));
                    }
                }
            }
        }
        return user;
    }
}
Note that we use the TokenProvider provided by the base class. This is an IAccessTokenProvider, which will use the IdP token endpoint to fetch a fresh access token. This is important to note, because if we are not yet authenticated, we obviously cannot get an access token, hence the need to ensure that we are receiving a valid token response prior to proceeding.
The key line here is var roles = (JArray?) obj?["bookstore"]["roles"]. A JArray works very much like a JavaScript array and can dereference multiple levels of a hierarchy using array notation. Once we have the roles, we simply add a claim to the identity for each one, using the expected claim type, and return the updated principal.
Now that we have an access token with the proper claims, all that remains is to register the factory alongside the standard OIDC authentication services in the client.
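A minimal sketch of that registration in the client’s Program.cs – the "Oidc" section name is an assumption, and the factory type is the class above:

builder.Services.AddOidcAuthentication(options =>
    {
        // Authority, ClientId, etc. come from the Oidc section of appsettings.json (below).
        builder.Configuration.Bind("Oidc", options.ProviderOptions);
    })
    .AddAccountClaimsPrincipalFactory<KeycloakClaimsPrincipalFactory>();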
(Note – this is Blazor WASM; you will need an appropriate package to do the same thing in Blazor Server.) You will also need an appsettings.json in the client’s wwwroot that supplies the OIDC settings.
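A sketch of what that file might contain – the host, realm, and client id are placeholders for your own KeyCloak installation:

{
  "Oidc": {
    "Authority": "https://<your-keycloak-host>/realms/<your-realm>",
    "ClientId": "bookstore",
    "ResponseType": "code"
  }
}

With the role claims mapped, the built-in authorization in .NET 6 works as usual: [Authorize(Roles = "Administrator")] on controllers and <AuthorizeView Roles="Administrator"> in Razor components.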
I’ve spent a fair bit of time lately working on observability. To date I’ve been using Application Insights with reasonable success, though it takes some work to get traces to correlate properly and produce related spans. I started looking into OpenTelemetry and found that it is a suitable replacement for Application Insights, and seems a little easier to use. However, as it has only just reached version 1.0, it’s a little difficult to piece together the things that don’t appear in the demos.
Most notably, I’m using MassTransit/RabbitMQ for messaging, and the messaging example uses the RabbitMQ.Client libraries instead. I’ve also put all the pieces in one place with commentary to try and make things a bit more convenient. First, you define your ActivitySources in a central location so they can be shared between components:
using System.Diagnostics;

public static class Telemetry
{
    public static readonly ActivitySource Service1Source = new ActivitySource("Service1");
    // ... one ActivitySource per component, e.g. the ApiActivitySource used by the Web API below
}
Now you can create producer spans in the Web API when sending commands:
// Propagator is a static field on the controller, identical to the one in the consumer below:
// private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
protected async Task<IActionResult> HandleCommand<TCommand>(
    TCommand command,
    Action<TCommand>? commandModifier = null)
    where TCommand : class
{
    // T is the controller's aggregate type parameter; the span name is "<Aggregate>.<Command>".
    using var activity = Telemetry.ApiActivitySource
        .StartActivity($"{typeof(T).Name}.{typeof(TCommand).Name}", ActivityKind.Producer);
    try
    {
        commandModifier?.Invoke(command);
        await _bus.Request<TCommand, CommandResponse>(command, callback: c =>
        {
            // Prefer the span we just started; fall back to whatever is current.
            var contextToInject = default(ActivityContext);
            if (activity != null)
                contextToInject = activity.Context;
            else if (Activity.Current != null)
                contextToInject = Activity.Current.Context;
            // Write the trace context into the outgoing MassTransit/RabbitMQ headers.
            Propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), c, InjectContext);
        });
        activity?.SetStatus(ActivityStatusCode.Ok);
    }
    catch (Exception ex)
    {
        activity?.SetTag("exception", ex.StackTrace);
        activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
    }
    finally
    {
        activity?.Stop();
    }
    return Ok();
}
This is a method from a generic web API controller that handles commands; the aggregate type and the command type are concatenated to form the span name. The key part here is the Propagator.Inject() call, so let’s look at the method it uses to inject the ActivityContext into the RabbitMQ headers:
private void InjectContext<TCommand>(SendContext<TCommand> context, string key, string value) where TCommand : class
{
    context.Headers.Set(key, value);
}
It’s only a one-liner, and the Propagator does all the work for us by calling InjectContext with the key and value arguments it needs. The corresponding ExtractContext method on the consuming side looks like the following:
private IEnumerable<string> ExtractContext<TCommand>(ConsumeContext<TCommand> context, string key) where TCommand : class
{
    if (context.Headers.TryGetHeader(key, out var value))
    {
        return new[] { value?.ToString() ?? string.Empty };
    }
    return Enumerable.Empty<string>();
}
This method needs to return an IEnumerable<string> with all values for the given key. In our case, there will only ever be one value, but the signature is set for us by the Propagator. The service itself uses the Propagator as follows:
using System.Diagnostics;
using MassTransit;
using OpenTelemetry.Context.Propagation;

public class ConsumerBase<TAggregate, TCommand> : IConsumer<TCommand>
    where TAggregate : AggregateRoot<TAggregate>
    where TCommand : class
{
    private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
    protected readonly ApplicationService<TAggregate> Service;
    protected readonly ActivitySource ActivitySource;

    public ConsumerBase(ApplicationService<TAggregate> service, ActivitySource source)
    {
        Service = service;
        ActivitySource = source;
    }

    public virtual async Task Consume(ConsumeContext<TCommand> context)
    {
        // Rebuild the parent context from the incoming message headers.
        var parentContext = Propagator.Extract(default, context, ExtractContext);
        using var activity =
            ActivitySource.StartActivity($"{typeof(TAggregate).Name}.{typeof(TCommand).Name}", ActivityKind.Consumer, parentContext.ActivityContext);
        try
        {
            await Service.Handle(context.Message);
            await context.RespondAsync(new CommandResponse { Success = true });
            activity?.SetStatus(ActivityStatusCode.Ok);
        }
        catch (Exception ex)
        {
            await context.RespondAsync(new CommandResponse
                { Error = ex.Message, StackTrace = ex.StackTrace, Success = false });
            activity?.SetTag("exception", ex.StackTrace);
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
        }
    }

    // Same extractor as shown above; here it reuses the class's TCommand type parameter.
    private IEnumerable<string> ExtractContext(ConsumeContext<TCommand> context, string key)
    {
        if (context.Headers.TryGetHeader(key, out var value))
        {
            return new[] { value?.ToString() ?? string.Empty };
        }
        return Enumerable.Empty<string>();
    }
}
The complete consumer base class is shown above.
Finally, in each process that requires OpenTelemetry, the tracer must be initialized:
// "context" here is the HostBuilderContext, which provides access to configuration.
services.AddOpenTelemetryTracing(ot => ot
    .AddSource("MyNamespace.MyService")
    .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("MyNamespace.MyService", serviceVersion: "1.0.0"))
    .AddMassTransitInstrumentation()
    .AddGrpcClientInstrumentation()
    .AddOtlpExporter(otlp =>
    {
        otlp.Endpoint = new Uri(context.Configuration["Telemetry:OpenTelemetry"]);
        otlp.Protocol = OtlpExportProtocol.Grpc;
    }));
That should be it! OpenTelemetry is a very promising replacement for proprietary tracing protocols such as Application Insights and New Relic. Indeed, Application Insights now supports ingesting OpenTelemetry instead of its proprietary protocol. OpenTelemetry also allows a choice of UIs for examining the telemetry. I am currently using Jaeger (jaegertracing.io), but have also looked at SigNoz (signoz.io). Both are very capable UIs, and OpenTelemetry seems very flexible and easy to use (once you figure out how!).
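If you want to try Jaeger locally, a rough sketch of running the all-in-one image with its OTLP receiver enabled (recent Jaeger versions; 16686 is the UI, 4317 the OTLP/gRPC endpoint the exporter above points at):

docker run -d --name jaeger \
  -e COLLECTOR_OTLP_ENABLED=true \
  -p 16686:16686 \
  -p 4317:4317 \
  jaegertracing/all-in-one:latest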
I recently switched to Auth0 as an OAuth2 provider, and was a little surprised to find how little data is stored in the bearer token. I’d previously shoved pretty much everything in there: profile and roles as well. Some of you may be snickering, since you already know better: access tokens should be short. But we still need all of the user claims, and these are available from the Auth0 server at /userinfo. The access token also grants access to the /userinfo endpoint, which contains the profile and claims we are looking for. If you do the obvious thing and load the claims and profile on demand whenever you receive an access token, you’ll discover something else about Auth0 – they rate limit the userinfo endpoint.
I put the following singleton into all of my web APIs and microservices:
using System.Collections.Concurrent;
using System.Security.Claims;

public class ClaimsHolder
{
    // Per-user dictionary of claim type -> claim value.
    private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, object>> _claims = new();

    public void AddClaim(string userid, string name, object value)
    {
        _claims.AddOrUpdate(userid,
            k =>
            {
                var v = new ConcurrentDictionary<string, object>();
                v[name] = value;
                return v;
            },
            (k, v) =>
            {
                v[name] = value;
                return v;
            });
    }

    public IList<Claim> this[string userid]
    {
        get
        {
            return _claims.GetOrAdd(userid, _ => new ConcurrentDictionary<string, object>())
                .Select(c => new Claim(c.Key, c.Value.ToString() ?? throw new Exception("null claim??")))
                .ToList();
        }
        set
        {
            // Add or replace each supplied claim in the user's dictionary.
            var userClaims = _claims.GetOrAdd(userid, _ => new ConcurrentDictionary<string, object>());
            foreach (var claim in value)
                userClaims[claim.Type] = claim.Value;
        }
    }
}
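The holder is registered as a singleton so that the JWT bearer events below can reach it. A minimal sketch, assuming a minimal-hosting Program.cs (the variable names are illustrative):

var claimsHolder = new ClaimsHolder();
builder.Services.AddSingleton(claimsHolder);

The same claimsHolder instance is then captured by the OnTokenValidated handler below.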
A web API can use this claims holder as follows. First, add JWT bearer authentication to your Program.cs similar to the following:
using System.IdentityModel.Tokens.Jwt;
using System.Net.Http.Json;
using System.Security.Claims;
using Microsoft.AspNetCore.Authentication.JwtBearer;

builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
    .AddJwtBearer(JwtBearerDefaults.AuthenticationScheme, options =>
    {
        options.Authority = $"https://{auth0Domain}";
        options.TokenValidationParameters = new()
        {
            ValidAudience = auth0Audience,
            ValidIssuer = $"https://{auth0Domain}"
        };
        options.Events = new()
        {
            OnTokenValidated = async context =>
            {
                if (context.SecurityToken is not JwtSecurityToken accessToken) return;
                // "token" is a separate request-scoped holder for the raw access token (not shown here).
                token.Value = accessToken.RawData;
                if (context.Principal?.Identity is ClaimsIdentity identity)
                {
                    var userid = identity.Claims.Single(c => c.Type == ClaimTypes.NameIdentifier).Value;
                    // Check the cache first; only call /userinfo when we have nothing for this user.
                    var claims = claimsHolder[userid ?? throw new Exception("null user!")];
                    if (!claims.Any())
                    {
                        var httpClient = new HttpClient();
                        httpClient.DefaultRequestHeaders.Add("Authorization", $"Bearer {accessToken.RawData}");
                        claims = (await
                            httpClient.GetFromJsonAsync<Dictionary<string, object>>(
                                $"https://{auth0Domain}/userinfo")
                            )?.Select(x =>
                                new Claim(x.Key, x.Value?.ToString() ?? throw new Exception("null claim??"))).ToList();
                        if (claims != null)
                            foreach (var claim in claims)
                                claimsHolder.AddClaim(userid, claim.Type, claim.Value);
                    }
                    identity.AddClaims(claims ?? Enumerable.Empty<Claim>().ToList());
                    identity.AddClaim(new Claim("access_token", accessToken.RawData));
                }
            }
        };
    });
What we end up with, then, is a wrapper around a ConcurrentDictionary that holds a per-user ConcurrentDictionary containing all of the claims. This provides a convenient singleton that minimizes the number of times the user info must be retrieved from Auth0. There is a problem with this code, however, in that the claims are not refreshed when the token is refreshed. We should allow the access token to work until the end of its lifetime using the cached claims, but once the token is refreshed we should refresh the cached claims. I don’t currently know how to do this. For many low-security scenarios the code above works as-is; the claims just don’t change that often. Still, this is a problem that must be solved eventually.
Any system with clients in multiple time zones is problematic. With the rise of cloud services and containerized applications, it is frequently the case that your web or desktop clients do not use the same time zone as your server components (which more often than not run in UTC). Typically this is dealt with by using DateTimeOffset instead of DateTime. But I’ve noticed that it doesn’t quite work as expected in Blazor WASM. While the underlying DateTime value itself works as expected, the times are always displayed in GMT. Asking clients to work in GMT simply because the server or database does is not going to be acceptable.
The odd thing is that Blazor WASM seems to know the correct time zone offset:
<p>
Local Time Zone: @TimeZoneInfo.Local.DisplayName<br />
Local Time Offset: @TimeZoneInfo.Local.BaseUtcOffset<br />
Local Time: @DateTimeOffset.Now.ToString("R")
</p>
But the Local Time displayed here is in GMT! Why? I haven’t really been able to find an answer, but I have found plenty of similar complaints. The real oddity is as follows:
<p>
Local Time: @DateTimeOffset.Now.LocalDateTime.ToString("g")
</p>
I use the “g” format here because LocalDateTime is a DateTime property, without time zone information. This displays the correct local time! So why doesn’t DateTimeOffset.Now contain the correct offset information? This seems to remain a mystery.
Still, this is at least workable. The correct time is stored in the database and it is possible to display the local user’s time for DateTimeOffset values using the LocalDateTime property. A combination of this, plus the (correct, thankfully) data in System.TimeZoneInfo should provide all of the necessary components to display the time as desired.
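A small helper along those lines – a sketch only, relying on the LocalDateTime workaround above (the method name and default format are my own):

public static class LocalTimeDisplay
{
    // Render a server-supplied DateTimeOffset as the user's local wall-clock time.
    // LocalDateTime applies TimeZoneInfo.Local, which Blazor WASM reports correctly.
    public static string ToLocalDisplay(this DateTimeOffset value, string format = "g")
        => value.LocalDateTime.ToString(format);
}

In a component, @someTimestamp.ToLocalDisplay() then shows the browser’s local time rather than GMT.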