Apache Kafka C# .NET Producer and Consumer: Part II

In this article of the Kafka .NET Core tutorial series, we will learn how to write a Kafka producer in C# .NET, with examples. We will create a .NET client application that produces messages to and consumes messages from an Apache Kafka cluster.

What is Kafka

Kafka is a popular messaging system that facilitates the implementation of event-driven architectures.

Kafka, originally developed at LinkedIn and now maintained by the Apache Software Foundation, excels at handling high-throughput, real-time data streaming.

It acts as a distributed event streaming platform, where events are produced and consumed by various applications.

Kafka’s publish-subscribe model allows multiple consumers to subscribe to the same event stream, ensuring scalability and fault tolerance.

Its durability and fault-tolerant design make it suitable for scenarios demanding data integrity, such as log aggregation, real-time analytics, and monitoring.

In this article, we will cover the following aspects of the Kafka producer:

  • Getting started
  • Defining the producer configuration
  • Publishing messages to Kafka topics

The Kafka client we create can connect to a cluster set up using any of the approaches below:

  • Confluent Cloud cluster
  • Your localhost (if any)
  • Remote Kafka cluster (any)

The approach discussed below works with any of the Kafka clusters listed above.

We will connect to a Confluent cluster hosted in the cloud to demonstrate the feature.

Here we will configure our client with the required cluster credentials and create the producer client.

Consumer clients are covered in a separate article.

Getting started

Create a .NET Core application (.NET Core 3.1 or 6.0; net45, netstandard1.3, netstandard2.1 and above).

Install the NuGet package below using the NuGet Package Manager. This package is officially supported by Confluent.

Using the Package Manager Console,

PM>Install-Package Confluent.Kafka -Version 1.5.1

Or

Using the command prompt,

dotnet add package Confluent.Kafka --version 1.5.1

This NuGet package comes with all the basic classes and methods you need to define the configuration.

Use the ProducerConfig class to configure the producer.

ProducerConfig derives from Confluent.Kafka.ClientConfig, and one of its constructors initializes a new Confluent.Kafka.ProducerConfig instance wrapping an existing Confluent.Kafka.ClientConfig instance.
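As a minimal sketch of that wrapping constructor, the connection settings can be kept in one ClientConfig and handed to the producer configuration. The class name KafkaConfigs and its method names are hypothetical helpers used only for illustration, and all values are placeholders.

```csharp
using Confluent.Kafka;

// Hypothetical helper class, not part of Confluent.Kafka.
public static class KafkaConfigs
{
    // Connection settings that every client of the cluster needs.
    public static ClientConfig CreateShared(string bootstrapServers, string username, string password)
    {
        return new ClientConfig
        {
            BootstrapServers = bootstrapServers,
            SaslMechanism = SaslMechanism.Plain,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslUsername = username,
            SaslPassword = password
        };
    }

    // Wraps the shared settings in a producer-specific configuration.
    public static ProducerConfig CreateProducerConfig(ClientConfig shared)
    {
        return new ProducerConfig(shared);
    }
}
```

Because the constructor wraps rather than copies, changes made later through the ProducerConfig may also be visible on the shared ClientConfig, so treat the shared instance as owned by the producer once it has been wrapped.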

Define Kafka C# – Producer Configuration

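Define the producer configuration using the ProducerConfig class. Make sure to set connection details such as BootstrapServers, which holds the host:port address of the cluster.

Set properties like SaslMechanism and SecurityProtocol according to how your cluster is secured.

SaslUsername and SaslPassword are client-side settings, not values in the Kafka broker config files. For a Confluent Cloud cluster they are the API key and secret, which you can create from the Confluent CLI or the Cloud web interface.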

var pConfig = new ProducerConfig
{
    BootstrapServers = "xxxx",
    SaslMechanism = SaslMechanism.Plain,
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslUsername = "xxxxxxx",
    SaslPassword = "xxxx+"
};

The above configuration is hardcoded, but you can easily use ConfigurationBuilder to load these values from a configuration file.

Example

{
  "BootstrapServers": "*****",
  "SaslUsername": "****",
  "SaslPassword": "****"
}
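One way to read such a file with ConfigurationBuilder might look like the sketch below. It assumes the JSON above is saved as appsettings.json next to the executable and that the Microsoft.Extensions.Configuration.Json NuGet package is installed. The file name, the ProducerSettings class and its method names are assumptions made for illustration.

```csharp
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;

// Hypothetical helper class, not part of Confluent.Kafka.
public static class ProducerSettings
{
    // Maps the three keys from the JSON example onto a ProducerConfig.
    public static ProducerConfig FromConfiguration(IConfiguration configuration)
    {
        return new ProducerConfig
        {
            BootstrapServers = configuration["BootstrapServers"],
            SaslMechanism = SaslMechanism.Plain,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslUsername = configuration["SaslUsername"],
            SaslPassword = configuration["SaslPassword"]
        };
    }

    // Loads the JSON file (assumed name: appsettings.json) and builds the config.
    public static ProducerConfig FromJsonFile(string path = "appsettings.json")
    {
        IConfiguration configuration = new ConfigurationBuilder()
            .AddJsonFile(path, optional: false)
            .Build();
        return FromConfiguration(configuration);
    }
}
```

With this in place, `var pConfig = ProducerSettings.FromJsonFile();` replaces the hardcoded object initializer.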

It’s recommended to keep sensitive and confidential details such as credentials in secure storage, for example a vault.

Publish Message to Kafka Topics

Let’s create a Producer and Publish the message to Kafka Topics.

The producer created by the ProducerBuilder class exposes a method called Produce(), which asynchronously sends a single message to a Kafka topic.

Let’s use the config defined above and build a producer with ProducerBuilder,

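// Requires: using System; using System.Threading; using Confluent.Kafka;
// 'topic' is the target topic name; 'handler' is the delivery report callback.
using (var p = new ProducerBuilder<Null, string>(pConfig).Build())
{
    for (int i = 0; i < 100; ++i)
    {
        // Produce() only enqueues the message; the outcome is reported to 'handler'.
        p.Produce(topic, new Message<Null, string> { Value = "Test" + i }, handler);
        Thread.Sleep(3000); // pause 3 seconds between messages
    }

    // wait for up to 10 seconds for any inflight messages to be delivered.
    p.Flush(TimeSpan.FromSeconds(10));
}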

In the above code, ‘topic‘ is the name of the Kafka topic you want to produce messages to, so that consumers can consume them.

pConfig is the config object defined above. It holds the cluster connection details; the topic itself is passed to Produce().

The partition for the message sent is determined by the partitioner defined using the ‘partitioner’ configuration property.
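To illustrate how the partitioner comes into play, the sketch below sets the ‘partitioner’ property explicitly and builds messages that carry a key. The murmur2_random value is one of the partitioners documented by librdkafka. The class KeyedProducerExample and its method names are hypothetical, and string keys are an assumption (the example in this article uses Null keys).

```csharp
using Confluent.Kafka;

// Hypothetical helper class, not part of Confluent.Kafka.
public static class KeyedProducerExample
{
    // Sets the 'partitioner' configuration property by its librdkafka name.
    public static ProducerConfig UseMurmur2(ProducerConfig config)
    {
        config.Set("partitioner", "murmur2_random");
        return config;
    }

    // Builds a message whose key is used by the partitioner to pick the partition.
    public static Message<string, string> CreateMessage(string key, string value)
    {
        return new Message<string, string> { Key = key, Value = value };
    }
}
```

A producer built with `new ProducerBuilder<string, string>(KeyedProducerExample.UseMurmur2(pConfig)).Build()` hashes each key to choose the partition, so messages sharing a key land on the same partition. Messages without a key, as in this article, are spread across partitions at random.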

The ‘handler‘ delegate passed to Produce() is called with a delivery report for each produce request.

Below is the implementation of the handler,

Action<DeliveryReport<Null, string>> handler = r =>
    Console.WriteLine(!r.Error.IsError
        ? $"Delivered message to {r.TopicPartitionOffset}"
        : $"Delivery Error: {r.Error.Reason}");


TopicPartitionOffset identifies where the message was stored: the topic, the partition, and the offset within that partition.
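As an alternative to a delivery handler, the same information can be obtained by awaiting ProduceAsync(), which returns the delivery result or throws a ProduceException on failure. The sketch below is a minimal illustration; the class AwaitedProducer and the method TrySendAsync are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Hypothetical helper class, not part of Confluent.Kafka.
public static class AwaitedProducer
{
    // Sends one message and waits for its delivery result.
    // Returns true when the broker acknowledged the message, false otherwise.
    public static async Task<bool> TrySendAsync(IProducer<Null, string> producer, string topic, string value)
    {
        try
        {
            DeliveryResult<Null, string> result =
                await producer.ProduceAsync(topic, new Message<Null, string> { Value = value });
            Console.WriteLine($"Delivered message to {result.TopicPartitionOffset}");
            return true;
        }
        catch (ProduceException<Null, string> ex)
        {
            Console.WriteLine($"Delivery Error: {ex.Error.Reason}");
            return false;
        }
    }
}
```

Awaiting every message keeps the code simple but limits throughput, since each send waits for the broker. The callback-based Produce() used above is usually the better fit when sending many messages in a loop.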

In the above example, we are producing 100 messages with incremental values “Test0”, “Test1”, and so on up to “Test99”.

Once executed, the Kafka topic is populated with these messages.

That’s all! These are the basics of getting started with Apache Kafka in .NET.

Do you have any comments or ideas or any better suggestions to share?

Please sound off your comments below.

Happy Coding !!

Summary

It’s simple to use a .NET client application to produce messages to an Apache Kafka topic. Confluent.Kafka is a lightweight wrapper around librdkafka that gives the producer client an easy interface for producing messages asynchronously to a Kafka topic.


