Skip to main content

Durable Retries

In this section, we will learn how to use Durable Retries.

Durable Retries are useful when beyond a certain amount of retries and waiting, you want to keep processing next-in-line messages but you can't lose the current offset message. As persistence databases, MongoDb or SqlServer is available. And you can manage in-retry messages through HTTP API.

The configuration can be done during KafkaFlow Configuration process by registering a Middleware.

How to use it

Install the KafkaFlow.Retry package.

dotnet add package KafkaFlow.Retry

Install the package for the desired storage:

On the configuration, add the RetryDurable middleware extension method to your consumer middlewares to use it.

The RetryDurable receives an Action as an argument to configure the Retry policy.

.AddMiddlewares(
middlewares => middlewares // KafkaFlow middlewares
.RetryDurable(
(config) => config
.Handle<NonBlockingException>()
.WithMessageType(typeof(OrderMessage)) // Message type to be consumed
.WithEmbeddedRetryCluster( // Retry consumer config
cluster,
config => config
.WithRetryTopicName("order-topic-retry")
.WithRetryConsumerBufferSize(4)
.WithRetryConsumerWorkersCount(2)
.WithRetryConsumerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption)
.WithRetryTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Transient)
.AddHandler<Handler>()
).Enabled(true)
)
.WithQueuePollingJobConfiguration( // Polling configuration
config => config
.WithId("custom_search_key")
.WithCronExpression("0 0/1 * 1/1 * ? *")
.WithExpirationIntervalFactor(1)
.WithFetchSize(10)
.Enabled(true)
)

.WithMongoDbDataProvider(...)
// OR
.WithPostgresDataProvider(...)
// OR
.WithSqlServerDataProvider(...)

.WithRetryPlanBeforeRetryDurable( // Chained simple retry before triggering durable
config => config
.TryTimes(3)
.WithTimeBetweenTriesPlan(
TimeSpan.FromMilliseconds(250),
TimeSpan.FromMilliseconds(500),
TimeSpan.FromMilliseconds(1000))
.ShouldPauseConsumer(false)
)
)
...
)
info

As you can see above, there's a retry plan configured (WithRetryPlanBeforeRetryDurable) to execute before the Durable Plan. It's a Simple Retry that can perform a given number of attempts before delegating to the Durable policy.

note

You can find other samples here.

How to configure Message Type and Serialization

Durable Retries require the definition of a Message Type. It relies on KafkaFlow Serializer Middleware to perform the serialization.

You can find here an example using Avro.

.AddMiddlewares(
middlewares => middlewares
.AddSchemaRegistryAvroSerializer()
.RetryDurable(
(config) => config
.Handle<RetryDurableTestException>()
.WithMessageType(typeof(AvroLogMessage))
.WithMessageSerializeSettings(new JsonSerializerSettings
{
ContractResolver = new WritablePropertiesOnlyResolver()
})
...
)
...
)

How to use MongoDb as a Provider

Install the KafkaFlow.Retry.MongoDb package.

dotnet add package KafkaFlow.Retry.MongoDb

On the configuration, define the access configuration to the MongoDb instance.

.AddMiddlewares(
middlewares => middlewares
.RetryDurable(
(config) => config
...
.WithMongoDbDataProvider(
connectionString,
database,
retryQueueCollectionName,
retryQueueItemCollectionName)
...
)
...
)

How to use SQL Server as a Provider

Install the KafkaFlow.Retry.SqlServer package.

dotnet add package KafkaFlow.Retry.SqlServer

On the configuration, define the access configuration to the SqlServer instance.

.AddMiddlewares(
middlewares => middlewares
.RetryDurable(
(config) => config
...
.WithSqlServerDataProvider(
connectionString,
databaseName)
...
)
...
)

How to use Postgres as a Provider

Install the KafkaFlow.Retry.Postgres package.

dotnet add package KafkaFlow.Retry.Postgres

On the configuration, define the access configuration to the Postgres instance.

.AddMiddlewares(
middlewares => middlewares
.RetryDurable(
(config) => config
...
.WithPostgresDataProvider(
connectionString,
databaseName)
...
)
...
)

How to configure an HTTP API to manage the Data Provider

Install the KafkaFlow.Retry.API package.

dotnet add package KafkaFlow.Retry.API

Once you install the Package, install also the package for the desired storage:

note

You can find a sample here.