Sub Pubscribe - wyc902/redis GitHub Wiki

redis的订阅和发布功能可以用在服务器向客户端发消息,多客户端之间同步消息。应用范围比较广。

redis代码中和订阅以及发布相关的数据结构有

struct redisServer {
    ....
    dict *pubsub_channels; 
    /* 字典,键值对,key是channel名,value是list,list的每个node的值是client
       表示订阅了该channel的所有客户端 */
    list *pubsub_patterns; /* 订阅模式list,每个node的值是pubsubPattern结构 */
}

typedef struct pubsubPattern {
    client *client;  /* 相应的redis的客户端 */
    robj *pattern;   
   /* 订阅的模式, 比如 client9订阅了模式 sports.[12] 那么往channel sports.1 和 sports.2发布消息,client9都会收到 */
} pubsubPattern;

了解了pubsub_channels结构,就很容易了解subscribe和publish命令的实现 subscribe命令:

int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* 主逻辑,添加client到相应的server的相应channel的列表,以及把channel添加到client的订阅列表 */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* 以上的逻辑已完成channel添加到client的订阅列表 */
        /* 下面的代码会添加client到server的相应channel列表 */
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            /* client是第一个订阅该channel的client创建相应的键值对 */
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        /* 添加client到订阅channel的client list 的末尾 */
        listAddNodeTail(clients,c);
    }
    .....
    return retval;
}

publish命令:

int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    /* 找到订阅channel的客户端列表,逐一发送 */
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
            client *c = ln->value;
            /* 通知客户端的操作 */
            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    /* 遍历匹配模式列表,通知订阅了匹配模式的客户端 */
    if (listLength(server.pubsub_patterns)) {
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;
            /* 匹配上了 */
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}