java中并發(fā)Queue種類與各自API特點(diǎn)以及使用場景說明
隊(duì)列是一種數(shù)據(jù)結(jié)構(gòu).它有兩個基本操作:在隊(duì)列尾部加入一個元素,和從隊(duì)列頭部移除一個元素(注意不要弄混隊(duì)列的頭部和尾部)
就是說,隊(duì)列以一種先進(jìn)先出的方式管理數(shù)據(jù),如果你試圖向一個 已經(jīng)滿了的阻塞隊(duì)列中添加一個元素或者是從一個空的阻塞隊(duì)列中移除一個元索,將導(dǎo)致線程阻塞.
在多線程進(jìn)行合作時,阻塞隊(duì)列是很有用的工具。工作者線程可以定期地把中間結(jié)果存到阻塞隊(duì)列中而其他工作者線程把中間結(jié)果取出并在將來修改它們。隊(duì)列會自動平衡負(fù)載。
如果第一個線程集運(yùn)行得比第二個慢,則第二個 線程集在等待結(jié)果時就會阻塞。如果第一個線程集運(yùn)行得快,那么它將等待第二個線程集趕上來.
說白了,就是先進(jìn)先出,線程安全!
java中并發(fā)隊(duì)列都是在java.util.concurrent并發(fā)包下的,Queue接口與List、Set同一級別,都是繼承了Collection接口,最近學(xué)習(xí)了java中的并發(fā)Queue的所有子類應(yīng)用場景,這里記錄分享一下:
1.1 這里可以先用wait與notify(腦忒fai) 模擬一下隊(duì)列的增刪數(shù)據(jù),簡單了解一下隊(duì)列:import java.util.LinkedList;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;/** * 模擬隊(duì)列增刪數(shù)據(jù) * @author houzheng */public class MyQueue { //元素集合 private LinkedList<Object> list=new LinkedList<Object>(); //計(jì)數(shù)器(同步),判斷集合元素?cái)?shù)量 private AtomicInteger count=new AtomicInteger(); //集合上限與下限,final必須指定初值 private final int minSize=0; private final int maxSize; //構(gòu)造器指定最大值 public MyQueue(int maxSize) {this.maxSize = maxSize; } //初始化對象,用于加鎖,也可直接用this private Object lock=new Object(); //put方法:往集合中添加元素,如果集合元素已滿,則此線程阻塞,直到有空間再繼續(xù) public void put(Object obj){synchronized (lock) { while(count.get()==this.maxSize){try { lock.wait();} catch (InterruptedException e) { e.printStackTrace();} } list.add(obj); //計(jì)數(shù)器加一 count.incrementAndGet(); System.out.println('放入元素:'+obj); //喚醒另一個線程,(處理極端情況:集合一開始就是空,此時take線程會一直等待) lock.notify();} } //take方法:從元素中取數(shù)據(jù),如果集合為空,則線程阻塞,直到集合不為空再繼續(xù) public Object take(){Object result=null;synchronized(lock){ while(count.get()==this.minSize){try { lock.wait();} catch (InterruptedException e) { e.printStackTrace();} } //移除第一個 result=list.removeFirst(); //計(jì)數(shù)器減一 count.decrementAndGet(); System.out.println('拿走元素:'+result); //喚醒另一個線程,(處理極端情況:集合一開始就是滿的,此時put線程會一直等待) lock.notify();}return result; } public int getSize(){return this.count.get(); } public static void main(String[] args) {//創(chuàng)建集合容器MyQueue queue=new MyQueue(5);queue.put('1');queue.put('2');queue.put('3');queue.put('4');queue.put('5');System.out.println('當(dāng)前容器長度為:'+queue.getSize());Thread t1=new Thread(()->{ queue.put('6'); queue.put('7');},'t1');Thread t2=new Thread(()->{ Object take1 = queue.take(); Object take2 = queue.take();},'t2');//測試極端情況,兩秒鐘后再執(zhí)行另一個線程t1.start();try { TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) { e.printStackTrace();}t2.start(); }}
這里用線程通信的方式簡單模擬了隊(duì)列的進(jìn)出,那么接下來就正式進(jìn)入java的并發(fā)隊(duì)列:
二 并發(fā)QueueJDK中并發(fā)隊(duì)列提供了兩種實(shí)現(xiàn),一種是高性能隊(duì)列ConcurrentLinkedQueue,一種是阻塞隊(duì)列BlockingQueue,兩種都繼承自Queue:
1ConcurrentLinkedQueue這是一個使用于高并發(fā)場景的隊(duì)列(額,各位看這塊博客的小朋友,最好對線程基礎(chǔ)比較熟悉再來看,當(dāng)然我也在拼命學(xué)習(xí)啦,哈哈哈),主要是無鎖的方式,他的想能要比BlockingQueue好
是基于鏈接節(jié)點(diǎn)的無界線程安全隊(duì)列,先進(jìn)先出,不允許有null元素,廢話不多說,上demo:
這種queue比較簡單,沒什么好說的,和ArrayList一樣用就可以,關(guān)鍵是BlockingQUeue
2BlockingQueueblockingQueue主要有5中實(shí)現(xiàn),我感覺都挺有意思的,其中幾種還比較常用就都學(xué)習(xí)了下,這里都介紹下:
2.1ArrayBlockingQueue
@Testpublic void test02() throws Exception{ //必須指定隊(duì)列長度 ArrayBlockingQueue<String> abq=new ArrayBlockingQueue<String>(2); abq.add('a'); //add :添加元素,如果BlockingQueue可以容納,則返回true,否則拋異常,支持添加集合 System.out.println(abq.offer('b'));//容量如果不夠,返回false //offer: 如果可能的話,添加元素,即如果BlockingQueue可以容納,則返回true,否則返回false,支持設(shè)置超時時間 //設(shè)置超時,如果超過時間就不添加,返回false, abq.offer('d', 2, TimeUnit.SECONDS);// 添加的元素,時長,單位 //put 添加元素,如果BlockQueue沒有空間,則調(diào)用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續(xù). abq.put('d');//會一直等待 //poll 取走頭部元素,若不能立即取出,則可以等time參數(shù)規(guī)定的時間,取不到時返回null,支持設(shè)置超時時間 abq.poll(); abq.poll(2,TimeUnit.SECONDS);//兩秒取不到返回null //take() 取走頭部元素,若BlockingQueue為空,阻斷進(jìn)入等待狀態(tài)直到Blocking有新的對象被加入為止 abq.take(); //取出頭部元素,但不刪除 abq.element(); //drainTo() //一次性從BlockingQueue獲取所有可用的數(shù)據(jù)對象(還可以指定獲取數(shù)據(jù)的個數(shù)),通過該方法,可以提升獲取數(shù)據(jù)效率;不需要多次分批加鎖或釋放鎖。 List list=new ArrayList(); abq.drainTo(list,2);//將隊(duì)列中兩個元素取到list中,取走后隊(duì)列中就沒有取走的元素 System.out.println(list); //[a,b] System.out.println(abq); //[]}
2.2 LinkedBlockingQueue
@Testpublic void test03(){ LinkedBlockingQueue lbq=new LinkedBlockingQueue();//可指定容量,也可不指定 lbq.add('a'); lbq.add('b'); lbq.add('c'); //API與ArrayBlockingQueue相同 //是否包含 System.out.println(lbq.contains('a')); //移除頭部元素或者指定元素 remove('a') System.out.println(lbq.remove()); //轉(zhuǎn)數(shù)組 Object[] array = lbq.toArray(); //element 取出頭部元素,但不刪除 System.out.println(lbq.element()); System.out.println(lbq.element()); System.out.println(lbq.element());}
2.3 SynchronousQueue
public static void main(String[] args) { SynchronousQueue<String> sq=new SynchronousQueue<String>(); // iterator() 永遠(yuǎn)返回空,因?yàn)槔锩鏇]東西。 // peek() 永遠(yuǎn)返回null /** * isEmpty()永遠(yuǎn)是true。 * remainingCapacity() 永遠(yuǎn)是0。 * remove()和removeAll() 永遠(yuǎn)是false。 */ new Thread(()->{try { //取出并且remove掉queue里的element(認(rèn)為是在queue里的。。。),取不到東西他會一直等。 System.out.println(sq.take());} catch (InterruptedException e) { e.printStackTrace();} }).start(); new Thread(()->{try { //offer() 往queue里放一個element后立即返回, //如果碰巧這個element被另一個thread取走了,offer方法返回true,認(rèn)為offer成功;否則返回false //true ,上面take線程一直在等, ////下面剛offer進(jìn)去就被拿走了,返回true,如果offer線程先執(zhí)行,則返回false System.out.println(sq.offer('b')); } catch (Exception e) { e.printStackTrace();} }).start(); new Thread(()->{try { //往queue放進(jìn)去一個element以后就一直wait直到有其他thread進(jìn)來把這個element取走 sq.put('a');} catch (Exception e) { e.printStackTrace();} }).start();}
2.4 PriorityBlockingQueue
@Testpublic void test04() throws Exception{ //隊(duì)列里元素必須實(shí)現(xiàn)Comparable接口,用來決定優(yōu)先級 PriorityBlockingQueue<String> pbq=new PriorityBlockingQueue<String>(); pbq.add('b'); pbq.add('g'); pbq.add('a'); pbq.add('c'); //獲取的時候會根據(jù)優(yōu)先級取元素,插入的時候不會排序,節(jié)省性能 //System.out.println(pbq.take());//a,獲取時會排序,按優(yōu)先級獲取 System.out.println(pbq.toString());//如果前面沒有取值,直接syso也不會排序 Iterator<String> iterator = pbq.iterator(); while(iterator.hasNext()){System.out.println(iterator.next()); }}@Testpublic void test05(){ PriorityBlockingQueue<Person> pbq=new PriorityBlockingQueue<Person>(); Person p2=new Person('姚振',20); Person p1=new Person('侯征',24); Person p3=new Person('何毅',18); Person p4=new Person('李世彪',22); pbq.add(p1); pbq.add(p2); pbq.add(p3); pbq.add(p4); System.out.println(pbq);//沒有按優(yōu)先級排序 try {//只要take獲取元素就會按照優(yōu)先級排序,獲取一次就全部排好序了,后面就會按優(yōu)先級迭代pbq.take(); } catch (InterruptedException e) {e.printStackTrace(); } //按年齡排好了序 for (Iterator iterator = pbq.iterator(); iterator.hasNext();) {Person person = (Person) iterator.next();System.out.println(person); }}
2.5 最后說一下DelayQueue ,這里用個網(wǎng)上很經(jīng)典的例子,網(wǎng)吧上網(wǎng)計(jì)時
網(wǎng)民實(shí)體queue中元素
//網(wǎng)民public class Netizen implements Delayed { //身份證private String ID;//名字private String name;//上網(wǎng)截止時間private long playTime;//比較優(yōu)先級,時間最短的優(yōu)先@Overridepublic int compareTo(Delayed o) { Netizen netizen=(Netizen) o; return this.getDelay(TimeUnit.SECONDS)-o.getDelay(TimeUnit.SECONDS)>0?1:0;}public Netizen(String iD, String name, long playTime) { ID = iD; this.name = name; this.playTime = playTime;}//獲取上網(wǎng)時長,即延時時長@Overridepublic long getDelay(TimeUnit unit) { //上網(wǎng)截止時間減去現(xiàn)在當(dāng)前時間=時長 return this.playTime-System.currentTimeMillis();}
網(wǎng)吧類:
//網(wǎng)吧public class InternetBar implements Runnable { //網(wǎng)民隊(duì)列,使用延時隊(duì)列private DelayQueue<Netizen> dq=new DelayQueue<Netizen>();//上網(wǎng)public void startPlay(String id,String name,Integer money){ //截止時間= 錢數(shù)*時間+當(dāng)前時間(1塊錢1秒) Netizen netizen=new Netizen(id,name,1000*money+System.currentTimeMillis()); System.out.println(name+'開始上網(wǎng)計(jì)費(fèi)......'); dq.add(netizen);}//時間到下機(jī)public void endTime(Netizen netizen){ System.out.println(netizen.getName()+'余額用完,下機(jī)');}@Overridepublic void run() { //線程,監(jiān)控每個網(wǎng)民上網(wǎng)時長 while(true){try { //除非時間到.否則會一直等待,直到取出這個元素為止 Netizen netizen=dq.take(); endTime(netizen);}catch (InterruptedException e) { e.printStackTrace();} }}public static void main(String[] args) { //新建一個網(wǎng)吧 InternetBar internetBar=new InternetBar(); //來了三個網(wǎng)民上網(wǎng) internetBar.startPlay('001','侯征',3); internetBar.startPlay('002','姚振',7); internetBar.startPlay('003','何毅',5);Thread t1=new Thread(internetBar);t1.start(); }}
這樣就可以完美實(shí)現(xiàn)業(yè)務(wù)需求了
結(jié)果
,
這塊東西比較深,還需要不斷加強(qiáng)學(xué)習(xí)實(shí)踐才行!!
Java中的Queue和自定義堆棧Queue:單向- 隊(duì)列通常 FIFO (先進(jìn)先出)
- 優(yōu)先級隊(duì)列和堆棧 LIFO (后進(jìn)先出)
package com.bjsxt.others.que;import java.util.ArrayDeque;import java.util.Queue;/** * 使用隊(duì)列模擬銀行存款業(yè)務(wù) * @author Administrator * */public class Demo01 { /** * @param args */ public static void main(String[] args) { Queue<Request> que =new ArrayDeque<Request>(); //模擬排隊(duì)情況 for(int i=0;i<10;i++){ final int num =i; que.offer(new Request(){//應(yīng)用匿名內(nèi)部類對象只能訪問 final 修飾的變量 @Override public void deposit() { System.out.println('第'+num+'個人,辦理存款業(yè)務(wù),存款額度為:'+(Math.random()*10000)); } }); } dealWith(que); } //處理業(yè)務(wù) public static void dealWith(Queue<Request> que){ Request req =null; while(null!=(req=que.poll())){ req.deposit(); } }}interface Request{ //存款 void deposit();}自定義堆棧
package com.bjsxt.others.que;import java.util.ArrayDeque;import java.util.Deque;/** * 使用隊(duì)列實(shí)現(xiàn)自定義堆棧 * 1、彈 * 2、壓 * 3、獲取頭 * @author Administrator * * @param <E> */public class MyStack<E> { //容器 private Deque<E> container =new ArrayDeque<E>(); //容量 private int cap; public MyStack(int cap) { super(); this.cap = cap; } //壓棧 public boolean push(E e){ if(container.size()+1>cap){ return false; } return container.offerLast(e); } //彈棧 public E pop(){ return container.pollLast(); } //獲取 public E peek(){ return container.peekLast(); } public int size(){ return this.container.size(); }} package com.bjsxt.others.que;//測試自定義堆棧public class Demo02 { /** * @param args */ public static void main(String[] args) { MyStack<String> backHistory =new MyStack<String>(3); backHistory.push('www.baidu.com'); backHistory.push('www.google.com'); backHistory.push('www.sina.com'); backHistory.push('www.bjsxt.cn'); System.out.println('大小:'+backHistory.size()); //遍歷 String item=null; while(null!=(item=backHistory.pop())){ System.out.println(item); } }}
以上為個人經(jīng)驗(yàn),希望能給大家一個參考,也希望大家多多支持好吧啦網(wǎng)。
相關(guān)文章:
1. 存儲于xml中需要的HTML轉(zhuǎn)義代碼2. python 浮點(diǎn)數(shù)四舍五入需要注意的地方3. Java GZip 基于內(nèi)存實(shí)現(xiàn)壓縮和解壓的方法4. python開發(fā)一款翻譯工具5. 利用CSS制作3D動畫6. jsp+servlet簡單實(shí)現(xiàn)上傳文件功能(保存目錄改進(jìn))7. Springboot 全局日期格式化處理的實(shí)現(xiàn)8. 完美解決vue 中多個echarts圖表自適應(yīng)的問題9. SpringBoot+TestNG單元測試的實(shí)現(xiàn)10. PHP實(shí)現(xiàn)簡單線性回歸之?dāng)?shù)學(xué)庫的重要性
