电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁技術文章
文章詳情頁

python線程池 ThreadPoolExecutor 的用法示例

瀏覽:4日期:2022-07-08 16:14:31

前言

從Python3.2開始,標準庫為我們提供了 concurrent.futures 模塊,它提供了 ThreadPoolExecutor (線程池)和ProcessPoolExecutor (進程池)兩個類。

相比 threading 等模塊,該模塊通過 submit 返回的是一個 future 對象,它是一個未來可期的對象,通過它可以獲悉線程的狀態主線程(或進程)中可以獲取某一個線程(進程)執行的狀態或者某一個任務執行的狀態及返回值:

主線程可以獲取某一個線程(或者任務的)的狀態,以及返回值。當一個線程完成的時候,主線程能夠立即知道。讓多線程和多進程的編碼接口一致。

線程池的基本使用

# coding: utf-8from concurrent.futures import ThreadPoolExecutorimport timedef spider(page): time.sleep(page) print(f'crawl task{page} finished') return pagewith ThreadPoolExecutor(max_workers=5) as t: # 創建一個最大容納數量為5的線程池 task1 = t.submit(spider, 1) task2 = t.submit(spider, 2) # 通過submit提交執行的函數到線程池中 task3 = t.submit(spider, 3) print(f'task1: {task1.done()}') # 通過done來判斷線程是否完成 print(f'task2: {task2.done()}') print(f'task3: {task3.done()}') time.sleep(2.5) print(f'task1: {task1.done()}') print(f'task2: {task2.done()}') print(f'task3: {task3.done()}') print(task1.result()) # 通過result來獲取返回值

執行結果如下:

task1: Falsetask2: Falsetask3: Falsecrawl task1 finishedcrawl task2 finishedtask1: Truetask2: Truetask3: False1crawl task3 finished

1.使用 with 語句 ,通過 ThreadPoolExecutor 構造實例,同時傳入 max_workers 參數來設置線程池中最多能同時運行的線程數目。

2.使用 submit 函數來提交線程需要執行的任務到線程池中,并返回該任務的句柄(類似于文件、畫圖),注意 submit() 不是阻塞的,而是立即返回。

3.通過使用 done() 方法判斷該任務是否結束。上面的例子可以看出,提交任務后立即判斷任務狀態,顯示四個任務都未完成。在延時2.5后,task1 和 task2 執行完畢,task3 仍在執行中。

4.使用 result() 方法可以獲取任務的返回值。

主要方法

wait

wait(fs, timeout=None, return_when=ALL_COMPLETED)

wait 接受三個參數:fs: 表示需要執行的序列timeout: 等待的最大時間,如果超過這個時間即使線程未執行完成也將返回return_when:表示wait返回結果的條件,默認為 ALL_COMPLETED 全部執行完成再返回

還是用上面那個例子來熟悉用法示例:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETEDimport timedef spider(page): time.sleep(page) print(f'crawl task{page} finished') return pagewith ThreadPoolExecutor(max_workers=5) as t: all_task = [t.submit(spider, page) for page in range(1, 5)] wait(all_task, return_when=FIRST_COMPLETED) print(’finished’) print(wait(all_task, timeout=2.5))# 運行結果crawl task1 finishedfinishedcrawl task2 finishedcrawl task3 finishedDoneAndNotDoneFutures(done={<Future at 0x28c8710 state=finished returned int>, <Future at 0x2c2bfd0 state=finished returned int>, <Future at 0x2c1b7f0 state=finished returned int>}, not_done={<Future at 0x2c3a240 state=running>})crawl task4 finished

1.代碼中返回的條件是:當完成第一個任務的時候,就停止等待,繼續主線程任務

2.由于設置了延時, 可以看到最后只有 task4 還在運行中

as_completed

上面雖然提供了判斷任務是否結束的方法,但是不能在主線程中一直判斷啊。最好的方法是當某個任務結束了,就給主線程返回結果,而不是一直判斷每個任務是否結束。ThreadPoolExecutorThreadPoolExecutor 中 的 as_completed() 就是這樣一個方法,當子線程中的任務執行完后,直接用 result() 獲取返回結果

用法如下:

# coding: utf-8from concurrent.futures import ThreadPoolExecutor, as_completedimport timedef spider(page): time.sleep(page) print(f'crawl task{page} finished') return pagedef main(): with ThreadPoolExecutor(max_workers=5) as t: obj_list = [] for page in range(1, 5): obj = t.submit(spider, page) obj_list.append(obj) for future in as_completed(obj_list): data = future.result() print(f'main: {data}')# 執行結果crawl task1 finishedmain: 1crawl task2 finishedmain: 2crawl task3 finishedmain: 3crawl task4 finishedmain: 4

as_completed() 方法是一個生成器,在沒有任務完成的時候,會一直阻塞,除非設置了 timeout。

當有某個任務完成的時候,會 yield 這個任務,就能執行 for 循環下面的語句,然后繼續阻塞住,循環到所有的任務結束。同時,先完成的任務會先返回給主線程。

map

map(fn, *iterables, timeout=None)

fn: 第一個參數 fn 是需要線程執行的函數;iterables:第二個參數接受一個可迭代對象;timeout: 第三個參數 timeout 跟 wait() 的 timeout 一樣,但由于 map 是返回線程執行的結果,如果 timeout小于線程執行時間會拋異常 TimeoutError。

用法如下:

import timefrom concurrent.futures import ThreadPoolExecutordef spider(page): time.sleep(page) return pagestart = time.time()executor = ThreadPoolExecutor(max_workers=4)i = 1for result in executor.map(spider, [2, 3, 1, 4]): print('task{}:{}'.format(i, result)) i += 1# 運行結果task1:2task2:3task3:1task4:4

使用 map 方法,無需提前使用 submit 方法,map 方法與 python 高階函數 map 的含義相同,都是將序列中的每個元素都執行同一個函數。

上面的代碼對列表中的每個元素都執行 spider() 函數,并分配各線程池。

可以看到執行結果與上面的 as_completed() 方法的結果不同,輸出順序和列表的順序相同,就算 1s 的任務先執行完成,也會先打印前面提交的任務返回的結果。

多線程實戰

以某網站為例,演示線程池和單線程兩種方式爬取的差異

# coding: utf-8import requestsfrom concurrent.futures import ThreadPoolExecutor, as_completedimport timeimport jsonfrom requests import adaptersfrom proxy import get_proxiesheaders = { 'Host': 'splcgk.court.gov.cn', 'Origin': 'https://splcgk.court.gov.cn', 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36', 'Referer': 'https://splcgk.court.gov.cn/gzfwww/ktgg',}url = 'https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1'def spider(page): data = { 'bt': '', 'fydw': '', 'pageNum': page, } for _ in range(5): try: response = requests.post(url, headers=headers, data=data, proxies=get_proxies()) json_data = response.json() except (json.JSONDecodeError, adapters.SSLError): continue else: break else: return {} return json_datadef main(): with ThreadPoolExecutor(max_workers=10) as t: obj_list = [] begin = time.time() for page in range(1, 15): obj = t.submit(spider, page) obj_list.append(obj) for future in as_completed(obj_list): data = future.result() print(data) print(’*’ * 50) times = time.time() - begin print(times)if __name__ == '__main__': main()

運行結果:

python線程池 ThreadPoolExecutor 的用法示例

單線程實戰

下面我們可以使用單線程來爬取,代碼基本和上面的一樣,加個單線程函數代碼如下:

# coding: utf-8import requestsfrom concurrent.futures import ThreadPoolExecutor, as_completedimport timeimport jsonfrom requests import adaptersfrom proxy import get_proxiesheaders = { 'Host': 'splcgk.court.gov.cn', 'Origin': 'https://splcgk.court.gov.cn', 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36', 'Referer': 'https://splcgk.court.gov.cn/gzfwww/ktgg',}url = 'https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1'def spider(page): data = { 'bt': '', 'fydw': '', 'pageNum': page, } for _ in range(5): try: response = requests.post(url, headers=headers, data=data, proxies=get_proxies()) json_data = response.json() except (json.JSONDecodeError, adapters.SSLError): continue else: break else: return {} return json_datadef single(): begin = time.time() for page in range(1, 15): data = spider(page) print(data) print(’*’ * 50) times = time.time() - begin print(times)if __name__ == '__main__': single()

運行結果:

python線程池 ThreadPoolExecutor 的用法示例

可以看到,總共花了 19 秒。真是肉眼可見的差距啊!如果數據量大的話,運行時間差距會更大!

以上就是python線程池 ThreadPoolExecutor 的用法示例的詳細內容,更多關于python線程池 ThreadPoolExecutor 的用法及實戰的資料請關注好吧啦網其它相關文章!

標簽: Python 編程
相關文章:
主站蜘蛛池模板: 博博会2021_中国博物馆及相关产品与技术博览会【博博会】 | 报警器_家用防盗报警器_烟雾报警器_燃气报警器_防盗报警系统厂家-深圳市刻锐智能科技有限公司 | 双吸泵,双吸泵厂家,OS双吸泵-山东博二泵业有限公司 | 亿诺千企网-企业核心产品贸易 | Trimos测长机_测高仪_TESA_mahr,WYLER水平仪,PWB对刀仪-德瑞华测量技术(苏州)有限公司 | 砍排机-锯骨机-冻肉切丁机-熟肉切片机-预制菜生产线一站式服务厂商 - 广州市祥九瑞盈机械设备有限公司 | 隧道风机_DWEX边墙风机_SDS射流风机-绍兴市上虞科瑞风机有限公司 | 广东恩亿梯电源有限公司【官网】_UPS不间断电源|EPS应急电源|模块化机房|电动汽车充电桩_UPS电源厂家(恩亿梯UPS电源,UPS不间断电源,不间断电源UPS) | 珠光砂保温板-一体化保温板-有釉面发泡陶瓷保温板-杭州一体化建筑材料 | 网带通过式抛丸机,,网带式打砂机,吊钩式,抛丸机,中山抛丸机生产厂家,江门抛丸机,佛山吊钩式,东莞抛丸机,中山市泰达自动化设备有限公司 | 自动螺旋上料机厂家价格-斗式提升机定制-螺杆绞龙输送机-杰凯上料机 | 隔爆型防爆端子分线箱_防爆空气开关箱|依客思 | 硅胶管挤出机厂家_硅胶挤出机生产线_硅胶条挤出机_臣泽智能装备 贵州科比特-防雷公司厂家提供贵州防雷工程,防雷检测,防雷接地,防雷设备价格,防雷产品报价服务-贵州防雷检测公司 | 车辆定位管理系统_汽车GPS系统_车载北斗系统 - 朗致物联 | 篷房|仓储篷房|铝合金篷房|体育篷房|篷房厂家-华烨建筑科技官网 知名电动蝶阀,电动球阀,气动蝶阀,气动球阀生产厂家|价格透明-【固菲阀门官网】 | 中医中药治疗血小板减少-石家庄血液病肿瘤门诊部 | 防潮防水通风密闭门源头实力厂家 - 北京酷思帝克门窗 | 长城人品牌官网| 胶泥瓷砖胶,轻质粉刷石膏,嵌缝石膏厂家,腻子粉批发,永康家德兴,永康市家德兴建材厂 | 线粒体膜电位荧光探针-细胞膜-标记二抗-上海复申生物科技有限公司 | 高柔性拖链电缆_卷筒电缆_耐磨耐折聚氨酯电缆-玖泰特种电缆 | 淘趣英语网 - 在线英语学习,零基础英语学习网站 | 好看的韩国漫画_韩漫在线免费阅读-汗汗漫画| ZHZ8耐压测试仪-上海胜绪电气有限公司| 断桥铝破碎机_铝合金破碎机_废铁金属破碎机-河南鑫世昌机械制造有限公司 | 智能监控-安防监控-监控系统安装-弱电工程公司_成都万全电子 | 意大利Frascold/富士豪压缩机_富士豪半封闭压缩机_富士豪活塞压缩机_富士豪螺杆压缩机 | 球磨机,节能球磨机价格,水泥球磨机厂家,粉煤灰球磨机-吉宏机械制造有限公司 | 电缆故障测试仪_电缆故障定位仪_探测仪_检测仪器_陕西意联电气厂家 | 湖南成人高考报名-湖南成考网 | 超细|超微气流粉碎机|气流磨|气流分级机|粉体改性机|磨粉机|粉碎设备-山东埃尔派粉体科技 | ICP备案查询_APP备案查询_小程序备案查询 - 备案巴巴 | 理化生实验室设备,吊装实验室设备,顶装实验室设备,实验室成套设备厂家,校园功能室设备,智慧书法教室方案 - 东莞市惠森教学设备有限公司 | 电子厂招聘_工厂招聘_普工招聘_小时工招聘信息平台-众立方招工网 | 石家庄律师_石家庄刑事辩护律师_石家庄取保候审-河北万垚律师事务所 | Type-c防水母座|贴片母座|耳机接口|Type-c插座-深圳市步步精科技有限公司 | 【官网】博莱特空压机,永磁变频空压机,螺杆空压机-欧能优 | 华溶溶出仪-Memmert稳定箱-上海协烁仪器科技有限公司 | 合肥卓创建筑装饰,专业办公室装饰、商业空间装修与设计。 | 柔性测斜仪_滑动测斜仪-广州杰芯科技有限公司 | 国产离子色谱仪,红外分光测油仪,自动烟尘烟气测试仪-青岛埃仑通用科技有限公司 |