I’ve spent a fair bit of time lately working on observability. To date, I’ve been using Application Insights with a fair bit of success, though it takes some work to make the traces correlate properly and give you related spans correctly. I started looking into OpenTelemetry, and found that it is a suitable replacement for Application Insights, and seems to be a little easier to use. However, as it is currently only just releasing version 1.0, it’s a little difficult to piece together things that don’t appear in the demo.
Most notably, I’m using MassTransit/RabbitMQ for messaging, and the messaging example uses the RabbitMQ.Client libraries instead. I’ve also put all the pieces in one place with commentary to try and make things a bit more convenient. First, you define your ActivitySources in a central location so they can be shared between components:
public static class Telemetry
{
public static readonly ActivitySource Service1Source = new ActivitySource("Service1");
// ...
}
Now, you can configure OpenTelemetry in the Web API producer:
protected async Task<IActionResult> HandleCommand<TCommand>(
TCommand command,
Action<TCommand>? commandModifier = null)
where TCommand : class
{
using var activity = Telemetry.ApiActivitySource
.StartActivity($"{typeof(T).Name}.{typeof(TCommand).Name}", ActivityKind.Producer);
try
{
commandModifier?.Invoke(command);
await _bus.Request<TCommand, CommandResponse>(command, callback: c =>
{
var contextToInject = default(ActivityContext);
if (activity != null)
contextToInject = activity.Context;
else if (Activity.Current != null)
contextToInject = Activity.Current.Context;
Propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), c, InjectContext);
});
activity?.SetStatus(ActivityStatusCode.Ok);
}
catch (Exception ex)
{
activity?.SetTag("exception", ex.StackTrace);
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
}
finally
{
activity?.Stop();
}
return Ok();
}
Here is a generic web API controller that handles commands where the aggregate type and command type are concatenated to form the command name. The key part here is the Propagator.Inject() call, so let’s look at the method it uses to inject the ActivityContext into the RabbitMQ headers:
private void InjectContext<TCommand>(SendContext<TCommand> context, string key, string value) where TCommand : class
{
context.Headers.Set(key, value);
}
It’s only a one-liner, and the Propagator does all the work for us by calling InjectContext with the key and value arguments it needs. The corresponding ExtractContext consumer looks like the following:
private IEnumerable<string> ExtractContext<TCommand>(ConsumeContext<TCommand> context, string key) where TCommand : class
{
if (context.Headers.TryGetHeader(key, out var value))
{
return new[] { value?.ToString() ?? string.Empty };
}
return Enumerable.Empty<string>();
}
This method needs to return an IEnumerable<string> with all values for the given key. In our case, there will only ever be one value, but the signature is set for us by the Propagator. The service itself uses the Propagator as follows:
public class ConsumerBase<TAggregate, TCommand> : IConsumer<TCommand>
where TAggregate : AggregateRoot<TAggregate>
where TCommand : class
{
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
protected readonly ApplicationService<TAggregate> Service;
protected readonly ActivitySource ActivitySource;
public ConsumerBase(ApplicationService<TAggregate> service, ActivitySource source)
{
Service = service;
ActivitySource = source;
}
public virtual async Task Consume(ConsumeContext<TCommand> context)
{
var parentContext = Propagator.Extract(default, context, ExtractContext);
using var activity =
ActivitySource.StartActivity($"{typeof(TAggregate).Name}.{typeof(TCommand).Name}", ActivityKind.Consumer, parentContext.ActivityContext);
try
{
await Service.Handle(context.Message);
await context.RespondAsync(new CommandResponse { Success = true });
activity?.SetStatus(Status.Ok);
}
catch (Exception ex)
{
await context.RespondAsync(new CommandResponse
{ Error = ex.Message, StackTrace = ex.StackTrace, Success = false });
activity?.SetTag("exception", ex.StackTrace);
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
}
}
private IEnumerable<string> ExtractContext<TCommand>(ConsumeContext<TCommand> context, string key) where TCommand : class
{
if (context.Headers.TryGetHeader(key, out var value))
{
return new[] { value?.ToString() ?? string.Empty };
}
return Enumerable.Empty<string>();
}
}
The complete consumer base class is shown above.
Finally, in each process that requires OpenTelemetry, the tracer must be initialized:
services.AddOpenTelemetryTracing(ot => ot
.AddSource("MyNamespace.MyService")
.SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("MyNamespace.MyService", serviceVersion: "1.0.0"))
.AddMassTransitInstrumentation()
.AddGrpcClientInstrumentation()
.AddOtlpExporter(otlp =>
{
otlp.Endpoint = new Uri(context.Configuration["Telemetry:OpenTelemetry"]);
otlp.Protocol = OtlpExportProtocol.Grpc;
}));
That should be it! OpenTelemetry is a very promising replacement for proprietary tracing protocols such as Application Insights and New Relic. Indeed, Application Insights now supports the use of OpenTelemetry instead of its proprietary protocol. OpenTelemetry allows for a choice of UIs for examining the telemetry. I am currently using Jaeger (jaegertracing.io), but have also looked at SigNoz (signoz.io). Either of these are very capable UIs, and OpenTelemetry seems very flexible and easy to use (once you figure out how!)