From v2 to v3
KafkaFlow version 3 brings several significant changes and improvements. This guide will help you navigate through the migration process from version 2 to version 3.
Prerequisites
As .NET Core 3.1 has reached the end of its support lifecycle, we have updated the references to core packages (`Microsoft.*` and `System.*`) to version 6.
While KafkaFlow core and most of the extension packages still target `netstandard2.0`, which supports a range of runtimes, with this v3 update we recommend targeting at least .NET 6 in applications using KafkaFlow.
Update package references
To update to KafkaFlow v3, change the `Version` of the KafkaFlow packages to the latest v3 release in each project that references KafkaFlow packages.
```diff
<ItemGroup>
- <PackageReference Include="KafkaFlow" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.Abstractions" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.Admin" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.Admin.Dashboard" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.Admin.WebApi" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.BatchConsume" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.Compressor" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.Compressor.Gzip" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.Extensions.Hosting" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.LogHandler.Console" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.LogHandler.Microsoft" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.Microsoft.DependencyInjection" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.OpenTelemetry" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.SchemaRegistry" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.Serializer" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.Serializer.JsonCore" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.Serializer.NewtonsoftJson" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.Serializer.ProtobufNet" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.Serializer.SchemaRegistry.ConfluentJson" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.TypedHandler" Version="2.5.0" />
- <PackageReference Include="KafkaFlow.Unity" Version="2.5.0" />
+ <PackageReference Include="KafkaFlow" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.Abstractions" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.Admin" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.Admin.Dashboard" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.Admin.WebApi" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.Compressor.Gzip" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.Extensions.Hosting" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.LogHandler.Console" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.LogHandler.Microsoft" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.Microsoft.DependencyInjection" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.OpenTelemetry" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.SchemaRegistry" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.Serializer.JsonCore" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.Serializer.NewtonsoftJson" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.Serializer.ProtobufNet" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.Serializer.SchemaRegistry.ConfluentJson" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf" Version="3.0.0" />
+ <PackageReference Include="KafkaFlow.Unity" Version="3.0.0" />
</ItemGroup>
```
Breaking Changes
The update to v3 introduces some breaking changes. Please consider them when updating KafkaFlow.
1. Update to .NET 6 with Admin Packages
The target frameworks for the ASP.NET Admin projects were updated from `netcoreapp3.1` to `net6.0`.
If you are using the Admin packages (`KafkaFlow.Admin.Dashboard` and `KafkaFlow.Admin.WebApi`), you will need to target the .NET 6 runtime.
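For example, in a project file referencing these packages the change would look like this. The snippet assumes the target framework is set directly in the csproj; if your solution sets it centrally, e.g. in a `Directory.Build.props`, update it there instead:

```xml
<PropertyGroup>
  <!-- Was: <TargetFramework>netcoreapp3.1</TargetFramework> -->
  <TargetFramework>net6.0</TargetFramework>
</PropertyGroup>
```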
2. UI Dashboard URL Change
Related Issues: #303
To conform with KafkaFlow naming conventions, the UI Dashboard URL has changed from `/kafka-flow` to `/kafkaflow`. Update any bookmarks or references accordingly.
3. Removed Packages and Core Changes
3.1 Removed Packages
Related Issues: #301
Some KafkaFlow extension packages contained elements that belong to the core of KafkaFlow, so those packages were removed and their contents moved into the core library. Most of them contained only interfaces or core functionality, which now live in the core library, while the concrete implementations continue to exist as separate packages to preserve KafkaFlow's modular design.
Here is the list of KafkaFlow packages that no longer exist in v3. If you are referencing any of these packages, remove those references, since their functionality was moved into the core `KafkaFlow` package:
- `KafkaFlow.TypedHandler`
- `KafkaFlow.Compressor`
- `KafkaFlow.Serializer`
- `KafkaFlow.BatchConsume`
3.2 Serializer and Compressor Interface Segregation
Before KafkaFlow v3, the same methods were used on the consumer side, for example `.AddSerializer()` or `.AddCompressor()`. While this convention kept the same nomenclature as the producer counterpart for simplicity's sake, it can be error-prone, since on the consumer side the behavior of `.AddSerializer()` is to add a middleware that deserializes the message.
To fix this, KafkaFlow v3 introduces the `IDeserializer` and `IDecompressor` interfaces, segregated from the `ISerializer` and `ICompressor` interfaces.
In case you are configuring the consumer to deserialize and/or decompress messages similar to this configuration:

```csharp
.AddConsumer(
    consumerBuilder => consumerBuilder
        .Topic("test-topic")
        .WithGroupId("group1")
        .AddMiddlewares(
            middlewares => middlewares
                .AddCompressor<GzipMessageCompressor>()
                .AddSerializer<JsonCoreSerializer>()
        )
)
```
The following changes need to be made when updating to KafkaFlow v3:

```csharp
.AddConsumer(
    consumerBuilder => consumerBuilder
        .Topic("test-topic")
        .WithGroupId("group1")
        .AddMiddlewares(
            middlewares => middlewares
                .AddDecompressor<GzipMessageDecompressor>()
                .AddDeserializer<JsonCoreDeserializer>()
        )
)
```
Not only were `.AddSerializer()` and `.AddCompressor()` renamed to `.AddDeserializer()` and `.AddDecompressor()`, but the concrete implementations such as `JsonCoreSerializer` and `GzipMessageCompressor` were also renamed to `JsonCoreDeserializer` and `GzipMessageDecompressor`. Please be aware that this change applies only to the consumer side.
This segregation contributes to a naming scheme consistent with the middleware behavior. Further changes were made around the same topic; here is a complete list of changes related to the segregation of the `ISerializer` and `ICompressor` interfaces (consumer configuration only):
Related to Deserialization:
- `IDeserializer` interface created.
- `.AddSerializer()` renamed to `.AddDeserializer()`.
- Created `JsonCoreDeserializer`, `ProtobufNetDeserializer`, `ConfluentProtobufDeserializer`, `NewtonsoftJsonDeserializer`, `ConfluentJsonDeserializer` and `ConfluentAvroDeserializer`, which implement the `.DeserializeAsync()` method.
- `.AddSingleTypeSerializer()` renamed to `.AddSingleTypeDeserializer()`.
- `.AddSchemaRegistryAvroSerializer()` renamed to `.AddSchemaRegistryAvroDeserializer()`.
Related to Decompression:
- `IDecompressor` interface created.
- `.AddCompressor()` renamed to `.AddDecompressor()`.
- Created `GzipMessageDecompressor`, which implements the `.Decompress()` method.
3.3 Consumer Batching Configuration
The consumer batching configuration was renamed from `.BatchConsume()` to `.AddBatching()`. Update your configuration accordingly, as in the sketch below.
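A minimal before/after sketch, assuming the common two-argument form (batch size and batch timeout); keep whatever arguments you already pass:

```csharp
// v2
.AddMiddlewares(
    middlewares => middlewares
        .BatchConsume(100, TimeSpan.FromSeconds(10))
)

// v3
.AddMiddlewares(
    middlewares => middlewares
        .AddBatching(100, TimeSpan.FromSeconds(10))
)
```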
3.4 Async Support for Message Type Resolvers and Schema Registry Resolvers
In both `IMessageTypeResolver` and `ISchemaRegistryTypeNameResolver`, the interface methods only supported synchronous calls before KafkaFlow v3.
To accommodate use cases where calls to these methods can be parallelized, the methods defined in these interfaces were refactored to support the async pattern.
If your application has a concrete implementation of these interfaces, please make sure to update your code accordingly by using the async/await pattern. Here is the list of changes related to this subject:
- `Resolve()` renamed to `ResolveAsync()` in `ISchemaRegistryTypeNameResolver`; it now returns a `Task<string>`.
- `OnConsume()` renamed to `OnConsumeAsync()` in `IMessageTypeResolver`; it returns a `ValueTask<Type>`.
- `OnProduce()` renamed to `OnProduceAsync()` in `IMessageTypeResolver`; it returns a `ValueTask`.
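As an illustration, here is a minimal sketch of a custom type resolver updated to the v3 signatures. The `"message-type"` header name and the header-based lookup are illustrative assumptions, not part of KafkaFlow; only the method names and return types follow the list above:

```csharp
public class HeaderTypeResolver : IMessageTypeResolver
{
    private const string TypeHeader = "message-type"; // hypothetical header name

    public ValueTask<Type> OnConsumeAsync(IMessageContext context)
    {
        // A synchronous lookup wrapped in a ValueTask; an async call
        // (e.g. to a type registry) could be awaited here instead.
        var typeName = context.Headers.GetString(TypeHeader);
        return new ValueTask<Type>(Type.GetType(typeName));
    }

    public ValueTask OnProduceAsync(IMessageContext context)
    {
        context.Headers.SetString(TypeHeader, context.Message.Value.GetType().FullName);
        return default;
    }
}
```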
New Features
Additionally, with the update to v3, some quality-of-life features were added to KafkaFlow core, mainly related to how workers are configured.
1. Dynamic Workers Calculation
One important feature added in KafkaFlow v3 is the ability to dynamically calculate how many workers handle message consumption. Please check the feature documentation here.
In load scenarios where the consumer needs to ramp up message consumption to reduce consumer lag, increasing the number of workers can be a valid strategy.
Previously, the number of workers was determined statically by configuration at application startup. KafkaFlow v3 introduces a new method that recalculates the number of available workers periodically, allowing the application to adapt constantly and to scale the number of workers during its lifetime.
To accommodate this requirement, multiple changes were made to the core of KafkaFlow; next we will go through some of these changes.
1.1 Dynamic Worker Pool Scaling Configuration
Related Issues: #429
Two new overloads were added to the consumer configuration; the previous method of statically setting the worker count is still available.
The new overloads are:
- `WithWorkersCount(Func<WorkersCountContext, IDependencyResolver, Task<int>> calculator)`
- `WithWorkersCount(Func<WorkersCountContext, IDependencyResolver, Task<int>> calculator, TimeSpan evaluationInterval)`

The difference between the two is that the first calls the worker-count delegate at a predefined frequency of 5 minutes, while the second lets you define that frequency yourself.
Additionally, the delegate receives a `WorkersCountContext` that carries consumer-related context to the callback, such as the consumer name, the consumer group key and the consumer partition assignment. The `IDependencyResolver` is also made available in case dependencies need to be resolved to determine the worker count.
This means that a consumer worker count can be configured like this:
```csharp
.AddConsumer(
    consumerBuilder => consumerBuilder
        .Topic("test-topic")
        .WithGroupId("group1")
        .WithWorkersCount(
            (context, resolver) =>
            {
                // Use whatever logic to determine the worker count
                return GetWorkerCount(context, resolver);
            },
            TimeSpan.FromMinutes(15))
)
```
1.2 Improved Mechanism for Signalling Message Processing Completion
Related Issues: #426
For the KafkaFlow engine to adapt to a dynamically changing number of workers, the way messages are signaled as completed also needed to be updated, because the workers can only be scaled after all the messages in the workers' buffers have been signaled as completed.
To achieve this, a new method, `.Complete()`, was added to the message context to signal message completion. Additionally, a new property called `AutoMessageCompletion` was added to indicate whether KafkaFlow should signal the message as completed automatically, which is the default behavior. This property is useful for scenarios where we don't want to complete the message right away, for example in batch consume scenarios.
- Use `WithManualMessageCompletion()` instead of `WithManualStoreOffsets()`.
- Use `context.ConsumerContext.Complete()` instead of `context.ConsumerContext.StoreOffset()`.
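Putting both together, a minimal sketch of manual completion after these renames; only the two renamed members come from the list above, the rest of the configuration is illustrative:

```csharp
// Consumer configuration: opt out of automatic completion.
.AddConsumer(
    consumerBuilder => consumerBuilder
        .Topic("test-topic")
        .WithGroupId("group1")
        .WithManualMessageCompletion() // v2: WithManualStoreOffsets()
        .AddMiddlewares(/* ... */)
)

// Later, inside a handler or middleware, once the message is fully processed:
// context.ConsumerContext.Complete(); // v2: context.ConsumerContext.StoreOffset()
```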
1.3 Expose Worker Events to Client Applications
Related Issues: #427
KafkaFlow now exposes some events related to its internal functionality. The following events are published:
- `MessageProduceStarted`
- `MessageProduceCompleted`
- `MessageProduceError`
- `MessageConsumeStarted`
- `MessageConsumeCompleted`
- `MessageConsumeError`
- `WorkerStopping`
- `WorkerStopped`
- `WorkerProcessingEnded`
While the worker-related events are available through the `IWorker` reference, the message-related events can be subscribed to using the `.SubscribeGlobalEvents()` configuration method.
```csharp
services.AddKafka(
    kafka => kafka
        .UseConsoleLog()
        .SubscribeGlobalEvents(hub =>
        {
            hub.MessageConsumeStarted.Subscribe(eventContext => Console.WriteLine("Message Consume Started"));
            hub.MessageConsumeError.Subscribe(eventContext => Console.WriteLine("Message Consume Error"));
            hub.MessageConsumeCompleted.Subscribe(eventContext => Console.WriteLine("Message Consume Completed"));
            hub.MessageProduceStarted.Subscribe(eventContext => Console.WriteLine("Message Produce Started"));
            hub.MessageProduceError.Subscribe(eventContext => Console.WriteLine("Message Produce Error"));
            hub.MessageProduceCompleted.Subscribe(eventContext => Console.WriteLine("Message Produce Completed"));
        })
);
```
1.4 Improve Dependency Injection Scope Management
Related Issues: #428
Some improvements were made to the way the dependency scopes are managed to provide more fine-grained control over dependency lifecycles. Here are some changes that were made in this context:
- `IMessageContext` now exposes a `DependencyResolver` property, which is intrinsically tied to the scope of a single processed message (see the sketch after this list).
- `IConsumerContext` introduces two properties: `ConsumerDependencyResolver` and `WorkerDependencyResolver`. The former resolves dependencies tied to the lifecycle of a single consumer, whereas the latter caters to a single worker's lifecycle.
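As an illustration, a minimal sketch of using the message-scoped resolver from inside a middleware; `IMyService` is a hypothetical application service, and the generic `Resolve<T>()` call assumes KafkaFlow's resolver extension methods are in scope:

```csharp
public class ScopedResolutionMiddleware : IMessageMiddleware
{
    public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
    {
        // Resolved from the message-scoped resolver: this instance lives
        // only as long as the message currently being processed.
        var service = context.DependencyResolver.Resolve<IMyService>();

        // service.Process(context.Message.Value); // hypothetical call

        await next(context);
    }
}
```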
2. Improved Worker Distribution Strategy
Related Issues: #440
KafkaFlow v2 already provided a way to choose the worker that handles a consumed message, via the `IDistributionStrategy` interface. However, the only context provided to that method was the partition key of the message, which can be insufficient to determine how to distribute messages across workers in more complex scenarios.
With KafkaFlow v3 this interface was renamed to `IWorkerDistributionStrategy`, and the `.GetWorkerAsync()` method now receives a `WorkerDistributionContext` structure. Properties like the message topic and partition are now part of the context, which provides more flexibility when choosing the worker to handle a message.
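For example, a minimal sketch of a custom strategy under the v3 interface. It assumes the strategy is initialized with the available workers (as v2's `IDistributionStrategy` was), that `GetWorkerAsync` returns a `ValueTask<IWorker>`, and that `WorkerDistributionContext` exposes a `Partition` property; the partition-based routing itself is purely illustrative:

```csharp
public class PartitionWorkerDistributionStrategy : IWorkerDistributionStrategy
{
    private IReadOnlyList<IWorker> workers;

    public void Initialize(IReadOnlyList<IWorker> workers) => this.workers = workers;

    // Route every message from the same partition to the same worker,
    // regardless of the message key.
    public ValueTask<IWorker> GetWorkerAsync(WorkerDistributionContext context) =>
        new(this.workers[context.Partition % this.workers.Count]);
}
```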
Conclusion
Please ensure you review and adapt your codebase according to these changes. If you encounter any issues or need assistance, feel free to reach out to the KafkaFlow community. Thank you for using KafkaFlow!