Serializer Middleware
In this section, we will learn how to use the Serializer Middleware.
The Serializer Middleware is used to serialize and deserialize messages.
You can use one of the common serializer packages or build your own.
How to use it
In the configuration, add the AddSerializer/AddDeserializer extension method to your producer/consumer middlewares. The AddSerializer/AddDeserializer method takes two arguments:
- The first one must implement the ISerializer/IDeserializer interface.
- The second one is optional and must implement the IMessageTypeResolver interface. If it is not provided, the DefaultTypeResolver will be used.

Both arguments can also be provided through factory methods.
For topics that have just one message type, use the AddSingleTypeSerializer/AddSingleTypeDeserializer method.
The serializer middleware also handles producing tombstone records: the produced message is null whenever the message value is null, but not when that value is an empty byte array.
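For example, a tombstone can be produced by passing a null message value. A minimal sketch, assuming the ProductEventsProducer registered below, an injected IMessageProducer<ProductEventsProducer>, and a hypothetical "product-events" topic:

public class ProductService
{
    private readonly IMessageProducer<ProductEventsProducer> producer;

    public ProductService(IMessageProducer<ProductEventsProducer> producer)
    {
        this.producer = producer;
    }

    public Task DeleteProductAsync(Guid productId)
    {
        // A null message value is produced as a null payload, which
        // log compaction treats as a delete marker (tombstone) for the key.
        return this.producer.ProduceAsync("product-events", productId.ToString(), null);
    }
}

The producer configuration below shows the available AddSerializer overloads: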
services.AddKafka(kafka => kafka
.AddCluster(cluster => cluster
.WithBrokers(new[] { "localhost:9092" })
.AddProducer<ProductEventsProducer>(producer => producer
...
.AddMiddlewares(middlewares => middlewares
...
.AddSerializer<JsonMessageSerializer>() // Using the DefaultTypeResolver
// or
.AddSerializer<JsonMessageSerializer, YourTypeResolver>()
// or
.AddSerializer(
resolver => new JsonMessageSerializer(...),
resolver => new YourTypeResolver(...))
// or
.AddSingleTypeSerializer<YourMessageType, JsonMessageSerializer>()
// or
.AddSingleTypeSerializer<YourMessageType>(resolver => new JsonMessageSerializer(...))
...
)
)
)
);
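The consumer side mirrors this configuration with AddDeserializer. A minimal sketch, assuming a JsonMessageDeserializer counterpart to the serializer above and hypothetical topic and group names:

services.AddKafka(kafka => kafka
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .AddConsumer(consumer => consumer
            .Topic("product-events") // hypothetical topic name
            .WithGroupId("product-events-group") // hypothetical group id
            .WithBufferSize(100)
            .WithWorkersCount(10)
            .AddMiddlewares(middlewares => middlewares
                .AddDeserializer<JsonMessageDeserializer>() // assumed counterpart of JsonMessageSerializer
            )
        )
    )
);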
Adding Schema Registry support
Serializer middlewares can be used along with a Schema Registry, allowing schemas to evolve according to the configured compatibility setting. Install the KafkaFlow.SchemaRegistry package, configure the Schema Registry broker, and add one of the following packages to enable all the Schema Registry integration features:
- KafkaFlow.Serializer.SchemaRegistry.ConfluentJson
- KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro
- KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
        services.AddKafka(
            kafka => kafka
                .AddCluster(
                    cluster => cluster
                        .WithBrokers(new[] { "localhost:9092" })
                        .WithSchemaRegistry(config => config.Url = "localhost:8081")
                        .AddProducer(
                            ...
                            .AddMiddlewares(middlewares => middlewares
                                .AddSchemaRegistryAvroSerializer(
                                    new AvroSerializerConfig
                                    {
                                        SubjectNameStrategy = SubjectNameStrategy.TopicRecord
                                    }))
                        )
                        .AddConsumer(
                            ...
                            .AddMiddlewares(middlewares => middlewares
                                .AddSchemaRegistryAvroDeserializer())
                        )
                )
        );
}
}
The ConfluentAvro and ConfluentProtobuf type resolvers can support multiple message types per topic; however, due to the JSON serialization format used by confluent-kafka-dotnet, the ConfluentJson type resolver can only resolve a single message type per topic. To publish multiple message types to the same topic, SubjectNameStrategy.Record or SubjectNameStrategy.TopicRecord must be used.
Creating a Message Type Resolver
A type resolver is needed to instruct the middleware where to find the destination message type in the message metadata when consuming and where to store it when producing.
The framework provides the DefaultTypeResolver, which is used when the second type parameter of the AddSerializer/AddDeserializer method is omitted. You can create your own implementation of IMessageTypeResolver to enable communication with other frameworks.
public class SampleMessageTypeResolver : IMessageTypeResolver
{
private const string MessageType = "MessageType";
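 
    // Consuming: read the destination type name from the message headers and
    // resolve it to a Type (Type.GetType returns null if the type is not found).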
public Type OnConsume(IMessageContext context)
{
var typeName = context.Headers.GetString(MessageType);
return Type.GetType(typeName);
}
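 
    // Producing: write the message's type and assembly name to the headers
    // so the consuming side can resolve the same type.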
public void OnProduce(IMessageContext context)
{
context.Headers.SetString(
MessageType,
$"{context.Message.GetType().FullName}, {context.Message.GetType().Assembly.GetName().Name}");
}
}
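The resolver can then be passed as the second type parameter when registering the serializer, for example .AddSerializer<JsonMessageSerializer, SampleMessageTypeResolver>(), matching the overloads shown earlier; the consumer side accepts the same resolver through the corresponding AddDeserializer overload.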