Notes on a Tika gRPC streaming response service - krickert/search-api GitHub Wiki
Below is an example of how you might wire things up so that Tika parses an incoming document “on the fly” and streams content chunks via a gRPC streaming reply while converting metadata into a Google Protobuf `Struct`. In this example, we assume that you’ve defined your proto so that your streaming reply message (here called `DocumentResponse`) contains a `oneof` for either a metadata message (a `google.protobuf.Struct`) or a content chunk (a string). (If your proto previously expected the entire document content as a single field, you’ll need to update it to support streaming.)
Sample Proto Definition
For example, your proto might look like this:
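```proto
syntax = "proto3";

package mypackage;

import "google/protobuf/struct.proto";

// Generate each message as its own top-level Java class (e.g. mypackage.DocumentRequest),
// which is what the Java server imports expect.
option java_multiple_files = true;

message DocumentRequest {
  // The document content (could be bytes or some reference)
  bytes content = 1;
}

message DocumentResponse {
  oneof data {
    google.protobuf.Struct metadata = 1;
    string content_chunk = 2;
  }
}

service DocumentService {
  // Server streaming: many responses (chunks + final metadata) will be sent.
  // grpc-java still generates the Java method name parseDocument from this rpc.
  rpc ParseDocument(DocumentRequest) returns (stream DocumentResponse);
}
```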
Java Server Implementation Using Tika Streaming
Below is an example of a Java implementation that uses Tika’s `AutoDetectParser` with a custom SAX `ContentHandler` that sends text chunks to the gRPC response stream as soon as enough text has accumulated. It also converts Tika’s metadata (which Tika stores as strings) into a `Struct` that can represent numbers, booleans, or strings.
```java
import com.google.protobuf.ListValue;
import com.google.protobuf.NullValue;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.InputStream;
import mypackage.DocumentRequest;
import mypackage.DocumentResponse;
import mypackage.DocumentServiceGrpc;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;

// gRPC service implementation.
public class DocumentServiceImpl extends DocumentServiceGrpc.DocumentServiceImplBase {

    @Override
    public void parseDocument(DocumentRequest request, StreamObserver<DocumentResponse> responseObserver) {
        AutoDetectParser parser = new AutoDetectParser();
        Metadata metadata = new Metadata();
        ParseContext context = new ParseContext();
        // Create our streaming content handler.
        // A chunk is sent once at least 1024 characters are buffered. Adjust as needed.
        StreamingContentHandler streamingHandler = new StreamingContentHandler(responseObserver, 1024);

        // ByteString.newInput() reads the request bytes without copying them into a new array.
        try (InputStream input = request.getContent().newInput()) {
            // The parse call streams content chunks via the streamingHandler.
            parser.parse(input, streamingHandler, metadata, context);
            // Send any remaining buffered content (a no-op if endDocument() already flushed it).
            streamingHandler.flushBuffer();
            // After parsing is complete, convert metadata to a protobuf Struct and send it last.
            Struct metadataStruct = convertMetadataToStruct(metadata);
            responseObserver.onNext(
                DocumentResponse.newBuilder().setMetadata(metadataStruct).build()
            );
            responseObserver.onCompleted();
        } catch (Exception e) {
            // Report an explicit gRPC status; a raw exception reaches the client as UNKNOWN.
            responseObserver.onError(
                Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException()
            );
        }
    }

    /**
     * Converts Tika Metadata to a Google Protobuf Struct.
     * Multi-valued keys (e.g. several authors) become a ListValue, so no value is dropped.
     */
    private Struct convertMetadataToStruct(Metadata metadata) {
        Struct.Builder structBuilder = Struct.newBuilder();
        for (String name : metadata.names()) {
            String[] values = metadata.getValues(name);
            if (values.length == 1) {
                structBuilder.putFields(name, convertStringToValue(values[0]));
            } else {
                ListValue.Builder list = ListValue.newBuilder();
                for (String v : values) {
                    list.addValues(convertStringToValue(v));
                }
                structBuilder.putFields(name, Value.newBuilder().setListValue(list).build());
            }
        }
        return structBuilder.build();
    }

    /**
     * Converts a String to a Protobuf Value: a number if it parses as a finite double,
     * a boolean for "true"/"false" (any case), otherwise the string itself.
     */
    private Value convertStringToValue(String value) {
        if (value == null) {
            return Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build();
        }
        // Struct numbers are doubles, so one parse covers integers and decimals.
        try {
            double d = Double.parseDouble(value);
            // JSON has no literal for NaN/Infinity, so keep those as strings.
            if (Double.isFinite(d)) {
                return Value.newBuilder().setNumberValue(d).build();
            }
        } catch (NumberFormatException ex) {
            // Not numeric.
        }
        // Attempt boolean conversion.
        if ("true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value)) {
            return Value.newBuilder().setBoolValue(Boolean.parseBoolean(value)).build();
        }
        // Fallback: use as string.
        return Value.newBuilder().setStringValue(value).build();
    }
}
```
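To check the type-inference rules without protobuf on the classpath, here is a minimal stdlib-only sketch that applies the same order of attempts (number, then boolean, then string) but returns plain Java objects instead of `Value`. It makes two assumptions of its own: numbers must be finite, because JSON has no literal for NaN or Infinity, and multi-valued keys become a `List`. The class name `MetadataTypeInference` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stdlib-only mirror of the metadata string-to-type conversion. */
final class MetadataTypeInference {
    private MetadataTypeInference() {}

    /** Returns a Double, Boolean, or String; null stays null. */
    static Object infer(String raw) {
        if (raw == null) {
            return null;
        }
        try {
            double d = Double.parseDouble(raw);
            // NaN and Infinity have no JSON representation, so fall through to string.
            if (Double.isFinite(d)) {
                return d;
            }
        } catch (NumberFormatException ignored) {
            // Not numeric; try boolean next.
        }
        if ("true".equalsIgnoreCase(raw) || "false".equalsIgnoreCase(raw)) {
            return Boolean.parseBoolean(raw);
        }
        return raw;
    }

    /** A single value stays scalar; several values become a list of inferred values. */
    static Object inferAll(String[] values) {
        if (values.length == 1) {
            return infer(values[0]);
        }
        List<Object> out = new ArrayList<>();
        for (String v : values) {
            out.add(infer(v));
        }
        return out;
    }
}
```

Note that `Double.parseDouble` is lenient: it trims surrounding whitespace and accepts forms such as `1e3` or a trailing `d`/`f` suffix, and a value like `007` becomes `7.0`. If identifiers or ZIP codes show up in your metadata, a stricter pattern check before parsing may be safer.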
The StreamingContentHandler
This custom handler extends SAX’s `DefaultHandler` and overrides the `characters` method. It buffers incoming characters until at least `chunkSize` characters have accumulated, then sends the buffered text as one chunk via the gRPC stream. At the end of the document, any remaining text is flushed.
```java
import io.grpc.stub.StreamObserver;
import mypackage.DocumentResponse;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

public class StreamingContentHandler extends DefaultHandler {
    private final StreamObserver<DocumentResponse> responseObserver;
    private final StringBuilder buffer = new StringBuilder();
    private final int chunkSize;

    /**
     * @param responseObserver the gRPC observer to which content chunks are sent
     * @param chunkSize the number of buffered characters that triggers a send; a chunk can be
     *                  longer, because each characters() call is appended before the check
     */
    public StreamingContentHandler(StreamObserver<DocumentResponse> responseObserver, int chunkSize) {
        this.responseObserver = responseObserver;
        this.chunkSize = chunkSize;
    }

    @Override
    public void characters(char[] ch, int start, int length) throws SAXException {
        buffer.append(ch, start, length);
        if (buffer.length() >= chunkSize) {
            flushBuffer();
        }
    }

    /**
     * DefaultHandler discards ignorable whitespace; forwarding it keeps the line breaks
     * that Tika's XHTML output reports between block elements (paragraphs, rows, ...).
     */
    @Override
    public void ignorableWhitespace(char[] ch, int start, int length) throws SAXException {
        characters(ch, start, length);
    }

    /**
     * Flushes the accumulated text buffer as a content chunk message.
     */
    public void flushBuffer() {
        if (buffer.length() > 0) {
            String chunk = buffer.toString();
            buffer.setLength(0); // clear the buffer
            responseObserver.onNext(
                DocumentResponse.newBuilder().setContentChunk(chunk).build()
            );
        }
    }

    @Override
    public void endDocument() throws SAXException {
        flushBuffer();
    }
}
```
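The buffering logic can be exercised without gRPC or Tika. This sketch keeps the same append-then-check loop but swaps the `StreamObserver` for a plain `Consumer<String>` sink and drives the handler with the JDK’s built-in SAX parser. `ChunkingHandler` and its `run` helper are hypothetical names. It also forwards `ignorableWhitespace`, which `DefaultHandler` otherwise discards.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.helpers.DefaultHandler;

/** Hypothetical JDK-only version of the chunking handler; the sink replaces StreamObserver. */
final class ChunkingHandler extends DefaultHandler {
    private final Consumer<String> sink;
    private final int chunkSize;
    private final StringBuilder buffer = new StringBuilder();

    ChunkingHandler(Consumer<String> sink, int chunkSize) {
        this.sink = sink;
        this.chunkSize = chunkSize;
    }

    @Override
    public void characters(char[] ch, int start, int length) {
        buffer.append(ch, start, length);
        // Checked after appending, so a chunk may exceed chunkSize.
        if (buffer.length() >= chunkSize) {
            flush();
        }
    }

    @Override
    public void ignorableWhitespace(char[] ch, int start, int length) {
        characters(ch, start, length);
    }

    @Override
    public void endDocument() {
        flush(); // send whatever is left over
    }

    private void flush() {
        if (buffer.length() > 0) {
            sink.accept(buffer.toString());
            buffer.setLength(0);
        }
    }

    /** Parses an XML string with the JDK SAX parser and returns the emitted chunks. */
    static List<String> run(String xml, int chunkSize) {
        List<String> chunks = new ArrayList<>();
        try {
            SAXParserFactory.newInstance().newSAXParser().parse(
                new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)),
                new ChunkingHandler(chunks::add, chunkSize));
        } catch (Exception e) {
            throw new IllegalStateException("parse failed", e);
        }
        return chunks;
    }
}
```

Because the size check runs after a whole `characters()` call has been appended, every chunk except the last holds at least `chunkSize` characters and may hold more. If you need a hard upper bound per message, the buffer would have to be split before sending.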
How It Works
- Input Streaming: The service receives the document content as a `bytes` field, so the request is already fully in memory by the time `parseDocument` runs; wrapping it in an `InputStream` lets Tika read it incrementally. Streaming the input as well would require a client-streaming or bidirectional RPC.
- Tika Streaming: Tika’s `AutoDetectParser` is used with a custom `StreamingContentHandler` that sends content chunks as soon as enough text has accumulated, instead of collecting the whole extracted text in a `BodyContentHandler`. The handler itself holds only about one chunk at a time (gRPC may still queue sent chunks if the client reads slowly, unless the server honors flow control via `ServerCallStreamObserver.isReady()`).
- Metadata Conversion: When parsing completes, the `Metadata` object is converted into a `Struct` (from `google.protobuf.Struct`). The helper method tries to interpret string values as numbers or booleans where possible.
- gRPC Streaming: The service method sends multiple `DocumentResponse` messages: first the content chunks as they’re parsed, then, once parsing is complete, a final message carrying the metadata, after which the stream is closed.
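On the receiving side, a client rebuilds the document by appending content chunks in arrival order (gRPC preserves message order within a single stream) and treating the metadata message as the last piece of data. With the generated Java classes you would switch on `response.getDataCase()` (`CONTENT_CHUNK` or `METADATA`); the sketch below stands in for that with a hypothetical sealed interface so it runs on the JDK alone (Java 17+). `ResponsePart`, `ChunkPart`, `MetadataPart`, and `DocumentAssembler` are illustrative names.

```java
import java.util.Map;

/** Hypothetical stand-in for the DocumentResponse oneof: each message is exactly one case. */
sealed interface ResponsePart permits ChunkPart, MetadataPart {}

record ChunkPart(String text) implements ResponsePart {}

record MetadataPart(Map<String, Object> fields) implements ResponsePart {}

/** Appends chunks in order and expects metadata to be the final message. */
final class DocumentAssembler {
    private final StringBuilder text = new StringBuilder();
    private Map<String, Object> metadata;

    void onNext(ResponsePart part) {
        if (metadata != null) {
            throw new IllegalStateException("message received after metadata");
        }
        if (part instanceof ChunkPart chunk) {
            text.append(chunk.text());
        } else if (part instanceof MetadataPart meta) {
            metadata = meta.fields();
        }
    }

    String text() {
        return text.toString();
    }

    Map<String, Object> metadata() {
        return metadata;
    }
}
```

Rejecting anything after the metadata is a design choice of this sketch: it turns an ordering bug on the server into an immediate client-side error rather than silently truncated text.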
Do You Need to Change the Proto?
If your current proto expects a single document object (with one metadata field and one large content field), you will need to update it to support streaming (for example, using a oneof to indicate whether a message is a metadata record or a content chunk). This lets you send parts of the content as they’re parsed rather than waiting for the entire document to be buffered in memory.
This example should give you a good starting point for streaming Tika output directly through your gRPC service while keeping memory usage low and preserving data types in your metadata.
A related utility converts a `BatchEmbeddingsReply` into a nested `ListValue`, one inner `ListValue` per embedding:

```java
import com.google.protobuf.ListValue;
import com.google.protobuf.Value;
import mypackage.BatchEmbeddingsReply; // adjust package as needed

public final class EmbeddingUtil {
    private EmbeddingUtil() {
        // Prevent instantiation
    }

    /**
     * Converts a BatchEmbeddingsReply into a ListValue of ListValues. Each inner ListValue
     * represents one embedding (a list of float values), preserving the grouping.
     *
     * @param reply the BatchEmbeddingsReply containing embeddings
     * @return a ListValue where each element is a ListValue representing an embedding
     */
    public static ListValue batchEmbeddingsReplyToListValue(BatchEmbeddingsReply reply) {
        return ListValue.newBuilder()
            .addAllValues(
                reply.getEmbeddingsList().stream()
                    // For each Embedding, build an inner ListValue of its float values
                    .map(embedding -> {
                        ListValue innerList = ListValue.newBuilder()
                            .addAllValues(
                                embedding.getEmbeddingList().stream()
                                    // Each float is widened to the double held by number_value
                                    .map(f -> Value.newBuilder().setNumberValue(f).build())
                                    .toList() // Stream.toList(), available since JDK 16
                            )
                            .build();
                        // Wrap the inner ListValue in a Value
                        return Value.newBuilder().setListValue(innerList).build();
                    })
                    .toList() // Collect all the embedding values into a list
            )
            .build();
    }
}
```
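`Value` stores numbers as `double`, so each embedding `float` is widened on the way in. The widening is exact, but the resulting double is not the decimal you might expect: `0.1f` becomes `0.10000000149011612` when printed. This stdlib-only sketch reproduces the same nesting with a hypothetical `List<float[]>` standing in for the repeated embedding messages, so the grouping and the widening can be checked without protobuf. `EmbeddingNesting` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stdlib-only mirror of the embedding nesting: one inner list per embedding. */
final class EmbeddingNesting {
    private EmbeddingNesting() {}

    static List<List<Double>> nest(List<float[]> embeddings) {
        List<List<Double>> outer = new ArrayList<>();
        for (float[] embedding : embeddings) {
            List<Double> inner = new ArrayList<>(embedding.length);
            for (float f : embedding) {
                // Exact float-to-double widening, as when a float is passed to setNumberValue.
                inner.add((double) f);
            }
            outer.add(inner);
        }
        return outer;
    }
}
```

If payload size matters, keep in mind that a `ListValue` wraps every number in its own `Value` message, which is larger on the wire than a packed `repeated float` field.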