从接口说起

在Java中,要在一个线程中运行一段代码,需要一个对象实现Runnable或Callable接口,然后将该类作为参数传给一个线程运行;或者继承Thread类,直接start运行。其实本质上,Thread类也是实现了Runnable接口,在运行时运行重载的run方法。所以我们就先从Runnable和Callable接口说起。

Runnable接口

先看Runnable接口,Runnable接口其实是非常简单的:

1
2
3
public interface Runnable {
    public abstract void run();
}

这个接口的作用也非常简单,用来创建一个新的线程,然后在新的线程中执行run中的方法。下面举一个简单的例子,创建一个线程,实现1-100的累加。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// 实现Runnable接口
public class SumRunnable implements Runnable{

    /**
     * 线程执行入口
     */
    @SneakyThrows
    @Override
    public void run() {
        int result = 0;
        for (int i = 0; i < 100; i++) {
            result += i;
        }
        System.out.println(result);
    }

    public static void main(String[] args) {
        SumRunnable sum = new SumRunnable();
        new Thread(sum).run();
    }
}

Callable接口

Runnable接口虽然简单,但是其提供的功能相对就很少的。如果我们不希望在另一个线程内打印结果,我们要把打印的部分收敛到主线程。这就要求计算线程能将结果传给主线程。如果使用Runnable来实现,那就要我们自己实现一套线程同步机制,传递计算结果。所以Runnable接口最大的缺点在于不能直接获得执行结果;另外,Runnable接口也无法获取任务执行中发生的异常。

于是,在Java1.5中新加入了Callable接口,我们来看下Callable的源码:

1
2
3
public interface Callable<V> {
    V call() throws Exception;
}

可以看到,Callable接口可以返回返回值并抛出异常。下面仍然以求和举例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
public class SumCallable implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        int result = 0;
        for (int i = 0; i < 100; i++) {
            result += i;
        }
        return result;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        SumCallable sum = new SumCallable();
        // 创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(1);
        // 提交任务
        Future<Integer> future = executor.submit(sum);
        // 获取任务返回结果
        Integer integer = future.get();
        System.out.println(integer);
    }
}

我们首先创建了一个线程池,然后将求和的类提交给线程池,线程池返回给我们一个Future对象,我们从这个Future对象中得到了我们计算的结果。那么这个新引入的Future对象的作用是什么?在参数传递的过程中扮演了怎样的角色呢?

Future接口

我们在将任务提交给线程池后,线程池返回给我们一个Future对象,我们通过get得到任务执行结束后返回的结果。所以其实Future的主要作用就是给我们提供了一种方式让我们可以获得另一个线程的执行结果。但Future的作用远不止如此,我们先来看下Future接口都提供了哪些方法。

1
2
3
4
5
6
7
8
public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

可以看到,除了获取结果外,Future还给我们提供了判断任务是否结束、是否取消、和取消任务的能力。我们下面来看下各个方法的作用。

cancel()方法可以用来取消一个任务。对于未执行的task,task直接进入结束状态,并且永远不会执行。对于已经开始运行的任务,cancel通过mayInterruptIfRunning()参数控制是否立刻中断task还是等待任务自然结束。如果task已经结束,无论是自然结束还是被中断,cancel会返回false表示取消执行失败。

isCancelled()方法用于判断一个任务是否在正常结束之前被取消了。

isDone()方法用一个判断一个任务是否完成。这里的完成包括了正常结束,或在运行期间抛出异常,或被取消。

get()方法是Future的核心方法,用于获取一个异步线程的结果。调用该方法后,调用线程会阻塞,直到任务结束。如果任务正常结束,则get()会返回结果;如果任务被取消,get()会抛出CancellationException异常;如果任务执行时抛出了异常,那get()方法会抛出ExecutionException异常,通过getCause()方法可以得到抛出的异常;如果当前线程在等待执行结果时被中断了,会抛出InterruptedException异常。

get(long timeout, TimeUnit unit)get()方法的重载方法,允许当前线程等待指定的时间后,如果任务还没有结束并返回,那么会抛出TimeoutException异常。

所以其实Future方法不仅仅提供了一个获取结果的方式,其实还代表了一个任务的生命周期,让我们可以知道任务当前是什么状态。但是Future毕竟只是一个接口,最终的行为还是取决于具体的实现,所以我们接下来就看一下Java提供的Future实现类——FutureTask

Future的具体实现——FutureTask

我们先来看下FutureTask的类图,他同时实现了Runnable和Future两个接口。

FutureTask类图

那么就是说,当FutureTask提交给另一个线程执行时,其实运行的是Runnablerun()方法。我们下面来看下他的构造函数和run()方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

public void run() {
        // ...
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // ...
        }
    }

所以他是以Callable为参数,在run()中执行Callable,然后返回执行结果。

上面是实现Runnable接口的部分,下面来看下实现Future接口的部分,先来看下核心的get()方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

不论是阻塞的get()还是有超时的get(),最后都会调用awaitDone()方法。下面是awaitDone()方法的结构体,看下面代码之前我们需要记住,调用get()方法的线程和执行run()方法的线程不是一个线程。下面的WaitNode是一个单链表,用于存放等待当前任务执行结果的线程,每当有一个线程调用FutureTaskget()方法时,就会初始化一个WaitNode并将自己加入到该链表中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // 先判断执行get的线程是否被中断,如果中断则从链表中移除
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) { 
            // 线程已经结束,返回
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // 线程正在结束,需要设置结果或设置异常,当前线程让出CPU,等待下次循环
            Thread.yield();
        else if (q == null) // 当前线程第一次调用get方法,初始化一个WaitNode变量
            q = new WaitNode();
        else if (!queued) // WaitNode已初始化,加入到等待队列中;这里采用的是头插法。
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                    q.next = waiters, q);
        else if (timed) {
            // 如果是有时间中断的,那么等待指定时间
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                // 等待时间够了,移出等待队列
                removeWaiter(q);
                return state;
            }
            // 阻塞剩余的时间
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this); // 当前线程阻塞,直到任务运行结束。
    }
}

我们再来看一下cancel()方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public boolean cancel(boolean mayInterruptIfRunning) {
    // 判断是否可以执行中断,这里是对状态为NEW的任务进行处理
    // 如果一个任务的状态为NEW,并且可以将状态设置为INTERRUPTING或CANCELLED,那么返回True
    // 如果这个过程失败了,返回false
    if (!(state == NEW &&
            UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        // 尝试中断正在执行的任务
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // 任务完成
        finishCompletion();
    }
    return true;
}

在出现异常、任务完成、任务取消的时候,都会调用finishCompletion()方法,该方法的作用是遍历等待队列并唤醒在等待队列中的线程们。

更强大的CompletableFuture类

虽然FutureTask实现的Future接口能够让我们获取进程运行的返回值,但是FutureTask有个最大的问题在于,我们无法获得FutureTask准确结束的时间。如果我们要获得FutureTask的返回值并对结果进行处理,要么不断调用get()方法阻塞当前线程,要么不断调用isDone()方法,等待任务完成。二者都会阻塞当前线程。

另外FutureTask其实仅仅代表了一个任务,如果需要组合多个并行的任务,比如连续执行任务A,任务B,任务C,就需要创建三个FutureTask并阻塞当前线程三次;或者任务A和任务B并行执行,二者都结束后执行任务C,那么就需要在主线程中等待两个任务结束,再创建一个FutureTask执行任务。

因此,Java 8添加了一个新的Future实现类——CompletableFuture,加入了更多的新功能:

  1. 手动结束任务,允许用户不提供运行的代码,只提供结果即可结束一个任务。
  2. 允许传递回调函数。
  3. 自由组合多个任务。

手动结束任务

如果我们要使用一个FutureTask,那么必须要给该类提供一个实现Callable或Runnable接口的类才行。而CompletableFuture允许开发者直接调用complete手动结束Future,我们仍然以求和为例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
public class SumCompeletableFuture {

    public static Integer sum() {
        // ...
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        new Thread(() -> {
            Integer result = SumCompeletableFuture.sum();
            // 在另一个线程内手动结束任务
            future.complete(result);
        }).run();
        System.out.println(future.get());
    }
}

上述代码还可以结合Lambda表达式简化为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public class SumCompeletableFuture {

    public static Integer sum() {
        // ...
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(SumCompeletableFuture::sum);
        System.out.println(future.get());
    }
}

其中CompletableFuture.supplyAsync会使用ForkJoinPool默认的线程池执行。CompletableFuture的各种方法也允许开发者使用自定义的线程池。

允许回调函数

CompletableFuture另一个增强是允许用户使用thenAccept传递回调方法,当任务结束时,自动调用该回调方法。

比如我们可以将上述代码改为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public class SumCompeletableFuture {

    public static Integer sum() {
        // ...
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(SumCompeletableFuture::sum);
        // 自动回调打印任务
        future.thenAccept(System.out::println);
    }
}

组合多个任务

这个应该是CompletableFuture带来的最为强大的功能了。

CompletableFuture允许组合两个串行执行的任务,然后利用二者的结果执行回调函数。该功能的函数为thenComposethenCompose默认执行的线程是上一个任务的线程,可以减少线程切换的损耗。但CompletableFuture仍提供了一个兄弟方法thenCombineAsync,将会使用一个新的线程执行该任务。

thenCompose的第一个参数是要串行执行的CompletableFuture,第二参数是一个以二者返回值为参数的BiFunction,thenCombine的返回值是一个新的CompletableFuture。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public class SumCompose {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(Sum::sum);
        CompletableFuture<Integer> future = task.thenCombine(
                CompletableFuture.supplyAsync(Sum::sum),
                Integer::sum
        );
        System.out.println(future.get());
    }
}

CompletableFuture还可以将多个CompletableFuture任务组合到一起并发执行。比如我们用一个Task执行1-50的求和,另一个任务执行51-100的。如果使用FutureTask我们需要手动管理二者的关系,使用CompletableFuture我们可以直接调用thenCombine。thenCombine的第一个参数是要并发执行的CompletableFuture,第二参数是一个以二者返回值为参数的BiFunction,thenCombine的返回值是一个新的CompletableFuture。

1
2
3
4
5
6
7
8
public class SumCombine {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(Sum::sum);
        CompletableFuture<Integer> future = task.thenCombine(CompletableFuture.supplyAsync(Sum::sum), Integer::sum);
        System.out.println(future.get());
    }
}

CompletableFuture还提供了很多方法,具体这里不再展开,其他函数可参考Java doc

总结

ok,以上就是本文的全部内容,本文首先从线程执行相关的接口讲起,介绍了无返回值的Runnable接口和有返回值的Callable接口,然后介绍了用于获取返回值的Future接口并重点分析了Future的实现类FutureTask的源码。最后介绍了Java8中新提供的CompelableFutureTask的使用。

参考

  1. 使用CompletableFuture - 廖雪峰的官方网站
  2. Java的Future机制详解 - 知乎
  3. Callable、Future、RunnableFuture、FutureTask的原理及应用 - nullzx - 博客园
  4. Java并发编程:Callable、Future和FutureTask - Matrix海子 - 博客园
  5. JCIP 6.3.2
  6. Java sun.misc.Unsafe类的学习笔记 - 大新博客 - 博客园
  7. jdk8u_jdk/RunnableFuture.java at master · JetBrains/jdk8u_jdk · GitHub
  8. jdk8u-jdk/FutureTask.java at master · frohoff/jdk8u-jdk · GitHub