Notes on setting up AWS Glue with MSK on Micronaut - krickert/search-api GitHub Wiki

There are two ways to configure this: a `.properties` file or YAML.

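Properties files keep camelCase keys exactly as written, whereas YAML configuration can mangle them. The AWS Glue library needs its camelCase keys (such as `dataFormat` and `schemaName`) verbatim. A properties file therefore lets you express the full configuration.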

# Enable Kafka
kafka.enabled=true

# Kafka bootstrap servers and MSK IAM authentication.
# The aws-msk-iam-auth library needs BOTH the JAAS login module and the client
# callback handler; without the handler the SASL handshake cannot complete.
kafka.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS}
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=AWS_MSK_IAM
kafka.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
kafka.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

# Schema Registry credentials
# NOTE(review): no credential properties are set here on purpose. A Glue serializer/deserializer
# created by Kafka from its class name presumably resolves credentials through the AWS SDK
# default provider chain, which already reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from
# the environment. Confirm this for your library version rather than copying secrets into config.

# Producer Configuration (Micronaut applies kafka.producers.default.* to every producer).
# The Glue keys below are the literal values of AWSSchemaRegistryConstants
# (endpoint, region, registry.name, schemaName, dataFormat, schemaAutoRegistrationEnabled).
# That is why their camelCase spelling must be preserved.
kafka.producers.default.key.serializer=org.apache.kafka.common.serialization.UUIDSerializer
kafka.producers.default.value.serializer=com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer
kafka.producers.default.endpoint=${SCHEMA_REGISTRY_ENDPOINT}
kafka.producers.default.region=${AWS_REGION}
kafka.producers.default.registry.name=${REGISTRY_NAME:default-registry}
kafka.producers.default.schemaName=${SCHEMA_NAME}
kafka.producers.default.dataFormat=PROTOBUF
kafka.producers.default.schemaAutoRegistrationEnabled=true

# Consumer Configuration (Micronaut applies kafka.consumers.default.* to every consumer).
kafka.consumers.default.key.deserializer=org.apache.kafka.common.serialization.UUIDDeserializer
kafka.consumers.default.value.deserializer=com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer
kafka.consumers.default.endpoint=${SCHEMA_REGISTRY_ENDPOINT}
kafka.consumers.default.region=${AWS_REGION}
kafka.consumers.default.dataFormat=PROTOBUF
# Deserialize into the generated protobuf classes (e.g. IndexDocument) rather than DynamicMessage.
kafka.consumers.default.protobufMessageType=POJO

This should be everything the properties-based approach needs.

Here's how to integrate this with Micronaut's configuration system while maintaining testability:

1. Create Configuration Classes

/**
 * AWS Glue Schema Registry settings bound from the {@code aws.glue} configuration prefix.
 *
 * <p>Micronaut binds each property through the matching setter; kebab-case keys such as
 * {@code registry-name} map to {@code setRegistryName}. Field initialisers are the defaults
 * used when a property is not configured.
 */
@ConfigurationProperties("aws.glue")
public class GlueConfiguration {
    /** AWS region hosting the registry, e.g. {@code us-west-2}. */
    private String region;
    /** Registry endpoint override, e.g. a local moto server during tests. */
    private String endpoint;
    /** Name of the Glue registry that holds the schema. */
    private String registryName;
    /** Name of the schema inside the registry. */
    private String schemaName;
    /** Data format of the schema; defaults to {@code PROTOBUF}. */
    private String dataFormat = "PROTOBUF";
    /** Whether schema auto-registration is enabled; defaults to {@code true}. */
    private boolean schemaAutoRegistration = true;
    /** Protobuf message type handed to the Glue library; defaults to {@code POJO}. */
    private String protobufMessageType = "POJO";

    public String getRegion() {
        return region;
    }

    public void setRegion(String region) {
        this.region = region;
    }

    public String getEndpoint() {
        return endpoint;
    }

    public void setEndpoint(String endpoint) {
        this.endpoint = endpoint;
    }

    public String getRegistryName() {
        return registryName;
    }

    public void setRegistryName(String registryName) {
        this.registryName = registryName;
    }

    public String getSchemaName() {
        return schemaName;
    }

    public void setSchemaName(String schemaName) {
        this.schemaName = schemaName;
    }

    public String getDataFormat() {
        return dataFormat;
    }

    public void setDataFormat(String dataFormat) {
        this.dataFormat = dataFormat;
    }

    public boolean isSchemaAutoRegistration() {
        return schemaAutoRegistration;
    }

    public void setSchemaAutoRegistration(boolean schemaAutoRegistration) {
        this.schemaAutoRegistration = schemaAutoRegistration;
    }

    public String getProtobufMessageType() {
        return protobufMessageType;
    }

    public void setProtobufMessageType(String protobufMessageType) {
        this.protobufMessageType = protobufMessageType;
    }
}

/**
 * Static AWS credentials bound from the {@code aws.credentials} configuration prefix.
 *
 * <p>NOTE(review): these are plain configuration strings. For real deployments, prefer the AWS
 * default credentials chain over committing secrets to configuration files.
 */
@ConfigurationProperties("aws.credentials")
public class AwsCredentialsConfiguration {
    /** AWS access key id. */
    private String accessKeyId;
    /** AWS secret access key. */
    private String secretKey;
    /** Session token for temporary (STS) credentials. */
    private String sessionToken;

    public String getAccessKeyId() {
        return accessKeyId;
    }

    public void setAccessKeyId(String accessKeyId) {
        this.accessKeyId = accessKeyId;
    }

    public String getSecretKey() {
        return secretKey;
    }

    public void setSecretKey(String secretKey) {
        this.secretKey = secretKey;
    }

    public String getSessionToken() {
        return sessionToken;
    }

    public void setSessionToken(String sessionToken) {
        this.sessionToken = sessionToken;
    }
}

2. Create Serializer/Deserializer Factory

@Factory
public class GlueSerDeFactory {

    @Singleton
    public GlueSchemaRegistryKafkaSerializer glueSerializer(
            GlueConfiguration glueConfig,
            AwsCredentialsConfiguration awsCreds) {
        
        return new GlueSchemaRegistryKafkaSerializer(
            () -> AwsSessionCredentials.create(
                awsCreds.getAccessKeyId(),
                awsCreds.getSecretKey(),
                awsCreds.getSessionToken()
            ),
            null,
            createConfigMap(glueConfig)
        );
    }

    @Singleton
    public GlueSchemaRegistryKafkaDeserializer glueDeserializer(
            GlueConfiguration glueConfig,
            AwsCredentialsConfiguration awsCreds) {
            
        return new GlueSchemaRegistryKafkaDeserializer(
            () -> AwsSessionCredentials.create(
                awsCreds.getAccessKeyId(),
                awsCreds.getSecretKey(),
                awsCreds.getSessionToken()
            ),
            createConfigMap(glueConfig)
        );
    }

    private Map<String, Object> createConfigMap(GlueConfiguration config) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AWSSchemaRegistryConstants.AWS_REGION, config.getRegion());
        configs.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, config.getEndpoint());
        configs.put(AWSSchemaRegistryConstants.REGISTRY_NAME, config.getRegistryName());
        configs.put(AWSSchemaRegistryConstants.SCHEMA_NAME, config.getSchemaName());
        configs.put(AWSSchemaRegistryConstants.DATA_FORMAT, config.getDataFormat());
        configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, 
                   String.valueOf(config.isSchemaAutoRegistration()));
        configs.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, 
                   config.getProtobufMessageType());
        return configs;
    }
}

3. Configure application.yml

# Bound to GlueConfiguration (@ConfigurationProperties("aws.glue")) and
# AwsCredentialsConfiguration (@ConfigurationProperties("aws.credentials")).
aws:
  glue:
    region: us-west-2
    # Local registry endpoint; port 5000 matches the moto container exposed in the test.
    endpoint: http://localhost:5000
    registry-name: dev-registry
    schema-name: indexer-document-unprocessed
  credentials:
    # NOTE(review): placeholder values for local/moto use only; consider moving these to
    # application-test.yml so they never reach a real environment.
    access-key-id: test
    secret-key: test
    session-token: test-session

4. Create Kafka Producer/Consumer

/**
 * Micronaut Kafka client that publishes {@code IndexDocument} values to {@code test-topic}.
 *
 * <p>The value serializer is configured here by fully-qualified class name.
 * NOTE(review): configured this way, Kafka presumably instantiates the serializer itself.
 * If so, the {@code GlueSchemaRegistryKafkaSerializer} bean from {@code GlueSerDeFactory} is not
 * used, and the Glue settings (region, registry, schema name, data format) would also have to be
 * supplied as client properties. Confirm.
 */
@KafkaClient(
    id = "glue-protobuf-producer",
    properties = {
        @Property(name = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                 value = "com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer")
    }
)
public interface GlueProtobufProducer {
    /**
     * Sends {@code value} to {@code test-topic} using {@code key} as the record key.
     *
     * @param key   record key
     * @param value document to serialize through the Glue Schema Registry serializer
     */
    @Topic("test-topic")
    void send(@KafkaKey String key, IndexDocument value);
}

/**
 * Micronaut Kafka listener (group {@code test-group}) that receives {@code IndexDocument}
 * records from {@code test-topic}.
 *
 * <p>The value deserializer is configured here by fully-qualified class name.
 * NOTE(review): as with the producer, the Glue settings (region, endpoint, and
 * {@code protobufMessageType=POJO} so records arrive as {@code IndexDocument} rather than a
 * generic message) presumably also need to be supplied as consumer properties. Confirm.
 */
@KafkaListener(
    groupId = "test-group",
    properties = {
        @Property(name = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 value = "com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer")
    }
)
public class GlueProtobufConsumer {
    
    /**
     * Handles one deserialized document from {@code test-topic}.
     *
     * @param document the record value
     */
    @Topic("test-topic")
    public void receive(IndexDocument document) {
        // Handle document
    }
}

5. Test Configuration

@MicronautTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class GlueSchemaRegistryTest {

    @Inject
    GlueProtobufProducer producer;

    @Inject
    GlueClient glueClient;

    static GenericContainer<?> moto = new GenericContainer<>("motoserver/moto:latest")
        .withExposedPorts(5000)
        .withEnv("MOTO_SERVICE", "glue");

    @BeforeAll
    static void setup(ApplicationContext context) {
        moto.start();
        
        TestPropertyProvider propertyProvider = TestPropertyProvider.of(
            "aws.glue.endpoint" -> "http://localhost:" + moto.getMappedPort(5000),
            "aws.credentials.access-key-id" -> "test",
            "aws.credentials.secret-key" -> "test",
            "aws.credentials.session-token" -> "test-session"
        );
        
        context.registerSingleton(TestPropertyProvider.class, propertyProvider);
        
        // Create registry
        GlueClient testClient = GlueClient.builder()
            .endpointOverride(URI.create("http://localhost:" + moto.getMappedPort(5000)))
            .credentialsProvider(() -> AwsSessionCredentials.create("test", "test", "test-session"))
            .region(Region.US_WEST_2)
            .build();
            
        testClient.createRegistry(CreateRegistryRequest.builder()
            .registryName("dev-registry")
            .build());
    }

    @Test
    void testRoundTrip() {
        IndexDocument doc = buildTestDocument();
        producer.send("key", doc);
        
        // Verify consumption
        // (You'd need test consumer logic here)
    }
}

Key Advantages:

  1. Proper Configuration - All settings managed via Micronaut config
  2. Testable - Easy to override endpoints/credentials for testing
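  3. DI Integration - Serializer/deserializer instances are built by a Micronaut factory. Note that, as written, step 4 configures the Kafka clients by serializer class name, so those clients do not use the factory beans unless you wire them in explicitly.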
  4. No Hacks - Uses official Micronaut and AWS SDK patterns

This approach maintains the clarity of your standalone test while integrating properly with Micronaut's configuration system. The factories ensure proper dependency injection, and the test configuration allows clean container management.