Lab‐2B - NUISTGY/MIT-6.824-Raft GitHub Wiki
论文笔记
个人认为Lab-2b的日志复制部分细节很多,故在此详细记录
Start函数详解
调用者
服务端使用Raft的组件(比如kv服务器)会调用这个函数(实验中,由config.go和test_test.go调用进行测试)。也就是客户端的请求会转发到Leader节点,由Leader节点的Start函数处理。
处理逻辑
- Start函数会检查自己是否是Leader。如果不是Leader,直接返回false。
- 如果是Leader,会为这个命令生成一个新的日志条目,然后启动日志复制流程,将日志条目复制给其他Follower。
返回值
Start函数会返回3个值:
- 命令的索引值index,如果命令被提交,表示在日志中的位置
- 当前的任期号term
- 一个布尔值,表示是否认为自己是Leader
在Raft中的作用
- Start函数实现了Leader接收客户端请求的入口逻辑。
- 它将客户端的命令转换为日志条目,作为一个新的日志复制过程的开始。
- Start函数启动了一次新的日志复制,将日志条目复制给其他服务器。
- 它实现了Raft算法中Leader的核心功能之一。
// Start
// the service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election. even if the Raft instance has been killed,
// this function should return gracefully.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
index := -1
term := -1
isLeader := true
// Your code here (2B).
rf.mu.Lock()
defer rf.mu.Unlock()
//只能往leader上附加日志,其他角色不接受该操作
if rf.role != ROLE_LEADER {
return -1, -1, false
}
log := LogEntry{
Command: command,
Term: rf.currentTerm,
}
rf.log = append(rf.log, log)
index = len(rf.log)
term = rf.currentTerm
rf.persist()
DPrintf("Leader-Node[%d] add new Command[%d], logIndex[%d] currentTerm[%d]", rf.me, log.Command, index, term)
return index, term, isLeader
}
electionLoop函数修改
此函数内部新增如下内容:nextIndex设置为leader当前最大日志长度+1,matchIndex置为0
// for lab-2b. 新选举出来的leader要刷新nextIndex和matchIndex
rf.nextIndex = make([]int, len(rf.peers))
for i := 0; i < len(rf.peers); i++ {
rf.nextIndex[i] = len(rf.log) + 1
}
rf.matchIndex = make([]int, len(rf.peers))
for i := 0; i < len(rf.peers); i++ {
rf.matchIndex[i] = 0
}
appendEntriesLoop函数修改(核心)
for peerId := 0; peerId < len(rf.peers); peerId++ {
if peerId == rf.me {
continue
}
//初始化RPC请求参数并填充
args := AppendEntriesArgs{}
args.Term = rf.currentTerm
args.LeaderId = rf.me
args.PrevLogIndex = rf.nextIndex[peerId] - 1
if args.PrevLogIndex > 0 {
args.PrevLogTerm = rf.log[args.PrevLogIndex-1].Term
}
args.Entries = make([]LogEntry, 0)
args.Entries = append(args.Entries, rf.log[rf.nextIndex[peerId]-1:]...)
args.LeaderCommit = rf.commitIndex
DPrintf("RaftNode[%d] appendEntries starts, currentTerm[%d] peer[%d] logIndex=[%d] nextIndex[%d] matchIndex[%d] args.Entries[%d] commitIndex[%d]",
rf.me, rf.currentTerm, peerId, len(rf.log), rf.nextIndex[peerId], rf.matchIndex[peerId], len(args.Entries), rf.commitIndex)
go func(id int, args *AppendEntriesArgs) {
reply := AppendEntriesReply{}
if ok := rf.sendAppendEntries(id, args, &reply); ok {
rf.mu.Lock()
defer rf.mu.Unlock()
if reply.Term > rf.currentTerm { // 变成follower
rf.role = ROLE_FOLLOWER
rf.leaderId = -1
rf.currentTerm = reply.Term
rf.votedFor = -1
rf.persist()
return // 身份变更,及时中断退出
}
if reply.Success {
// leader中该peerId对应的nextIndex和matchIndex增加
rf.nextIndex[id] = args.PrevLogIndex + len(args.Entries) + 1
rf.matchIndex[id] = rf.nextIndex[id] - 1
// 计算所有服务器的matchIndex
// 排序后取中位数
// 不包括自己在内
sortedMatchIndex := make([]int, 0)
sortedMatchIndex = append(sortedMatchIndex, len(rf.log))
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
sortedMatchIndex = append(sortedMatchIndex, rf.matchIndex[i])
}
sort.Ints(sortedMatchIndex)
// 更新commitIndex
// 如果中位数大于当前commitIndex且term匹配才更新,保证一致性
// 不同term仅完成复制,不更新commitIndex
newCommitIndex := sortedMatchIndex[len(rf.peers)/2]
if newCommitIndex > rf.commitIndex &&
rf.log[newCommitIndex-1].Term == rf.currentTerm {
rf.commitIndex = newCommitIndex
}
// 如果复制失败,则重置nextIndex,注意不能为0
} else {
rf.nextIndex[id] -= 1
if rf.nextIndex[id] < 1 {
rf.nextIndex[id] = 1
}
}
DPrintf("RaftNode[%d] appendEntries ends, peerTerm[%d] myCurrentTerm[%d] myRole[%s]", rf.me, reply.Term, rf.currentTerm, rf.role)
}
}(peerId, &args)
}
重点关注if reply.Success {...}内部
- leader中该peerId对应的nextIndex和matchIndex增加
- sort.Ints(sortedMatchIndex)利用排序取中位数法得到大多数节点的复制进度
- 严格遵守论文fig.8! term匹配才更新, 不同term仅完成复制,不更新commitIndex,具体原因见图片笔记
- 复制失败,则重置nextIndex -= 1,注意不能为0
AppendEntries函数修改(核心)
// applyLogLoop 定期检查是否有新的日志需要提交给应用层
// for lab-2b
// 如果本地没有前一个日志(PrevLogIndex)的话,那么false
if len(rf.log) < args.PrevLogIndex {
return
}
// 如果本地有前一个日志的话,那么term必须相同,否则false
if args.PrevLogIndex > 0 && rf.log[args.PrevLogIndex-1].Term != args.PrevLogTerm {
return
}
for i, logEntry := range args.Entries {
index := args.PrevLogIndex + i + 1
if index > len(rf.log) {
rf.log = append(rf.log, logEntry)
} else {
if rf.log[index-1].Term != logEntry.Term {
rf.log = rf.log[:index-1] // 删除当前以及后续所有log
rf.log = append(rf.log, logEntry) // 把新log加入进来
} // term一样啥也不用做,继续向后比对Log
}
}
rf.persist()
// 更新提交下标
if args.LeaderCommit > rf.commitIndex {
rf.commitIndex = args.LeaderCommit
if len(rf.log) < rf.commitIndex {
rf.commitIndex = len(rf.log)
}
}
reply.Success = true
这里新增代码逻辑主要用于处理日志复制以及相关属性更新。for循环中的强行覆盖复制符合论文描述,具体细节不再阐述。
applyLogLoop
Lab-2b新增函数,在Make函数中通过
go rf.applyLogLoop(applyCh)
启动。定时把节点中已经提交的日志apply到应用层。代码中的server.go和config.go会需要使用applyCh
// applyLogLoop 定期检查是否有新的日志需要提交给应用层
// 它会一直运行直到这个 Raft 节点被 kill
func (rf *Raft) applyLogLoop(applyCh chan ApplyMsg) {
for !rf.killed() {
time.Sleep(10 * time.Millisecond)
var appliedMsgs = make([]ApplyMsg, 0)
func() {
rf.mu.Lock()
defer rf.mu.Unlock()
for rf.commitIndex > rf.lastApplied {
rf.lastApplied += 1
appliedMsgs = append(appliedMsgs, ApplyMsg{
CommandValid: true,
Command: rf.log[rf.lastApplied-1].Command,
CommandIndex: rf.lastApplied,
CommandTerm: rf.log[rf.lastApplied-1].Term,
})
DPrintf("RaftNode[%d] applyLog, currentTerm[%d] lastApplied[%d] commitIndex[%d]", rf.me, rf.currentTerm, rf.lastApplied, rf.commitIndex)
}
}()
// 释放锁,向应用层提交日志
for _, msg := range appliedMsgs {
applyCh <- msg
}
}
}