FutureTask原理分析 - 969251639/study GitHub Wiki

FutureTask是jdk用来支持线程返回值的异步任务,它需要结合Callable来一起配合使用
FutureTask实现了RunnableFuture接口,RunnableFuture又继承Runnable接口和Future接口,所以FutureTask本身就是一个线程的执行体,但又根据Future扩展了它的功能

public class FutureTask<V> implements RunnableFuture<V> {
    ...
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);//取消任务
    boolean isCancelled();//是否取消
    boolean isDone();//是否执行完成
    V get() throws InterruptedException, ExecutionException;//获取执行后的返回值
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;//在指定的时间范围内获取执行后的返回值
}

它有两个构造方法

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;//设置调用体
        this.state = NEW;       // 状态初始化为NEW
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);//通过Executors创建一个调用体
        this.state = NEW;       // 状态初始化为NEW
    }
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);//通过RunnableAdapter包装一个调用体
    }

    static final class RunnableAdapter<T> implements Callable<T> {//Callable的适配器
        final Runnable task;//真正的执行体
        final T result;//需要返回的值
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {//适配Callable的call方法
            task.run();//执行
            return result;//返回
        }
    }

同时它的内部共有6种状态的跳转

    private static final int NEW          = 0;//初始状态
    private static final int COMPLETING   = 1;//已完成
    private static final int NORMAL       = 2;//正常退出
    private static final int EXCEPTIONAL  = 3;//意外退出
    private static final int CANCELLED    = 4;//取消
    private static final int INTERRUPTING = 5;//中断中
    private static final int INTERRUPTED  = 6;//已中断

可能跳转为

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED

另外还提供了state,runner,waiters这几个成员变量的内存偏移量,用来cas修改这些成员变量

    private volatile int state;//状态
    private volatile Thread runner;//执行的线程
    private volatile WaitNode waiters;//等待节点
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;//state偏移量
    private static final long runnerOffset;//runner偏移量
    private static final long waitersOffset;//waiters偏移量
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

另外还有一个重要的成员变量用来存储返回值

private Object outcome; // non-volatile, protected by state reads/writes

如果把一个FutureTask丢给线程池,那么线程池在在执行的时候,会调用任务的run方法

    public void run() {
        if (state != NEW ||//状态不是NEW状态则表示任务已经在执行了,直接返回
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))//通过cas修改runner偏移量,修改runner为当前线程,如果修改失败,则表示已有其他线程在执行该任务,也直接返回
            return;
        try {
            Callable<V> c = callable;//获取调度体
            if (c != null && state == NEW) {//如果未初始状态
                V result;//缓存返回的执行结果
                boolean ran;//标记是否执行成功,用来控制状态跳转
                try {
                    result = c.call();//执行任务,将返回值赋值给result变量
                    ran = true;//如果没有抛异常,则任务执行成功
                } catch (Throwable ex) {
                    result = null;//抛异常则返回null
                    ran = false;//设置任务执行失败
                    setException(ex);//处理异常情况
                }
                if (ran)//如果执行成功
                    set(result);//设置结果
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;//设置运行的线程为null
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)//如果中断
                handlePossibleCancellationInterrupt(s);//处理中断情况
        }
    }

首先来看正常的执行情况,也就是最后可以成功调用到set方法
流程为:NEW -> COMPLETING -> NORMAL

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//设置状态为NEW到COMPLETING
            outcome = v;//设置返回值
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state //设置正常流程下的结束状态NORMAL
            finishCompletion();//处理执行成功的操作
        }
    }
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {//如果有线程在等待获取结果的话
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//将waiters设置为null
                for (;;) {//自旋
                    Thread t = q.thread;//等待线程
                    if (t != null) {//如果等待的线程不为空
                        q.thread = null;//将该线程的引用设置为null,GC回收
                        LockSupport.unpark(t);//唤醒该阻塞线程,说执行完了,可以去获取返回值了
                    }
                    WaitNode next = q.next;//获取下一个等待线程
                    if (next == null)//如果没有下一个等待线程,则跳出
                        break;
                    q.next = null; // unlink to help gc 将下一个节点悬空,GC回收
                    q = next;//继续唤醒下一个等待线程节点
                }
                break;
            }
        }

        done();//钩子方法,空实现

        callable = null;        // to reduce footprint
    }
    protected void done() { }

接下来看它的异常流程,也就是走到run方法中的setException方法
流程为:NEW -> COMPLETING -> EXCEPTIONAL

    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//设置状态为NEW到COMPLETING
            outcome = t;//设置返回值
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state //设置异常流程下的结束状态EXCEPTIONAL
            finishCompletion();//和set方法的一样都要唤醒等待返回值的线程,只不过这里返回的都是null
        }
    }

最后看他的中断流程,也就是走到run方法中的handlePossibleCancellationInterrupt方法

    private void handlePossibleCancellationInterrupt(int s) {
        if (s == INTERRUPTING)//如果已经处于中断中
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt 让出cpu
    }

接下来看get方法,该方法可以用来阻塞式的获取返回结果
get有两种方法,一个带超时,一个不带超时

    public V get() throws InterruptedException, ExecutionException {
        int s = state;//获取当前状态
        if (s <= COMPLETING)//如果还处于初始状态,也就是还没有执行完
            s = awaitDone(false, 0L);//进入等待获取
        return report(s);//根据状态获取相应的返回值
    }
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)//如果时间单位为null,则抛出空指针异常
            throw new NullPointerException();
        int s = state;//获取当前状态
        if (s <= COMPLETING &&//如果还处于初始状态,也就是还没有执行完
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)//进入等待获取,如果超时了还未执行完,则抛出超时异常
            throw new TimeoutException();
        return report(s);
    }

可以看到等待获取的核心代码落在awaitDone方法上
在分析之前需要看下它的一个WaitNode内部类,它可以用来支撑做多个线程等待获取执行结果,内部通过单向链表串联起来

    static final class WaitNode {
        volatile Thread thread;//等待的线程
        volatile WaitNode next;//下一个等待线程
        WaitNode() { thread = Thread.currentThread(); }//初始化
    }

再继续看awaitDone方法

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;//是否需要超时控制
        WaitNode q = null;//等待节点
        boolean queued = false;//是否有等待节点队列
        for (;;) {//自旋
            if (Thread.interrupted()) {//如果等待的线程被中断了
                removeWaiter(q);//移除该节点
                throw new InterruptedException();//抛出InterruptedException
            }

            int s = state;//获取状态
            if (s > COMPLETING) {//如果已完成了
                if (q != null)//如果等待节点不为空
                    q.thread = null;//设置等待线程为null,GC回收
                return s;//返回状态,可以根据状态返回具体返回值
            }
            else if (s == COMPLETING) // cannot time out yet //还没有真正的执行完,正准备给返回值赋值状态
                Thread.yield();//让出CPU
            else if (q == null)//等待节点队列为空
                q = new WaitNode();//创建一个节点
            else if (!queued)//如果等待队列不为空
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);//设置当前节点到队尾
            else if (timed) {//处理超时情况
                nanos = deadline - System.nanoTime();//计算剩余时间,小于等于0超时
                if (nanos <= 0L) {//超时
                    removeWaiter(q);//移除等待节点
                    return state;//返回该状态
                }
                LockSupport.parkNanos(this, nanos);//休眠nanos纳秒后醒来
            }
            else
                LockSupport.park(this);//休眠
        }
    }

取消:

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))//如果状态不为初始状态或者cas修改状态从NEW到INTERRUPTING或CANCELLED根据mayInterruptIfRunning参数
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {//中断取消
                try {
                    Thread t = runner;
                    if (t != null)//执行的线程不为空,中断
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);//强制修改state的值为INTERRUPTED
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }