电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁技術文章
文章詳情頁

python線程池如何使用

瀏覽:4日期:2022-07-24 09:51:12

線程池的使用

線程池的基類是 concurrent.futures 模塊中的 Executor,Executor 提供了兩個子類,即 ThreadPoolExecutor 和ProcessPoolExecutor,其中 ThreadPoolExecutor 用于創(chuàng)建線程池,而 ProcessPoolExecutor 用于創(chuàng)建進程池。

如果使用線程池/進程池來管理并發(fā)編程,那么只要將相應的 task 函數(shù)提交給線程池/進程池,剩下的事情就由線程池/進程池來搞定。

Exectuor 提供了如下常用方法:

submit(fn, *args, **kwargs):將 fn 函數(shù)提交給線程池。*args 代表傳給 fn 函數(shù)的參數(shù),*kwargs 代表以關鍵字參數(shù)的形式為 fn 函數(shù)傳入?yún)?shù)。 map(func, *iterables, timeout=None, chunksize=1):該函數(shù)類似于全局函數(shù) map(func, *iterables),只是該函數(shù)將會啟動多個線程,以異步方式立即對 iterables 執(zhí)行 map 處理。 shutdown(wait=True):關閉線程池。

程序將 task 函數(shù)提交(submit)給線程池后,submit 方法會返回一個 Future 對象,F(xiàn)uture 類主要用于獲取線程任務函數(shù)的返回值。由于線程任務會在新線程中以異步方式執(zhí)行,因此,線程執(zhí)行的函數(shù)相當于一個“將來完成”的任務,所以 Python 使用 Future 來代表。

實際上,在 Java 的多線程編程中同樣有 Future,此處的 Future 與 Java 的 Future 大同小異。

Future 提供了如下方法:

cancel():取消該 Future 代表的線程任務。如果該任務正在執(zhí)行,不可取消,則該方法返回 False;否則,程序會取消該任務,并返回 True。 cancelled():返回 Future 代表的線程任務是否被成功取消。 running():如果該 Future 代表的線程任務正在執(zhí)行、不可被取消,該方法返回 True。 done():如果該 Funture 代表的線程任務被成功取消或執(zhí)行完成,則該方法返回 True。 result(timeout=None):獲取該 Future 代表的線程任務最后返回的結果。如果 Future 代表的線程任務還未完成,該方法將會阻塞當前線程,其中 timeout 參數(shù)指定最多阻塞多少秒。 exception(timeout=None):獲取該 Future 代表的線程任務所引發(fā)的異常。如果該任務成功完成,沒有異常,則該方法返回 None。 add_done_callback(fn):為該 Future 代表的線程任務注冊一個“回調函數(shù)”,當該任務成功完成時,程序會自動觸發(fā)該 fn 函數(shù)。

在用完一個線程池后,應該調用該線程池的 shutdown() 方法,該方法將啟動線程池的關閉序列。調用 shutdown() 方法后的線程池不再接收新任務,但會將以前所有的已提交任務執(zhí)行完成。當線程池中的所有任務都執(zhí)行完成后,該線程池中的所有線程都會死亡。

使用線程池來執(zhí)行線程任務的步驟如下:

a、調用 ThreadPoolExecutor 類的構造器創(chuàng)建一個線程池。

b、定義一個普通函數(shù)作為線程任務。

c、調用 ThreadPoolExecutor 對象的 submit() 方法來提交線程任務。

d、當不想提交任何任務時,調用 ThreadPoolExecutor 對象的 shutdown() 方法來關閉線程池。

下面程序示范了如何使用線程池來執(zhí)行線程任務:

from concurrent.futures import ThreadPoolExecutorimport threadingimport time# 定義一個準備作為線程任務的函數(shù)def action(max): my_sum = 0 for i in range(max): print(threading.current_thread().name + ’ ’ + str(i)) my_sum += i return my_sum# 創(chuàng)建一個包含2條線程的線程池pool = ThreadPoolExecutor(max_workers=2)# 向線程池提交一個task, 50會作為action()函數(shù)的參數(shù)future1 = pool.submit(action, 50)# 向線程池再提交一個task, 100會作為action()函數(shù)的參數(shù)future2 = pool.submit(action, 100)# 判斷future1代表的任務是否結束print(future1.done())time.sleep(3)# 判斷future2代表的任務是否結束print(future2.done())# 查看future1代表的任務返回的結果print(future1.result())# 查看future2代表的任務返回的結果print(future2.result())# 關閉線程池pool.shutdown()

上面程序中,第 13 行代碼創(chuàng)建了一個包含兩個線程的線程池,接下來的兩行代碼只要將 action() 函數(shù)提交(submit)給線程池,該線程池就會負責啟動線程來執(zhí)行 action() 函數(shù)。這種啟動線程的方法既優(yōu)雅,又具有更高的效率。

當程序把 action() 函數(shù)提交給線程池時,submit() 方法會返回該任務所對應的 Future 對象,程序立即判斷 futurel 的 done() 方法,該方法將會返回 False(表明此時該任務還未完成)。接下來主程序暫停 3 秒,然后判斷 future2 的 done() 方法,如果此時該任務已經(jīng)完成,那么該方法將會返回 True。

程序最后通過 Future 的 result() 方法來獲取兩個異步任務返回的結果。

讀者可以自己運行此代碼查看運行結果,這里不再演示。

當程序使用 Future 的 result() 方法來獲取結果時,該方法會阻塞當前線程,如果沒有指定 timeout 參數(shù),當前線程將一直處于阻塞狀態(tài),直到 Future 代表的任務返回。

獲取執(zhí)行結果

前面程序調用了 Future 的 result() 方法來獲取線程任務的運回值,但該方法會阻塞當前主線程,只有等到錢程任務完成后,result() 方法的阻塞才會被解除。

如果程序不希望直接調用 result() 方法阻塞線程,則可通過 Future 的 add_done_callback() 方法來添加回調函數(shù),該回調函數(shù)形如 fn(future)。當線程任務完成后,程序會自動觸發(fā)該回調函數(shù),并將對應的 Future 對象作為參數(shù)傳給該回調函數(shù)。

下面程序使用 add_done_callback() 方法來獲取線程任務的返回值:

from concurrent.futures import ThreadPoolExecutorimport threadingimport time# 定義一個準備作為線程任務的函數(shù)def action(max): my_sum = 0 for i in range(max): print(threading.current_thread().name + ’ ’ + str(i)) my_sum += i return my_sum# 創(chuàng)建一個包含2條線程的線程池with ThreadPoolExecutor(max_workers=2) as pool: # 向線程池提交一個task, 50會作為action()函數(shù)的參數(shù) future1 = pool.submit(action, 50) # 向線程池再提交一個task, 100會作為action()函數(shù)的參數(shù) future2 = pool.submit(action, 100) def get_result(future): print(future.result()) # 為future1添加線程完成的回調函數(shù) future1.add_done_callback(get_result) # 為future2添加線程完成的回調函數(shù) future2.add_done_callback(get_result) print(’--------------’)

上面主程序分別為 future1、future2 添加了同一個回調函數(shù),該回調函數(shù)會在線程任務結束時獲取其返回值。

主程序的最后一行代碼打印了一條橫線。由于程序并未直接調用 future1、future2 的 result() 方法,因此主線程不會被阻塞,可以立即看到輸出主線程打印出的橫線。接下來將會看到兩個新線程并發(fā)執(zhí)行,當線程任務執(zhí)行完成后,get_result() 函數(shù)被觸發(fā),輸出線程任務的返回值。

另外,由于線程池實現(xiàn)了上下文管理協(xié)議(Context Manage Protocol),因此,程序可以使用 with 語句來管理線程池,這樣即可避免手動關閉線程池,如上面的程序所示。

此外,Exectuor 還提供了一個 map(func, *iterables, timeout=None, chunksize=1) 方法,該方法的功能類似于全局函數(shù) map(),區(qū)別在于線程池的 map() 方法會為 iterables 的每個元素啟動一個線程,以并發(fā)方式來執(zhí)行 func 函數(shù)。這種方式相當于啟動 len(iterables) 個線程,井收集每個線程的執(zhí)行結果。

例如,如下程序使用 Executor 的 map() 方法來啟動線程,并收集線程任務的返回值:

from concurrent.futures import ThreadPoolExecutorimport threadingimport time# 定義一個準備作為線程任務的函數(shù)def action(max): my_sum = 0 for i in range(max): print(threading.current_thread().name + ’ ’ + str(i)) my_sum += i return my_sum# 創(chuàng)建一個包含4條線程的線程池with ThreadPoolExecutor(max_workers=4) as pool: # 使用線程執(zhí)行map計算 # 后面元組有3個元素,因此程序啟動3條線程來執(zhí)行action函數(shù) results = pool.map(action, (50, 100, 150)) print(’--------------’) for r in results:print(r)

上面程序使用 map() 方法來啟動 3 個線程(該程序的線程池包含 4 個線程,如果繼續(xù)使用只包含兩個線程的線程池,此時將有一個任務處于等待狀態(tài),必須等其中一個任務完成,線程空閑出來才會獲得執(zhí)行的機會),map() 方法的返回值將會收集每個線程任務的返回結果。

運行上面程序,同樣可以看到 3 個線程并發(fā)執(zhí)行的結果,最后通過 results 可以看到 3 個線程任務的返回結果。

通過上面程序可以看出,使用 map() 方法來啟動線程,并收集線程的執(zhí)行結果,不僅具有代碼簡單的優(yōu)點,而且雖然程序會以并發(fā)方式來執(zhí)行 action() 函數(shù),但最后收集的 action() 函數(shù)的執(zhí)行結果,依然與傳入?yún)?shù)的結果保持一致。也就是說,上面 results 的第一個元素是 action(50) 的結果,第二個元素是 action(100) 的結果,第三個元素是 action(150) 的結果。

實例擴展:

# coding:utf-8 import Queueimport threadingimport sysimport timeimport math class WorkThread(threading.Thread): def __init__(self, task_queue): threading.Thread.__init__(self) self.setDaemon(True) self.task_queue = task_queue self.start() self.idle = True def run(self): sleep_time = 0.01 # 第1次無任務可做時休息10毫秒 multiply = 0 while True: try:# 從隊列中取一個任務func, args, kwargs = self.task_queue.get(block=False)self.idle = Falsemultiply = 0# 執(zhí)行之func(*args, **kwargs) except Queue.Empty:time.sleep(sleep_time * math.pow(2, multiply))self.idle = Truemultiply += 1continue except:print sys.exc_info()raise class ThreadPool: def __init__(self, thread_num=10, max_queue_len=1000): self.max_queue_len = max_queue_len self.task_queue = Queue.Queue(max_queue_len) # 任務等待隊列 self.threads = [] self.__create_pool(thread_num) def __create_pool(self, thread_num): for i in xrange(thread_num): thread = WorkThread(self.task_queue) self.threads.append(thread) def add_task(self, func, *args, **kwargs): ’’’添加一個任務,返回任務等待隊列的長度 調用該方法前最后先調用isSafe()判斷一下等待的任務是不是很多,以防止提交的任務被拒絕 ’’’ try: self.task_queue.put((func, args, kwargs)) except Queue.Full: raise # 隊列已滿時直接拋出異常,不給執(zhí)行 return self.task_queue.qsize() def isSafe(self): ’’’等待的任務數(shù)量離警界線還比較遠 ’’’ return self.task_queue.qsize() < 0.9 * self.max_queue_len def wait_for_complete(self): ’’’等待提交到線程池的所有任務都執(zhí)行完畢 ’’’ #首先任務等待隊列要變成空 while not self.task_queue.empty(): time.sleep(1) # 其次,所以計算線程要變成idle狀態(tài) while True: all_idle = True for th in self.threads:if not th.idle: all_idle = False break if all_idle:break else:time.sleep(1) if __name__ == ’__main__’: def foo(a, b): print a + b time.sleep(0.01) thread_pool = ThreadPool(10, 100) ’’’在Windows上測試不通過,Windows上Queue.Queue不是線程安全的’’’ size = 0 for i in xrange(10000): try: size = thread_pool.add_task(foo, i, 2 * i) except Queue.Full: print ’queue full, queue size is ’, size time.sleep(2)

到此這篇關于python線程池如何使用的文章就介紹到這了,更多相關python中的線程池詳解內容請搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持好吧啦網(wǎng)!

標簽: Python 編程
相關文章:
主站蜘蛛池模板: 手机游戏_热门软件app下载_好玩的安卓游戏下载基地-吾爱下载站 | 珠光砂保温板-一体化保温板-有釉面发泡陶瓷保温板-杭州一体化建筑材料 | 浙江华锤电器有限公司_地磅称重设备_防作弊地磅_浙江地磅售后维修_无人值守扫码过磅系统_浙江源头地磅厂家_浙江工厂直营地磅 | 变位机,焊接变位机,焊接变位器,小型变位机,小型焊接变位机-济南上弘机电设备有限公司 | 塑钢件_塑钢门窗配件_塑钢配件厂家-文安县启泰金属制品有限公司 深圳南财多媒体有限公司介绍 | 安徽净化工程设计_无尘净化车间工程_合肥净化实验室_安徽创世环境科技有限公司 | 水成膜泡沫灭火剂_氟蛋白泡沫液_河南新乡骏华消防科技厂家 | 除湿机|工业除湿机|抽湿器|大型地下室车间仓库吊顶防爆除湿机|抽湿烘干房|新风除湿机|调温/降温除湿机|恒温恒湿机|加湿机-杭州川田电器有限公司 | 蒜肠网-动漫,二次元,COSPLAY,漫展以及收藏型模型,手办,玩具的新媒体.(原变形金刚变迷TF圈) | 磁力轮,磁力联轴器,磁齿轮,钕铁硼磁铁-北京磁运达厂家 | 广州展览制作|展台制作工厂|展览设计制作|展览展示制作|搭建制作公司 | 螺旋绞龙叶片,螺旋输送机厂家,山东螺旋输送机-淄博长江机械制造有限公司 | 比亚迪叉车-比亚迪电动叉车堆垛车托盘车仓储叉车价格多少钱报价 磁力去毛刺机_去毛刺磁力抛光机_磁力光饰机_磁力滚抛机_精密金属零件去毛刺机厂家-冠古科技 | 尾轮组_头轮组_矿用刮板_厢式刮板机_铸石刮板机厂家-双驰机械 | 螺纹三通快插接头-弯通快插接头-宁波舜驰气动科技有限公司 | 不锈钢复合板厂家_钛钢复合板批发_铜铝复合板供应-威海泓方金属复合材料股份有限公司 | 浙江华锤电器有限公司_地磅称重设备_防作弊地磅_浙江地磅售后维修_无人值守扫码过磅系统_浙江源头地磅厂家_浙江工厂直营地磅 | 工业淬火油烟净化器,北京油烟净化器厂家,热处理油烟净化器-北京众鑫百科 | 山东限矩型液力偶合器_液力耦合器易熔塞厂家-淄博市汇川源机械厂 | 全自动包装秤_全自动上袋机_全自动套袋机_高位码垛机_全自动包装码垛系统生产线-三维汉界机器(山东)股份有限公司 | 希望影视-高清影视vip热播电影电视剧免费在线抢先看 | 基本型顶空进样器-全自动热脱附解吸仪价格-AutoHS全模式-成都科林分析技术有限公司 | 高温高压釜(氢化反应釜)百科| 高低温万能试验机_拉力试验机_拉伸试验机-馥勒仪器科技(上海)有限公司 | 线材成型机,线材折弯机,线材成型机厂家,贝朗自动化设备有限公司1 | 小型UV打印机-UV平板打印机-大型uv打印机-UV打印机源头厂家 |松普集团 | 酶联免疫分析仪-多管旋涡混合仪|混合器-莱普特科学仪器(北京)有限公司 | 北京普辉律师事务所官网_北京律师24小时免费咨询|法律咨询 | 佛山市德信昌电子有限公司| 康明斯发电机,上柴柴油发电机,玉柴柴油发电机组_海南重康电力官网 | 耐火浇注料-喷涂料-浇注料生产厂家_郑州市元领耐火材料有限公司 耐力板-PC阳光板-PC板-PC耐力板 - 嘉兴赢创实业有限公司 | 冷却塔厂家_冷却塔维修_冷却塔改造_凉水塔配件填料公司- 广东康明节能空调有限公司 | 悬浮拼装地板_幼儿园_篮球场_悬浮拼接地板-山东悬浮拼装地板厂家 | 金刚网,金刚网窗纱,不锈钢网,金刚网厂家- 河北萨邦丝网制品有限公司 | 奶茶加盟,奶茶加盟店连锁品牌-甜啦啦官网 | 珠海网站建设_响应网站建设_珠海建站公司_珠海网站设计与制作_珠海网讯互联 | 医疗仪器模块 健康一体机 多参数监护仪 智慧医疗仪器方案定制 血氧监护 心电监护 -朗锐慧康 | 北京租车公司_汽车/客车/班车/大巴车租赁_商务会议/展会用车/旅游大巴出租_北京桐顺创业租车公司 | 通用磨耗试验机-QUV耐候试验机|久宏实业百科 | 散热器-电子散热器-型材散热器-电源散热片-镇江新区宏图电子散热片厂家 | 超声波清洗机-超声波清洗设备定制生产厂家 - 深圳市冠博科技实业有限公司 |