Android6.0 消息机制原理解析
消息都存放在一个消息队列中,而消息循环线程就是围绕这个消息队列进入一个无限循环,直到线程退出。如果队列中有消息,消息循环线程就会把它取出来,并分发给相应的Handler进行处理;如果队列中没有消息,消息循环线程就会进入空闲等待状态,等待下一个消息的到来。在编写Android应用程序时,当程序执行的任务比较繁重时,为了不阻塞UI主线程而导致ANR的发生,我们通常的做法是创建一个子线程来完成特定的任务。在创建子线程时,有两种选择:一种是通过创建Thread对象来创建一个无消息循环的子线程;还有一种就是创建一个带有消息循环的子线程。而创建带有消息循环的子线程又有两种实现方法:一种是直接利用Android给我们封装好的HandlerThread类来直接生成一个带有消息循环的线程对象;另一种方法是在实现线程的run()方法内先调用Looper.prepare()、再调用Looper.loop()来启动一个消息循环(下文HandlerThread的run函数正是这样做的)。
一、消息机制使用
通常消息机制都是由一个消息线程和一个Handler组成,下面我们看PowerManagerService中的一个消息Handler:
mHandlerThread=newServiceThread(TAG, Process.THREAD_PRIORITY_DISPLAY,false/*allowIo*/); mHandlerThread.start(); mHandler=newPowerManagerHandler(mHandlerThread.getLooper());
这里的ServiceThread就是一个HandlerThread,创建Handler的时候,必须把HandlerThread的looper传进去,否则就是默认当前线程的looper。
而每个handler,大致如下:
privatefinalclassPowerManagerHandlerextendsHandler{ publicPowerManagerHandler(Looperlooper){ super(looper,null,true/*async*/); } @Override publicvoidhandleMessage(Messagemsg){ switch(msg.what){ caseMSG_USER_ACTIVITY_TIMEOUT: handleUserActivityTimeout(); break; caseMSG_SANDMAN: handleSandman(); break; caseMSG_SCREEN_BRIGHTNESS_BOOST_TIMEOUT: handleScreenBrightnessBoostTimeout(); break; caseMSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT: checkWakeLockAquireTooLong(); Messagem=mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT); m.setAsynchronous(true); mHandler.sendMessageDelayed(m,WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT); break; } } }
二、消息机制原理
那我们先来看下HandlerThread的主函数run函数:
publicvoidrun(){ mTid=Process.myTid(); Looper.prepare(); synchronized(this){ mLooper=Looper.myLooper();//赋值后notifyall,主要是getLooper函数返回的是mLooper notifyAll(); } Process.setThreadPriority(mPriority); onLooperPrepared(); Looper.loop(); mTid=-1; }
再来看看Looper的prepare函数,最后新建了一个Looper对象,并且放在线程的局部变量(sThreadLocal)中。
publicstaticvoidprepare(){ prepare(true); } privatestaticvoidprepare(booleanquitAllowed){ if(sThreadLocal.get()!=null){ thrownewRuntimeException("OnlyoneLoopermaybecreatedperthread"); } sThreadLocal.set(newLooper(quitAllowed)); }
Looper的构造函数中创建了MessageQueue
privateLooper(booleanquitAllowed){ mQueue=newMessageQueue(quitAllowed); mThread=Thread.currentThread(); }
我们再来看下MessageQueue的构造函数,其中nativeInit是一个native方法,其返回值保存在mPtr中,显然这是用long型变量保存的指针
MessageQueue(booleanquitAllowed){ mQuitAllowed=quitAllowed; mPtr=nativeInit(); }
native函数中主要创建了NativeMessageQueue对象,并且把指针变量返回了。
staticjlongandroid_os_MessageQueue_nativeInit(JNIEnv*env,jclassclazz){ NativeMessageQueue*nativeMessageQueue=newNativeMessageQueue(); if(!nativeMessageQueue){ jniThrowRuntimeException(env,"Unabletoallocatenativequeue"); return0; } nativeMessageQueue->incStrong(env); returnreinterpret_cast<jlong>(nativeMessageQueue); }
NativeMessageQueue构造函数就是获取mLooper,如果没有就是新建一个Looper
NativeMessageQueue::NativeMessageQueue(): mPollEnv(NULL),mPollObj(NULL),mExceptionObj(NULL){ mLooper=Looper::getForThread(); if(mLooper==NULL){ mLooper=newLooper(false); Looper::setForThread(mLooper); } }
然后我们再看下Looper的构造函数,先是调用了eventfd创建了一个fd。eventfd主要用于进程或者线程间的通信,我们可以看下这篇博客:eventfd介绍
Looper::Looper(boolallowNonCallbacks): mAllowNonCallbacks(allowNonCallbacks),mSendingMessage(false), mPolling(false),mEpollFd(-1),mEpollRebuildRequired(false), mNextRequestSeq(0),mResponseIndex(0),mNextMessageUptime(LLONG_MAX){ mWakeEventFd=eventfd(0,EFD_NONBLOCK); LOG_ALWAYS_FATAL_IF(mWakeEventFd<0,"Couldnotmakewakeeventfd.errno=%d",errno); AutoMutex_l(mLock); rebuildEpollLocked(); }
2.1c层创建epoll
我们再来看下rebuildEpollLocked函数,创建了epoll,并且把mWakeEventFd加入epoll,而且把mRequests的fd也加入epoll
voidLooper::rebuildEpollLocked(){ //Closeoldepollinstanceifwehaveone. if(mEpollFd>=0){ #ifDEBUG_CALLBACKS ALOGD("%p~rebuildEpollLocked-rebuildingepollset",this); #endif close(mEpollFd); } //Allocatethenewepollinstanceandregisterthewakepipe. mEpollFd=epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd<0,"Couldnotcreateepollinstance.errno=%d",errno); structepoll_eventeventItem; memset(&eventItem,0,sizeof(epoll_event));//zerooutunusedmembersofdatafieldunion eventItem.events=EPOLLIN; eventItem.data.fd=mWakeEventFd; intresult=epoll_ctl(mEpollFd,EPOLL_CTL_ADD,mWakeEventFd,&eventItem); LOG_ALWAYS_FATAL_IF(result!=0,"Couldnotaddwakeeventfdtoepollinstance.errno=%d", errno); for(size_ti=0;i<mRequests.size();i++){ constRequest&request=mRequests.valueAt(i); structepoll_eventeventItem; request.initEventItem(&eventItem); intepollResult=epoll_ctl(mEpollFd,EPOLL_CTL_ADD,request.fd,&eventItem); if(epollResult<0){ ALOGE("Erroraddingepolleventsforfd%dwhilerebuildingepollset,errno=%d", request.fd,errno); } } }
继续回到HandlerThread的run函数,我们继续分析Looper的loop函数
publicvoidrun(){ mTid=Process.myTid(); Looper.prepare(); synchronized(this){ mLooper=Looper.myLooper(); notifyAll(); } Process.setThreadPriority(mPriority); onLooperPrepared(); Looper.loop(); mTid=-1; }
我们看看Looper的loop函数:
publicstaticvoidloop(){ finalLooperme=myLooper(); if(me==null){ thrownewRuntimeException("NoLooper;Looper.prepare()wasn'tcalledonthisthread."); } finalMessageQueuequeue=me.mQueue;//得到Looper的mQueue //Makesuretheidentityofthisthreadisthatofthelocalprocess, //andkeeptrackofwhatthatidentitytokenactuallyis. Binder.clearCallingIdentity(); finallongident=Binder.clearCallingIdentity(); for(;;){ Messagemsg=queue.next();//mightblock这个函数会阻塞,阻塞主要是epoll_wait if(msg==null){ //Nomessageindicatesthatthemessagequeueisquitting. return; } //Thismustbeinalocalvariable,incaseaUIeventsetsthelogger Printerlogging=me.mLogging;//自己打的打印 if(logging!=null){ logging.println(">>>>>Dispatchingto"+msg.target+""+ msg.callback+":"+msg.what); } msg.target.dispatchMessage(msg); if(logging!=null){ logging.println("<<<<<Finishedto"+msg.target+""+msg.callback); } //Makesurethatduringthecourseofdispatchingthe //identityofthethreadwasn'tcorrupted. finallongnewIdent=Binder.clearCallingIdentity(); if(ident!=newIdent){ Log.wtf(TAG,"Threadidentitychangedfrom0x" +Long.toHexString(ident)+"to0x" +Long.toHexString(newIdent)+"whiledispatchingto" +msg.target.getClass().getName()+"" +msg.callback+"what="+msg.what); } msg.recycleUnchecked(); } }
MessageQueue类的next函数主要是调用了nativePollOnce函数,后面就是从消息队列中取出一个Message
Messagenext(){ //Returnhereifthemessageloophasalreadyquitandbeendisposed. //Thiscanhappeniftheapplicationtriestorestartalooperafterquit //whichisnotsupported. finallongptr=mPtr;//之前保留的指针 if(ptr==0){ returnnull; } intpendingIdleHandlerCount=-1;//-1onlyduringfirstiteration intnextPollTimeoutMillis=0; for(;;){ if(nextPollTimeoutMillis!=0){ Binder.flushPendingCommands(); } nativePollOnce(ptr,nextPollTimeoutMillis);
下面我们主要看下nativePollOnce这个native函数,把之前的指针强制转换成NativeMessageQueue,然后调用其pollOnce函数
staticvoidandroid_os_MessageQueue_nativePollOnce(JNIEnv*env,jobjectobj, jlongptr,jinttimeoutMillis){ NativeMessageQueue*nativeMessageQueue=reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->pollOnce(env,obj,timeoutMillis); }
2.2c层epoll_wait阻塞
pollOnce函数:这个函数前面的while循环只是处理了ident大于等于0的情况,这种情况一般没有,所以我们可以直接看pollInner函数
intLooper::pollOnce(inttimeoutMillis,int*outFd,int*outEvents,void**outData){ intresult=0; for(;;){ while(mResponseIndex<mResponses.size()){ constResponse&response=mResponses.itemAt(mResponseIndex++); intident=response.request.ident; if(ident>=0){ intfd=response.request.fd; intevents=response.events; void*data=response.request.data; #ifDEBUG_POLL_AND_WAKE ALOGD("%p~pollOnce-returningsignalledidentifier%d:" "fd=%d,events=0x%x,data=%p", this,ident,fd,events,data); #endif if(outFd!=NULL)*outFd=fd; if(outEvents!=NULL)*outEvents=events; if(outData!=NULL)*outData=data; returnident; } } if(result!=0){ #ifDEBUG_POLL_AND_WAKE ALOGD("%p~pollOnce-returningresult%d",this,result); #endif if(outFd!=NULL)*outFd=0; if(outEvents!=NULL)*outEvents=0; if(outData!=NULL)*outData=NULL; returnresult; } result=pollInner(timeoutMillis); } }
pollInner函数主要就是调用epoll_wait阻塞,并且java层会计算每次阻塞的时间传到c层;直到mWakeEventFd或者之前addFd的fd有事件到来,或者等待超时,epoll_wait才会返回。
intLooper::pollInner(inttimeoutMillis){ #ifDEBUG_POLL_AND_WAKE ALOGD("%p~pollOnce-waiting:timeoutMillis=%d",this,timeoutMillis); #endif //Adjustthetimeoutbasedonwhenthenextmessageisdue. if(timeoutMillis!=0&&mNextMessageUptime!=LLONG_MAX){ nsecs_tnow=systemTime(SYSTEM_TIME_MONOTONIC); intmessageTimeoutMillis=toMillisecondTimeoutDelay(now,mNextMessageUptime); if(messageTimeoutMillis>=0 &&(timeoutMillis<0||messageTimeoutMillis<timeoutMillis)){ timeoutMillis=messageTimeoutMillis; } #ifDEBUG_POLL_AND_WAKE ALOGD("%p~pollOnce-nextmessagein%"PRId64"ns,adjustedtimeout:timeoutMillis=%d", this,mNextMessageUptime-now,timeoutMillis); #endif } //Poll. intresult=POLL_WAKE; mResponses.clear();//清空mResponses mResponseIndex=0; //Weareabouttoidle. mPolling=true; structepoll_eventeventItems[EPOLL_MAX_EVENTS]; inteventCount=epoll_wait(mEpollFd,eventItems,EPOLL_MAX_EVENTS,timeoutMillis);//epoll_wait主要线程阻塞在这,这个阻塞的时间也是有java层传过来的 //Nolongeridling. mPolling=false; //Acquirelock. mLock.lock(); //Rebuildepollsetifneeded. if(mEpollRebuildRequired){ mEpollRebuildRequired=false; rebuildEpollLocked(); gotoDone; } //Checkforpollerror. if(eventCount<0){ if(errno==EINTR){ gotoDone; } ALOGW("Pollfailedwithanunexpectederror,errno=%d",errno); result=POLL_ERROR; gotoDone; } //Checkforpolltimeout. if(eventCount==0){ #ifDEBUG_POLL_AND_WAKE ALOGD("%p~pollOnce-timeout",this); #endif result=POLL_TIMEOUT; gotoDone; } //Handleallevents. 
#ifDEBUG_POLL_AND_WAKE ALOGD("%p~pollOnce-handlingeventsfrom%dfds",this,eventCount); #endif for(inti=0;i<eventCount;i++){ intfd=eventItems[i].data.fd; uint32_tepollEvents=eventItems[i].events; if(fd==mWakeEventFd){//通知唤醒线程的事件 if(epollEvents&EPOLLIN){ awoken(); }else{ ALOGW("Ignoringunexpectedepollevents0x%xonwakeeventfd.",epollEvents); } }else{ ssize_trequestIndex=mRequests.indexOfKey(fd);//之前addFd的事件 if(requestIndex>=0){ intevents=0; if(epollEvents&EPOLLIN)events|=EVENT_INPUT; if(epollEvents&EPOLLOUT)events|=EVENT_OUTPUT; if(epollEvents&EPOLLERR)events|=EVENT_ERROR; if(epollEvents&EPOLLHUP)events|=EVENT_HANGUP; pushResponse(events,mRequests.valueAt(requestIndex));//放在mResponses中 }else{ ALOGW("Ignoringunexpectedepollevents0x%xonfd%dthatis" "nolongerregistered.",epollEvents,fd); } } } Done:; //Invokependingmessagecallbacks. mNextMessageUptime=LLONG_MAX; while(mMessageEnvelopes.size()!=0){//这块主要是c层的消息,java层的消息是自己管理的 nsecs_tnow=systemTime(SYSTEM_TIME_MONOTONIC); constMessageEnvelope&messageEnvelope=mMessageEnvelopes.itemAt(0); if(messageEnvelope.uptime<=now){ //Removetheenvelopefromthelist. //WekeepastrongreferencetothehandleruntilthecalltohandleMessage //finishes.Thenwedropitsothatthehandlercanbedeleted*before* //wereacquireourlock. {//obtainhandler sp<MessageHandler>handler=messageEnvelope.handler; Messagemessage=messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage=true; mLock.unlock(); #ifDEBUG_POLL_AND_WAKE||DEBUG_CALLBACKS ALOGD("%p~pollOnce-sendingmessage:handler=%p,what=%d", this,handler.get(),message.what); #endif handler->handleMessage(message); }//releasehandler mLock.lock(); mSendingMessage=false; result=POLL_CALLBACK; }else{ //Thelastmessageleftattheheadofthequeuedeterminesthenextwakeuptime. mNextMessageUptime=messageEnvelope.uptime; break; } } //Releaselock. mLock.unlock(); //Invokeallresponsecallbacks. 
for(size_ti=0;i<mResponses.size();i++){//这是之前addFd的事件的处理,主要是遍历mResponses,然后调用其回调 Response&response=mResponses.editItemAt(i); if(response.request.ident==POLL_CALLBACK){ intfd=response.request.fd; intevents=response.events; void*data=response.request.data; #ifDEBUG_POLL_AND_WAKE||DEBUG_CALLBACKS ALOGD("%p~pollOnce-invokingfdeventcallback%p:fd=%d,events=0x%x,data=%p", this,response.request.callback.get(),fd,events,data); #endif //Invokethecallback.Notethatthefiledescriptormaybeclosedby //thecallback(andpotentiallyevenreused)beforethefunctionreturnsso //weneedtobealittlecarefulwhenremovingthefiledescriptorafterwards. intcallbackResult=response.request.callback->handleEvent(fd,events,data); if(callbackResult==0){ removeFd(fd,response.request.seq); } //Clearthecallbackreferenceintheresponsestructurepromptlybecausewe //willnotcleartheresponsevectoritselfuntilthenextpoll. response.request.callback.clear(); result=POLL_CALLBACK; } } returnresult; }
继续分析Looper的loop函数:可以增加自己的打印来调试代码,之后调用Message的target的dispatchMessage来分发消息
for(;;){ Messagemsg=queue.next();//mightblock if(msg==null){ //Nomessageindicatesthatthemessagequeueisquitting. return; } //Thismustbeinalocalvariable,incaseaUIeventsetsthelogger Printerlogging=me.mLogging;//自己的打印 if(logging!=null){ logging.println(">>>>>Dispatchingto"+msg.target+""+ msg.callback+":"+msg.what); } msg.target.dispatchMessage(msg); if(logging!=null){ logging.println("<<<<<Finishedto"+msg.target+""+msg.callback); } //Makesurethatduringthecourseofdispatchingthe //identityofthethreadwasn'tcorrupted. finallongnewIdent=Binder.clearCallingIdentity(); if(ident!=newIdent){ Log.wtf(TAG,"Threadidentitychangedfrom0x" +Long.toHexString(ident)+"to0x" +Long.toHexString(newIdent)+"whiledispatchingto" +msg.target.getClass().getName()+"" +msg.callback+"what="+msg.what); } msg.recycleUnchecked(); } }
2.3增加调试打印
我们先来看自己添加打印,可以通过Looper的setMessageLogging函数来设置打印
publicvoidsetMessageLogging(@NullablePrinterprinter){ mLogging=printer; } Printer就是一个interface publicinterfacePrinter{ /** *Writealineoftexttotheoutput.Thereisnoneedtoterminate *thegivenstringwithanewline. */ voidprintln(Stringx); }
2.4java层消息分发处理
再来看消息的分发,先是调用Handler的obtainMessage函数
Messagemsg=mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT); msg.setAsynchronous(true); mHandler.sendMessageDelayed(msg,WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT);
先看obtainMessage调用了Message的obtain函数
publicfinalMessageobtainMessage(intwhat) { returnMessage.obtain(this,what); }
Message的obtain函数就是新建一个Message,然后其target就是设置成其Handler
publicstaticMessageobtain(Handlerh,intwhat){ Messagem=obtain();//就是新建一个Message m.target=h; m.what=what; returnm; }
我们再联系之前分发消息
msg.target.dispatchMessage(msg); 这里的target就是obtain时设置的Handler,因此就是调用Handler的dispatchMessage函数,在Handler中会根据不同的情况对消息进行处理。
publicvoiddispatchMessage(Messagemsg){ if(msg.callback!=null){ handleCallback(msg);//这种就是用post形式发送,带Runnable的 }else{ if(mCallback!=null){//这种是handler传参的时候就是传入了mCallback回调了 if(mCallback.handleMessage(msg)){ return; } } handleMessage(msg);//最后就是在自己实现的handleMessage处理 } }
2.3java层消息发送
我们再看下java层的消息发送,主要也是调用Handler的sendMessage、post之类的函数,最终都会调用下面这个函数
publicbooleansendMessageAtTime(Messagemsg,longuptimeMillis){ MessageQueuequeue=mQueue; if(queue==null){ RuntimeExceptione=newRuntimeException( this+"sendMessageAtTime()calledwithnomQueue"); Log.w("Looper",e.getMessage(),e); returnfalse; } returnenqueueMessage(queue,msg,uptimeMillis); }
我们再来看java层发送消息最终都会调用enqueueMessage函数
privatebooleanenqueueMessage(MessageQueuequeue,Messagemsg,longuptimeMillis){ msg.target=this; if(mAsynchronous){ msg.setAsynchronous(true); } returnqueue.enqueueMessage(msg,uptimeMillis); }
最终在enqueueMessage中,把消息加入消息队列,然后需要的话就调用c层的nativeWake函数
booleanenqueueMessage(Messagemsg,longwhen){ if(msg.target==null){ thrownewIllegalArgumentException("Messagemusthaveatarget."); } if(msg.isInUse()){ thrownewIllegalStateException(msg+"Thismessageisalreadyinuse."); } synchronized(this){ if(mQuitting){ IllegalStateExceptione=newIllegalStateException( msg.target+"sendingmessagetoaHandleronadeadthread"); Log.w(TAG,e.getMessage(),e); msg.recycle(); returnfalse; } msg.markInUse(); msg.when=when; Messagep=mMessages; booleanneedWake; if(p==null||when==0||when<p.when){ //Newhead,wakeuptheeventqueueifblocked. msg.next=p; mMessages=msg; needWake=mBlocked; }else{ //Insertedwithinthemiddleofthequeue.Usuallywedon'thavetowake //uptheeventqueueunlessthereisabarrierattheheadofthequeue //andthemessageistheearliestasynchronousmessageinthequeue. needWake=mBlocked&&p.target==null&&msg.isAsynchronous(); Messageprev; for(;;){ prev=p; p=p.next; if(p==null||when<p.when){ break; } if(needWake&&p.isAsynchronous()){ needWake=false; } } msg.next=p;//invariant:p==prev.next prev.next=msg; } //WecanassumemPtr!=0becausemQuittingisfalse. if(needWake){ nativeWake(mPtr); } } returntrue; }
我们看下这个native方法,最后也是调用了Looper的wake函数
staticvoidandroid_os_MessageQueue_nativeWake(JNIEnv*env,jclassclazz,jlongptr){ NativeMessageQueue*nativeMessageQueue=reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->wake(); } voidNativeMessageQueue::wake(){ mLooper->wake(); }
Looper类的wake函数只是往mWakeEventFd中写入了一个值为1的uint64_t,这个fd只起通知作用,类似pipe;写入后epoll_wait会被唤醒,线程不再阻塞,先处理c层消息,然后处理之前addFd的事件,最后处理java层的消息。
voidLooper::wake(){ #ifDEBUG_POLL_AND_WAKE ALOGD("%p~wake",this); #endif uint64_tinc=1; ssize_tnWrite=TEMP_FAILURE_RETRY(write(mWakeEventFd,&inc,sizeof(uint64_t))); if(nWrite!=sizeof(uint64_t)){ if(errno!=EAGAIN){ ALOGW("Couldnotwritewakesignal,errno=%d",errno); } } }
2.4c层发送消息
在c层也是可以发送消息的,主要是调用Looper的sendMessageAtTime函数,参数中有一个handler,它是一个回调;消息会按uptime从小到大的顺序插入到mMessageEnvelopes中。
voidLooper::sendMessageAtTime(nsecs_tuptime,constsp<MessageHandler>&handler, constMessage&message){ #ifDEBUG_CALLBACKS ALOGD("%p~sendMessageAtTime-uptime=%"PRId64",handler=%p,what=%d", this,uptime,handler.get(),message.what); #endif size_ti=0; {//acquirelock AutoMutex_l(mLock); size_tmessageCount=mMessageEnvelopes.size(); while(i<messageCount&&uptime>=mMessageEnvelopes.itemAt(i).uptime){ i+=1; } MessageEnvelopemessageEnvelope(uptime,handler,message); mMessageEnvelopes.insertAt(messageEnvelope,i,1); //Optimization:IftheLooperiscurrentlysendingamessage,thenwecanskip //thecalltowake()becausethenextthingtheLooperwilldoafterprocessing //messagesistodecidewhenthenextwakeuptimeshouldbe.Infact,itdoes //notevenmatterwhetherthiscodeisrunningontheLooperthread. if(mSendingMessage){ return; } }//releaselock //Wakethepolllooponlywhenweenqueueanewmessageatthehead. if(i==0){ wake(); } }
在pollInner中,epoll_wait返回之后,会遍历mMessageEnvelopes中已到期(uptime<=now)的消息,然后调用其handler的handleMessage函数
while(mMessageEnvelopes.size()!=0){ nsecs_tnow=systemTime(SYSTEM_TIME_MONOTONIC); constMessageEnvelope&messageEnvelope=mMessageEnvelopes.itemAt(0); if(messageEnvelope.uptime<=now){ //Removetheenvelopefromthelist. //WekeepastrongreferencetothehandleruntilthecalltohandleMessage //finishes.Thenwedropitsothatthehandlercanbedeleted*before* //wereacquireourlock. {//obtainhandler sp<MessageHandler>handler=messageEnvelope.handler; Messagemessage=messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage=true; mLock.unlock(); #ifDEBUG_POLL_AND_WAKE||DEBUG_CALLBACKS ALOGD("%p~pollOnce-sendingmessage:handler=%p,what=%d", this,handler.get(),message.what); #endif handler->handleMessage(message); }//releasehandler mLock.lock(); mSendingMessage=false; result=POLL_CALLBACK; }else{ //Thelastmessageleftattheheadofthequeuedeterminesthenextwakeuptime. mNextMessageUptime=messageEnvelope.uptime; break; } }
有一个Looper_test.cpp文件,里面介绍了很多Looper的使用方法,我们来看下
sp<StubMessageHandler>handler=newStubMessageHandler(); mLooper->sendMessageAtTime(now+ms2ns(100),handler,Message(MSG_TEST1)); StubMessageHandler继承MessageHandler就必须实现handleMessage方法 classStubMessageHandler:publicMessageHandler{ public: Vector<Message>messages; virtualvoidhandleMessage(constMessage&message){ messages.push(message); } };
我们再顺便看下Message和MessageHandler类
structMessage{ Message():what(0){} Message(intwhat):what(what){} /*Themessagetype.(interpretationisleftuptothehandler)*/ intwhat; }; /** *InterfaceforaLoopermessagehandler. * *TheLooperholdsastrongreferencetothemessagehandlerwheneverithas *amessagetodelivertoit.MakesuretocallLooper::removeMessages *toremoveanypendingmessagesdestinedforthehandlersothatthehandler *canbedestroyed. */ classMessageHandler:publicvirtualRefBase{ protected: virtual~MessageHandler(){} public: /** *Handlesamessage. */ virtualvoidhandleMessage(constMessage&message)=0; };
2.5c层addFd
我们也可以通过Looper.cpp中的addFd把fd加入线程的epoll中,这样当fd有数据到来时,我们也可以处理相应的数据。下面我们先来看下addFd函数,注意其中有一个callback回调
intLooper::addFd(intfd,intident,intevents,Looper_callbackFunccallback,void*data){ returnaddFd(fd,ident,events,callback?newSimpleLooperCallback(callback):NULL,data); } intLooper::addFd(intfd,intident,intevents,constsp<LooperCallback>&callback,void*data){ #ifDEBUG_CALLBACKS ALOGD("%p~addFd-fd=%d,ident=%d,events=0x%x,callback=%p,data=%p",this,fd,ident, events,callback.get(),data); #endif if(!callback.get()){ if(!mAllowNonCallbacks){ ALOGE("InvalidattempttosetNULLcallbackbutnotallowedforthislooper."); return-1; } if(ident<0){ ALOGE("InvalidattempttosetNULLcallbackwithident<0."); return-1; } }else{ ident=POLL_CALLBACK; } {//acquirelock AutoMutex_l(mLock); Requestrequest; request.fd=fd; request.ident=ident; request.events=events; request.seq=mNextRequestSeq++; request.callback=callback; request.data=data; if(mNextRequestSeq==-1)mNextRequestSeq=0;//reservesequencenumber-1 structepoll_eventeventItem; request.initEventItem(&eventItem); ssize_trequestIndex=mRequests.indexOfKey(fd); if(requestIndex<0){ intepollResult=epoll_ctl(mEpollFd,EPOLL_CTL_ADD,fd,&eventItem);//加入epoll if(epollResult<0){ ALOGE("Erroraddingepolleventsforfd%d,errno=%d",fd,errno); return-1; } mRequests.add(fd,request);//放入mRequests中 }else{ intepollResult=epoll_ctl(mEpollFd,EPOLL_CTL_MOD,fd,&eventItem);//更新 if(epollResult<0){ if(errno==ENOENT){ //TolerateENOENTbecauseitmeansthatanolderfiledescriptorwas //closedbeforeitscallbackwasunregisteredandmeanwhileanew //filedescriptorwiththesamenumberhasbeencreatedandisnow //beingregisteredforthefirsttime.Thiserrormayoccurnaturally //whenacallbackhastheside-effectofclosingthefiledescriptor //beforereturningandunregisteringitself.Callbacksequencenumber //checksfurtherensurethattheraceisbenign. // //Unfortunatelyduetokernellimitationsweneedtorebuildtheepoll //setfromscratchbecauseitmaycontainanoldfilehandlethatweare //nowunabletoremovesinceitsfiledescriptorisnolongervalid. 
//Nosuchproblemwouldhaveoccurredifwewereusingthepollsystem //callinstead,butthatapproachcarriesothersdisadvantages. #ifDEBUG_CALLBACKS ALOGD("%p~addFd-EPOLL_CTL_MODfailedduetofiledescriptor" "beingrecycled,fallingbackonEPOLL_CTL_ADD,errno=%d", this,errno); #endif epollResult=epoll_ctl(mEpollFd,EPOLL_CTL_ADD,fd,&eventItem); if(epollResult<0){ ALOGE("Errormodifyingoraddingepolleventsforfd%d,errno=%d", fd,errno); return-1; } scheduleEpollRebuildLocked(); }else{ ALOGE("Errormodifyingepolleventsforfd%d,errno=%d",fd,errno); return-1; } } mRequests.replaceValueAt(requestIndex,request); } }//releaselock return1; }
在pollInner函数中,我们先在mRequests中寻找与fd匹配的Request,然后在pushResponse中新建一个Response,把Response和Request关联起来。
}else{ ssize_trequestIndex=mRequests.indexOfKey(fd); if(requestIndex>=0){ intevents=0; if(epollEvents&EPOLLIN)events|=EVENT_INPUT; if(epollEvents&EPOLLOUT)events|=EVENT_OUTPUT; if(epollEvents&EPOLLERR)events|=EVENT_ERROR; if(epollEvents&EPOLLHUP)events|=EVENT_HANGUP; pushResponse(events,mRequests.valueAt(requestIndex)); }else{ ALOGW("Ignoringunexpectedepollevents0x%xonfd%dthatis" "nolongerregistered.",epollEvents,fd); } }
下面我们就会遍历mResponses中的Response,然后调用其request中的回调
for(size_ti=0;i<mResponses.size();i++){ Response&response=mResponses.editItemAt(i); if(response.request.ident==POLL_CALLBACK){ intfd=response.request.fd; intevents=response.events; void*data=response.request.data; #ifDEBUG_POLL_AND_WAKE||DEBUG_CALLBACKS ALOGD("%p~pollOnce-invokingfdeventcallback%p:fd=%d,events=0x%x,data=%p", this,response.request.callback.get(),fd,events,data); #endif //Invokethecallback.Notethatthefiledescriptormaybeclosedby //thecallback(andpotentiallyevenreused)beforethefunctionreturnsso //weneedtobealittlecarefulwhenremovingthefiledescriptorafterwards. intcallbackResult=response.request.callback->handleEvent(fd,events,data); if(callbackResult==0){ removeFd(fd,response.request.seq); } //Clearthecallbackreferenceintheresponsestructurepromptlybecausewe //willnotcleartheresponsevectoritselfuntilthenextpoll. response.request.callback.clear(); result=POLL_CALLBACK; } }
同样我们再来看看Looper_test.cpp是如何使用的?
Pipepipe; StubCallbackHandlerhandler(true); handler.setCallback(mLooper,pipe.receiveFd,Looper::EVENT_INPUT);
我们看下handler的setCallback函数
classCallbackHandler{ public: voidsetCallback(constsp<Looper>&looper,intfd,intevents){ looper->addFd(fd,0,events,staticHandler,this);//就是调用了looper的addFd函数,并且回调 } protected: virtual~CallbackHandler(){} virtualinthandler(intfd,intevents)=0; private: staticintstaticHandler(intfd,intevents,void*data){//这个就是回调函数 returnstatic_cast<CallbackHandler*>(data)->handler(fd,events); } }; classStubCallbackHandler:publicCallbackHandler{ public: intnextResult; intcallbackCount; intfd; intevents; StubCallbackHandler(intnextResult):nextResult(nextResult), callbackCount(0),fd(-1),events(-1){ } protected: virtualinthandler(intfd,intevents){//这个是通过回调函数再调到这里的 callbackCount+=1; this->fd=fd; this->events=events; returnnextResult; } };
我们结合Looper的addFd一起来看,当callback不为空时,会新建一个SimpleLooperCallback
intLooper::addFd(intfd,intident,intevents,Looper_callbackFunccallback,void*data){ returnaddFd(fd,ident,events,callback?newSimpleLooperCallback(callback):NULL,data); }
这里的Looper_callbackFunc是一个typedef
typedefint(*Looper_callbackFunc)(intfd,intevents,void*data);
我们再来看SimpleLooperCallback
classSimpleLooperCallback:publicLooperCallback{ protected: virtual~SimpleLooperCallback(); public: SimpleLooperCallback(Looper_callbackFunccallback); virtualinthandleEvent(intfd,intevents,void*data); private: Looper_callbackFuncmCallback; };SimpleLooperCallback::SimpleLooperCallback(Looper_callbackFunccallback): mCallback(callback){ } SimpleLooperCallback::~SimpleLooperCallback(){ } intSimpleLooperCallback::handleEvent(intfd,intevents,void*data){ returnmCallback(fd,events,data); }
最后我们是调用callback->handleEvent(fd,events,data),而callback就是SimpleLooperCallback,这里的data,之前传进来的就是CallbackHandler的this指针
因此最后就是调用了staticHandler,其中把data转换成CallbackHandler指针后调用handler,相当于this->handler;由于handler是虚函数,最终就调用到了StubCallbackHandler的handler函数中。
当然我们也可以不用这么复杂,直接使用第二个addFd函数,只需要自己定义一个类来继承LooperCallback并实现handleEvent,把它作为callback传入就行了,这样就简单多了。
intaddFd(intfd,intident,intevents,constsp<LooperCallback>&callback,void*data);
2.6java层addFd
一直以为只能在c层的Looper中才能addFd,原来在java层也通过jni做了这个功能。
我们可以通过MessageQueue中的addOnFileDescriptorEventListener来实现这个功能
publicvoidaddOnFileDescriptorEventListener(@NonNullFileDescriptorfd, @OnFileDescriptorEventListener.Eventsintevents, @NonNullOnFileDescriptorEventListenerlistener){ if(fd==null){ thrownewIllegalArgumentException("fdmustnotbenull"); } if(listener==null){ thrownewIllegalArgumentException("listenermustnotbenull"); } synchronized(this){ updateOnFileDescriptorEventListenerLocked(fd,events,listener); } }
我们再来看看OnFileDescriptorEventListener这个回调
publicinterfaceOnFileDescriptorEventListener{ publicstaticfinalintEVENT_INPUT=1<<0; publicstaticfinalintEVENT_OUTPUT=1<<1; publicstaticfinalintEVENT_ERROR=1<<2; /**@hide*/ @Retention(RetentionPolicy.SOURCE) @IntDef(flag=true,value={EVENT_INPUT,EVENT_OUTPUT,EVENT_ERROR}) public@interfaceEvents{} @EventsintonFileDescriptorEvents(@NonNullFileDescriptorfd,@Eventsintevents); }
接着调用了updateOnFileDescriptorEventListenerLocked函数
privatevoidupdateOnFileDescriptorEventListenerLocked(FileDescriptorfd,intevents, OnFileDescriptorEventListenerlistener){ finalintfdNum=fd.getInt$(); intindex=-1; FileDescriptorRecordrecord=null; if(mFileDescriptorRecords!=null){ index=mFileDescriptorRecords.indexOfKey(fdNum); if(index>=0){ record=mFileDescriptorRecords.valueAt(index); if(record!=null&&record.mEvents==events){ return; } } } if(events!=0){ events|=OnFileDescriptorEventListener.EVENT_ERROR; if(record==null){ if(mFileDescriptorRecords==null){ mFileDescriptorRecords=newSparseArray<FileDescriptorRecord>(); } record=newFileDescriptorRecord(fd,events,listener);//fd保存在FileDescriptorRecord对象 mFileDescriptorRecords.put(fdNum,record);//mFileDescriptorRecords然后保存在 }else{ record.mListener=listener; record.mEvents=events; record.mSeq+=1; } nativeSetFileDescriptorEvents(mPtr,fdNum,events);//调用native函数 }elseif(record!=null){ record.mEvents=0; mFileDescriptorRecords.removeAt(index); } }
native最后调用了NativeMessageQueue的setFileDescriptorEvents函数
staticvoidandroid_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv*env,jclassclazz, jlongptr,jintfd,jintevents){ NativeMessageQueue*nativeMessageQueue=reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->setFileDescriptorEvents(fd,events); }
setFileDescriptorEvents函数,这个addFd就是调用的第二个addFd,因此我们可以肯定NativeMessageQueue继承了LooperCallback
voidNativeMessageQueue::setFileDescriptorEvents(intfd,intevents){ if(events){ intlooperEvents=0; if(events&CALLBACK_EVENT_INPUT){ looperEvents|=Looper::EVENT_INPUT; } if(events&CALLBACK_EVENT_OUTPUT){ looperEvents|=Looper::EVENT_OUTPUT; } mLooper->addFd(fd,Looper::POLL_CALLBACK,looperEvents,this, reinterpret_cast<void*>(events)); }else{ mLooper->removeFd(fd); } }
果然是,需要实现handleEvent函数
classNativeMessageQueue:publicMessageQueue,publicLooperCallback{ public: NativeMessageQueue(); virtual~NativeMessageQueue(); virtualvoidraiseException(JNIEnv*env,constchar*msg,jthrowableexceptionObj); voidpollOnce(JNIEnv*env,jobjectobj,inttimeoutMillis); voidwake(); voidsetFileDescriptorEvents(intfd,intevents); virtualinthandleEvent(intfd,intevents,void*data);
handleEvent就是在looper中epoll_wait之后,当我们增加的fd有数据就会调用这个函数
intNativeMessageQueue::handleEvent(intfd,intlooperEvents,void*data){ intevents=0; if(looperEvents&Looper::EVENT_INPUT){ events|=CALLBACK_EVENT_INPUT; } if(looperEvents&Looper::EVENT_OUTPUT){ events|=CALLBACK_EVENT_OUTPUT; } if(looperEvents&(Looper::EVENT_ERROR|Looper::EVENT_HANGUP|Looper::EVENT_INVALID)){ events|=CALLBACK_EVENT_ERROR; } intoldWatchedEvents=reinterpret_cast<intptr_t>(data); intnewWatchedEvents=mPollEnv->CallIntMethod(mPollObj, gMessageQueueClassInfo.dispatchEvents,fd,events);//调用回调 if(!newWatchedEvents){ return0;//unregisterthefd } if(newWatchedEvents!=oldWatchedEvents){ setFileDescriptorEvents(fd,newWatchedEvents); } return1; }
最后在java的MessageQueue中的dispatchEvents就是在jni层反调过来的,然后调用之前注册的回调函数
//Calledfromnativecode. privateintdispatchEvents(intfd,intevents){ //Getthefiledescriptorrecordandanystatethatmightchange. finalFileDescriptorRecordrecord; finalintoldWatchedEvents; finalOnFileDescriptorEventListenerlistener; finalintseq; synchronized(this){ record=mFileDescriptorRecords.get(fd);//通过fd得到FileDescriptorRecord if(record==null){ return0;//spurious,nolistenerregistered } oldWatchedEvents=record.mEvents; events&=oldWatchedEvents;//filtereventsbasedoncurrentwatchedset if(events==0){ returnoldWatchedEvents;//spurious,watchedeventschanged } listener=record.mListener; seq=record.mSeq; } //Invokethelisteneroutsideofthelock. intnewWatchedEvents=listener.onFileDescriptorEvents(//listener回调 record.mDescriptor,events); if(newWatchedEvents!=0){ newWatchedEvents|=OnFileDescriptorEventListener.EVENT_ERROR; } //Updatethefiledescriptorrecordifthelistenerchangedthesetof //eventstowatchandthelisteneritselfhasn'tbeenupdatedsince. if(newWatchedEvents!=oldWatchedEvents){ synchronized(this){ intindex=mFileDescriptorRecords.indexOfKey(fd); if(index>=0&&mFileDescriptorRecords.valueAt(index)==record &&record.mSeq==seq){ record.mEvents=newWatchedEvents; if(newWatchedEvents==0){ mFileDescriptorRecords.removeAt(index); } } } } //Returnthenewsetofeventstowatchfornativecodetotakecareof. returnnewWatchedEvents; }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。