Solr Query Parallelism - krickert/search-api GitHub Wiki

Creating a system that:

  • Uses N concurrent threads (configurable via injection) to query Solr efficiently.
  • Ensures each thread fetches unique data using mod-based filtering (mod(hash(md5_hash_field), N)).
  • Uses cursor-based pagination for full dataset traversal while maximizing throughput.
  • Leverages a single Solr URL with 1 shard and 3 replicas, so Solr internally balances the queries.
  • Optimizes the querying strategy to maximize parallelism while keeping network traffic efficient.


Micronaut-Based Solr Parallel Query System

To efficiently query a Solr collection using Micronaut, we can design a parallel, partitioned fetching system. This approach uses multiple threads (or reactive tasks) in parallel to scan different portions of the index, ensuring full coverage without overlap. Key features include data partitioning by hash, cursor-based pagination, reactive HTTP requests, local shard preference, lightweight responses, and robust error handling. Below, we outline each aspect of the implementation in detail.

1. Data Partitioning Across N Threads

Partition the query load by splitting the collection into N disjoint subsets (where N is configurable via dependency injection). We use a mod-based hash filter on a stable field (e.g. an MD5-hash field) to ensure each thread handles a unique portion of data. For example, we can apply a Solr function filter in the query:

fq={!frange l=i u=i}mod(hash(md5_hash_field), N)

This filter uses Solr’s function query to compute hash(md5_hash_field) (or the field’s numeric value if it’s already a hash) and then take a modulo N. By wrapping it in a frange query with lower and upper bound = i, we get only documents whose hash mod N equals the partition index i ([How to implement mod in solr query - Stack Overflow](https://stackoverflow.com/questions/12197090/how-to-implement-mod-in-solr-query)). Each thread i (for i = 0 to N–1) uses a different value of i in this filter, so no two threads fetch the same document. This guarantees unique subsets per thread and full coverage of the collection.
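For example, with N = 4 the four per-thread filters are disjoint and together cover every document:

fq={!frange l=0 u=0}mod(hash(md5_hash_field),4)   (thread 0)
fq={!frange l=1 u=1}mod(hash(md5_hash_field),4)   (thread 1)
fq={!frange l=2 u=2}mod(hash(md5_hash_field),4)   (thread 2)
fq={!frange l=3 u=3}mod(hash(md5_hash_field),4)   (thread 3)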

Implementation Tip: Inject the number of partitions (threads) via configuration (e.g. @Value("${solr.numThreads}") int N). Create an executor or reactive loop that iterates i from 0 to N-1, launching an asynchronous Solr query for each partition.

2. Cursor-Based Pagination for Complete Retrieval

Each thread must retrieve all documents in its partition. Instead of using high start offsets (which are inefficient for deep paging), use cursor-based pagination with Solr’s cursorMark parameter. Cursor-based pagination allows scanning through large result sets without the performance penalties of deep paging ([How does Solr's cursorMark solve deep pagination while being stateless? - Stack Overflow](https://stackoverflow.com/questions/39629747/how-does-solrs-cursormark-solve-deep-pagination-while-being-stateless)). Note that cursorMark requires a sort clause whose last field is the uniqueKey (e.g. sort=id asc), so Solr has a stable total ordering to resume from. The approach is as follows:

Each thread repeatedly calls Solr with its partition filter and an updating cursorMark, collecting documents page by page. This fetches the entire result set for that partition efficiently, without the heavy memory load on Solr that a deep start offset would cause.
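Concretely, the exchange for one partition looks like this (cursor token values are illustrative; Solr’s actual tokens are opaque strings):

GET /solr/<collection>/select?q=*:*&fq={!frange l=0 u=0}mod(hash(md5_hash_field),N)&rows=1000&sort=id+asc&cursorMark=*
  → { "response": { "docs": [ ...first 1000 docs... ] }, "nextCursorMark": "AoEoMDEyMzQ1" }

GET ...same params...&cursorMark=AoEoMDEyMzQ1
  → next page of docs plus a new nextCursorMark; repeat until nextCursorMark stops changing.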

3. Using Micronaut’s Reactive HTTP Client for Concurrency

Micronaut’s HTTP client supports reactive streams (RxJava, Project Reactor, etc.), which lets us perform multiple Solr requests concurrently without blocking threads ([Micronaut HTTP Client](https://guides.micronaut.io/latest/micronaut-http-client-maven-java.html)). We leverage this by launching N non-blocking HTTP calls in parallel – one for each partition:

  • Reactive client setup: Inject a low-level HTTP client with Micronaut’s @Client annotation pointing to the Solr base URL (or use HttpClient.create(...)). Ensure it’s configured for non-blocking I/O.
  • Parallel execution: For example, using RxJava, we can do: Flowable.range(0, N).flatMap(i -> queryPartition(i).subscribeOn(Schedulers.io())) – this will concurrently execute the queryPartition(i) reactive flow for each partition i. In Project Reactor, a similar approach can be done with Flux.merge or parallel().
  • Non-blocking requests: Each Solr request is made via the reactive client (e.g. client.retrieve(HttpRequest.GET(...), MyResponse.class) which returns a Publisher/Mono/Flowable). The threads are not blocked waiting for Solr; instead, the client uses Netty event loops to handle I/O, allowing maximum throughput.
  • Thread management: The “N threads” in this context can be logical threads or scheduler workers. Micronaut will handle the underlying event loop threads. N is simply the degree of parallelism. This is configurable, and the client calls can be scheduled on a bounded I/O thread pool if needed to avoid oversubscription.

By using the reactive HTTP client, the system can compose multiple HTTP calls efficiently and perform all partitioned queries concurrently ([Micronaut HTTP Client](https://guides.micronaut.io/latest/micronaut-http-client-maven-java.html)). This maximizes parallelism and fully utilizes available CPU and network resources.
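As a minimal sketch of that fan-out (RxJava 2; queryPartition(i) stands in for the per-partition cursor loop shown in section 7):

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

// Subscribe to one reactive flow per partition; flatMap runs them concurrently
Flowable<SolrDocument> allDocs = Flowable.range(0, numThreads)
    .flatMap(i -> queryPartition(i).subscribeOn(Schedulers.io()), numThreads); // cap concurrency at numThreads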

4. Ensuring Local Query Execution (preferLocalShards)

When querying a SolrCloud collection, add preferLocalShards=true to each query. This hint optimizes Solr query execution by favoring local shard replicas for servicing the request ([Enable Solr preferLocalShards Feature – Lucidworks](https://support.lucidworks.com/hc/en-us/articles/18214080218519--Enable-Solr-preferLocalShards-Feature)). In our scenario, we have 1 shard with 3 replicas (presumably on 3 nodes). Enabling this parameter keeps each query on the replica local to the node that received it, avoiding an extra network hop.

In practice, if our Micronaut service always hits one specific Solr node, that node will handle all partitioned queries on its local replica. (All docs are on that replica, since it’s a full copy of the shard.) If needed, you could even target different replicas per thread (e.g. by varying the base URL per thread) to spread load, but with preferLocalShards=true it’s typically sufficient to let Solr use the local core. This parameter is deprecated in newer Solr versions (replaced by shards.preference=replica.location:local), but the effect is the same ([Enable Solr preferLocalShards Feature – Lucidworks](https://support.lucidworks.com/hc/en-us/articles/18214080218519--Enable-Solr-preferLocalShards-Feature)).
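For reference, the legacy hint and its Solr 7.4+ replacement look like this on the query string:

q=my_query&preferLocalShards=true
q=my_query&shards.preference=replica.location:local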

5. Limiting Data Transfer Overhead (Fields & Compression)

To improve performance, fetch only the necessary data and compress it:

  • Field list (fl): request only the fields you actually need (e.g. fl=id,title,summary) so Solr serializes and ships less data per document.
  • Compression: send an Accept-Encoding: gzip header so the response body is gzip-compressed on the wire.

Implementation details: In the Micronaut HTTP request builder, add .header("Accept-Encoding", "gzip"). Also ensure the Solr server has gzip enabled (it typically is, or can be enabled via Jetty’s GzipHandler). The benefit is significant – as noted in one case, enabling gzip cut network usage to 25% and improved response times by 60% for large transfers ([Deploying SolrCloud across multiple Data Centers (DC): Performance – Abobos Software](https://abobos.com/2015/12/04/deploying-solrcloud-across-multiple-data-centers-dc-performance/)).

Combining these, each query URL will look roughly like:

http://<solr-host>/solr/<collection>/select?q=*:*&fq=...partition filter...&rows=1000&cursorMark=...&sort=id+asc&fl=id,title,summary&preferLocalShards=true

with the Accept-Encoding: gzip HTTP header included. The response will be a JSON (or XML) containing just the needed fields and next cursorMark, in compressed form over the wire.
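For a quick manual check of the same query, curl’s --compressed flag sends the Accept-Encoding: gzip header and transparently decompresses the response (host and collection names are placeholders):

curl --compressed 'http://localhost:8983/solr/mycollection/select?q=*:*&rows=2&sort=id+asc&cursorMark=*&fl=id,title,summary'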

6. Error Handling and Retries

When running many concurrent requests and deep scans, robust error handling is crucial. Our Micronaut-based system should:

  • Set explicit connect/read timeouts on the HTTP client so a hung request fails fast.
  • Retry transient failures (timeouts, 5xx responses) a bounded number of times, ideally with backoff.
  • Log and isolate per-partition errors so one failed partition does not abort the others.
  • Aggregate the results (and any errors) once all partitions have finished.

By incorporating these measures, the system can gracefully handle network blips or Solr node issues. For instance, if thread 2 encounters a timeout, it can log and retry that request while threads 0,1,3,...N-1 continue fetching their data. After the parallel execution completes, you can aggregate results from all partitions (and maybe combine any errors). In Micronaut, using reactive composition (e.g. Flux.mergeDelayError) allows all streams to run to completion and still capture errors.
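As one concrete option, bounded retry with exponential backoff can be layered onto each partition’s stream with retryWhen (RxJava 2; a sketch with untuned values):

import io.reactivex.Flowable;
import java.util.concurrent.TimeUnit;

// Retry up to 3 times, waiting 2^attempt seconds between attempts.
// Note: once the 3 attempts are exhausted this variant completes quietly;
// rethrow inside the zip function if the final error should propagate instead.
Flowable<SolrDocument> resilient = fetchPartition(i)
    .retryWhen(errors -> errors
        .zipWith(Flowable.range(1, 3), (err, attempt) -> attempt)
        .flatMap(attempt -> Flowable.timer(1L << attempt, TimeUnit.SECONDS)));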

7. Putting It All Together (Pseudo-Code Example)

Below is a high-level pseudo-code sketch of how these pieces come together in a Micronaut service method:

import io.micronaut.context.annotation.Value;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.client.RxHttpClient;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.uri.UriBuilder;
import io.reactivex.Flowable;
import io.reactivex.Single;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.net.URI;
import java.util.List;

@Singleton
public class SolrQueryService {

    @Inject
    @Client("${solr.base-url}") // Base URL of the Solr server
    RxHttpClient solrClient;

    @Value("${solr.numThreads}")
    int numThreads;

    public Single<List<SolrDocument>> fetchAllDocuments() {
        // Launch N parallel partition queries and gather all documents into one list
        return Flowable.range(0, numThreads)
            .flatMap(partition -> fetchPartition(partition)
                    .retry(3)                              // retry each partition up to 3 times on error
                    .onErrorResumeNext(Flowable.empty()))  // a failed partition yields no docs instead of failing the job
            .toList();
    }

    private Flowable<SolrDocument> fetchPartition(int partitionIndex) {
        return fetchPage(partitionIndex, "*"); // cursorMark=* starts the cursor
    }

    // Fetch one page, emit its documents, then recurse with the next cursorMark
    // until Solr returns the same cursor twice (i.e. the result set is exhausted)
    private Flowable<SolrDocument> fetchPage(int partitionIndex, String cursorMark) {
        // Build the partition filter query string
        String fqPartition = String.format("{!frange l=%d u=%d}mod(hash(md5_hash_field),%d)",
                                           partitionIndex, partitionIndex, numThreads);
        URI uri = UriBuilder.of("/select")
                    .queryParam("q", "*:*")
                    .queryParam("fq", fqPartition)
                    .queryParam("rows", 1000)
                    .queryParam("cursorMark", cursorMark)
                    .queryParam("sort", "id asc")          // cursorMark requires a sort on the uniqueKey
                    .queryParam("fl", "id,title,summary")
                    .queryParam("preferLocalShards", "true")
                    .build();
        HttpRequest<?> req = HttpRequest.GET(uri)
                    .accept("application/json")
                    .header("Accept-Encoding", "gzip");

        return solrClient.retrieve(req, String.class)      // get JSON as a string
            .flatMap(responseJson -> {
                // parse JSON (e.g. with Jackson) to extract docs and nextCursorMark
                SolrResponse solrResp = parseResponse(responseJson);
                Flowable<SolrDocument> docs = Flowable.fromIterable(solrResp.getDocuments());
                String nextCursor = solrResp.getNextCursorMark();
                if (nextCursor == null || nextCursor.equals(cursorMark)) {
                    return docs; // cursor stopped advancing: this was the last page
                }
                // Emit this page, then lazily fetch the next one
                return docs.concatWith(Flowable.defer(() -> fetchPage(partitionIndex, nextCursor)));
            });
    }
}

In this outline, fetchAllDocuments() orchestrates the parallel partition fetches and combines the results into a single list. The fetchPartition(index) method handles one partition by recursively chaining cursor-based pages: each fetchPage(...) call emits the documents it received and then, via concatWith(Flowable.defer(...)), lazily fetches the next page until the cursorMark stops advancing. We build the Solr query with all necessary params (including the partition filter, the uniqueKey sort required by cursorMark, preferLocalShards=true, and the gzip header). The use of retry(3) on each partition’s Flowable ensures transient failures are retried, and onErrorResumeNext substitutes an empty stream on final failure, preventing one partition from collapsing the whole job.
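For example, a caller can materialize the full result set in one shot (blockingGet() is for demonstration only; a controller would return the reactive type directly):

List<SolrDocument> allDocs = solrQueryService.fetchAllDocuments().blockingGet();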

Note: The actual implementation might differ (for example, using Project Reactor’s Flux.create or Micronaut’s annotation-driven client with @Get that returns a reactive type). The key is that the design maximizes parallelism (N concurrent queries, each pulling batches of data) while minimizing overhead on Solr and the network. By partitioning the data, using Solr’s efficient cursor for deep paging, and leveraging local shard execution and compressed responses, this Micronaut-based system can retrieve a large Solr result set as quickly and smoothly as possible.

8. Conclusion

By combining these strategies – mod-hash partitioning, cursorMark pagination, reactive concurrent clients, local shard preference, and lean responses with compression – the Micronaut application will query the Solr collection in parallel chunks and stream out all documents efficiently. This approach avoids the pitfalls of serial or deep offset queries and fully utilizes the SolrCloud’s replica architecture. Each thread works on a distinct subset, queries remain mostly local to the Solr node ([Enable Solr preferLocalShards Feature – Lucidworks](https://support.lucidworks.com/hc/en-us/articles/18214080218519--Enable-Solr-preferLocalShards-Feature)), and the data transfer is optimized to be as small and fast as possible (only required fields, gzip-compressed ([Deploying SolrCloud across multiple Data Centers (DC): Performance – Abobos Software](https://abobos.com/2015/12/04/deploying-solrcloud-across-multiple-data-centers-dc-performance/))). Furthermore, the built-in error handling and retry logic ensure that if any thread encounters an issue, it can recover or fail gracefully without stopping the entire data fetch. The end result is a high-throughput Solr querying service that maximizes parallelism while maintaining efficiency and reliability.

===================

Updated Implementation:

This revision ensures only one job can run at a time, and each thread's progress is tracked individually. It also integrates with Micronaut's management health checks without marking the service as DOWN when a Solr crawl is running.


🔹 Changes from Previous Version

  1. Single Job Execution:

    • A job cannot start if one is already running.
    • New requests will return a "JOB_ALREADY_RUNNING" response.
  2. Per-Thread Status Tracking:

    • Each thread's progress is recorded separately.
    • The /status endpoint returns details for all running threads.
  3. Micronaut Health Check Integration:

    • Health status reports the job's progress.
    • The service remains UP regardless of the crawl status.

1️⃣ Fire-and-Forget Controller (with Job Locking)

import io.micronaut.http.annotation.*;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.annotation.ExecuteOn;
import jakarta.inject.Inject;
import reactor.core.publisher.Mono;

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

@Controller("/solr")
public class SolrController {

    private final SolrQueryService solrQueryService;
    private final ConcurrentHashMap<Integer, SolrThreadStatus> threadStatusMap = new ConcurrentHashMap<>();
    private final AtomicBoolean jobRunning = new AtomicBoolean(false);

    @Inject
    public SolrController(SolrQueryService solrQueryService) {
        this.solrQueryService = solrQueryService;
    }

    @Post("/start")
    @ExecuteOn(TaskExecutors.IO)
    public Mono<String> startSolrProcessing() {
        if (!jobRunning.compareAndSet(false, true)) {
            return Mono.just("JOB_ALREADY_RUNNING");
        }

        String jobId = UUID.randomUUID().toString();
        solrQueryService.fetchAllDocuments(jobId, threadStatusMap, () -> jobRunning.set(false)).subscribe();
        return Mono.just("JOB_STARTED: " + jobId);
    }

    @Get("/status")
    public Mono<ConcurrentHashMap<Integer, SolrThreadStatus>> getStatus() {
        return Mono.just(threadStatusMap);
    }
}

2️⃣ Solr Query Service (Parallel Execution + Thread Tracking)

import io.micronaut.context.annotation.Value;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.client.RxHttpClient;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.uri.UriBuilder;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

@Singleton
public class SolrQueryService {

    private final RxHttpClient httpClient;
    private final int numThreads;

    @Inject
    public SolrQueryService(@Client("${solr.base-url}") RxHttpClient httpClient,
                            @Value("${solr.numThreads}") int numThreads) {
        this.httpClient = httpClient;
        this.numThreads = numThreads;
    }

    public Mono<Void> fetchAllDocuments(String jobId,
                                        ConcurrentHashMap<Integer, SolrThreadStatus> threadStatusMap,
                                        Runnable onComplete) {
        threadStatusMap.clear(); // Reset per-thread status

        return Flux.range(0, numThreads)
                   .flatMap(partition -> fetchPartition(partition, threadStatusMap))
                   .doFinally(signalType -> onComplete.run()) // release the job lock on success, error, or cancel
                   .then();
    }

    private Mono<Void> fetchPartition(int partition,
                                      ConcurrentHashMap<Integer, SolrThreadStatus> threadStatusMap) {
        SolrThreadStatus status = new SolrThreadStatus(partition);
        threadStatusMap.put(partition, status);

        return fetchPage(partition, "*", status)
                   .doOnTerminate(status::markComplete); // flip RUNNING -> COMPLETED when the cursor is exhausted
    }

    // Fetch one page, record progress, then recurse until the cursorMark stops advancing
    private Mono<Void> fetchPage(int partition, String cursorMark, SolrThreadStatus status) {
        URI uri = UriBuilder.of("/select")
                            .queryParam("q", "*:*")
                            .queryParam("fq", "{!frange l=" + partition + " u=" + partition + "}mod(hash(md5_hash_field)," + numThreads + ")")
                            .queryParam("rows", 1000)
                            .queryParam("cursorMark", cursorMark)
                            .queryParam("sort", "id asc") // required by cursorMark
                            .queryParam("fl", "id,title,summary")
                            .queryParam("preferLocalShards", "true")
                            .build();

        return Mono.from(httpClient.retrieve(HttpRequest.GET(uri), String.class))
                   .flatMap(responseJson -> {
                       SolrResponse solrResp = parseResponse(responseJson);
                       status.updateProgress(solrResp.getDocuments().size());

                       String nextCursorMark = solrResp.getNextCursorMark();
                       if (nextCursorMark == null || nextCursorMark.isEmpty() || nextCursorMark.equals(cursorMark)) {
                           return Mono.empty(); // last page for this partition
                       }
                       return fetchPage(partition, nextCursorMark, status);
                   });
    }

    private SolrResponse parseResponse(String json) {
        return new SolrResponse(List.of(), ""); // Implement JSON parsing (see the Jackson sketch below)
    }
}
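The parseResponse stub can be filled in with Jackson along these lines – a minimal sketch assuming SolrDocument can be bound from the entries of Solr’s standard response.docs array:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;

private static final ObjectMapper MAPPER = new ObjectMapper();

private SolrResponse parseResponse(String json) {
    try {
        JsonNode root = MAPPER.readTree(json);
        List<SolrDocument> docs = new ArrayList<>();
        root.path("response").path("docs")
            .forEach(node -> docs.add(MAPPER.convertValue(node, SolrDocument.class)));
        // nextCursorMark sits at the top level of Solr's JSON response
        return new SolrResponse(docs, root.path("nextCursorMark").asText());
    } catch (JsonProcessingException e) {
        throw new IllegalStateException("Failed to parse Solr response", e);
    }
}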

3️⃣ Solr Thread Status Tracking

import java.util.concurrent.atomic.AtomicInteger;

public class SolrThreadStatus {
    private final int threadId;
    private volatile String status; // RUNNING or COMPLETED; volatile so /status readers see updates
    private final AtomicInteger documentsProcessed;

    public SolrThreadStatus(int threadId) {
        this.threadId = threadId;
        this.status = "RUNNING";
        this.documentsProcessed = new AtomicInteger(0);
    }

    public SolrThreadStatus updateProgress(int count) {
        this.documentsProcessed.addAndGet(count);
        return this;
    }

    // Called by SolrQueryService once the partition's cursor is exhausted
    public void markComplete() {
        this.status = "COMPLETED";
    }

    public int getThreadId() {
        return threadId;
    }

    public String getStatus() {
        return status;
    }

    public int getDocumentsProcessed() {
        return documentsProcessed.get();
    }
}

4️⃣ Micronaut Health Check Integration

Micronaut’s health endpoint reports job progress without marking the service as DOWN.

import io.micronaut.health.HealthStatus;
import io.micronaut.management.health.indicator.HealthIndicator;
import io.micronaut.management.health.indicator.HealthResult;
import jakarta.inject.Singleton;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Singleton
public class SolrHealthIndicator implements HealthIndicator {

    private final ConcurrentHashMap<Integer, SolrThreadStatus> threadStatusMap;

    public SolrHealthIndicator(SolrController controller) {
        this.threadStatusMap = controller.getThreadStatusMap();
    }

    @Override
    public Publisher<HealthResult> getResult() {
        Map<String, Object> details = new LinkedHashMap<>();
        threadStatusMap.forEach((threadId, status) ->
            details.put("Thread-" + threadId, "Processed: " + status.getDocumentsProcessed() + " docs")
        );

        // Always report UP: a running (or failed) crawl must not mark the whole service DOWN
        return Mono.just(HealthResult.builder("solr-crawl", HealthStatus.UP)
                                     .details(details)
                                     .build());
    }
}

🔥 How It Works

  1. Only One Job at a Time

    • If a job is running, new requests return JOB_ALREADY_RUNNING.
    • Job completes automatically when all partitions are processed.
  2. Each Thread's Status is Tracked

    • GET /solr/status returns each thread's progress.
  3. Health Check Shows Progress

    • GET /health reports the Solr job’s status without marking the service as DOWN.

🚀 Example Flow

Start a Solr Job

curl -X POST http://localhost:8080/solr/start

Response: "JOB_STARTED: 12345-abcdef"


🔎 Check Solr Job Status

curl -X GET http://localhost:8080/solr/status

Response:

{
    "0": { "threadId": 0, "status": "RUNNING", "documentsProcessed": 5000 },
    "1": { "threadId": 1, "status": "RUNNING", "documentsProcessed": 7000 },
    "2": { "threadId": 2, "status": "RUNNING", "documentsProcessed": 3200 }
}

📡 Check Health

curl -X GET http://localhost:8080/health

Response:

{
    "name": "solr-crawl",
    "status": "UP",
    "details": {
        "Thread-0": "Processed: 5000 docs",
        "Thread-1": "Processed: 7000 docs",
        "Thread-2": "Processed: 3200 docs"
    }
}

🎯 Benefits

  • Fire-and-forget job execution
  • Only one job can run at a time
  • Each thread has its own progress tracking
  • Health check integrates cleanly with Micronaut

