Durable Retries
In this section, we will learn how to use Durable Retries.
Durable Retries are useful when beyond a certain amount of retries and waiting, you want to keep processing next-in-line messages but you can't lose the current offset message. As persistence databases, MongoDb or SqlServer is available. And you can manage in-retry messages through HTTP API.
The configuration can be done during KafkaFlow Configuration process by registering a Middleware.
How to use it
Install the KafkaFlow.Retry package.
dotnet add package KafkaFlow.Retry
Install the package for the desired storage:
- SqlServer: KafkaFlow.Retry.SqlServer
- Postgres: KafkaFlow.Retry.Postgres
- MongoDb: KafkaFlow.Retry.MongoDb
On the configuration, add the RetryDurable
middleware extension method to your consumer middlewares to use it.
The RetryDurable
receives an Action as an argument to configure the Retry policy.
.AddMiddlewares(
middlewares => middlewares // KafkaFlow middlewares
.RetryDurable(
(config) => config
.Handle<NonBlockingException>()
.WithMessageType(typeof(OrderMessage)) // Message type to be consumed
.WithEmbeddedRetryCluster( // Retry consumer config
cluster,
config => config
.WithRetryTopicName("order-topic-retry")
.WithRetryConsumerBufferSize(4)
.WithRetryConsumerWorkersCount(2)
.WithRetryConsumerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption)
.WithRetryTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Transient)
.AddHandler<Handler>()
).Enabled(true)
)
.WithQueuePollingJobConfiguration( // Polling configuration
config => config
.WithId("custom_search_key")
.WithCronExpression("0 0/1 * 1/1 * ? *")
.WithExpirationIntervalFactor(1)
.WithFetchSize(10)
.Enabled(true)
)
.WithMongoDbDataProvider(...)
// OR
.WithPostgresDataProvider(...)
// OR
.WithSqlServerDataProvider(...)
.WithRetryPlanBeforeRetryDurable( // Chained simple retry before triggering durable
config => config
.TryTimes(3)
.WithTimeBetweenTriesPlan(
TimeSpan.FromMilliseconds(250),
TimeSpan.FromMilliseconds(500),
TimeSpan.FromMilliseconds(1000))
.ShouldPauseConsumer(false)
)
)
...
)
As you can see above, there's a retry plan configured (WithRetryPlanBeforeRetryDurable
) to execute before the Durable Plan.
It's a Simple Retry that can perform a given number of attempts before delegating to the Durable policy.
You can find other samples here.
How to configure Message Type and Serialization
Durable Retries require the definition of a Message Type. It relies on KafkaFlow Serializer Middleware to perform the serialization.
You can find here an example using Avro.
.AddMiddlewares(
middlewares => middlewares
.AddSchemaRegistryAvroSerializer()
.RetryDurable(
(config) => config
.Handle<RetryDurableTestException>()
.WithMessageType(typeof(AvroLogMessage))
.WithMessageSerializeSettings(new JsonSerializerSettings
{
ContractResolver = new WritablePropertiesOnlyResolver()
})
...
)
...
)
How to use MongoDb as a Provider
Install the KafkaFlow.Retry.MongoDb package.
dotnet add package KafkaFlow.Retry.MongoDb
On the configuration, define the access configuration to the MongoDb instance.
.AddMiddlewares(
middlewares => middlewares
.RetryDurable(
(config) => config
...
.WithMongoDbDataProvider(
connectionString,
database,
retryQueueCollectionName,
retryQueueItemCollectionName)
...
)
...
)
How to use SQL Server as a Provider
Install the KafkaFlow.Retry.SqlServer package.
dotnet add package KafkaFlow.Retry.SqlServer
On the configuration, define the access configuration to the SqlServer instance.
.AddMiddlewares(
middlewares => middlewares
.RetryDurable(
(config) => config
...
.WithSqlServerDataProvider(
connectionString,
databaseName)
...
)
...
)
How to use Postgres as a Provider
Install the KafkaFlow.Retry.Postgres package.
dotnet add package KafkaFlow.Retry.Postgres
On the configuration, define the access configuration to the Postgres instance.
.AddMiddlewares(
middlewares => middlewares
.RetryDurable(
(config) => config
...
.WithPostgresDataProvider(
connectionString,
databaseName)
...
)
...
)
How to configure an HTTP API to manage the Data Provider
Install the KafkaFlow.Retry.API package.
dotnet add package KafkaFlow.Retry.API
Once you install the Package, install also the package for the desired storage:
- SqlServer: KafkaFlow.Retry.SqlServer
- Postgres: KafkaFlow.Retry.Postgres
- MongoDb: KafkaFlow.Retry.MongoDb
You can find a sample here.