Python如何实现线程间通信
问题
你的程序中有多个线程,你需要在这些线程之间安全地交换信息或数据
解决方案
从一个线程向另一个线程发送数据最安全的方式可能就是使用queue库中的队列了。创建一个被多个线程共享的Queue对象,这些线程通过使用put()和get()操作来向队列中添加或者删除元素。例如:
fromqueueimportQueue fromthreadingimportThread #Athreadthatproducesdata defproducer(out_q): whileTrue: #Producesomedata ... out_q.put(data) #Athreadthatconsumesdata defconsumer(in_q): whileTrue: #Getsomedata data=in_q.get() #Processthedata ... #Createthesharedqueueandlaunchboththreads q=Queue() t1=Thread(target=consumer,args=(q,)) t2=Thread(target=producer,args=(q,)) t1.start() t2.start()
Queue对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据。当使用队列时,协调生产者和消费者的关闭问题可能会有一些麻烦。一个通用的解决方法是在队列中放置一个特殊的值,当消费者读到这个值的时候,终止执行。例如:
fromqueueimportQueue fromthreadingimportThread #Objectthatsignalsshutdown _sentinel=object() #Athreadthatproducesdata defproducer(out_q): whilerunning: #Producesomedata ... out_q.put(data) #Putthesentinelonthequeuetoindicatecompletion out_q.put(_sentinel) #Athreadthatconsumesdata defconsumer(in_q): whileTrue: #Getsomedata data=in_q.get() #Checkfortermination ifdatais_sentinel: in_q.put(_sentinel) break #Processthedata ...
本例中有一个特殊的地方:消费者在读到这个特殊值之后立即又把它放回到队列中,将之传递下去。这样,所有监听这个队列的消费者线程就可以全部关闭了。尽管队列是最常见的线程间通信机制,但是仍然可以自己通过创建自己的数据结构并添加所需的锁和同步机制来实现线程间通信。最常见的方法是使用Condition变量来包装你的数据结构。下边这个例子演示了如何创建一个线程安全的优先级队列
importheapq importthreading classPriorityQueue: def__init__(self): self._queue=[] self._count=0 self._cv=threading.Condition() defput(self,item,priority): withself._cv: heapq.heappush(self._queue,(-priority,self._count,item)) self._count+=1 self._cv.notify() defget(self): withself._cv: whilelen(self._queue)==0: self._cv.wait() returnheapq.heappop(self._queue)[-1]
使用队列来进行线程间通信是一个单向、不确定的过程。通常情况下,你没有办法知道接收数据的线程是什么时候接收到的数据并开始工作的。不过队列对象提供一些基本完成的特性,比如下边这个例子中的task_done()和join():
fromqueueimportQueue fromthreadingimportThread #Athreadthatproducesdata defproducer(out_q): whilerunning: #Producesomedata ... out_q.put(data) #Athreadthatconsumesdata defconsumer(in_q): whileTrue: #Getsomedata data=in_q.get() #Processthedata ... #Indicatecompletion in_q.task_done() #Createthesharedqueueandlaunchboththreads q=Queue() t1=Thread(target=consumer,args=(q,)) t2=Thread(target=producer,args=(q,)) t1.start() t2.start() #Waitforallproduceditemstobeconsumed q.join()
如果一个线程需要在一个“消费者”线程处理完特定的数据项时立即得到通知,你可以把要发送的数据和一个Event放到一起使用,这样“生产者”就可以通过这个Event对象来监测处理的过程了。示例如下:
fromqueueimportQueue fromthreadingimportThread,Event #Athreadthatproducesdata defproducer(out_q): whilerunning: #Producesomedata ... #Makean(data,event)pairandhandittotheconsumer evt=Event() out_q.put((data,evt)) ... #Waitfortheconsumertoprocesstheitem evt.wait() #Athreadthatconsumesdata defconsumer(in_q): whileTrue: #Getsomedata data,evt=in_q.get() #Processthedata ... #Indicatecompletion evt.set()
讨论
基于简单队列编写多线程程序在多数情况下是一个比较明智的选择。从线程安全队列的底层实现来看,你无需在你的代码中使用锁和其他底层的同步机制,这些只会把你的程序弄得乱七八糟。此外,使用队列这种基于消息的通信机制可以被扩展到更大的应用范畴,比如,你可以把你的程序放入多个进程甚至是分布式系统而无需改变底层的队列结构。使用线程队列有一个要注意的问题是,向队列中添加数据项时并不会复制此数据项,线程间通信实际上是在线程间传递对象引用。如果你担心对象的共享状态,那你最好只传递不可修改的数据结构(如:整型、字符串或者元组)或者一个对象的深拷贝。例如:
fromqueueimportQueue fromthreadingimportThread importcopy #Athreadthatproducesdata defproducer(out_q): whileTrue: #Producesomedata ... out_q.put(copy.deepcopy(data)) #Athreadthatconsumesdata defconsumer(in_q): whileTrue: #Getsomedata data=in_q.get() #Processthedata ...
Queue对象提供一些在当前上下文很有用的附加特性。比如在创建Queue对象时提供可选的size参数来限制可以添加到队列中的元素数量。对于“生产者”与“消费者”速度有差异的情况,为队列中的元素数量添加上限是有意义的。比如,一个“生产者”产生项目的速度比“消费者”“消费”的速度快,那么使用固定大小的队列就可以在队列已满的时候阻塞队列,以免未预期的连锁效应扩散整个程序造成死锁或者程序运行失常。在通信的线程之间进行“流量控制”是一个看起来容易实现起来困难的问题。如果你发现自己曾经试图通过摆弄队列大小来解决一个问题,这也许就标志着你的程序可能存在脆弱设计或者固有的可伸缩问题。get()和put()方法都支持非阻塞方式和设定超时,例如:
importqueue q=queue.Queue() try: data=q.get(block=False) exceptqueue.Empty: ... try: q.put(item,block=False) exceptqueue.Full: ... try: data=q.get(timeout=5.0) exceptqueue.Empty: ...
这些操作都可以用来避免当执行某些特定队列操作时发生无限阻塞的情况,比如,一个非阻塞的put()方法和一个固定大小的队列一起使用,这样当队列已满时就可以执行不同的代码。比如输出一条日志信息并丢弃。
defproducer(q): ... try: q.put(item,block=False) exceptqueue.Full: log.warning('queueditem%rdiscarded!',item)
如果你试图让消费者线程在执行像q.get()这样的操作时,超时自动终止以便检查终止标志,你应该使用q.get()的可选参数timeout,如下:
_running=True defconsumer(q): while_running: try: item=q.get(timeout=5.0) #Processitem ... exceptqueue.Empty: pass
最后,有q.qsize(),q.full(),q.empty()等实用方法可以获取一个队列的当前大小和状态。但要注意,这些方法都不是线程安全的。可能你对一个队列使用empty()判断出这个队列为空,但同时另外一个线程可能已经向这个队列中插入一个数据项。所以,你最好不要在你的代码中使用这些方法。
以上就是Python如何实现线程间通信的详细内容,更多关于Python线程间通信的资料请关注毛票票其它相关文章!