Notes on a pipeline service
```properties
micronaut.application.name=core
micronaut.router.static-resources.swagger-ui.mapping=/swagger-ui/**
micronaut.router.static-resources.swagger-ui.paths=classpath\:META-INF/swagger/views/swagger-ui
micronaut.router.static-resources.swagger.mapping=/swagger/**
micronaut.router.static-resources.swagger.paths=classpath\:META-INF/swagger
micronaut.server.port=8081
# ==============================
# TIKA SERVICE (ENTRY POINT)
# ==============================
pipeline.service.tika.grpc-endpoint=localhost:50405
pipeline.service.tika.kafka-publish-topics=tika-documents
# ==============================
# SOLR TO KAFKA (ENTRY POINT)
# ==============================
pipeline.service.solr-to-kafka.grpc-endpoint=localhost:50404
pipeline.service.solr-to-kafka.kafka-publish-topics=solr-documents
# ==============================
# CHUNKER SERVICE
# ==============================
pipeline.service.chunker.grpc-endpoint=localhost:50402
pipeline.service.chunker.kafka-listen-topics=solr-documents,tika-documents
pipeline.service.chunker.grpc-forward-to=embedder
# ==============================
# EMBEDDER SERVICE
# ==============================
pipeline.service.embedder.grpc-endpoint=localhost:50401
pipeline.service.embedder.kafka-listen-topics=chunker
pipeline.service.embedder.kafka-publish-topics=enhanced-documents
# ==============================
# SOLR INDEXER (FINAL STEP)
# ==============================
pipeline.service.solr-indexer.grpc-endpoint=localhost:50403
pipeline.service.solr-indexer.kafka-listen-topics=enhanced-documents
```

```properties
endpoints.all.enabled=true
endpoints.all.sensitive=false
endpoints.health.details-visible=ANONYMOUS
endpoints.loggers.write-sensitive=false
```

```proto
syntax = "proto3";
package com.krickert.search.pipeline;
option java_multiple_files = true;
import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/empty.proto";
message PipeDoc {
string id = 1;
string title = 2;
string body = 3;
repeated string keywords = 4;
string document_type = 5;
string revision_id = 6;
google.protobuf.Timestamp creation_date = 7;
google.protobuf.Timestamp last_modified = 8;
google.protobuf.Struct custom_data = 9;
optional SemanticData semantic_data = 10;
}
message SemanticData {
map<string, SemanticDocument> chunkData = 1;
}
message SemanticDocument {
repeated SemanticChunk chunks = 1;
string parent_id = 2;
map<string,string> metadata = 3;
}
message SemanticChunk {
string chunk_id = 1;
int64 chunk_number = 2;
string chunk = 3;
repeated float embeddings = 4;
}
message Route {
RouteType routeType = 1;
}
enum RouteType {
UNKNOWN = 0;
NULL_TERMINATION = 2;
KAFKA = 3;
GRPC = 4;
}
message PipeRequest {
PipeDoc doc = 1;
map<string, string> config = 2;
repeated Route destinations = 3;
}
message OutputResponse {
bool success = 1;
oneof reply {
PipeDoc outputDoc = 2;
ErrorData errorData = 3;
}
}
message ErrorData {
string errorMessage = 2;
repeated Route failedRoutes = 3;
optional PipeRequest errorRequest = 4;
}
message PipeResponse {
bool success = 1;
optional ErrorData errorData = 2;
}
message PipeStream {
PipeRequest request = 1;
repeated PipeResponse pipeReplies = 2;
repeated string streamLogs = 3;
}
service PipelineService {
rpc forward(PipeStream) returns (google.protobuf.Empty);
rpc getOutput(PipeRequest) returns (PipeResponse);
}
```

```java
package com.krickert.search.pipeline.config;

import io.micronaut.context.annotation.ConfigurationProperties;
import jakarta.inject.Singleton;
import lombok.Data;

import java.util.List;
import java.util.Map;

@Data
@Singleton
@ConfigurationProperties("pipeline")
public class PipelineConfig {

    private final Map<String, ServiceConfiguration> service;

    public PipelineConfig(Map<String, ServiceConfiguration> service) {
        this.service = service;
    }

    public ServiceConfiguration getRouteForService(String serviceName) {
        return service.get(serviceName);
    }

    public String getServiceGrpcEndpoint(String serviceName) {
        return getRouteForService(serviceName).getGrpcEndpoint();
    }

    public List<String> getServiceKafkaListenTopics(String serviceName) {
        return getRouteForService(serviceName).getKafkaListenTopics();
    }

    public List<String> getServiceKafkaPublishTopics(String serviceName) {
        return getRouteForService(serviceName).getKafkaPublishTopics();
    }

    public List<String> getServiceGrpcForwardTo(String serviceName) {
        return getRouteForService(serviceName).getGrpcForwardTo();
    }
}
```

```java
package com.krickert.search.pipeline.config;

import io.micronaut.context.annotation.EachProperty;
import io.micronaut.context.annotation.Parameter;
import jakarta.inject.Singleton;
import lombok.Data;

import java.util.List;

@Data
@EachProperty("pipeline.service")
@Singleton
public class ServiceConfiguration {

    private final String name;
    private String grpcEndpoint;
    private List<String> kafkaListenTopics;
    private List<String> kafkaPublishTopics;
    private List<String> grpcForwardTo;

    public ServiceConfiguration(@Parameter String name) {
        this.name = name;
    }
}
```

```java
package com.krickert.search.pipeline;

import com.krickert.search.model.pipe.*;
import com.krickert.search.pipeline.config.PipelineConfig;
import com.krickert.search.pipeline.config.ServiceConfiguration;
import io.micronaut.context.annotation.Value;
import io.micronaut.grpc.annotation.GrpcService;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Slf4j
@GrpcService
public abstract class AbstractPipelineService extends PipelineServiceGrpc.PipelineServiceImplBase {

    @Inject private KafkaProducer kafkaProducer;
    @Inject private GrpcForwarder grpcForwarder;
    @Inject private PipelineConfig pipelineConfig;

    @Value("${pipeline.service.name}")
    private String serviceName; // Inject the service name for self-registration

    @PostConstruct
    void init() {
        // Log after injection has completed so serviceName is populated.
        log.info("Initializing pipeline service: {}", serviceName);
    }

    /**
     * The only method users need to implement to process documents.
     */
    protected abstract PipeDoc processPipe(PipeDoc doc);

    @Override
    public void forward(PipeStream request, io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) {
        PipeRequest pipeRequest = request.getRequest();
        log.info("Received forward request for document {}", pipeRequest.getDoc().getId());
        Mono.fromCallable(() -> processPipe(pipeRequest.getDoc()))
                .flatMap(enhancedDoc -> sendToDestinations(pipeRequest, enhancedDoc))
                .onErrorResume(e -> {
                    log.error("Processing failed: {}", e.getMessage(), e);
                    return handleError(pipeRequest, e.getMessage());
                })
                .doOnTerminate(() -> {
                    responseObserver.onNext(com.google.protobuf.Empty.getDefaultInstance());
                    responseObserver.onCompleted();
                })
                .subscribe();
    }

    private Mono<PipeStream> sendToDestinations(PipeRequest originalRequest, PipeDoc updatedDoc) {
        ServiceConfiguration routes = pipelineConfig.getRouteForService(serviceName);
        if (routes == null) {
            log.error("No configured routes found for service: {}", serviceName);
            return Mono.empty();
        }
        log.info("Routing document {} to destinations", updatedDoc.getId());
        // Send to the configured Kafka publish topics
        if (routes.getKafkaPublishTopics() != null) {
            routes.getKafkaPublishTopics().forEach(topic -> kafkaProducer.send(topic, updatedDoc));
        }
        // Send to the configured downstream gRPC services
        if (routes.getGrpcForwardTo() != null) {
            routes.getGrpcForwardTo().forEach(endpoint -> grpcForwarder.send(endpoint, updatedDoc));
        }
        return Mono.just(PipeStream.newBuilder()
                .addPipeReplies(PipeResponse.newBuilder().setSuccess(true).build())
                .build());
    }

    private Mono<PipeStream> handleError(PipeRequest request, String errorMessage) {
        ServiceConfiguration routes = pipelineConfig.getRouteForService(serviceName);
        ErrorData errorData = ErrorData.newBuilder()
                .setErrorMessage(errorMessage)
                .addAllFailedRoutes(request.getDestinationsList())
                .setErrorRequest(request)
                .build();
        PipeResponse errorResponse = PipeResponse.newBuilder()
                .setSuccess(false)
                .setErrorData(errorData)
                .build();
        log.error("Routing error to failure queue");
        if (routes != null && routes.getKafkaPublishTopics() != null) {
            // Convention assumed here: failed documents go to a "failure-" prefixed copy of each publish topic.
            routes.getKafkaPublishTopics().forEach(topic -> kafkaProducer.send("failure-" + topic, request.getDoc()));
        }
        return Mono.just(PipeStream.newBuilder()
                .addPipeReplies(errorResponse)
                .build());
    }
}
```
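
As a usage note (this subclass is not part of the classes above, and the tagging logic is purely illustrative), a concrete step only overrides `processPipe`; routing and transport are inherited from the base class:

```java
package com.krickert.search.pipeline;

import com.krickert.search.model.pipe.PipeDoc;
import io.micronaut.grpc.annotation.GrpcService;

// Hypothetical concrete step: the abstract base class handles Kafka and gRPC routing.
@GrpcService
public class KeywordTaggerService extends AbstractPipelineService {

    @Override
    protected PipeDoc processPipe(PipeDoc doc) {
        // Illustrative transformation only: tag the document and hand it back for routing.
        return doc.toBuilder()
                .addKeywords("processed-by-keyword-tagger")
                .build();
    }
}
```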
Having each pipeline step run behind a load-balanced set of containers (like on Kubernetes/ECS/EKS) with multiple instances per step is common and quite effective.
Given you're deploying this in AWS (or eventually Azure) using load-balanced containers, here’s how to approach this clearly and efficiently:
If each step's `forward` RPC is running behind multiple container replicas, then unary asynchronous RPC calls are typically the easiest to load-balance initially:
- **Easy horizontal scaling:** Unary async calls are stateless and easy to horizontally scale behind a load balancer or Kubernetes Service.
- **Simpler debugging and logging:** Each call has clear start/end boundaries.
- **Better support for retries:** You can easily implement retries if a downstream pipeline step temporarily fails (see the retry sketch after this list).
- **Great starting point:** You'll quickly see if your bottlenecks justify the complexity of streaming.
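
A minimal sketch of such a client-side retry, assuming the unary signature recommended below (`rpc forward(PipeStream) returns (PipeStream);`); the class name, attempt count, and backoff values are illustrative:

```java
package com.krickert.search.pipeline;

import io.grpc.stub.StreamObserver;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public final class RetryingForwardClient {

    private final PipelineServiceGrpc.PipelineServiceStub asyncStub;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public RetryingForwardClient(PipelineServiceGrpc.PipelineServiceStub asyncStub) {
        this.asyncStub = asyncStub;
    }

    /** Fire the unary call and retry with exponential backoff if the downstream step fails. */
    public void forwardWithRetry(PipeStream request, int maxAttempts) {
        attempt(request, 1, maxAttempts);
    }

    private void attempt(PipeStream request, int attemptNumber, int maxAttempts) {
        asyncStub.forward(request, new StreamObserver<PipeStream>() {
            @Override
            public void onNext(PipeStream response) {
                // Success path: nothing further to do in this sketch.
            }

            @Override
            public void onError(Throwable t) {
                if (attemptNumber < maxAttempts) {
                    long delayMs = 200L << (attemptNumber - 1); // 200ms, 400ms, 800ms, ...
                    scheduler.schedule(
                            () -> attempt(request, attemptNumber + 1, maxAttempts),
                            delayMs, TimeUnit.MILLISECONDS);
                } else {
                    // Out of attempts: hand off to a dead-letter topic or alerting instead.
                }
            }

            @Override
            public void onCompleted() {
                // Unary call finished.
            }
        });
    }
}
```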
Streaming might help in these cases (a rough client sketch follows the list):

- Persistent connections reduce overhead when you're handling millions of short-lived requests per second and connection establishment becomes a bottleneck.
- Reduced latency for critical pipelines, since streams offer continuous connections and rapid message passing.
- Backpressure: the receiving service can throttle traffic if overwhelmed.
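
For comparison, this is roughly what a bidirectional-streaming client could look like if the `stream` keyword were kept (assuming `rpc forward(stream PipeStream) returns (stream PipeStream);`); it is a sketch only, not part of the recommended setup:

```java
package com.krickert.search.pipeline;

import io.grpc.stub.StreamObserver;

import java.util.List;

public final class StreamingForwardClient {

    private final PipelineServiceGrpc.PipelineServiceStub asyncStub;

    public StreamingForwardClient(PipelineServiceGrpc.PipelineServiceStub asyncStub) {
        this.asyncStub = asyncStub;
    }

    /** Push a batch of documents over one persistent stream instead of one RPC per document. */
    public void forwardBatch(List<PipeStream> batch) {
        StreamObserver<PipeStream> requestStream = asyncStub.forward(new StreamObserver<PipeStream>() {
            @Override
            public void onNext(PipeStream response) {
                // Per-message acknowledgement from the server.
            }

            @Override
            public void onError(Throwable t) {
                // Stream failed: reconnect, or fall back to unary calls.
            }

            @Override
            public void onCompleted() {
                // Server closed its side of the stream.
            }
        });

        batch.forEach(requestStream::onNext); // reuse the open connection for every message
        requestStream.onCompleted();          // signal the end of this batch
    }
}
```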
However, there's an important caveat: load balancing with gRPC streaming is complex because streams are persistent. Typically, streams are balanced at connection time, not per-message. Once a connection is opened, all messages for that stream go to a single server instance. This means your load balancer needs gRPC-aware handling, for example:

- HTTP/2-aware (i.e. gRPC-aware) load balancing in Kubernetes with a service mesh (e.g. Linkerd or Istio).
- gRPC-specific, client-side load balancing (a sketch follows this list).
- Standard AWS Application Load Balancers (ALB) don't support streaming gRPC by default; use Network Load Balancers (NLB) if streaming is required.
- Kubernetes services: typically use headless services, ClusterIP, or gRPC load balancing integrated via service-mesh tools (Linkerd, Istio, or Envoy) for better streaming compatibility.
- If you must use streaming later, plan to incorporate a service mesh for efficient routing.
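
A minimal sketch of the client-side load-balancing option, assuming a Kubernetes headless service in front of the embedder step (the hostname and port are illustrative):

```java
package com.krickert.search.pipeline;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public final class ClientSideLoadBalancedChannel {

    /** Builds a channel that resolves every pod behind a headless service and round-robins across them. */
    public static ManagedChannel create() {
        return ManagedChannelBuilder
                // "dns:///" makes the client resolve all A records behind the headless service.
                .forTarget("dns:///embedder.pipeline.svc.cluster.local:50401")
                // Spread calls (and new streams) across the resolved endpoints.
                .defaultLoadBalancingPolicy("round_robin")
                .usePlaintext()
                .build();
    }
}
```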
Given your setup and the goal of simplicity, scalability, and ease of debugging:
✅ Recommended now:
- Remove the `stream` keyword for now:

  ```proto
  rpc forward(PipeStream) returns (PipeStream);
  ```

- Use async gRPC clients from your Java codebase for parallelism.
- Deploy multiple instances behind a load balancer or Kubernetes service.
- Use Kubernetes services or AWS ALBs to balance unary calls easily:

  ```
  [Pipeline Step A containers] → Service Load Balancer → [Pipeline Step B containers] → Service Load Balancer → Step C → Solr
  ```

  Each step can independently scale replicas without complexity.
Your Java client could look like this:
```java
PipelineServiceGrpc.PipelineServiceStub asyncStub = PipelineServiceGrpc.newStub(channel);

// Call downstream asynchronously
asyncStub.forward(pipeStreamRequest, new StreamObserver<PipeStream>() {
    @Override
    public void onNext(PipeStream response) {
        // PipeStream carries per-request replies; treat the call as successful only if they all succeeded.
        boolean success = response.getPipeRepliesList().stream().allMatch(PipeResponse::getSuccess);
        if (success) {
            // Forward succeeded
        } else {
            // handle failure
        }
    }

    @Override
    public void onError(Throwable t) {
        // log, retry, or fallback logic
    }

    @Override
    public void onCompleted() {
        // done
    }
});
```
Using async clients ensures you don’t block threads and maximize performance.
If you reach throughput limitations or see overhead in thousands of new connections, evaluate streaming:
- Measure time spent per connection setup vs actual work processing.
- Evaluate if connection overhead becomes a significant latency bottleneck.
- If streaming benefits outweigh complexity, then incorporate gRPC streaming with a Kubernetes service mesh.
- Start with simple Horizontal Pod Autoscaler (HPA) in Kubernetes.
- Each pipeline step can independently scale, based on CPU usage or queue lengths.
- Kafka can buffer incoming traffic spikes, and downstream services can scale independently.
```
Kafka Topic → Pipeline Step 1 (Parse) → Kafka → Pipeline Step 2 (Embedder) → Kafka → Pipeline Step 3 (Solr Indexer)
```
Kafka as the broker helps you decouple steps, easily scale containers independently, and leverage Kafka’s persistence and resilience.
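
For instance, every replica of a step joins the same consumer group, so Kafka spreads partitions across instances as you scale out. The listener class below is a hypothetical sketch; the topic and group names follow the configuration at the top of this page:

```java
package com.krickert.search.pipeline;

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;

// All embedder replicas share the "embedder" consumer group, so Kafka balances
// the "chunker" topic's partitions across however many instances are running.
@KafkaListener(groupId = "embedder")
public class EmbedderTopicListener {

    @Topic("chunker")
    public void receive(PipeDoc doc) {
        // Process the document here, then publish the result to "enhanced-documents".
    }
}
```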
A solid architecture recommendation for your scenario:
```
Kafka Topic -> [Indexer Pipeline Step A]
            -> Kafka Topic -> [Indexer Pipeline Step B]
            -> Kafka Topic -> [Indexer Step C]
            -> Kafka Topic (indexed docs) -> Solr
```
- Each pipeline step communicates via Kafka for resilience.
- Use gRPC unary calls (async) to signal and coordinate smaller internal status or success/failure acknowledgments as needed.
- Kafka’s inherent resilience ensures pipeline durability and simplicity.
- Using gRPC streaming to achieve persistent connections can be ideal in ultra-low-latency scenarios.
- Your scenario (high throughput, batch indexing with Kafka) might not require persistent streaming upfront.
- It’s generally easier to start unary and upgrade to streaming later if needed rather than vice versa.
✔️ For immediate clarity and speed, remove streaming for now and rely on Kafka buffering + async gRPC calls:
```proto
rpc forward(PipeStream) returns (PipeStream);
```
Async unary calls will give you a clean, highly scalable setup from the start.
✔️ After establishing solid metrics, you can then experiment with gRPC streaming if needed for additional performance.
| Stage | Approach | Complexity |
|---|---|---|
| Start Here | Async Unary Calls (No Streaming) | ✅ Simple |
| Scale Out | Async Unary + Kubernetes Scaling | ✅ Simple |
| If Bottlenecked | Add gRPC Streaming w/ Service Mesh (Istio/Linkerd) | More complex |
- ✅ **Initially:** Remove streaming. Keep simple, async, unary RPCs.
- ✅ **Long-term:** Evaluate streaming if overhead or latency demands optimization. Plan on service-mesh integration (Istio, Linkerd) for streaming support in Kubernetes.

This approach minimizes initial complexity and maximizes long-term flexibility.
Below is an example of how you might set up a generic Micronaut application that exposes a gRPC service (with a streaming “forward” method) and also uses Kafka for both output‐routing and for receiving messages that are “forwarded” by the same service logic. (In production you’d likely split things into separate files and add more production‐grade error handling, configuration, logging, etc.)
In this example we assume you have a proto file for your index document (shown below) and a separate proto file that defines the “Pipe” message (which bundles the index document, configuration, and one or more routes) along with the gRPC service definition.
**index_document.proto** (this is your provided sample; note that it lives in the package `com.krickert.search.model.document`):
```proto
syntax = "proto3";
package com.krickert.search.model.document;
option java_multiple_files = true;
import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";
message IndexDocument {
string id = 1;
string title = 2;
string body = 3;
repeated string keywords = 4;
google.protobuf.Timestamp creation_date = 5;
google.protobuf.Timestamp last_updated_date = 6;
google.protobuf.Struct custom_data = 7;
}
```

**pipeline.proto** (this file defines our “Pipe” object, a simple enum for route types, and the PipelineService with two methods: a streaming `forward` and a non-streaming `getOutput` for testing):
```proto
syntax = "proto3";
package com.example.pipeline;
import "google/protobuf/struct.proto";
import "com/krickert/search/model/document.proto"; // Reference to your index document proto
enum RouteType {
UNKNOWN = 0;
NULL_TERMINATION = 1;
KAFKA = 2;
GRPC = 3;
}
message Route {
RouteType routeType = 1;
// For Kafka: the topic name; for gRPC: the endpoint tag or destination identifier.
string destination = 2;
}
message Pipe {
com.krickert.search.model.document.IndexDocument indexDocument = 1;
// Generic configuration options (key/value pairs)
map<string, string> config = 2;
// One or more routes (e.g. send to Kafka, then to gRPC, etc.)
repeated Route routes = 3;
}
message PipeResponse {
bool success = 1;
// An optional error message
string errorMessage = 2;
// List of routes that failed to forward
repeated Route failedRoutes = 3;
}
service PipelineService {
// A bidirectional streaming call
rpc forward(stream Pipe) returns (stream PipeResponse);
// A simple call that returns the output of this pipe step
rpc getOutput(Pipe) returns (PipeResponse);
}
```

After compiling your protos with the Micronaut gRPC plugin, you’ll have generated Java classes (e.g. `Pipe`, `Route`, `PipeResponse`, and `PipelineServiceGrpc`).
Below is a sample service implementation that “processes” a Pipe by iterating through its routes. For each route, it either:
- Forwards the message to a Kafka topic (using a Kafka client),
- Forwards it to a gRPC endpoint (using an injected gRPC client/stub), or
- Does nothing if the route type is `NULL_TERMINATION`.
If any route fails, we catch the exception and record the failed route. Optionally, we then try to forward the same message to a backup Kafka topic.
**Note:** In this example, both the streaming `forward` and the non-streaming `getOutput` methods call the same private `processPipe` method.
```java
// PipelineServiceImpl.java
package com.example.pipeline;

import io.grpc.stub.StreamObserver;
import io.micronaut.grpc.annotation.GrpcService;
import jakarta.inject.Inject;

import java.util.List;

@GrpcService
public class PipelineServiceImpl extends PipelineServiceGrpc.PipelineServiceImplBase {
@Inject
KafkaForwarder kafkaForwarder;
@Inject
GrpcForwarder grpcForwarder;
@Override
public StreamObserver<Pipe> forward(StreamObserver<PipeResponse> responseObserver) {
return new StreamObserver<Pipe>() {
@Override
public void onNext(Pipe pipe) {
PipeResponse response = processPipe(pipe);
responseObserver.onNext(response);
}
@Override
public void onError(Throwable t) {
// Log error as needed and propagate the error
responseObserver.onError(t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
@Override
public void getOutput(Pipe request, StreamObserver<PipeResponse> responseObserver) {
PipeResponse response = processPipe(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
}
private PipeResponse processPipe(Pipe pipe) {
PipeResponse.Builder responseBuilder = PipeResponse.newBuilder();
List<Route> routes = pipe.getRoutesList();
for (Route route : routes) {
try {
switch (route.getRouteType()) {
case KAFKA:
kafkaForwarder.forwardToKafka(pipe, route);
break;
case GRPC:
grpcForwarder.forwardToGrpc(pipe, route);
break;
case NULL_TERMINATION:
// Do nothing: this route terminates forwarding.
break;
default:
throw new IllegalArgumentException("Unsupported route type: " + route.getRouteType());
}
} catch (Exception e) {
// Record the failed route (and log the error)
responseBuilder.addFailedRoutes(route);
// Optionally: forward the same pipe to a backup Kafka topic for reprocessing
try {
kafkaForwarder.forwardToBackup(pipe, route);
} catch (Exception backupEx) {
// Log backup failure if needed
}
}
}
boolean success = responseBuilder.getFailedRoutesCount() == 0;
responseBuilder.setSuccess(success);
if (!success) {
responseBuilder.setErrorMessage("Some routes failed during processing.");
}
return responseBuilder.build();
}
}
```

The `KafkaForwarder` uses an injected Kafka client (here defined by the interface `KafkaForwarderClient`) to send messages. The `forwardToKafka` method sends the message to the topic specified in the route’s `destination` field. There’s also an example method for sending to a backup topic if the primary route fails.
```java
// KafkaForwarder.java
package com.example.pipeline;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@Singleton
public class KafkaForwarder {
@Inject
KafkaForwarderClient kafkaForwarderClient;
public void forwardToKafka(Pipe pipe, Route route) {
// The 'destination' field contains the Kafka topic name.
String topic = route.getDestination();
kafkaForwarderClient.send(topic, pipe);
}
public void forwardToBackup(Pipe pipe, Route route) {
// Use a backup topic (e.g. prefix with "backup-") for reprocessing failed messages.
String backupTopic = "backup-" + route.getDestination();
kafkaForwarderClient.send(backupTopic, pipe);
}
}
```

The Kafka client interface is defined as follows. (Micronaut supports dynamic topic names by annotating a method parameter with `@Topic`, so the topic can be passed at call time.)
```java
// KafkaForwarderClient.java
package com.example.pipeline;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;

@KafkaClient
public interface KafkaForwarderClient {
    // Sends a Pipe message to the given topic; @Topic on the parameter makes the topic dynamic.
    void send(@Topic String topic, Pipe pipe);
}
```

In this example the `GrpcForwarder` uses a gRPC client stub to forward the Pipe message to another gRPC endpoint. In a real application you might maintain a registry of stubs (keyed by the destination tag) so that each route’s destination is handled appropriately; a sketch of such a registry follows the class below. Here we demonstrate a simple call to the `getOutput` method on a stub.
```java
// GrpcForwarder.java
package com.example.pipeline;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import jakarta.inject.Singleton;
import jakarta.inject.Inject;
@Singleton
public class GrpcForwarder {
// For demonstration we create a single stub.
// In practice, you may want to select the stub based on route.getDestination().
private final PipelineServiceGrpc.PipelineServiceBlockingStub stub;
@Inject
public GrpcForwarder() {
// Create a channel to the destination gRPC service (address could be externalized to config)
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
this.stub = PipelineServiceGrpc.newBlockingStub(channel);
}
public void forwardToGrpc(Pipe pipe, Route route) {
// In a real-world scenario, use route.getDestination() to choose the correct stub.
// Here we simply call the getOutput method and ignore the response.
stub.getOutput(pipe);
}
}
```

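A sketch of the stub registry mentioned above, assuming each route’s `destination` resolves to a `host:port` string (how destinations map to addresses is an assumption here):

```java
// GrpcStubRegistry.java (illustrative companion to GrpcForwarder)
package com.example.pipeline;

import io.grpc.ManagedChannelBuilder;
import jakarta.inject.Singleton;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Singleton
public class GrpcStubRegistry {

    // One cached blocking stub per destination, created lazily on first use.
    private final Map<String, PipelineServiceGrpc.PipelineServiceBlockingStub> stubs = new ConcurrentHashMap<>();

    public PipelineServiceGrpc.PipelineServiceBlockingStub stubFor(String destination) {
        return stubs.computeIfAbsent(destination, dest -> {
            // Assumes destinations look like "step-b:50051"; adjust to your routing scheme.
            String[] hostPort = dest.split(":", 2);
            return PipelineServiceGrpc.newBlockingStub(
                    ManagedChannelBuilder.forAddress(hostPort[0], Integer.parseInt(hostPort[1]))
                            .usePlaintext()
                            .build());
        });
    }
}
```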
Finally, you can have a Kafka listener that receives Pipe messages (e.g. from an “input” topic) and then processes them using the same service logic. This allows the same code to be used whether the message comes in over gRPC or via Kafka.
```java
// PipelineKafkaListener.java
package com.example.pipeline;
import io.grpc.stub.StreamObserver;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@KafkaListener(groupId = "pipe-group")
@Singleton
public class PipelineKafkaListener {
@Inject
PipelineServiceImpl pipelineService;
@Topic("input-pipe")
public void receive(Pipe pipe) {
// Process the pipe using the same logic as in the gRPC service.
pipelineService.getOutput(pipe, new StreamObserver<PipeResponse>() {
@Override
public void onNext(PipeResponse response) {
// For example, log the response or perform additional processing.
}
@Override
public void onError(Throwable t) {
// Log error as needed.
}
@Override
public void onCompleted() {
// Completion logic if needed.
}
});
}
}
```

This code sample shows:

- A gRPC service exposing a streaming `forward` method and a non-streaming `getOutput` method.
- A Pipe object that contains an index document, configuration, and one or more routes.
- Forwarding logic that, based on each route’s type, sends the message to a Kafka topic or makes a gRPC call.
- Error handling that records which routes fail and optionally attempts a backup (e.g. by sending to a backup Kafka topic).
- A Kafka listener that feeds messages into the same processing logic as the gRPC endpoint.
You can further refine this sample by externalizing configuration (for destination addresses, backup topics, etc.), implementing richer error handling and retry logic, and adding proper logging and metrics as needed.
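
For example, a minimal sketch of externalizing the forwarder settings with Micronaut configuration properties; the `pipeline.forwarder` prefix and property names here are hypothetical:

```java
// ForwarderConfig.java (hypothetical)
package com.example.pipeline;

import io.micronaut.context.annotation.ConfigurationProperties;

import java.util.Map;

@ConfigurationProperties("pipeline.forwarder")
public class ForwarderConfig {

    /** Prefix prepended to the original topic when re-routing failed messages. */
    private String backupTopicPrefix = "backup-";

    /** Route destination tag -> gRPC host:port, e.g. "step-b" -> "step-b:50051". */
    private Map<String, String> grpcDestinations = Map.of();

    public String getBackupTopicPrefix() { return backupTopicPrefix; }

    public void setBackupTopicPrefix(String backupTopicPrefix) { this.backupTopicPrefix = backupTopicPrefix; }

    public Map<String, String> getGrpcDestinations() { return grpcDestinations; }

    public void setGrpcDestinations(Map<String, String> grpcDestinations) { this.grpcDestinations = grpcDestinations; }
}
```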