Python3多进程 multiprocessing 模块实例详解
本文实例讲述了Python3多进程multiprocessing模块。分享给大家供大家参考,具体如下:
多进程Multiprocessing模块
multiprocessing模块官方说明文档
Process类
Process类用来描述一个进程对象。创建子进程的时候,只需要传入一个执行函数和函数的参数即可完成Process示例的创建。
star()方法启动进程,
join()方法实现进程间的同步,等待所有进程退出。
close()用来阻止多余的进程涌入进程池Pool造成进程阻塞。
multiprocessing.Process(group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)
target是函数名字,需要调用的函数
args函数需要的参数,以tuple的形式传入
示例:
importmultiprocessing importos defrun_proc(name): print('Childprocess{0}{1}Running'.format(name,os.getpid())) if__name__=='__main__': print('Parentprocess{0}isRunning'.format(os.getpid())) foriinrange(5): p=multiprocessing.Process(target=run_proc,args=(str(i),)) print('processstart') p.start() p.join() print('Processclose')
结果:
Parentprocess809isRunning
processstart
processstart
processstart
processstart
processstart
Childprocess0810Running
Childprocess1811Running
Childprocess2812Running
Childprocess3813Running
Childprocess4814Running
Processclose
Pool
Pool可以提供指定数量的进程供用户使用,默认是CPU核数。当有新的请求提交到Poll的时候,如果池子没有满,会创建一个进程来执行,否则就会让该请求等待。
-Pool对象调用join方法会等待所有的子进程执行完毕
-调用join方法之前,必须调用close
-调用close之后就不能继续添加新的Process了
pool.apply_async
apply_async方法用来同步执行进程,允许多个进程同时进入池子。
importmultiprocessing importos importtime defrun_task(name): print('Task{0}pid{1}isrunning,parentidis{2}'.format(name,os.getpid(),os.getppid())) time.sleep(1) print('Task{0}end.'.format(name)) if__name__=='__main__': print('currentprocess{0}'.format(os.getpid())) p=multiprocessing.Pool(processes=3) foriinrange(6): p.apply_async(run_task,args=(i,)) print('Waitingforallsubprocessesdone...') p.close() p.join() print('Allprocessesdone!')
结果:
currentprocess921
Waitingforallsubprocessesdone...
Task0pid922isrunning,parentidis921
Task1pid923isrunning,parentidis921
Task2pid924isrunning,parentidis921
Task0end.
Task3pid922isrunning,parentidis921
Task1end.
Task4pid923isrunning,parentidis921
Task2end.
Task5pid924isrunning,parentidis921
Task3end.
Task4end.
Task5end.
Allprocessesdone!
pool.apply
apply(func[,args[,kwds]])
该方法只能允许一个进程进入池子,在一个进程结束之后,另外一个进程才可以进入池子。
importmultiprocessing importos importtime defrun_task(name): print('Task{0}pid{1}isrunning,parentidis{2}'.format(name,os.getpid(),os.getppid())) time.sleep(1) print('Task{0}end.'.format(name)) if__name__=='__main__': print('currentprocess{0}'.format(os.getpid())) p=multiprocessing.Pool(processes=3) foriinrange(6): p.apply(run_task,args=(i,)) print('Waitingforallsubprocessesdone...') p.close() p.join() print('Allprocessesdone!')
结果:
Task0pid928isrunning,parentidis927
Task0end.
Task1pid929isrunning,parentidis927
Task1end.
Task2pid930isrunning,parentidis927
Task2end.
Task3pid928isrunning,parentidis927
Task3end.
Task4pid929isrunning,parentidis927
Task4end.
Task5pid930isrunning,parentidis927
Task5end.
Waitingforallsubprocessesdone...
Allprocessesdone!
Queue进程间通信
Queue用来在多个进程间通信。Queue有两个方法,get和put。
put方法
Put方法用来插入数据到队列中,有两个可选参数,blocked和timeout。
-blocked=True(默认值),timeout为正
该方法会阻塞timeout指定的时间,直到该队列有剩余空间。如果超时,抛出Queue.Full异常。
blocked=False
如果Queue已满,立刻抛出Queue.Full异常
get方法
get方法用来从队列中读取并删除一个元素。有两个参数可选,blocked和timeout
-blocked=False(默认),timeout正值
等待时间内,没有取到任何元素,会抛出Queue.Empty异常。
blocked=True
Queue有一个值可用,立刻返回改值;Queue没有任何元素,
frommultiprocessingimportProcess,Queue importos,time,random #写数据进程执行的代码: defproc_write(q,urls): print('Process(%s)iswriting...'%os.getpid()) forurlinurls: q.put(url) print('Put%stoqueue...'%url) time.sleep(random.random()) #读数据进程执行的代码: defproc_read(q): print('Process(%s)isreading...'%os.getpid()) whileTrue: url=q.get(True) print('Get%sfromqueue.'%url) if__name__=='__main__': #父进程创建Queue,并传给各个子进程: q=Queue() proc_writer1=Process(target=proc_write,args=(q,['url_1','url_2','url_3'])) proc_writer2=Process(target=proc_write,args=(q,['url_4','url_5','url_6'])) proc_reader=Process(target=proc_read,args=(q,)) #启动子进程proc_writer,写入: proc_writer1.start() proc_writer2.start() #启动子进程proc_reader,读取: proc_reader.start() #等待proc_writer结束: proc_writer1.join() proc_writer2.join() #proc_reader进程里是死循环,无法等待其结束,只能强行终止: proc_reader.terminate()
结果:
Process(1083)iswriting...
Puturl_1toqueue...
Process(1084)iswriting...
Puturl_4toqueue...
Process(1085)isreading...
Geturl_1fromqueue.
Geturl_4fromqueue.
Puturl_5toqueue...
Geturl_5fromqueue.
Puturl_2toqueue...
Geturl_2fromqueue.
Puturl_6toqueue...
Geturl_6fromqueue.
Puturl_3toqueue...
Geturl_3fromqueue.
Pipe进程间通信
常用来在两个进程间通信,两个进程分别位于管道的两端。
multiprocessing.Pipe([duplex])
示例一和示例二,也是网上找的别人的例子,尝试理解并增加了注释而已。网上的例子,大多是例子一和例子二在一起的,这里分开来看,比较容易理解。
示例一:
frommultiprocessingimportProcess,Pipe defsend(pipe): pipe.send(['spam']+[42,'egg'])#send传输一个列表 pipe.close() if__name__=='__main__': (con1,con2)=Pipe()#创建两个Pipe实例 sender=Process(target=send,args=(con1,))#函数的参数,args一定是实例化之后的Pip变量,不能直接写args=(Pip(),) sender.start()#Process类启动进程 print("con2got:%s"%con2.recv())#管道的另一端con2从send收到消息 con2.close()#关闭管道
结果:
con2got:['spam',42,'egg']
示例二:
frommultiprocessingimportProcess,Pipe deftalk(pipe): pipe.send(dict(name='Bob',spam=42))#传输一个字典 reply=pipe.recv()#接收传输的数据 print('talkergot:',reply) if__name__=='__main__': (parentEnd,childEnd)=Pipe()#创建两个Pipe()实例,也可以改成conf1,conf2 child=Process(target=talk,args=(childEnd,))#创建一个Process进程,名称为child child.start()#启动进程 print('parentgot:',parentEnd.recv())#parentEnd是一个Pip()管道,可以接收childProcess进程传输的数据 parentEnd.send({x*2forxin'spam'})#parentEnd是一个Pip()管道,可以使用send方法来传输数据 child.join()#传输的数据被talk函数内的pip管道接收,并赋值给reply print('parentexit')
结果:
parentgot:{'name':'Bob','spam':42}
talkergot:{'ss','aa','pp','mm'}
parentexit
更多关于Python相关内容感兴趣的读者可查看本站专题:《Python进程与线程操作技巧总结》、《PythonSocket编程技巧总结》、《Python数据结构与算法教程》、《Python函数使用技巧总结》、《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》及《Python文件与目录操作技巧汇总》
希望本文所述对大家Python程序设计有所帮助。