Pipeline part 3 - krickert/search-api GitHub Wiki
Here's the full Micronaut-based abstract gRPC implementation that handles Kafka and gRPC forwarding, so users only have to define processPipe(). This setup:

- Auto-loads pipeline configurations from properties
- Registers the service dynamically based on configuration
- Automatically routes messages to Kafka and gRPC
- Handles StreamObserver responses
- Only requires the user to define a simple processPipe() function
Here's a structured way to define your pipeline routes dynamically:
pipeline:
registry:
route:
embedder:
kafka:
topic: "embeddings"
topic-failures: "embeddings-failures"
grpc:
endpoint: "localhost:50401"
chunker:
kafka:
topic: "chunk"
topic-failures: "chunk-failures"
grpc:
endpoint: "localhost:50402"
solr-indexer:
kafka:
topic: "enhanced-documents"
topic-failures: "enhanced-documents-failures"
grpc:
endpoint: "localhost:50403"
solr-to-kafka:
kafka:
topic: "solr-documents"
topic-failures: "solr-documents-failures"
grpc:
endpoint: "localhost:50404"
tika:
kafka:
topic: "tika-documents"
topic-failures: "tika-document-failures"
grpc:
endpoint: "localhost:50405"
This makes it easy to register multiple services dynamically without hardcoding.
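The base class shown next looks these routes up by service name through a RouteConfig holder that isn't defined on this page. A minimal sketch of what it might look like, assuming Micronaut's @EachProperty creates one bean per entry under pipeline.registry.route (the class shape, field names, and the flattened property keys in the comments are assumptions, not code from the repo):

package com.krickert.search.pipeline.config;

import io.micronaut.context.annotation.EachProperty;
import io.micronaut.context.annotation.Parameter;
import lombok.Getter;
import lombok.Setter;

import java.util.List;

// One bean is created per entry under pipeline.registry.route (embedder, chunker, ...).
@Getter
@Setter
@EachProperty("pipeline.registry.route")
public class RouteConfig {

    private final String name;

    // e.g. pipeline.registry.route.embedder.kafka-topics=embeddings
    private List<String> kafkaTopics;

    // e.g. pipeline.registry.route.embedder.kafka-failure-topics=embeddings-failures
    private List<String> kafkaFailureTopics;

    // e.g. pipeline.registry.route.embedder.grpc-endpoints=localhost:50401
    private List<String> grpcEndpoints;

    public RouteConfig(@Parameter String name) {
        this.name = name;
    }
}

The keys in the comments are flattened and pluralized relative to the YAML above so that the Lombok getters line up with what AbstractPipelineService calls (getKafkaTopics(), getKafkaFailureTopics(), getGrpcEndpoints()); the later sections switch to a simpler ServiceConfiguration shape instead.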
This is the base class that handles everything for the user except processPipe().
- Loads pipeline registry from configuration
- Detects which routes to send to (Kafka/gRPC)
- Automatically sends failures to dead-letter topics
- Handles gRPC async calls and Kafka publishing
- The user only defines processPipe(PipeDoc doc)
package com.krickert.search.pipeline;
import com.google.protobuf.Empty;
import com.krickert.search.model.pipe.*;
import io.grpc.stub.StreamObserver;
import io.micronaut.context.annotation.Value;
import io.micronaut.grpc.annotation.GrpcService;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@GrpcService
public abstract class AbstractPipelineService extends PipelineServiceGrpc.PipelineServiceImplBase {
@Inject private KafkaProducer kafkaProducer;
@Inject private GrpcForwarder grpcForwarder;
@Inject private PipelineConfig pipelineConfig;
@Value("${pipeline.service.name}")
private String serviceName; // Inject the service name for self-registration
private static final Map<String, RouteConfig> routeRegistry = new ConcurrentHashMap<>();
@PostConstruct
void init() {
    // serviceName is injected after construction, so log it once the bean is fully initialized.
    log.info("Initializing pipeline service: {}", serviceName);
}
/**
* The only method users need to implement to process documents.
*/
protected abstract PipeDoc processPipe(PipeDoc doc);
@Override
public void forward(PipeStream request, StreamObserver<Empty> responseObserver) {
    PipeRequest pipeRequest = request.getRequest();
    log.info("Received forward request for document {}", pipeRequest.getDoc().getId());
    Mono.fromCallable(() -> processPipe(pipeRequest.getDoc()))
            .flatMap(enhancedDoc -> sendToDestinations(pipeRequest, enhancedDoc))
            .onErrorResume(e -> {
                log.error("Processing failed: {}", e.getMessage(), e);
                return handleError(pipeRequest, e.getMessage());
            })
            .doOnTerminate(() -> {
                responseObserver.onNext(Empty.getDefaultInstance());
                responseObserver.onCompleted();
            })
            .subscribe();
}
private Mono<PipeStream> sendToDestinations(PipeRequest originalRequest, PipeDoc updatedDoc) {
RouteConfig routes = pipelineConfig.getRouteForService(serviceName);
if (routes == null) {
log.error("No configured routes found for service: {}", serviceName);
return Mono.empty();
}
log.info("Routing document {} to destinations", updatedDoc.getId());
// Send to Kafka
routes.getKafkaTopics().forEach(topic -> kafkaProducer.send(topic, updatedDoc));
// Send to gRPC destinations
routes.getGrpcEndpoints().forEach(endpoint -> grpcForwarder.send(endpoint, updatedDoc));
return Mono.just(PipeStream.newBuilder()
.addPipeReplies(PipeResponse.newBuilder().setSuccess(true).build())
.build());
}
private Mono<PipeStream> handleError(PipeRequest request, String errorMessage) {
RouteConfig routes = pipelineConfig.getRouteForService(serviceName);
ErrorData errorData = ErrorData.newBuilder()
.setErrorMessage(errorMessage)
.addAllFailedRoutes(request.getDestinationsList())
.setErrorRequest(request)
.build();
PipeResponse errorResponse = PipeResponse.newBuilder()
        .setSuccess(false)
        .setErrorData(errorData)
        .build();
log.error("Routing failed document to failure topics");
if (routes != null) {
    routes.getKafkaFailureTopics().forEach(topic -> kafkaProducer.send(topic, request.getDoc()));
}
return Mono.just(PipeStream.newBuilder()
.addPipeReplies(errorResponse)
.build());
}
}
The user only has to extend AbstractPipelineService and implement processPipe():
@Slf4j
@GrpcService
public class EmbeddingPipelineService extends AbstractPipelineService {
@Override
protected PipeDoc processPipe(PipeDoc doc) {
log.info("Processing embeddings for document: {}", doc.getId());
// Simulate embedding generation
SemanticData enrichedData = SemanticData.newBuilder()
.putChunkData("chunk1", SemanticDocument.newBuilder().build())
.build();
return doc.toBuilder().setSemanticData(enrichedData).build();
}
}
This producer is used for sending messages to Kafka.
@KafkaClient
public interface KafkaProducer {
    // The topic is supplied per call, so a single client can publish to any pipeline topic.
    void send(@Topic String topic, PipeDoc message);
}
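For the producer above to accept PipeDoc values, Kafka also needs a value serializer. A minimal sketch (the class name is an assumption; the real repo may already handle protobuf serialization another way):

package com.krickert.search.pipeline.kafka;

import com.krickert.search.model.pipe.PipeDoc;
import org.apache.kafka.common.serialization.Serializer;

// Writes PipeDoc messages out in protobuf wire format so they can be published to Kafka.
public class PipeDocSerializer implements Serializer<PipeDoc> {
    @Override
    public byte[] serialize(String topic, PipeDoc doc) {
        return doc == null ? null : doc.toByteArray();
    }
}

It would then be wired in as the producer's value.serializer property (for example under the default producer settings in application.yml).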
Handles sending documents to other gRPC services.
@Slf4j
@Singleton
public class GrpcForwarder {
    private final Map<String, PipelineServiceGrpc.PipelineServiceStub> clients = new ConcurrentHashMap<>();

    public void send(String endpoint, PipeDoc doc) {
        // Lazily create one async stub per endpoint and reuse it for subsequent calls.
        PipelineServiceGrpc.PipelineServiceStub stub = clients.computeIfAbsent(endpoint,
                e -> PipelineServiceGrpc.newStub(ManagedChannelBuilder.forTarget(e).usePlaintext().build()));
        // Wrap the processed document in a PipeStream request for the downstream service.
        PipeStream request = PipeStream.newBuilder()
                .setRequest(PipeRequest.newBuilder().setDoc(doc).build())
                .build();
stub.forward(request, new StreamObserver<com.google.protobuf.Empty>() {
@Override
public void onNext(com.google.protobuf.Empty value) {}
@Override
public void onError(Throwable t) {
log.error("gRPC forwarding error to {}: {}", endpoint, t.getMessage());
}
@Override
public void onCompleted() {}
});
}
}
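One thing the forwarder above glosses over is channel lifecycle: the ManagedChannels it creates are never shut down. A small registry like the sketch below (a hypothetical helper, not part of the code above) keeps the channels so a @PreDestroy hook can close them when the application stops:

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Singleton;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

@Singleton
public class GrpcChannelRegistry {

    // One channel per endpoint, which GrpcForwarder could reuse instead of building its own.
    private final Map<String, ManagedChannel> channels = new ConcurrentHashMap<>();

    public ManagedChannel channelFor(String endpoint) {
        return channels.computeIfAbsent(endpoint,
                e -> ManagedChannelBuilder.forTarget(e).usePlaintext().build());
    }

    @PreDestroy
    void shutdown() throws InterruptedException {
        for (ManagedChannel channel : channels.values()) {
            channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}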
- User only needs to extend and implement processPipe(PipeDoc doc)
- Automatically handles routing via Kafka or gRPC
- Loads dynamic pipeline configurations from properties
- Routes errors automatically to failure topics
- Handles async operations using Micronaut + Reactor

This is fully extensible, highly scalable, and configurable via properties, without needing to change code every time you add a new pipeline step.
We'll use Micronaut's @ConfigurationProperties to load the pipeline dynamically into a Map<String, ServiceConfiguration>, where:

- The key (String) is the service name.
- The value (ServiceConfiguration) stores Kafka and gRPC settings.

ServiceConfiguration represents each service's Kafka and gRPC settings.
package com.krickert.search.pipeline.config;
import io.micronaut.context.annotation.EachProperty;
import io.micronaut.context.annotation.Parameter;
import jakarta.inject.Singleton;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
@Getter
@Setter
@EachProperty("pipeline.service")
@Singleton
public class ServiceConfiguration {
private final String name;
private List<String> kafkaListenTopics;
private List<String> kafkaPublishTopics;
private String grpcForwardTo;
public ServiceConfiguration(@Parameter String name) {
this.name = name;
}
}
This loads all services into a Map where the key is the service name.
package com.krickert.search.pipeline.config;
import io.micronaut.context.annotation.ConfigurationProperties;
import jakarta.inject.Singleton;
import lombok.Getter;
import java.util.Map;
@Getter
@Singleton
@ConfigurationProperties("pipeline")
public class PipelineConfig {
private final Map<String, ServiceConfiguration> service;
public PipelineConfig(Map<String, ServiceConfiguration> service) {
this.service = service;
}
public ServiceConfiguration getRouteForService(String serviceName) {
return service.get(serviceName);
}
}
Now, you can inject PipelineConfig anywhere to get the map of services. Example usage in AbstractPipelineService:
@Inject private PipelineConfig pipelineConfig;
public void processService(String serviceName) {
ServiceConfiguration config = pipelineConfig.getRouteForService(serviceName);
if (config != null) {
log.info("Service: {}", serviceName);
log.info("Listening to Kafka topics: {}", config.getKafkaListenTopics());
log.info("Publishing to Kafka topics: {}", config.getKafkaPublishTopics());
log.info("Forwarding via gRPC to: {}", config.getGrpcForwardTo());
} else {
log.warn("No configuration found for service: {}", serviceName);
}
}
Micronaut will automatically map these properties:
pipeline.service.chunker.kafka-listen-topics=solr-documents
pipeline.service.chunker.grpc-forward-to=embedder
pipeline.service.embedder.kafka-listen-topics=chunker
pipeline.service.embedder.kafka-publish-topics=enhanced-documents
pipeline.service.solr-indexer.kafka-listen-topics=enhanced-documents
pipeline.service.solr-indexer.kafka-publish-topics=
pipeline.service.solr-indexer.grpc-forward-to=
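A quick way to sanity-check that binding is to start an ApplicationContext with a couple of these properties in a test (just a sketch; the test class name and asserted values simply mirror the example above):

import io.micronaut.context.ApplicationContext;
import io.micronaut.inject.qualifiers.Qualifiers;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

class ServiceConfigurationBindingTest {

    @Test
    void bindsChunkerService() {
        Map<String, Object> props = Map.of(
                "pipeline.service.chunker.kafka-listen-topics", "solr-documents",
                "pipeline.service.chunker.grpc-forward-to", "embedder");

        try (ApplicationContext ctx = ApplicationContext.run(props)) {
            // @EachProperty names each bean after its key under pipeline.service.
            ServiceConfiguration chunker =
                    ctx.getBean(ServiceConfiguration.class, Qualifiers.byName("chunker"));
            assertEquals("embedder", chunker.getGrpcForwardTo());
        }
    }
}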
- Fully Dynamic Configuration using Micronaut
- Properties-to-Object Mapping for clarity
- Easy Retrieval via PipelineConfig.getRouteForService(serviceName)
- Extensible & Clean

This approach keeps your pipeline flexible, meaning you can add or remove services without modifying code.
pipeline.registry.route.embedder.kafka.topic=embeddings
pipeline.registry.route.embedder.kafka.topic-failures=embeddings-failures
pipeline.registry.route.embedder.grpc.endpoint=localhost:50401
pipeline.registry.route.chunker.kafka.topic=chunk
pipeline.registry.route.chunker.kafka.topic-failures=chunk-failures
pipeline.registry.route.chunker.grpc.endpoint=localhost:50402
pipeline.registry.route.solr-indexer.kafka.topic=enhanced-documents
pipeline.registry.route.solr-indexer.kafka.topic-failures=enhanced-documents-failures
pipeline.registry.route.solr-indexer.grpc.endpoint=localhost:50403
pipeline.registry.route.solr-to-kafka.kafka.topic=solr-documents
pipeline.registry.route.solr-to-kafka.kafka.topic-failures=solr-documents-failures
pipeline.registry.route.solr-to-kafka.grpc.endpoint=localhost:50404
pipeline.registry.route.tika.kafka.topic=tika-documents
pipeline.registry.route.tika.kafka.topic-failures=tika-document-failures
pipeline.registry.route.tika.grpc.endpoint=localhost:50405
Not sure if we'll use the above?
syntax = "proto3";
package com.krickert.search.pipeline;
option java_multiple_files = true;
import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/empty.proto";
message PipeDoc {
string id = 1;
string title = 2;
string body = 3;
repeated string keywords = 4;
string document_type = 5;
string revision_id = 6;
google.protobuf.Timestamp creation_date = 7;
google.protobuf.Timestamp last_modified = 8;
google.protobuf.Struct custom_data = 9;
optional SemanticData semantic_data = 10;
}
message SemanticData {
map<string, SemanticDocument> chunk_data = 8;
}
message SemanticDocument {
repeated SemanticChunk chunks = 1;
string parent_id = 2;
map<string,string> metadata = 3;
}
message SemanticChunk {
string chunk_id = 1;
int64 chunk_number = 2;
string chunk = 3;
repeated float embeddings = 4;
}
message Route {
RouteType route_type = 1;
}
enum RouteType {
UNKNOWN = 0;
NULL_TERMINATION = 2;
KAFKA = 3;
GRPC = 4;
}
message PipeRequest {
PipeDoc doc = 1;
map<string, string> config = 2;
repeated Route destinations = 3;
}
message OutputResponse {
bool success = 1;
oneof reply {
PipeDoc output_doc = 2;
ErrorData errorData = 3;
}
}
message ErrorData {
string error_message = 2;
repeated Route failed_routes = 3;
optional PipeRequest error_request = 4;
}
message PipeResponse {
bool success = 1;
optional ErrorData error_data = 2;
}
message PipeStream {
PipeRequest request = 1;
repeated PipeResponse pipe_replies = 2;
repeated string stream_logs = 3;
}
service PipelineService {
rpc forward(PipeStream) returns (google.protobuf.Empty);
rpc getOutput(PipeRequest) returns (PipeResponse);
}