Redis 发布订阅 - litter-fish/ReadSource GitHub Wiki
发布订阅由 SUBSCRIBE、PUBLISH、PSUBSCRIBE等命令组成。
事务
事务开始
multi
void multiCommand(client *c) {
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"MULTI calls can not be nested");
return;
}
// 打开事务标识
c->flags |= CLIENT_MULTI;
// 回复OK消息
addReply(c,shared.ok);
}
命令入队
事务队列
typedef struct multiCmd {
// 参数
robj **argv;
// 参数个数
int argc;
// 命令指针
struct redisCommand *cmd;
} multiCmd;
typedef struct multiState {
// 事务队列,FIFO队列
multiCmd *commands; /* Array of MULTI commands */
// 已入对的命令
int count; /* Total number of MULTI commands */
int cmd_flags; /* The accumulated command flags OR-ed together.
So if at least a command has a given flag, it
will be set in this field. */
int minreplicas; /* MINREPLICAS for synchronous replication */
time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;
每个客户端都有自己的事务状态
typedef struct client {
// ......
// 事务状态
multiState mstate; /* MULTI/EXEC state */
// ......
} client;
事务执行
当一个事务状态的客户端向服务器发送EXEC命令时,这个命令将被立即执行。
Watch命令
使用watch命令监视数据库键
typedef struct redisDb {
// ......
// 正在被watch监视的键
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
// ......
} redisDb;
监视机制的触发
所有调用数据库进行修改的命令,都会调用multi.c/touchWatchedKey
void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;
if (dictSize(db->watched_keys) == 0) return;
// 获取数据中监视该键的所有客户端
clients = dictFetchValue(db->watched_keys, key);
if (!clients) return;
/* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
/* Check if we are already watching for this key */
listRewind(clients,&li);
// 遍历每个客户端
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 打开每个客户端的标识
c->flags |= CLIENT_DIRTY_CAS;
}
}
判断事务是否安全
当服务器接收到一个exec命令时,服务器会根据这个客户端是否打开CLIENT_DIRTY_CAS标识,来决定是否执行命令。
事务的CAID特性
-
原子性
-
一致性 1)入队错误,如果命令在入队过程中,出现命令不存在、或者命令格式不正确等,将拒绝执行事务 2)执行错误,事务在执行过程中出现错误,会忽略错误命令,继续执行其他命令。 3)服务器停机,
-
隔离性
-
持久性