Parallelizer Explained - google/mug GitHub Wiki

Parallelism is not Concurrency

While sometimes used interchangeably, "concurrency" means to run more than one tasks at the same time, while "parallelism" is about splitting one single task into sub-tasks to be executed at the same time.

JDK's Executor library is designed for "concurrency", particularly, one task's failure does not prevent the other tasks from running to completion. As a result, the library swallows uncaught exceptions from the concurrent tasks.

Java 8's parallel stream library on the other hand is designed for "parallelism". Any sub task's failure fails (fast) the entire task, with the exception propagated to the caller.

But parallel stream isn't a good fit for IO-bound parallel problems.

The problem of parallelizing a large number of IO-bound tasks

Not all Java code are CPU-bound. IO-bound programs can't easily take advantage of parallel streams because:

  1. They typically want to use an existing ExecutorService to manage the thread pool.
  2. They may need to respond to interruptions, so that when these threads are waiting for an RPC to complete, the threads can respond to interruption and cancellations.

Imagine you have a Java program that needs to read a large number of request protobufs from an input file, and need to send these requests to an RPC server (for example to do AvB test). How do you do this? Conceptually it's as simple as this:

for (Request request = readNextRequest(); request != null; request = readNextRequest()) {
  send(request);
}

But this code leaves a lot to be desired:

  • send() must not be synchronous. Otherwise we'd be only sending one request a time, and the program will take days to complete.
  • If we make send() asynchronous, assuming we can read in 10000 requests per second, we don't want to send all 10000 QPS to the server. The server probably only expects us to send 20 concurrent requests at any time.
  • It's not a fire-and-forget. We will need to read the response and do some comparison.

A revised program looks like:

ExecutorService executor = Executors.newFixedThreadPool(20);  // limit 20 outgoing rpcs.
try {
  for (Request request = readNextRequest(); request != null; request = readNextRequest()) {
    executor.submit(() -> sendWaitAndCheckResponse(request);
  }
} finally {
  executor.shutdown();
}

This makes the code asynchronous and uses a fixed-size thread pool to limit the outgoing rpcs. But, if you run it, you'll eventually get OutOfMemoryError. Why?

While the outgoing rpcs are limited, we are adding all these requests into the thread pool's work queue. If we can read in 10K requests per second, and if each request has 1K bytes, the memory usage will quickly build up.

Another issue is that, now sendWaitAndCheckResponse() is executed asynchronously. If there is any bug in that function, like throwing NullPointerException or IllegalArgumentException, the program will have no idea and will continue to run even if all threads are crapping out.

How does Parallelizer solve this problem?

ExecutorService executor = Executors.newFixedThreadPool(20);  // At most 20 outgoing rpcs
try {
  Parallelizer parallelizer = new Parallelizer(executor, 1000); // At most 1000 pending requests
  parallelizer.parallelize(
      readRequestStream(),
      request -> {
        try {
          Response response = sendAndWait(request);
          checkResponseAndLog(response);
        } catch (RpcException e) {
          // Rpc exception from server is expected
          // handle it.
        }
      });
} finally {
  executor.shutdown();
}

This addresses all of the problems mentioned above:

  • The Parallelizer limits the pending queue size so we won't run into memory issues.
  • If the callback has a bug, the exception will be propagated to the main thread and the program will stop.
  • For expected exceptions such as RpcException, the callback should handle it.

Additionally, the code handles interruption, so that if this Parallelizer code itself runs in a server thread, the thread can be canceled normally.