Skip to main content
Version: 3.x

Batch Consume Middleware

In this section, we will learn how to use the Batch Consume Middleware.

The Batch Consume Middleware is used to accumulate a number of messages or wait some time to build a collection of messages and deliver them to the next middleware to be processed, as it was just one message.

How to use it

On the configuration, use the AddBatching extension method to add the middleware to your consumer middlewares.

The AddBatching method has two arguments:

  • The first one must define the maximum batch size.
  • The second one defines the TimeSpan that the Middleware waits for new messages to be part of the batch.
services.AddKafka(kafka => kafka
.AddCluster(cluster => cluster
.WithBrokers(new[] { "localhost:9092" })
.AddConsumer(
consumerBuilder => consumerBuilder
...
.AddMiddlewares(
middlewares => middlewares
...
.AddBatching(100, TimeSpan.FromSeconds(10)) // Configuration of the BatchConsumeMiddleware
.Add<HandlingMiddleware>() // Middleware to process the batch
)
)
)
);

To access the batch from the next middleware, use the GetMessagesBatch method accessible through the context argument.

warning

When using the Batch Consume middleware, the IServiceScopeFactory should be used to create scopes instead of the IServiceProvider, as the latter may dispose the scope.

using KafkaFlow;

internal class HandlingMiddleware : IMessageMiddleware
{
public Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
var batch = context.GetMessagesBatch();

(...)

return Task.CompletedTask;
}
}
tip

You can find a sample on batch processing here.