stream observer that counts - krickert/search-api GitHub Wiki

Here’s a reusable CountingStreamObserver<T> class that you can plug into any gRPC test or client code to count successes, failures, and total messages received for any T extends Message.

import io.grpc.stub.StreamObserver;
import com.google.protobuf.Message;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class CountingStreamObserver<T extends Message> implements StreamObserver<T> {

    private final AtomicInteger successCount = new AtomicInteger();
    private final AtomicInteger failureCount = new AtomicInteger();
    private final AtomicInteger totalCount = new AtomicInteger();

    private final Consumer<T> onNextHandler;
    private final Consumer<Throwable> onErrorHandler;
    private final Runnable onCompletedHandler;

    public CountingStreamObserver() {
        this(null, null, null);
    }

    public CountingStreamObserver(Consumer<T> onNextHandler) {
        this(onNextHandler, null, null);
    }

    public CountingStreamObserver(Consumer<T> onNextHandler, Consumer<Throwable> onErrorHandler, Runnable onCompletedHandler) {
        this.onNextHandler = onNextHandler != null ? onNextHandler : m -> {};
        this.onErrorHandler = onErrorHandler != null ? onErrorHandler : t -> {};
        this.onCompletedHandler = onCompletedHandler != null ? onCompletedHandler : () -> {};
    }

    @Override
    public void onNext(T value) {
        totalCount.incrementAndGet();
        successCount.incrementAndGet();
        onNextHandler.accept(value);
    }

    @Override
    public void onError(Throwable t) {
        failureCount.incrementAndGet();
        totalCount.incrementAndGet();
        onErrorHandler.accept(t);
    }

    @Override
    public void onCompleted() {
        onCompletedHandler.run();
    }

    public int getSuccessCount() {
        return successCount.get();
    }

    public int getFailureCount() {
        return failureCount.get();
    }

    public int getTotalCount() {
        return totalCount.get();
    }

    public boolean isSuccess() {
        return failureCount.get() == 0;
    }

    public boolean hasFailures() {
        return failureCount.get() > 0;
    }

    public void reset() {
        successCount.set(0);
        failureCount.set(0);
        totalCount.set(0);
    }
}

Example usage:

CountingStreamObserver<MyResponse> observer = new CountingStreamObserver<>(
    response -> System.out.println("Received: " + response),
    throwable -> System.err.println("Error: " + throwable),
    () -> System.out.println("Stream completed")
);

// Then pass this to a gRPC async stub
myStub.myStreamingCall(request, observer);
⚠️ **GitHub.com Fallback** ⚠️