问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

【并发编程】聊聊forkJoin的原理和最佳实践

创作时间:
作者:
@小白创作中心

【并发编程】聊聊forkJoin的原理和最佳实践

引用
CSDN
1.
https://blog.csdn.net/jia970426/article/details/146407914

对于线程池来说,其实本质就是一个生产者消费者的模式,而通过竞争的方式从队列中获取任务执行。本质上其实就是按照任务级别进行处理,但是对于一些可以分而治之的任务,传统的线程池没有办法分治处理。一是无法对大任务进行拆分,对于某个任务只能由单线程执行;二是工作线程从队列中获取任务时存在竞争情况

对于分治的思想,在算法领域 归并排序、快速排序,以及大数据领域MapReduce背后的思想体现,具体来说就是把一个大问题可以拆分成多个相同的子问题,子问题可以进一步拆分,进一步可以求解,然后合并的过程。

分支任务的模型其实就是两部分,先进行任务的分解,然后在子任务的结果进行合并。

基本使用

ForkJoin其实就是两部分,Fork:任务分解,Join:结果合并。

框架层面包含两部分一部分就是分治任务的线程池ForkJoinPool,就是用于执行任务的线程池,另外就是将任务封装的起来的ForkJoinTask任务。

public class ForkJoinTest {
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(2);
        Finbonacci finbonacci = new Finbonacci(30);
        Integer result = forkJoinPool.invoke(finbonacci);
        System.out.println(result);
        Assert.notNull(result);
    }
}
class Finbonacci extends RecursiveTask<Integer> {
    final int n;
    public Finbonacci(int n) {
        this.n = n;
    }
    // 递归任务
    @Override
    protected Integer compute() {
        if (n <= 1) return n;
        Finbonacci f1 = new Finbonacci(n - 1);
        // 创建子任务
        f1.fork();
        Finbonacci f2 = new Finbonacci(n - 2);
        // 等待子任务结果,合并结果
        return f2.compute() + f1.join();
    }
}

工作原理

我们知道对于普通的ThreadPoolExecutor, 内部是有一个任务队列,多个线程进行从中获取任务执行。而对于ForkJoinPool来说内部有多个任务队列,当我们通过invoke() 或者submit提交任务的时候,ForkJoinPool会按照一定的路由规则把任务提交到一个任务队列中,对应的任务在执行过程中创建子任务,子任务会提交到工作线程对应的任务队列中。

这样就可以更细粒度的拆分任务队列,获取任务的效率更快。当有的工作线程没有任务时,会尝试回去其他任务的工作任务。这个过程就是工作窃取。

通过类图可以很清晰的看到,本质都是线程池的具体实现,这里不得不感叹 juc老爷子的设计思想,可以很好的在1.5版本的JUC结合。

核心流程图

应用场景

1.对于求解的问题可以分拆,但是需要考验程序员的分拆能力,容易引入逻辑BUG。
2.forkJoinPool适合计算密集型任务。不适合阻塞任务。

3.具体就是大规模数据计算,递归算法,文件 图片处理

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号