Python实现进程同步和通信的方法
Python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
引例:
如之前创建多进程的例子
#-*-coding:utf-8-*- frommultiprocessingimportProcess,Pool importos,time defrun_proc(name):##定义一个函数用于进程调用 foriinrange(5): time.sleep(0.2)#休眠0.2秒 print'Runchildprocess%s(%s)'%(name,os.getpid()) #执行一次该函数共需1秒的时间 if__name__=='__main__':#执行主进程 print'Runthemainprocess(%s).'%(os.getpid()) mainStart=time.time()#记录主进程开始的时间 p=Pool(8)#开辟进程池 foriinrange(16):#开辟14个进程 p.apply_async(run_proc,args=('Process'+str(i),))#每个进程都调用run_proc函数, #args表示给该函数传递的参数。 print'Waitingforallsubprocessesdone...' p.close()#关闭进程池 p.join()#等待开辟的所有进程执行完后,主进程才继续往下执行 print'Allsubprocessesdone' mainEnd=time.time()#记录主进程结束时间 print'Allprocessran%0.2fseconds.'%(mainEnd-mainStart)#主进程执行时间
运行结果:
Runthemainprocess(36652). Waitingforallsubprocessesdone… RunchildprocessProcess0(36708)RunchildprocessProcess1(36748) RunchildprocessProcess3(36736) RunchildprocessProcess2(36716) RunchildprocessProcess4(36768)
如第3行的输出,偶尔会出现这样不如意的输入格式,为什么呢?
原因是多个进程争用打印输出资源的结果。前一个进程为来得急输出换行符,该资源就切换给了另一个进程使用,致使两个进程输出在同一行上,而前一个进程的换行符在下一次获得资源时才打印输出。
Lock
为了避免这种情况,需在进程进入临界区(使进程进入临界资源的那段代码,称为临界区)时加锁。
可以向如下这样添加锁后看看执行效果:
#-*-coding:utf-8-*- lock=Lock()#申明一个全局的lock对象 defrun_proc(name): globallock#引用全局锁 foriinrange(5): time.sleep(0.2) lock.acquire()#申请锁 print'Runchildprocess%s(%s)'%(name,os.getpid()) lock.release()#释放锁
Semaphore
Semaphore为信号量机制。当共享的资源拥有多个时,可用Semaphore来实现进程同步。其用法和Lock差不多,s=Semaphore(N),每执行一次s.acquire(),该资源的可用个数将减少1,当资源个数已为0时,就进入阻塞;每执行一次s.release(),占用的资源被释放,该资源的可用个数增加1。
多进程的通信(信息交互)
不同进程之间进行数据交互,可能不少刚开始接触多进程的同学会想到共享全局变量的方式,这样通过向全局变量写入和读取信息便能实现信息交互。但是很遗憾,并不能这样实现。
下面通过例子,加深对那篇文章的理解:
#-*-coding:utf-8-*- frommultiprocessingimportProcess,Pool importos importtime L1=[1,2,3] defadd(a,b): globalL1 L1+=range(a,b) printL1 if__name__=='__main__': p1=Process(target=add,args=(20,30)) p2=Process(target=add,args=(30,40)) p1.start() p2.start() p1.join() p2.join() printL1
输出结果:
[1,2,3,20,21,22,23,24,25,26,27,28,29]
[1,2,3,30,31,32,33,34,35,36,37,38,39]
[1,2,3]
该程序的原本目的是想将两个子进程生成的列表加到全局变量L1中,但用该方法并不能达到想要的效果。既然不能通过全局变量来实现不同进程间的信息交互,那有什么办法呢。
mutiprocessing为我们可以通过Queue和Pipe来实现进程间的通信。
Queue
按上面的例子通过Queue来实现:
#-*-coding:utf-8-*- frommultiprocessingimportProcess,Queue,Lock L=[1,2,3] defadd(q,lock,a,b): lock.acquire()#加锁避免写入时出现不可预知的错误 L1=range(a,b) lock.release() q.put(L1) printL1 if__name__=='__main__': q=Queue() lock=Lock() p1=Process(target=add,args=(q,lock,20,30)) p2=Process(target=add,args=(q,lock,30,40)) p1.start() p2.start() p1.join() p2.join() L+=q.get()+q.get() printL
执行结果:
[20,21,22,23,24,25,26,27,28,29]
[30,31,32,33,34,35,36,37,38,39]
[1,2,3,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39]
下面介绍Queue的常用方法:
- 定义时可用q=Queue(maxsize=10)来指定队列的长度,默认时或maxsize值小于1时队列为无限长度。
- q.put(item)方法向队列放入元素,其还有一个可选参数block,默认为True,此时若队列已满则会阻塞等待,直到有空闲位置。而当black值为False,在该情况下就会抛出Full异常
- Queue是不可迭代的对象,不能通过for循环取值,取值时每次调用q.get()方法。同样也有可选参数block,默认为True,若此时队列为空则会阻塞等待。而black值为False时,在该情况下就会抛出Empty异常
- Queue.qsize()返回队列的大小
- Queue.empty()如果队列为空,返回True,反之False
- Queue.full()如果队列满了,返回True,反之False
- Queue.get([block[,timeout]])获取队列,timeout等待时间Queue.get_nowait()相当Queue.get(False)非阻塞Queue.put(item)写入队列,timeout等待时间
- Queue.put_nowait(item)相当Queue.put(item,False)
Pipe
Pipe管道,可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道(默认为双向)。双向Pipe允许两端的进即可以发送又可以接受;单向的Pipe只允许前面的端口用于接收,后面的端口用于发送。
下面给出例子:
#-*-coding:utf-8-*- frommultiprocessingimportProcess,Pipe defproc1(pipe): s='Hello,Thisisproc1' pipe.send(s) defproc2(pipe): whileTrue: print"proc2recieve:",pipe.recv() if__name__=="__main__": pipe=Pipe() p1=Process(target=proc1,args=(pipe[0],)) p2=Process(target=proc2,args=(pipe[1],)) p1.start() p2.start() p1.join() p2.join(2)#限制执行时间最多为2秒 print'\nendallprocesses.'
执行结果如下:
proc2recieve:Hello,Thisisproc1
proc2recieve:
endallprocesses.
当第二行输出后,因为管道中没有数据传来,Proc2处于阻塞状态,2秒后被强制结束。
以下是单向管道的例子,注意pipe[0],pipe[1]的分配。
#-*-coding:utf-8-*- frommultiprocessingimportProcess,Pipe defproc1(pipe): s='Hello,Thisisproc1' pipe.send(s) defproc2(pipe): whileTrue: print"proc2recieve:",pipe.recv() if__name__=="__main__": pipe=Pipe(duplex=False) p1=Process(target=proc1,args=(pipe[1],))#pipe[1]为发送端 p2=Process(target=proc2,args=(pipe[0],))#pipe[0]为接收端 p1.start() p2.start() p1.join() p2.join(2)#限制执行时间最多为2秒 print'\nendallprocesses.'
执行结果同上。
强大的Manage
Queue和Pipe实现的数据共享方式只支持两种结构Value和Array。Python中提供了强大的Manage专门用来做数据共享,其支持的类型非常多,包括:Value,Array,list,dict,Queue,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event等
其用法如下:
frommultiprocessingimportProcess,Manager deffunc(dt,lt): foriinrange(10): key='arg'+str(i) dt[key]=i*i lt+=range(11,16) if__name__=="__main__": manager=Manager() dt=manager.dict() lt=manager.list() p=Process(target=func,args=(dt,lt)) p.start() p.join() printdt,'\n',lt
执行结果:
{‘arg8':64,‘arg9':81,‘arg0':0,‘arg1':1,‘arg2':4,‘arg3':9,‘arg4':16,‘arg5':25,‘arg6':36,‘arg7':49}
[11,12,13,14,15]
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。