/images/avatar.png

redis源码阅读-事务

redis的事务提供了一种将单个命令请求打包,然后一次性、按照顺序执行多个命令的机制,这种方式服务器会一次性把命令执行完,中间不会执行其他客户端的命令。不过redis的命令不支持错误命令执行后的回滚机制,也就是命令设计者要对命令的正确性负责,即使多个命令中存在部分错误的命令,剩余命令也会继续执行下去。

主要命令

命令 功能
MULTI 开始一个新的事务
DISCARD 放弃执行事务
EXEC 执行事务中的所有命令
WATCH 监视key,如果在exec之前被修改,则不执行事务
UNWATCH 取消对所有键的监视

事务的实现

一个事务分为三个阶段:

  • 事务开始
  • 命令入队
  • 事务执行

事务开始

使用multi开启事务,redis主要使用redisClient中flag成员记录状态。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13

void multiCommand(redisClient *c) {

    // 已经开启事务
    if (c->flags & REDIS_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }

    // 标记事务开启
    c->flags |= REDIS_MULTI;
    addReply(c,shared.ok);
}

Redis源码阅读-AOF持久化

之前看了RDB持久化,功能是把数据库的数据全部使用一种特定格式进行存储。恢复时一个一个数据库键恢复。

AOF持久化与RDB持久化不同,AOF通过保存Redis服务器执行的写命令来记录数据库的状态。

AOF持久化的实现

AOF持久化功能分为命令追加、文件写入、文件同步三个步骤。

  • 命令追加:将命令数据写入aof_buf缓冲区
  • 文件写入:将aof_buff缓冲区数据写入系统IO缓冲区
  • 文件同步:将系统IO缓冲区的数据同步到磁盘文件

命令追加

AOF持久化打开时,服务器在执行一个写命令之后,会以协议的格式将执行的命令追加倒服务器状态aof_buf缓冲区尾部。

1
2
3
4
5
struct redisServer { 
  // ...
  sds aof_buf;  // aof缓冲区
  // ....
}

命令追加的实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// 将命令追加到AOF缓冲区中
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    sds buf = sdsempty();
    robj *tmpargv[3];

    // 如果没切换到正确的数据库,则追加切换数据库命令
    if (dictid != server.aof_selected_db) {
        char seldb[64];

        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }

    // 根据命令类型追加命令
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
        cmd->proc == expireatCommand) {
        // 追加过期键命令
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        // 追加setexCommand或者psetexCommand
        tmpargv[0] = createStringObject("SET",3);
        tmpargv[1] = argv[1];
        tmpargv[2] = argv[3];
        buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
        decrRefCount(tmpargv[0]);
        buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
    } else {
        // 追加其他一般修改数据库命令
        buf = catAppendOnlyGenericCommand(buf,argc,argv);
    }

    // 将格式化的命令字符串追加到aof_buf缓冲区中
    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    if (server.aof_child_pid != -1)
        // 如果在执行AOF重写,那么追加的新的AOF文件中
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

    sdsfree(buf);
}

// 根据参数,格式化命令
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
    char buf[32];
    int len, j;
    robj *o;

    buf[0] = '*';
    len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
    buf[len++] = '\r';
    buf[len++] = '\n';
    dst = sdscatlen(dst,buf,len);

    // 遍历每一个参数,支架到AOF缓冲区中
    for (j = 0; j < argc; j++) {
        o = getDecodedObject(argv[j]);
        buf[0] = '$';
        len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
        buf[len++] = '\r';
        buf[len++] = '\n';
        dst = sdscatlen(dst,buf,len);
        dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
        dst = sdscatlen(dst,"\r\n",2);
        decrRefCount(o);
    }
    return dst;
}

Redis源码阅读-订阅与发布

Redis的发布与订阅主要是实现客户端订阅一个频道或者模式,当某客户端向一个频道发送消息时,该频道或者匹配模式订阅者都能够收到消息。

频道的订阅与退订

Redis在服务器结构体中的pubsub_channels字典中保存了所有的频道订阅关系。pubsub_channels键为频道,值为订阅的客户端组成的链表。

客户端结构体的pubsub_channels保存了客户端订阅的所有频道,pubsub_channels的键为频道,值为空。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
struct redisServer {
  // ...
  dict *pubsub_channels;  // 保存所有的频道订阅关系
  // ...
}

struct redisClient {
  // ...
  dict *pubsub_channels; // 记录客户端订阅的频道
  // ...
}

订阅

源码如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 订阅命令处理函数
void subscribeCommand(redisClient *c) {
    int j;

    // 遍历指令中的所有频道
    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
    c->flags |= REDIS_PUBSUB;
}

// 设置客户端c订阅频道channel
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    // 将channels加倒c->c->pubsub_channels的字典里
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        // 找出服务器中的频道
        de = dictFind(server.pubsub_channels,channel);
        
        // 不存在就添加一个频道
        // 获取客户端链表
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }

        // 添加到客户端链表尾部
        listAddNodeTail(clients,c);
    }
    /* Notify the client */
    // 回复客户端
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}

redis源码阅读-RDB持久化

Redis是一个内存数据库,数据存储在内存之中。有一个问题就是如果数据不存储到硬盘,那么在服务器进程退出之后,服务器中所有的数据库数据就会丢失。

Redis为了解决这个问题,提供了持久化功能,目前有两种一种是RDB持久化,一种是AOF持久化。

RDB持久化是生成一个RDB文件,该文件是一个经过压缩的二进制文件,通过该文件可以还原数据库的状态。

RDB文件的保存命令

Redis有两个命令可以生成RDB文件一个是SAVE,一个是BGSAVE。 SAVE命令调用saveCommand进行处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
void saveCommand(redisClient *c) {

    // 判断是否正在执行BGSAVE,是则退出
    if (server.rdb_child_pid != -1) {
        addReplyError(c,"Background save already in progress");
        return;
    }

    //调用rdbSave生成RDB文件
    if (rdbSave(server.rdb_filename) == REDIS_OK) {
        addReply(c,shared.ok);
    } else {
        addReply(c,shared.err);
    }
}

BGSAVE调用bgsaveCommand进行处理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
void bgsaveCommand(redisClient *c) {

    // 判断是否已经在执行BGSAVE
    if (server.rdb_child_pid != -1) {
        addReplyError(c,"Background save already in progress");
    } else if (server.aof_child_pid != -1) {
        
    // 判断是否在执行BGREWRIEAOF
        addReplyError(c,"Can't BGSAVE while AOF log rewriting is in progress");
    
    // 执行rdbSaveBackground 生成RDB文件
    } else if (rdbSaveBackground(server.rdb_filename) == REDIS_OK) {
        addReplyStatus(c,"Background saving started");
    } else {
        addReply(c,shared.err);
    }
}

int rdbSaveBackground(char *filename) {
    pid_t childpid;
    long long start;

    // 如果BGSAVE正在执行直接返回
    if (server.rdb_child_pid != -1) return REDIS_ERR;

    // 获取dirty数据 执行时间
    server.dirty_before_bgsave = server.dirty;
    server.lastbgsave_try = time(NULL);

    // fork() 开始前时间
    start = ustime();

    // 调用fork,克隆该进程
    if ((childpid = fork()) == 0) {
        int retval;

        /* Child */
        closeListeningSockets(0);
        redisSetProcTitle("redis-rdb-bgsave");

        // 执行保存操作
        retval = rdbSave(filename);

        // 打印 copy-on-write 时使用的内存数
        if (retval == REDIS_OK) {
            size_t private_dirty = zmalloc_get_private_dirty();

            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                    "RDB: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
        }
        // 向父进程发送信号
        exitFromChild((retval == REDIS_OK) ? 0 : 1);
    } else {
        /* Parent */

        // 计算 fork() 执行的时间
        server.stat_fork_time = ustime()-start;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        
        // 执行fork()错误信息
        if (childpid == -1) {
            server.lastbgsave_status = REDIS_ERR;
            redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
        
        // 记录数据库开始BGSAVE时间
        server.rdb_save_time_start = time(NULL);

        // 子进程ID 类型
        server.rdb_child_pid = childpid;
        server.rdb_child_type = REDIS_RDB_CHILD_TYPE_DISK;

        // 关闭自动Rehash
        updateDictResizePolicy();
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}

redis源码阅读-服务器

服务器

redis运行存在一个redis服务器结构,一个服务器中保存着n个数据库。

dbnum由服务器配置决定,默认值为16。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
struct redisServer {
    redisDb *db;  // Redis的数据库
    // ...
    int dbnum;  // 表明数据库的数量
    // ...
}

typedef struct redisDb {
    dict *dict;                 /* 数据库键字典 */
    dict *expires;              /* 键过期时间字典 */
    dict *blocking_keys;        /* 处于阻塞状态的键 */
    dict *ready_keys;           /* 可以解除阻塞的键 */
    dict *watched_keys;         /* 被watch的键 */
    struct evictionPoolEntry *eviction_pool;    /* Eviction pool of keys */
    int id;                     /* 数据库编号 */
    long long avg_ttl;          /* 数据库键的平均时间*/
} redisDb;

切换数据库

每个redis客户端都有自己的目标数据库,当客户端执行数据库读写命令,目标数据库是这些命令的操作对象。

redis提供select命令来切换数据库,redisClient

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
typedef struct redisClient { 
    int fd;  // 套接字描述符
    redisDb *db; // 当前正在使用的数据库
    // ...
}

int selectDb(redisClient *c, int id) {

    // 校验id
    if (id < 0 || id >= server.dbnum)
        return REDIS_ERR;

    // 切换客户端数据库
    c->db = &server.db[id];

    return REDIS_OK;
}

数据库键空间

Redis数据库存放的数据都是以键值对形式存在,redisDB结构的dict字典保存数据库中的所有键值对,这个字典被成为键空间。

1
2
3
4
5

typedef struct redisDb {
    dict *dict;                 /* 数据库键字典 */
    // ...
} redisDb;

键空间的键就是数据库的键,每个键都是一个字符串对象。

键空间的值就是数据库的值,每个值可以是字符串对象、列表对象、哈希表对象、集合对象、有序集合对象中任意一种。

键空间的操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/* db.c -- Keyspace access API */
int removeExpire(redisDb *db, robj *key);// 移除键的过期时间
void propagateExpire(redisDb *db, robj *key); // 
int expireIfNeeded(redisDb *db, robj *key); // 检查是否过期,是则删除键
long long getExpire(redisDb *db, robj *key); // 获取过期时间
void setExpire(redisDb *db, robj *key, long long when); // 设定过期时间
robj *lookupKey(redisDb *db, robj *key); // 从db中取出键key的值
robj *lookupKeyRead(redisDb *db, robj *key); // 从db中取出键key的值
robj *lookupKeyWrite(redisDb *db, robj *key); // 从db中取出键key的值
robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply); // 从db中取出键key的值
robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply);// 从db中取出键key的值
void dbAdd(redisDb *db, robj *key, robj *val);// 尝试将键值对key\val添加到数据库中
void dbOverwrite(redisDb *db, robj *key, robj *val); // 重写指定键的值,键不存在的话终止
void setKey(redisDb *db, robj *key, robj *val);// 设定指定键的值,不管存不存在
int dbExists(redisDb *db, robj *key); // 判断指定键是否存在  
robj *dbRandomKey(redisDb *db); // 随机从数据库中取出一个键,并以字符串对象的方式返回这个键
int dbDelete(redisDb *db, robj *key); // 从数据库中删除给定的键
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
long long emptyDb(void(callback)(void*));// 情况所有数据
int selectDb(redisClient *c, int id); // 切换db
void signalModifiedKey(redisDb *db, robj *key);
void signalFlushedDb(int dbid);
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count);
unsigned int countKeysInSlot(unsigned int hashslot);
unsigned int delKeysInSlot(unsigned int hashslot);
int verifyClusterConfigWithData(void);
void scanGenericCommand(redisClient *c, robj *o, unsigned long cursor);
int parseScanCursorOrReply(redisClient *c, robj *o, unsigned long *cursor);

redis源码阅读-对象

之前阅读了redis用到的主要的数据结构,这些数据结构是redis对象基础。redis在这些基础数据结构之上创建了一个对象系统,这个系统包含字符串对象、列表对象、哈希对象、集合对象和有序集合对象五种类型的对象。

redis执行命令前,先判断命令是否能够执行给定命令。根据不同场合选择使用不同的数据结构。

对象的类型与编码

redis使用对象来表示数据库中的键值,创建一个键值对时,会创建至少两个对象,一个对象用作键值对的键,一个对象用作键值对的值。

对象的结构体

1
2
3
4
5
6
7
typedef struct redisObject {
    unsigned type:4; // 类型
    unsigned encoding:4; // 编码
    unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */
    int refcount; // 引用计数
    void *ptr; // 值
} robj;

redis结构体使用位段结构节省空间

类型type

记录redis对象类型,五种类型

1
2
3
4
5
#define REDIS_STRING 0 // 字符串对象
#define REDIS_LIST 1 // 列表对象
#define REDIS_SET 2 // 哈希对象
#define REDIS_ZSET 3 // 集合对象
#define REDIS_HASH 4 // 有序集合对象

编码encoding

记录redis对象的编码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// 对象编码
#define REDIS_ENCODING_RAW 0     /* 简单动态字符串 */
#define REDIS_ENCODING_INT 1     /* long类型的整数 */
#define REDIS_ENCODING_HT 2      /* 字典 */
#define REDIS_ENCODING_ZIPMAP 3  /* zipmap 3.2.5不再使用 */
#define REDIS_ENCODING_LINKEDLIST 4 /* 双端队列 */
#define REDIS_ENCODING_ZIPLIST 5 /* 压缩列表 */
#define REDIS_ENCODING_INTSET 6  /* 整数集合 */
#define REDIS_ENCODING_SKIPLIST 7  /* 跳跃表 */
#define REDIS_ENCODING_EMBSTR 8  /* EMBSTR编码的简单字符串 */

每种类型对应至少两种不同的编码。

对象类型 编码方式
REDIS_STRING REDIS_ENCODING_RAW ,REDIS_ENCODING_INT ,REDIS_ENCODING_EMBSTR
REDIS_LIST REDIS_ENCODING_LINKEDLIST ,REDIS_ENCODING_ZIPLIST
REDIS_SET REDIS_ENCODING_INTSET ,REDIS_ENCODING_HT
REDIS_ZSET REDIS_ENCODING_ZIPLIST ,REDIS_ENCODING_SKIPLIST
REDIS_HASH REDIS_ENCODING_ZIPLIST ,REDIS_ENCODING_HT

访问时间

表示对象的最后一次访问时间。

引用计数

常见的管理方式,引用计数为0时回收。