Kafka Producer - C# Sync vs Async

Kafka Producer - C# Sync vs Async

To Sync or not to Sync in dotnet

Rules of Thumb

We're taught early and often that any I/O Bound operations should be done Asynchronously. A general rule of thumb is that anything that takes more than 50ms should be done async. While this is quite true in most cases, sometimes, just seeing an option to run a method async, we default to that implementation without much forethought.

Confluent Kafka Producer

Confluent offers a very nice Apache Kafka integration NuGet package Confluent.Kafka. Within this package you'll find nearly all the bits and pieces you'll need to connect with, consume and produce via Kafka. In particular, we'll look at the behavior of the default implementation of IProducer<TKey, TValue>

To first set the stage we'll pull in some Protobuf tooling via Google.Protobuf and Grpc.Tools NuGet packages. With those, we'll create a message as an instance of Message<TKey, TValue> with the following schema.

syntax = "proto3";

package ConcurrentFlows.KafkaProducer;

message WidgetEvent {
    string Greeting = 1;
}

Benchmark - Environment

We'll need a local Kafka cluster to run our benchmarks against and we can docker-compose.yml one up relatively easily via docker-compose up -d.

In addition to our cluster, we'll initialize two topics via topic-init.sh. One for a Sync Producer and one for an Async Producer.

Within our cluster, we've also stood up a Schema Registry and we'll use the associated Confluent NuGet package Confluent.SchemaRegistry.Serdes.Protobuf This package brings in the necessary bits and pieces to talk to the Schema Registry and serialize/deserialize, (Ser/Des), our Protobuf messages.

Finally, to easily create messages we'll leverage the Bogus NuGet package and create an instance of its Faker; Faker<WidgetEvent> = new();.

Benchmark - Setup

The complete benchmark setup can be found within ProduceBenchmarks.cs

To benchmark both Async and Sync Produce... methods we'll use BenchmarkDotNet and define a IProducer<TKey, TValue> for both Sync & Async. Each Producer will share the same config for both itself and its Registry.

Note: The default Acks setting is Acks = ALL. This means our Producers will wait for acknowledgment from all three replicas

First, the config

var producerConfig = new ProducerConfig()
{
    BootstrapServers = "localhost:9092,localhost:9093,localhost:9094",
    QueueBufferingMaxMessages = 500_000
};

var registryConfig = new SchemaRegistryConfig()
{
    Url = "localhost:8081",
};

Next, the Schema Registry

var registryClient = new CachedSchemaRegistryClient(registryConfig);

Then, our Sync Producer

syncProducer = new ProducerBuilder<string, WidgetEvent>(producerConfig)
    .SetValueSerializer(
        new ProtobufSerializer<WidgetEvent>(registryClient)
        .AsSyncOverAsync())
    .SetErrorHandler((p, e) 
        => Console.WriteLine(
            errorMessage, e.Code, e.Reason))
    .Build();

Finally, our Async Producer

asyncProducer = new ProducerBuilder<string, WidgetEvent>(producerConfig)
    .SetValueSerializer(
        new ProtobufSerializer<WidgetEvent>(registryClient))
    .SetErrorHandler((p, e) 
        => Console.WriteLine(
            errorMessage, e.Code, e.Reason))
    .Build();

Benchmarks - Methods

We'll measure the performance of two methods of interest

  1. Produce("topic", TMessage, deliveryHandler)

  2. ProduceAsync("topic", TMessage)

[Benchmark]
public void KafkaProducerSync()
{
    var msg = new Message<string, WidgetEvent>()
    {
        Key = $"{Guid.NewGuid()}",
        Value = faker.Generate()
    };
    syncProducer.Produce(sync_topic, msg,
        d =>
        {
            if (d.Error.IsError)
                throw new InvalidOperationException(
                    $"{d.Error.Code}:{d.Error.Reason}");
        });
}

[Benchmark]
public async Task KafkaProducerAsync()
{
    var msg = new Message<string, WidgetEvent>()
    {
        Key = $"{Guid.NewGuid()}",
        Value = faker.Generate()
    };
    await asyncProducer.ProduceAsync(async_topic, msg);
}

Benchmarks - Results

Running the benchmarks is as simple as kicking off our console project in Release

using BenchmarkDotNet.Running;
using ConcurrentFlows.KafkaProducer1;

Console.WriteLine("Starting Producer Benchmarks");
BenchmarkRunner.Run<ProducerBenchmarks>();

On the same platform, with the same previously stood-up cluster

Cluster -

  • Network concurrentflows - Created

  • Container zookeeper - Started

  • Container broker3 - Started

  • Container broker1 - Started

  • Container broker2 - Started

  • Container schema-registry - Started

  • Container rest-proxy - Started

  • Container topic-init - Started

Platform -

BenchmarkDotNet=v0.13.4, OS=Windows 11 (10.0.22621.1105)
12th Gen Intel Core i9-12900HK, 1 CPU, 20 logical and 14 physical cores
.NET SDK=7.0.100
[Host] : .NET 7.0.0 (7.0.22.51805), X64 RyuJIT AVX2
DefaultJob : .NET 7.0.0 (7.0.22.51805), X64 RyuJIT AVX2

We can see a significant difference between Produce() and ProduceAsync()

MethodMeanErrorStdDevGen0Gen1Allocated
KafkaProducerSync2.381 µs0.0468 µs0.0415 µs0.51120.00766.24 KB
KafkaProducerAsync15,291.646 µs87.8239 µs68.5671 µs--6.74 KB

Key Takeaway -

Produce() is ~6,000 times faster than ProduceAsync() !!!

Why such a difference???

To begin with, note we had set QueueBufferingMaxMessages to 500,000. We did this because messages will stack up in the underlying Producer buffer as they work their way out to all replicas. Essentially, Produce() is "Producing" faster than we're "Delivering".

The big highlight is that -

Both Produce() and ProduceAsync() are Asynchronous 🤓

  • ProduceAsync wraps the entire operation of "Producing" a message into a dotnet friendly Task<DeliveryResult<TKey, TValue>>

  • Produce leverages a callback style future defined by the third optional parameter deliveryHandler

This async style difference lets us decouple "Producing" from "Delivering" by deferring the handling of our DeliveryReport<TKey, TValue> . This is as opposed to waiting for a DeliveryResult<TKey, TValue>> to be fully formed.

Wrap Up

Now don't go change all your code to Produce() , it's still an unnecessary blocking call, albeit with minimal impact, even when your cluster is not localhost . What's important is to understand how these methods leverage asynchronicity in different ways and the way this impacts: Acknowledgement, Error Management, etc.

Ideally, I'd propose an entire separation of concerns of hot path message production and Producing/Delivering the message to the wire. That may be the subject of a future post. 🙃

Finally, find all relevant code here
ConcurrentFlows.HashNode/ConcurrentFlows.KafkaProducer1