电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁技術(shù)文章
文章詳情頁

Java并發(fā)中的Fork/Join 框架機制詳解

瀏覽:7日期:2022-08-08 17:21:52
什么是 Fork/Join 框架

Fork/Join 框架是一種在 JDk 7 引入的線程池,用于并行執(zhí)行把一個大任務(wù)拆成多個小任務(wù)并行執(zhí)行,最終匯總每個小任務(wù)結(jié)果得到大任務(wù)結(jié)果的特殊任務(wù)。通過其命名也很容易看出框架主要分為 Fork 和 Join 兩個階段,第一階段 Fork 是把一個大任務(wù)拆分為多個子任務(wù)并行的執(zhí)行,第二階段 Join 是合并這些子任務(wù)的所有執(zhí)行結(jié)果,最后得到大任務(wù)的結(jié)果。

這里不難發(fā)現(xiàn)其執(zhí)行主要流程:首先判斷一個任務(wù)是否足夠小,如果任務(wù)足夠小,則直接計算,否則,就拆分成幾個更小的小任務(wù)分別計算,這個過程可以反復(fù)的拆分成一系列小任務(wù)。Fork/Join 框架是一種基于 分治 的算法,通過拆分大任務(wù)成多個獨立的小任務(wù),然后并行執(zhí)行這些小任務(wù),最后合并小任務(wù)的結(jié)果得到大任務(wù)的最終結(jié)果,通過并行計算以提高效率。。

Fork/Join 框架使用示例

下面通過一個計算列表中所有元素的總和的示例來看看 Fork/Join 框架是如何使用的,總的思路是:將這個列表分成許多子列表,然后對每個子列表的元素進行求和,然后,我們再計算所有這些值的總和就得到原始列表的和了。Fork/Join 框架中定義了 ForkJoinTask 來表示一個 Fork/Join 任務(wù),其提供了 fork()、join() 等操作,通常情況下,我們并不需要直接繼承這個 ForkJoinTask 類,而是使用框架提供的兩個 ForkJoinTask 的子類:

RecursiveAction 用于表示沒有返回結(jié)果的 Fork/Join 任務(wù)。 RecursiveTask 用于表示有返回結(jié)果的 Fork/Join 任務(wù)。

很顯然,在這個示例中是需要返回結(jié)果的,可以定義 SumAction 類繼承自 RecursiveTask,代碼入下:

/** * @author mghio * @since 2021-07-25 */public class SumTask extends RecursiveTask<Long> { private static final int SEQUENTIAL_THRESHOLD = 50; private final List<Long> data; public SumTask(List<Long> data) { this.data = data; } @Override protected Long compute() { if (data.size() <= SEQUENTIAL_THRESHOLD) { long sum = computeSumDirectly(); System.out.format('Sum of %s: %dn', data.toString(), sum); return sum; } else { int mid = data.size() / 2; SumTask firstSubtask = new SumTask(data.subList(0, mid)); SumTask secondSubtask = new SumTask(data.subList(mid, data.size())); // 執(zhí)行子任務(wù) firstSubtask.fork(); secondSubtask.fork(); // 等待子任務(wù)執(zhí)行完成,并獲取結(jié)果 long firstSubTaskResult = firstSubtask.join(); long secondSubTaskResult = secondSubtask.join(); return firstSubTaskResult + secondSubTaskResult; } } private long computeSumDirectly() { long sum = 0; for (Long l : data) { sum += l; } return sum; } public static void main(String[] args) { Random random = new Random(); List<Long> data = random.longs(1_000, 1, 100).boxed().collect(Collectors.toList()); ForkJoinPool pool = new ForkJoinPool(); SumTask task = new SumTask(data); pool.invoke(task); System.out.println('Sum: ' + pool.invoke(task)); }}

這里當(dāng)列表大小小于 SEQUENTIAL_THRESHOLD 變量的值(閾值)時視為小任務(wù),直接計算求和列表元素結(jié)果,否則再次拆分為小任務(wù),運行結(jié)果如下:

Java并發(fā)中的Fork/Join 框架機制詳解

通過這個示例代碼可以發(fā)現(xiàn),F(xiàn)ork/Join 框架 中 ForkJoinTask 任務(wù)與平常的一般任務(wù)的主要不同點在于:ForkJoinTask 需要實現(xiàn)抽象方法 compute() 來定義計算邏輯,在這個方法里一般通用的實現(xiàn)模板是,首先先判斷當(dāng)前任務(wù)是否是小任務(wù),如果是,就執(zhí)行執(zhí)行任務(wù),如果不是小任務(wù),則再次拆分為兩個子任務(wù),然后當(dāng)每個子任務(wù)調(diào)用 fork() 方法時,會再次進入到 compute() 方法中,檢查當(dāng)前任務(wù)是否需要再拆分為子任務(wù),如果已經(jīng)是小任務(wù),則執(zhí)行當(dāng)前任務(wù)并返回結(jié)果,否則繼續(xù)分割,最后調(diào)用 join() 方法等待所有子任務(wù)執(zhí)行完成并獲得執(zhí)行結(jié)果。偽代碼如下:

if (problem is small) { directly solve problem.} else { Step 1. split problem into independent parts. Step 2. fork new subtasks to solve each part. Step 3. join all subtasks. Step 4. compose result from subresults.}Fork/Join 框架設(shè)計

Fork/Join 框架核心思想是把一個大任務(wù)拆分成若干個小任務(wù),然后匯總每個小任務(wù)的結(jié)果最終得到大任務(wù)的結(jié)果,如果讓你設(shè)計一個這樣的框架,你會如何實現(xiàn)呢?(建議思考一下),F(xiàn)ork/Join 框架的整個流程正如其名所示,分為兩個步驟:

大任務(wù)分割 需要有這么一個的類,用來將大任務(wù)拆分為子任務(wù),可能一次拆分后的子任務(wù)還是比較大,需要多次拆分,直到拆分出來的子任務(wù)符合我們定義的小任務(wù)才結(jié)束。 執(zhí)行任務(wù)并合并任務(wù)結(jié)果 第一步拆分出來的子任務(wù)分別存放在一個個 雙端隊列 里面(P.S. 這里為什么要使用雙端隊列請看下文),然后每個隊列啟動一個線程從隊列中獲取任務(wù)執(zhí)行。這些子任務(wù)的執(zhí)行結(jié)果都會放到一個統(tǒng)一的隊列中,然后再啟動一個線程從這個隊列中拿數(shù)據(jù),最后合并這些數(shù)據(jù)返回。

Fork/Join 框架使用了如下兩個類來完成以上兩個步驟:

ForkJoinTask 類 在上文的實例中也有提到,表示 ForkJoin 任務(wù),在使用框架時首先必須先定義任務(wù),通常只需要繼承自 ForkJoinTask 類的子類 RecursiveAction(無返回結(jié)果) 或者 RecursiveTask(有返回結(jié)果)即可。 ForkJoinPool 從名字也可以猜到一二了,就是用來執(zhí)行 ForkJoinTask 的線程池。大任務(wù)拆分出的子任務(wù)會添加到當(dāng)前線程的雙端隊列的頭部。

喜歡思考的你,心中想必會想到這么一種場景,當(dāng)我們需要完成一個大任務(wù)時,會先把這個大任務(wù)拆分為多個獨立的子任務(wù),這些子任務(wù)會放到獨立的隊列中,并為每個隊列都創(chuàng)建一個單獨的線程去執(zhí)行隊列里的任務(wù),即這里線程和隊列時一對一的關(guān)系,那么當(dāng)有的線程可能會先把自己隊列的任務(wù)執(zhí)行完成了,而有的線程則沒有執(zhí)行完成,這就導(dǎo)致一些先執(zhí)行完任務(wù)的線程干等了,這是個好問題。

既然是做并發(fā)的,肯定要最大程度壓榨計算機的性能,對于這種場景并發(fā)大師 Doug Lea 使用了工作竊取算法處理,使用工作竊取算法后,先完成自己隊列中任務(wù)的線程會去其它線程的隊列中”竊取“一個任務(wù)來執(zhí)行,哈哈,一方有難,八方支援。但是此時這個線程和隊列的持有線程會同時訪問同一個隊列,所以為了減少竊取任務(wù)的線程和被竊取任務(wù)的線程之間的競爭,F(xiàn)orkJoin 選擇了雙端隊列這種數(shù)據(jù)結(jié)構(gòu),這樣就可以按照這種規(guī)則執(zhí)行任務(wù)了:被竊取任務(wù)的線程始終從隊列頭部獲取任務(wù)并執(zhí)行,竊取任務(wù)的線程使用從隊列尾部獲取任務(wù)執(zhí)行。這個算法在絕大部分情況下都可以充分利用多線程進行并行計算,但是在雙端隊列里只有一個任務(wù)等極端情況下還是會存在一定程度的競爭。

Java并發(fā)中的Fork/Join 框架機制詳解

Fork/Join 框架實現(xiàn)原理

Fork/Join 框架的實現(xiàn)核心是 ForkJoinPool 類,該類的重要組成部分為 ForkJoinTask 數(shù)組和 ForkJoinWorkerThread 數(shù)組,其中 ForkJoinTask 數(shù)組用來存放框架使用者給提交給 ForkJoinPool 的任務(wù),F(xiàn)orkJoinWorkerThread 數(shù)組則負責(zé)執(zhí)行這些任務(wù)。任務(wù)有如下四種狀態(tài):

NORMAL 已完成

CANCELLED 被取消

SIGNAL 信號

EXCEPTIONAL 發(fā)生異常

下面來看看這兩個類的核心方法實現(xiàn)原理,首先來看 ForkJoinTask 的 fork() 方法,源碼如下:

Java并發(fā)中的Fork/Join 框架機制詳解

方法對于 ForkJoinWorkerThread 類型的線程,首先會調(diào)用 ForkJoinWorkerThread 的 workQueue 的 push() 方法異步的去執(zhí)行這個任務(wù),然后馬上返回結(jié)果。繼續(xù)跟進 ForkJoinPool 的 push() 方法,源碼如下:

Java并發(fā)中的Fork/Join 框架機制詳解

方法將當(dāng)前任務(wù)添加到 ForkJoinTask 任務(wù)隊列數(shù)組中,然后再調(diào)用 ForkJoinPool 的 signalWork 方法創(chuàng)建或者喚醒一個工作線程來執(zhí)行該任務(wù)。然后再來看看 ForkJoinTask 的 join() 方法,方法源碼如下:

Java并發(fā)中的Fork/Join 框架機制詳解

Java并發(fā)中的Fork/Join 框架機制詳解

方法首先調(diào)用了 doJoin() 方法,該方法返回當(dāng)前任務(wù)的狀態(tài),根據(jù)返回的任務(wù)狀態(tài)做不同的處理:

已完成狀態(tài)則直接返回結(jié)果 被取消狀態(tài)則直接拋出異常(CancellationException) 發(fā)生異常狀態(tài)則直接拋出對應(yīng)的異常

繼續(xù)跟進 doJoin() 方法,方法源碼如下:

Java并發(fā)中的Fork/Join 框架機制詳解

方法首先判斷當(dāng)前任務(wù)狀態(tài)是否已經(jīng)執(zhí)行完成,如果執(zhí)行完成則直接返回任務(wù)狀態(tài)。如果沒有執(zhí)行完成,則從任務(wù)數(shù)組中(workQueue)取出任務(wù)并執(zhí)行,任務(wù)執(zhí)行完成則設(shè)置任務(wù)狀態(tài)為 NORMAL,如果出現(xiàn)異常則記錄異常并設(shè)置任務(wù)狀態(tài)為 EXCEPTIONAL(在 doExec() 方法中)。

總結(jié)

本文主要介紹了 Java 并發(fā)框架中的 Fork/Join 框架的基本原理和其使用的工作竊取算法(work-stealing)、設(shè)計方式和部分實現(xiàn)源碼。Fork/Join 框架在 JDK 的官方標(biāo)準(zhǔn)庫中也有應(yīng)用。比如 JDK 1.8+ 標(biāo)準(zhǔn)庫提供的 Arrays.parallelSort(array) 可以進行并行排序,它的原理就是內(nèi)部通過 Fork/Join 框架對大數(shù)組分拆進行并行排序,可以提高排序的速度,還有集合中的 Collection.parallelStream() 方法底層也是基于 Fork/Join 框架實現(xiàn)的,最后就是定義小任務(wù)的閾值往往是需要通過測試驗證才能合理給出,并且保證程序可以達到最好的性能。

到此這篇關(guān)于Java 并發(fā)中的Fork/Join 框架機制詳解的文章就介紹到這了,更多相關(guān)Java Fork/Join 框架內(nèi)容請搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!

標(biāo)簽: Java
相關(guān)文章:
主站蜘蛛池模板: 太阳能发电系统-太阳能逆变器,控制器-河北沐天太阳能科技首页 | 固诺家居-全屋定制十大品牌_整体衣柜木门橱柜招商加盟 | 回转支承-转盘轴承-回转驱动生产厂家-洛阳隆达轴承有限公司 | 复合土工膜厂家|hdpe防渗土工膜|复合防渗土工布|玻璃纤维|双向塑料土工格栅-安徽路建新材料有限公司 | 哈尔滨京科脑康神经内科医院-哈尔滨治疗头痛医院-哈尔滨治疗癫痫康复医院 | 无锡网站建设_小程序制作_网站设计公司_无锡网络公司_网站制作 | 污水处理设备维修_污水处理工程改造_机械格栅_过滤设备_气浮设备_刮吸泥机_污泥浓缩罐_污水处理设备_污水处理工程-北京龙泉新禹科技有限公司 | 泥浆在线密度计厂家-防爆数字压力表-膜盒-远传压力表厂家-江苏大亚自控设备有限公司 | 正压密封性测试仪-静态发色仪-导丝头柔软性测试仪-济南恒品机电技术有限公司 | 空心明胶胶囊|植物胶囊|清真胶囊|浙江绿键胶囊有限公司欢迎您! | 智慧钢琴-电钢琴-便携钢琴-数码钢琴-深圳市特伦斯乐器有限公司 | 成都离婚律师|成都结婚律师|成都离婚财产分割律师|成都律师-成都离婚律师网 | AR开发公司_AR增强现实_AR工业_AR巡检|上海集英科技 | 304不锈钢无缝管_不锈钢管厂家 - 隆达钢业集团有限公司 | 物联网卡_物联网卡购买平台_移动物联网卡办理_移动联通电信流量卡通信模组采购平台? | 济南电缆桥架|山东桥架-济南航丰实业有限公司 | 电竞馆加盟,沈阳网吧加盟费用选择嘉棋电竞_售后服务一体化 | 广州办公室设计,办公室装修,写字楼设计,办公室装修公司_德科 | 焊管生产线_焊管机组_轧辊模具_焊管设备_焊管设备厂家_石家庄翔昱机械 | 空心明胶胶囊|植物胶囊|清真胶囊|浙江绿键胶囊有限公司欢迎您! | 无线讲解器-导游讲解器-自助讲解器-分区讲解系统 品牌生产厂家[鹰米讲解-合肥市徽马信息科技有限公司] | 玻璃钢格栅盖板|玻璃钢盖板|玻璃钢格栅板|树篦子-长沙川皖玻璃钢制品有限公司 | loft装修,上海嘉定酒店式公寓装修公司—曼城装饰 | 垃圾处理设备_餐厨垃圾处理设备_厨余垃圾处理设备_果蔬垃圾处理设备-深圳市三盛环保科技有限公司 | 谷歌关键词优化-外贸网站优化-Google SEO小语种推广-思亿欧外贸快车 | 河南橡胶接头厂家,河南波纹补偿器厂家,河南可曲挠橡胶软连接,河南套筒补偿器厂家-河南正大阀门 | 长沙发电机-湖南发电机-柴油发电机供应厂家-长沙明邦智能科技 | 小型玉石雕刻机_家用玉雕机_小型万能雕刻机_凡刻雕刻机官网 | 伟秀电气有限公司-10kv高低压开关柜-高低压配电柜-中置柜-充气柜-欧式箱变-高压真空断路器厂家 | 全自动过滤器_反冲洗过滤器_自清洗过滤器_量子除垢环_量子环除垢_量子除垢 - 安士睿(北京)过滤设备有限公司 | 山西3A认证|太原AAA信用认证|投标AAA信用证书-山西AAA企业信用评级网 | 蒜肠网-动漫,二次元,COSPLAY,漫展以及收藏型模型,手办,玩具的新媒体.(原变形金刚变迷TF圈) | 杭州中央空调维修_冷却塔/新风机柜/热水器/锅炉除垢清洗_除垢剂_风机盘管_冷凝器清洗-杭州亿诺能源有限公司 | 进口试验机价格-进口生物材料试验机-西安卡夫曼测控技术有限公司 | 品牌策划-品牌设计-济南之式传媒广告有限公司官网-提供品牌整合丨影视创意丨公关活动丨数字营销丨自媒体运营丨数字营销 | 牛奶检测仪-乳成分分析仪-北京海谊 | 【法利莱住人集装箱厂家】—活动集装箱房,集装箱租赁_大品牌,更放心 | 无缝钢管-聊城无缝钢管-小口径无缝钢管-大口径无缝钢管 - 聊城宽达钢管有限公司 | 加热制冷恒温循环器-加热制冷循环油浴-杭州庚雨仪器有限公司 | 代办建筑资质升级-建筑资质延期就找上海国信启航 | 电磁铁_小型推拉电磁铁_电磁阀厂家-深圳市宗泰电机有限公司 |