CompletableFuture - HolmesJJ/OOP-FP GitHub Wiki

Definition

  • Create an asynchronous operation
  • Functor and Monad
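
The "Functor and Monad" bullet can be read loosely as: thenApply() behaves like map, and thenCompose() behaves like flatMap. The following is a minimal sketch of that analogy, not a formal proof of the laws; the class name FunctorMonadSketch and the helper lengthAsync are made up for illustration, and only the CompletableFuture methods are real API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical class and helper names; only CompletableFuture's methods are real API.
class FunctorMonadSketch {

    // Functor-like: thenApply maps the eventual value with a plain function.
    static CompletableFuture<Integer> mapped() {
        return CompletableFuture.completedFuture("21")
                .thenApply(Integer::parseInt)
                .thenApply(n -> n * 2);
    }

    // A function that itself returns a CompletableFuture.
    static CompletableFuture<Integer> lengthAsync(String s) {
        return CompletableFuture.supplyAsync(s::length);
    }

    // Monad-like: thenCompose flattens the nested future that thenApply would produce.
    static CompletableFuture<Integer> flatMapped() {
        return CompletableFuture.completedFuture("Hello")
                .thenCompose(FunctorMonadSketch::lengthAsync);
    }

    // With thenApply, the same function yields a future inside a future.
    static CompletableFuture<CompletableFuture<Integer>> nested() {
        return CompletableFuture.completedFuture("Hello")
                .thenApply(FunctorMonadSketch::lengthAsync);
    }

    public static void main(String[] args) {
        System.out.println(mapped().join());        // 42
        System.out.println(flatMapped().join());    // 5
        System.out.println(nested().join().join()); // 5
    }
}
```

The nested() variant shows why thenCompose() exists: without it, every asynchronous step would add another layer of CompletableFuture to unwrap.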

Create CompletableFuture

new CompletableFuture()

Simply create a CompletableFuture using a parameterless constructor:

public static void main(String[] args) {

    final long START_TIME = System.currentTimeMillis();

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    CompletableFuture<Integer> cf = calculate(Integer::sum, 1, 2);

    // Busy waiting: poll until the CompletableFuture completes so that
    // the progress of the asynchronous thread is visible.
    // (The worker in this example is a regular, non-daemon thread,
    // so the JVM would wait for it even without this loop.)
    while (!cf.isDone()) {
        try {
            System.out.println("Processing in asynchronous thread... (" +
                    (System.currentTimeMillis() - START_TIME) + ")");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    System.out.println("Main Thread End");
}

public static CompletableFuture<Integer> calculate(
        BiFunction<Integer, Integer, Integer> fn, int value1, int value2) {

    // This is a parameterless constructor
    CompletableFuture<Integer> cf = new CompletableFuture<Integer>();

    // The CompletableFuture stays incomplete until cf.complete() is called.
    // The work runs on a separate thread, so the caller is not blocked.
    new Thread(() -> {
        System.out.println("Asynchronous Thread Start");
        System.out.println("Asynchronous Thread Id : " + Thread.currentThread());

        int result = fn.apply(value1, value2);
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Call cf.complete() here to complete the
        // CompletableFuture once the calculation is done.
        cf.complete(result);

        System.out.println("Hello World");
        System.out.println("Asynchronous Thread End");
    }).start();

    return cf;
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
Asynchronous Thread Start
Asynchronous Thread Id : Thread[Thread-0,5,main]
Processing in asynchronous thread... (22)
Processing in asynchronous thread... (1031)
Processing in asynchronous thread... (2031)
Hello World
Asynchronous Thread End
Main Thread End
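
The polling loop above is there mainly to make the waiting visible. As a sketch of the same manual-completion pattern, the caller can instead block with join(), and the worker can report failures with completeExceptionally(). The class name ManualCompletionSketch and the helper calculateAsync are illustrative and not part of the original example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Hypothetical names; a sketch of completing a future by hand.
class ManualCompletionSketch {

    // Same idea as calculate() above: a plain thread completes the future.
    static CompletableFuture<Integer> calculateAsync(
            BiFunction<Integer, Integer, Integer> fn, int a, int b) {
        CompletableFuture<Integer> cf = new CompletableFuture<>();
        new Thread(() -> {
            try {
                cf.complete(fn.apply(a, b));
            } catch (RuntimeException e) {
                // Propagate the failure to whoever waits on the future.
                cf.completeExceptionally(e);
            }
        }).start();
        return cf;
    }

    public static void main(String[] args) {
        // join() blocks until complete() or completeExceptionally() is called.
        int sum = calculateAsync(Integer::sum, 1, 2).join();
        System.out.println(sum); // 3
    }
}
```

If the function throws, join() rethrows the failure wrapped in a CompletionException, so the caller is not left waiting forever.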

completedFuture()

Create an already-completed CompletableFuture with the given value:

public static void main(String[] args) {

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    System.out.println("CompletableFuture Start");
    CompletableFuture<String> cf = CompletableFuture.completedFuture("CF is completed");

    try {
        System.out.println(cf.get());
    } catch (ExecutionException | InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("CompletableFuture End");
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Start
CF is completed
CompletableFuture End
Main Thread End
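
Because a future created by completedFuture() is done from the start, it can be inspected without blocking. The sketch below contrasts it with a future that has not been completed yet, using isDone() and getNow(); the class name CompletedFutureSketch and the helper names are assumptions made for this illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical names; contrasts a completed future with a pending one.
class CompletedFutureSketch {

    // A future from completedFuture() is done immediately.
    static boolean isDoneImmediately() {
        return CompletableFuture.completedFuture("CF is completed").isDone();
    }

    // getNow() returns the value if present, otherwise the supplied fallback.
    static String valueOrFallback(CompletableFuture<String> cf) {
        return cf.getNow("not completed yet");
    }

    public static void main(String[] args) {
        System.out.println(isDoneImmediately()); // true
        System.out.println(valueOrFallback(
                CompletableFuture.completedFuture("CF is completed"))); // CF is completed
        System.out.println(valueOrFallback(
                new CompletableFuture<>())); // not completed yet
    }
}
```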

| Method | Parameter Type | Return Type | Description |
| --- | --- | --- | --- |
| CompletableFuture<Void> runAsync(Runnable runnable) | Runnable runnable | CompletableFuture<Void> | Run the given Runnable asynchronously; the future completes without a result |
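
The later examples on this page start their chains with supplyAsync() rather than runAsync(). As a minimal sketch of the difference: runAsync() takes a Runnable and yields CompletableFuture<Void>, while supplyAsync() takes a Supplier and yields a future carrying its return value. The class name RunVsSupplySketch and the SIDE_EFFECT flag are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical names; contrasts runAsync() and supplyAsync().
class RunVsSupplySketch {

    static final AtomicBoolean SIDE_EFFECT = new AtomicBoolean(false);

    // runAsync takes a Runnable: there is no result, so the type is Void.
    static CompletableFuture<Void> fireAndForget() {
        return CompletableFuture.runAsync(() -> SIDE_EFFECT.set(true));
    }

    // supplyAsync takes a Supplier<T>: the returned value completes the future.
    static CompletableFuture<String> compute() {
        return CompletableFuture.supplyAsync(() -> "Hello");
    }

    public static void main(String[] args) {
        System.out.println(fireAndForget().join()); // null, only the side effect matters
        System.out.println(SIDE_EFFECT.get());      // true
        System.out.println(compute().join());       // Hello
    }
}
```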

runAsync()

public static void main(String[] args) {

    final long START_TIME = System.currentTimeMillis();

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    // Run a task specified by a Runnable Object asynchronously.
    CompletableFuture<Void> cf = CompletableFuture.runAsync(new Runnable() {
        @Override
        public void run() {
            System.out.println("CompletableFuture Start");
            System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            System.out.println("I'll run in a separate thread than the main thread.");
            System.out.println("CompletableFuture End");
        }
    });

    // Busy waiting
    // Do not end the main thread immediately,
    // otherwise the thread pool used by CompletableFuture
    // by default will be closed immediately
    while (!cf.isDone()) {
        try {
            System.out.println("CompletableFuture Processing in other thread... (" +
                    (System.currentTimeMillis() - START_TIME) + ")");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Start
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Processing in other thread... (4)
CompletableFuture Processing in other thread... (1022)
CompletableFuture Processing in other thread... (2025)
I'll run in a separate thread than the main thread.
CompletableFuture End
Main Thread End

Callback on CompletableFuture

| Method | Parameter Type | Return Type | Description |
| --- | --- | --- | --- |
| <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) | Function<? super T, ? extends U> fn | CompletableFuture<U> | Processes the result of the CompletableFuture and returns a new result |
| CompletableFuture<Void> thenAccept(Consumer<? super T> action) | Consumer<? super T> action | CompletableFuture<Void> | 1. Executed after the CompletableFuture is done 2. Consumes the result without returning a value 3. Often used as the last callback at the end of a method chain |
| CompletableFuture<Void> thenRun(Runnable action) | Runnable action | CompletableFuture<Void> | 1. Executed after the CompletableFuture is done 2. Neither receives the result nor returns a value 3. Often used as the last callback at the end of a method chain |
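
The three callbacks in the table differ only in what they receive and what they pass on. A compact sketch that chains all three is given below; the class name CallbackChainSketch and the log list are illustrative. The chain starts from an already-completed future, so every callback runs synchronously on the calling thread and a plain ArrayList is safe here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical names; chains thenApply -> thenAccept -> thenRun.
class CallbackChainSketch {

    static List<String> runChain() {
        List<String> log = new ArrayList<>();
        CompletableFuture.completedFuture("Hello")
                .thenApply(s -> s + " World")         // transforms and passes a value on
                .thenAccept(s -> log.add("got " + s)) // consumes, passes nothing on
                .thenRun(() -> log.add("done"))       // ignores the value entirely
                .join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runChain()); // [got Hello World, done]
    }
}
```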

thenApply()

public static void main(String[] args) {

    final long START_TIME = System.currentTimeMillis();

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    // Create a CompletableFuture
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Start");
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Hello");
        return "Hello";
    });

    // Attach a callback to cf1 using thenApply()
    CompletableFuture<String> cf2 = cf1.thenApply(x -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(x + " World");
        return x + " World";
    });

    // Attach a callback to cf2 using thenApply()
    CompletableFuture<String> cf3 = cf2.thenApply(x -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(x + "!");
        System.out.println("CompletableFuture End");
        return x + "!";
    });

    // Busy waiting
    // Do not end the main thread immediately,
    // otherwise the thread pool used by CompletableFuture
    // by default will be closed immediately
    while (!cf3.isDone()) {
        try {
            System.out.println("CompletableFuture Processing in other thread... (" +
                    (System.currentTimeMillis() - START_TIME) + ")");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Start
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Processing in other thread... (4)
CompletableFuture Processing in other thread... (1013)
CompletableFuture Processing in other thread... (2016)
Hello
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Processing in other thread... (3022)
CompletableFuture Processing in other thread... (4036)
CompletableFuture Processing in other thread... (5050)
Hello World
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Processing in other thread... (6056)
CompletableFuture Processing in other thread... (7060)
CompletableFuture Processing in other thread... (8067)
Hello World!
CompletableFuture End
Main Thread End

thenAccept()

public static void main(String[] args) {

    final long START_TIME = System.currentTimeMillis();

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    // Create a CompletableFuture
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Start");
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Hello");
        return "Hello";
    });

    // Attach a callback to cf1 using thenAccept()
    CompletableFuture<Void> cf2 = cf1.thenAccept(x -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(x + "!");
        System.out.println("CompletableFuture End");
    });

    // Busy waiting
    // Do not end the main thread immediately,
    // otherwise the thread pool used by CompletableFuture
    // by default will be closed immediately
    while (!cf2.isDone()) {
        try {
            System.out.println("CompletableFuture Processing in other thread... (" +
                    (System.currentTimeMillis() - START_TIME) + ")");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Start
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Processing in other thread... (4)
CompletableFuture Processing in other thread... (1020)
CompletableFuture Processing in other thread... (2025)
Hello
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Processing in other thread... (3035)
CompletableFuture Processing in other thread... (4037)
CompletableFuture Processing in other thread... (5042)
Hello!
CompletableFuture End
Main Thread End

thenRun()

public static void main(String[] args) {

    final long START_TIME = System.currentTimeMillis();

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());
    
    // Create a CompletableFuture
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Start");
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Hello");
        return "Hello";
    });

    // Attach a callback to cf1 using thenRun()
    CompletableFuture<Void> cf2 = cf1.thenRun(() -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Done!");
        System.out.println("CompletableFuture End");
    });

    // Busy waiting
    // Do not end the main thread immediately,
    // otherwise the thread pool used by CompletableFuture
    // by default will be closed immediately
    while (!cf2.isDone()) {
        try {
            System.out.println("CompletableFuture Processing in other thread... (" +
                    (System.currentTimeMillis() - START_TIME) + ")");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Start
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Processing in other thread... (4)
CompletableFuture Processing in other thread... (1011)
CompletableFuture Processing in other thread... (2014)
Hello
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Processing in other thread... (3020)
CompletableFuture Processing in other thread... (4024)
CompletableFuture Processing in other thread... (5029)
Done!
CompletableFuture End
Main Thread End

Difference between thenApply() and thenApplyAsync()

Each of these three methods also has a variant with the suffix Async, such as thenApplyAsync(). What is the difference between the two forms?
Taking thenApply() and thenApplyAsync() as the example, the difference is which thread runs the callback. thenApplyAsync() submits the callback as a new task to ForkJoinPool.commonPool() (or to an Executor passed as a second argument), so it may run on a different worker thread. thenApply() runs the callback on the thread that completes the previous stage, which here is the thread that executed supplyAsync(); if that stage has already completed when thenApply() is called, the callback runs on the calling thread instead.

public static void main(String[] args) {

    final long START_TIME = System.currentTimeMillis();

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    // Create a CompletableFuture
    CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "supplyAsync() Thread Id : " + Thread.currentThread();
    });

    CompletableFuture<String> thenApply = supplyAsync.thenApply(name ->
            name + " ------ thenApply() Thread Id: " + Thread.currentThread());

    CompletableFuture<String> thenApplyAsync = supplyAsync.thenApplyAsync(name ->
            name + "------ thenApplyAsync() Thread Id : " + Thread.currentThread());

    // Busy waiting
    // Do not end the main thread immediately,
    // otherwise the thread pool used by CompletableFuture
    // by default will be closed immediately
    while (!thenApply.isDone() || !thenApplyAsync.isDone()) {
        try {
            System.out.println("CompletableFuture Processing in other thread... (" +
                    (System.currentTimeMillis() - START_TIME) + ")");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    System.out.println("Main Thread Id: "+ Thread.currentThread());
    try {
        System.out.println(thenApply.get());
        System.out.println(thenApplyAsync.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Processing in other thread... (4)
CompletableFuture Processing in other thread... (1013)
CompletableFuture Processing in other thread... (2018)
Main Thread Id: Thread[main,5,main]
supplyAsync() Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main] ------ thenApply() Thread Id: Thread[ForkJoinPool.commonPool-worker-19,5,main]
supplyAsync() Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]------ thenApplyAsync() Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main]
Main Thread End
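
The Async variants also have an overload that takes an Executor, which makes the choice of thread explicit instead of relying on ForkJoinPool.commonPool(). The following is a small sketch under that assumption; the class name AsyncExecutorSketch and the thread name "callback-pool" are invented for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical names; shows thenApplyAsync(Function, Executor).
class AsyncExecutorSketch {

    static String callbackThreadName() {
        // A single-thread pool whose thread has a recognizable name.
        ExecutorService pool = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "callback-pool"));
        try {
            return CompletableFuture.supplyAsync(() -> "Hello")
                    // The two-argument overload runs the callback on the given executor.
                    .thenApplyAsync(s -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            // The pool's thread is non-daemon, so shut it down to let the JVM exit.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName()); // callback-pool
    }
}
```

Passing a dedicated executor is a common choice when callbacks block, since blocking inside the shared common pool can starve other tasks.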

Process two CompletableFutures

| Method | Parameter Type | Return Type | Description |
| --- | --- | --- | --- |
| <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) | Function<? super T, ? extends CompletionStage<U>> fn | CompletableFuture<U> | Passes the result of this CompletableFuture to a function that returns another CompletableFuture, and returns that future's result |
| <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn | CompletableFuture<V> | 1. Executed after both CompletableFutures are done 2. Processes the results of both CompletableFutures and returns a value |
| <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) | CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action | CompletableFuture<Void> | 1. Executed after both CompletableFutures are done 2. Consumes the results of both CompletableFutures without returning a value |
| <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) | CompletionStage<? extends T> other, Function<? super T, U> fn | CompletableFuture<U> | 1. Executed after either CompletableFuture is done 2. Processes the result of whichever finished first and returns a value |
| CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) | CompletionStage<? extends T> other, Consumer<? super T> action | CompletableFuture<Void> | 1. Executed after either CompletableFuture is done 2. Consumes the result of whichever finished first without returning a value |
| CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) | CompletionStage<?> other, Runnable action | CompletableFuture<Void> | 1. Executed after either CompletableFuture is done 2. Neither receives a result nor returns a value |
| CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) | CompletionStage<?> other, Runnable action | CompletableFuture<Void> | 1. Executed after both CompletableFutures are done 2. Neither receives the results nor returns a value |
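
The methods in the table fall into two groups: the "both" family waits for two futures, and the "either" family reacts to whichever finishes first. A short sketch with one method from each group follows; the class name BothVsEitherSketch is hypothetical. In the "either" case one future is deliberately never completed, so the outcome does not depend on timing.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical names; one "both" method and one "either" method.
class BothVsEitherSketch {

    // "Both": the callback needs two results, so it waits for both futures.
    static String both() {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> world = CompletableFuture.supplyAsync(() -> "World");
        return hello.thenCombine(world, (a, b) -> a + " " + b).join();
    }

    // "Either": the callback gets whichever result arrives first.
    // The pending future is never completed, so only "fast" can win here.
    static String either() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> ready = CompletableFuture.completedFuture("fast");
        return pending.applyToEither(ready, s -> s.toUpperCase()).join();
    }

    public static void main(String[] args) {
        System.out.println(both());   // Hello World
        System.out.println(either()); // FAST
    }
}
```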

thenCompose()

public static void main(String[] args) {

    final long START_TIME = System.currentTimeMillis();

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    // Create a CompletableFuture
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Start");
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Hello");
        return "Hello";
    });

    CompletableFuture<String> cf2 = cf1.thenCompose(new Function<String, CompletionStage<String>>() {
        @Override
        public CompletionStage<String> apply(String x) {
            return CompletableFuture.supplyAsync(() -> {
                System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(x + " World");
                System.out.println("CompletableFuture End");
                return x + " World";
            });
        }
    });

    // Busy waiting
    // Do not end the main thread immediately,
    // otherwise the thread pool used by CompletableFuture
    // by default will be closed immediately
    while (!cf2.isDone()) {
        try {
            System.out.println("CompletableFuture Processing in other thread... (" +
                    (System.currentTimeMillis() - START_TIME) + ")");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Start
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Processing in other thread... (8)
CompletableFuture Processing in other thread... (1023)
CompletableFuture Processing in other thread... (2025)
Hello
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main]
CompletableFuture Processing in other thread... (3030)
CompletableFuture Processing in other thread... (4034)
CompletableFuture Processing in other thread... (5042)
Hello World
CompletableFuture End
Main Thread End

thenCombine()

public static void main(String[] args) {

    final long START_TIME = System.currentTimeMillis();

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    // Create a CompletableFuture
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Start");
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Hello");
        return "Hello";
    });

    // Create a CompletableFuture
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("World");
        return "World";
    });

    // Note: cf1 and cf2 are processing at the same time
    CompletableFuture<String> cf3 = cf1.thenCombine(cf2, (x1, x2) -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(x1 + " " + x2);
        System.out.println("CompletableFuture End");
        return x1 + " " + x2;
    });

    // Busy waiting
    // Do not end the main thread immediately,
    // otherwise the thread pool used by CompletableFuture
    // by default will be closed immediately
    while (!cf3.isDone()) {
        try {
            System.out.println("CompletableFuture Processing in other thread... (" +
                    (System.currentTimeMillis() - START_TIME) + ")");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Start
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main]
CompletableFuture Processing in other thread... (3)
Hello
CompletableFuture Processing in other thread... (1013)
World
CompletableFuture Processing in other thread... (2018)
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main]
CompletableFuture Processing in other thread... (3027)
CompletableFuture Processing in other thread... (4030)
Hello World
CompletableFuture End
Main Thread End

thenAcceptBoth()

public static void main(String[] args) {

    final long START_TIME = System.currentTimeMillis();

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    // Create a CompletableFuture
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Start");
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Hello");
        return "Hello";
    });

    // Create a CompletableFuture
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("World");
        return "World";
    });

    // Note: cf1 and cf2 are processing at the same time
    CompletableFuture<Void> cf3 = cf1.thenAcceptBoth(cf2, (x1, x2) -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        System.out.println(x1 + " " + x2);
        System.out.println("CompletableFuture End");
    });

    // Busy waiting
    // Do not end the main thread immediately,
    // otherwise the thread pool used by CompletableFuture
    // by default will be closed immediately
    while (!cf3.isDone()) {
        try {
            System.out.println("CompletableFuture Processing in other thread... (" +
                    (System.currentTimeMillis() - START_TIME) + ")");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Start
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main]
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Processing in other thread... (4)
Hello
CompletableFuture Processing in other thread... (1009)
World
CompletableFuture Processing in other thread... (2011)
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main]
Hello World
CompletableFuture End
Main Thread End

applyToEither()

public static void main(String[] args) {

    final long START_TIME = System.currentTimeMillis();

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    // Create a CompletableFuture
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Start");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Hello");
        return "Hello";
    });

    // Create a CompletableFuture
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("World");
        return "World";
    });

    // Note: cf1 and cf2 are processing at the same time
    CompletableFuture<String> cf3 = cf1.applyToEither(cf2, x -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        System.out.println(x);
        System.out.println("CompletableFuture End");
        return x;
    });

    // Busy waiting
    // Do not end the main thread immediately,
    // otherwise the thread pool used by CompletableFuture
    // by default will be closed immediately
    while (!cf3.isDone()) {
        try {
            System.out.println("CompletableFuture Processing in other thread... (" +
                    (System.currentTimeMillis() - START_TIME) + ")");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Start
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Processing in other thread... (4)
Hello
CompletableFuture Processing in other thread... (1009)
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main]
Hello
CompletableFuture End
Main Thread End

acceptEither()

public static void main(String[] args) {

    final long START_TIME = System.currentTimeMillis();

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    // Create a CompletableFuture
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Start");
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Hello");
        return "Hello";
    });

    // Create a CompletableFuture
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("World");
        return "World";
    });

    // Note: cf1 and cf2 are processing at the same time
    CompletableFuture<Void> cf3 = cf1.acceptEither(cf2, x -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        System.out.println(x);
        System.out.println("CompletableFuture End");
    });

    // Busy waiting
    // Do not end the main thread immediately,
    // otherwise the thread pool used by CompletableFuture
    // by default will be closed immediately
    while (!cf3.isDone()) {
        try {
            System.out.println("CompletableFuture Processing in other thread... (" +
                    (System.currentTimeMillis() - START_TIME) + ")");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Start
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main]
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Processing in other thread... (4)
Hello
CompletableFuture Processing in other thread... (1010)
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
Hello
CompletableFuture End
Main Thread End

runAfterEither()

public static void main(String[] args) {

    final long START_TIME = System.currentTimeMillis();

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    // Create a CompletableFuture
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Start");
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.fillInStackTrace();
        }
        System.out.println("Hello");
        return "Hello";
    });

    // Create a CompletableFuture
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.fillInStackTrace();
        }
        System.out.println("World");
        return "World";
    });

    // Note: cf1 and cf2 run concurrently; the action runs as soon as either completes
    CompletableFuture<Void> cf3 = cf1.runAfterEither(cf2, () -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        System.out.println("Any of CompletableFutures is done");
        System.out.println("CompletableFuture End");
    });

    // Busy waiting
    // Do not let the main thread end immediately: the default pool
    // (ForkJoinPool.commonPool()) uses daemon threads, so the JVM
    // would exit before the asynchronous tasks finish
    while (!cf3.isDone()) {
        try {
            System.out.println("CompletableFuture Processing in other thread... (" +
                    (System.currentTimeMillis() - START_TIME) + ")");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.fillInStackTrace();
        }
    }
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Start
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main]
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Processing in other thread... (4)
Hello
CompletableFuture Processing in other thread... (1017)
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
Any of CompletableFutures is done
CompletableFuture End
Main Thread End

runAfterBoth()

public static void main(String[] args) {

    final long START_TIME = System.currentTimeMillis();

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    // Create a CompletableFuture
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Start");
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.fillInStackTrace();
        }
        System.out.println("Hello");
        return "Hello";
    });

    // Create a CompletableFuture
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.fillInStackTrace();
        }
        System.out.println("World");
        return "World";
    });

    // Note: cf1 and cf2 run concurrently; the action runs only after both complete
    CompletableFuture<Void> cf3 = cf1.runAfterBoth(cf2, () -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        System.out.println("Both CompletableFutures are done");
        System.out.println("CompletableFuture End");
    });

    // Busy waiting
    // Do not let the main thread end immediately: the default pool
    // (ForkJoinPool.commonPool()) uses daemon threads, so the JVM
    // would exit before the asynchronous tasks finish
    while (!cf3.isDone()) {
        try {
            System.out.println("CompletableFuture Processing in other thread... (" +
                    (System.currentTimeMillis() - START_TIME) + ")");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.fillInStackTrace();
        }
    }
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Start
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main]
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Processing in other thread... (4)
Hello
CompletableFuture Processing in other thread... (1015)
CompletableFuture Processing in other thread... (2022)
World
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main]
Both CompletableFutures are done
CompletableFuture End
Main Thread End

thenApply() vs. thenCompose()

| Method | Function Input | Function Output | Return | Description |
| --- | --- | --- | --- | --- |
| thenApply() | ? super T | ? extends U | CompletableFuture<U> | Equivalent to map() |
| thenCompose() | ? super T | ? extends CompletionStage<U> | CompletableFuture<U> | Equivalent to flatMap() |

Note:

  • CompletionStage is an interface implemented by CompletableFuture.
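  • thenApply() transforms the result inside the CompletableFuture, e.g. from T to U, using a function that returns a plain value. It returns a new CompletableFuture<U>.
  • thenCompose() chains a function that itself returns a CompletableFuture (or any CompletionStage) and flattens the result into a single CompletableFuture<U> rather than a nested CompletableFuture<CompletableFuture<U>>.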
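
The difference is easiest to see side by side. The following is a minimal sketch, not part of the original examples: the class name ApplyVsCompose and the helper lengthAsync() are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;

class ApplyVsCompose {

    // Hypothetical helper: an asynchronous step that returns its own future
    static CompletableFuture<Integer> lengthAsync(String s) {
        return CompletableFuture.supplyAsync(() -> s.length());
    }

    // thenApply(): the function returns a plain value (like map)
    static int viaApply(String s) {
        CompletableFuture<Integer> cf =
                CompletableFuture.supplyAsync(() -> s).thenApply(String::length);
        return cf.join();
    }

    // thenCompose(): the function returns a future, which is flattened (like flatMap)
    static int viaCompose(String s) {
        CompletableFuture<Integer> cf =
                CompletableFuture.supplyAsync(() -> s).thenCompose(ApplyVsCompose::lengthAsync);
        return cf.join();
    }

    // thenApply() with a future-returning function leaves the futures nested
    static int viaNested(String s) {
        CompletableFuture<CompletableFuture<Integer>> nested =
                CompletableFuture.supplyAsync(() -> s).thenApply(ApplyVsCompose::lengthAsync);
        return nested.join().join();
    }

    public static void main(String[] args) {
        System.out.println(viaApply("Hello"));
        System.out.println(viaCompose("Hello"));
        System.out.println(viaNested("Hello"));
    }
}
```

All three calls yield the same number; only the shape of the intermediate type differs, which is why thenCompose() is the better fit when the next step is itself asynchronous.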

Process multiple CompletableFutures

| Method | Parameter Type | Return Type | Description |
| --- | --- | --- | --- |
| CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) | CompletableFuture<?>... cfs | CompletableFuture<Void> | The returned CompletableFuture completes after all of the given CompletableFutures are done |
| CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) | CompletableFuture<?>... cfs | CompletableFuture<Object> | The returned CompletableFuture completes as soon as any one of the given CompletableFutures is done, with that future's result |

allOf()

public static void main(String[] args) {

    final long START_TIME = System.currentTimeMillis();

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    // Create a CompletableFuture
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Start");
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.fillInStackTrace();
        }
        System.out.println("Hello");
        return "Hello";
    });

    // Create a CompletableFuture
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.fillInStackTrace();
        }
        System.out.println("World");
        return "World";
    });

    // Create a CompletableFuture
    CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.fillInStackTrace();
        }
        System.out.println("!");
        return "!";
    });

    CompletableFuture<Void> allCFs = CompletableFuture.allOf(cf1, cf2, cf3);
    CompletableFuture<String> result = allCFs.thenApply(x -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        // allOf() has completed, so cf1, cf2 and cf3 are all done and
        // join() returns immediately without a checked exception
        String combined = cf1.join() + " " + cf2.join() + cf3.join();
        System.out.println(combined);
        return combined;
    });

    // Busy waiting
    // Do not let the main thread end immediately: the default pool
    // (ForkJoinPool.commonPool()) uses daemon threads, so the JVM
    // would exit before the asynchronous tasks finish
    while (!result.isDone()) {
        try {
            System.out.println("CompletableFuture Processing in other thread... (" +
                    (System.currentTimeMillis() - START_TIME) + ")");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.fillInStackTrace();
        }
    }
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Start
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main]
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-23,5,main]
CompletableFuture Processing in other thread... (4)
Hello
CompletableFuture Processing in other thread... (1023)
World
CompletableFuture Processing in other thread... (2028)
!
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-23,5,main]
Hello World!
Main Thread End
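
Because allOf() returns CompletableFuture<Void>, the individual results have to be read back from the original futures. When the futures are held in a list, one common way to do this is sketched below. The class name AllOfResults and the helper joinAll() are hypothetical, not part of the original example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfResults {

    // Hypothetical helper: wait for every future, then collect the results in list order
    static List<String> joinAll(List<CompletableFuture<String>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        // allOf() carries no value, so the results are read from the original futures;
        // join() does not block here because every future is already complete
        CompletableFuture<List<String>> collected = all.thenApply(v ->
                futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
        return collected.join();
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "Hello"),
                CompletableFuture.supplyAsync(() -> "World"),
                CompletableFuture.supplyAsync(() -> "!"));
        System.out.println(String.join(" ", joinAll(futures)));
    }
}
```

The results come back in the order of the list, not in the order in which the futures finished.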

anyOf()

public static void main(String[] args) {

    final long START_TIME = System.currentTimeMillis();

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    // Create a CompletableFuture
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Start");
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.fillInStackTrace();
        }
        System.out.println("Hello");
        return "Hello";
    });

    // Create a CompletableFuture
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.fillInStackTrace();
        }
        System.out.println("World");
        return "World";
    });

    // Create a CompletableFuture
    CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.fillInStackTrace();
        }
        System.out.println("!");
        System.out.println("CompletableFuture End");
        return "!";
    });

    CompletableFuture<Object> anyOfCF = CompletableFuture.anyOf(cf1, cf2, cf3);

    // Busy waiting
    // Do not let the main thread end immediately: the default pool
    // (ForkJoinPool.commonPool()) uses daemon threads, so the JVM
    // would exit before the asynchronous tasks finish
    while (!anyOfCF.isDone()) {
        try {
            System.out.println("CompletableFuture Processing in other thread... (" +
                    (System.currentTimeMillis() - START_TIME) + ")");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.fillInStackTrace();
        }
    }
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Start
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-5,5,main]
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-23,5,main]
CompletableFuture Processing in other thread... (4)
Hello
Main Thread End
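
The example above only waits for anyOf() and never reads its value. The sketch below shows one way to read it, using manually completed futures (as in the new CompletableFuture() section) so that the completion order is fixed. The class name AnyOfResult and the method firstCompleted() are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class AnyOfResult {

    // Completes two manually created futures in a fixed order and
    // returns the value that anyOf() reports
    static String firstCompleted() {
        CompletableFuture<String> cf1 = new CompletableFuture<>();
        CompletableFuture<String> cf2 = new CompletableFuture<>();
        CompletableFuture<Object> any = CompletableFuture.anyOf(cf1, cf2);

        cf2.complete("World"); // cf2 finishes first
        cf1.complete("Hello"); // too late to change the anyOf() result

        // anyOf() is typed as CompletableFuture<Object>, so a cast is needed
        return (String) any.join();
    }

    public static void main(String[] args) {
        System.out.println(firstCompleted()); // prints "World"
    }
}
```

The cast is the price of anyOf() accepting futures of different types.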

CompletableFuture Exception Handling

Note: If an exception is thrown in supplyAsync(), the thenApply() stages chained after it are not called; each of them completes exceptionally with that exception instead. Likewise, if the first thenApply() throws, the second and third thenApply() are skipped. The exception is rethrown, wrapped in a CompletionException, when join() is called on the resulting CompletableFuture (get() wraps it in an ExecutionException).

| Method | Parameter Type | Return Type | Description |
| --- | --- | --- | --- |
| CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) | Function<Throwable, ? extends T> fn | CompletableFuture<T> | 1. It is called only when an exception occurs |
| <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) | BiFunction<? super T, Throwable, ? extends U> fn | CompletableFuture<U> | 1. It is called regardless of whether an exception occurs. 2. If an exception occurs, the result is null; otherwise, the exception is null |
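
The propagation rule from the note can be checked without any handler. The sketch below is a minimal illustration: the class name ExceptionPropagation and the flag secondStageRan are hypothetical and only record whether the later stage was called.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class ExceptionPropagation {

    // Hypothetical flag: set to true only if the second thenApply() runs
    static final AtomicBoolean secondStageRan = new AtomicBoolean(false);

    static String failingChain() {
        secondStageRan.set(false);
        CompletableFuture<String> cf = CompletableFuture
                .supplyAsync(() -> "Hello")
                .thenApply(x -> {
                    if (!x.equals(x.toUpperCase())) {
                        throw new IllegalArgumentException("HELLO must be uppercase");
                    }
                    return x + " World";
                })
                .thenApply(x -> {
                    secondStageRan.set(true); // skipped because the previous stage failed
                    return x + "!";
                });
        try {
            return cf.join();
        } catch (CompletionException e) {
            // join() wraps the original exception; getCause() recovers it
            return "Caught: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(failingChain());       // prints "Caught: HELLO must be uppercase"
        System.out.println(secondStageRan.get()); // prints "false"
    }
}
```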

exceptionally()

public static void main(String[] args) {

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    // Create a CompletableFuture
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Start");
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        System.out.println("Hello");
        return "Hello";
    }).thenApply(x -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        for(int i = 0; i < x.length(); i++) {
            char c = x.charAt(i);
            if (!Character.isUpperCase(c)) {
                throw new IllegalArgumentException("HELLO must be uppercase");
            }
        }
        System.out.println(x + " World");
        return x + " World";
    }).thenApply(x -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        System.out.println(x + "!");
        return x + "!";
    }).exceptionally(ex -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        System.out.println("Oops! " + ex.getMessage());
        System.out.println("CompletableFuture End");
        return "Unknown!";
    });

    // Busy waiting
    // Do not let the main thread end immediately: the default pool
    // (ForkJoinPool.commonPool()) uses daemon threads, so the JVM
    // would exit before the asynchronous tasks finish
    while (!cf1.isDone()) {
        try {
            System.out.println("CompletableFuture Processing in other thread...");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.fillInStackTrace();
        }
    }
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Start
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
Hello
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Thread Id : Thread[main,5,main]
Oops! java.lang.IllegalArgumentException: HELLO must be uppercase
CompletableFuture End
Main Thread End

handle()

public static void main(String[] args) {

    System.out.println("Main Thread Start");
    System.out.println("Main Thread Id : " + Thread.currentThread());

    // Create a CompletableFuture
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("CompletableFuture Start");
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        System.out.println("Hello");
        return "Hello";
    }).thenApply(x -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        for(int i = 0; i < x.length(); i++) {
            char c = x.charAt(i);
            if (!Character.isUpperCase(c)) {
                throw new IllegalArgumentException("HELLO must be uppercase");
            }
        }
        System.out.println(x + " World");
        return x + " World";
    }).thenApply(x -> {
        System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
        System.out.println(x + "!");
        return x + "!";
    }).handle((result, exception) -> {
        if(exception != null) {
            System.out.println("CompletableFuture Thread Id : " + Thread.currentThread());
            System.out.println("Oops! " + exception.getMessage());
            System.out.println("CompletableFuture End");
            return "Unknown!";
        }
        return result;
    });

    // Busy waiting
    // Do not let the main thread end immediately: the default pool
    // (ForkJoinPool.commonPool()) uses daemon threads, so the JVM
    // would exit before the asynchronous tasks finish
    while (!cf1.isDone()) {
        try {
            System.out.println("CompletableFuture Processing in other thread...");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.fillInStackTrace();
        }
    }
    System.out.println("Main Thread End");
}

/* Output */
Main Thread Start
Main Thread Id : Thread[main,5,main]
CompletableFuture Start
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
Hello
CompletableFuture Thread Id : Thread[ForkJoinPool.commonPool-worker-19,5,main]
CompletableFuture Thread Id : Thread[main,5,main]
Oops! java.lang.IllegalArgumentException: HELLO must be uppercase
CompletableFuture End
Main Thread End
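
In both outputs the message is prefixed with java.lang.IllegalArgumentException because the handler receives a CompletionException that wraps the original exception. The sketch below shows one possible way to recover the original message, and also exercises the success path of handle(). The class name HandleUnwrap and the helper unwrap() are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class HandleUnwrap {

    // Hypothetical helper: strip the CompletionException wrapper if present
    static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null)
                ? ex.getCause()
                : ex;
    }

    static String run(String input) {
        CompletableFuture<String> cf = CompletableFuture
                .supplyAsync(() -> input)
                .thenApply(x -> {
                    if (!x.equals(x.toUpperCase())) {
                        throw new IllegalArgumentException("HELLO must be uppercase");
                    }
                    return x + " World";
                })
                // handle() runs on both paths: exactly one of result/ex is non-null here
                .handle((result, ex) ->
                        ex == null ? result : "Oops! " + unwrap(ex).getMessage());
        return cf.join();
    }

    public static void main(String[] args) {
        System.out.println(run("HELLO")); // prints "HELLO World"
        System.out.println(run("Hello")); // prints "Oops! HELLO must be uppercase"
    }
}
```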