Serializer Middleware
In this section, we will learn how to use the Serializer Middleware.
The Serializer Middleware is used to serialize and deserialize messages.
You can use one of the following common serializers or build your own:
How to use it
Install the KafkaFlow.Serializer package.
dotnet add package KafkaFlow.Serializer
In the configuration, call the AddSerializer extension method on your producer or consumer middlewares.
The AddSerializer method has two arguments:
- The first one must implement the IMessageSerializer interface.
- The second one is optional and must implement the IMessageTypeResolver interface. If the parameter is not provided, then the DefaultTypeResolver will be used.
Both classes can also be provided as arguments through factory methods.
For topics that have just one message type, use the AddSingleTypeSerializer method.
services.AddKafka(kafka => kafka
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddProducer<ProductEventsProducer>(producer => producer
            ...
            .AddMiddlewares(middlewares => middlewares
                ...
                .AddSerializer<JsonMessageSerializer>() // Using the DefaultTypeResolver
                // or
                .AddSerializer<JsonMessageSerializer, YourTypeResolver>()
                // or
                .AddSerializer(
                    resolver => new JsonMessageSerializer(...),
                    resolver => new YourTypeResolver(...))
                // or
                .AddSingleTypeSerializer<JsonMessageSerializer, YourMessageType>()
                // or
                .AddSingleTypeSerializer<YourMessageType>(resolver => new JsonMessageSerializer(...))
                ...
            )
        )
    )
);
Adding Schema Registry support
Serializer middlewares can be used together with a schema registry, which allows schemas to evolve according to the configured compatibility setting.
Install the KafkaFlow.SchemaRegistry package, configure the schema registry broker, and install one of the following packages to get the full schema registry integration.
- KafkaFlow.Serializer.SchemaRegistry.ConfluentJson
- KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro
- KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf
public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddKafka(
            kafka => kafka
                .AddCluster(
                    cluster => cluster
                        .WithBrokers(new[] { "localhost:9092" })
                        .WithSchemaRegistry(config => config.Url = "localhost:8081")
                        .AddProducer(
                            ...
                            .AddMiddlewares(middlewares =>
                                middlewares.AddSchemaRegistryAvroSerializer(new AvroSerializerConfig { SubjectNameStrategy = SubjectNameStrategy.TopicRecord })
                            )
                        )
                        .AddConsumer(
                            ...
                            .AddMiddlewares(middlewares => middlewares.AddSchemaRegistryAvroSerializer())
                        )
                )
        );
    }
}
The ConfluentAvro and ConfluentProtobuf type resolvers support multiple message types per topic. However, due to the JSON serialization format used by confluent-kafka-dotnet, the ConfluentJson type resolver can only resolve a single message type per topic.
To publish multiple message types to the same topic, SubjectNameStrategy.Record or SubjectNameStrategy.TopicRecord must be used.
You can see a detailed explanation here.
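To make the effect of the subject name strategy concrete, the sketch below mirrors the naming convention that Confluent Schema Registry documents for message values: the topic strategy uses the topic name plus "-value", the record strategy uses the fully qualified record name, and the topic-record strategy combines both. The SubjectStrategy enum, the SubjectNames helper, the "orders" topic, and the record names are hypothetical and exist only for illustration; this is not the KafkaFlow or confluent-kafka-dotnet implementation.

```csharp
using System;

// Hypothetical stand-in for the three subject name strategies.
public enum SubjectStrategy { Topic, Record, TopicRecord }

public static class SubjectNames
{
    // Returns the schema registry subject used for a message value.
    public static string ForValue(SubjectStrategy strategy, string topic, string recordFullName) =>
        strategy switch
        {
            // One subject per topic: every message type shares it.
            SubjectStrategy.Topic => $"{topic}-value",
            // One subject per record type, regardless of topic.
            SubjectStrategy.Record => recordFullName,
            // One subject per (topic, record type) pair.
            SubjectStrategy.TopicRecord => $"{topic}-{recordFullName}",
            _ => throw new ArgumentOutOfRangeException(nameof(strategy)),
        };
}
```

With the topic strategy, Shop.OrderCreated and Shop.OrderCancelled published to "orders" would both map to the subject "orders-value", so their schemas would have to be compatible with each other. With the record or topic-record strategy each type gets its own subject, which is why those strategies are required for multiple message types per topic.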
Creating a Message Type Resolver
A type resolver is needed to instruct the middleware where to find the destination message type in the message metadata when consuming and where to store it when producing.
The framework provides the DefaultTypeResolver, which is used when the second type parameter of the AddSerializer method is omitted. You can create your own implementation of IMessageTypeResolver to allow communication with other frameworks.
public class SampleMessageTypeResolver : IMessageTypeResolver
{
    // Name of the header that carries the message type.
    private const string MessageType = "MessageType";

    // Reads the type name from the header and resolves it to a Type.
    public Type OnConsume(IMessageContext context)
    {
        var typeName = context.Headers.GetString(MessageType);
        return Type.GetType(typeName);
    }

    // Stores the message type as "FullName, AssemblyName" in the header.
    public void OnProduce(IMessageContext context)
    {
        var messageType = context.Message.GetType();
        context.Headers.SetString(
            MessageType,
            $"{messageType.FullName}, {messageType.Assembly.GetName().Name}");
    }
}
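The resolver above depends on the "FullName, AssemblyName" string being resolvable by Type.GetType on the consuming side. The following sketch isolates that round trip without any KafkaFlow dependency, which may help when testing a custom header format. The TypeNameHeader class and its method names are hypothetical. The null check for a missing header is an assumption about how you might want to handle messages produced without the header, not something the sample resolver does.

```csharp
using System;

public static class TypeNameHeader
{
    // Builds the "FullName, AssemblyName" value that the sample resolver writes.
    public static string Format(Type type) =>
        $"{type.FullName}, {type.Assembly.GetName().Name}";

    // Resolves a header value back to a Type.
    // Returns null when the header is missing or empty; Type.GetType also
    // returns null when the named type cannot be found.
    public static Type Resolve(string headerValue) =>
        string.IsNullOrEmpty(headerValue) ? null : Type.GetType(headerValue);
}
```

For example, TypeNameHeader.Resolve(TypeNameHeader.Format(typeof(string))) yields typeof(string). A consumer can only resolve the type if an assembly with the same name and type is available to it, which is worth keeping in mind when producer and consumer are built from different projects.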