电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁技術(shù)文章
文章詳情頁

C和Java沒那么香了,Serverless時代Rust即將稱王?

瀏覽:86日期:2022-08-10 09:21:47
目錄高并發(fā)模式初探C語言的高并發(fā)案例Java的高并發(fā)實(shí)現(xiàn)Go的高并發(fā)實(shí)現(xiàn)Rust的高并發(fā)實(shí)現(xiàn)總結(jié)高并發(fā)模式初探

在這個高并發(fā)時代最重要的設(shè)計模式無疑是生產(chǎn)者、消費(fèi)者模式,比如著名的消息隊列kafka其實(shí)就是一個生產(chǎn)者消費(fèi)者模式的典型實(shí)現(xiàn)。其實(shí)生產(chǎn)者消費(fèi)者問題,也就是有限緩沖問題,可以用以下場景進(jìn)行簡要描述,生產(chǎn)者生成一定量的產(chǎn)品放到庫房,并不斷重復(fù)此過程;與此同時,消費(fèi)者也在緩沖區(qū)消耗這些數(shù)據(jù),但由于庫房大小有限,所以生產(chǎn)者和消費(fèi)者之間步調(diào)協(xié)調(diào),生產(chǎn)者不會在庫房滿的情況放入端口,消費(fèi)者也不會在庫房空時消耗數(shù)據(jù)。詳見下圖:

C和Java沒那么香了,Serverless時代Rust即將稱王?

而如果在生產(chǎn)者與消費(fèi)者之間完美協(xié)調(diào)并保持高效,這就是高并發(fā)要解決的本質(zhì)問題。

C語言的高并發(fā)案例

筆者在前文曾經(jīng)介紹過TDEngine的相關(guān)代碼,其中Sheduler模塊的相關(guān)調(diào)度算法就使用了生產(chǎn)、消費(fèi)者模式進(jìn)行消息傳遞功能的實(shí)現(xiàn),也就是有多個生產(chǎn)者(producer)生成并不斷向隊列中傳遞消息,也有多個消費(fèi)者(consumer)不斷從隊列中取消息。

后面我們也會說明類型功能在Go、Java等高級語言中類似的功能已經(jīng)被封裝好了,但是在C語言中你就必須要用好互斥體( mutex)和信號量(semaphore)并協(xié)調(diào)他們之間的關(guān)系。由于C語言的實(shí)現(xiàn)是最復(fù)雜的,先來看結(jié)構(gòu)體設(shè)計和他的注釋:

typedef struct { char label[16];//消息內(nèi)容 sem_t emptySem;//此信號量代表隊列的可寫狀態(tài) sem_t fullSem;//此信號量代表隊列的可讀狀態(tài) pthread_mutex_t queueMutex;//此互斥體為保證消息不會被誤修改,保證線程程安全 int fullSlot;//隊尾位置 int emptySlot;//隊頭位置 int queueSize;#隊列長度 int numOfThreads;//同時操作的線程數(shù)量 pthread_t * qthread;//線程指針 SSchedMsg * queue;//隊列指針} SSchedQueue;

再來看Shceduler初始化函數(shù),這里需要特別說明的是,兩個信號量的創(chuàng)建,其中emptySem是隊列的可寫狀態(tài),初始化時其值為queueSize,即初始時隊列可寫,可接受消息長度為隊列長度,fullSem是隊列的可讀狀態(tài),初始化時其值為0,即初始時隊列不可讀。具體代碼及我的注釋如下:

void *taosInitScheduler(int queueSize, int numOfThreads, char *label) { pthread_attr_t attr; SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue)); memset(pSched, 0, sizeof(SSchedQueue)); pSched->queueSize = queueSize; pSched->numOfThreads = numOfThreads; strcpy(pSched->label, label); if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) { pError('init %s:queueMutex failed, reason:%s', pSched->label, strerror(errno)); goto _error; } //emptySem是隊列的可寫狀態(tài),初始化時其值為queueSize,即初始時隊列可寫,可接受消息長度為隊列長度。 if (sem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) { pError('init %s:empty semaphore failed, reason:%s', pSched->label, strerror(errno)); goto _error; } //fullSem是隊列的可讀狀態(tài),初始化時其值為0,即初始時隊列不可讀 if (sem_init(&pSched->fullSem, 0, 0) != 0) { pError('init %s:full semaphore failed, reason:%s', pSched->label, strerror(errno)); goto _error; } if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) { pError('%s: no enough memory for queue, reason:%s', pSched->label, strerror(errno)); goto _error; } memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg)); pSched->fullSlot = 0;//實(shí)始化時隊列為空,故隊頭和隊尾的位置都是0 pSched->emptySlot = 0;//實(shí)始化時隊列為空,故隊頭和隊尾的位置都是0 pSched->qthread = malloc(sizeof(pthread_t) * (size_t)pSched->numOfThreads); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); for (int i = 0; i < pSched->numOfThreads; ++i) { if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) { pError('%s: failed to create rpc thread, reason:%s', pSched->label, strerror(errno)); goto _error; } } pTrace('%s scheduler is initialized, numOfThreads:%d', pSched->label, pSched->numOfThreads); return (void *)pSched;_error: taosCleanUpScheduler(pSched); return NULL;}

再來看讀消息的taosProcessSchedQueue函數(shù)這其實(shí)是消費(fèi)者一方的實(shí)現(xiàn),這個函數(shù)的主要邏輯是

1.使用無限循環(huán),只要隊列可讀即sem_wait(&pSched->fullSem)不再阻塞就繼續(xù)向下處理

2.在操作msg前,加入互斥體防止msg被誤用。

3.讀操作完畢后修改fullSlot的值,注意這為避免fullSlot溢出,需要對于queueSize取余。同時退出互斥體。

4.對emptySem進(jìn)行post操作,即把emptySem的值加1,如emptySem原值為5,讀取一個消息后,emptySem的值為6,即可寫狀態(tài),且能接受的消息數(shù)量為6

具體代碼及注釋如下:

void *taosProcessSchedQueue(void *param) { SSchedMsg msg; SSchedQueue *pSched = (SSchedQueue *)param; //注意這里是個無限循環(huán),只要隊列可讀即sem_wait(&pSched->fullSem)不再阻塞就繼續(xù)處理 while (1) { if (sem_wait(&pSched->fullSem) != 0) { pError('wait %s fullSem failed, errno:%d, reason:%s', pSched->label, errno, strerror(errno)); if (errno == EINTR) {/* sem_wait is interrupted by interrupt, ignore and continue */continue; } } //加入互斥體防止msg被誤用。 if (pthread_mutex_lock(&pSched->queueMutex) != 0) pError('lock %s queueMutex failed, reason:%s', pSched->label, strerror(errno)); msg = pSched->queue[pSched->fullSlot]; memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg)); //讀取完畢修改fullSlot的值,注意這為避免fullSlot溢出,需要對于queueSize取余。 pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize; //讀取完畢修改退出互斥體 if (pthread_mutex_unlock(&pSched->queueMutex) != 0) pError('unlock %s queueMutex failed, reason:%sn', pSched->label, strerror(errno)); //讀取完畢對emptySem進(jìn)行post操作,即把emptySem的值加1,如emptySem原值為5,讀取一個消息后,emptySem的值為6,即可寫狀態(tài),且能接受的消息數(shù)量為6 if (sem_post(&pSched->emptySem) != 0) pError('post %s emptySem failed, reason:%sn', pSched->label, strerror(errno)); if (msg.fp) (*(msg.fp))(&msg); else if (msg.tfp) (*(msg.tfp))(msg.ahandle, msg.thandle); }}

最后寫消息的taosScheduleTask函數(shù)也就是生產(chǎn)的實(shí)現(xiàn),其基本邏輯是

1.寫隊列前先對emptySem進(jìn)行減1操作,如emptySem原值為1,那么減1后為0,也就是隊列已滿,必須在讀取消息后,即emptySem進(jìn)行post操作后,隊列才能進(jìn)行可寫狀態(tài)。

2.加入互斥體防止msg被誤操作,寫入完成后退出互斥體

3.寫隊列完成后對fullSem進(jìn)行加1操作,如fullSem原值為0,那么加1后為1,也就是隊列可讀,咱們上面介紹的讀取taosProcessSchedQueue中sem_wait(&pSched->fullSem)不再阻塞就繼續(xù)向下。

int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) { SSchedQueue *pSched = (SSchedQueue *)qhandle; if (pSched == NULL) { pError('sched is not ready, msg:%p is dropped', pMsg); return 0; } //在寫隊列前先對emptySem進(jìn)行減1操作,如emptySem原值為1,那么減1后為0,也就是隊列已滿,必須在讀取消息后,即emptySem進(jìn)行post操作后,隊列才能進(jìn)行可寫狀態(tài)。 if (sem_wait(&pSched->emptySem) != 0) pError('wait %s emptySem failed, reason:%s', pSched->label, strerror(errno));//加入互斥體防止msg被誤操作 if (pthread_mutex_lock(&pSched->queueMutex) != 0) pError('lock %s queueMutex failed, reason:%s', pSched->label, strerror(errno)); pSched->queue[pSched->emptySlot] = *pMsg; pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize; if (pthread_mutex_unlock(&pSched->queueMutex) != 0) pError('unlock %s queueMutex failed, reason:%s', pSched->label, strerror(errno)); //在寫隊列前先對fullSem進(jìn)行加1操作,如fullSem原值為0,那么加1后為1,也就是隊列可讀,咱們上面介紹的讀取函數(shù)可以進(jìn)行處理。 if (sem_post(&pSched->fullSem) != 0) pError('post %s fullSem failed, reason:%s', pSched->label, strerror(errno)); return 0;}Java的高并發(fā)實(shí)現(xiàn)

從并發(fā)模型來看,Go和Rust都有channel這個概念,也都是通過Channel來實(shí)現(xiàn)線(協(xié))程間的同步,由于channel帶有讀寫狀態(tài)且保證數(shù)據(jù)順序,而且channel的封裝程度和效率明顯可以做的更高,因此Go和Rust官方都會建議使用channel(通信)來共享內(nèi)存,而不是使用共享內(nèi)存來通信。

為了讓幫助大家找到區(qū)別,我們先以Java為例來,看一下沒有channel的高級語言Java,生產(chǎn)者消費(fèi)者該如何實(shí)現(xiàn),代碼及注釋如下:

public class Storage { // 倉庫最大存儲量 private final int MAX_SIZE = 10; // 倉庫存儲的載體 private LinkedList<Object> list = new LinkedList<Object>(); // 鎖 private final Lock lock = new ReentrantLock(); // 倉庫滿的信號量 private final Condition full = lock.newCondition(); // 倉庫空的信號量 private final Condition empty = lock.newCondition(); public void produce() {// 獲得鎖lock.lock();while (list.size() + 1 > MAX_SIZE) { System.out.println('【生產(chǎn)者' + Thread.currentThread().getName() + '】倉庫已滿'); try {full.await(); } catch (InterruptedException e) {e.printStackTrace(); }}list.add(new Object());System.out.println('【生產(chǎn)者' + Thread.currentThread().getName() + '】生產(chǎn)一個產(chǎn)品,現(xiàn)庫存' + list.size()); empty.signalAll();lock.unlock(); } public void consume() {// 獲得鎖lock.lock();while (list.size() == 0) { System.out.println('【消費(fèi)者' + Thread.currentThread().getName() + '】倉庫為空'); try {empty.await(); } catch (InterruptedException e) {e.printStackTrace(); }}list.remove();System.out.println('【消費(fèi)者' + Thread.currentThread().getName() + '】消費(fèi)一個產(chǎn)品,現(xiàn)庫存' + list.size()); full.signalAll();lock.unlock(); }}

在Java、C#這種面向?qū)ο螅菦]有channel語言中,生產(chǎn)者、消費(fèi)者模式至少要借助一個lock和兩個信號量共同完成。其中鎖的作用是保證同是時間,倉庫中只有一個用戶進(jìn)行數(shù)據(jù)的修改,而還需要表示倉庫滿的信號量,一旦達(dá)到倉庫滿的情況則將此信號量置為阻塞狀態(tài),從而阻止其它生產(chǎn)者再向倉庫運(yùn)商品了,反之倉庫空的信號量也是一樣,一旦倉庫空了,也要阻其它消費(fèi)者再前來消費(fèi)了。

Go的高并發(fā)實(shí)現(xiàn)

我們剛剛也介紹過了Go語言中官方推薦使用channel來實(shí)現(xiàn)協(xié)程間通信,所以不需要再添加lock和信號量就能實(shí)現(xiàn)模式了,以下代碼中我們通過子goroutine完成了生產(chǎn)者的功能,在在另一個子goroutine中實(shí)現(xiàn)了消費(fèi)者的功能,注意要阻塞主goroutine以確保子goroutine能夠執(zhí)行,從而輕而易舉的就這完成了生產(chǎn)者消費(fèi)者模式。下面我們就通過具體實(shí)踐中來看一下生產(chǎn)者消費(fèi)者模型的實(shí)現(xiàn)。

package mainimport ('fmt''time')func Product(ch chan<- int) { //生產(chǎn)者for i := 0; i < 3; i++ {fmt.Println('Product produceed', i)ch <- i //由于channel是goroutine安全的,所以此處沒有必要必須加鎖或者加lock操作.}}func Consumer(ch <-chan int) {for i := 0; i < 3; i++ {j := <-ch //由于channel是goroutine安全的,所以此處沒有必要必須加鎖或者加lock操作.fmt.Println('Consmuer consumed ', j)}}func main() {ch := make(chan int)go Product(ch)//注意生產(chǎn)者與消費(fèi)者放在不同goroutine中g(shù)o Consumer(ch)//注意生產(chǎn)者與消費(fèi)者放在不同goroutine中time.Sleep(time.Second * 1)//防止主goroutine退出/*運(yùn)行結(jié)果并不確定,可能為Product produceed 0Product produceed 1Consmuer consumed 0Consmuer consumed 1Product produceed 2Consmuer consumed 2*/}

可以看到和Java比起來使用GO來實(shí)現(xiàn)并發(fā)式的生產(chǎn)者消費(fèi)者模式的確是更為清爽了。

Rust的高并發(fā)實(shí)現(xiàn)

不得不說Rust的難度實(shí)在太高了,雖然筆者之前在匯編、C、Java等方面的經(jīng)驗可以幫助我快速掌握Go語言。但是假期看了兩天Rust真想大呼告辭,這尼瑪也太勸退了。在Rust官方提供的功能中,其實(shí)并不包括多生產(chǎn)者、多消費(fèi)者的channel,std:sync空間下只有一個多生產(chǎn)者單消費(fèi)者(mpsc)的channel。其樣例實(shí)現(xiàn)如下:

use std::sync::mpsc;use std::thread;use std::time::Duration;fn main() { let (tx, rx) = mpsc::channel(); let tx1 = mpsc::Sender::clone(&tx); let tx2 = mpsc::Sender::clone(&tx); thread::spawn(move || {let vals = vec![ String::from('1'), String::from('3'), String::from('5'), String::from('7'),];for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs(1));} }); thread::spawn(move || {let vals = vec![ String::from('11'), String::from('13'), String::from('15'), String::from('17'),];for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1));} }); thread::spawn(move || {let vals = vec![ String::from('21'), String::from('23'), String::from('25'), String::from('27'),];for val in vals { tx2.send(val).unwrap(); thread::sleep(Duration::from_secs(1));} }); for rec in rx {println!('Got: {}', rec); }}

可以看到在Rust下實(shí)現(xiàn)生產(chǎn)者消費(fèi)者是不難的,但是生產(chǎn)者可以clone多個,不過消費(fèi)者卻只能有一個,究其原因是因為Rust下沒有GC也就是垃圾回收功能,而想保證安全Rust就必須要對于變更使用權(quán)限進(jìn)行嚴(yán)格管理。在Rust下使用move關(guān)鍵字進(jìn)行變更的所有權(quán)轉(zhuǎn)移,但是按照Rust對于變更生產(chǎn)周期的管理規(guī)定,線程間權(quán)限轉(zhuǎn)移的所有權(quán)接收者在同一時間只能有一個,這也是Rust官方只提供MPSC的原因,

use std::thread;fn main() { let s = 'hello'; let handle = thread::spawn(move || {println!('{}', s); }); handle.join().unwrap();}

當(dāng)然Rust下有一個API比較貼心就是join,他可以所有子線程都執(zhí)行結(jié)束再退出主線程,這比Go中要手工阻塞還是要有一定的提高。而如果你想用多生產(chǎn)者、多消費(fèi)者的功能,就要入手crossbeam模塊了,這個模塊掌握起來難度也真的不低。

總結(jié)

通過上面的比較我們可以用一張表格來說明幾種主流語言的情況對比:

語言 安全性 運(yùn)行速度 進(jìn)程啟動速度 學(xué)習(xí)難度 C 低 極快 極快 困難 Java 高 一般 一般 一般 Go 高 較快 較快 一般 Rust 高 極快(基本比肩C) 極快(基本比肩C) 極困難

可以看到Rust以其高安全性、基本比肩C的運(yùn)行及啟動速度必將在Serverless的時代獨(dú)占鰲頭,Go基本也能緊隨其后,而C語言程序中難以避免的野指針,Java相對較低的運(yùn)行及啟動速度,可能都不太適用于函數(shù)式運(yùn)算的場景,Java在企業(yè)級開發(fā)的時代打敗各種C#之類的對手,但是在云時代好像還真沒有之前統(tǒng)治力那么強(qiáng)了,真可謂是打敗你的往往不是你的對手,而是其它空間的降維打擊。

這篇文章的內(nèi)容就到這了,希望能給你帶來幫助,也希望您可以多多關(guān)注好吧啦網(wǎng)的更多內(nèi)容!

標(biāo)簽: Java
主站蜘蛛池模板: 建大仁科-温湿度变送器|温湿度传感器|温湿度记录仪_厂家_价格-山东仁科 | 天津次氯酸钠酸钙溶液-天津氢氧化钠厂家-天津市辅仁化工有限公司 | 船用泵,船用离心泵,船用喷射泵,泰州隆华船舶设备有限公司 | 赛尔特智能移动阳光房-阳光房厂家-赛尔特建筑科技(广东)有限公司 | 广州展览设计公司_展台设计搭建_展位设计装修公司-众派展览装饰 广州展览制作工厂—[优简]直营展台制作工厂_展会搭建资质齐全 | 电缆故障测试仪_电缆故障定位仪_探测仪_检测仪器_陕西意联电气厂家 | 长沙印刷厂-包装印刷-画册印刷厂家-湖南省日大彩色印务有限公司 青州搬家公司电话_青州搬家公司哪家好「鸿喜」青州搬家 | 鑫铭东办公家具一站式定制采购-深圳办公家具厂家直销 | ETFE膜结构_PTFE膜结构_空间钢结构_膜结构_张拉膜_浙江萬豪空间结构集团有限公司 | 陕西鹏展科技有限公司| 查分易-成绩发送平台官网 | 食安观察网 | 骨密度仪-骨密度测定仪-超声骨密度仪-骨龄测定仪-天津开发区圣鸿医疗器械有限公司 | 青岛球场围网,青岛车间隔离网,青岛机器人围栏,青岛水源地围网,青岛围网,青岛隔离栅-青岛晟腾金属制品有限公司 | 北京宣传片拍摄_产品宣传片拍摄_宣传片制作公司-现像传媒 | 棉柔巾代加工_洗脸巾oem_一次性毛巾_浴巾生产厂家-杭州禾壹卫品科技有限公司 | 河南中专学校|职高|技校招生-河南中职中专网 | 首页_中夏易经起名网| 超声骨密度仪,双能X射线骨密度仪【起草单位】,骨密度检测仪厂家 - 品源医疗(江苏)有限公司 | 福建成考网-福建成人高考网 | 【化妆品备案】进口化妆品备案流程-深圳美尚美化妆品有限公司 | 高效复合碳源-多核碳源生产厂家-污水处理反硝化菌种一长隆科技库巴鲁 | 世界箱包品牌十大排名,女包小众轻奢品牌推荐200元左右,男包十大奢侈品牌排行榜双肩,学生拉杆箱什么品牌好质量好 - Gouwu3.com | 校服厂家,英伦校服定做工厂,园服生产定制厂商-东莞市艾咪天使校服 | 电主轴-高速精密电主轴-高速电机厂家-瑞德沃斯品牌有限公司 | 皮带输送机-大倾角皮带输送机-皮带输送机厂家-河南坤威机械 | HYDAC过滤器,HYDAC滤芯,现货ATOS油泵,ATOS比例阀-东莞市广联自动化科技有限公司 | 超声波焊接机,振动摩擦焊接机,激光塑料焊接机,超声波焊接模具工装-德召尼克(常州)焊接科技有限公司 | 西安标准厂房_陕西工业厂房_西咸新区独栋厂房_长信科技产业园官方网站 | 杭州中央空调维修_冷却塔/新风机柜/热水器/锅炉除垢清洗_除垢剂_风机盘管_冷凝器清洗-杭州亿诺能源有限公司 | 皮带式输送机械|链板式输送机|不锈钢输送机|网带输送机械设备——青岛鸿儒机械有限公司 | 冷油器-冷油器换管改造-连云港灵动列管式冷油器生产厂家 | 动力配电箱-不锈钢配电箱-高压开关柜-重庆宇轩机电设备有限公司 聚天冬氨酸,亚氨基二琥珀酸四钠,PASP,IDS - 远联化工 | 杭州顺源过滤机械有限公司官网-压滤机_板框压滤机_厢式隔膜压滤机厂家 | 中高频感应加热设备|高频淬火设备|超音频感应加热电源|不锈钢管光亮退火机|真空管烤消设备 - 郑州蓝硕工业炉设备有限公司 | 高低温万能试验机-复合材料万能试验机-馥勒仪器 | 高低温老化试验机-步入式/低温恒温恒湿试验机-百科 | 艾默生变频器,艾默生ct,变频器,ct驱动器,广州艾默生变频器,供水专用变频器,风机变频器,电梯变频器,艾默生变频器代理-广州市盟雄贸易有限公司官方网站-艾默生变频器应用解决方案服务商 | 厂房出租_厂房出售_产业园区招商_工业地产&nbsp;-&nbsp;中工招商网 | 真空泵维修保养,普发,阿尔卡特,荏原,卡西亚玛,莱宝,爱德华干式螺杆真空泵维修-东莞比其尔真空机电设备有限公司 | 钢结构-钢结构厂房-钢结构工程[江苏海逵钢构厂] |