Parallel Stream - HolmesJJ/OOP-FP GitHub Wiki

Definition

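Parallel Stream processes the elements of a stream in parallel on multiple threads, implemented on top of the Fork/Join framework. The elements are handled concurrently, but the terminal operation still blocks the calling thread until all of them have been processed.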
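A minimal sketch of the two standard ways to obtain a parallel stream, Collection.parallelStream() and parallel() on an existing stream, follows. It also shows that sequential() switches the pipeline back, and that the terminal operation hands a complete result back to the caller. The class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

public class ParallelStreamBasics {

    // Sums 1..n with a parallel stream; sum() returns only after every subtask has finished
    static int parallelSum(int n) {
        return IntStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

        // 1. From a collection
        System.out.println(list.parallelStream().isParallel());                   // true
        // 2. From an existing stream
        System.out.println(IntStream.rangeClosed(1, 5).parallel().isParallel());   // true
        // The last parallel()/sequential() call decides the mode of the whole pipeline
        System.out.println(IntStream.rangeClosed(1, 5).parallel().sequential().isParallel()); // false

        System.out.println(parallelSum(5)); // 15
    }
}
```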

Fork/Join Framework

The Fork/Join framework is a parallel execution framework introduced in Java 7. It follows a divide-and-conquer strategy: a large task is recursively split into many smaller subtasks, the subtasks are executed in parallel, and their results are merged once they have finished.
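A minimal sketch of this strategy using the framework's RecursiveTask follows. It sums an array by halving the index range until a piece is small enough to sum directly, then merges the partial sums. The class name ForkJoinSum and the THRESHOLD of 1,000 are illustrative choices, not values from the framework.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

// Divide-and-conquer sum of a long[] on the Fork/Join framework
public class ForkJoinSum extends RecursiveTask<Long> {

    // Below this size a piece is summed directly instead of being split (illustrative value)
    private static final int THRESHOLD = 1_000;

    private final long[] numbers;
    private final int from; // inclusive
    private final int to;   // exclusive

    ForkJoinSum(long[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        int length = to - from;
        if (length <= THRESHOLD) {
            // Small enough: solve directly
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        // Divide: split the range into two halves
        int mid = from + length / 2;
        ForkJoinSum left = new ForkJoinSum(numbers, from, mid);
        ForkJoinSum right = new ForkJoinSum(numbers, mid, to);
        left.fork();                        // schedule the left half on the pool
        long rightResult = right.compute(); // compute the right half in the current thread
        long leftResult = left.join();      // wait for the left half
        // Conquer: merge the partial results
        return leftResult + rightResult;
    }

    static long sum(long[] numbers) {
        return ForkJoinPool.commonPool().invoke(new ForkJoinSum(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 1_000_000).toArray();
        System.out.println(sum(numbers)); // 500000500000
    }
}
```

Forking one half and computing the other in the current thread keeps that thread busy instead of leaving it idle while it waits on join().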


Simple Example

import java.util.stream.IntStream;

public class ParallelStreamDemo {

    public static void main(String[] args) {

        System.out.println("Main Thread Start");
        System.out.println("Main Thread Id : " + Thread.currentThread());

        sequentialStream();
        parallelStream();

        System.out.println("Main Thread End");
    }

    // Each element sleeps 1 s; elements are processed one after another on the main thread
    public static void sequentialStream() {
        long start = System.currentTimeMillis();
        IntStream.rangeClosed(1, 10).forEach(x -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
            System.out.println(x + " Sequential Stream Id : " + Thread.currentThread());
        });
        long end = System.currentTimeMillis();
        System.out.println("Sequential Stream: " + (end - start));
    }

    // Same work, but parallel() lets the common ForkJoinPool process elements concurrently
    public static void parallelStream() {
        long start = System.currentTimeMillis();
        IntStream.rangeClosed(1, 10).parallel().forEach(x -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
            System.out.println(x + " Parallel Stream Id : " + Thread.currentThread());
        });
        long end = System.currentTimeMillis();
        System.out.println("Parallel Stream: " + (end - start));
    }
}

/* Output (timings and worker thread numbers vary between runs and machines) */
Main Thread Start
Main Thread Id : Thread[main,5,main]
1 Sequential Stream Id : Thread[main,5,main]
2 Sequential Stream Id : Thread[main,5,main]
3 Sequential Stream Id : Thread[main,5,main]
4 Sequential Stream Id : Thread[main,5,main]
5 Sequential Stream Id : Thread[main,5,main]
6 Sequential Stream Id : Thread[main,5,main]
7 Sequential Stream Id : Thread[main,5,main]
8 Sequential Stream Id : Thread[main,5,main]
9 Sequential Stream Id : Thread[main,5,main]
10 Sequential Stream Id : Thread[main,5,main]
Sequential Stream: 10057
4 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-3,5,main]
2 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-27,5,main]
6 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-23,5,main]
8 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-9,5,main]
3 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
10 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-31,5,main]
5 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-13,5,main]
7 Parallel Stream Id : Thread[main,5,main]
1 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-17,5,main]
9 Parallel Stream Id : Thread[ForkJoinPool.commonPool-worker-5,5,main]
Parallel Stream: 1019
Main Thread End

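The only syntactic difference between sequentialStream() and parallelStream() is the call to parallel(). In the output, the sequential stream prints the elements in order on the main thread, while the parallel stream prints them out of order on several threads and finishes in roughly one-tenth of the time (1019 ms versus 10057 ms in this run; the speed-up depends on the number of available cores).
In the parallel version, the stream's source is split into subtasks that each process part of the elements, and these subtasks are executed by the ForkJoinPool common pool described above. The calling thread takes part as well, which is why element 7 was printed by Thread[main,5,main].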
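To see which threads take part, the sketch below records the name of every thread that processes an element. The class and method names are illustrative. The exact set depends on the machine and on timing, but it typically contains main together with several ForkJoinPool.commonPool-worker-N threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

public class WhoRunsTheTasks {

    // Collects the names of the threads that process elements of a parallel stream
    static Set<String> threadNames(int n) {
        // A concurrent set, because several threads add to it at the same time
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, n).parallel()
                .forEach(x -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println(threadNames(1_000));
    }
}
```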

View and Update the Parallelism of the ForkJoinPool Common Pool

View

ForkJoinPool.commonPool().getParallelism()

Update

// Programmatically: must run before the common pool is first used (e.g. at the very start of main)
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");
// or on the command line when starting the JVM (no spaces around "=")
-Djava.util.concurrent.ForkJoinPool.common.parallelism={n}
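To check the effect of either setting, a small sketch like the one below prints the common pool's parallelism next to the processor count. When the property is not set, current OpenJDK versions typically use availableProcessors() - 1 (at least 1). The class and method names are illustrative.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    // Target parallelism of the common pool (the number of worker threads it aims to use)
    static int commonParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("Available processors:     " + cpus);
        System.out.println("Common pool parallelism:  " + commonParallelism());
        // Static shortcut that reports the same value
        System.out.println("getCommonPoolParallelism: " + ForkJoinPool.getCommonPoolParallelism());
    }
}
```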

Side Effects of Parallel Streams

  • Not Thread Safe

Simple Example

import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class UnsafeAddDemo {

    public static void main(String[] args) {

        List<Integer> sequentialList = new ArrayList<>();
        List<Integer> parallelList = new ArrayList<>();

        IntStream.range(0, 1000).forEach(sequentialList::add);
        // Unsafe: several threads call ArrayList.add() on the same list concurrently
        IntStream.range(0, 1000).parallel().forEach(parallelList::add);

        System.out.println("Sequential List size: " + sequentialList.size());
        System.out.println("Parallel List size: " + parallelList.size());
    }
}

/* Output (the parallel size varies between runs; a run may also fail with ArrayIndexOutOfBoundsException) */
Sequential List size: 1000
Parallel List size: 778

Detail Example

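import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class UnsafeAddDetailDemo {

    public static void main(String[] args) {

        final long START_TIME = System.currentTimeMillis();

        // Limit the common pool to 4 worker threads (must be set before the pool is first used)
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");

        List<Integer> parallelList = new ArrayList<>();
        IntStream.range(0, 10).parallel().forEach(x -> {
            // Snapshot of what this thread currently sees in the shared list
            List<Integer> tmp = new ArrayList<>(parallelList);
            System.out.println((System.currentTimeMillis() - START_TIME) +
                    " Asynchronous Thread Id : " + Thread.currentThread() + " " + tmp);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
            parallelList.add(x); // unsynchronized write to a shared ArrayList
        });

        System.out.println(parallelList + " Parallel List size: " + parallelList.size());
    }
}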

/* Output (varies between runs) */
5 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-3,5,main] []
5 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main] []
5 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main] []
5 Asynchronous Thread Id : Thread[main,5,main] []
5 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-7,5,main] []
533 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main] [8]
533 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main] [8]
533 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-3,5,main] [8]
533 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-7,5,main] [8]
1045 Asynchronous Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main] [8, 0, 9, 3]
[8, 0, 9, 3, 7] Parallel List size: 5

The lambda passed to IntStream.range(0, 10).parallel().forEach() is not thread safe: several threads modify the shared parallelList at the same time, and nothing synchronizes their accesses.
Following the Fork/Join strategy, IntStream.range(0, 10).parallel().forEach() is split into multiple subtasks, each responsible for a small part of the range, and these subtasks run concurrently on different threads. The problem is that parallelList is an ArrayList, which is not a thread-safe container, so concurrent calls to add() interfere with each other.
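How a source is divided into subtasks can be observed through its Spliterator. The sketch below asks the spliterator of IntStream.range(0, 10) to split once with trySplit(), which is the mechanism parallel streams use to partition their source; each part could then be processed by a different thread. The class and method names are illustrative.

```java
import java.util.Spliterator;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

public class SplitDemo {

    // Splits the source of IntStream.range(0, n) once; returns {first part size, second part size}
    static long[] splitOnce(int n) {
        Spliterator.OfInt rest = IntStream.range(0, n).spliterator();
        Spliterator.OfInt prefix = rest.trySplit(); // null if the source is too small to split
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] {prefixSize, rest.estimateSize()};
    }

    public static void main(String[] args) {
        Spliterator.OfInt rest = IntStream.range(0, 10).spliterator();
        Spliterator.OfInt prefix = rest.trySplit();
        IntConsumer print = x -> System.out.print(" " + x);

        // Each part could be handed to a different subtask
        System.out.print("first part: ");
        if (prefix != null) {
            prefix.forEachRemaining(print);
        }
        System.out.print("\nsecond part:");
        rest.forEachRemaining(print);
        System.out.println();
    }
}
```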

/* java.util.ArrayList add() source code (JDK 9 and later) */
public boolean add(E e) {
    modCount++;
    add(e, elementData, size);
    return true;
}

private void add(E e, Object[] elementData, int s) {
    if (s == elementData.length)
        elementData = grow();
    elementData[s] = e;
    size = s + 1;
}

add() performs the following 4 steps, which are not atomic as a whole:

  1. Read size (passed to the helper method as s).
  2. Check the length of elementData. If the array is full, grow() allocates a new array of about 1.5 times the original capacity and copies the existing elements into it.
  3. Store e at index s, that is, elementData[s] = e.
  4. Write s + 1 back to size.

Because these steps are not atomic and the threads do not synchronize (so there is also no guarantee that one thread sees another thread's write to size), two threads can interleave badly: thread A and thread B both read the same value s of size, both store their element at elementData[s], so one element overwrites the other, and both write s + 1 back to size. One of the two additions is lost, which explains why parallelList ends up with fewer elements than IntStream.range(0, 10) produced. If such an interleaving happens while the array is being grown, add() can even throw an ArrayIndexOutOfBoundsException.
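The bad interleaving described above can be replayed deterministically. The sketch below simulates threads A and B in a single thread, with local variables standing in for size and elementData. It is a simplified model of the four steps, not real ArrayList code, and the names are illustrative.

```java
import java.util.Arrays;

public class LostUpdateReplay {

    // Replays one bad interleaving of two add() calls on the same list
    static String replay() {
        Object[] elementData = new Object[10]; // stands in for ArrayList.elementData
        int size = 0;                          // stands in for ArrayList.size

        // Step 1: both "threads" read the same size before either writes it back
        int sA = size;
        int sB = size;

        // Step 3: both store at index 0, so B's write overwrites A's
        elementData[sA] = "A";
        elementData[sB] = "B";

        // Step 4: both write 0 + 1 back to size
        size = sA + 1;
        size = sB + 1;

        // Two add() calls, but only one element survives
        return Arrays.toString(Arrays.copyOf(elementData, size)) + " size=" + size;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // [B] size=1
    }
}
```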

If you need thread safety, you can do the following:

  1. Use collect() or reduce(), which are designed for parallel execution: each subtask accumulates its own partial result, and the partial results are combined afterwards
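import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CollectDemo {

    public static void main(String[] args) {

        List<Integer> list = IntStream.rangeClosed(1, 20)
                .boxed()
                .parallel()
                .collect(Collectors.toList()); // preserves encounter order
        System.out.println(list);
        System.out.println(list.size());
    }
}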

/* Output */
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
20
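  2. Avoid forEach() for stateful operations in a parallel stream. If you need a side-effecting terminal operation, use forEachOrdered() instead: it applies the action to one element at a time in encounter order, so it is safe but gives up parallelism for that step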
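import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class ForEachOrderedDemo {

    public static void main(String[] args) {

        List<Integer> list = new ArrayList<>();
        IntStream.rangeClosed(1, 20)
                .parallel()
                .forEachOrdered(list::add); // one element at a time, in encounter order
        System.out.println(list);
        System.out.println(list.size());
    }
}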

/* Output */
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
20
  3. If you must use forEach(), use a thread-safe shared collection, such as a list wrapped with Collections.synchronizedList()
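import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class SynchronizedListDemo {

    public static void main(String[] args) {

        // Every method call on the wrapper is synchronized on a single lock
        List<Integer> list1 = Collections.synchronizedList(new ArrayList<>());
        IntStream.rangeClosed(1, 20)
                .parallel()
                .forEach(list1::add);

        System.out.println(list1);
        System.out.println(list1.size());
    }
}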

/* Output */
[13, 15, 14, 11, 12, 1, 16, 17, 20, 19, 3, 7, 5, 4, 8, 6, 18, 2, 9, 10]
20
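  4. If you must use forEach(), you can also use the thread-safe CopyOnWriteArrayList (each add() copies the underlying array, so it suits lists that are read far more often than they are modified)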
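import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.IntStream;

public class CopyOnWriteDemo {

    public static void main(String[] args) {

        // Each add() copies the backing array under an internal lock
        List<Integer> list = new CopyOnWriteArrayList<>();
        IntStream.rangeClosed(1, 20)
                .parallel()
                .forEach(list::add);
        System.out.println(list);
        System.out.println(list.size());
    }
}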

/* Output */
[13, 15, 14, 12, 11, 7, 1, 17, 3, 8, 20, 10, 16, 19, 5, 4, 6, 9, 2, 18]
20
  5. If you must use forEach(), you can also guard add() with a lock
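import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

public class LockDemo {

    public static void main(String[] args) {

        Lock lock = new ReentrantLock();
        List<Integer> list = new ArrayList<>();
        IntStream.rangeClosed(1, 20)
                .parallel()
                .forEach(e -> {
                    lock.lock();
                    try {
                        list.add(e); // only one thread at a time executes this line
                    } finally {
                        lock.unlock(); // always release, even if add() throws
                    }
                });
        System.out.println(list);
        System.out.println(list.size());
    }
}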

/* Output */
[13, 15, 14, 11, 12, 3, 18, 4, 8, 16, 20, 19, 6, 17, 7, 2, 9, 10, 5, 1]
20
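The first approach above shows collect() with Collectors.toList() but not reduce(). The sketch below adds a reduce() sum and the three-argument collect(), which makes the per-subtask containers explicit: the supplier creates an ArrayList for each subtask, the accumulator fills it, and the combiner merges the partial lists, so no list is shared while it is being filled. The class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class SafeParallelReductions {

    // reduce(): 0 is the identity and Integer::sum is associative, so the partial
    // sums computed by different subtasks can be combined in any grouping
    static int sum(int n) {
        return IntStream.rangeClosed(1, n).parallel().reduce(0, Integer::sum);
    }

    // Three-argument collect(): supplier creates a container per subtask,
    // accumulator fills it, combiner merges the partial containers
    static List<Integer> toList(int n) {
        return IntStream.rangeClosed(1, n)
                .parallel()
                .boxed()
                .collect(ArrayList<Integer>::new, ArrayList::add, ArrayList::addAll);
    }

    public static void main(String[] args) {
        System.out.println(sum(20)); // 210
        List<Integer> list = toList(20);
        System.out.println(list.size()); // 20
        System.out.println(list);        // elements 1..20 in encounter order
    }
}
```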

In Summary

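  • Parallel streams let you write parallel code concisely; whether they are actually faster depends on the work per element, the data size, and the number of available cores.
  • Elements of a parallel stream are processed in no guaranteed order; use forEachOrdered() or an order-preserving collector when order matters.
  • The lambdas passed to a parallel stream should be stateless and must not modify shared mutable state without synchronization. The stream does not make such state thread safe, so stateful operations can produce nondeterministic results.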