Another note on pipelines - krickert/search-api GitHub Wiki
The goal is a comprehensive Micronaut-based data pipeline that integrates Kafka, gRPC, and Google Protocol Buffers while remaining flexible for future extensions. The architecture follows Micronaut's default conventions for load balancing and service discovery and supports hot-reloading of configuration.
The pipeline:
- Uses Kafka and gRPC for step-to-step communication, ensuring modularity and pluggability.
- Maintains two Kafka topics per step (main and dead-letter queue) with support for reprocessing failed messages.
- Implements a generic forwarder to handle message routing between pipeline steps.
- Supports recursion by allowing entire pipelines to be treated as pipeline steps.
- Provides a stream-based request/response logging mechanism for visibility without disrupting the processing flow.
The sections below cover:
- Protobuf schema definitions
- Micronaut service implementations for gRPC and Kafka
- Forwarding logic
- Dead-letter queue handling
- Pipeline configuration structure
This design outlines a modular, extensible data pipeline where each processing step is a Micronaut-based microservice communicating via Kafka or gRPC. It uses Google Protocol Buffers for all data serialization to ensure a compact, strongly-typed contract between services. All pipeline stages implement a common gRPC service interface, enabling uniform invocation and even recursive pipeline-in-pipeline calls. The system is configuration-driven (YAML/properties), supports hot-reloading of configs, and includes robust failure handling with Kafka dead-letter topics for reprocessing. Logging and monitoring are built in via stream-based request/response logging and Micronaut’s tracing support. Below, we detail each aspect of the design:
All services share a common Protobuf schema that defines the data objects and the gRPC service. This ensures platform-neutral, backward-compatible message formats and a single interface contract for all steps ([gRPC Microservices: Building Scalable Microservices | Insight Hub Blog](https://www.bugsnag.com/blog/grpc-and-microservices-architecture/#:~:text=Protocol%20Buffers%2C%20or%20protobufs%2C%20are,efficient%20binary%20format%2C%20also%20developed)) ([gRPC Microservices: Building Scalable Microservices | Insight Hub Blog](https://www.bugsnag.com/blog/grpc-and-microservices-architecture/#:~:text=backwards%20compatible%2C%20and%20protobufs%20are,to%20send%20over%20the%20wire)). The core messages capture the document content, pipeline context, request, and response, along with routing metadata:
```protobuf
syntax = "proto3";

package pipeline;

// Document or data unit flowing through the pipeline
message PipeDoc {
  string id = 1;               // Unique document ID or key
  bytes content = 2;           // Raw content (e.g., text or binary) for processing
  repeated PipeDoc chunks = 3; // (Optional) Sub-documents if this doc is chunked
  // ... Additional fields (metadata, embeddings, etc.) can be added
}

// Context for a pipeline execution (one "run" of the pipeline)
message SemanticRun {
  string run_id = 1;               // Correlation ID for tracking the pipeline run
  string pipeline_name = 2;        // Name/ID of the pipeline configuration used
  uint32 step_index = 3;           // Index or sequence of current step in the pipeline
  uint32 recursion_depth = 4;      // Counter for recursion (if pipeline calls itself)
  repeated ErrorDetail errors = 5; // Accumulated errors (if any) during the run
}

// Detailed error info for troubleshooting
message ErrorDetail {
  string step_name = 1;     // The pipeline step where the error occurred
  string error_message = 2; // Human-readable error description
  int32 error_code = 3;     // Application-specific error code (optional)
}

// The request message that each pipeline step receives
message PipeRequest {
  PipeDoc doc = 1;     // The document or data payload to process
  SemanticRun run = 2; // Pipeline context (tracking IDs, errors, etc.)
}

// The response message each step returns/forwards
message PipeResponse {
  PipeDoc doc = 1;       // The (possibly enriched/modified) document after processing
  SemanticRun run = 2;   // Updated pipeline context (e.g., step_index incremented)
  bool success = 3;      // Whether this step succeeded
  ErrorDetail error = 4; // Error info if `success` is false (optional)
}

// gRPC service definition – same for all pipeline services
service PipelineService {
  rpc Process(PipeRequest) returns (PipeResponse);
}
```
Key points of the data model:
- `PipeDoc` holds the content and any sub-units. For example, a “chunking” service can split a document into chunks by populating the `chunks` field with smaller `PipeDoc` entries. Downstream steps can decide to process the main content or the chunks array.
- `SemanticRun` carries context through the pipeline. It includes a `run_id` to correlate logs/traces across services, the pipeline name (for pipelines-as-a-service scenarios), and a `step_index` that increments at each step. A `recursion_depth` is used to handle recursive pipeline calls (discussed later). It also contains an `errors` list: rather than failing silently, each service can append an `ErrorDetail` describing any issue while still passing the message along. This provides a built-in audit trail for debugging.
- `ErrorDetail` captures the step name and error message (e.g., exception message or validation failure). These can travel with the payload. Notably, by including errors in the message (or in headers), we preserve the original data while adding failure context ([Error Handling via Dead Letter Queue in Apache Kafka - Kai Waehner](https://www.kai-waehner.de/blog/2022/05/30/error-handling-via-dead-letter-queue-in-apache-kafka/#:~:text=The%20failure%20cause%20should%20be,of%20historical%20events%20are%20straightforward)). This means even if a step fails, the message (with error info attached) can be routed to error handling without losing the content, facilitating reprocessing.
- `PipeRequest` and `PipeResponse` wrap the `PipeDoc` and `SemanticRun`. Every pipeline invocation uses these, ensuring a uniform method signature for all services. The `success` flag in `PipeResponse` indicates if the step’s processing was successful. If `false`, the `error` field should contain details. Even on failure, a `PipeResponse` is produced and forwarded (usually into a dead-letter topic) rather than crashing the service, so that errors are handled gracefully.
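For reference, a minimal sketch of constructing the entry message with the generated Java classes (the helper name `PipeRequestFactory` is illustrative, and exact generated class names depend on the proto's `java_package`/`java_multiple_files` options):

```java
import com.google.protobuf.ByteString;
import java.util.UUID;

// Illustrative helper that builds the message a caller would submit to the pipeline.
class PipeRequestFactory {
    static PipeRequest newRequest(String docId, String text) {
        return PipeRequest.newBuilder()
                .setDoc(PipeDoc.newBuilder()
                        .setId(docId)
                        .setContent(ByteString.copyFromUtf8(text)))
                .setRun(SemanticRun.newBuilder()
                        .setRunId(UUID.randomUUID().toString())
                        .setPipelineName("semantic-index-pipeline")
                        .setStepIndex(0))
                .build();
    }
}
```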
Using Protocol Buffers for all messages guarantees that all services speak the same language. Protobuf’s efficient binary encoding keeps messages small and fast to transmit ([gRPC Microservices: Building Scalable Microservices | Insight Hub Blog](https://www.bugsnag.com/blog/grpc-and-microservices-architecture/#:~:text=Protocol%20Buffers%2C%20or%20protobufs%2C%20are,efficient%20binary%20format%2C%20also%20developed)) ([gRPC Microservices: Building Scalable Microservices | Insight Hub Blog](https://www.bugsnag.com/blog/grpc-and-microservices-architecture/#:~:text=backwards%20compatible%2C%20and%20protobufs%20are,to%20send%20over%20the%20wire)). The .proto definitions serve as the single source of truth for our data schema and RPC interface, enabling code generation for clients/servers in any language. We define the gRPC `PipelineService` in the same file as the messages – Protobuf allows bundling service RPC definitions with message types ([gRPC Microservices: Building Scalable Microservices | Insight Hub Blog](https://www.bugsnag.com/blog/grpc-and-microservices-architecture/#:~:text=The%20protobuf%20format%20also%20allows,who%20can%20understand%20how%20the)), which makes it easy for developers to see in one place what each service does and what data it expects/produces.
Routing Logic in Protobuf: Rather than hard-coding routing in the messages, we use the pipeline config (external YAML) to determine how to route messages. However, the `SemanticRun` does carry minimal routing context (`step_index` and `pipeline_name`) so that each step knows where it is in the sequence. We avoid embedding explicit next-step addresses in the message itself to keep the protobuf schema general, but we could include a hint if needed (for example, a string field for `next_step` in `SemanticRun` that the forwarder could use). In this design, routing is mostly handled by the forwarder logic and config (see Section 3), not by the message content. This keeps the protobuf definitions business-focused (document + context + error), and routing remains a deploy-time concern.
Each pipeline stage is implemented as a Micronaut service that can consume messages from Kafka and/or handle gRPC calls. We leverage Micronaut’s support for Kafka and gRPC in an idiomatic way, avoiding vendor-specific code so the services remain portable across environments (AWS MSK, Confluent, etc.). All services implement the same gRPC contract (`PipelineService.Process`) defined above.
gRPC Endpoint Implementation: In Micronaut, gRPC services are typically implemented by extending the generated base class (from the proto) and registering as a bean. For example, when we compile the proto, we get a Java class `PipelineServiceGrpc.PipelineServiceImplBase`. Our service (say, `ChunkService`) will extend this and override `process(...)`. We annotate it with `@Singleton` (or `@Prototype`) so Micronaut detects it. Micronaut’s gRPC server will automatically bind any bean that is a `BindableService` (i.e. a gRPC service implementation) at runtime ([Micronaut gRPC](https://micronaut-projects.github.io/micronaut-grpc/4.8.0/guide/#:~:text=By%20default%2C%20the%20server%20will,beans%20of%20the%20following%20types)) – no manual server setup needed. For instance:
```java
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import io.grpc.stub.StreamObserver;

@Singleton // Registers this service as a gRPC server bean
public class ChunkService extends PipelineServiceGrpc.PipelineServiceImplBase {

    @Inject
    Forwarder forwarder; // routes results to the next configured step (see Section 3)

    @Override
    public void process(PipeRequest request, StreamObserver<PipeResponse> responseObserver) {
        // Business logic: e.g., chunk the input text into smaller parts
        PipeDoc inputDoc = request.getDoc();
        PipeDoc chunkedDoc = performChunking(inputDoc); // your chunking logic

        // Prepare response
        PipeResponse.Builder respBuilder = PipeResponse.newBuilder()
                .setDoc(chunkedDoc)
                .setRun(request.getRun())
                .setSuccess(true);
        // (If an error occurred during chunking, set success=false and respBuilder.setError(...))
        PipeResponse response = respBuilder.build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();

        // Forward to next step (if using gRPC chaining, or this can be omitted for Kafka – see Forwarder)
        forwarder.forward(response, "chunk");
    }

    private PipeDoc performChunking(PipeDoc doc) {
        // ... implement chunk splitting, populate PipeDoc.chunks ...
        return doc.toBuilder() // for simplicity, return the same doc or a modified one
                .addAllChunks(generateChunksFrom(doc.getContent()))
                .build();
    }
}
```
In this snippet, `forwarder.forward(response, "chunk")` indicates handing off the result to the next step. The forwarder (explained in Section 3) knows what “next step” corresponds to the current step (“chunk”) and will route the message accordingly (either via Kafka or gRPC). Notably, if the pipeline is configured such that the next hop is via Kafka, the gRPC service might simply log or store the result and let a Kafka producer (perhaps triggered elsewhere) send it on. In a pure gRPC chain, the service could directly call the next service’s gRPC stub.
Kafka Consumer Implementation: Each service can also listen on a Kafka topic for its input. We use Micronaut’s `@KafkaListener` and `@Topic` annotations to define consumers. For example, if the chunking service is fed by a Kafka topic `"docs.to.chunk"`, we can have:
```java
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.OffsetReset;
import io.micronaut.configuration.kafka.annotation.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@KafkaListener(groupId = "pipeline-chunk-listeners", offsetReset = OffsetReset.EARLIEST)
public class ChunkKafkaListener {

    private static final Logger LOG = LoggerFactory.getLogger(ChunkKafkaListener.class);

    private final ChunkService chunkService;
    private final Forwarder forwarder;

    public ChunkKafkaListener(ChunkService chunkService, Forwarder forwarder) {
        this.chunkService = chunkService;
        this.forwarder = forwarder;
    }

    @Topic("docs.to.chunk")
    public void receive(PipeRequest request) {
        LOG.debug("Received document {} for chunking via Kafka", request.getDoc().getId());
        // Process the message just like the gRPC handler would
        // (assumes a synchronous helper on ChunkService that returns a PipeResponse)
        PipeResponse response = chunkService.process(request);
        // Forward result to next step (via Forwarder)
        forwarder.forward(response, "chunk");
    }
}
```
Micronaut will automatically deserialize the incoming Kafka message to our `PipeRequest` type, provided we’ve configured the Kafka binder to use Protobuf. (In practice, we might need to specify a custom serde: e.g., using Kafka’s ByteArray deserializer and then parsing into `PipeRequest`. We can integrate Confluent Schema Registry’s Protobuf serde or manually handle serialization by sending `PipeRequest.toByteArray()` as the Kafka payload. The key point is that the Kafka message contains the Protobuf binary for `PipeRequest`, so no JSON or other translation is needed.)
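As an illustration, a minimal hand-rolled deserializer (a sketch; it would be registered through the Kafka consumer's `value.deserializer` property, and the matching serializer simply returns `PipeRequest.toByteArray()`):

```java
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.kafka.common.serialization.Deserializer;

// Parses the raw Kafka payload back into a PipeRequest.
public class PipeRequestDeserializer implements Deserializer<PipeRequest> {
    @Override
    public PipeRequest deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return PipeRequest.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new IllegalStateException("Invalid PipeRequest payload on topic " + topic, e);
        }
    }
}
```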
The example above uses Micronaut’s declarative consumer API: `@KafkaListener` on the class to enable Kafka consumption and `@Topic("...")` on the method to subscribe to a specific topic ([Micronaut Kafka](https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#:~:text=1%20The%20%40KafkaListener%20%20is,that%20will%20receive%20the%20value)). `OffsetReset.EARLIEST` ensures new consumers start from the beginning of the topic (useful in dev/test). The method signature directly uses `PipeRequest` as the message type; Micronaut will use the configured deserializer to convert the Kafka record’s value to `PipeRequest`. This keeps our code clean – no manual Kafka poll loop is needed. Similarly, for producing messages, Micronaut allows creating a `@KafkaClient` bean.
Kafka Producer Implementation: To forward messages to the next Kafka topic, we use Micronaut’s `@KafkaClient`. For example:
```java
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient
public interface PipelineProducer {

    // The topic is passed as an argument annotated with @Topic, so the same
    // producer can publish to any step's topic chosen at runtime.
    void sendToTopic(@Topic String topicName, PipeResponse message);
}
```
Micronaut will generate an implementation of this interface at compile time ([Micronaut Kafka](https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#:~:text=At%20compile%20time%20Micronaut%20will,Inject)). We can inject `PipelineProducer` into our forwarder or services. The method uses a dynamic topic: rather than hardcoding the topic in a class-level annotation, the `@Topic` annotation is placed on the method parameter so the topic name is supplied at call time ([Micronaut Kafka](https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#:~:text=1%20The%20%40KafkaClient%20%20annotation,making%20it%20a%20method%20argument)). This allows the forwarder to call `producer.sendToTopic(nextTopic, responseMessage)` based on config.
All Micronaut Kafka communication relies on the standard Apache Kafka client under the hood. This means connecting to Amazon MSK or Confluent Kafka is just a matter of configuration (bootstrap servers, security protocol, etc.), with no code changes or vendor-specific APIs. For instance, if using Confluent Cloud, we’d set SASL/SSL properties in `application.yml`, but still use the same `PipelineProducer` and `@KafkaListener` in code. This design choice ensures portability – we avoid any proprietary frameworks, and use Micronaut’s abstractions, which are thin wrappers over the Kafka Java client (making it easy to tune or extend).
Uniform gRPC Interface: Because every service implements `PipelineService.Process`, one service can call another easily via gRPC. Micronaut can inject gRPC clients too, using `@GrpcChannel` and the generated stub. For example, if the “chunk” service needs to call the “embed” service via gRPC (instead of Kafka), it can use a stub generated from the same proto. This uniform interface means our pipeline steps are interchangeable – whether a step is invoked via Kafka message or direct gRPC call, the input/output schema is the same, and the behavior is consistent.
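A sketch of wiring such a stub with Micronaut's `@GrpcChannel` (the channel name `embed-service` is an assumption; it would be resolved from `grpc.channels` configuration or service discovery):

```java
import io.grpc.ManagedChannel;
import io.micronaut.context.annotation.Factory;
import io.micronaut.grpc.annotation.GrpcChannel;
import jakarta.inject.Singleton;

@Factory
public class PipelineClients {

    // Builds a blocking stub for the embed service; inject this bean wherever a
    // direct gRPC hop to "embed" is configured.
    @Singleton
    PipelineServiceGrpc.PipelineServiceBlockingStub embedStub(
            @GrpcChannel("embed-service") ManagedChannel channel) {
        return PipelineServiceGrpc.newBlockingStub(channel);
    }
}
```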
In summary, each pipeline microservice has:
- A gRPC server endpoint (`PipelineService.Process`) for synchronous or direct calls.
- A Kafka listener on its intake topic for async message consumption.
- A Kafka producer (client) to emit results to the next step’s topic (if the next step uses Kafka).
- (Optionally) a gRPC client stub to call the next service directly if configured so.

All of these are configured using Micronaut’s native features (annotations and DI), keeping the implementation concise and idiomatic to Micronaut. This also means features like Micronaut’s service discovery, distributed tracing, and cloud config support automatically apply to our services ([Micronaut gRPC](https://micronaut-projects.github.io/micronaut-grpc/4.8.0/guide/#:~:text=Micronaut%20adds%20the%20following%20features,to%20the%20gRPC%20experience)).
To decouple the pipeline configuration from code, we implement a generic Forwarder component that each service uses to route messages to the next step. The Forwarder reads a pipeline configuration (loaded from YAML/properties) that describes the sequence of steps and the communication method between them. This allows dynamic routing: the same code can decide at runtime whether to send the message to a Kafka topic or via gRPC, based on config for that pipeline step.
Pipeline Configuration Structure: We define in a config file (e.g. `application.yml` or a dedicated `pipeline.yml`) the ordered steps and their routing info. For example:
```yaml
pipeline:
  name: "semantic-index-pipeline"
  steps:
    - id: "chunk"                 # step name
      next: "embed"               # next step id
      transport: "kafka"          # use Kafka to forward
      topic: "docs.to.chunk"      # the topic this step listens on (for forwarder verification)
      next_topic: "docs.to.embed" # the Kafka topic to send results to
    - id: "embed"
      next: "nlp"
      transport: "grpc"           # call next via gRPC
      target: "nlp-service:50051" # address for gRPC (could also use service discovery name)
    - id: "nlp"
      next: "index"
      transport: "kafka"
      next_topic: "docs.to.index"
    - id: "index"
      transport: "none"           # final step, stores in search index
      indexName: "documents_idx"
```
(The above is a conceptual example; the exact YAML structure can vary. You might nest these under pipeline name, etc.)
This config tells us: Chunk -> Embed -> NLP -> Index. Chunk forwards via Kafka (to the `docs.to.embed` topic), Embed forwards via gRPC to `nlp-service`, NLP forwards via Kafka to `docs.to.index`, and Index is the terminal step (no forward, it writes to the search engine).
The Forwarder component uses such config to drive routing. It can be implemented as a singleton bean that each service injects, or even as an abstract base class that pipeline services extend. The forwarder exposes a method like `forward(PipeResponse response, String currentStepId)` which looks up the `currentStepId` in the config to find where to send the response next.
Forwarder Logic (Pseudo-code):
```java
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

@Singleton
public class Forwarder {

    @Inject PipelineProducer kafkaProducer;  // Micronaut Kafka client for producing
    @Inject PipelineServiceGrpc.PipelineServiceBlockingStub nlpClient; // stub bean built via a @Factory/@GrpcChannel
    // ... similarly, we might inject stubs for other services or use dynamic channel resolution
    @Inject PipelineConfig pipelineConfig;   // Bean holding the parsed YAML config

    public void forward(PipeResponse response, String currentStep) {
        PipelineStepConfig stepCfg = pipelineConfig.getStep(currentStep);
        if (stepCfg == null || "none".equals(stepCfg.getTransport())) {
            // No forwarding (end of pipeline)
            handleTerminalStep(response);
            return;
        }
        String nextId = stepCfg.getNext();
        PipelineStepConfig nextCfg = pipelineConfig.getStep(nextId);
        if (nextCfg == null) {
            // No next step defined (safety check)
            return;
        }
        // Update the run context for the next step
        PipeResponse updatedResp = incrementStepIndex(response, nextId);
        switch (nextCfg.getTransport()) {
            case "kafka":
                String topic = nextCfg.getTopic(); // next step's intake topic
                kafkaProducer.sendToTopic(topic, updatedResp);
                break;
            case "grpc":
                // Determine which gRPC client to call based on nextId or target address
                PipelineServiceGrpc.PipelineServiceBlockingStub stub = grpcClientFor(nextId);
                PipeRequest nextRequest = PipeRequest.newBuilder()
                        .setDoc(updatedResp.getDoc())
                        .setRun(updatedResp.getRun())
                        .build();
                PipeResponse nextResponse = stub.process(nextRequest);
                // Optionally, handle nextResponse synchronously or even forward recursively if needed
                break;
        }
    }
    // handleTerminalStep(...), incrementStepIndex(...) and grpcClientFor(...) omitted for brevity
}
```
In this pseudo-code, `PipelineConfig` is a custom bean bound to the YAML structure (it could use Micronaut’s `@EachProperty` or `@ConfigurationProperties`). The forwarder obtains the config for the current step, finds the next step, and then uses the specified transport to forward. If the transport is Kafka, it uses the injected `PipelineProducer` to send the message to the next step’s topic. If gRPC, it finds or creates a gRPC client stub for the next service. Micronaut makes it easy to inject gRPC stubs by binding a `ManagedChannel` with `@GrpcChannel("<serviceName>")` in a factory, resolving the address via configuration or service discovery; alternatively, one can create channels manually. In the example, we injected the `nlpClient` stub for the NLP service (built from a channel named `"nlp-service"`). In a more generic solution, the forwarder might maintain a map of stubs keyed by step ID, or use Micronaut’s Service Discovery to resolve hostnames (Micronaut gRPC supports service discovery for clients).
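A sketch of such a stub-per-step lookup (names like `GrpcStubRegistry` and `stubFor` are illustrative; the channel target comes from each step's `target` entry in the pipeline config):

```java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import jakarta.inject.Singleton;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Singleton
public class GrpcStubRegistry {

    private final PipelineConfig pipelineConfig;
    private final Map<String, PipelineServiceGrpc.PipelineServiceBlockingStub> stubs = new ConcurrentHashMap<>();

    GrpcStubRegistry(PipelineConfig pipelineConfig) {
        this.pipelineConfig = pipelineConfig;
    }

    // Lazily builds (and caches) a blocking stub for the given step id,
    // e.g. "nlp" -> channel to "nlp-service:50051".
    public PipelineServiceGrpc.PipelineServiceBlockingStub stubFor(String stepId) {
        return stubs.computeIfAbsent(stepId, id -> {
            String target = pipelineConfig.getStep(id).getTarget();
            ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
                    .usePlaintext() // assume plaintext inside the cluster; use TLS in production
                    .build();
            return PipelineServiceGrpc.newBlockingStub(channel);
        });
    }
}
```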
Reactive vs Blocking Forwarding: The design can support both. The example shows a blocking stub for gRPC and a fire-and-forget Kafka send. We could also use reactive streams (Micronaut supports Project Reactor or RxJava) where each `PipeRequest` flows through a Flux. For instance, a Kafka listener could emit a `PipeRequest` into a Flux stream; the forwarder could then call the next service’s gRPC asynchronously and map to the next `PipeResponse`, and so on, chaining steps in a non-blocking flow. Thinking in terms of `onNext()` (a stream observer or reactive subscriber) aligns with the idea of treating each pipeline message as a stream event that the forwarder consumes and pushes forward.
However, to keep things simpler and decoupled, our design treats Kafka as the buffering mechanism between steps. So we typically do one step at a time: consume message -> process -> produce to next. This naturally throttles and decouples each stage (each stage can scale independently, reading from its Kafka topic when ready). If a stage is configured to use gRPC for the next hop, that becomes a synchronous call – in those cases, we may deploy the two services (caller and callee) closely or ensure the gRPC call is fast enough not to block for long. Alternatively, the forwarder could use an async stub or return a future. Micronaut’s Kafka client can also return a `Publisher` or `CompletableFuture` instead of void for non-blocking sends ([Micronaut Kafka](https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#:~:text=Note%20that%20since%20the%20,blocking%20message%20delivery)).
Pipeline as a Service: It’s worth noting the pipeline as a whole can be treated as a gRPC service with the same interface. We could have a top-level Orchestrator service that exposes `PipelineService.Process` and simply acts as an entry point: it receives a `PipeRequest` and then internally uses the forwarder to dispatch to the first step (maybe via Kafka to decouple the caller). This orchestrator could wait for the final response or just acknowledge receipt. In many cases, we might not need a separate orchestrator service – the first step’s Kafka topic (or gRPC endpoint) can serve as the pipeline entry. But having a pipeline-level service is useful for recursion and external API integration (the user of the pipeline just calls this one service and the pipeline kicks off).
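A minimal sketch of such an orchestrator, assuming fire-and-forget semantics and a pseudo step id `"entry"` in the pipeline config for dispatching to the first real step:

```java
import io.grpc.stub.StreamObserver;
import jakarta.inject.Singleton;

@Singleton
public class PipelineOrchestrator extends PipelineServiceGrpc.PipelineServiceImplBase {

    private final Forwarder forwarder;

    PipelineOrchestrator(Forwarder forwarder) {
        this.forwarder = forwarder;
    }

    @Override
    public void process(PipeRequest request, StreamObserver<PipeResponse> responseObserver) {
        // Wrap the incoming request as an initial PipeResponse and let the forwarder
        // dispatch it to the first configured step (e.g., the chunk topic).
        PipeResponse accepted = PipeResponse.newBuilder()
                .setDoc(request.getDoc())
                .setRun(request.getRun())
                .setSuccess(true)
                .build();
        forwarder.forward(accepted, "entry");
        // Acknowledge receipt immediately; the pipeline continues asynchronously.
        responseObserver.onNext(accepted);
        responseObserver.onCompleted();
    }
}
```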
Thus, the forwarder plus config gives us modularity and flexibility: adding a new step or changing a communication pattern is as simple as editing the YAML config. We can even switch a step from local gRPC to remote Kafka by config, without code changes, enabling testing different performance trade-offs or deploying in different environments (e.g., use direct gRPC in a small deployment, but switch to Kafka topics when decoupling for scale).
The pipeline configuration (steps, topics, endpoints, etc.) is externalized in property/YAML files. Micronaut’s cloud-native configuration support ([Micronaut gRPC](https://micronaut-projects.github.io/micronaut-grpc/4.8.0/guide/#:~:text=)) allows us to source this config from files, environment, or a config server (Consul, etc.). The goal is to hot-reload the pipeline definition without restarting services.
Configuration Structure: As shown, we likely have a section in `application.yml` (or a separate config) that lists pipeline steps. We can bind this to a bean, for example:
```java
import io.micronaut.context.annotation.ConfigurationProperties;
import java.util.List;

@ConfigurationProperties("pipeline")
public class PipelineConfig {
    private String name;
    private List<PipelineStepConfig> steps;
    // getters and setters omitted for brevity
    // We can also keep a Map<String, PipelineStepConfig> keyed by id for quick lookup
    public PipelineStepConfig getStep(String id) {
        return steps.stream().filter(s -> id.equals(s.getId())).findFirst().orElse(null);
    }
}

public class PipelineStepConfig {
    private String id;
    private String next;
    private String transport; // "kafka", "grpc", or "none"
    private String topic;     // for Kafka transport, the topic to listen on (current step's input)
    private String nextTopic; // for Kafka, the topic to send output to (if transport == kafka)
    private String target;    // for gRPC, the target service address or identifier
    // ... other fields like any step-specific configs (e.g., index name)
    // getters and setters omitted for brevity
}
```
Micronaut will populate this bean at startup from the YAML. To support hot-reloading, we take advantage of Micronaut’s refresh scope. Micronaut allows emitting a `RefreshEvent` to trigger reloading of configuration beans marked as refreshable. We can annotate `PipelineConfig` with `@io.micronaut.runtime.context.scope.Refreshable` to indicate it should be rebuilt on config changes. If using an external config source (like Consul or Spring Cloud Config), one can configure it to push changes or have the app poll for changes. On detecting a change (or via an admin trigger), a `RefreshEvent` can be published in the application context, causing Micronaut to rebind the latest config values ([java - Micronaut @Refreshable not working as expected - Stack Overflow](https://stackoverflow.com/questions/65124444/micronaut-refreshable-not-working-as-expected#:~:text=%40Inject%20private%20ConfigBean%20config%3B)) ([java - Micronaut @Refreshable not working as expected - Stack Overflow](https://stackoverflow.com/questions/65124444/micronaut-refreshable-not-working-as-expected#:~:text=%40Produces%28MediaType.TEXT_PLAIN%29%20public%20String%20refresh%28%29%20,)).
In practice, we might implement a simple file-watching thread or use Micronaut’s Cloud Config which can watch for changes. For example, if using Consul KV store, updating the pipeline config there and hitting a refresh endpoint could update the bean without restart.
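A minimal sketch of such a trigger, assuming a scheduled check against the external source (the `hasPipelineConfigChanged()` check is a placeholder for a file hash or Consul KV index comparison):

```java
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.runtime.context.scope.refresh.RefreshEvent;
import io.micronaut.scheduling.annotation.Scheduled;
import jakarta.inject.Singleton;

@Singleton
public class PipelineConfigWatcher {

    private final ApplicationEventPublisher<RefreshEvent> publisher;

    PipelineConfigWatcher(ApplicationEventPublisher<RefreshEvent> publisher) {
        this.publisher = publisher;
    }

    @Scheduled(fixedDelay = "30s")
    void checkForChanges() {
        if (hasPipelineConfigChanged()) {
            // Rebinds refresh-scoped beans such as PipelineConfig
            publisher.publishEvent(new RefreshEvent());
        }
    }

    private boolean hasPipelineConfigChanged() {
        return false; // implementation-specific (file hash, Consul KV index, etc.)
    }
}
```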
When `PipelineConfig` is reloaded, the Forwarder (or whichever component holds the pipeline logic) should be made aware. If the forwarder looks up the config from the bean on each call, then it will naturally use the new values the next time a message comes through. If it cached any routing decisions, we ensure it updates those caches.
Hot-reload Example: Suppose we add a new step between “embed” and “nlp” for a new enrichment. We update the YAML to insert that step. Once the config is updated and the refresh event fired, the forwarder’s `PipelineConfig` bean now has the new step in the list. The next message that comes through “embed” will see that embed’s `next` is now the new step (instead of `nlp`). The embed service’s forwarder will then route to the new step (maybe by sending to its Kafka topic). If that new step service is already deployed and listening on the topic, it will start receiving data. In this way, we get dynamic reconfiguration without downtime.
Micronaut’s emphasis on cloud-native config ([Micronaut gRPC](https://micronaut-projects.github.io/micronaut-grpc/4.8.0/guide/#:~:text=)) means we can also externalize environment-specific settings (like Kafka broker URLs, credentials, gRPC target addresses, etc.) in different config files or profiles, and even refresh them at runtime if needed. Our pipeline architecture benefits from this by enabling on-the-fly tuning and re-wiring.
To summarize, configuration management is handled via Micronaut’s configuration properties for defining the pipeline. We enable hot-reloading by using refreshable config beans and external config sources or manual triggers (no code redeploy required to change the pipeline flow). This makes the system highly adaptable: e.g., we can respond to new requirements by updating a config and all running pipeline instances will adjust, or even toggle a step off (by changing a transport to “none” or rerouting its input to skip it) in case of failures.
Reliability is critical in a data pipeline. Each pipeline step is equipped with a primary topic for normal processing and a Dead Letter Queue (DLQ) topic for failures. The DLQ is a Kafka topic where messages that could not be processed (due to errors/exceptions in the service) are sent for later analysis or reprocessing. This follows the Dead Letter Channel integration pattern ([Error Handling via Dead Letter Queue in Apache Kafka - Kai Waehner](https://www.kai-waehner.de/blog/2022/05/30/error-handling-via-dead-letter-queue-in-apache-kafka/#:~:text=The%20Dead%20Letter%20Queue%20,messages%20that%20are%20not%20processed)), applied in Kafka as separate topics per step ([Error Handling via Dead Letter Queue in Apache Kafka - Kai Waehner](https://www.kai-waehner.de/blog/2022/05/30/error-handling-via-dead-letter-queue-in-apache-kafka/#:~:text=Alternatives%20for%20a%20Dead%20Letter,Queue%20in%20Apache%20Kafka)).
Per-Step DLQ Topics: For a step with ID “embed”, for example, we have a topic like `docs.to.embed` for normal input, and `docs.to.embed.dlq` (or a similar naming convention) as the DLQ. The Micronaut Kafka listener for the embed service listens on `docs.to.embed`. If processing a message throws an exception or cannot produce a valid result, the service will catch that error. Instead of losing the message, it will publish the message (or a transformed error message) to `docs.to.embed.dlq`. The `PipeResponse` message sent to the DLQ will have `success=false` and the `error` field populated with the exception info. We also include the original `PipeDoc` (and any partial changes) in the DLQ message so that nothing is lost.
This approach ensures failed messages are not dropped but routed to a holding area. Operators or automated jobs can then inspect and replay those messages. For example, an ops team member might fix a data issue and re-publish the DLQ message back to the main topic to retry processing.
Implementing DLQ in Micronaut: We can integrate DLQ publishing into the forwarder or into an AOP advice around the processing logic. For instance, we could wrap the call to `chunkService.process(request)` in a try-catch. If an exception occurs, we create a `PipeResponse` with the error and call `kafkaProducer.sendToTopic("docs.to.chunk.dlq", response)`. Alternatively, the service itself can catch exceptions and use a specific Kafka client for the DLQ. For clarity, we might have another `@KafkaClient` defined for DLQ topics, or reuse `PipelineProducer` by just specifying the DLQ topic name.
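A minimal sketch of that wrapping logic (assuming the injected `chunkService`, `forwarder`, and `kafkaProducer` shown earlier, plus a synchronous processing helper that returns a `PipeResponse`):

```java
// Called by the Kafka listener for the chunk step; failures go to the step's DLQ
// topic following the "<input-topic>.dlq" naming convention used in this design.
void processWithDlq(PipeRequest request) {
    try {
        PipeResponse response = chunkService.process(request);
        forwarder.forward(response, "chunk");
    } catch (Exception e) {
        PipeResponse failure = PipeResponse.newBuilder()
                .setDoc(request.getDoc()) // preserve the original payload so it can be replayed
                .setRun(request.getRun())
                .setSuccess(false)
                .setError(ErrorDetail.newBuilder()
                        .setStepName("chunk")
                        .setErrorMessage(String.valueOf(e.getMessage()))
                        .build())
                .build();
        kafkaProducer.sendToTopic("docs.to.chunk.dlq", failure);
    }
}
```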
The error detail in the message helps debugging. Additionally, as a best practice, we may include the failure cause in the Kafka message headers too ([Error Handling via Dead Letter Queue in Apache Kafka - Kai Waehner](https://www.kai-waehner.de/blog/2022/05/30/error-handling-via-dead-letter-queue-in-apache-kafka/#:~:text=The%20failure%20cause%20should%20be,of%20historical%20events%20are%20straightforward)). For example, set a header `error.class = "NullPointerException"` and `error.msg = "... stacktrace ..."`. This way, consumers (or tooling like Kafka UI or connectors) can quickly see error metadata without parsing the protobuf. But the same info is also inside our Protobuf `ErrorDetail` for completeness.
Reprocessing Strategy: Because the DLQ messages carry the original payload, we can re-ingest them when ready. This could be manual (someone moves messages from the DLQ topic back to the main topic) or automated via a retry service. An automated approach might be: a separate Micronaut service listens on each DLQ topic and, for each message, decides whether to retry immediately or later (perhaps implementing an exponential backoff or sending to a “retry later” topic). For simplicity, our design leaves reprocessing as an external concern – the important part is that DLQ messages are processable (the format is the same `PipeResponse` with error info). The presence of error info does not prevent reprocessing; the consumer of the DLQ could choose to clear the `errors` list and send the `PipeDoc` through again.
Another advantage of including error details is that the pipeline can be made error-tolerant if desired. For example, we could configure a step to continue even if it had an error, simply forwarding the error along in the `SemanticRun.errors` list and moving to the next step. The next step might still try to do its work, or it might check and decide to skip because a prior error was fatal. This behavior can be configured per pipeline or per error type. In cases where a step’s failure means the whole pipeline should stop, the forwarder would instead route the message to that step’s DLQ and not call downstream steps. In other cases, we might allow downstream steps to proceed (for instance, a non-critical enrichment fails, but we still want to index the original doc sans that enrichment – with an error note attached). The design supports both modes because each message carries its error history. We ensure final storage (the search index) can capture those errors too if needed (e.g., index a flag “had_error=true” so that the document can be flagged for review).
To summarize failure handling:
- Every Kafka consumer in the pipeline has a corresponding DLQ topic. A message that can’t be processed in the normal path is sent to DLQ rather than dropped.
- The original key and value are preserved as much as possible in the DLQ message, modifying only what’s needed to note the error ([Error Handling via Dead Letter Queue in Apache Kafka - Kai Waehner](https://www.kai-waehner.de/blog/2022/05/30/error-handling-via-dead-letter-queue-in-apache-kafka/#:~:text=The%20failure%20cause%20should%20be,of%20historical%20events%20are%20straightforward)). This makes it straightforward to reprocess: the message looks just like a normal output (PipeResponse) with an error flag.
- Micronaut’s Kafka client and config make it easy to produce to DLQ topics. We might set these topics with longer retention (so we don’t lose the messages before we can fix them).
- Optionally, implement a retry mechanism reading from DLQs and pushing back to main topics when appropriate (with human oversight or automated rules).
This robust DLQ setup increases the pipeline’s resilience. Combined with monitoring on DLQ topic sizes and contents, we gain visibility into failure rates and can continuously improve the pipeline.
One powerful feature of this design is the ability to treat the entire pipeline as a step within itself or another pipeline, enabling recursive processing or multi-pass algorithms. Because every interaction is via the same interface (PipeRequest -> PipeResponse), a pipeline can call another pipeline just like it would call an atomic service.
Pipeline calling itself: Suppose we have a use case where, after completing all steps, we realize the output could benefit from another round of processing (maybe the document was updated with new info that triggers another chunk->embed->NLP cycle). We can achieve this by configuring the last step’s “next” to point back to the first step (or to a specific step earlier in the pipeline). Essentially, the pipeline’s output is fed as input to itself. The `Forwarder` would recognize that “next” is the pipeline’s own entry topic or service and would send the message accordingly.
For example, in our config, if we set `nlp.next = chunk` (instead of “index”), the pipeline would loop back: chunk -> embed -> nlp -> (back to chunk) ... etc. This of course can lead to an infinite loop if unchecked, so we utilize the `recursion_depth` in `SemanticRun`. Each time a pipeline step sees that the next is actually a “previous” step, we increment `recursion_depth`. We can enforce a max depth (configured globally or per pipeline) – e.g., no message can loop more than 3 times. If the depth is exceeded, we break the loop, perhaps by sending to a final DLQ or by forcing the last iteration to skip the recursive call.
A safer approach to recursion is to treat the whole pipeline as a unit that can be invoked by another. For instance, we might have Pipeline A’s last step configured to call Pipeline B (via gRPC or Kafka). Pipeline B could even be the same as A (hence recursion). In implementation, this could mean Pipeline A’s forwarder produces to Pipeline B’s input topic (or calls its gRPC endpoint). Since Pipeline B is also a `PipelineService`, this is seamless. We just need to ensure we carry over the context (the `run_id` could be the same, or we generate a new one for B but link them via a parent ID).
Use cases for recursion:
- Iterative enrichment: Run the pipeline repeatedly until data is fully normalized or no new entities are found in text, etc.
- Batching and splitting: A pipeline might aggregate results and then you want to feed the aggregate back in as a new document for further processing.
- Pipeline composition: Treat a complex pipeline as a single step in a larger meta-pipeline. E.g., Pipeline X does heavy NLP, Pipeline Y does image processing; you can have a master pipeline that invokes X then Y then stores results. They all share the same message types, so plugging one pipeline into another is natural.
Implementation considerations: The recursion or self-call can be achieved simply by configuration (point next to the pipeline’s own entry). The forwarder doesn’t actually need special code for recursion versus any other next step – it will follow the config. The only addition is tracking `recursion_depth` to avoid infinite loops. We can implement a check like: if `nextId == "chunk"` (the first step) and that equals some known starting step, then increment the depth. If the depth exceeds N, log a warning and stop forwarding further (maybe send to a “pipeline.dlq” topic for unresolvable loops).
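A sketch of that guard as a method inside the forwarder (the entry step id and max depth would come from configuration; the `LOG` and `kafkaProducer` fields are the forwarder's own):

```java
// Returns the PipeResponse to forward (with the recursion counter bumped when
// looping), or null when the loop limit is hit and the message was diverted
// to the pipeline-level DLQ instead.
private PipeResponse guardRecursion(PipeResponse response, String nextId, String entryStepId, int maxDepth) {
    if (!nextId.equals(entryStepId)) {
        return response; // not looping back to the start, forward as-is
    }
    int depth = response.getRun().getRecursionDepth();
    if (depth >= maxDepth) {
        LOG.warn("Run {} exceeded max recursion depth {}, sending to pipeline.dlq",
                response.getRun().getRunId(), maxDepth);
        kafkaProducer.sendToTopic("pipeline.dlq", response);
        return null;
    }
    // Looping back: increment the recursion counter carried in the run context
    return response.toBuilder()
            .setRun(response.getRun().toBuilder().setRecursionDepth(depth + 1))
            .build();
}
```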
Another strategy is to deploy the pipeline orchestrator as a gRPC service that internally calls the first step and waits for final output (synchronous pipeline execution). This orchestrator can call itself or another orchestrator recursively. However, that introduces tight coupling. The design we prefer is more event-driven recursion: the message just loops through Kafka topics if needed, which is naturally load-balanced and fault-tolerant (and doesn’t tie up threads waiting). This fits the overall async model.
In summary, by leveraging the consistent interface and message format, we enable recursion:
- We can configure a pipeline step to target the pipeline’s own input, effectively creating a loop.
- Use the `SemanticRun.pipeline_name` and `recursion_depth` to detect and control recursive invocations.
- Ensure termination either by data conditions or a max depth to prevent endless cycling.
- This capability shows the flexibility of the design – the pipeline can not only have steps, but those steps themselves can be entire pipelines (composing multiple smaller pipelines into bigger ones).
Observability is built into the pipeline so we can trace each document’s journey and diagnose issues in real time. We implement stream-based logging of request and response events at each step, meaning every time a `PipeRequest` is received or a `PipeResponse` is sent, we log an entry (with the run_id, step, and any error info). This essentially creates an audit trail for the pipeline’s activity.
Logging Implementation: In Micronaut, we can use AOP interceptors or simply add logging in our Kafka listener and gRPC methods. For example, in the `ChunkKafkaListener.receive()` method above, we log the received document ID. Similarly, after processing and before forwarding, we could log “Chunking completed for doc X, forwarding to Embed”. If using gRPC, we might implement a ServerInterceptor that intercepts the `Process` calls and logs the request and response objects (possibly in a non-blocking way). Micronaut gRPC supports interceptors as beans (Micronaut gRPC), so we could write a `LoggingInterceptor` that logs the metadata on each call.
We also propagate a correlation ID (`run_id`) via the `SemanticRun`. We can include this in log statements (Micronaut’s logging MDC could be used by putting `run_id` into the MDC context during that request’s processing). This allows grouping logs by pipeline execution when viewing them in a log aggregator.
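A small sketch of that MDC usage inside a listener or interceptor (the `run_id` MDC key is an assumption; it just needs to match the logging pattern):

```java
import org.slf4j.MDC;

// Wraps one message's processing so every log line emitted in between
// carries the pipeline run's correlation ID.
void handleWithCorrelation(PipeRequest request, Runnable processing) {
    MDC.put("run_id", request.getRun().getRunId());
    try {
        processing.run();
    } finally {
        MDC.remove("run_id");
    }
}
```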
Stream-Based Logging: We treat the sequence of events for a single message as a stream and log each transformation. We could implement this by having the forwarder also act as a logger: every time `forward()` is called, it logs “Step X produced output Y (success/fail) -> sending to Z”. Because the forwarder sees every transition, it’s a good central point to log the flow. Another approach is to emit these logs as events to a monitoring topic. For example, each service, on processing a message, could produce a small log event to a Kafka topic like `pipeline.logs` with details (run_id, step, timestamp, outcome). This would be a non-intrusive way to collect processing events without writing to disk logs. A separate monitoring service or consumer can read `pipeline.logs` and, for instance, feed a dashboard.
Micronaut also makes it straightforward to integrate distributed tracing (using Zipkin, Jaeger, etc.) ([Micronaut gRPC](https://micronaut-projects.github.io/micronaut-grpc/4.8.0/guide/#:~:text=)). Since gRPC is used, we can propagate trace IDs over gRPC and Kafka (for Kafka, include trace info in headers). Micronaut’s OpenTracing or Micrometer integration can capture spans for each Kafka listener and each gRPC call. This means in a trace viewer, you would see a span for “chunk service process”, then a span for “embed service process”, etc., all under one trace ID (the run_id can be tied to the trace). This gives deep visibility without manual logging of each step’s latency.
Monitoring metrics: Each service can emit metrics (e.g., how many docs processed, how many errors). Micronaut has Micrometer support to easily count events. For instance, increment a counter for `pipeline.chunk.requests` and `pipeline.chunk.errors`. These metrics, along with Kafka consumer lag metrics (how many messages pending in each topic), help monitor throughput and backpressure in the pipeline.
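A minimal Micrometer sketch for the chunk step's counters (metric names follow the convention above; the `MeterRegistry` bean is available when the Micrometer feature is enabled):

```java
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.inject.Singleton;

@Singleton
public class ChunkMetrics {

    private final Counter requests;
    private final Counter errors;

    ChunkMetrics(MeterRegistry registry) {
        this.requests = registry.counter("pipeline.chunk.requests");
        this.errors = registry.counter("pipeline.chunk.errors");
    }

    void recordRequest() {
        requests.increment();
    }

    void recordError() {
        errors.increment();
    }
}
```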
Crucially, all logging/monitoring is done asynchronously and non-blocking relative to the main processing. Writing to log files or even Kafka logging topics should not slow down the pipeline significantly. We use either separate threads or let the logging be handled by Micronaut’s async logging capabilities. This satisfies “visibility into pipeline execution without interfering with primary processing.”
Example Logging Output (for one document with `run_id` 123):

```text
RUN123 [ChunkService] Received doc 45 (size=10KB)
RUN123 [ChunkService] Produced 5 chunks, forwarding to EmbedService
RUN123 [EmbedService] Received doc 45 (5 chunks)
RUN123 [EmbedService] Generated embeddings (vector size=768) for doc 45, forwarding to NLPService
RUN123 [NLPService] Received doc 45, extracting entities
RUN123 [NLPService] ERROR at NLPService: NullPointerException ... (logged error details), forwarding to NLP DLQ
RUN123 [IndexService] Received doc 45 - SKIPPED indexing due to prior error
```
In the above imaginary log, we see each step’s events. The NLPService encountered an error and forwarded to the DLQ. The IndexService in this scenario might either not receive the message at all (if NLP didn’t forward on the success path), or, if it did, it might check the `success` flag and skip. In any case, we have clear logs of each stage. These logs (or traces) make debugging easy: one can follow the run_id through all steps.
Additionally, the pipeline could provide an admin interface (HTTP or gRPC) to query the status of a `run_id` – possibly by looking up logs, or by storing the final PipeResponse in a temporary store. But that’s an extra; often, logs and external monitoring suffice.
This Micronaut-based pipeline architecture offers a scalable, flexible processing engine:
- Modularity: Each step is isolated, communicating via well-defined messages over Kafka/gRPC. This makes it easy to add, remove, or reorder steps via configuration, without changing code in other steps.
- Unified Interface: A single gRPC service definition for all steps means that whether a step is performing text chunking, embedding generation, or NLP enrichment, it looks the same from the outside. This contract-first design ensures all teams (and languages) adhere to the same data structures ([gRPC Microservices: Building Scalable Microservices | Insight Hub Blog](https://www.bugsnag.com/blog/grpc-and-microservices-architecture/#:~:text=Protocol%20Buffers%2C%20or%20protobufs%2C%20are,efficient%20binary%20format%2C%20also%20developed)) ([gRPC Microservices: Building Scalable Microservices | Insight Hub Blog](https://www.bugsnag.com/blog/grpc-and-microservices-architecture/#:~:text=backwards%20compatible%2C%20and%20protobufs%20are,to%20send%20over%20the%20wire)).
- Asynchronous Pipeline with Synchronous Options: Kafka decouples the steps (providing buffering and scalability), while gRPC allows efficient direct calls when needed. The ability to choose per hop allows optimizing performance versus reliability trade-offs. And since both gRPC and Kafka use Protobuf binary payloads, our data model is consistent across both mediums (no translation needed).
- Robust Error Handling: Dead letter queues for each stage improve reliability. The inclusion of error info in responses means errors are first-class citizens in the system, not hidden. Operators can inspect exactly where and why a failure happened by looking at the `PipeResponse` in the DLQ, and then replay it after fixes. This design aligns with best practices for streaming error handling in Kafka (using DLQ topics) ([Error Handling via Dead Letter Queue in Apache Kafka - Kai Waehner](https://www.kai-waehner.de/blog/2022/05/30/error-handling-via-dead-letter-queue-in-apache-kafka/#:~:text=Alternatives%20for%20a%20Dead%20Letter,Queue%20in%20Apache%20Kafka)).
- Configuration-Driven: The use of external YAML/properties for pipeline definition and Micronaut’s refresh capabilities ([Micronaut gRPC](https://micronaut-projects.github.io/micronaut-grpc/4.8.0/guide/#:~:text=)) means the pipeline can evolve at runtime. It avoids hardcoding topic names or endpoints in code, reducing the chance of mistakes and easing deployments to different environments (e.g., dev vs prod might use different Kafka clusters or enable/disable certain steps).
- Recursion and Composition: The pipeline can call itself or other pipelines, enabling complex workflows and reusability. This is possible without special-case code simply because all pipelines and steps speak the same protocoled language.
- Micronaut-Native Implementation: We stick to Micronaut’s conventions and features: annotation-based Kafka listeners ([Micronaut Kafka](https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#:~:text=1%20The%20%40KafkaListener%20%20is,that%20will%20receive%20the%20value)) and clients, gRPC services as beans ([Micronaut gRPC](https://micronaut-projects.github.io/micronaut-grpc/4.8.0/guide/#:~:text=By%20default%2C%20the%20server%20will,beans%20of%20the%20following%20types)), cloud config, etc. No vendor lock-in or proprietary libraries are used – for example, if in the future we switch to a different messaging system (say RabbitMQ), we could adapt the forwarder but keep the rest of the pipeline the same. Micronaut’s abstraction over Kafka and its support for gRPC make it an excellent fit for this polyglot communication scenario.
By leveraging these design elements, the resulting pipeline is scalable (horizontally, by adding more instances per service or more Kafka partitions), resilient (with DLQ and retry), and easy to observe. Each piece can be developed and deployed independently, and the overall flow can be managed by ops through config changes and monitoring, rather than code deployments. This meets the requirements and provides a solid foundation for future extensions (like adding new kinds of processing steps, integrating a schema registry for Protobuf compatibility checks, or even swapping out the final storage target).
References:
- Ben Ibinson, “Building scalable microservices with gRPC”, Bugsnag Blog – on choosing gRPC+Protobuf for a polyglot microservice pipeline ([gRPC Microservices: Building Scalable Microservices | Insight Hub Blog](https://www.bugsnag.com/blog/grpc-and-microservices-architecture/#:~:text=While%20work%20on%20the%20Releases,js)) ([gRPC Microservices: Building Scalable Microservices | Insight Hub Blog](https://www.bugsnag.com/blog/grpc-and-microservices-architecture/#:~:text=Protocol%20Buffers%2C%20or%20protobufs%2C%20are,efficient%20binary%20format%2C%20also%20developed)). This highlights Protobuf’s efficiency and compatibility benefits and how service definitions and messages are in one proto file ([gRPC Microservices: Building Scalable Microservices | Insight Hub Blog](https://www.bugsnag.com/blog/grpc-and-microservices-architecture/#:~:text=The%20protobuf%20format%20also%20allows,who%20can%20understand%20how%20the)).
- Micronaut Documentation – Micronaut Kafka and Micronaut gRPC:
  - Using `@KafkaClient` and `@KafkaListener` for declarative Kafka producers/consumers ([Micronaut Kafka](https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#:~:text=1%20The%20%40KafkaClient%20%20annotation,making%20it%20a%20method%20argument)).
  - Micronaut gRPC server auto-registers any `BindableService` bean, simplifying service setup (Micronaut gRPC).
  - Micronaut features like Distributed Tracing and Cloud Native config facilitate monitoring and config changes ([Micronaut gRPC](https://micronaut-projects.github.io/micronaut-grpc/4.8.0/guide/#:~:text=Micronaut%20adds%20the%20following%20features,to%20the%20gRPC%20experience)).
- Kai Waehner, “Error Handling via Dead Letter Queue in Apache Kafka” – explains the DLQ pattern in Kafka (using separate topics for failed messages) ([Error Handling via Dead Letter Queue in Apache Kafka - Kai Waehner](https://www.kai-waehner.de/blog/2022/05/30/error-handling-via-dead-letter-queue-in-apache-kafka/#:~:text=Alternatives%20for%20a%20Dead%20Letter,Queue%20in%20Apache%20Kafka)) and recommends including failure cause with the message for analysis ([Error Handling via Dead Letter Queue in Apache Kafka - Kai Waehner](https://www.kai-waehner.de/blog/2022/05/30/error-handling-via-dead-letter-queue-in-apache-kafka/#:~:text=The%20failure%20cause%20should%20be,of%20historical%20events%20are%20straightforward)), practices we adopted in our design.