关于Redis网络模型的源码详析
前言
Redis的网络模型是基于I/O多路复用程序来实现的。源码中包含四种多路复用函数库epoll、select、evport、kqueue。在程序编译时会根据系统自动选择这四种库其中之一。下面以epoll为例,来分析Redis的I/O模块的源码。
epoll系统调用方法
Redis网络事件处理模块的代码都是围绕epoll那三个系统方法来写的。先把这三个方法弄清楚,后面就不难了。
epfd=epoll_create(1024);
创建epoll实例
参数:表示该epoll实例最多可监听的socketfd(文件描述符)数量。
返回:epoll专用的文件描述符。
intepoll_ctl(intepfd,intop,intfd,structepoll_event*event)
管理epoll中的事件,对事件进行注册、修改和删除。
参数:
epfd:epoll实例的文件描述符;
op:取值三种:EPOLL_CTL_ADD注册、EPOLL_CTL_MOD修改、EPOLL_CTL_DEL删除;
fd:socket的文件描述符;
epoll_event*event:事件
event代表一个事件,类似于JavaNIO中的channel“通道”。epoll_event的结构如下:
typedefunionepoll_data{ void*ptr; intfd;/*socket文件描述符*/ __uint32_tu32; __uint64_tu64; }epoll_data_t; structepoll_event{ __uint32_tevents;/*Epollevents就是各种待监听操作的操作码求与的结果,例如EPOLLIN(fd可读)、EPOLLOUT(fd可写)*/ epoll_data_tdata;/*Userdatavariable*/ };
intepoll_wait(intepfd,structepoll_event*events,intmaxevents,inttimeout);
等待事件是否就绪,类似于JavaNIO中select方法。如果事件就绪,将就绪的event存入events数组中。
参数
epfd:epoll实例的文件描述符;
events:已就绪的事件数组;
intmaxevents:每次能处理的事件数;
timeout:阻塞时间,等待产生就绪事件的超时值。
源码分析
事件
Redis事件系统中将事件分为两种类型:
- 文件事件;网络套接字对应的事件;
- 时间事件:Redis中一些定时操作事件,例如serverCron函数。
下面从事件的注册、触发两个流程对源码进行分析
绑定事件
建立eventLoop
在initServer方法(由redis.c的main函数调用)中,在建立RedisDb对象的同时,会初始化一个“eventLoop”对象,我称之为事件处理器对象。结构体的关键成员变量如下所示:
structaeEventLoop{ aeFileEvent*events;//已注册的文件事件数组 aeFiredEvent*fired;//已就绪的文件事件数组 aeTimeEvent*timeEventHead;//时间事件数组 ... }
初始化eventLoop在ae.c的“aeCreateEventLoop”方法中执行。该方法中除了初始化eventLoop还调用如下方法初始化了一个epoll实例。
/* *ae_epoll.c *创建一个新的epoll实例,并将它赋值给eventLoop */ staticintaeApiCreate(aeEventLoop*eventLoop){ aeApiState*state=zmalloc(sizeof(aeApiState)); if(!state)return-1; //初始化事件槽空间 state->events=zmalloc(sizeof(structepoll_event)*eventLoop->setsize); if(!state->events){ zfree(state); return-1; } //创建epoll实例 state->epfd=epoll_create(1024);/*1024isjustahintforthekernel*/ if(state->epfd==-1){ zfree(state->events); zfree(state); return-1; } //赋值给eventLoop eventLoop->apidata=state; return0; }
也正是在此处调用了系统方法“epoll_create”。这里的state是一个aeApiState结构,如下所示:
/* *事件状态 */ typedefstructaeApiState{ //epoll实例描述符 intepfd; //事件槽 structepoll_event*events; }aeApiState;
这个state由eventLoop->apidata来记录。
绑定ip端口与句柄
通过listenToPort方法开启TCP端口,每个IP端口会对应一个文件描述符ipfd(因为服务器可能会有多个ip地址)
//打开TCP监听端口,用于等待客户端的命令请求 if(server.port!=0&& listenToPort(server.port,server.ipfd,&server.ipfd_count)==REDIS_ERR) exit(1);
注意:*eventLoop和ipfd分别被server.el和server.ipfd[]引用。server是结构体RedisServer的实例,是Redis的全局变量。
注册事件
如下所示代码,为每一个文件描述符绑定一个事件函数
//initServer方法: for(j=0;j=eventLoop->setsize){ errno=ERANGE; returnAE_ERR; } if(fd>=eventLoop->setsize)returnAE_ERR; //取出文件事件结构 aeFileEvent*fe=&eventLoop->events[fd]; //监听指定fd的指定事件 if(aeApiAddEvent(eventLoop,fd,mask)==-1) returnAE_ERR; //设置文件事件类型,以及事件的处理器 fe->mask|=mask; if(mask&AE_READABLE)fe->rfileProc=proc; if(mask&AE_WRITABLE)fe->wfileProc=proc; //私有数据 fe->clientData=clientData; //如果有需要,更新事件处理器的最大fd if(fd>eventLoop->maxfd) eventLoop->maxfd=fd; returnAE_OK; }
aeCreateFileEvent函数中有一个方法调用:aeApiAddEvent,代码如下
/* *ae_epoll.c *关联给定事件到fd */ staticintaeApiAddEvent(aeEventLoop*eventLoop,intfd,intmask){ aeApiState*state=eventLoop->apidata; structepoll_eventee; /*Ifthefdwasalreadymonitoredforsomeevent,weneedaMOD *operation.OtherwiseweneedanADDoperation. * *如果fd没有关联任何事件,那么这是一个ADD操作。 * *如果已经关联了某个/某些事件,那么这是一个MOD操作。 */ intop=eventLoop->events[fd].mask==AE_NONE? EPOLL_CTL_ADD:EPOLL_CTL_MOD; //注册事件到epoll ee.events=0; mask|=eventLoop->events[fd].mask;/*Mergeoldevents*/ if(mask&AE_READABLE)ee.events|=EPOLLIN; if(mask&AE_WRITABLE)ee.events|=EPOLLOUT; ee.data.u64=0;/*avoidvalgrindwarning*/ ee.data.fd=fd; if(epoll_ctl(state->epfd,op,fd,&ee)==-1)return-1; return0; }
这里实际上就是调用系统方法“epoll_ctl”,将事件(文件描述符)注册进epoll中。首先要封装一个epoll_event结构,即ee,通过“epoll_ctl”将其注册进epoll中。
除此之外,aeCreateFileEvent还完成了下面两个重要操作:
- 将事件函数“acceptTcpHandler”存入了eventLoop中,即由eventLoop->events[fd]->rfileProc来引用(也可能是wfileProc,分别代表读事件和写事件);
- 将当操作码添加进eventLoop->events[fd]->mask中(mask类似于JavaNIO中的ops操作码,代表事件类型)。
事件监听与执行
redis.c的main函数会调用ae.c中的main方法,如下所示:
/* *事件处理器的主循环 */ voidaeMain(aeEventLoop*eventLoop){ eventLoop->stop=0; while(!eventLoop->stop){ //如果有需要在事件处理前执行的函数,那么运行它 if(eventLoop->beforesleep!=NULL) eventLoop->beforesleep(eventLoop); //开始处理事件 aeProcessEvents(eventLoop,AE_ALL_EVENTS); } }
上述代码会调用aeProcessEvents方法用于处理事件,方法如下所示
/*Processeverypendingtimeevent,theneverypendingfileevent *(thatmayberegisteredbytimeeventcallbacksjustprocessed). * *处理所有已到达的时间事件,以及所有已就绪的文件事件。 *函数的返回值为已处理事件的数量 */ intaeProcessEvents(aeEventLoop*eventLoop,intflags) { intprocessed=0,numevents; /*Nothingtodo?returnASAP*/ if(!(flags&AE_TIME_EVENTS)&&!(flags&AE_FILE_EVENTS))return0; if(eventLoop->maxfd!=-1|| ((flags&AE_TIME_EVENTS)&&!(flags&AE_DONT_WAIT))){ intj; aeTimeEvent*shortest=NULL; structtimevaltv,*tvp; //获取最近的时间事件 if(flags&AE_TIME_EVENTS&&!(flags&AE_DONT_WAIT)) shortest=aeSearchNearestTimer(eventLoop); if(shortest){ //如果时间事件存在的话 //那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间 longnow_sec,now_ms; /*Calculatethetimemissingforthenearest *timertofire.*/ //计算距今最近的时间事件还要多久才能达到 //并将该时间距保存在tv结构中 aeGetTime(&now_sec,&now_ms); tvp=&tv; tvp->tv_sec=shortest->when_sec-now_sec; if(shortest->when_mstv_usec=((shortest->when_ms+1000)-now_ms)*1000; tvp->tv_sec--; }else{ tvp->tv_usec=(shortest->when_ms-now_ms)*1000; } //时间差小于0,说明事件已经可以执行了,将秒和毫秒设为0(不阻塞) if(tvp->tv_sec<0)tvp->tv_sec=0; if(tvp->tv_usec<0)tvp->tv_usec=0; }else{ //执行到这一步,说明没有时间事件 //那么根据AE_DONT_WAIT是否设置来决定是否阻塞,以及阻塞的时间长度 /*Ifwehavetocheckforeventsbutneedtoreturn *ASAPbecauseofAE_DONT_WAITweneedtosetthetimeout *tozero*/ if(flags&AE_DONT_WAIT){ //设置文件事件不阻塞 tv.tv_sec=tv.tv_usec=0; tvp=&tv; }else{ /*Otherwisewecanblock*/ //文件事件可以阻塞直到有事件到达为止 tvp=NULL;/*waitforever*/ } } //处理文件事件,阻塞时间由tvp决定 numevents=aeApiPoll(eventLoop,tvp); for(j=0;j events[eventLoop->fired[j].fd]; intmask=eventLoop->fired[j].mask; intfd=eventLoop->fired[j].fd; intrfired=0; /*notethefe->mask&mask&...code:maybeanalreadyprocessed *eventremovedanelementthatfiredandwestilldidn't *processed,sowecheckiftheeventisstillvalid.*/ //读事件 if(fe->mask&mask&AE_READABLE){ //rfired确保读/写事件只能执行其中一个 rfired=1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } //写事件 if(fe->mask&mask&AE_WRITABLE){ if(!rfired||fe->wfileProc!=fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } /*Checktimeevents*/ //执行时间事件 if(flags&AE_TIME_EVENTS) processed+=processTimeEvents(eventLoop); returnprocessed; }
该函数中代码大致分为三个主要步骤
- 根据时间事件与当前时间的关系,决定阻塞时间tvp;
- 调用aeApiPoll方法,将就绪事件都写入eventLoop->fired[]中,返回就绪事件数目;
- 遍历eventLoop->fired[],遍历每一个就绪事件,执行之前绑定好的方法rfileProc或者wfileProc。
ae_epoll.c中的aeApiPoll方法如下所示:
/* *获取可执行事件 */ staticintaeApiPoll(aeEventLoop*eventLoop,structtimeval*tvp){ aeApiState*state=eventLoop->apidata; intretval,numevents=0; //等待时间 retval=epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp?(tvp->tv_sec*1000+tvp->tv_usec/1000):-1); //有至少一个事件就绪? if(retval>0){ intj; //为已就绪事件设置相应的模式 //并加入到eventLoop的fired数组中 numevents=retval; for(j=0;jevents+j; if(e->events&EPOLLIN)mask|=AE_READABLE; if(e->events&EPOLLOUT)mask|=AE_WRITABLE; if(e->events&EPOLLERR)mask|=AE_WRITABLE; if(e->events&EPOLLHUP)mask|=AE_WRITABLE; eventLoop->fired[j].fd=e->data.fd; eventLoop->fired[j].mask=mask; } } //返回已就绪事件个数 returnnumevents; }
执行epoll_wait后,就绪的事件会被写入eventLoop->apidata->events事件槽。后面的循环就是将事件槽中的事件写入到eventLoop->fired[]中。具体描述:每一个事件都是一个epoll_event结构,用e来指代,则e.data.fd代表文件描述符,e->events表示其操作码,将操作码转化为mask,最后将fd和mask都写入eventLoop->fired[j]中。
之后,在外层的aeProcessEvents方法中会执行函数指针rfileProc或者wfileProc指向的方法,例如前文提到已注册的“acceptTcpHandler”。
总结
Redis的网络模块其实是一个简易的Reactor模式。本文顺着“服务端注册事件——>接受客户端连接——>监听事件是否就绪——>执行事件”这样的路线,来分析Redis源码,描述了Redis接受客户端connect的过程。实际上NIO的思想都基本类似。
到此这篇关于Redis网络模型的源码详析的文章就介绍到这了,更多相关Redis网络模型源码内容请搜索毛票票以前的文章或继续浏览下面的相关文章希望大家以后多多支持毛票票!