[i] The Epoll multiplexer
Redis uses epoll to monitor network I/O events, such as accepting new client connections and reading clients' db-operation requests. Compared with select and poll, it significantly improves system CPU efficiency when only a few of a large number of concurrent connections are active. Its main kernel-side data structures are a red-black tree and a ready list (a linked list).
The red-black tree manages the registered I/O events (add, mod, delete, each O(log n)). On add (epoll_ctl(EPOLL_CTL_ADD)), the event is bound to its fd and the callback ep_poll_callback is registered; as soon as an event on a watched file becomes ready, ep_poll_callback fires and appends the fd's event (the event field of the kernel's epitem struct) to the ready list. When the user later calls epoll_wait, epoll takes a lock and copies the ready-list entries into user space, i.e. into the caller's epoll_event array.
For the underlying mechanics, see: epoll source code analysis
![[epoll_demux]](/2021/04/23/blog8/epoll_demux.png)
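To make the register/wait cycle concrete, here is a minimal, self-contained level-triggered sketch (illustrative only, not Redis code; handle_event is a hypothetical callback):
#include <stdint.h>
#include <sys/epoll.h>
#include <unistd.h>
// Minimal LT event loop over an already-listening socket.
void event_loop(int listen_fd, void (*handle_event)(int fd, uint32_t events)) {
    int epfd = epoll_create(1024);                  // size is only a hint on modern kernels
    struct epoll_event ev = {0};
    ev.events = EPOLLIN;                            // interested in readability
    ev.data.fd = listen_fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev); // O(log n) insert into the rbtree
    struct epoll_event ready[64];
    for (;;) {
        // blocks until the kernel's ready list is non-empty, then copies it out
        int n = epoll_wait(epfd, ready, 64, -1);
        for (int i = 0; i < n; i++)
            handle_event(ready[i].data.fd, ready[i].events);
    }
    close(epfd);                                    // unreachable here; shown for completeness
}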
Besides the level-triggered (LT) notification that select/poll also offer for I/O events, epoll provides an edge-triggered (ET) mode. Combined with non-blocking fds, ET lets a user-space program cache I/O state and cut down on epoll_wait/epoll_pwait calls, improving application efficiency.
![[epoll_ET_LT]](/2021/04/23/blog8/epoll_ET_LT.png)
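Under ET the readiness notification arrives only on a state change, so the fd must be drained until read returns EAGAIN, or leftover data will sit unnoticed until the next edge. A sketch, assuming the fd was registered with EPOLLIN | EPOLLET and made non-blocking via fcntl:
#include <errno.h>
#include <unistd.h>
// Drain a non-blocking fd after an edge-triggered readiness notification.
void drain_fd(int fd) {
    char buf[4096];
    for (;;) {
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0) {
            // process n bytes ...
        } else if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            break;    // kernel buffer drained; wait for the next edge
        } else if (n == -1 && errno == EINTR) {
            continue; // interrupted by a signal; retry
        } else {
            break;    // n == 0 (peer closed) or a real error: caller should close fd
        }
    }
}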
[ii] The Epoll Reactor model
Instead of data.fd, the reactor pattern stores a generic pointer in epoll_event's data.ptr that references a custom event struct myEvent, and registers a handler in myEvent.call_back for future ready events, e.g. acceptconn (accept a new connection cfd), recvdata (read data arriving on a client connection), or senddata (write data back to the client). There is then no need to test what type of fd fired: after epoll_wait hands back the ready list, simply invoke the myEvent.call_back() that each event.data.ptr points to (see the sketch after the figure below).
![[epoll_reactor]](/2021/04/23/blog8/epoll_reactor.png)
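A minimal sketch of this pattern; the names (myEvent, call_back, event_add, dispatch) are illustrative, matching the description above rather than any particular library:
#include <sys/epoll.h>
// A reactor-style event record: epoll hands the pointer back, we call the handler.
typedef struct myEvent {
    int fd;                                       // fd being watched
    int events;                                   // EPOLLIN / EPOLLOUT mask
    void *arg;                                    // user data passed to the callback
    void (*call_back)(int fd, int events, void *arg);
} myEvent;
// Register: hang the event record off data.ptr instead of data.fd.
void event_add(int epfd, int events, myEvent *ev) {
    struct epoll_event epv = {0};
    epv.events = events;
    epv.data.ptr = ev;                            // generic pointer, not a bare fd
    epoll_ctl(epfd, EPOLL_CTL_ADD, ev->fd, &epv);
}
// Dispatch: no per-fd type check, just invoke the stored callback.
void dispatch(struct epoll_event *ready, int n) {
    for (int i = 0; i < n; i++) {
        myEvent *ev = (myEvent *)ready[i].data.ptr;
        ev->call_back(ev->fd, ready[i].events, ev->arg);
    }
}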
[iii] Redis network event-loop structs
Overall, Redis on Linux adopts the Epoll Reactor model with non-blocking fds to monitor network I/O events (note that Redis registers its events in epoll's default level-triggered mode - the aeApiAddEvent code below never sets EPOLLET), and it performs very well under high concurrency. The structs involved in the Redis network event model are summarized below:
The global struct server
struct redisServer server; // global variable that records nearly all Redis-wide state
The global event-state struct server.el
This is Redis's event-driven loop hub (aeEventLoop). It contains three main data structures: the file event struct, the time event struct, and the fired event struct.
The layout of server.el:
aeEventLoop *el;
/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered so far */
    int setsize; /* max number of file descriptors tracked */
    // used to generate time event ids
    long long timeEventNextId;
    // time a time event was last processed
    time_t lastTime;     /* Used to detect system clock skew */
    // registered file events
    aeFileEvent *events; /* Registered events */
    // fired (ready) file events
    aeFiredEvent *fired; /* Fired events */
    // head of the time event list
    aeTimeEvent *timeEventHead;
    // on/off switch for the event loop
    int stop;
    // private data of the multiplexing library (epoll state on Linux)
    void *apidata; /* This is used for polling API specific data */
    // function executed before waiting for events
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;
The aeEventLoop.apidata field stores the epoll-specific state:
apidata = struct aeApiState* state;
/* epoll-based event state */
typedef struct aeApiState {
    // fd of the epoll instance
    int epfd;
    // event slots filled in by epoll_wait
    struct epoll_event *events;
} aeApiState;
- File event struct (aeFileEvent): maintains the I/O event table, comparable to the custom myevent struct in a libevent-style reactor; it contains mask (corresponding to epoll's events flags) plus the rfileProc/wfileProc handlers (call_back function pointers).
- Time event struct (aeTimeEvent): time events…
- Fired event struct (aeFiredEvent): events that are ready; after each epoll_wait round, the ready events in apidata are copied into the aeFiredEvent array, which records each ready fd and its mask.
/* File event structure */
typedef struct aeFileEvent {
    // mask of monitored event types:
    // AE_READABLE, AE_WRITABLE, or AE_READABLE | AE_WRITABLE
    int mask; /* one of AE_(READABLE|WRITABLE) */
    // read event handler
    aeFileProc *rfileProc;
    // write event handler
    aeFileProc *wfileProc;
    // private data for the multiplexing library
    void *clientData;
} aeFileEvent;
/* Time event structure */
typedef struct aeTimeEvent {
    // unique identifier of the time event
    long long id; /* time event identifier. */
    // when the event is due
    long when_sec; /* seconds */
    long when_ms;  /* milliseconds */
    // event handler
    aeTimeProc *timeProc;
    // event finalizer
    aeEventFinalizerProc *finalizerProc;
    // private data for the multiplexing library
    void *clientData;
    // pointer to the next time event, forming a linked list
    struct aeTimeEvent *next;
} aeTimeEvent;
/* A fired event */
typedef struct aeFiredEvent {
    // the fd that became ready
    int fd;
    // event type mask: AE_READABLE, AE_WRITABLE, or both OR'ed together
    int mask;
} aeFiredEvent;
[iv] Tracing the Redis event service through the source
Redis's event-driven flow can be summarized as:
- initialize the event loop struct (aeEventLoop)
- register a read event for the listening socket (aeFileEvent)
- register the time events (aeTimeEvent)
- enter the event loop (aeMain(server.el);)
- when the listening socket becomes readable, accept the client request and register a read event for the new connection's socket
- when a connected client socket becomes readable, perform the corresponding operation
![[event_model]](/2021/04/23/blog8/event_model.png)
Dissecting the Redis main function
main sits near the bottom of redis.c; the functions relevant to the network service are:
- initServerConfig: initializes the server configuration, e.g. the default listening port 6379, AOF settings, cluster settings, and so on
- initServer: creates and initializes the server data structures, e.g. the global event loop server.el (aeEventLoop); creates the listening fds (socket -> bind -> listen, producing the listen_fd array stored via the server.ipfd pointer); initializes aeTimeEvent and registers the time event; initializes aeFileEvent and registers read events for the listening sockets
- aeMain: starts the multiplexer loop, monitoring I/O events and dispatching the various read/write and time events
int main(int argc, char **argv) {
    // ......
    // initialize the server configuration
    initServerConfig();
    // create and initialize the server data structures
    initServer();
    // run the event loop until the server shuts down
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeMain(server.el); // loop on events: while(!stop)
    // server is shutting down: stop the event loop
    aeDeleteEventLoop(server.el);
    return 0;
}
Tracing initServer
initServerConfig is omitted here - it mainly initializes the global server struct. initServer is shown below in abridged form, keeping only the event-loop-related calls:
void initServer() {
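    // Abridged sketch: only the event-loop setup from the quoted source tree;
    // error handling and unrelated initialization are elided.
    int j;
    // create the global event loop (aeCreateEventLoop is traced below)
    server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
    // create the listening sockets: socket -> bind -> listen, fds stored in server.ipfd
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
        exit(1);
    // ......
    // register the serverCron time event
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        redisPanic("Can't create the serverCron time event.");
        exit(1);
    }
    // register a read event on every listening fd, callback = acceptTcpHandler
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            redisPanic("Unrecoverable error creating server.ipfd file event.");
    }
    // ......
}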
The aeCreateEventLoop function initializes the I/O event table and the event structures. eventLoop->events is an array of aeFileEvent: the array index is the fd, and each element is the aeFileEvent struct for that fd.
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;
    // allocate the event loop state
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    // allocate the file event array and the fired event array
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    // record the array size
    eventLoop->setsize = setsize;
    // initialize the last-run time
    eventLoop->lastTime = time(NULL);
    // initialize the time event state
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    // create the epoll instance (epoll_create) and the epoll_event buffer
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    // return the event loop
    return eventLoop;
err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}
// create a new epoll instance and attach it to the eventLoop
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
    if (!state) return -1;
    // allocate the event slots that epoll_wait fills with ready events
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    // key call: create the epoll instance
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    // hand the state to the eventLoop
    eventLoop->apidata = state;
    return 0;
}
The listenToPort function:
int listenToPort(int port, int *fds, int *count) {
    int j;
    // fds points at the server.ipfd listening array
    /* Force binding of 0.0.0.0 if no bind address is specified, always
     * entering the loop if j == 0. */
    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j < server.bindaddr_count || j == 0; j++) {
        if (server.bindaddr[j] == NULL) {
            // ... IPv6 attempt omitted
            // set up the listening service (socket -> bind -> listen -> listen_fd)
            // fds -->> server.ipfd
            fds[*count] = anetTcpServer(server.neterr,port,NULL,
                server.tcp_backlog);
            if (fds[*count] != ANET_ERR) {
                anetNonBlock(NULL,fds[*count]);
                (*count)++;
            }
            /* Exit the loop if we were able to bind * on IPv4 or IPv6,
             * otherwise fds[*count] will be ANET_ERR and we'll print an
             * error and return to the caller with an error. */
            if (*count) break;
        } else if (strchr(server.bindaddr[j],':')) {
            /* Bind IPv6 address. */
            fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        } else {
            /* Bind IPv4 address. */
            fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        }
        if (fds[*count] == ANET_ERR) {
            redisLog(REDIS_WARNING,
                "Creating Server TCP listening socket %s:%d: %s",
                server.bindaddr[j] ? server.bindaddr[j] : "*",
                port, server.neterr);
            return REDIS_ERR;
        }
        anetNonBlock(NULL,fds[*count]);
        (*count)++;
    }
    return REDIS_OK;
}
The anetTcpServer function creates the listening socket and returns its fd (socket -> bind -> listen(fd)):
int anetTcpServer(char *err, int port, char *bindaddr, int backlog) {
    return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
}
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog) {
    int s, rv; /* s is the listen fd to be returned */
    char _port[6]; /* strlen("65535") */
    struct addrinfo hints, *servinfo, *p;
    snprintf(_port,6,"%d",port);
    memset(&hints,0,sizeof(hints));
    hints.ai_family = af;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE; /* No effect if bindaddr != NULL */
    if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    for (p = servinfo; p != NULL; p = p->ai_next) {
        if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
            continue;
        if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
        if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
        if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) goto error;
        goto end;
    }
    if (p == NULL) {
        anetSetError(err, "unable to bind socket");
        goto error;
    }
error:
    s = ANET_ERR;
end:
    freeaddrinfo(servinfo);
    return s;
}
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
    if (bind(s,sa,len) == -1) {
        anetSetError(err, "bind: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    if (listen(s, backlog) == -1) {
        anetSetError(err, "listen: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return ANET_OK;
}
The aeCreateFileEvent function:
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL);
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData) {
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    // fetch the file event slot for this fd;
    // eventLoop->events plays the role of the global MyEvent array in the epoll reactor model
    aeFileEvent *fe = &eventLoop->events[fd];
    // start monitoring the given events on fd; on Linux this is the epoll add-to-tree step
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    // record the event types and install the handlers
    fe->mask |= mask;
    // register the callbacks
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    // private data
    fe->clientData = clientData;
    // if needed, raise the event loop's maximum fd
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
The aeApiAddEvent function associates the given events with fd at the epoll level: it reads the existing mask from server.el.events[fd] to choose between EPOLL_CTL_ADD and EPOLL_CTL_MOD, translates the AE mask into EPOLLIN/EPOLLOUT, and calls epoll_ctl. Note that Redis stores the bare fd in ee.data.fd rather than a pointer: the callbacks live in the aeFileEvent slots indexed by fd, so the fired fd is enough to locate them.
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
        EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    // register the event with epoll
    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}
acceptTcpHandler [callback]: registered on the listen_fd's file event (server.el.events[listen_fd]) by the aeCreateFileEvent call in initServer. When listen_fd later becomes ready, it calls anetTcpAccept to establish the TCP connection with the client, then acceptCommonHandler to create and initialize the client struct, which in turn calls aeCreateFileEvent to register the client fd (cfd) and its event in the global I/O event table server.el.events[]; the callback for that cfd is readQueryFromClient.
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[REDIS_IP_STR_LEN];
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
    while(max--) {
        // accept the client connection
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
        // create the client state (redisClient) for the connection
        acceptCommonHandler(cfd,0);
    }
}
The anetTcpAccept function calls anetGenericAccept to accept the TCP connection:
/* TCP accept helper */
int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
    int fd;
    struct sockaddr_storage sa;
    socklen_t salen = sizeof(sa);
    // accept the TCP connection
    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
        return ANET_ERR;
    if (sa.ss_family == AF_INET) {
        struct sockaddr_in *s = (struct sockaddr_in *)&sa;
        if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
        // byte-order conversion: network (big-endian) to host order
        if (port) *port = ntohs(s->sin_port);
    } else {
        struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
        if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin6_port);
    }
    return fd;
}
static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
    int fd;
    while(1) {
        fd = accept(s,sa,len);
        if (fd == -1) {
            if (errno == EINTR)
                continue;
            else {
                anetSetError(err, "accept: %s", strerror(errno));
                return ANET_ERR;
            }
        }
        break;
    }
    return fd;
}
The acceptCommonHandler function calls createClient to initialize the new client struct; createClient registers a read event for the new client_fd in the global I/O event table server.el.events[], i.e. it calls aeCreateFileEvent for the cfd with readQueryFromClient as the callback:
static void acceptCommonHandler(int fd, int flags) {
    // create the client
    redisClient *c;
    if ((c = createClient(fd)) == NULL) {
        redisLog(REDIS_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    // ......
    // bump the connection counter
    server.stat_numconnections++;
    // set the flags
    c->flags |= flags;
}
/* Create a new client */
redisClient *createClient(int fd) {
    // allocate the struct
    redisClient *c = zmalloc(sizeof(redisClient));
    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the Redis commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    // fd != -1: create a client backed by a network connection;
    // fd == -1: create a pseudo client with no connection. Redis commands must
    // run in a client context, so commands executed from e.g. a Lua script
    // need such a pseudo client.
    if (fd != -1) {
        // non-blocking: internally uses fcntl(fd, O_NONBLOCK)
        anetNonBlock(NULL,fd);
        // disable Nagle's algorithm
        anetEnableTcpNoDelay(NULL,fd);
        // set keepalive
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // bind the read event into the event loop (start receiving commands)
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR) {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    // initialize the fields
    // default database
    selectDb(c,0);
    // socket
    c->fd = fd;
    // name
    c->name = NULL;
    // ......
    // return the client
    return c;
}
readQueryFromClient [callback]: invoked when a client_fd becomes readable; it reads the request from the client and finally calls processInputBuffer to parse and execute the command:
/* Read the query from the client's input buffer */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    int nread, readlen;
    size_t qblen;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    // set the server's current client
    server.current_client = c;
    // read length (REDIS_IOBUF_LEN defaults to 16 KB)
    readlen = REDIS_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= REDIS_MBULK_BIG_ARG)
    {
        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
        if (remaining < readlen) readlen = remaining;
    }
    // current length of the query buffer; after a short read, leftover bytes
    // may sit in the buffer without yet forming a complete protocol command
    qblen = sdslen(c->querybuf);
    // if needed, update the buffer length peak
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    // make room in the query buffer
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    // read into the query buffer
    nread = read(fd, c->querybuf+qblen, readlen);
    // read error
    if (nread == -1) {
        if (errno == EAGAIN) {
            nread = 0;
        } else {
            redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    // EOF
    } else if (nread == 0) {
        redisLog(REDIS_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }
    if (nread) {
        // update the SDS free/len fields for the new content
        // and keep the terminating '\0' in the right place
        sdsIncrLen(c->querybuf,nread);
        // record the time of the last client/server interaction
        c->lastinteraction = server.unixtime;
        // if the client is a master, advance its replication offset
        if (c->flags & REDIS_MASTER) c->reploff += nread;
    } else {
        // reached when nread == -1 and errno == EAGAIN
        server.current_client = NULL;
        return;
    }
    // if the query buffer exceeds the server's maximum buffer length,
    // clear it and free the client
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
        bytes = sdscatrepr(bytes,c->querybuf,64);
        redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }
    // parse the buffered content into arguments and execute the commands;
    // keeps going until everything in the buffer has been consumed
    processInputBuffer(c);
    server.current_client = NULL;
}
The processInputBuffer function: its code is mostly omitted here; it performs a series of checks and, once a complete command has been parsed, calls processCommand(redisClient *c).
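As an aid, here is a condensed sketch of its control flow, consistent with the source quoted above (guard clauses and error paths elided):
void processInputBuffer(redisClient *c) {
    // keep parsing while the query buffer still holds data
    while(sdslen(c->querybuf)) {
        // ...... guard clauses (REDIS_BLOCKED, REDIS_CLOSE_AFTER_REPLY, ...) omitted
        // decide the protocol type from the first byte: '*' means multibulk
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') {
                c->reqtype = REDIS_REQ_MULTIBULK;
            } else {
                c->reqtype = REDIS_REQ_INLINE;
            }
        }
        // parse the buffer into c->argv/c->argc; break on incomplete input
        if (c->reqtype == REDIS_REQ_INLINE) {
            if (processInlineBuffer(c) != REDIS_OK) break;
        } else {
            if (processMultibulkBuffer(c) != REDIS_OK) break;
        }
        // a full command was assembled: execute it, then reset for the next one
        if (c->argc == 0) {
            resetClient(c);
        } else {
            if (processCommand(c) == REDIS_OK)
                resetClient(c);
        }
    }
}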
The processCommand function: at this point a complete command has been read into the client, and this function executes it. It first parses and validates the command; if the checks pass, it calls call(redisClient *c, int flags) to run the command, which invokes c->cmd->proc(c). Inside that command callback, addReply(redisClient *c, robj *obj) registers a write event in the global I/O event table so the reply can be returned to the client:
int processCommand(redisClient *c) {
    // ......
    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    // look up the command name given by the client in the command table
    // to fill c->cmd, i.e. a struct redisCommand
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    // command not found
    if (!c->cmd) {
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return REDIS_OK;
    // wrong number of arguments
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return REDIS_OK;
    }
    // .....
    /* Exec the command */
    // in a MULTI context, commands are queued instead of executed directly
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand){
        // enqueue the command
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        // key call - actually execute the command
        call(c,REDIS_CALL_FULL);
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }
    return REDIS_OK;
}
The call function does a great deal; its code is mostly omitted. Here we only care about the invocation of the command callback, c->cmd->proc(c); proc is a function pointer (type: typedef void redisCommandProc(redisClient *c);) that was already pointed at the concrete implementation by lookupCommand in processCommand, based on the command parsed from c.
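An abridged sketch of call(), keeping only the timing, the proc dispatch, and the stats bookkeeping (slowlog and AOF/replication propagation elided):
void call(redisClient *c, int flags) {
    long long dirty, start, duration;
    dirty = server.dirty;
    start = ustime();
    // the key line: dispatch to the concrete command implementation,
    // e.g. setCommand for SET (see the command table below)
    c->cmd->proc(c);
    duration = ustime()-start;
    dirty = server.dirty-dirty;
    // ...... slowlog, AOF/replication propagation omitted
    if (flags & REDIS_CALL_STATS) {
        c->cmd->microseconds += duration;
        c->cmd->calls++;
    }
    server.stat_numcommands++;
    // ......
}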
// the global redisCommandTable, which stores every Redis command
struct redisCommand redisCommandTable[] = {
    {"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"setnx",setnxCommand,3,"wm",0,NULL,1,1,1,0,0},
    {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
    {"strlen",strlenCommand,2,"r",0,NULL,1,1,1,0,0},
    {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
    {"exists",existsCommand,2,"r",0,NULL,1,1,1,0,0},
    {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"getbit",getbitCommand,3,"r",0,NULL,1,1,1,0,0},
    {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"incr",incrCommand,2,"wm",0,NULL,1,1,1,0,0},
    {"decr",decrCommand,2,"wm",0,NULL,1,1,1,0,0},
    {"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0},
    //......
};
struct redisCommand {
    // command name
    char *name;
    // implementing function
    redisCommandProc *proc;
    // number of arguments
    int arity;
    // flags as a string
    char *sflags; /* Flags as string representation, one char per flag. */
    // the actual flags
    int flags; /* The actual flags, obtained from the 'sflags' field. */
    /* Use a function to determine keys arguments in a command line.
     * Used for Redis Cluster redirect. */
    redisGetKeysProc *getkeys_proc;
    /* What keys should be loaded in background when calling this command? */
    // which arguments are keys
    int firstkey; /* The first argument that's a key (0 = no keys) */
    int lastkey;  /* The last argument that's a key */
    int keystep;  /* The step between first and last key */
    // stats: microseconds is the total time spent executing this command
    // (in microseconds), calls is the total number of times it was executed
    long long microseconds, calls;
};
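For reference, lookupCommand itself is a one-line dict lookup; server.commands is populated from redisCommandTable at startup (by populateCommandTable in the same tree):
struct redisCommand *lookupCommand(sds name) {
    return dictFetchValue(server.commands, name);
}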
Take setCommand as an example:
void setCommand(redisClient *c) {
    // ......
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
void setGenericCommand(redisClient *c, int flags, robj *key,
        robj *val, robj *expire, int unit, robj *ok_reply,
        robj *abort_reply) {
    // ......
    setKey(c->db,key,val);
    // ......
    // key call: send the response back to the client (registers a write event
    // with epoll; once the fd is writable, the reply is written back to cfd)
    addReply(c, ok_reply ? ok_reply : shared.ok);
}
void setKey(redisDb *db, robj *key, robj *val) {
    if (lookupKeyWrite(db,key) == NULL) {
        dbAdd(db,key,val);
    } else {
        dbOverwrite(db,key,val);
    }
    // ......
}
// setKey() first checks whether the key already exists in the dataset:
// overwrite if it does, add it if it doesn't. Here we look at the add path:
void dbAdd(redisDb *db, robj *key, robj *val) {
    sds copy = sdsdup(key->ptr);
    // dictAdd() stores the key in the dict hash table
    int retval = dictAdd(db->dict, copy, val);
    redisAssertWithInfo(NULL,key,retval == REDIS_OK);
}
The addReply function first calls prepareClientToWrite(c) to install the write handler, i.e. aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c); it then appends the reply to the client's output buffer c->buf. On the next trip through the event loop, if the socket is writable, the corresponding callback fires and the buffered reply is sent to the client.
void addReply(redisClient *c, robj *obj) {
    // install the write handler for the client in the event loop
    if (prepareClientToWrite(c) != REDIS_OK) return;
    /* This is an important place where we can avoid copy-on-write
     * when there is a saving child running, avoiding touching the
     * refcount field of the object if it's not needed.
     *
     * If the encoding is RAW and there is room in the static buffer
     * we'll be able to send the object to the client without
     * messing with its page. */
    if (sdsEncodedObject(obj)) {
        // first try copying the content into c->buf, avoiding an allocation
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
            // if c->buf lacks space, fall back to the c->reply list,
            // which may allocate
            _addReplyObjectToList(c,obj);
    } else if (obj->encoding == REDIS_ENCODING_INT) {
        /* Optimization: if there is room in the static buffer for 32 bytes
         * (more than the max chars a 64 bit integer can take as string) we
         * avoid decoding the object and go for the lower level approach. */
        // optimization: with at least 32 bytes free in c->buf, copy the
        // integer directly into it as a string
        if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
            char buf[32];
            int len;
            len = ll2string(buf,sizeof(buf),(long)obj->ptr);
            if (_addReplyToBuffer(c,buf,len) == REDIS_OK)
                return;
            /* else... continue with the normal code path, but should never
             * happen actually since we verified there is room. */
        }
        // reaching here the object is an integer that didn't take the fast
        // path above: convert it to a string
        obj = getDecodedObject(obj);
        // and store it in the buffers
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
            _addReplyObjectToList(c,obj);
        decrRefCount(obj);
    } else {
        redisPanic("Wrong obj->encoding in addReply()");
    }
}
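To close the loop on the write path, here is an abridged sketch of sendReplyToClient, consistent with the source quoted above (the c->reply list path and write throttling are elided): it writes out c->buf and, once nothing is left to send, deregisters the write event.
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    // keep writing while the static buffer (or the reply list) holds data
    while(c->bufpos > 0 || listLength(c->reply)) {
        if (c->bufpos > 0) {
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;      // would block (or error): try again later
            c->sentlen += nwritten;
            if (c->sentlen == c->bufpos) { // static buffer fully sent
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            // ...... same idea for the objects queued on c->reply, omitted
        }
    }
    // ...... EAGAIN / error handling omitted
    if (c->bufpos == 0 && listLength(c->reply) == 0) {
        c->sentlen = 0;
        // nothing left to send: deregister the write event for this fd
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
        // close the client if the reply was the last thing to send
        if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c);
    }
}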
Tracing aeMain
aeMain(server.el); // loop on events: while(!stop)
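Its body is short; consistent with the source quoted above, it is essentially:
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        // run the beforesleep hook (e.g. flushing pending work) before each poll
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // process every pending time event and every fired file event
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}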
The aeProcessEvents function processes every pending time event, then every pending file event. Without special flags, it sleeps until some file event fires or until the next time event (if any) is due. If flags is 0, the function does nothing and returns; if flags has AE_ALL_EVENTS set, all kinds of events are processed; if flags has AE_FILE_EVENTS set, file events are processed.
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        // find the nearest time event
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // a time event exists: the poll timeout for file events is the
            // gap between now and the nearest time event
            long now_sec, now_ms;
            /* Calculate the time missing for the nearest
             * timer to fire. */
            // compute how long until the nearest time event is due
            // and store that interval in the tv struct
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            // a negative gap means the event is already due: don't block
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            // no time event exists: whether (and how long) to block
            // depends on AE_DONT_WAIT
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                // poll file events without blocking
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                // block until a file event arrives
                tvp = NULL; /* wait forever */
            }
        }
        // key call: poll for file events, blocking for at most tvp
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            // fetch the event from the fired array
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
            /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            // the essence of the Reactor model:
            // read event: run the registered read callback
            if (fe->mask & mask & AE_READABLE) {
                // rfired ensures the write handler is not invoked again
                // when it is the same function as the read handler
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // write event: run the registered write callback
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    // process time events
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    return processed; /* return the number of processed file/time events */
}
The aeApiPoll function:
numevents = aeApiPoll(eventLoop, tvp);
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    // key call: wait on the multiplexer
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
        tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    // at least one event is ready?
    if (retval > 0) {
        int j;
        // translate each ready epoll event into an AE mask
        // and append it to the eventLoop's fired array
        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    // return the number of ready events
    return numevents;
}
This completes our walkthrough of how Redis builds its network service on epoll and the Reactor model.