Serializer Middleware - Farfetch/kafkaflow-retry-extensions GitHub Wiki
Kafka.Flow Commonly Used
Avro Sample
/// <summary>
/// Sample extension methods that wire a producer and a consumer, both using the
/// schema-registry Avro serializer middleware, into a KafkaFlow cluster configuration.
/// </summary>
internal static class KafkaClusterConfigurationBuilder
{
/// <summary>
/// Registers the sample Avro producer and the consumer whose middleware pipeline
/// includes the durable retry middleware.
/// </summary>
/// <param name="cluster">Cluster configuration builder being extended.</param>
/// <param name="mongoDbConnectionString">Not referenced in the visible part of the snippet; presumably consumed by the elided retry configuration (TODO confirm).</param>
/// <param name="mongoDbDatabaseName">Not referenced in the visible part of the snippet; presumably consumed by the elided retry configuration (TODO confirm).</param>
/// <param name="mongoDbRetryQueueCollectionName">Not referenced in the visible part of the snippet; presumably consumed by the elided retry configuration (TODO confirm).</param>
/// <param name="mongoDbRetryQueueItemCollectionName">Not referenced in the visible part of the snippet; presumably consumed by the elided retry configuration (TODO confirm).</param>
internal static IClusterConfigurationBuilder SetupRetryDurableMongoAvroDb(
this IClusterConfigurationBuilder cluster,
string mongoDbConnectionString,
string mongoDbDatabaseName,
string mongoDbRetryQueueCollectionName,
string mongoDbRetryQueueItemCollectionName)
{
// Producer: publishes to the sample topic with gzip compression and Acks.All.
cluster.AddProducer(
"kafka-flow-retry-durable-mongodb-avro-producer",
producer => producer
.DefaultTopic("sample-kafka-flow-retry-durable-mongodb-avro-topic")
.WithCompression(Confluent.Kafka.CompressionType.Gzip)
.AddMiddlewares(
middlewares => middlewares
// Avro serializer configured with the TopicRecord subject name strategy.
.AddSchemaRegistryAvroSerializer(
new AvroSerializerConfig
{
SubjectNameStrategy = SubjectNameStrategy.TopicRecord
})
)
.WithAcks(Acks.All)
)
// Consumer: reads the same topic the producer above writes to.
.AddConsumer(
consumer => consumer
.Topic("sample-kafka-flow-retry-durable-mongodb-avro-topic")
.WithGroupId("sample-consumer-kafka-flow-retry-durable-mongodb-avro")
.WithName("kafka-flow-retry-durable-mongodb-avro-consumer")
.WithBufferSize(10)
.WithWorkersCount(20)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
.AddMiddlewares(
middlewares => middlewares
// The Avro serializer middleware is registered before the durable retry middleware.
.AddSchemaRegistryAvroSerializer()
.RetryDurable(
configure => configure
.Handle<RetryDurableTestException>()
.WithMessageType(typeof(AvroLogMessage))
// JSON settings for the retry middleware use a resolver that keeps only writable properties.
.WithMessageSerializeSettings(new JsonSerializerSettings
{
ContractResolver = new WritablePropertiesOnlyResolver()
})
// The remainder of the configuration is elided in this sample.
...
);
Contract Resolver
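The DefaultContractResolver is the default resolver used by the Json.NET serializer. It provides many avenues of extensibility in the form of virtual methods that can be overridden. The sample resolver below overrides CreateProperties so that only writable properties are kept; it is the resolver passed to WithMessageSerializeSettings in the Avro sample above.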
/// <summary>
/// Contract resolver that exposes only the properties flagged as writable,
/// dropping every other property produced by the base resolver.
/// </summary>
internal class WritablePropertiesOnlyResolver : DefaultContractResolver
{
    /// <summary>
    /// Builds the property list for <paramref name="type"/> via the base resolver
    /// and returns the subset whose <c>Writable</c> flag is set.
    /// </summary>
    /// <param name="type">Type whose JSON properties are being resolved.</param>
    /// <param name="memberSerialization">Member serialization mode forwarded to the base resolver.</param>
    /// <returns>A new list holding only the writable properties, in their original order.</returns>
    protected override IList<JsonProperty> CreateProperties(Type type, MemberSerialization memberSerialization)
    {
        var writableOnly = new List<JsonProperty>();

        foreach (JsonProperty candidate in base.CreateProperties(type, memberSerialization))
        {
            if (candidate.Writable)
            {
                writableOnly.Add(candidate);
            }
        }

        return writableOnly;
    }
}