如何使用JCTools實(shí)現(xiàn)Java并發(fā)程序
在本文中,我們將介紹JCTools(Java并發(fā)工具)庫。
簡(jiǎn)單地說,這提供了許多適用于多線程環(huán)境的實(shí)用數(shù)據(jù)結(jié)構(gòu)。
非阻塞算法傳統(tǒng)上,在可變共享狀態(tài)下工作的多線程代碼使用鎖來確保數(shù)據(jù)一致性和發(fā)布(一個(gè)線程所做的更改對(duì)另一個(gè)線程可見)。
這種方法有許多缺點(diǎn):
線程在試圖獲取鎖時(shí)可能會(huì)被阻塞,在另一個(gè)線程的操作完成之前不會(huì)取得任何進(jìn)展—這有效地防止了并行性 鎖爭(zhēng)用越重,JVM處理調(diào)度線程、管理爭(zhēng)用和等待線程隊(duì)列的時(shí)間就越多,實(shí)際工作就越少 如果涉及多個(gè)鎖,并且它們以錯(cuò)誤的順序獲取/釋放,則可能出現(xiàn)死鎖 優(yōu)先級(jí)反轉(zhuǎn)的危險(xiǎn)是可能的——高優(yōu)先級(jí)線程被鎖定,試圖獲得由低優(yōu)先級(jí)線程持有的鎖 大多數(shù)情況下,使用粗粒度鎖會(huì)嚴(yán)重?fù)p害并行性—細(xì)粒度鎖需要更仔細(xì)的設(shè)計(jì),增加鎖開銷,并且更容易出錯(cuò)另一種方法是使用非阻塞算法,即任何線程的故障或掛起都不會(huì)導(dǎo)致另一個(gè)線程的故障或掛起的算法。
如果所涉及的線程中至少有一個(gè)能夠在任意時(shí)間段內(nèi)取得進(jìn)展,即在處理過程中不會(huì)出現(xiàn)死鎖,則非阻塞算法是無鎖的。
此外,如果保證每個(gè)線程的進(jìn)程,這些算法是無等待的。
下面是一個(gè)非阻塞堆棧示例,它定義了基本狀態(tài):
public class ConcurrentStack<E> { AtomicReference<Node<E>> top = new AtomicReference<Node<E>>(); private static class Node <E> { public E item; public Node<E> next; // standard constructor }}
還有一些API方法:
public void push(E item){ Node<E> newHead = new Node<E>(item); Node<E> oldHead; do { oldHead = top.get(); newHead.next = oldHead; } while(!top.compareAndSet(oldHead, newHead));}public E pop() { Node<E> oldHead; Node<E> newHead; do { oldHead = top.get(); if (oldHead == null) { return null; } newHead = oldHead.next; } while (!top.compareAndSet(oldHead, newHead)); return oldHead.item;}
我們可以看到,該算法使用細(xì)粒度比較和交換(CAS)指令,并且是無鎖的(即使多個(gè)線程調(diào)用top.compareAndSet()同時(shí),它們中的一個(gè)保證會(huì)成功)但不能無等待,因?yàn)椴荒鼙WCCAS最終會(huì)對(duì)任何特定線程成功。
依賴首先,讓我們將JCTools依賴項(xiàng)添加到pom.xml文件:
<dependency> <groupId>org.jctools</groupId> <artifactId>jctools-core</artifactId> <version>2.1.2</version></dependency>
請(qǐng)注意,Maven Central上提供了最新的可用版本。
JCTools隊(duì)列該庫提供了許多隊(duì)列以在多線程環(huán)境中使用,即一個(gè)或多個(gè)線程以線程安全的無鎖方式寫入隊(duì)列,一個(gè)或多個(gè)線程以線程安全的無鎖方式從隊(duì)列中讀取。
所有隊(duì)列實(shí)現(xiàn)的通用接口是org.jctools.queues.MessagePassingQueue。
隊(duì)列類型所有隊(duì)列都可以根據(jù)其生產(chǎn)者/消費(fèi)者策略進(jìn)行分類:
單個(gè)生產(chǎn)者,單個(gè)消費(fèi)者?此類類使用前綴Spsc命名,例如SpscArrayQueue 單個(gè)生產(chǎn)者,多個(gè)消費(fèi)者?使用Spmc前綴,例如SpmcArrayQueue 多個(gè)生產(chǎn)者,單個(gè)消費(fèi)者-使用Mpsc前綴,例如MpscArrayQueue 多個(gè)生產(chǎn)者、多個(gè)消費(fèi)者—使用Mpmc前綴,例如MpmcArrayQueue需要注意的是,在內(nèi)部沒有策略檢查,也就是說,如果使用不正確,隊(duì)列可能會(huì)無聲地發(fā)生故障。
例如,下面的測(cè)試從兩個(gè)線程填充單個(gè)生產(chǎn)者隊(duì)列并通過,即使不能保證使用者看到來自不同生產(chǎn)者的數(shù)據(jù):
SpscArrayQueue<Integer> queue = new SpscArrayQueue<>(2);Thread producer1 = new Thread(() -> queue.offer(1));producer1.start();producer1.join();Thread producer2 = new Thread(() -> queue.offer(2));producer2.start();producer2.join();Set<Integer> fromQueue = new HashSet<>();Thread consumer = new Thread(() -> queue.drain(fromQueue::add));consumer.start();consumer.join();assertThat(fromQueue).containsOnly(1, 2);隊(duì)列實(shí)現(xiàn)
總結(jié)以上分類,以下是JCTools隊(duì)列列表:
SpscArrayQueue?單個(gè)生產(chǎn)者,單個(gè)消費(fèi)者,在內(nèi)部使用一個(gè)數(shù)組,限制容量 SpscLinkedQueue?單個(gè)生產(chǎn)者,單個(gè)消費(fèi)者,內(nèi)部使用鏈表,未綁定容量 SpscChunkedArrayQueue?單生產(chǎn)商、單消費(fèi)者,從初始容量開始,一直增長到最大容量 SpscGrowableArrayQueue?單生產(chǎn)者、單消費(fèi)者,從初始容量開始,一直增長到最大容量。這與SpscChunkedArrayQueue是相同的契約,唯一的區(qū)別是內(nèi)部塊管理。建議使用SpscChunkedArrayQueue,因?yàn)樗幸粋€(gè)簡(jiǎn)化的實(shí)現(xiàn) SpscUnboundedArrayQueue?單個(gè)生產(chǎn)者,單個(gè)消費(fèi)者,在內(nèi)部使用數(shù)組,未綁定容量 SpmcArrayQueue?單個(gè)生產(chǎn)者、多個(gè)使用者,在內(nèi)部使用一個(gè)陣列,限制容量 MpscArrayQueue—多個(gè)生產(chǎn)者、單個(gè)消費(fèi)者在內(nèi)部使用一個(gè)陣列,限制容量 MpscLinkedQueue?多個(gè)生產(chǎn)者,單個(gè)消費(fèi)者,在內(nèi)部使用鏈表,未綁定容量 MpmcArrayQueue—多個(gè)生產(chǎn)者、多個(gè)消費(fèi)者在內(nèi)部使用一個(gè)陣列,限制容量 原子隊(duì)列前面提到的所有隊(duì)列都使用sun.misc.Unsafe. 然而,隨著java9和JEP-260的出現(xiàn),這個(gè)API在默認(rèn)情況下變得不可訪問。
因此,有其他隊(duì)列使用java.util.concurrent.atomic.AtomicLongFieldUpdater(公共API,性能較差)而不是sun.misc.Unsafe.
它們是從上面的隊(duì)列生成的,它們的名稱中間插入了單詞Atomic,例如SpscChunkedAtomicArrayQueue或MpmcAtomicArrayQueue。
如果可能,建議使用“常規(guī)”隊(duì)列,并且僅在sun.misc.Unsafe像Hot Java9+和JRockit一樣被禁止/無效。
容量所有JCTools隊(duì)列也可能具有最大容量或未綁定。當(dāng)隊(duì)列已滿且受容量限制時(shí),它將停止接受新元素。
在以下示例中,我們:
填滿隊(duì)列 確保在此之后停止接受新元素 從中排出,并確保之后可以添加更多元素請(qǐng)注意,為了可讀性,刪除了幾個(gè)代碼語句。
SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16);CountDownLatch startConsuming = new CountDownLatch(1);CountDownLatch awakeProducer = new CountDownLatch(1);Thread producer = new Thread(() -> { IntStream.range(0, queue.capacity()).forEach(i -> { assertThat(queue.offer(i)).isTrue(); }); assertThat(queue.offer(queue.capacity())).isFalse(); startConsuming.countDown(); awakeProducer.await(); assertThat(queue.offer(queue.capacity())).isTrue();});producer.start();startConsuming.await();Set<Integer> fromQueue = new HashSet<>();queue.drain(fromQueue::add);awakeProducer.countDown();producer.join();queue.drain(fromQueue::add);assertThat(fromQueue).containsAll( IntStream.range(0, 17).boxed().collect(toSet()));其他數(shù)據(jù)結(jié)構(gòu)工具
JCTools還提供了一些非隊(duì)列數(shù)據(jù)結(jié)構(gòu)。
它們都列在下面:
NonBlockingHashMap?一個(gè)無鎖的ConcurrentHashMap替代方案,具有更好的伸縮性和通常更低的突變成本。它是實(shí)現(xiàn)sun.misc.Unsafe,因此,不建議在Java9+或JRockit環(huán)境中使用此類 NonBlockingHashMapLong?與NonBlockingHashMap類似,但使用基本長鍵 NonBlockingHashSet?一個(gè)簡(jiǎn)單的包裝器,圍繞著像JDK的java.util.Collections.newSetFromMap()一樣的NonBlockingHashMap NonBlockingIdentityHashMap?與NonBlockingHashMap類似,但按標(biāo)識(shí)比較鍵。 NonBlockingSetInt?一個(gè)多線程位向量集,實(shí)現(xiàn)為一個(gè)原始long數(shù)組。在無聲自動(dòng)裝箱的情況下工作無效 性能測(cè)試讓我們使用JMH來比較JDK的ArrayBlockingQueue和JCTools隊(duì)列的性能。JMH是Sun/Oracle JVM gurus提供的一個(gè)開源微基準(zhǔn)框架,它保護(hù)我們不受編譯器/JVM優(yōu)化算法的不確定性的影響。
請(qǐng)注意,為了提高可讀性,下面的代碼段遺漏了幾個(gè)語句。
public class MpmcBenchmark { @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK}) public volatile String implementation; public volatile Queue<Long> queue; @Benchmark @Group(GROUP_NAME) @GroupThreads(PRODUCER_THREADS_NUMBER) public void write(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && !queue.offer(1L)) { // intentionally left blank } } @Benchmark @Group(GROUP_NAME) @GroupThreads(CONSUMER_THREADS_NUMBER) public void read(Control control) { // noinspection StatementWithEmptyBody while (!control.stopMeasurement && queue.poll() == null) { // intentionally left blank } }}
結(jié)果:
MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/opMpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/opMpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op
我們可以看到,MpmcArrayQueue的性能略好于MpmcAtomicArrayQueue,而ArrayBlockingQueue的速度慢了兩倍。
使用JCTools的缺點(diǎn)使用JCTools有一個(gè)重要的缺點(diǎn)——不可能強(qiáng)制正確使用庫類。例如,考慮在我們的大型成熟項(xiàng)目中開始使用MpscArrayQueue的情況(注意,必須有一個(gè)使用者)。
不幸的是,由于項(xiàng)目很大,有可能有人出現(xiàn)編程或配置錯(cuò)誤,現(xiàn)在從多個(gè)線程讀取隊(duì)列。這個(gè)系統(tǒng)看起來像以前一樣工作,但現(xiàn)在有可能消費(fèi)者錯(cuò)過了一些信息。這是一個(gè)真正的問題,可能會(huì)有很大的影響,是很難調(diào)試。
理想情況下,應(yīng)該可以運(yùn)行具有特定系統(tǒng)屬性的系統(tǒng),該屬性強(qiáng)制JCTools確保線程訪問策略。例如,本地/測(cè)試/暫存環(huán)境(而不是生產(chǎn)環(huán)境)可能已啟用它。遺憾的是,JCTools沒有提供這樣的屬性。
另一個(gè)需要考慮的問題是,盡管我們確保JCTools比JDK的對(duì)應(yīng)工具快得多,但這并不意味著我們的應(yīng)用程序獲得了與我們開始使用自定義隊(duì)列實(shí)現(xiàn)時(shí)相同的速度。大多數(shù)應(yīng)用程序不會(huì)在線程之間交換很多對(duì)象,而且大多是I/O綁定的。
結(jié)論現(xiàn)在,我們對(duì)JCTools提供的實(shí)用程序類有了基本的了解,并了解了它們?cè)谥剌d下與JDK的對(duì)應(yīng)類相比的性能。
總之,只有當(dāng)我們?cè)诰€程之間交換大量對(duì)象時(shí),才有必要使用該庫,即使這樣,也有必要非常小心地保留線程訪問策略。
以上示例的完整源代碼地址:https://github.com/eugenp/tutorials/tree/master/libraries-5
JCTools git地址:https://github.com/JCTools/JCTools
以上就是如何使用JCTools實(shí)現(xiàn)Java并發(fā)程序的詳細(xì)內(nèi)容,更多關(guān)于使用JCTools實(shí)現(xiàn)Java并發(fā)程序的資料請(qǐng)關(guān)注好吧啦網(wǎng)其它相關(guān)文章!
相關(guān)文章:
1. JAMon(Java Application Monitor)備忘記2. SpringBoot+TestNG單元測(cè)試的實(shí)現(xiàn)3. Java GZip 基于內(nèi)存實(shí)現(xiàn)壓縮和解壓的方法4. IntelliJ IDEA設(shè)置默認(rèn)瀏覽器的方法5. Docker容器如何更新打包并上傳到阿里云6. VMware中如何安裝Ubuntu7. Springboot 全局日期格式化處理的實(shí)現(xiàn)8. python 浮點(diǎn)數(shù)四舍五入需要注意的地方9. idea配置jdk的操作方法10. 完美解決vue 中多個(gè)echarts圖表自適應(yīng)的問題
