电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁(yè)技術(shù)文章
文章詳情頁(yè)

Java并發(fā)工具類(lèi)LongAdder原理實(shí)例解析

瀏覽:100日期:2022-09-01 14:35:43

LongAdder實(shí)現(xiàn)原理圖

Java并發(fā)工具類(lèi)LongAdder原理實(shí)例解析

Java并發(fā)工具類(lèi)LongAdder原理實(shí)例解析

高并發(fā)下N多線程同時(shí)去操作一個(gè)變量會(huì)造成大量線程CAS失敗,然后處于自旋狀態(tài),導(dǎo)致嚴(yán)重浪費(fèi)CPU資源,降低了并發(fā)性。既然AtomicLong性能問(wèn)題是由于過(guò)多線程同時(shí)去競(jìng)爭(zhēng)同一個(gè)變量的更新而降低的,那么如果把一個(gè)變量分解為多個(gè)變量,讓同樣多的線程去競(jìng)爭(zhēng)多個(gè)資源。

LongAdder則是內(nèi)部維護(hù)一個(gè)Cells數(shù)組,每個(gè)Cell里面有一個(gè)初始值為0的long型變量,在同等并發(fā)量的情況下,爭(zhēng)奪單個(gè)變量的線程會(huì)減少,這是變相的減少了爭(zhēng)奪共享資源的并發(fā)量,另外多個(gè)線程在爭(zhēng)奪同一個(gè)原子變量時(shí)候,如果失敗并不是自旋CAS重試,而是嘗試獲取其他原子變量的鎖,最后當(dāng)獲取當(dāng)前值時(shí)候是把所有變量的值累加后再加上base的值返回的。

LongAdder維護(hù)了要給延遲初始化的原子性更新數(shù)組和一個(gè)基值變量base數(shù)組的大小保持是2的N次方大小,數(shù)組表的下標(biāo)使用每個(gè)線程的hashcode值的掩碼表示,數(shù)組里面的變量實(shí)體是Cell類(lèi)型。

Cell 類(lèi)型是Atomic的一個(gè)改進(jìn),用來(lái)減少緩存的爭(zhēng)用,對(duì)于大多數(shù)原子操作字節(jié)填充是浪費(fèi)的,因?yàn)樵硬僮鞫际菬o(wú)規(guī)律的分散在內(nèi)存中進(jìn)行的,多個(gè)原子性操作彼此之間是沒(méi)有接觸的,但是原子性數(shù)組元素彼此相鄰存放將能經(jīng)常共享緩存行,也就是偽共享。所以這在性能上是一個(gè)提升。

另外由于Cells占用內(nèi)存是相對(duì)比較大的,所以一開(kāi)始并不創(chuàng)建,而是在需要時(shí)候再創(chuàng)建,也就是惰性加載,當(dāng)一開(kāi)始沒(méi)有空間時(shí)候,所有的更新都是操作base變量。

java.util.concurrency.atomic.LongAdder是Java8新增的一個(gè)類(lèi),提供了原子累計(jì)值的方法。根據(jù)文檔的描述其性能要優(yōu)于AtomicLong

這里測(cè)試時(shí)基于JDK1.8進(jìn)行的,AtomicLong 從Java8開(kāi)始針對(duì)x86平臺(tái)進(jìn)行了優(yōu)化,使用XADD替換了CAS操作,我們知道JUC下面提供的原子類(lèi)都是基于Unsafe類(lèi)實(shí)現(xiàn)的,并由Unsafe來(lái)提供CAS的能力。CAS (compare-and-swap)本質(zhì)上是由現(xiàn)代CPU在硬件級(jí)實(shí)現(xiàn)的原子指令,允許進(jìn)行無(wú)阻塞,多線程的數(shù)據(jù)操作同時(shí)兼顧了安全性以及效率。大部分情況下,CAS都能夠提供不錯(cuò)的性能,但是在高競(jìng)爭(zhēng)的情況下開(kāi)銷(xiāo)可能會(huì)成倍增長(zhǎng),具體的研究可以參考這篇文章, 我們直接看下代碼:

public class AtomicLong {public final long incrementAndGet() { return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L; }}public final class Unsafe {public final long getAndAddLong(Object var1, long var2, long var4) { long var6; do { var6 = this.getLongVolatile(var1, var2); } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4)); return var6; }}

getAndAddLong方法會(huì)以volatile的語(yǔ)義去讀需要自增的域的最新值,然后通過(guò)CAS去嘗試更新,正常情況下會(huì)直接成功后返回,但是在高并發(fā)下可能會(huì)同時(shí)有很多線程同時(shí)嘗試這個(gè)過(guò)程,也就是說(shuō)線程A讀到的最新值可能實(shí)際已經(jīng)過(guò)期了,因此需要在while循環(huán)中不斷的重試,造成很多不必要的開(kāi)銷(xiāo),而xadd的相對(duì)來(lái)說(shuō)會(huì)更高效一點(diǎn),偽碼如下,最重要的是下面這段代碼是原子的,也就是說(shuō)其他線程不能打斷它的執(zhí)行或者看到中間值,這條指令是在硬件級(jí)直接支持的:

function FetchAndAdd(address location, int inc) { int value := *location *location := value + inc return value}

而LongAdder的性能比上面那種還要好很多,于是就研究了一下。首先它有一個(gè)基礎(chǔ)的值base,在發(fā)生競(jìng)爭(zhēng)的情況下,會(huì)有一個(gè)Cell數(shù)組用于將不同線程的操作離散到不同的節(jié)點(diǎn)上去(會(huì)根據(jù)需要擴(kuò)容,最大為CPU核數(shù)),sum()會(huì)將所有Cell數(shù)組中的value和base累加作為返回值。核心的思想就是將AtomicLong一個(gè)value的更新壓力分散到多個(gè)value中去,從而降低更新熱點(diǎn)。

Java并發(fā)工具類(lèi)LongAdder原理實(shí)例解析

public class LongAdder extends Striped64 implements Serializable {//...}

LongAdder繼承自Striped64,Striped64內(nèi)部維護(hù)了一個(gè)懶加載的數(shù)組以及一個(gè)額外的base實(shí)例域,數(shù)組的大小是2的N次方,使用每個(gè)線程Thread內(nèi)部的哈希值訪問(wèn)。

abstract class Striped64 extends Number {/** Number of CPUS, to place bound on table size */ static final int NCPU = Runtime.getRuntime().availableProcessors(); /** * Table of cells. When non-null, size is a power of 2. */ transient volatile Cell[] cells; @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> ak = Cell.class;valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField('value')); } catch (Exception e) {throw new Error(e); } } }}

數(shù)組的元素是Cell類(lèi),可以看到Cell類(lèi)用Contended注解修飾,這里主要是解決false sharing(偽共享的問(wèn)題),不過(guò)個(gè)人認(rèn)為偽共享翻譯的不是很好,或者應(yīng)該是錯(cuò)誤的共享,比如兩個(gè)volatile變量被分配到了同一個(gè)緩存行,但是這兩個(gè)的更新在高并發(fā)下會(huì)競(jìng)爭(zhēng),比如線程A去更新變量a,線程B去更新變量b,但是這兩個(gè)變量被分配到了同一個(gè)緩存行,因此會(huì)造成每個(gè)線程都去爭(zhēng)搶緩存行的所有權(quán),例如A獲取了所有權(quán)然后執(zhí)行更新這時(shí)由于volatile的語(yǔ)義會(huì)造成其刷新到主存,但是由于變量b也被緩存到同一個(gè)緩存行,因此就會(huì)造成cache miss,這樣就會(huì)造成極大的性能損失,因此有一些類(lèi)庫(kù)的作者,例如JUC下面的、Disruptor等都利用了插入dummy 變量的方式,使得緩存行被其獨(dú)占,比如下面這種代碼:

static final class Cell { volatile long p0, p1, p2, p3, p4, p5, p6; volatile long value; volatile long q0, q1, q2, q3, q4, q5, q6; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try {UNSAFE = getUnsafe();Class<?> ak = Cell.class;valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField('value')); } catch (Exception e) {throw new Error(e); } } }

但是這種方式畢竟不通用,例如32、64位操作系統(tǒng)的緩存行大小不一樣,因此JAVA8中就增加了一個(gè)注@sun.misc.Contended解用于解決這個(gè)問(wèn)題,由JVM去插入這些變量,具體可以參考o(jì)penjdk.java.net/jeps/142 ,但是通常來(lái)說(shuō)對(duì)象是不規(guī)則的分配到內(nèi)存中的,但是數(shù)組由于是連續(xù)的內(nèi)存,因此可能會(huì)共享緩存行,因此這里加一個(gè)Contended注解以防cells數(shù)組發(fā)生偽共享的情況。

/** * 底競(jìng)爭(zhēng)下直接更新base,類(lèi)似AtomicLong * 高并發(fā)下,會(huì)將每個(gè)線程的操作hash到不同的 * cells數(shù)組中,從而將AtomicLong中更新 * 一個(gè)value的行為優(yōu)化之后,分散到多個(gè)value中 * 從而降低更新熱點(diǎn),而需要得到當(dāng)前值的時(shí)候,直接 * 將所有cell中的value與base相加即可,但是跟 * AtomicLong(compare and change -> xadd)的CAS不同, * incrementAndGet操作及其變種 * 可以返回更新后的值,而LongAdder返回的是void */public class LongAdder { public void add(long x) { Cell[] as; long b, v; int m; Cell a; /** * 如果是第一次執(zhí)行,則直接case操作base */ if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; /** * as數(shù)組為空(null或者size為0) * 或者當(dāng)前線程取模as數(shù)組大小為空 * 或者cas更新Cell失敗 */ if (as == null || (m = as.length - 1) < 0 ||(a = as[getProbe() & m]) == null ||!(uncontended = a.cas(v = a.value, v + x)))longAccumulate(x, null, uncontended); } } public long sum() { //通過(guò)累加base與cells數(shù)組中的value從而獲得sum Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null) sum += a.value; } } return sum; }}/** * openjdk.java.net/jeps/142 */@sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField('value')); } catch (Exception e) { throw new Error(e); } }}abstract class Striped64 extends Number { final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { /** * 若getProbe為0,說(shuō)明需要初始化 */ ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false;// True if last slot nonempty /** * 失敗重試 */ for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) {/** * 若as數(shù)組已經(jīng)初始化,(n-1) & h 即為取模操作,相對(duì) % 效率要更高 */if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) {//這里casCellsBusy的作用其實(shí)就是一個(gè)spin lock //可能會(huì)有多個(gè)線程執(zhí)行了`Cell r = new Cell(x);`, //因此這里進(jìn)行cas操作,避免線程安全的問(wèn)題,同時(shí)前面在判斷一次 //避免正在初始化的時(shí)其他線程再進(jìn)行額外的cas操作 boolean created = false; try {// Recheck under lockCell[] rs; int m, j;//重新檢查一下是否已經(jīng)創(chuàng)建成功了if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true;} } finally {cellsBusy = 0; } if (created)break; continue; // Slot 現(xiàn)在是非空了,continue到下次循環(huán)重試 } } collide = false;}else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehashelse if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break;//若cas更新成功則跳出循環(huán),否則繼續(xù)重試else if (n >= NCPU || cells != as) // 最大只能擴(kuò)容到CPU數(shù)目, 或者是已經(jīng)擴(kuò)容成功,這里只有的本地引用as已經(jīng)過(guò)期了 collide = false; // At max size or staleelse if (!collide) collide = true;else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // 擴(kuò)容 Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i)rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table}//重新計(jì)算hash(異或)從而嘗試找到下一個(gè)空的sloth = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) {boolean init = false;try { // Initialize table if (cells == as) { /** * 默認(rèn)size為2 */ Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; }} finally { cellsBusy = 0;}if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : // 若已經(jīng)有另一個(gè)線程在初始化,那么嘗試直接更新base fn.applyAsLong(v, x))))break; // Fall back on using base } } final boolean casCellsBusy() { return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1); } static final int getProbe() { /** * 通過(guò)Unsafe獲取Thread中threadLocalRandomProbe的值 */ return UNSAFE.getInt(Thread.currentThread(), PROBE); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long BASE; private static final long CELLSBUSY; private static final long PROBE; static { try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> sk = Striped64.class;BASE = UNSAFE.objectFieldOffset (sk.getDeclaredField('base'));CELLSBUSY = UNSAFE.objectFieldOffset (sk.getDeclaredField('cellsBusy'));Class<?> tk = Thread.class;//返回Field在內(nèi)存中相對(duì)于對(duì)象內(nèi)存地址的偏移量PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField('threadLocalRandomProbe')); } catch (Exception e) {throw new Error(e); } }}

由于Cell相對(duì)來(lái)說(shuō)比較占內(nèi)存,因此這里采用懶加載的方式,在無(wú)競(jìng)爭(zhēng)的情況下直接更新base域,在第一次發(fā)生競(jìng)爭(zhēng)的時(shí)候(CAS失敗)就會(huì)創(chuàng)建一個(gè)大小為2的cells數(shù)組,每次擴(kuò)容都是加倍,只到達(dá)到CPU核數(shù)。同時(shí)我們知道擴(kuò)容數(shù)組等行為需要只能有一個(gè)線程同時(shí)執(zhí)行,因此需要一個(gè)鎖,這里通過(guò)CAS更新cellsBusy來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的spin lock。

數(shù)組訪問(wèn)索引是通過(guò)Thread里的threadLocalRandomProbe域取模實(shí)現(xiàn)的,這個(gè)域是ThreadLocalRandom更新的,cells的數(shù)組大小被限制為CPU的核數(shù),因?yàn)榧词褂谐^(guò)核數(shù)個(gè)線程去更新,但是每個(gè)線程也只會(huì)和一個(gè)CPU綁定,更新的時(shí)候頂多會(huì)有cpu核數(shù)個(gè)線程,因此我們只需要通過(guò)hash將不同線程的更新行為離散到不同的slot即可。

我們知道線程、線程池會(huì)被關(guān)閉或銷(xiāo)毀,這個(gè)時(shí)候可能這個(gè)線程之前占用的slot就會(huì)變成沒(méi)人用的,但我們也不能清除掉,因?yàn)橐话鉾eb應(yīng)用都是長(zhǎng)時(shí)間運(yùn)行的,線程通常也會(huì)動(dòng)態(tài)創(chuàng)建、銷(xiāo)毀,很可能一段時(shí)間后又會(huì)被其他線程占用,而對(duì)于短時(shí)間運(yùn)行的,例如單元測(cè)試,清除掉有啥意義呢?

總結(jié)

總的來(lái)說(shuō),LongAdder從性能上來(lái)說(shuō)要遠(yuǎn)遠(yuǎn)好于AtomicLong,一般情況下是可以直接替代AtomicLong使用的,Netty也通過(guò)一個(gè)接口封裝了這兩個(gè)類(lèi),在Java8下直接采用LongAdder,但是AtomicLong的一系列方法不僅僅可以自增,還可以獲取更新后的值,如果是例如獲取一個(gè)全局唯一的ID還是采用AtomicLong會(huì)方便一點(diǎn)。

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持好吧啦網(wǎng)。

標(biāo)簽: Java
相關(guān)文章:
主站蜘蛛池模板: 青岛美佳乐清洁工程有限公司|青岛油烟管道清洗|酒店|企事业单位|学校工厂厨房|青岛油烟管道清洗 插针变压器-家用电器变压器-工业空调变压器-CD型电抗器-余姚市中驰电器有限公司 | 钛板_钛管_钛棒_钛盘管-无锡市盛钛科技有限公司 | 工业铝型材-铝合金电机壳-铝排-气动执行器-山东永恒能源集团有限公司 | 螺杆真空泵_耐腐蚀螺杆真空泵_水环真空泵_真空机组_烟台真空泵-烟台斯凯威真空 | 房间温控器|LonWorks|海思 | 金环宇|金环宇电线|金环宇电缆|金环宇电线电缆|深圳市金环宇电线电缆有限公司|金环宇电缆集团 | 干粉砂浆设备_干混砂浆生产线_腻子粉加工设备_石膏抹灰砂浆生产成套设备厂家_干粉混合设备_砂子烘干机--郑州铭将机械设备有限公司 | 灌木树苗-绿化苗木-常绿乔木-价格/批发/基地 - 四川成都途美园林 | 天津蒸汽/热水锅炉-电锅炉安装维修直销厂家-天津鑫淼暖通设备有限公司 | 高温链条油|高温润滑脂|轴承润滑脂|机器人保养用油|干膜润滑剂-东莞卓越化学 | 液压扳手-高品质液压扳手供应商 - 液压扳手, 液压扳手供应商, 德国进口液压拉马 | 铝镁锰板_铝镁锰合金板_铝镁锰板厂家_铝镁锰金属屋面板_安徽建科 | 电主轴,车床电磨头,变频制动电机-博山鸿达特种电机 | 仿清水混凝土_清水混凝土装修_施工_修饰_保护剂_修补_清水混凝土修复-德州忠岭建筑装饰工程 | 超声波清洗机_大型超声波清洗机_工业超声波清洗设备-洁盟清洗设备 | 编织人生 - 权威手工编织网站,编织爱好者学习毛衣编织的门户网站,织毛衣就上编织人生网-编织人生 | 房屋质量检测-厂房抗震鉴定-玻璃幕墙检测-房屋安全鉴定机构 | 热镀锌槽钢|角钢|工字钢|圆钢|H型钢|扁钢|花纹板-天津千百顺钢铁贸易有限公司 | 定制/定做衬衫厂家/公司-衬衫订做/订制价格/费用-北京圣达信 | 垃圾压缩设备_垃圾处理设备_智能移动式垃圾压缩设备--山东明莱环保设备有限公司 | Eiafans.com_环评爱好者 环评网|环评论坛|环评报告公示网|竣工环保验收公示网|环保验收报告公示网|环保自主验收公示|环评公示网|环保公示网|注册环评工程师|环境影响评价|环评师|规划环评|环评报告|环评考试网|环评论坛 - Powered by Discuz! | 电磁流量计厂家_涡街流量计厂家_热式气体流量计-青天伟业仪器仪表有限公司 | 佛山市德信昌电子有限公司| 仿真植物|仿真树|仿真花|假树|植物墙 - 广州天昆仿真植物有限公司 | 云南成人高考_云南成考网| 重庆监控_电子围栏设备安装公司_门禁停车场管理系统-劲浪科技公司 | PVC地板|PVC塑胶地板|PVC地板厂家|地板胶|防静电地板-无锡腾方装饰材料有限公司-咨询热线:4008-798-128 | 电动球阀_不锈钢电动球阀_电动三通球阀_电动调节球阀_上海湖泉阀门有限公司 | 小港信息港-鹤壁信息港 鹤壁老百姓便民生活信息网站 | 桁架机器人_桁架机械手_上下料机械手_数控车床机械手-苏州清智科技装备制造有限公司 | Dataforth隔离信号调理模块-信号放大模块-加速度振动传感器-北京康泰电子有限公司 | 广州工业氧气-工业氩气-工业氮气-二氧化碳-广州市番禺区得力气体经营部 | 温州中研白癜风专科_温州治疗白癜风_温州治疗白癜风医院哪家好_温州哪里治疗白癜风 | 卫生纸复卷机|抽纸机|卫生纸加工设备|做卫生纸机器|小型卫生纸加工需要什么设备|卫生纸机器设备多少钱一台|许昌恒源纸品机械有限公司 | 考勤系统_人事考勤管理系统_本地部署BS考勤系统_考勤软件_天时考勤管理专家 | 烘干设备-热泵烘干机_广东雄贵能源设备有限公司 | 带式压滤机_污泥压滤机_污泥脱水机_带式过滤机_带式压滤机厂家-河南恒磊环保设备有限公司 | 苗木价格-苗木批发-沭阳苗木基地-沭阳花木-长之鸿园林苗木场 | 土壤检测仪器_行星式球磨仪_土壤团粒分析仪厂家_山东莱恩德智能科技有限公司 | 广域铭岛Geega(际嘉)工业互联网平台-以数字科技引领行业跃迁 | 密封无忧网 _ 专业的密封产品行业信息网 |