伍佰目录 短网址
  当前位置:海洋目录网 » 站长资讯 » 站长资讯 » 文章详细 订阅RssFeed

自定义 ForkJoinPool 提升并行流 ParallelStream 执行速度

来源:本站原创 浏览:108次 时间:2022-10-11
简介

在 java8 中 添加了流Stream,可以让你以一种声明的方式处理数据。使用起来非常简单优雅。ParallelStream 则是一个并行执行的流,采用 ForkJoinPool 并行执行任务,提高执行速度。

下面我们看看2个简单的示例:

示例1 (list)


Arrays.asList(1,2,3,4,5,6)    .parallelStream()    .forEach((value) -> {        String name = Thread.currentThread().getName();        System.out.println("示例1 Thread:" + name + " value:" + value);    });
示例2 (array)


Stream.of(1,2,3,4,5,6)    .parallel()    .forEach((value) -> {        String name = Thread.currentThread().getName();        System.out.println("示例2 Thread:" + name + " value:" + value);    });
问题引出

笔者最佳在做一些爬虫相关的业务,其核心工具已开源 mica-http:https://gitee.com/596392912/mica/tree/master/mica-http ,经过2个版本的迭代已经发展成了一个强大非账号爬虫利器,赶紧来试试吧。

我们采集了大量的代理 ip 用来供爬虫使用,其中有个定时任务每 5 分钟去检测代理是否失效,代理 ip 检测比较费时,我们给每个检测的请求 设定了 2s 的超时,这样单线程的话 1000 个 ip 就得消耗半个多小时,当然笔者在校验的时候采用的 parallel Stream 简化开发。

然后发现效果并不明显,代理 ip 数量上来之后 5 分钟完全检测不完,导致任务堆积。明明用了并发流为什么没有明显的提高执行速度呢?

下面我们来看看刚刚的“示例”打印出的信息:


示例1 Thread:main value:4示例1 Thread:ForkJoinPool.commonPool-worker-2 value:1示例1 Thread:main value:6示例1 Thread:ForkJoinPool.commonPool-worker-2 value:5示例1 Thread:main value:3示例1 Thread:ForkJoinPool.commonPool-worker-1 value:2示例2 Thread:main value:4示例2 Thread:ForkJoinPool.commonPool-worker-3 value:3示例2 Thread:ForkJoinPool.commonPool-worker-2 value:5示例2 Thread:ForkJoinPool.commonPool-worker-4 value:1示例2 Thread:ForkJoinPool.commonPool-worker-5 value:2示例2 Thread:ForkJoinPool.commonPool-worker-1 value:6

我们可以看到 Parallel Stream,默认采用的是一个 ForkJoinPool.commonPool 的线程池,这样我们就算使用了 Parallel Stream, 整个个 jvm 也都是采用的 commonPool 这样一个线程池,在校验代理 ip 的时候我们还有采集代理等其他的任务中也大量使用了并发流, 这样也就印证了为什么会任务堆积了。

解决问题

使用自定义 ForkJoinPool 执行速度。示例代码如下:


// 示例:自定义线程池ForkJoinPool forkJoinPool = new ForkJoinPool(8);
// 这里是从数据库里查出来的一批代理 ipList<ProxyList> records = new ArrayList<>();
// 找出失效的代理 ipList<String> needDeleteList = forkJoinPool.submit(() -> records.parallelStream()    .map(ProxyList::getIpPort)    .filter(IProxyListTask::isFailed)    .collect(Collectors.toList())).join();
// 删除失效的代理

整个代码依然比较优雅,在使用自定义的 ForkJoin 线程池之后,执行速度有了明显的提升。以前 5 分钟执行不完的任务现在 2 分钟之内就能全部执行完毕。

结论

java8 的并发流在大批量数据处理时可简化多线程的使用,在遇到耗时业务或者重度使用并发流不妨根据业务情况采用自定义线程池来提示处理速度。


  推荐站点

  • At-lib分类目录At-lib分类目录

    At-lib网站分类目录汇集全国所有高质量网站,是中国权威的中文网站分类目录,给站长提供免费网址目录提交收录和推荐最新最全的优秀网站大全是名站导航之家

    www.at-lib.cn
  • 中国链接目录中国链接目录

    中国链接目录简称链接目录,是收录优秀网站和淘宝网店的网站分类目录,为您提供优质的网址导航服务,也是网店进行收录推广,站长免费推广网站、加快百度收录、增加友情链接和网站外链的平台。

    www.cnlink.org
  • 35目录网35目录网

    35目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向35目录推荐、提交优秀网站。

    www.35mulu.com
  • 就要爱网站目录就要爱网站目录

    就要爱网站目录,按主题和类别列出网站。所有提交的网站都经过人工审查,确保质量和无垃圾邮件的结果。

    www.912219.com
  • 伍佰目录伍佰目录

    伍佰网站目录免费收录各类优秀网站,全力打造互动式网站目录,提供网站分类目录检索,关键字搜索功能。欢迎您向伍佰目录推荐、提交优秀网站。

    www.wbwb.net