I. Source
- MIT-6.824 2020 课程官网
- Lab3: KVRaft 实验主页
- simviso 精品付费翻译 MIT 6.824 课程
- Paper - Raft extended version
II. My Code
- source code 的 Gitee 地址
- Lab3B: KVRaft with log compaction的 Gitee 地址
课程官网提供的 Lab 代码下载地址,我没有访问成功,于是我从 Github 其他用户那里 clone 到干净的源码,有需要可以访问我的 Gitee 获取
III. Motivation
Lab3A: KVRaft without log compaction 解决了分布式 KV 存储数据库问题,如果需要请移步此博文。而 Lab3B: KVRaft with log compaction 主要是解决存储的日志量过大的问题,原先的 KV Server 只要一直在运行,那么 Raft 层存储的日志条目就会一直增加。一直增加的态势是非常糟糕的,因为内存是有限的,撑爆内存只是早晚的事
所以,我们设计出一种压缩日志的机制,即是日志增长到一定程度之后,通过 snapshot 的手段记录下 kv 数据库的状态,并丢弃 Raft 层已存储的日志条目,从而达到压缩日志,节省空间的目的
其二,通过 snapshot 手段能够使落后的 follower 更快地跟上 leader 最新的状态,leader 向落后的 follower 发送 snapshot,让 follower 直接复制最新的 kv 表,而不是让落后的 follower 慢慢迭代成最新的状态。这样可以大大提高同步的速度
IV. Solution
先梳理一下 primary server 的 snapshot 流程。首先,client 会向 primary server 发送 Get/Put 请求,server 收到请求后会将请求转为日志条目,并追加至 Raft 层的日志中;待 Raft 集群同步完成之后,servers 会判断 Raft 层的日志是否已达到截断阈值(日志太长了)
这里请注意,不论是 primary server 还是 secondary server,它们都有资格制作 snapshot。在正常的同步流程中,都是让每位 follower 自己制作 snapshot 的,而不是等待 leader 发来的 InstallSnapshot RPC 进行被动同步。这样做的好处是减少集群中通信的开销,只有在 follower 的 snapshot 与 leader 的相差较远的时候,leader 才会向其发送 InstallSnapshot RPC 进行 snapshot 同步
回归正题,如果 Raft 层的日志条目太多了,则将此时的 kv 表抓取下来制作成 snapshot,并告知 Raft 层:kv 层已生成最新的 snapshot,Raft 层在收到 snapshot 之后,就可以丢掉 snapshot 点之前的日志条目了
另外,集群中的其他 secondary servers 在收到 leader 发来的 InstallSnapshot RPC 时,将会根据自己的状态选择是否更新 kv 表,其中也包含一些丢弃 snapshot 点之前的过期条目之类的操作
S1 - server制作/接收snapshot
和 Lab3A: KVRaft without log compaction 一样,代码的结构并没变化,只是在 kvraft/server.go:loop()
中增加了一些接收和发送 snapshot 的业务代码,
func (kv *KVServer) loop() {
for !kv.killed() {
msg := <-kv.applyCh /* Raft 集群已同步 */
if !msg.CommandValid { /* follower 的 kv 层要抄 leader 送来的作业 */
r := bytes.NewBuffer(msg.Snapshot)
d := gob.NewDecoder(r)
kv.mu.Lock()
kv.db = make(map[string]string)
kv.ack = make(map[int64]int)
d.Decode(&kv.db)
d.Decode(&kv.ack)
kv.mu.Unlock()
} else {
op := msg.Command.(Op) /* 将 Command 空接口部分强制转换为 Op*/
idx := msg.CommandIndex /* 这是第几条命令 */
kv.mu.Lock()
/* 准备将该命令应用到状态机 */
if kv.isUp2Date(op.ClntId, op.CmdId) { /* 不执行过期的命令 */
kv.updateDB(op)
kv.ack[op.ClntId] = op.CmdId /* ack 跟踪最新的命令编号 */
}
if kv.maxraftstate != -1 && kv.rf.GetPersisterSize() > kv.maxraftstate {
w := new(bytes.Buffer)
e := gob.NewEncoder(w)
e.Encode(kv.db)
e.Encode(kv.ack)
data := w.Bytes()
kv.rf.StartSnapshot(idx, data) /* 这里不能用 goroutine,否则 server->raft 的 snapshot 不及时,进而导致 statesize 过大 */
}
/*
* 分流,回应 client,即继续 Get 或 PutAppend 当中的流程,
* 最后再回复 client,不然会导致 leader 和 follower 制作 snapshot 不同步
*/
ch, ok := kv.results[idx]
if ok { /* RPC Handler 已经准备好读取已同步的命令了 */
select {
case <-kv.results[idx]:
default:
}
ch <- op
}
kv.mu.Unlock()
}
}
}
日志条目在 Raft 层同步完成之后,server 会判断该条目( Raft 层传来的命令)到底是要其接收 OR 制作 snapshot。何时接收 snapshot?无非是在 primary server 觉得 secondary server 有点落后的时候
leader 会在 Raft 层向 follower 发送 InstallSnapshot RPC,follower 收到之后进行一系列的 snapshot 更新操作,然后通过 kv 层的 secondary:目前收到了来自 primary 的最新 snapshot,望你取走,更新你的 kv 表,对应第 5 ~ 15 行
何时制作 snapshot?在日志条目达到一定数量时,超过 maxraftstate
阈值就开始考虑制作,对应第 26 ~ 33 行。且 maxraftstate
不为 -1,这在 Lab3: KVRaft 实验主页 上有过提示,
If
maxraftstate
is -1, you do not have to snapshot.maxraftstate
applies to the GOB-encoded bytes your Raft passes topersister.SaveRaftState()
.
其中的 GetPersisterSize()
是我自己增加的辅助函数,定义在 raft/snapshot.go
中,
/* 辅助函数,用于 kv 层感知 snapshot 阈值 */
func (rf *Raft) GetPersisterSize() int {
return rf.persister.RaftStateSize()
}
其实就是调用 persister
对象的 RaftStateSize()
方法,让 kv 层能够感知到当前 Raft 层已持久化了多少日志,好做进一步的制作 snapshot 判断
第 32 行必须立刻将制作好的 snapshot 发往 Raft 层,如果采用 goroutine 异步的手段,会有 snapshot 延迟发送的情况发生,进而导致日志条目数量与实际制作 snapshot 时不符
另外,制作和发送 snapshot 这件事必须在回应 client 请求(第 39 ~ 46 行)之前完成,这个顺序不能乱。用正常的逻辑思考这个问题也很容易理顺,即是只有在自己和集群中其他服务器同步好了之后,才会去回应外部的请求。其中,“同步完成” 如何定义?那自然包含制作和发送 snapshot 这一套子流程啦
S2 - raft层保存snapshot
server 在将 snapshot 从 kv 层发往 Raft 层后,后者是通过 StartSnapshot()
来接收 snapshot,定义在 raft/snapshot.go
中,
func (rf *Raft) StartSnapshot(idx int, snapshot []byte) {
if rf.killed() {
return
}
rf.mu.Lock()
defer rf.mu.Unlock()
baseIdx := rf.log[0].Idx
/* 已 snapshoted 不予处理 OR 超前的错误 snapshot */
if idx <= baseIdx || idx > rf.commitIdx { /* 何为超前? */
return
}
/* 设置 snapshot 截断点 */
lastIncludedIdx := idx
lastIncludedTerm := rf.log[idx-baseIdx].Term
/* 把截断点之前的日志丢掉 */
newLog := make([]LogEntry, 0)
newLog = append(newLog, LogEntry{Idx: lastIncludedIdx, Term: lastIncludedTerm})
for i := lastIncludedIdx - baseIdx + 1; i < len(rf.log); i++ {
newLog = append(newLog, rf.log[i])
}
rf.log = newLog
/* Lab2C 持久化操作 */
rf.persist()
rf.persister.SaveSnapshot(snapshot)
}
主要的逻辑即是保存 kv 层发来的 snapshot,首先会记录下 snapshot 的截断点(第 16 ~ 18 行),然后将截断点之前的日志条目统统丢掉(第 21 ~ 28 行),其中需要注意逻辑编号和物理下标之间的转换,这点非常重要,即 i := lastIncludedIdx-baseIdx+1
。最后,持久化一下日志条目和 snapshot 即可
S3 - leader定向发送InstallSnapshot RPC
leader 在广播 AE 包时如果发现该 follower 比较落后(体现在 nextIdx 上),会向其发送 InstallSnapshot RPC,要求该 follower 同步最新的 snapshot。具体的业务逻辑还是 Lab2C: Persist 的 raft/raft.go:boatcastAE()
框架,
func (rf *Raft) boatcastAE() {
rf.mu.Lock()
defer rf.mu.Unlock()
/* 所有 peers 应该收到相同的 AE 包 */
for i, _ := range rf.peers {
if i == rf.me || rf.role != Leader {
continue
}
baseIdx := rf.log[0].Idx
if rf.nextIdxs[i]-1 < baseIdx {
targs := InstallSnapshotArgs{
Term: rf.curTerm,
LeaderId: rf.me,
LastIncludedIdx: rf.log[0].Idx,
LastIncludedTerm: rf.log[0].Term,
Data: rf.persister.snapshot,
}
treply := InstallSnapshotReply{}
go func(id int, args InstallSnapshotArgs, reply InstallSnapshotReply) {
rf.sendInstallSnapshot(id, &args, &reply)
}(i, targs, treply)
} else {
/* 这一块 RPC 初始化操作必须写在 goroutine 之外,因为在 goroutine 内部,锁是失效的 */
targs := AppendEntriesArgs{
Term: rf.curTerm,
LeaderId: rf.me,
LeaderCommit: rf.commitIdx,
}
/*------------Lab2B Log Replication----------------*/
/* nextIdxs 和 prevLogIdx 都是逻辑编号 */
if rf.nextIdxs[i]-1 <= rf.lastLogIdx() { /* backup test 不加以限制可能会越界 */
targs.PrevLogIdx = rf.nextIdxs[i] - 1
} else {
targs.PrevLogIdx = rf.lastLogIdx() /* 意味着发送不含日志条目的心跳包 */
}
targs.PrevLogTerm = rf.log[targs.PrevLogIdx-baseIdx].Term
if rf.nextIdxs[i] <= rf.lastLogIdx() { /* rejoin test 不加以限制可能会越界 */
targs.Entries = make([]LogEntry, len(rf.log[rf.nextIdxs[i]-baseIdx:]))
copy(targs.Entries, rf.log[rf.nextIdxs[i]-baseIdx:])
}
treply := AppendEntriesReply{}
go func(id int, args AppendEntriesArgs, reply AppendEntriesReply) {
rf.sendAppendEntries(id, &args, &reply)
}(i, targs, treply)
}
}
}
主要是新增了发送 snapshot 的逻辑,对应第 13 ~ 26 行。同步之前的准备也很简单,就是做一些简单的封装,包括 snapshot 截断点及对应的 term,对应变量 tagrs
,然后就是调用 raft/snapshot.go:sendInstallSnapshot()
RPC 方法将其发走
这里需要注意的是如果是常规的发送 AE 包,接收方( appendEntries.go:AppendEntries )需要添加一段 snapshot 截断点的逻辑,
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
/*------------Lab2C Persist---------------*/
defer rf.persist()
reply.Success = false
reply.Term = rf.curTerm
if args.Term < rf.curTerm {
return
}
/* 主要为了让旧 leader 收到了新 leader 的心跳包后而被迫退位 */
if args.Term > rf.curTerm {
...
}
/* 心跳包只对 follower 和 candidate 管用,leader 是不会响应它的 */
rf.heartBeatCh <- struct{}{}
rf.votedFor = args.LeaderId
/*------------Lab2B Log Replication----------------*/
baseIdx := rf.log[0].Idx
/*------------Lab3B Log Compaction----------------*/
if baseIdx > args.PrevLogIdx { /* snapshot 之前的日志条目已提交了,请勿覆盖 */
reply.XTerm = XTermCommitted
reply.XIdx = baseIdx + 1
return
}
/* 已提交的日志条目不允许覆盖 */
if args.PrevLogIdx < rf.commitIdx {
...
return
}
if rf.lastLogIdx() < args.PrevLogIdx { /* 违法下标,越界了 */
...
return
}
...
}
通过第 27 ~ 31 行的代码判断 snapshot 截断点之前的日志条目是否已经提交了。翻译一下,即是如果 leader 发来的条目是 follower 已经提交过的,那么 follower 直接拒绝并告知 leader:下次发新的来( snapshot 截断点之后的)
S4 - follower回应InstallSnapshot RPC请求
在 follower 收到 InstallSnapshot RPC 请求后,首先会做一些常规的判断,包括 term 的比较等等,这些和 Lab2B: Log Replication 的 AppendEntries RPC 请求的处理逻辑相同,在这就不再赘述。定义在 raft/snapshot.go
中,
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.curTerm
if args.Term < rf.curTerm {
return
}
/* 主要为了让旧 leader 收到了新 leader 的心跳包后而被迫退位 */
if args.Term > rf.curTerm {
rf.curTerm = args.Term
rf.role = Follower
rf.votedFor = NoBody
}
/* 心跳包只对 follower 和 candidate 管用,leader 是不会响应它的 */
rf.heartBeatCh <- struct{}{}
rf.votedFor = args.LeaderId
baseIdx := rf.log[0].Idx
/* 版本较低的 snapshot 不予理会 */
if args.LastIncludedIdx <= baseIdx {
return
}
rf.truncateLog(args.LastIncludedIdx, args.LastIncludedTerm)
msg := ApplyMsg{CommandValid: false, Snapshot: args.Data}
rf.persist()
/* follower 保存 leader 发来的快照 */
rf.persister.SaveSnapshot(args.Data)
/* 不用提交 snapshot 之前的日志条目啦,但请为之后的提交做好准备 */
rf.commitIdx = args.LastIncludedIdx
rf.appliedIdx = args.LastIncludedIdx
/* 将 snapshot 直接交给 kv 层,无需 commit */
rf.applyCh <- msg
}
重点讲解第 21 行之后的逻辑,首先将上次 snapshot 截断点的标号记为 baseIdx
,如果请求所包含的 snapshot 比当前的 baseIdx
落后,则不予理会;反之就需要考虑如何同步请求发来的 snapshot 了
当然,第一步就是截断 snapshot 点之前的日志条目,在这里我封装了 truncateLog()
方法以便复用,
func (rf *Raft) truncateLog(lastIncludedIdx int, lastIncludedTerm int) {
idx := 0
/* 从日志后面往前扫,寻找 lastIncluded 条目所在的位置 */
for i := len(rf.log) - 1; i >= 0; i-- {
if rf.log[i].Idx == lastIncludedIdx && rf.log[i].Term == lastIncludedTerm {
idx = i
break
}
}
newLog := make([]LogEntry, 0)
newLog = append(newLog, LogEntry{Idx: lastIncludedIdx, Term: lastIncludedTerm})
if idx != 0 { /* 有找到 lastIncluded 条目 */
for i := idx + 1; i < len(rf.log); i++ {
newLog = append(newLog, rf.log[i])
}
}
rf.log = newLog
}
逻辑很简单,就是从日志后面往前扫,寻找 lastIncluded
条目所在的位置。然后,以该点为开端将之后的日志条目顺起来就可以了。额外需要考虑的情况是原来的日志条目中可能并不存在 lastIncluded
的条目,这个时候只需更新日志下标 0 处的条目即可( snapshot 截断点)
另外,为什么要新建 newLog
切片?主要是为了实现垃圾回收功能,这是 Golang 的 gc 机制的特色。如果还在引用 rf.log
,那么就回收不了 snapshot 截断点之前的日志条目的空间。所以,我们必须另起炉灶(恶心的一比)
言归正传,follower 在截断之后就可以将 snapshot 保存在本地了。这里需要注意的是不用再提交 snapshot 截断点之前的日志条目了,但请为之后的 commit 做好准备,对应代码在第 37 和 38 行更新 appliedIdx
和 commitIdx
操作。最后,就可以将包含来自于 leader 的 snapshot 数据打包发往 kv 层了
S5 - leader收到InstallSnapshot Reply
leader 在收到 follower 关于 InstallSnapshot RPC 回应之后,同样也是做一些常规的判断,如同 Lab2B: Log Replication 的 AppendEntries Reply,
func (rf *Raft) sendInstallSnapshot(server int, args *InstallSnapshotArgs, reply *InstallSnapshotReply) bool {
DPrintf("[%v->%v] send install rpc in snapshot.go:sendInstallSnapshot\n", rf.me, server)
ok := rf.peers[server].Call("Raft.InstallSnapshot", args, reply)
rf.mu.Lock()
defer rf.mu.Unlock()
if !ok {
return ok
}
term := rf.curTerm
/* 自身过期的情况下 */
if rf.role != Leader || args.Term != term {
return ok
}
/* 仅仅是被动退位,不涉及到需要投票给谁 */
if reply.Term > term {
rf.curTerm = reply.Term
rf.role = Follower /* 主动回滚至 follower */
rf.votedFor = NoBody
rf.persist()
return ok
}
rf.nextIdxs[server] = args.LastIncludedIdx + 1
rf.matchIdxs[server] = rf.nextIdxs[server] - 1
return ok
}
需要关注的即是第 27 和 28 行新增的 nextIdx 调整的逻辑部分,leader 下一次再向该 follower 发送 snapshot 截断点下一个日志条目即可
S6 - server启动初始化
kv server 和 Raft 一样,需要保证持久化机制,所以在 kvraft/server.go:StartKVServer()
中添加重启之后读取 snapshot 的操作,
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
// call labgob.Register on structures you want
// Go's RPC library to marshall/unmarshall.
labgob.Register(Op{})
kv := new(KVServer)
kv.me = me
kv.maxraftstate = maxraftstate
// You may need initialization code here.
/* applyCh 要是异步的,不然会阻塞 */
kv.applyCh = make(chan raft.ApplyMsg, 100)
kv.rf = raft.Make(servers, me, persister, kv.applyCh)
// You may need initialization code here.
kv.db = make(map[string]string)
kv.ack = make(map[int64]int)
kv.results = make(map[int]chan Op)
/*------------Lab3B Log Compaction----------------*/
kv.readSnapshot(kv.rf.Persister().ReadSnapshot())
go kv.loop()
return kv
}
对应第 22 行 readSnapshot()
,它也定义在 kvraft/server.go
中,
/* 辅助函数,读取已持久化的 snapshot */
func (kv *KVServer) readSnapshot(data []byte) {
if data == nil || len(data) == 0 {
return
}
r := bytes.NewBuffer(data)
d := gob.NewDecoder(r)
var db map[string]string
var ack map[int64]int
if d.Decode(&db) != nil || d.Decode(&ack) != nil {
DPrintf("%v readSnapshot err in server.go:readSnapshot\n", kv.rf.GetId())
} else {
kv.db = db
kv.ack = ack
}
}
其中的 rf.Persister()
是我定义在 raft/raft.go
中的辅助函数,它返回 Raft 的 persister 对象,
func (rf *Raft) Persister() *Persister {
return rf.persister
}
最后,还需要注意的是 Raft 层重启初始化时需要更新 appliedIdx
的值为 snapshot 截断点,在 raft/raft.go:readPersister
中,
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
r := bytes.NewBuffer(data)
d := gob.NewDecoder(r)
var curTerm int
var votedFor int
var log []LogEntry
if d.Decode(&curTerm) != nil || d.Decode(&votedFor) != nil || d.Decode(&log) != nil {
DPrintf("read persist fail\n")
} else {
rf.curTerm = curTerm
rf.votedFor = votedFor
rf.log = log
}
/* restart 之后一定要将 appliedIdx 重置成 snapshot 点 */
rf.appliedIdx = rf.log[0].Idx
}
意在告诉 Raft 请勿再提交已提交过的日志条目了!至此,梳理完 KVRaft with log compaction 的整套流程
V. Result
golang 比较麻烦,它有 GOPATH 模式,也有 GOMODULE 模式,6.824-golabs-2020 采用的是 GOPATH,所以在运行之前,需要将 golang 默认的 GOMODULE 关掉,
$ export GO111MODULE="off"
随后,就可以进入 src/kvraft
中开始运行测试程序,
$ go test -run 3B
仅此一次的测试远远不够,可以通过 shell 循环,让测试跑个一百次就差不多了
$ for i in {1..100}; go test -run 3B
这样,如果还没错误,那应该是真的通过了。分布式的很多 bug 需要通过反复模拟才能复现出来的,它不像单线程程序那样,永远是幂等的情况。也可以用我写的脚本 test_3b.py,文章来源:https://www.toymoban.com/news/detail-617259.html
import os
ntests = 100
nfails = 0
noks = 0
if __name__ == "__main__":
for i in range(ntests):
print("*************ROUND " + str(i+1) + "/" + str(ntests) + "*************")
filename = "out" + str(i+1)
os.system("go test -run 3B | tee " + filename)
with open(filename) as f:
if 'PASS' in f.read():
noks += 1
print("✔️ok, " + str(noks) + "/" + str(ntests))
os.system("rm " + filename)
else:
nfails += 1
print("✖️fails, " + str(nfails) + "/" + str(ntests))
continue
我已经跑过一百次,无一 FAIL文章来源地址https://www.toymoban.com/news/detail-617259.html
到了这里,关于「实验记录」MIT 6.824 KVRaft Lab3B With Log Compaction的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!