Redis 6.0 新特性之 Threaded I/O

/ 0评 / 0

Redis can now optionally use threads to handle I/O, allowing to serve 2 times as much operations per second in a single instance when pipelining cannot be used.

目前对于单线程 Redis 来说,性能瓶颈主要在于网络的 IO 消耗,优化主要有两个方向:

  1. 提高网络 IO 性能,典型的实现像使用 DPDK 来替代内核网络栈的方式
  2. 使用多线程充分利用多核,典型的实现像 Memcached

第一种优化协议栈的方式跟 Redis 本身关系不大,而支持多线程是最有效、最便捷的方式。所以 Redis 从 6.0 版本开始引入了 Threaded I/O,目的是提升执行命令前后的网络 I/O 性能。

但 Redis 的实现与 Memcached 这种从 IO 处理到数据访问全部多线程的模式有所不同:Redis 的多线程只用来处理网络数据的读写和协议解析,命令的执行仍然是单线程的。之所以这样设计,是为了避免多线程带来的复杂性,否则就需要处理 key、Lua 脚本、事务、LPUSH/LPOP 等操作的并发控制问题。

事件循环处理器 —— AE

Redis 使用了一个名为“A simple event-driven programming library”的自制异步事件库(简称“AE”),它是 Redis 处理流程的核心。下面先对大体流程做一个简略的介绍,作为背景知识,方便理解后面的 Threaded I/O 实现。

initServer

// server.c
/* Excerpt: server entry point. Initializes the ACL subsystem, then the
 * server itself, then enters the AE event loop. */
int main(int argc, char **argv) {
    ...
    ACLInit(); /* The ACL subsystem must be initialized ASAP because the
                  basic networking code and client creation depends on it. */
    ...
    initServer();
    ...
    aeMain(server.el);
    ...
}

/* Excerpt: registers the TCP accept handler and installs the event loop's
 * before-sleep / after-sleep hooks. */
void initServer(void) {
    ...
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets.
     * (Internally calls aeCreateFileEvent().) */
    if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
        serverPanic("Unrecoverable error creating TCP socket accept handler.");
    }

    ...

    // Install the beforeSleep and afterSleep callbacks on the event loop.
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    ...
}

acceptTcpHandler

// networking.c
/* Read handler of the listening TCP socket: accepts up to `max` pending
 * connections and hands each one to acceptCommonHandler(). */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    ...
    while(max--) {
        // Accept a client TCP connection; returns the new fd.
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        ...
        // Wrap the fd in a connection object and set up the client.
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

/* Creates the client object for a freshly accepted connection. */
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    ...
    /* Create connection and client */
    if ((c = createClient(conn)) == NULL) {
        ...
        return;
    }
    ...
}

/* Allocates a client. For a real connection it registers
 * readQueryFromClient as the read handler, so incoming data on the socket
 * triggers it from the event loop. */
client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    if (conn) {
        ...
        // Internally calls aeCreateFileEvent().
        connSetReadHandler(conn, readQueryFromClient);
    }
    ...
}

aeMain

// ae.c
/* Main event loop: processes events (with the before/after-sleep hooks
 * enabled) until eventLoop->stop is set. */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

/* One iteration of the event loop:
 *   beforesleep -> aeApiPoll (wait for ready fds) -> aftersleep
 *   -> dispatch fired file events -> process time events.
 * Returns the number of processed file/time events. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    ...
    if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        ...

        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires.
         * (I/O multiplexing: block until file events are ready.) */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        // Dispatch every fired file event.
        for (j = 0; j < numevents; j++) {
            ...
            // AE_BARRIER inverts the usual order: write first, then read.
            int invert = fe->mask & AE_BARRIER;
            // No barrier: fire the read handler first.
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }
            // Fire the write handler, unless it is the same function as the
            // read handler that already ran for this fd.
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            // Barrier set: fire the read handler only after the write.
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    // Process time events.
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

旧逻辑(Redis 6.0 之前,单线程处理 I/O)

/* Old logic: before the event loop sleeps, flush pending replies. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();
    ...
}

/* Writes pending replies for every client on clients_pending_write directly
 * from the main thread. A client whose output could not be fully written
 * gets sendReplyToClient registered as a writable-event handler so the
 * remainder is sent later. */
int handleClientsWithPendingWrites(void) {
    ...

    // Walk clients_pending_write.
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        ...

        // First try to write synchronously with writeToClient().
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        // Still has data to send (e.g. the socket buffer was full).
        if (clientHasPendingReplies(c)) {
            ...
            // Register sendReplyToClient as the writable file-event handler.
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR) {
                    freeClientAsync(c);
            }
        }
    }
    return processed;
}
/* Old logic: read handler. Reads from the socket, then parses and executes
 * the commands right away on the main thread. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    ...
    nread = read(fd, c->querybuf+qblen, readlen);
    ...
    } else if (c->flags & CLIENT_MASTER) {
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    ...
    // Internally calls processCommand().
    processInputBufferAndReplicate(c);
}
/* Runs the pre-execution checks on a parsed command and then executes it.
 * The MULTI branch (elided here) handles commands issued inside a
 * transaction; everything else goes through call(). */
int processCommand(client *c) {
    // A long series of checks and processing steps (elided).
    ...

    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        ...
    } else {
        call(c,CMD_CALL_FULL);
        ...
    }
    return C_OK;
}
/* Appends an object to the client's pending reply. Nothing is written to
 * the socket here; the data is flushed later from beforeSleep(). */
void addReply(client *c, robj *obj) {
    /* Internally calls clientInstallWriteHandler(), which adds the client
     * to clients_pending_write. */
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        // Small objects: try the client's static output buffer (buf) first...
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            // ...large objects (that do not fit) go to the reply list.
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        ...
    } else {
        ...
    }
}

新逻辑(Redis 6.0 Threaded I/O)

/* New logic (threaded I/O): before sleeping, first distribute the pending
 * reads across the I/O threads, then the pending writes. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    /* We should handle pending reads clients ASAP after event loop. */
    handleClientsWithPendingReadsUsingThreads();
    ...
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();
    ...
}

源码分析

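有了上面的整体印象,接下来对 Threaded I/O 相关的源码进行分析。整体流程是:读事件触发时,readQueryFromClient 并不立即读取数据,而是通过 postponeClientRead 把 client 加入 clients_pending_read;随后在 beforeSleep 中,handleClientsWithPendingReadsUsingThreads 把这些 client 轮询分配给各个 I/O 线程(主线程也分担一份)去读取和解析请求;所有线程完成后,再由主线程依次执行命令。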

readQueryFromClient

// networking.c
/* Read handler for a client connection (new logic). When threaded reads
 * are enabled, the read is deferred to beforeSleep() instead of happening
 * here. When this runs inside an I/O thread, it reads and parses the
 * request but does not execute it. */
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    ...

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;
    ...

    // Read the request data from the connection.
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    ...
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    ...

    processInputBuffer(c);
}

/* Return 1 if we want to handle the client read later using threaded I/O.
 * This is called by the readable handler of the event loop.
 * As a side effect of calling this function the client is put in the
 * pending read clients and flagged as such. */
int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads && // threaded reads are enabled
        !ProcessingEventsWhileBlocked &&
        // and the client is none of: master, replica, already pending, blocked
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED)))
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}

/* Parses commands out of c->querybuf one at a time and executes them.
 * For a client flagged CLIENT_PENDING_READ (i.e. being handled by an I/O
 * thread), it stops after parsing the first complete command. It then
 * marks the client CLIENT_PENDING_COMMAND so the main thread executes it. */
void processInputBuffer(client *c) {
    while(c->qb_pos < sdslen(c->querybuf)) {
        // Parse the next command from querybuf (elided).
        ...

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command.
             * (Inside an I/O thread: only set the flag, never execute.) */
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            /* Internally calls processCommand() to execute the command. The
             * command's addReply() calls put the client on
             * clients_pending_write and fill its output buffer. */
            if (processCommandAndResetClient(c) == C_ERR) {
                return;
            }
        }
    }
    ...
}

handleClientsWithPendingReads(Writes)UsingThreads

/* Distributes the clients on clients_pending_read across the I/O threads
 * round-robin. The main thread (list 0) reads its own share. It then waits
 * for the other threads and executes the parsed commands single-threaded.
 * Returns the number of processed clients. */
int handleClientsWithPendingReadsUsingThreads(void) {
    ...

    // Assign every client on clients_pending_read to an io_threads_list.
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    // Round-robin via modulo.
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }
    ...

    // The main thread handles the clients assigned to itself (list 0).
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    // Busy-wait until every I/O thread (ids 1..io_threads_num-1) has
    // reported zero pending work.
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    // All reads are done: the main thread drains clients_pending_read and
    // runs processInputBuffer(), so commands still execute single-threaded.
    while(listLength(server.clients_pending_read)) {
        ...
        processInputBuffer(c);
        ...
    }
    ...
    return processed;
}

IOThreadMain

/* Body of each I/O thread. Spins waiting for work. It then processes every
 * client in its own io_threads_list, writing or reading as selected by
 * io_threads_op. Finally it resets its pending count to 0, which the main
 * thread polls. */
void *IOThreadMain(void *myid) {
    ...
    while(1) {
        /* Wait for start */
        // Spin for up to 1,000,000 iterations until work is assigned.
        for (int j = 0; j < 1000000; j++) {
            if (getIOPendingCount(id) != 0) break;
        }

        ...

        // Walk this thread's own io_threads_list.
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        // Signal completion to the main thread.
        setIOPendingCount(id, 0);
    }
}

参考文档

发表评论

邮箱地址不会被公开。 必填项已用*标注