Python gevent协程切换实现详解
一、背景
大家都知道gevent的机制是单线程+协程机制,当遇到可能会阻塞的操作时,就切换到可运行的协程中继续运行,以此来实现提交系统运行效率的目标,但是具体是怎么实现的呢?让我们直接从代码中看一下吧。
二、切换机制
让我们从socket的send、recv方法入手:
defrecv(self,*args): while1: try: returnself._sock.recv(*args) excepterrorasex: ifex.args[0]!=EWOULDBLOCKorself.timeout==0.0: raise #QQQwithoutclearingexc_infotest__refcount.test_clean_exitfails sys.exc_clear() self._wait(self._read_event)
这里会开启一个死循环,在循环中调用self._sock.recv()方法,并捕获异常,当错误是EWOULDBLOCK时,则调用self._wait(self._read_event)方法,该方法其实是:_wait=_wait_on_socket,_wait_on_socket方法的定义在文件:_hub_primitives.py中,如下:
#Suitabletobeboundasaninstancemethod defwait_on_socket(socket,watcher,timeout_exc=None): ifsocketisNoneorwatcherisNone: #test__hubTestCloseSocketWhilePolling,onPython2;Python3 #catchestheEBADFdifferently. raiseConcurrentObjectUseError("Thesockethasalreadybeenclosedbyanothergreenlet") _primitive_wait(watcher,socket.timeout, timeout_exciftimeout_excisnotNoneelse_NONE, socket.hub)
该方法其实是调用了函数:_primitive_wait(),其仍然在文件:_hub_primitives.py中定义,如下:
def_primitive_wait(watcher,timeout,timeout_exc,hub): ifwatcher.callbackisnotNone: raiseConcurrentObjectUseError('Thissocketisalreadyusedbyanothergreenlet:%r' %(watcher.callback,)) ifhubisNone: hub=get_hub() iftimeoutisNone: hub.wait(watcher) return timeout=Timeout._start_new_or_dummy( timeout, (timeout_exc iftimeout_excisnot_NONEortimeoutisNone else_timeout_error('timedout'))) withtimeout: hub.wait(watcher)
这里其实是调用了hub.wait()函数,该函数的定义在文件_hub.py中,如下:
classWaitOperationsGreenlet(SwitchOutGreenletWithLoop):#pylint:disable=undefined-variable defwait(self,watcher): """ Waituntilthe*watcher*(whichmustnotbestarted)isready. Thecurrentgreenletwillbeunscheduledduringthistime. """ waiter=Waiter(self)#pylint:disable=undefined-variable watcher.start(waiter.switch,waiter) try: result=waiter.get() ifresultisnotwaiter: raiseInvalidSwitchError( 'Invalidswitchinto%s:got%r(expected%r;waitingon%rwith%r)'%( getcurrent(),#pylint:disable=undefined-variable result, waiter, self, watcher ) ) finally: watcher.stop()
watcher.stop()
该类WaitOperationsGreenlet是Hub的基类,其方法wait中的逻辑是:生成一个Waiter对象,并调用watcher.start(waiter.switch,waiter)方法,watcher是最开始recv方法中使用的self._read_event,watcher是gevent的底层事件框架libev中的概念;同时还有一个waiter对象,它类似与python中的future概念,该对象有一个switch()方法以及get()方法,当没有得到结果没有准备好时,调用waiter.get()方法回导致协程被挂起;get()函数的定义如下:
defget(self): """Ifavalue/anexceptionisstored,return/raiseit.Otherwiseuntilswitch()orthrow()iscalled.""" ifself._exceptionisnot_NONE: ifself._exceptionisNone: returnself.value getcurrent().throw(*self._exception)#pylint:disable=undefined-variable else: ifself.greenletisnotNone: raiseConcurrentObjectUseError('ThisWaiterisalreadyusedby%r'%(self.greenlet,)) self.greenlet=getcurrent()#pylint:disable=undefined-variable try: returnself.hub.switch() finally: self.greenlet=None
在get()中最关键的是self.hub.switch()函数,该函数将执行权转移到hub,并继续运行,至此已经分析完了当在worker协程中从网络获取数据遇到阻塞时,如何避免阻塞并切换到hub中的实现,至于何时再切换会worker协程,我们后续再继续分析。
总结
要记得gevent中一个重要的概念,协程切换不是调用而是执行权的转移,从可能会阻塞的协程切换到hub,并由hub在合适的时机切换到另一个可以继续运行的协程继续执行;gevent通过这种形式实现了提高io密集型应用吞吐率的目标。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。