并发编程-FutureTask解析

这篇具有很好参考价值的文章主要介绍了并发编程-FutureTask解析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1、FutureTask对象介绍

Future对象大家都不陌生,是JDK1.5提供的接口,是用来以阻塞的方式获取线程异步执行完的结果。

在Java中想要通过线程执行一个任务,离不开Runnable与Callable这两个接口。

Runnable与Callable的区别在于,Runnable接口只有一个run方法,该方法用来执行逻辑,但是并没有返回值;而Callable的call方法,同样用来执行业务逻辑,但是是有一个返回值的。

Callable执行任务过程中可以通过FutureTask获得任务的执行状态,并且可以在执行完成后通过Future.get()方式获取执行结果。

Future是一个接口,而FutureTask就是Future的实现类。并且FutureTask实现了 RunnableFuture(Runnable + Future),说明我们可以创建一个FutureTask并直接把它放到线程池执行,然后获取FutureTask的执行结果。

2、FutureTask源码解析

2.1 主要方法和属性

那么FutureTask是如何通过阻塞的方式来获取到异步线程执行的结果的呢?我们看下FutureTask中的属性。

// FutureTask的状态及其常量
private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
    
    // callable对象,执行完后置空
    private Callable<V> callable;
    // 要返回的结果或要引发的异常来自 get() 方法
    private Object outcome; // non-volatile, protected by state reads/writes
    // 执行Callable的线程
    private volatile Thread runner;
    // 等待线程的一个链表结构
    private volatile WaitNode waiters;


FutureTask中几个比较重要的方法。

// 取消任务的执行
boolean cancel(boolean mayInterruptIfRunning);
// 返回任务是否已经被取消
boolean isCancelled();
// 返回任务是否已经完成,任务状态不为NEW即为完成
boolean isDone();
// 通过get方法获取任务的执行结果
V get() throws InterruptedException, ExecutionException;
// 通过get方法获取任务的执行结果,带有超时,如果超过给定时间则抛出异常
V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;


2.2 FutureTask执行

当我们在线程池中执行一个Callable方法时,其实是将Callable任务封装成一个RunnableFuture对象去执行,同时将这个RunnableFuture对象返回,这样我们就拿到了FutureTask的引用,可以随时获取到任务执行的状态,并且可以在任务执行完成后通过该对象获取执行结果。

以下为ThreadPoolExecutor线程池提交一个callable方法的源码。

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
	
	protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }


2.3 run方法介绍

RunnableFuture其实也是一个可以执行的runnable,我们看下他的run方法。其主要流程就是执行call方法,正常执行完毕后将result结果赋值到outcome属性上。

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            // 将callable赋值到本地变量
            Callable<V> c = callable;
            // 判断callable不为空并且FutureTask的状态必须为新创建
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 执行call方法(用户自己实现的call逻辑),并获取到result结果
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    // 如果执行过程出现异常,则将异常对象赋值到outcome上
                    setException(ex);
                }
                // 如果正常执行完毕,则将result赋值到outcome属性上
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }


以下逻辑为正常执行完成后赋值的逻辑。

// 如果任务没有被取消,将future执行完的返回值赋值给result结果
// FutureTask任务的执行状态是通过CAS的方式进行赋值的,并且由此可知,COMPLETING其实是一个瞬时状态
// 当将线程执行结果赋值给outcome后,状态会修改为对应的NORMAL,即正常结束
protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }


以下为执行异常时赋值逻辑,直接将Throwable对象赋值到outcome属性上。

protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }


无论是正常执行还是异常执行,最终都会调用一个finishCompletion方法,用来做工作的收尾工作。

2.4 get方法介绍

Future的get方法有两个重载的方法,一个是get()获取结果,一个是get(long, TimeUnit)带有超时时间的获取结果,我们看下FutureTask中的这两个方法是如何实现的。

// 不带有超时时间,一直阻塞直到获取结果
public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            // 等待结果完成,带有超时的get方法也是调用的awaitDone方法
            s = awaitDone(false, 0L);
        // 返回结果
        return report(s);
    }

// 带有超时时间的获取结果,如果超过时间还没有获取到结果则抛出异常
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        // 如果任务未中断,调用awaitDone方法等待任务结果
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        // 返回结果
        return report(s);
    }


我们主要看下awaitDone方法的执行逻辑。此方法会通过for循环的方式一直阻塞等待任务执行完成。如果带有超时时间,则超过截止时间后会直接返回。

// timed:是否需要超时获取
// nanos:超时时间单位纳秒
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        // 此方法会一直for循环判断任务状态是否已经完成,是Future.get阻塞的原因
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            // 任务状态大于COMPLETING,则表明任务结束,直接返回
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                // Thread.yield() 方法,使当前线程由执行状态,变成为就绪状态,让出cpu时间,在下一个线程执行时候,此线程有可能被执行,也有可能没有被执行。
                // COMPLETING状态为瞬时状态,任务执行完成,要么是正常结束,要么异常结束,后续会被置为NORMAL或者EXCEPTIONAL
                Thread.yield();
            else if (q == null)
                // 每调用一次get方法,都会创建一个WaitNode等待节点
                q = new WaitNode();
            else if (!queued)
                // 将该等待节点添加到链表结构waiters中,q.next = waiters 即在waiters的头部插入
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 如果方法带有超时判断,则判断当前时间是否已经超过了截止时间,如果超过了及截止日期,则退出循环直接返回当前状态,此时任务状态一定是NEW
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }


我们在看下report方法,在调用get方法时是如何返回结果的。

这里首先获取outcome的值,并判断任务是否已经执行完成,如果执行完成,则将outcome对象强转成泛型指定的类型;如果任务被取消了,则抛出一个CancellationException异常;如果都不是,则说明任务在执行过程中发生了异常,此时任务状态位EXCEPTIONAL,此时的outcome即为Throwable对象,所以将outcome强转为Throwable并抛出异常。

由此可以知道,我们将一个FutureTask任务submit到线程池中执行的时候,如果发生了异常,是会在调用get方法的时候抛出的。

private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }


2.5 cancel方法介绍

cancel方法用于取消正在运行的任务,如果任务取消成功,则返回TRUE,如果取消失败则返回FALSE。

// mayInterruptIfRunning:允许中断正在运行的任务
public boolean cancel(boolean mayInterruptIfRunning) {
        // mayInterruptIfRunning如果为true则将状态置为INTERRUPTING,如果未false则将状态置为CANCELLED
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        // 如果状态修改成功后,判断是否允许中断线程,如果允许,则调用Thread的interrupt方法中断
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            // 取消后的收尾工作
            finishCompletion();
        }
        return true;
    }


2.6 isDone/isCancelled方法介绍

isDone方法用于判断FutureTask是否已经完成;isCancelled方法用来判断FutureTask是否已经取消,这两个方法都是通过状态位来判断的。

public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }


2.7 finishCompletion方法介绍

我们看下finishCompletion方法都做了哪些工作。

// 删除所有等待线程并发出信号,最后执行done方法
private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }


我们看到done方法是一个受保护的空方法,此处没有任何逻辑,由其子类去根据自己的业务去实现相应的逻辑。例如:java.util.concurrent.ExecutorCompletionService.QueueingFuture。

protected void done() { }

3、总结

通过源码解读可以了解到Future的原理:

第一步:主线程将任务封装成一个Callable对象,通过submit方法提交到线程池去执行。

第二步:线程池执行任务的run方法,主线程则可以继续执行其他逻辑。

第三步:线程池中方法执行完成后将结果赋值到outcome属性上,并修改任务状态。

第四步:主线程在需要拿到异步任务结果的时候,主动调用fugure.get()方法来获取结果。

第五步:如果异步线程在执行过程中发生异常,则会在调用future.get()方法的时候抛出来。

以上就是对于FutureTask的分析,我们可以了解FutureTask任务执行的方式以及Future.get已阻塞的方式获取线程执行的结果原理,并且从代码中可以了解FutureTask的任务执行状态以及状态的变化过程。

作者:京东物流 丁冬

来源:京东云开发者社区 自猿其说Tech文章来源地址https://www.toymoban.com/news/detail-606907.html

到了这里,关于并发编程-FutureTask解析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • java JUC并发编程 第九章 对象内存布局与对象头

    第一章 java JUC并发编程 Future: link 第二章 java JUC并发编程 多线程锁: link 第三章 java JUC并发编程 中断机制: link 第四章 java JUC并发编程 java内存模型JMM: link 第五章 java JUC并发编程 volatile与JMM: link 第六章 java JUC并发编程 CAS: link 第七章 java JUC并发编程 原子操作类增强: link 第八章

    2024年02月07日
    浏览(43)
  • 掌握Go并发:Go语言并发编程深度解析

    🏷️ 个人主页 :鼠鼠我捏,要死了捏的主页  🏷️ 系列专栏 :Golang全栈-专栏 🏷️ 个人学习笔记,若有缺误,欢迎评论区指正   前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站AI学习网站。 当我们开发一个W

    2024年02月20日
    浏览(69)
  • GO语言网络编程(并发编程)并发介绍,Goroutine

    进程和线程 并发和并行 协程和线程 协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。 线程:一个线程上可以跑多个协程,协程是轻量级的线程。 goroutine 只是由官方实现的超级\\\"线程池\\\"。 每个

    2024年02月09日
    浏览(58)
  • 并发编程-CompletableFuture解析

    CompletableFuture对象是JDK1.8版本新引入的类,这个类实现了两个接口,一个是Future接口,一个是CompletionStage接口。 CompletionStage接口是JDK1.8版本提供的接口,用于异步执行中的阶段处理,CompletionStage定义了一组接口用于在一个阶段执行结束之后,要么继续执行下一个阶段,要么对

    2024年02月15日
    浏览(34)
  • 【Java 并发编程】CAS 原理解析

    悲观锁 的原理是每次实现数据库的增删改的时候都进⾏阻塞,防⽌数据发⽣脏读。 乐观锁 的原理是在数据库更新的时候,⽤⼀个 version 字段来记录版本号,然后通过⽐较是不是⾃⼰要修改的版本号再进⾏修改。这其中就引出了⼀种⽐较交换的思路来实现数据的⼀致性,事实

    2024年02月06日
    浏览(38)
  • 深入理解高并发编程 - 深度解析ScheduledThreadPoolExecutor

    ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor 并实现了 ScheduledExecutorService 接口,这使得它可以同时充当线程池和定时任务调度器。 构造方法接收一个 corePoolSize 参数,它表示线程池中的核心线程数。核心线程是一直保持存活的线程,即使没有任务执行,以便支持定时任务的调

    2024年02月12日
    浏览(48)
  • 深入理解高并发编程 - 深度解析Thread 类的源码

    先看源码:Thread 类实现了 Runnable 接口 而 Runnable 被@FunctionalInterface 注解标记为函数式接口,Runnable 接口源代码 再来看看@FunctionalInterface 注解的源码 FunctionalInterface 用于标记类或接口是否是一个函数式接口,并在运行时可用于反射获取信息。 这段代码是 Thread 类的一个静态初

    2024年02月09日
    浏览(48)
  • java并发编程:多线程基础知识介绍

    最初的计算机只能接受一些特定的指令,用户每输入一个指令,计算机就做出一个操作。当用户在思考或者输入时,计算机就在等待。这样效率非常低下,在很多时候,计算机都处在等待状态。 后来有了 批处理操作系统 ,把一系列需要操作的指令写下来,形成一个清单,一次

    2024年02月07日
    浏览(52)
  • Python异步编程高并发执行爬虫采集,用回调函数解析响应

    异步技术是Python编程中对提升性能非常重要的一项技术。在实际应用,经常面临对外发送网络请求,调用外部接口,或者不断更新数据库或文件等操作。 这这些操作,通常90%以上时间是在等待,如通过REST, gRPC向服务器发送请求,通常可能等待几十毫秒至几秒,甚至更长。如

    2024年02月08日
    浏览(54)
  • 深入源码解析 ReentrantLock、AQS:掌握 Java 并发编程关键技术

    🔭 嗨,您好 👋 我是 vnjohn,在互联网企业担任 Java 开发,CSDN 优质创作者 📖 推荐专栏:Spring、MySQL、Nacos、Java,后续其他专栏会持续优化更新迭代 🌲文章所在专栏:JUC 🤔 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识 💬 向我询问任何您想要的

    2024年02月11日
    浏览(51)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包