Simple Usage of CompletableFuture - HolmesJJ/OOP-FP GitHub Wiki

A.java

import java.util.concurrent.CompletableFuture;

/**
 * Immutable {@code int} wrapper whose operations are deliberately slow, used to
 * demonstrate how {@link CompletableFuture} stages are composed.
 *
 * <p>Every operation prints the name of the executing thread and then blocks for
 * {@link #WORK_MILLIS} milliseconds, so the console output shows which thread ran
 * which stage and in what order.
 */
class A {

    /** How long each simulated piece of work blocks, in milliseconds. */
    private static final long WORK_MILLIS = 3000;

    private final int x;

    /** Creates an instance holding {@code 0}. */
    A() {
        this(0);
    }

    /**
     * Creates an instance holding the given value.
     *
     * @param x the value to wrap
     */
    A(int x) {
        this.x = x;
    }

    /**
     * Prints the current thread name followed by the wrapped value, then blocks
     * for {@link #WORK_MILLIS} milliseconds to simulate slow work.
     */
    void sleep() {
        System.out.println(Thread.currentThread().getName() + " " + x);
        pause();
    }

    /**
     * Blocks the calling thread for {@link #WORK_MILLIS} milliseconds.
     *
     * <p>If the thread is interrupted, {@code "interrupted"} is printed and the
     * interrupt status is restored so that callers further up the stack can still
     * observe the interruption instead of having it silently discarded.
     */
    private static void pause() {
        try {
            Thread.sleep(WORK_MILLIS);
        } catch (InterruptedException e) {
            System.out.println("interrupted");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Slowly computes the successor of this value.
     *
     * @return a new instance holding {@code x + 1}
     */
    A incr() {
        sleep();
        return new A(this.x + 1);
    }

    /**
     * Slowly computes the predecessor of this value.
     *
     * @return a new instance holding {@code x - 1}
     * @throws IllegalStateException if the wrapped value is already negative
     */
    A decr() {
        sleep();
        if (x < 0) {
            throw new IllegalStateException("cannot decrement a negative value: " + x);
        }
        return new A(this.x - 1);
    }

    /**
     * Asynchronously computes {@code a.incr().decr()}.
     *
     * <p>Synchronous equivalent:
     * <pre>{@code
     * A foo(A a) {
     *     return a.incr().decr();
     * }
     * }</pre>
     *
     * <p>Other asynchronous formulations of the same computation:
     * <pre>{@code
     * // both steps inside one asynchronous task
     * return CompletableFuture.supplyAsync(() -> a.incr().decr());
     *
     * // second step chained as a non-async dependent stage
     * return CompletableFuture.supplyAsync(a::incr).thenApply(A::decr);
     * }</pre>
     *
     * <p>This version chains {@code decr} with {@code thenApplyAsync}, so the
     * second step is submitted as a separate asynchronous task.
     *
     * @param a the starting value
     * @return a future completing with the result of {@code a.incr().decr()}
     */
    CompletableFuture<A> foo(A a) {
        return CompletableFuture.supplyAsync(a::incr).thenApplyAsync(A::decr);
    }

    /**
     * Asynchronously computes {@code a.incr()}.
     *
     * <p>Synchronous equivalent:
     * <pre>{@code
     * A bar(A a) {
     *     return a.incr();
     * }
     * }</pre>
     *
     * @param a the starting value
     * @return a future completing with the result of {@code a.incr()}
     */
    CompletableFuture<A> bar(A a) {
        return CompletableFuture.supplyAsync(a::incr);
    }

    /**
     * Returns an already completed future holding a fresh {@code A(0)} when
     * {@code x == 0}; otherwise asynchronously computes {@code a.incr().decr()}.
     *
     * <p>Synchronous equivalent:
     * <pre>{@code
     * A baz(A a, int x) {
     *     if (x == 0) {
     *         return new A(0);
     *     } else {
     *         return a.incr().decr();
     *     }
     * }
     * }</pre>
     *
     * @param a the starting value, used only when {@code x != 0}
     * @param x selector; {@code 0} short-circuits to a completed future
     * @return a future completing with the selected result
     */
    CompletableFuture<A> baz(A a, int x) {
        if (x == 0) {
            // No work to do: hand back a future that is complete from the start.
            return CompletableFuture.completedFuture(new A(0));
        }
        return CompletableFuture.supplyAsync(() -> a.incr().decr());
    }

    /**
     * Slow step {@code A -> B}: logs the input and the current thread, blocks, and
     * returns a fixed result.
     *
     * @param a the input, used only for logging
     * @return a new {@code B(2)}
     */
    B f(A a) {
        System.out.println(Thread.currentThread().getName() + " A: " + a);
        pause();
        return new B(2);
    }

    /**
     * Slow step {@code B -> C}: logs the input and the current thread, blocks, and
     * returns a fixed result.
     *
     * @param b the input, used only for logging
     * @return a new {@code C(3)}
     */
    C g(B b) {
        System.out.println(Thread.currentThread().getName() + " B: " + b);
        pause();
        return new C(3);
    }

    /**
     * Slow step {@code B -> D}: logs the input and the current thread, blocks, and
     * returns a fixed result.
     *
     * @param b the input, used only for logging
     * @return a new {@code D(3)}
     */
    D h(B b) {
        System.out.println(Thread.currentThread().getName() + " B: " + b);
        pause();
        return new D(3);
    }

    /**
     * Slow step {@code C -> D}: logs the input and the current thread, blocks, and
     * returns a fixed result.
     *
     * @param c the input, used only for logging
     * @return a new {@code D(4)}
     */
    D h(C c) {
        System.out.println(Thread.currentThread().getName() + " C: " + c);
        pause();
        return new D(4);
    }

    /**
     * Slow step {@code (C, D) -> E}: logs both inputs and the current thread,
     * blocks, and returns the sum of the two wrapped values.
     *
     * @param c the first operand
     * @param d the second operand
     * @return a new {@code E} holding the sum of the values of {@code c} and {@code d}
     * @throws NumberFormatException if either {@code toString()} is not a decimal integer
     */
    E i(C c, D d) {
        System.out.println(Thread.currentThread().getName() + " C: " + c + ", D: " + d);
        pause();
        // The operands' values are read back by parsing their toString() output.
        return new E(Integer.parseInt(c.toString()) + Integer.parseInt(d.toString()));
    }

    /** Returns the wrapped value in decimal form. */
    @Override
    public String toString() {
        return "" + x;
    }
}

B.java

/**
 * Immutable holder of a single {@code int}; its only observable behavior is the
 * decimal rendering returned by {@link #toString()}.
 */
public class B {

    /** The wrapped number, fixed at construction. */
    private final int value;

    /** Builds an instance wrapping {@code 0}. */
    B() {
        this.value = 0;
    }

    /**
     * Builds an instance wrapping the supplied number.
     *
     * @param x the number to wrap
     */
    B(int x) {
        this.value = x;
    }

    /** Renders the wrapped number as a decimal string. */
    @Override
    public String toString() {
        return String.valueOf(value);
    }
}

C.java

/**
 * Immutable holder of a single {@code int}; its only observable behavior is the
 * decimal rendering returned by {@link #toString()}.
 */
public class C {

    /** The wrapped number, fixed at construction. */
    private final int value;

    /** Builds an instance wrapping {@code 0}. */
    C() {
        this.value = 0;
    }

    /**
     * Builds an instance wrapping the supplied number.
     *
     * @param x the number to wrap
     */
    C(int x) {
        this.value = x;
    }

    /** Renders the wrapped number as a decimal string. */
    @Override
    public String toString() {
        return String.valueOf(value);
    }
}

D.java

/**
 * Immutable holder of a single {@code int}; its only observable behavior is the
 * decimal rendering returned by {@link #toString()}.
 */
public class D {

    /** The wrapped number, fixed at construction. */
    private final int value;

    /** Builds an instance wrapping {@code 0}. */
    D() {
        this.value = 0;
    }

    /**
     * Builds an instance wrapping the supplied number.
     *
     * @param x the number to wrap
     */
    D(int x) {
        this.value = x;
    }

    /** Renders the wrapped number as a decimal string. */
    @Override
    public String toString() {
        return String.valueOf(value);
    }
}

E.java

/**
 * Immutable holder of a single {@code int}; its only observable behavior is the
 * decimal rendering returned by {@link #toString()}.
 */
public class E {

    /** The wrapped number, fixed at construction. */
    private final int value;

    /** Builds an instance wrapping {@code 0}. */
    E() {
        this.value = 0;
    }

    /**
     * Builds an instance wrapping the supplied number.
     *
     * @param x the number to wrap
     */
    E(int x) {
        this.value = x;
    }

    /** Renders the wrapped number as a decimal string. */
    @Override
    public String toString() {
        return String.valueOf(value);
    }
}

Main.java

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * Driver for the {@link CompletableFuture} exercises.
 *
 * <p>Each {@code qXY} method builds one asynchronous pipeline from the slow
 * operations on {@code A}, keeps the main thread alive while the pipeline runs
 * (printing a progress line once per polling interval), and, where the pipeline
 * yields a value, prints that value at the end.
 */
public class Main {

    /** Delay between two progress lines while waiting for a pipeline, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 1000;

    /**
     * Runs the selected exercise; uncomment exactly one call to try another one.
     *
     * @param args ignored
     */
    public static void main(String[] args) {

        // q1a();
        // q1b();
        // q1d();
        // q1e();
        // q2a();
        // q2b();
        q2c();
    }

    /**
     * Keeps the calling thread alive until {@code future} is done, printing a
     * progress line with the elapsed time once per polling interval.
     *
     * <p>The wait matters because the asynchronous stages run on other threads;
     * if {@code main} returned straight away the program could end before they
     * finish (the default executor's workers are normally daemon threads).
     *
     * <p>If the calling thread is interrupted, the interrupt status is restored
     * and the method returns early instead of swallowing the interruption.
     *
     * @param future    the pipeline to wait for
     * @param startTime the {@link System#currentTimeMillis()} value at which the
     *                  exercise started, used to compute the elapsed time
     */
    private static void awaitCompletion(CompletableFuture<?> future, long startTime) {
        while (!future.isDone()) {
            System.out.println("CompletableFuture Processing in other thread... (" +
                    (System.currentTimeMillis() - startTime) + ")");
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Prints the value of {@code future}, or reports on standard error why no
     * value is available.
     *
     * @param future the pipeline whose outcome should be printed
     */
    private static void printResult(CompletableFuture<?> future) {
        try {
            System.out.println(future.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while waiting for the result");
        } catch (ExecutionException e) {
            // get() wraps the pipeline's failure; the cause is the real error.
            System.err.println("Computation failed: " + e.getCause());
        }
    }

    /** Q1a: a two-stage asynchronous chain, {@code incr} followed by {@code decr}. */
    public static void q1a() {

        final long startTime = System.currentTimeMillis();

        A a = new A();
        CompletableFuture<A> cf = a.foo(a);

        awaitCompletion(cf, startTime);
        printResult(cf);
    }

    /** Q1b: feeds the result of {@code foo} into {@code bar} with {@code thenCompose}. */
    public static void q1b() {

        final long startTime = System.currentTimeMillis();

        A a = new A();
        CompletableFuture<A> cf = a.foo(new A()).thenCompose(a::bar);

        awaitCompletion(cf, startTime);
        printResult(cf);
    }

    /** Q1d: starts three independent pipelines and waits for all of them with {@code allOf}. */
    public static void q1d() {

        final long startTime = System.currentTimeMillis();

        A a = new A();
        CompletableFuture<Void> allCFs = CompletableFuture.allOf(
                a.foo(new A()),
                a.bar(new A()),
                a.baz(new A(), 1));

        awaitCompletion(allCFs, startTime);
    }

    /**
     * Q1e: recovers from a failing pipeline with {@code handle}.
     *
     * <p>The second {@code decr()} runs on a negative value and throws, so the
     * handler logs the error and substitutes a fresh {@code A}.
     */
    public static void q1e() {

        final long startTime = System.currentTimeMillis();

        CompletableFuture<A> cf = CompletableFuture
                .supplyAsync(() -> new A().decr().decr())
                .handle((result, exception) -> {
                    // Test the exception rather than the result: a null result
                    // alone does not prove that the stage failed.
                    if (exception != null) {
                        System.out.println("ERROR: " + exception);
                        return new A();
                    }
                    return result;
                });

        awaitCompletion(cf, startTime);
        printResult(cf);
    }

    /** Q2a: a linear chain {@code A -> B -> C -> D}. */
    public static void q2a() {

        final long startTime = System.currentTimeMillis();

        A a = new A(1);

        /*
         * Synchronous equivalent:
         *
         * B b = a.f(a);
         * C c = a.g(b);
         * D d = a.h(c);
         */

        CompletableFuture<D> cf = CompletableFuture
                .supplyAsync(() -> a.f(a))
                .thenApply(a::g)
                .thenApply(a::h);

        awaitCompletion(cf, startTime);
        printResult(cf);
    }

    /** Q2b: the same chain as Q2a, but the final result is discarded with {@code thenAccept}. */
    public static void q2b() {

        final long startTime = System.currentTimeMillis();

        A a = new A(1);

        /*
         * Synchronous equivalent:
         *
         * B b = a.f(a);
         * C c = a.g(b);
         * a.h(c); // no return value
         */

        CompletableFuture<Void> cf = CompletableFuture
                .supplyAsync(() -> a.f(a))
                .thenApply(a::g)
                .thenAccept(a::h);

        awaitCompletion(cf, startTime);
    }

    /**
     * Q2c: a diamond-shaped pipeline. {@code B} fans out into {@code C} and
     * {@code D}, which are joined into {@code E} with {@code thenCombine}.
     */
    public static void q2c() {

        final long startTime = System.currentTimeMillis();

        A a = new A(1);

        /*
         * Synchronous equivalent:
         *
         * B b = a.f(a);
         * C c = a.g(b);
         * D d = a.h(b);
         * E e = a.i(c, d);
         */

        CompletableFuture<B> cfb = CompletableFuture
                .supplyAsync(() -> a.f(a));
        CompletableFuture<C> cfc = cfb.thenApply(a::g);
        CompletableFuture<D> cfd = cfb.thenApply(a::h);
        CompletableFuture<E> cfe = cfc.thenCombine(cfd, a::i);

        awaitCompletion(cfe, startTime);
        printResult(cfe);
    }
}
⚠️ **GitHub.com Fallback** ⚠️