> Redis can now optionally use threads to handle I/O, allowing to serve 2 times as much operations per second in a single instance when pipelining cannot be used. (Redis 6.0 release notes)
For single-threaded Redis, the performance bottleneck lies mainly in network I/O, and there are two main directions for optimizing it:
- Improve raw network I/O performance, typically by bypassing the kernel network stack with something like DPDK
- Use multiple threads to exploit multiple cores, as Memcached does
Replacing the network stack is largely outside Redis's scope, so multithreading is the most effective and most practical option. Starting with version 6.0, Redis therefore introduced Threaded I/O, aimed at speeding up the network I/O performed before and after command execution.
This still differs from Memcached, where everything from I/O handling to data access is multithreaded: Redis's extra threads only read/write network data and parse the protocol, while command execution stays single-threaded. The design deliberately avoids the complexity multithreading would otherwise add, such as concurrency control for keys, Lua scripts, transactions, LPUSH/LPOP, and so on.
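Threaded I/O is off by default and is enabled in redis.conf. A minimal configuration sketch (both option names exist in Redis 6.0; the values here are illustrative):

```conf
# Number of I/O threads, counting the main thread; 1 keeps the classic
# single-threaded behavior.
io-threads 4

# With io-threads > 1 only writes are threaded by default; threaded reads
# and protocol parsing must be enabled explicitly.
io-threads-do-reads yes
```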
The event loop: AE
Redis is driven by a homegrown asynchronous event library, AE ("A simple event-driven programming library"), which sits at the core of its processing flow. The sections below sketch the overall flow as background, to make the Threaded I/O implementation easier to follow.
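AE's public surface is small. Its core entry points, abridged from ae.h:

```c
// ae.h (abridged)
aeEventLoop *aeCreateEventLoop(int setsize);
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData);
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc);
void aeMain(aeEventLoop *eventLoop);
```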
initServer
- Iterate over the bound addresses (ipfd) and set the handler for TCP connection events to `acceptTcpHandler`
- Register the beforesleep and aftersleep callbacks
// server.c
int main(int argc, char **argv) {
    ...
    ACLInit(); /* The ACL subsystem must be initialized ASAP because the
                  basic networking code and client creation depends on it. */
    ...
    initServer();
    ...
    aeMain(server.el);
    ...
}

void initServer(void) {
    ...
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets.
     * Internally calls aeCreateFileEvent() */
    if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
        serverPanic("Unrecoverable error creating TCP socket accept handler.");
    }
    ...
    // Register the beforesleep and aftersleep callbacks
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    ...
}
acceptTcpHandler
- When a client connects to Redis, the TCP connection event fires and the `acceptTcpHandler` handler is invoked
- In `acceptCommonHandler`, a client object is created and its read-event handler is set to `readQueryFromClient`
// networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    ...
    while(max--) {
        // Accept the client's TCP connection and return its fd
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        ...
        // Handle the new connection
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    ...
    /* Create connection and client */
    if ((c = createClient(conn)) == NULL) {
        ...
        return;
    }
    ...
}

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    if (conn) {
        ...
        // Internally calls aeCreateFileEvent()
        connSetReadHandler(conn, readQueryFromClient);
    }
    ...
}
aeMain
- Loops, calling aeProcessEvents on every iteration
- Each pass runs beforesleep -> aeApiPoll -> aftersleep
- Then iterates over the ready file events and runs the registered read/write callbacks (normally read first, then write)
  - read event: readQueryFromClient
  - write event: sendReplyToClient
// ae.c
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    ...
    if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        ...
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires.
         * I/O multiplexing: wait for file events to become ready */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        // Iterate over the ready file events
        for (j = 0; j < numevents; j++) {
            ...
            // Inversion flag
            int invert = fe->mask & AE_BARRIER;

            // If invert is false, read first
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            // Then write
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            // If invert is true, read after writing
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            processed++;
        }
    }
    // Process time events
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
Old logic
- In the client struct, querybuf caches the incoming request and buf caches the reply (large objects are written to the reply list instead)
- aeProcessEvents() flow
  - [beforeSleep] Check server.clients_pending_write (clients left with pending replies from the previous iteration) and reply to them
void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();
    ...
}

int handleClientsWithPendingWrites(void) {
    ...
    // Walk clients_pending_write
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        ...
        // Try writeToClient() first
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        // Still data left to write (e.g. the socket buffer filled up)
        if (clientHasPendingReplies(c)) {
            ...
            // Install sendReplyToClient as the write-event handler
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR) {
                    freeClientAsync(c);
            }
        }
    }
    return processed;
}
- I/O multiplexing: wait for file events
- Iterate over the file events; when a read (or write) is ready, run the corresponding callback
  - [sendReplyToClient] just calls writeToClient; the only difference is the handler_installed argument (0 in handleClientsWithPendingWrites above, 1 here; see the snippet after the next listing)
  - [readQueryFromClient] reads the connection's data into the client's querybuf
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    ...
    nread = read(fd, c->querybuf+qblen, readlen);
    ...
    } else if (c->flags & CLIENT_MASTER) {
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    ...
    // Internally runs processCommand()
    processInputBufferAndReplicate(c);
}
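For reference, the sendReplyToClient handler mentioned above is only a thin wrapper around writeToClient. A sketch of its Redis 5.x form (essentially the whole function; treat the exact lines as approximate):

```c
// networking.c (Redis 5.x, sketch)
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(mask);
    writeToClient(fd,privdata,1); /* handler_installed = 1 */
}
```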
- [processCommand] call() executes the Redis command
int processCommand(client *c) {
    // A long series of checks and bookkeeping
    ...
    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        ...
    } else {
        call(c,CMD_CALL_FULL);
        ...
    }
    return C_OK;
}
- Inside the command implementation, addReply() is called: it writes the result into the client's buf and adds the client to clients_pending_write
void addReply(client *c, robj *obj) {
    /* Internally calls clientInstallWriteHandler(), which adds the client
     * to clients_pending_write */
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        // Small objects go into buf
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            // Large objects go into the reply list
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        ...
    } else {
        ...
    }
}
New logic
- io_threads_list: an array with one entry per thread; each entry is itself a list of the client objects that thread must handle (see the sketch after this list)
- At any instant, Threaded I/O is either reading or writing; it is never the case that some threads read while others write
- aeProcessEvents() flow
  - beforeSleep
    - [handleClientsWithPendingReadsUsingThreads] Check server.clients_pending_read (reads postponed in the previous iteration) and read the clients' data
    - [handleClientsWithPendingWritesUsingThreads] Check server.clients_pending_write (replies pending from the previous iteration) and reply to the clients
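The shared state behind this machinery is a handful of globals in networking.c. Abridged, from Redis 6.2 (the atomic qualifier differs slightly in 6.0):

```c
// networking.c (Redis 6.2, abridged)
pthread_t io_threads[IO_THREADS_MAX_NUM];              // the I/O threads
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];  // used to pause threads
redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; // jobs per thread
int io_threads_op;            /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ */
list *io_threads_list[IO_THREADS_MAX_NUM]; // clients assigned to each thread
```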
void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    /* We should handle pending reads clients ASAP after event loop. */
    handleClientsWithPendingReadsUsingThreads();
    ...
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();
    ...
}
- I/O multiplexing: wait for file events
- Iterate over the file events and run the callbacks for whatever is ready, with reads now postponed:
  - [readQueryFromClient, postponed read] for clients that qualify for threaded reads, just add the client object to the server.clients_pending_read list
  - [sendReplyToClient] just calls writeToClient, with handler_installed set to 1
Source walkthrough
With the big picture above in place, the rest of this section walks through the Threaded I/O related source.
readQueryFromClient
- When a command is sent to Redis, the I/O read event fires and the `readQueryFromClient` handler is invoked
- If threaded I/O is enabled and the client qualifies, `postponeClientRead` takes effect: the client is flagged CLIENT_PENDING_READ and added to the head of the server.clients_pending_read list
- Otherwise the data is read into the client's querybuf and processed immediately; this happens when
  - threaded I/O is disabled, or
  - the caller is an I/O thread and the client is already flagged CLIENT_PENDING_READ
- When threaded I/O is disabled, the command is also executed here: the result is written into the client's buf and the client is added to clients_pending_write
// networking.c
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    ...
    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;
    ...
    // Read the request data
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    ...
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    ...
    processInputBuffer(c);
}

/* Return 1 if we want to handle the client read later using threaded I/O.
 * This is called by the readable handler of the event loop.
 * As a side effect of calling this function the client is put in the
 * pending read clients and flagged as such. */
int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads && // are threaded reads enabled?
        !ProcessingEventsWhileBlocked &&
        // the client has none of these four flags
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED)))
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

void processInputBuffer(client *c) {
    while(c->qb_pos < sdslen(c->querybuf)) {
        // Loop over querybuf, parsing commands
        ...
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command.
             * In an I/O thread only the flag is set; the command is not
             * executed here */
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            /* Internally calls processCommand() to run the command; when it
             * finishes, addReply() fills buf and inserts the client into
             * clients_pending_write */
            if (processCommandAndResetClient(c) == C_ERR) {
                return;
            }
        }
    }
    ...
}
handleClientsWithPendingReads(Writes)UsingThreads
- If threaded I/O is disabled, return immediately
- Distribute the clients in clients_pending_read across the threads round-robin (by modulo), storing them in each thread's client list
- The main thread handles the clients assigned to itself (io_threads_list[0]), then waits for all I/O threads to finish their work
- Both the main thread and the I/O threads call readQueryFromClient again, but the flags route it down a different path: the client's data is read and parsed, yet no command is executed
- Finally, walk the clients in clients_pending_read and execute their commands serially, writing results into buf and adding each client to clients_pending_write
int handleClientsWithPendingReadsUsingThreads(void) {
    ...
    // Distribute the clients in clients_pending_read across io_threads_list
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    // round-robin by modulo
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    ...
    // The main thread handles its own share of clients
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    // Spin until all I/O threads have finished their work
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    // Run the parsed commands serially
    while(listLength(server.clients_pending_read)) {
        ...
        processInputBuffer(c);
        ...
    }
}
- handleClientsWithPendingWritesUsingThreads
  - If threaded I/O is disabled, fall back to the original handleClientsWithPendingWrites()
  - The overall logic mirrors the read path
  - Walk the clients in clients_pending_write; if a client still has unwritten data (it exceeded the socket buffer; normally the I/O threads finish the write), install sendReplyToClient as its write-ready callback, to be invoked during file-event processing (see the sketch below)
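A sketch of the write-side counterpart, abridged from Redis 6.x networking.c (the distribution and wait steps are elided since they mirror the read path; treat the exact lines as approximate):

```c
// networking.c (Redis 6.x, abridged sketch)
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If I/O threads are disabled or there are few clients to serve,
     * use the old synchronous path instead. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded())
        return handleClientsWithPendingWrites();

    io_threads_op = IO_THREADS_OP_WRITE;
    // ... distribute clients across io_threads_list[] round-robin,
    // write the main thread's share via writeToClient(c,0),
    // then spin-wait on getIOPendingCount() exactly as on the read path ...

    /* Install the write handler for clients that still have output data. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        if (clientHasPendingReplies(c) &&
            connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    return processed;
}
```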
IOThreadMain
- Gets the amount of pending work for its own io_threads_list entry via getIOPendingCount()
- Walks its io_threads_list: in write mode it calls writeToClient(), in read mode readQueryFromClient()
  - read: store the request data into the client's querybuf
  - write: send the response data back to the client
void *IOThreadMain(void *myid) {
    ...
    while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (getIOPendingCount(id) != 0) break;
        }
        ...
        // Walk this thread's io_threads_list
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        setIOPendingCount(id, 0);
    }
}
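Note the start/finish handshake: the main thread publishes work through per-thread pending counters, and both sides busy-wait on them (each I/O thread also falls back to blocking on a mutex after the spin, elided above) rather than using condition variables. Below is a minimal, self-contained sketch of that pattern; it is not Redis code, and names like thread_pending and worker_main are made up for illustration:

```c
/* fanout.c — minimal sketch of the spin-wait fan-out pattern.
 * Build: cc fanout.c -o fanout -lpthread */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define NUM_WORKERS 3
static atomic_ulong thread_pending[NUM_WORKERS]; /* jobs assigned per worker */

static void *worker_main(void *arg) {
    long id = (long)arg;
    for (;;) {
        /* Spin until the main thread assigns work (Redis spins a bounded
         * number of times, then blocks on a mutex; omitted here). */
        while (atomic_load(&thread_pending[id]) == 0) ;
        printf("worker %ld: handling %lu clients\n",
               id, atomic_load(&thread_pending[id]));
        atomic_store(&thread_pending[id], 0); /* signal completion */
    }
    return NULL;
}

int main(void) {
    pthread_t tids[NUM_WORKERS];
    for (long i = 0; i < NUM_WORKERS; i++)
        pthread_create(&tids[i], NULL, worker_main, (void *)i);

    /* Main thread: assign work, then spin until every worker is done,
     * mirroring handleClientsWithPending*UsingThreads(). */
    for (int i = 0; i < NUM_WORKERS; i++)
        atomic_store(&thread_pending[i], (unsigned long)(i + 1));
    for (;;) {
        unsigned long pending = 0;
        for (int i = 0; i < NUM_WORKERS; i++)
            pending += atomic_load(&thread_pending[i]);
        if (pending == 0) break;
    }
    printf("main: all workers done\n");
    return 0;
}
```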
References
- Source code: Redis 5.0.13 & Redis 6.2.5
- Redis 事件循环器 (AE) 实现剖析: https://www.cyhone.com/articles/analysis-of-redis-ae/
- Redis命令处理过程分析: https://www.jianshu.com/p/6188becd2cea
- Redis 6.0新Feature实现原理——Threaded I/O: https://jiekun.dev/posts/redis-tio-implementation/
- 源码阅读-Redis 6.0: 多线程I/O: https://keys961.github.io/2020/04/16/源码阅读-Redis-6.0-多线程IO/