电脑知识|欧美黑人一区二区三区|软件|欧美黑人一级爽快片淫片高清|系统|欧美黑人狂野猛交老妇|数据库|服务器|编程开发|网络运营|知识问答|技术教程文章 - 好吧啦网

您的位置:首頁技術文章
文章詳情頁

Python 多進程原理及實現

瀏覽:3日期:2022-07-01 15:13:28
1 進程的基本概念

什么是進程?

​ 進程就是一個程序在一個數據集上的一次動態執行過程。進程一般由程序、數據集、進程控制塊三部分組成。我們編寫的程序用來描述進程要完成哪些功能以及如何完成;數據集則是程序在執行過程中所需要使用的資源;進程控制塊用來記錄進程的外部特征,描述進程的執行變化過程,系統可以利用它來控制和管理進程,它是系統感知進程存在的唯一標志。

進程的生命周期:創建(New)、就緒(Runnable)、運行(Running)、阻塞(Block)、銷毀(Destroy)

進程的狀態(分類):(Actived)活動進程、可見進程(Visiable)、后臺進程(Background)、服務進程(Service)、空進程

2 父進程和子進程​

Linux 操作系統提供了一個 fork() 函數用來創建子進程,這個函數很特殊,調用一次,返回兩次,因為操作系統是將當前的進程(父進程)復制了一份(子進程),然后分別在父進程和子進程內返回。子進程永遠返回0,而父進程返回子進程的 PID。我們可以通過判斷返回值是不是 0 來判斷當前是在父進程還是子進程中執行。

​ 在 Python 中同樣提供了 fork() 函數,此函數位于 os 模塊下。

# -*- coding: utf-8 -*- import osimport timeprint('在創建子進程前: pid=%s, ppid=%s' % (os.getpid(), os.getppid()))pid = os.fork()if pid == 0: print('子進程信息: pid=%s, ppid=%s' % (os.getpid(), os.getppid())) time.sleep(5)else: print('父進程信息: pid=%s, ppid=%s' % (os.getpid(), os.getppid())) # pid表示回收的子進程的pid #pid, result = os.wait() # 回收子進程資源阻塞 time.sleep(5) #print('父進程:回收的子進程pid=%d' % pid) #print('父進程:子進程退出時 result=%d' % result)# 下面的內容會被打印兩次,一次是在父進程中,一次是在子進程中。# 父進程中拿到的返回值是創建的子進程的pid,大于0print('fork創建完后: pid=%s, ppid=%s' % (os.getpid(), os.getppid()))2.1 父子進程如何區分?

子進程是父進程通過fork()產生出來的,pid = os.fork()

​ 通過返回值pid是否為0,判斷是否為子進程,如果是0,則表示是子進程

​ 由于 fork() 是 Linux 上的概念,所以如果要跨平臺,最好還是使用 subprocess 模塊來創建子進程。

2.2 子進程如何回收?

python中采用os.wait()方法用來回收子進程占用的資源

pid, result = os.wait() # 回收子進程資源阻塞,等待子進程執行完成回收

如果有子進程沒有被回收的,但是父進程已經死掉了,這個子進程就是僵尸進程。

3 Python進程模塊

python的進程multiprocessing模塊有多種創建進程的方式,每種創建方式和進程資源的回收都不太相同,下面分別針對Process,Pool及系統自帶的fork三種進程分析。

3.1 fork()

import ospid = os.fork() # 創建一個子進程os.wait() # 等待子進程結束釋放資源pid為0的代表子進程。

缺點: ​ 1.兼容性差,只能在類linux系統下使用,windows系統不可使用; ​ 2.擴展性差,當需要多條進程的時候,進程管理變得很復雜; ​ 3.會產生“孤兒”進程和“僵尸”進程,需要手動回收資源。 優點: ​ 是系統自帶的接近低層的創建方式,運行效率高。

3.2 Process進程

multiprocessing模塊提供Process類實現新建進程

# -*- coding: utf-8 -*-import osfrom multiprocessing import Processimport timedef fun(name): print('2 子進程信息: pid=%s, ppid=%s' % (os.getpid(), os.getppid())) print('hello ' + name)def test(): print(’ssss’)if __name__ == '__main__': print('1 主進程信息: pid=%s, ppid=%s' % (os.getpid(), os.getppid())) ps = Process(target=fun, args=(’jingsanpang’, )) print('111 ##### ps pid: ' + str(ps.pid) + ', ident:' + str(ps.ident)) print('3 進程信息: pid=%s, ppid=%s' % (os.getpid(), os.getppid())) print(ps.is_alive()) # 啟動之前 is_alive為False(系統未創建) ps.start() print(ps.is_alive()) # 啟動之后,is_alive為True(系統已創建) print('222 #### ps pid: ' + str(ps.pid) + ', ident:' + str(ps.ident)) print('4 進程信息: pid=%s, ppid=%s' % (os.getpid(), os.getppid())) ps.join() # 等待子進程完成任務 類似于os.wait() print(ps.is_alive()) print('5 進程信息: pid=%s, ppid=%s' % (os.getpid(), os.getppid())) ps.terminate() #終斷進程 print('6 進程信息: pid=%s, ppid=%s' % (os.getpid(), os.getppid()))

特點:

​1.注意:Process對象可以創建進程,但Process對象不是進程,其刪除與否與系統資源是否被回收沒有直接的關系。 2.主進程執行完后會默認等待子進程結束后回收資源,不需要手動回收資源;join()函數用來控制子進程結束的順序,其內部也有一個清除僵尸進程的函數,可以回收資源; 3.Process進程創建時,子進程會將主進程的Process對象完全復制一份,這樣在主進程和子進程各有一個 Process對象,但是p.start()啟動的是子進程,主進程中的Process對象作為一個靜態對象存在,不執行。

4.當子進程執行完畢后,會產生一個僵尸進程,其會被join函數回收,或者再有一條進程開啟,start函數也會回收僵尸進程,所以不一定需要寫join函數。 5.windows系統在子進程結束后會立即自動清除子進程的Process對象,而linux系統子進程的Process對象如果沒有join函數和start函數的話會在主進程結束后統一清除。

另外還可以通過繼承Process對象來重寫run方法創建進程

3.3 進程池POOL (多個進程)

import multiprocessingimport timedef work(msg): mult_proces_name = multiprocessing.current_process().name print(’process: ’ + mult_proces_name + ’-’ + msg)if __name__ == '__main__': pool = multiprocessing.Pool(processes=5) # 創建5個進程 for i in range(20): msg = 'process %d' %(i) pool.apply_async(work, (msg, )) pool.close() # 關閉進程池,表示不能在往進程池中添加進程 pool.join() # 等待進程池中的所有進程執行完畢,必須在close()之后調用 print('Sub-process all done.')

上述代碼中的pool.apply_async()是apply()函數的變體,apply_async()是apply()的并行版本,apply()是apply_async()的阻塞版本,使用apply()主進程會被阻塞直到函數執行結束,所以說是阻塞版本。apply()既是Pool的方法,也是Python內置的函數,兩者等價。可以看到輸出結果并不是按照代碼for循環中的順序輸出的。

多個子進程并返回值

apply_async()本身就可以返回被進程調用的函數的返回值。上一個創建多個子進程的代碼中,如果在函數func中返回一個值,那么pool.apply_async(func, (msg, ))的結果就是返回pool中所有進程的值的對象(注意是對象,不是值本身)。

import multiprocessingimport timedef func(msg): return multiprocessing.current_process().name + ’-’ + msgif __name__ == '__main__': pool = multiprocessing.Pool(processes=4) # 創建4個進程 results = [] for i in range(20): msg = 'process %d' %(i) results.append(pool.apply_async(func, (msg, ))) pool.close() # 關閉進程池,表示不能再往進程池中添加進程,需要在join之前調用 pool.join() # 等待進程池中的所有進程執行完畢 print ('Sub-process(es) done.') for res in results: print (res.get())

與之前的輸出不同,這次的輸出是有序的。

​如果電腦是八核,建立8個進程,在Ubuntu下輸入top命令再按下大鍵盤的1,可以看到每個CPU的使用率是比較平均的

4 進程間通信方式

管道pipe:管道是一種半雙工的通信方式,數據只能單向流動,而且只能在具有親緣關系的進程間使用。進程的親緣關系通常是指父子進程關系。命名管道FIFO:有名管道也是半雙工的通信方式,但是它允許無親緣關系進程間的通信。消息隊列MessageQueue:消息隊列是由消息的鏈表,存放在內核中并由消息隊列標識符標識。消息隊列克服了信號傳遞信息少、管道只能承載無格式字節流以及緩沖區大小受限等缺點。共享存儲SharedMemory:共享內存就是映射一段能被其他進程所訪問的內存,這段共享內存由一個進程創建,但多個進程都可以訪問。共享內存是最快的 IPC 方式,它是針對其他進程間通信方式運行效率低而專門設計的。它往往與其他通信機制,如信號兩,配合使用,來實現進程間的同步和通信。以上幾種進程間通信方式中,消息隊列是使用的比較頻繁的方式。

(1)管道pipe

import multiprocessingdef foo(conn): conn.send(’hello father’) #向管道pipe發消息 print(conn.recv())if __name__ == ’__main__’: conn1,conn2=multiprocessing.Pipe(True) #開辟兩個口,都是能進能出,括號中如果False即單向通信 p=multiprocessing.Process(target=foo,args=(conn1,)) #子進程使用sock口,調用foo函數 p.start() print(conn2.recv()) #主進程使用conn口接收,從管道(Pipe)中讀取消息 conn2.send(’hi son’) #主進程使用conn口發送

(2)消息隊列Queue

Queue是多進程的安全隊列,可以使用Queue實現多進程之間的數據傳遞。

Queue的一些常用方法:

Queue.qsize():返回當前隊列包含的消息數量; Queue.empty():如果隊列為空,返回True,反之False ; Queue.full():如果隊列滿了,返回True,反之False; Queue.get():獲取隊列中的一條消息,然后將其從列隊中移除,可傳參超時時長。 Queue.get_nowait():相當Queue.get(False),取不到值時觸發異常:Empty; Queue.put():將一個值添加進數列,可傳參超時時長。 Queue.put_nowait():相當于Queue.get(False),當隊列滿了時報錯:Full。

案例:

from multiprocessing import Process, Queueimport timedef write(q): for i in [’A’, ’B’, ’C’, ’D’, ’E’]: print(’Put %s to queue’ % i) q.put(i) time.sleep(0.5)def read(q): while True: v = q.get(True) print(’get %s from queue’ % v)if __name__ == ’__main__’: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) print(’write process = ’, pw) print(’read process = ’, pr) pw.start() pr.start() pw.join() pr.join() pr.terminate() pw.terminate()

Queue和pipe只是實現了數據交互,并沒實現數據共享,即一個進程去更改另一個進程的數據。

注:進程間通信應該盡量避免使用共享數據的方式

5 多進程實現生產者消費者

以下通過多進程實現生產者,消費者模式

import multiprocessingfrom multiprocessing import Processfrom time import sleepimport timeclass MultiProcessProducer(multiprocessing.Process): def __init__(self, num, queue): '''Constructor''' multiprocessing.Process.__init__(self) self.num = num self.queue = queue def run(self): t1 = time.time() print(’producer start ’ + str(self.num)) for i in range(1000): self.queue.put((i, self.num)) # print ’producer put’, i, self.num t2 = time.time() print(’producer exit ’ + str(self.num)) use_time = str(t2 - t1) print(’producer ’ + str(self.num) + ’, use_time: ’+ use_time)class MultiProcessConsumer(multiprocessing.Process): def __init__(self, num, queue): '''Constructor''' multiprocessing.Process.__init__(self) self.num = num self.queue = queue def run(self): t1 = time.time() print(’consumer start ’ + str(self.num)) while True: d = self.queue.get() if d != None: # print ’consumer get’, d, self.num continue else: break t2 = time.time() print(’consumer exit ’ + str(self.num)) print(’consumer ’ + str(self.num) + ’, use time:’ + str(t2 - t1))def main(): # create queue queue = multiprocessing.Queue() # create processes producer = [] for i in range(5): producer.append(MultiProcessProducer(i, queue)) consumer = [] for i in range(5): consumer.append(MultiProcessConsumer(i, queue)) # start processes for i in range(len(producer)): producer[i].start() for i in range(len(consumer)): consumer[i].start() # wait for processs to exit for i in range(len(producer)): producer[i].join() for i in range(len(consumer)): queue.put(None) for i in range(len(consumer)): consumer[i].join() print(’all done finish’)if __name__ == '__main__': main()6 總結

​ python中的多進程創建有以下兩種方式:

(1)fork子進程

(2)采用 multiprocessing 這個庫創建子進程

​ 需要注意的是隊列中queue.Queue是線程安全的,但并不是進程安全,所以多進程一般使用線程、進程安全的multiprocessing.Queue()

​ 另外, 進程池使用 multiprocessing.Pool實現,pool = multiprocessing.Pool(processes = 3),產生一個進程池,pool.apply_async實現非租塞模式,pool.apply實現阻塞模式。

apply_async和 apply函數,前者是非阻塞的,后者是阻塞。可以看出運行時間相差的倍數正是進程池數量。

​ 同時可以通過result.append(pool.apply_async(func, (msg, )))獲取非租塞式調用結果信息的。

以上就是Python 多進程原理及實現的詳細內容,更多關于python 多進程的資料請關注好吧啦網其它相關文章!

標簽: Python 編程
相關文章:
主站蜘蛛池模板: 吸音板,隔音板,吸音材料,吸音板价格,声学材料 - 佛山诺声吸音板厂家 | 自动焊锡机_点胶机_螺丝机-锐驰机器人 | 【甲方装饰】合肥工装公司-合肥装修设计公司,专业从事安徽办公室、店面、售楼部、餐饮店、厂房装修设计服务 | 等离子表面处理机-等离子表面活化机-真空等离子清洗机-深圳市东信高科自动化设备有限公司 | 阿米巴企业经营-阿米巴咨询管理-阿米巴企业培训-广东键锋企业管理咨询有限公司 | 袋式过滤器,自清洗过滤器,保安过滤器,篮式过滤器,气体过滤器,全自动过滤器,反冲洗过滤器,管道过滤器,无锡驰业环保科技有限公司 | 磁力反应釜,高压釜,实验室反应釜,高温高压反应釜-威海自控反应釜有限公司 | 广东高华家具-公寓床|学生宿舍双层铁床厂家【质保十年】 | 防腐木批发价格_深圳_惠州_东莞防腐木厂家_森源(深圳)防腐木有限公司 | 小型单室真空包装机,食品单室真空包装机-百科| VI设计-LOGO设计公司-品牌设计公司-包装设计公司-导视设计-杭州易象设计 | 三效蒸发器_多效蒸发器价格_四效三效蒸发器厂家-青岛康景辉 | 芜湖厨房设备_芜湖商用厨具_芜湖厨具设备-芜湖鑫环厨具有限公司 控显科技 - 工控一体机、工业显示器、工业平板电脑源头厂家 | 美甲贴片-指甲贴片-穿戴美甲-假指甲厂家--薇丝黛拉 | 南京和瑞包装有限公司| 小威小说网 - 新小威小说网 - 小威小说网小说搜索引擎 | 玻璃钢罐_玻璃钢储罐_盐酸罐厂家-河北华盛节能设备有限公司 | 密集柜_档案密集柜_智能密集架_密集柜厂家_密集架价格-智英伟业 密集架-密集柜厂家-智能档案密集架-自动选层柜订做-河北风顺金属制品有限公司 | SMC-ASCO-CKD气缸-FESTO-MAC电磁阀-上海天筹自动化设备官网 | 机房监控|动环监控|动力环境监控系统方案产品定制厂家 - 迈世OMARA | 压力控制器,差压控制器,温度控制器,防爆压力控制器,防爆温度控制器,防爆差压控制器-常州天利智能控制股份有限公司 | 安徽控制器-合肥船用空调控制器-合肥家电控制器-合肥迅驰电子厂 安徽净化板_合肥岩棉板厂家_玻镁板厂家_安徽科艺美洁净科技有限公司 | 不锈钢发酵罐_水果酒发酵罐_谷物发酵罐_山东誉诚不锈钢制品有限公司 | 南京办公用品网-办公文具用品批发-打印机耗材采购 | TTCMS自助建站_网站建设_自助建站_免费网站_免费建站_天天向上旗下品牌 | 新车测评网_网罗汽车评测资讯_汽车评测门户报道 | 交通气象站_能见度检测仪_路面状况监测站- 天合环境科技 | 恒温恒湿试验箱厂家-高低温试验箱维修价格_东莞环仪仪器_东莞环仪仪器 | 光谱仪_积分球_分布光度计_灯具检测生产厂家_杭州松朗光电【官网】 | 深圳南财多媒体有限公司介绍 | 断桥铝破碎机_铝合金破碎机_废铁金属破碎机-河南鑫世昌机械制造有限公司 | 阴离子_阳离子聚丙烯酰胺厂家_聚合氯化铝价格_水处理絮凝剂_巩义市江源净水材料有限公司 | 真空粉体取样阀,电动楔式闸阀,电动针型阀-耐苛尔(上海)自动化仪表有限公司 | 代做标书-代写标书-专业标书文件编辑-「深圳卓越创兴公司」 | 北京宣传片拍摄_产品宣传片拍摄_宣传片制作公司-现像传媒 | 北京网站建设首页,做网站选【优站网】,专注北京网站建设,北京网站推广,天津网站建设,天津网站推广,小程序,手机APP的开发。 | 合肥活动房_安徽活动板房_集成打包箱房厂家-安徽玉强钢结构集成房屋有限公司 | 东莞猎头公司_深圳猎头公司_广州猎头公司-广东万诚猎头提供企业中高端人才招聘服务 | 英思科GTD-3000EX(美国英思科气体检测仪MX4MX6)百科-北京嘉华众信科技有限公司 | 刚性-柔性防水套管-橡胶伸缩接头-波纹管补偿器-启腾供水材料有限公司 | 转子泵_凸轮泵_凸轮转子泵厂家-青岛罗德通用机械设备有限公司 |