Redis 底层数据结构 - litter-fish/ReadSource GitHub Wiki
一个可以被动态修改的字符串字面量
struct sdshdr {
// buf数组已经存储的字符长度
unsigned int len;
// buf数组未使用的字符长度
unsigned int free;
// 字节数组,用于保存字符串
char buf[];
};
实例:
-
SDS中记录了字符串长度,因此复杂度是一个常数
-
SDS在进行修改时,会先判定SDS空间是否满足修改所需条件,如果不满足,SDS会先进行扩容,在进行修改,因此SDS不会出现内容溢出
-
SDS内存重分配 空间预分配策略 如果对SDS进行修改之后,SDS的长度将小于1M,那么程序将分配和len属性大小的使用空间,这时len的长度等于free 初始情况: s = "RE";
执行,sdscat(s, "DIS")后,len=5、free=5,如下图所示:
如果再次执行,sdscat(s, "TEST"),free足够加入字符串,不需要在进行内存重分配操作,如:
如果SDS修改后长度大于1M,程序会分配1M未使用空间
惰性空间释放 优化SDS的字符串缩短操作:对于不用的内存空间,SDS并不会真正释放其内存空间,而是通过free属性记录数组的可以使用空间,大大减少内存空间的重分配
- SDS使用二进制的方式处理buf中的数据,并通过使用len属性来判定字符串是否已经结束
- 兼容部分c string API
sdsclear:清除SDS中保存的字符串,使用惰性空间释放,空间复杂度为1:O(1) sdstrim:移除SDS中出现指定字符的字符,O(N * N)
每个节点的结构
typedef struct listNode {
// 前置节点
struct listNode *prev;
// 后置节点
struct listNode *next;
// 节点的值
void *value;
} listNode;
链表的结构
typedef struct list {
// 表头
listNode *head;
// 表尾
listNode *tail;
// 节点值复制函数
void *(*dup)(void *ptr);
// 节点值释放函数
void (*free)(void *ptr);
// 节点值对比函数
int (*match)(void *ptr, void *key);
// 链表包含节点个数
unsigned long len;
} list;
结构实例:
底层使用哈希表实现,一个哈希表包含多个哈希节点,每个哈希节点保存了键值对
哈希表节点
typedef struct dictEntry {
// 键
void *key;
// 值
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
// 指向下一个哈希表节点,用于解决哈希冲突
struct dictEntry *next;
} dictEntry;
哈希表
typedef struct dictht {
// 哈希表数组
dictEntry **table;
// 哈希表大小
unsigned long size;
// 哈希表大小掩码,用于计算索引值,等于size-1
unsigned long sizemask;
// 该哈希表已有节点的个数,
unsigned long used;
} dictht;
字典
typedef struct dict {
//
dictType *type;
void *privdata;
// 一般情况下只会使用ht[0]的哈希表,ht[1]哈希表之后在对ht[0]进行rehash的时候使用
dictht ht[2];
// 记录rehash进度
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
int iterators; /* number of iterators currently running */
} dict;
实例:
执行步骤:
- 为ht[1]分配空间, 如果执行的是一个扩展操作,则ht[1]的大小等于ht[0].used * 2 如果执行的是收缩操作,则ht[1]的大小为第一个大于等于ht[0].used的2的n次方
- 将ht[0]上的所有键值对rehash到ht[1]上:设置索引计数器变量rehashidx=0,表示正在进行rehash操作,在进行rehash期间,每次执行添加、删除、查找和更新操作时,会顺带将ht[0]中下标为rehashidx的键值迁移到ht[1]中,当rehash完成,将计数器增加1
- 当ht[0]的所有键都重新迁移到ht[1]上,释放ht[0],将ht[1]设置为ht[0],并在ht[1]上新创建一个空白哈希表,将rehashidx设置为-1
哈希表执行扩容的条件
-
服务器目前没有在执行bgsave或bgrewriteaof命令,且负载因子大于等于1 负载因子的计算公式: 负载因子 = 哈希表已经保存的节点数量 / 哈希表大小,load_factor = ht[0].used / ht[0].size
-
服务器目前正在执行bgsave或bgrewriteaof命令,且负载因子大于等于5
哈希表执行收缩条件 负载因子小于0.1时,自动进行收缩操作
跳跃表节点
typedef struct zskiplistNode {
// 成员对象
robj *obj;
// 分值
double score;
// 后退指针
struct zskiplistNode *backward;
// 层
struct zskiplistLevel {
// 前进指针
struct zskiplistNode *forward;
// 跨度
unsigned int span;
} level[];
} zskiplistNode;
-
层 每次生成一个新的跳跃表节点,程序都会按照幂次定律随机生成一个介于1到32之间的值
-
前进指针(level[i].forward) 从表头向表尾访问
-
跨度(level[i].span) 记录两个节点之间的距离
-
后退指针(backward) 从表尾向表头访问
-
成员对象和分值 分值是一个double类型的浮点数,跳跃表中的所有节点都是按照分值从小到大进行排序 成员对象是一个指向字符串对象
跳跃表
typedef struct zskiplist {
// 表头节点、表尾节点
struct zskiplistNode *header, *tail;
// 表中节点数量
unsigned long length;
// 表中节点层数最大的
int level;
} zskiplist;
如下图所示跳跃表:
当一个集合只包含整数值元素,并且集合元素不多
typedef struct intset {
// 集合编码方式
uint32_t encoding;
// 集合元素个数
uint32_t length;
// 集合内容
int8_t contents[];
} intset;
新元素的类型比集合中的所有元素类型都大时,需要先进行升级,接着插入元素
升级步骤
-
根据新元素扩展底层存储空间,并为新元素分配空间
-
将元素转换成新类型,并放入到正确位置
-
将新元素加入集合
升级的好处
- 提升灵活性,可以将不同数据类型的数据放入同一个数据结构中,而不用担心发送错误
- 节约内存,只有集合真正有需要升级时才进行升级
列表和哈希底层的实现之一,当列表键只包含少量列表项,且列表项要么是小整数数值,或长度比较短的字符串。对于哈希表,如果键值对满足前面条件也使用压缩列表存储
目的是为了节约内存,使用连续内存进行存储。
typedef struct redisObject {
// 类型
unsigned type:4;
// 编码
unsigned encoding:4;
// 对象最后一次被访问的时间
unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */
// 对象被引用次数,使用引用计数器算法进行内存回收
int refcount;
// 指向底层实现数据结构的指针
void *ptr;
} robj;
-
类型 REDIS_STRING:字符串对象 REDIS_LIST:列表对象 REDIS_HASH:哈希对象 REDIS_SET:集合对象 REDIS_ZSET:有序集合对象
-
编码 REDIS_ENCODING_INT:long类型的整数 REDIS_ENCODING_EMBSETR:embstr编码的简单动态字符串 REDIS_ENCODING_RAW:简单动态字符串 REDIS_ENCODING_HT:字典 REDIS_ENCODING_LINKEDLIST:双端列表 REDIS_ENCODING_ZIPLIST:压缩列表 REDIS_ENCODING_INTSET:整数集合 REDIS_ENCODEING_SKIPLIST:跳跃表和字典
字符串编码可以使用int、raw、embstr 如果一个字符串可以使用long来表示的整数指,则编码使用int 如果一个字符串对象保存的是一个长度大于32字节的字符串值,则使用raw编码来保存一个SDS,调用两次内存分配 embstr用于保存短字符串,与raw区别:调用一次内存分配策略,申请一个连续的内存空间
raw与embstr存储结构
列表对象可以使用ziplist和linkedlist存储。
-
当列表中的元素个数小于512个且所有字符串元素的长度都小于64字节时使用ziplist存储
-
否则使用linkedlist存储
哈希对象可以使用ziplist和hashtable存储
-
当键值对的个数小于512个且所有键值对的字符串长度都小于64字节时使用ziplist存储 同一个键值对总是挨在一起,键在前值在后,先添加的在表头,后添加的在表尾
-
否则使用 hashtable 存储
集合对象可以使用intset和hashtable存储
-
如果集合对象中保存的元素都是整数值且数量小于512个时将使用intset存储
-
否则使用 hashtable 存储
有序集合对象可以使用ziplist和skiplist存储
-
如果有序集合的元素小于128个且所有的元素长度小于64字节时使用ziplist存储, 使用两个挨着的连续压缩列表节点保存,第一个节点保存元素成员,第二个节点保存分值
-
否则使用跳跃表进行存储 首先按照分值从小到大的顺序将元素存储在跳跃表中,同时会创建一个从成员到分值的哈希映射dict。 虽然同时存在跳跃表和哈希表,不过其中StringObject对象会通过指针进行共享,以节省内存。 为什么需要同时使用跳跃表和哈希表进行存储? 为了提升性能。举例。 跳跃表使用有序集合,因此能够保留范围型操作的优点 而使用哈希能够以O(1)的复杂度进行查找操作。
AOF还原原理
- 创建一个没有网络连接的伪客户端
- 从AOF文件中读入内容
- 使用伪客户端执行命令
- 循环处理2、3步骤
AOF重写原理
-
主线程fork一个子进程,有子进程负责重写
-
子进程创建一个临时AOF文件
-
子进程循环读取所有数据库的所有键值对,并保存到临时AOF文件中
-
子进程在重新过程中,主进程继续处理命令,并将修改命令写入AOF缓存和AOF重写缓存中
-
当子进程完成数据重写,将向主进程发送一个信号,主进程接收到这个信号后。
-
主进程会将AOF重写缓存中的命令重写到临时AOF文件中。
-
临时文件进行改名,原子替换就的AOF文件
文件事件处理
IO多路复用程序总是将所有产生套接字存放到一个队列中,然后通过这个队列,以有序、同步、每次一个套接字的方式向文件事件分派器传送套接字。
IO多路复用组件
客户端和服务端是一个一对多的关系
struct redisServer {
......
// 服务器维护一个与之相连的客户端列表
list *clients; /* List of active clients */
.....
};
typedef struct redisClient {
uint64_t id; /* Client incremental unique ID. */
// 套接字描述符,-1表示一个伪客户端,AOF还原数据时使用
int fd;
redisDb *db;
int dictid;
// 客户端名称
robj *name; /* As set by CLIENT SETNAME */
// 客户端的输入缓冲区,保存客户端发送的命令请求
sds querybuf;
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size */
// 服务器对接收到的命令进行解析,将解析的命令参数个数和命令参数保存到argc和argv属性中
int argc;
robj **argv;
// 服务器根据argv[0]中的值去命令表中查找对应的命令函数。
struct redisCommand *cmd, *lastcmd;
int reqtype;
int multibulklen; /* number of multi bulk arguments left to read */
long bulklen; /* length of bulk argument in multi bulk request */
unsigned long reply_bytes; /* Tot bytes of objects in reply list */
int sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
// 记录客户端创建时间
time_t ctime; /* Client creation time */
// 客户端与服务端最后一次互动时间,可用于计算客户端的空转时间
time_t lastinteraction; /* time of the last interaction, used for timeout */
// 输出缓冲区第一次到达软性限制时间
time_t obuf_soft_limit_reached_time;
// 标示客户端的角色、以及目前客户端所处状态
int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
// 表示客户端是否通过认证,0 - 未通过认证,除AUTH命令其他命令将被拒绝,
int authenticated; /* when requirepass is non-NULL */
int replstate; /* replication state if this is a slave */
int repl_put_online_on_ack; /* Install slave write handler on ACK. */
int repldbfd; /* replication DB file descriptor */
off_t repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
sds replpreamble; /* replication DB preamble. */
long long reploff; /* replication offset if this is our master */
long long repl_ack_off; /* replication ack offset, if this is a slave */
long long repl_ack_time;/* replication ack time, if this is a slave */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer
should use. */
char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
multiState mstate; /* MULTI/EXEC state */
blockingState bpop; /* blocking state */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
sds peerid; /* Cached peer ID. */
/* Response buffer */
// 命令回复缓冲区,一个固定大小的内存,用于存储长度比较小的回复,REDIS_REPLY_CHUNK_BYTES=16kb
int bufpos; // 记录buf目前使用的字节数量
char buf[REDIS_REPLY_CHUNK_BYTES];
// 一个可变长的回复缓冲区,通过使用链表来连接多个字符串,服务器可以为客户端保存比较长的回复
list *reply;
} redisClient;
-
客户端发送命令,客户端将命令转换成协议格式,然后通过套接字将协议格式的命令发送给服务器
-
读取命令请求,调用命令请求处理器,读取协议格式命令,保存到输入缓冲区,然后解析命令,提取命令参数个数及参数,接着调用命令执行器执行命令
-
根据参数argv[0]到命令表中查找参数指定的命令,并将查找到的命令保存到客户端状态的cmd属性中
struct redisCommand {
// 命令名字
char *name;
// 命令的实现函数
redisCommandProc *proc;
// 命令参数个数,用于检测命令请求的格式是否正确
int arity;
// 命令的属性标识,命令是写命令、读命令
char *sflags; /* Flags as string representation, one char per flag. */
// 对sflags分析得到的二进制标识
int flags; /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line. */
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey; /* The last argument that's a key */
int keystep; /* The step between first and last key */
// 服务器执行这个命令所耗费的总时长,服务器总共执行了多少次这个命令
long long microseconds, calls;
};
-
执行预备操作,检查cmd是否为空、参数个数是否正确、是否身份认证等
-
执行命令,将执行的结果保存到回复缓冲区中
-
执行后续操作,是否开启慢查询并且满足慢查询条件、是否需要进行AOF等等。
-
将命令结果发送给客户端
-
客户端接收并打印结果
- 初始化服务器状态结构
void initServerConfig(void) {
int j;
// 设置服务器的运行ID
getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE);
// 设置默认配置文件路径
server.configfile = NULL;
server.hz = REDIS_DEFAULT_HZ;
// 为运行ID加上结尾字符
server.runid[REDIS_RUN_ID_SIZE] = '\0';
// 设置服务器运行架构
server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
// 设置默认端口
server.port = REDIS_SERVERPORT;
// 命令表初始化
server.commands = dictCreate(&commandTableDictType,NULL);
server.orig_commands = dictCreate(&commandTableDictType,NULL);
populateCommandTable();
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
server.lpushCommand = lookupCommandByCString("lpush");
server.lpopCommand = lookupCommandByCString("lpop");
server.rpopCommand = lookupCommandByCString("rpop");
/* Slow log */
server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
server.slowlog_max_len = REDIS_SLOWLOG_MAX_LEN;
/* Latency monitor */
server.latency_monitor_threshold = REDIS_DEFAULT_LATENCY_MONITOR_THRESHOLD;
/* Debugging */
server.assert_failed = "<no assertion failed>";
server.assert_file = "<no file>";
server.assert_line = 0;
server.bug_report_start = 0;
server.watchdog_period = 0;
}
-
载入自定义配置设置
-
初始化数据结构
-
还原数据库状态
void loadDataFromDisk(void) {
long long start = ustime();
// 如果开启aof,则使用aof进行还原
if (server.aof_state == REDIS_AOF_ON) {
if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
} else {
// 没有开启AOF则使用RDB进行还原
if (rdbLoad(server.rdb_filename) == REDIS_OK) {
redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000);
} else if (errno != ENOENT) {
redisLog(REDIS_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
exit(1);
}
}
}
- 执行时间循环