电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁(yè)技術(shù)文章
文章詳情頁(yè)

Python如何實(shí)現(xiàn)線程間通信

瀏覽:5日期:2022-07-15 17:49:12

問題

你的程序中有多個(gè)線程,你需要在這些線程之間安全地交換信息或數(shù)據(jù)

解決方案

從一個(gè)線程向另一個(gè)線程發(fā)送數(shù)據(jù)最安全的方式可能就是使用 queue 庫(kù)中的隊(duì)列了。創(chuàng)建一個(gè)被多個(gè)線程共享的 Queue 對(duì)象,這些線程通過使用 put() 和 get() 操作來向隊(duì)列中添加或者刪除元素。 例如:

from queue import Queuefrom threading import Thread# A thread that produces datadef producer(out_q): while True: # Produce some data ... out_q.put(data)# A thread that consumes datadef consumer(in_q): while True:# Get some data data = in_q.get() # Process the data ...# Create the shared queue and launch both threadsq = Queue()t1 = Thread(target=consumer, args=(q,))t2 = Thread(target=producer, args=(q,))t1.start()t2.start()

Queue 對(duì)象已經(jīng)包含了必要的鎖,所以你可以通過它在多個(gè)線程間多安全地共享數(shù)據(jù)。 當(dāng)使用隊(duì)列時(shí),協(xié)調(diào)生產(chǎn)者和消費(fèi)者的關(guān)閉問題可能會(huì)有一些麻煩。一個(gè)通用的解決方法是在隊(duì)列中放置一個(gè)特殊的值,當(dāng)消費(fèi)者讀到這個(gè)值的時(shí)候,終止執(zhí)行。例如:

from queue import Queuefrom threading import Thread# Object that signals shutdown_sentinel = object()# A thread that produces datadef producer(out_q): while running: # Produce some data ... out_q.put(data) # Put the sentinel on the queue to indicate completion out_q.put(_sentinel)# A thread that consumes datadef consumer(in_q): while True: # Get some data data = in_q.get() # Check for termination if data is _sentinel: in_q.put(_sentinel) break # Process the data ...

本例中有一個(gè)特殊的地方:消費(fèi)者在讀到這個(gè)特殊值之后立即又把它放回到隊(duì)列中,將之傳遞下去。這樣,所有監(jiān)聽這個(gè)隊(duì)列的消費(fèi)者線程就可以全部關(guān)閉了。 盡管隊(duì)列是最常見的線程間通信機(jī)制,但是仍然可以自己通過創(chuàng)建自己的數(shù)據(jù)結(jié)構(gòu)并添加所需的鎖和同步機(jī)制來實(shí)現(xiàn)線程間通信。最常見的方法是使用 Condition 變量來包裝你的數(shù)據(jù)結(jié)構(gòu)。下邊這個(gè)例子演示了如何創(chuàng)建一個(gè)線程安全的優(yōu)先級(jí)隊(duì)列

import heapqimport threadingclass PriorityQueue: def __init__(self): self._queue = [] self._count = 0 self._cv = threading.Condition() def put(self, item, priority): with self._cv: heapq.heappush(self._queue, (-priority, self._count, item)) self._count += 1 self._cv.notify() def get(self): with self._cv: while len(self._queue) == 0:self._cv.wait() return heapq.heappop(self._queue)[-1]

使用隊(duì)列來進(jìn)行線程間通信是一個(gè)單向、不確定的過程。通常情況下,你沒有辦法知道接收數(shù)據(jù)的線程是什么時(shí)候接收到的數(shù)據(jù)并開始工作的。不過隊(duì)列對(duì)象提供一些基本完成的特性,比如下邊這個(gè)例子中的 task_done() 和 join() :

from queue import Queuefrom threading import Thread# A thread that produces datadef producer(out_q): while running: # Produce some data ... out_q.put(data)# A thread that consumes datadef consumer(in_q): while True: # Get some data data = in_q.get() # Process the data ... # Indicate completion in_q.task_done()# Create the shared queue and launch both threadsq = Queue()t1 = Thread(target=consumer, args=(q,))t2 = Thread(target=producer, args=(q,))t1.start()t2.start()# Wait for all produced items to be consumedq.join()

如果一個(gè)線程需要在一個(gè)“消費(fèi)者”線程處理完特定的數(shù)據(jù)項(xiàng)時(shí)立即得到通知,你可以把要發(fā)送的數(shù)據(jù)和一個(gè) Event 放到一起使用,這樣“生產(chǎn)者”就可以通過這個(gè)Event對(duì)象來監(jiān)測(cè)處理的過程了。示例如下:

from queue import Queuefrom threading import Thread, Event# A thread that produces datadef producer(out_q): while running: # Produce some data ... # Make an (data, event) pair and hand it to the consumer evt = Event() out_q.put((data, evt)) ... # Wait for the consumer to process the item evt.wait()# A thread that consumes datadef consumer(in_q): while True: # Get some data data, evt = in_q.get() # Process the data ... # Indicate completion evt.set()

討論

基于簡(jiǎn)單隊(duì)列編寫多線程程序在多數(shù)情況下是一個(gè)比較明智的選擇。從線程安全隊(duì)列的底層實(shí)現(xiàn)來看,你無需在你的代碼中使用鎖和其他底層的同步機(jī)制,這些只會(huì)把你的程序弄得亂七八糟。此外,使用隊(duì)列這種基于消息的通信機(jī)制可以被擴(kuò)展到更大的應(yīng)用范疇,比如,你可以把你的程序放入多個(gè)進(jìn)程甚至是分布式系統(tǒng)而無需改變底層的隊(duì)列結(jié)構(gòu)。 使用線程隊(duì)列有一個(gè)要注意的問題是,向隊(duì)列中添加數(shù)據(jù)項(xiàng)時(shí)并不會(huì)復(fù)制此數(shù)據(jù)項(xiàng),線程間通信實(shí)際上是在線程間傳遞對(duì)象引用。如果你擔(dān)心對(duì)象的共享狀態(tài),那你最好只傳遞不可修改的數(shù)據(jù)結(jié)構(gòu)(如:整型、字符串或者元組)或者一個(gè)對(duì)象的深拷貝。例如:

from queue import Queuefrom threading import Threadimport copy# A thread that produces datadef producer(out_q): while True: # Produce some data ... out_q.put(copy.deepcopy(data))# A thread that consumes datadef consumer(in_q): while True: # Get some data data = in_q.get() # Process the data ...

Queue 對(duì)象提供一些在當(dāng)前上下文很有用的附加特性。比如在創(chuàng)建 Queue 對(duì)象時(shí)提供可選的 size 參數(shù)來限制可以添加到隊(duì)列中的元素?cái)?shù)量。對(duì)于“生產(chǎn)者”與“消費(fèi)者”速度有差異的情況,為隊(duì)列中的元素?cái)?shù)量添加上限是有意義的。比如,一個(gè)“生產(chǎn)者”產(chǎn)生項(xiàng)目的速度比“消費(fèi)者” “消費(fèi)”的速度快,那么使用固定大小的隊(duì)列就可以在隊(duì)列已滿的時(shí)候阻塞隊(duì)列,以免未預(yù)期的連鎖效應(yīng)擴(kuò)散整個(gè)程序造成死鎖或者程序運(yùn)行失常。在通信的線程之間進(jìn)行“流量控制”是一個(gè)看起來容易實(shí)現(xiàn)起來困難的問題。如果你發(fā)現(xiàn)自己曾經(jīng)試圖通過擺弄隊(duì)列大小來解決一個(gè)問題,這也許就標(biāo)志著你的程序可能存在脆弱設(shè)計(jì)或者固有的可伸縮問題。 get() 和 put() 方法都支持非阻塞方式和設(shè)定超時(shí),例如:

import queueq = queue.Queue()try: data = q.get(block=False)except queue.Empty: ...try: q.put(item, block=False)except queue.Full: ...try: data = q.get(timeout=5.0)except queue.Empty: ...

這些操作都可以用來避免當(dāng)執(zhí)行某些特定隊(duì)列操作時(shí)發(fā)生無限阻塞的情況,比如,一個(gè)非阻塞的 put() 方法和一個(gè)固定大小的隊(duì)列一起使用,這樣當(dāng)隊(duì)列已滿時(shí)就可以執(zhí)行不同的代碼。比如輸出一條日志信息并丟棄。

def producer(q): ... try: q.put(item, block=False) except queue.Full: log.warning(’queued item %r discarded!’, item)

如果你試圖讓消費(fèi)者線程在執(zhí)行像 q.get() 這樣的操作時(shí),超時(shí)自動(dòng)終止以便檢查終止標(biāo)志,你應(yīng)該使用 q.get() 的可選參數(shù) timeout ,如下:

_running = Truedef consumer(q): while _running: try: item = q.get(timeout=5.0) # Process item ... except queue.Empty: pass

最后,有 q.qsize() , q.full() , q.empty() 等實(shí)用方法可以獲取一個(gè)隊(duì)列的當(dāng)前大小和狀態(tài)。但要注意,這些方法都不是線程安全的。可能你對(duì)一個(gè)隊(duì)列使用 empty() 判斷出這個(gè)隊(duì)列為空,但同時(shí)另外一個(gè)線程可能已經(jīng)向這個(gè)隊(duì)列中插入一個(gè)數(shù)據(jù)項(xiàng)。所以,你最好不要在你的代碼中使用這些方法。

以上就是Python如何實(shí)現(xiàn)線程間通信的詳細(xì)內(nèi)容,更多關(guān)于Python 線程間通信的資料請(qǐng)關(guān)注好吧啦網(wǎng)其它相關(guān)文章!

標(biāo)簽: Python 編程
相關(guān)文章:
主站蜘蛛池模板: 北京普辉律师事务所官网_北京律师24小时免费咨询|法律咨询 | 今日娱乐圈——影视剧集_八卦娱乐_明星八卦_最新娱乐八卦新闻 | 微型气象仪_气象传感器_防爆气象传感器-天合传感器大全 | 小型单室真空包装机,食品单室真空包装机-百科 | 氧化铝球_高铝球_氧化铝研磨球-淄博誉洁陶瓷新材料有限公司 | 双吸泵,双吸泵厂家,OS双吸泵-山东博二泵业有限公司 | 工业机械三维动画制作 环保设备原理三维演示动画 自动化装配产线三维动画制作公司-南京燃动数字 聚合氯化铝_喷雾聚氯化铝_聚合氯化铝铁厂家_郑州亿升化工有限公司 | 爱佩恒温恒湿测试箱|高低温实验箱|高低温冲击试验箱|冷热冲击试验箱-您身边的模拟环境试验设备技术专家-合作热线:400-6727-800-广东爱佩试验设备有限公司 | 大连海岛旅游网>>大连旅游,大连海岛游,旅游景点攻略,海岛旅游官网 | 浙江筋膜枪-按摩仪厂家-制造商-肩颈按摩仪哪家好-温州市合喜电子科技有限公司 | 美国PARKER齿轮泵,美国PARKER柱塞泵,美国PARKER叶片泵,美国PARKER电磁阀,美国PARKER比例阀-上海维特锐实业发展有限公司二部 | 阿尔法-MDR2000无转子硫化仪-STM566 SATRA拉力试验机-青岛阿尔法仪器有限公司 | WF2户外三防照明配电箱-BXD8050防爆防腐配电箱-浙江沃川防爆电气有限公司 | 电动葫芦|环链电动葫芦-北京凌鹰名优起重葫芦 | 不锈钢轴流风机,不锈钢电机-许昌光维防爆电机有限公司(原许昌光维特种电机技术有限公司) | 混合反应量热仪-高温高压量热仪-微机差热分析仪DTA|凯璞百科 | uv固化机-丝印uv机-工业烤箱-五金蚀刻机-分拣输送机 - 保定市丰辉机械设备制造有限公司 | 自动售货机_无人售货机_专业的自动售货机运营商_免费投放售货机-广州富宏主官网 | 集菌仪_智能集菌仪_全封闭集菌仪_无菌检查集菌仪厂家-那艾 | 青海电动密集架_智能密集架_密集架价格-盛隆柜业青海档案密集架厂家 | 磁力抛光机_磁力研磨机_磁力去毛刺机_精密五金零件抛光设备厂家-冠古科技 | 青州开防盗门锁-配汽车芯片钥匙-保险箱钥匙-吉祥修锁店 | 打包钢带,铁皮打包带,烤蓝打包带-高密市金和金属制品厂 | ERP企业管理系统永久免费版_在线ERP系统_OA办公_云版软件官网 | 世纪豪门官网 世纪豪门集成吊顶加盟电话 世纪豪门售后电话 | 昆明挖掘机修理厂_挖掘机翻新再制造-昆明聚力工程机械维修有限公司 | 除甲醛公司-甲醛检测治理-杭州创绿家环保科技有限公司-室内空气净化十大品牌 | 防爆电机_ybx3系列电机_河南省南洋防爆电机有限公司 | 并离网逆变器_高频UPS电源定制_户用储能光伏逆变器厂家-深圳市索克新能源 | 耐高温风管_耐高温软管_食品级软管_吸尘管_钢丝软管_卫生级软管_塑料波纹管-东莞市鑫翔宇软管有限公司 | 西宁装修_西宁装修公司-西宁业之峰装饰-青海业之峰墅级装饰设计公司【官网】 | 成都租车_成都租车公司_成都租车网_众行宝 | 贴片电感_贴片功率电感_贴片绕线电感_深圳市百斯特电子有限公司 贴片电容代理-三星电容-村田电容-风华电容-国巨电容-深圳市昂洋科技有限公司 | 厚壁钢管-厚壁无缝钢管-小口径厚壁钢管-大口径厚壁钢管 - 聊城宽达钢管有限公司 | 洛阳网站建设_洛阳网站优化_网站建设平台_洛阳香河网络科技有限公司 | 丙烷/液氧/液氮气化器,丙烷/液氧/液氮汽化器-无锡舍勒能源科技有限公司 | 消防设施操作员考试报名时间,报名入口,报考条件 | 济南铝方通-济南铝方通价格-济南方通厂家-山东鲁方通建材有限公司 | 寮步纸箱厂_东莞纸箱厂 _东莞纸箱加工厂-东莞市寮步恒辉纸制品厂 | 【MBA备考网】-2024年工商管理硕士MBA院校/报考条件/培训/考试科目/提前面试/考试/学费-MBA备考网 | AGV叉车|无人叉车|AGV智能叉车|AGV搬运车-江西丹巴赫机器人股份有限公司 |