Lab3:KVRaft

写在前面:lab3 的内容是要在 lab2 的基础上实现一个高可用的 KV 存储服务,算是要将 raft 真正的用起来。相关协调服务可以参考OngaroPhD的作者的博士论文或者是ZooKeeper 论文笔记的协调服务,另外Chain Replicate 论文笔记的设计中同样满足了高性能读的服务

PartA:Key/value service without snapshots

客户端:

Client创建

创建client自动生成ID号(使用nrand()函数生成)用于Session来记录lastRequest的回复,用LeaderID快速连接server,减少重试leader时间

1
2
3
4
5
6
7
8
9
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.servers = servers
// You'll have to add code here.
ck.LeaderId = 0
ck.ClientId = nrand()
ck.CommandId = 0
return ck
}

RetryToLeader() :重连leader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (ck *Clerk) RetryToLeader() {
oneRound := 0
args := &GetArgs{Key: "", ClientId: ck.ClientId, CommandId: ck.CommandId}
for {
ck.LeaderId = (ck.LeaderId + 1) % len(ck.servers)
oneRound++
reply := &GetReply{}
reply.Err = ""
ok := ck.servers[ck.LeaderId].Call("KVServer.Get", args, reply)
for !ok {
//在分区后client可能无法连接到该失联分区的server,发送失败则更换server进行连接
ck.LeaderId = (ck.LeaderId + 1) % len(ck.servers)
ok = ck.servers[ck.LeaderId].Call("KVServer.Get", args, reply)
time.Sleep(10 * time.Millisecond)
}
if reply.Err == ErrWrongLeader || reply.Err == ErrTimeout {

ck.LeaderId = (ck.LeaderId + 1) % len(ck.servers)
if oneRound == len(ck.servers) {
oneRound = 0
time.Sleep(700 * time.Millisecond)
}
continue
}
return
}
}

Client发送读写

读操作:Get()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (ck *Clerk) Get(key string) string {
ck.CommandId++
args := &GetArgs{
Key: key,
CommandId: ck.CommandId,
ClientId: ck.ClientId,
}

// You will have to modify this function.
for {
reply := &GetReply{}
ok := ck.servers[ck.LeaderId].Call("KVServer.Get", args, reply)

for !ok {
//this serverID is disconnect and change ServerID
ck.LeaderId = (ck.LeaderId + 1) % len(ck.servers)
ok = ck.servers[ck.LeaderId].Call("KVServer.Get", args, reply)
time.Sleep(10 * time.Millisecond)
}

if reply.Err == ErrWrongLeader || reply.Err == ErrTimeout {
ck.RetryToLeader()
continue
} else if reply.Err == ErrNoKey {
return ""
} else if reply.Err == OK {
return reply.Value
}
}
}

写操作:PutAppend()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (ck *Clerk) PutAppend(key string, value string, op string) {
ck.CommandId++
args := &PutAppendArgs{
Key: key,
Value: value,
Op: op,
CommandId: ck.CommandId,
ClientId: ck.ClientId,
}
for {
reply := &PutAppendReply{}
ok := ck.servers[ck.LeaderId].Call("KVServer.PutAppend", args, reply)
for !ok {
//this serverID is disconnect and change ServerID
ck.LeaderId = (ck.LeaderId + 1) % len(ck.servers)
ok = ck.servers[ck.LeaderId].Call("KVServer.PutAppend", args, reply)
time.Sleep(10 * time.Millisecond)
}
if reply.Err == ErrWrongLeader || reply.Err == ErrTimeout {
ck.RetryToLeader()
continue
} else if reply.Err == OK {
break
}
}
}

服务端:

创建服务器:StartKVServer():

  • 服务器数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type KVServer struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
dead int32 // set by Kill()

maxraftstate int // snapshot if log grows this big

// Your definitions here.
servers int
lastapplied int
Term int
sm KVStateMachine
Session map[int64]LastRespond // (clientId, lastRespond)record last command's id ,respond
NotifyChans map[int]chan CommandRespond // (client , Chan Respond) ansynchoron to notif

// read only check whether all log are applied
isRecovery bool
}
  • 创建kv-server 与 下层raft进行交互,到达共识提高容错
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
labgob.Register(Op{})

kv := new(KVServer)
kv.me = me
kv.maxraftstate = maxraftstate

// You may need initialization code here.
kv.applyCh = make(chan raft.ApplyMsg)
//creat raft layer
kv.rf = raft.Make(servers, me, persister, kv.applyCh)

// You may need initialization code here.
kv.lastapplied = kv.rf.GetFirstLogEntry().Index
kv.Term = 0
kv.sm.mkv = NewMemoryKV()
kv.Session = make(map[int64]LastRespond)
kv.NotifyChans = make(map[int]chan CommandRespond)
kv.servers = len(servers)
//handle with recovery
kv.isRecovery = true

kv.recovery()

go kv.applier()

return kv
}

StateMachine的内存模型:

主要是创建一个kv-table,以一种内存模型的方式进行记录client端发来的修改信息,后续用于生成快照

内存模型定义

1
2
3
4
5
6
7
8
9
10
11
type KVStateMachine struct {
mkv *MemoryKV
}

type MemoryKV struct {//键值表的定义
KV map[string]string
}

func NewMemoryKV() *MemoryKV {//申请堆内存
return &MemoryKV{make(map[string]string)}
}

kv-table的内存模型的接口:

  • Append() Put() Get()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (mkv *MemoryKV) KV_Get(key string) (string, Err) {
if value, ok := mkv.KV[key]; ok {
return value, OK
}
return "", ErrNoKey
}

func (mkv *MemoryKV) KV_Put(key, value string) Err {
mkv.KV[key] = value
return OK
}

func (mkv *MemoryKV) KV_Append(key, value string) Err {
if mkv.KV[key] == "" {
mkv.KV[key] = value
} else {
mkv.KV[key] += value
}
return OK
}

应用日志操作到状态机

处理request信息并记录其respond:将ApplyCh中的op msg应用至kvserver状态机中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (sm *KVStateMachine) ApplyToStateMachine(op Op) CommandRespond {
var cr CommandRespond

switch op.Opt {
case "Get":
cr.Value, cr.Err = sm.mkv.KV_Get(op.Key)
case "Append":
cr.Err = sm.mkv.KV_Append(op.Key, op.Value)
case "Put":
cr.Err = sm.mkv.KV_Put(op.Key, op.Value)
}

return cr
}

处理模型

异步处理

  1. 使用NotifyChans去通知处理request的接口,raft的op以及apply,以channel的方式进行异步的处理

  2. Session记录(clientID,LastRespond),将每个client的request的响应消息进行记录,用于防止client retry多次将命令应用于state machine,以及快速响应

applier():异步的接收raft层apply channel所传递上来的消息

  1. 防止日志回滚 msg.CommandIndex <= kv.lastapplied

  2. 处理duplicate消息

  3. Apply 将apply的消息应用到状态机中

  4. 回复客户端 leader需要通过NotifyChan将状态机应用命令后回复的消息传递给该消息所调用的Put/Get,从而回复给Client,leader只能通知当前周期的命令,非当前周期的命令没有接收者从而导致阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (kv *KVServer) applier() {
//for !kv.killed() {
for msg := range kv.applyCh {
if msg.CommandValid {
//avoid log rollback
kv.mu.Lock()
if msg.CommandIndex <= kv.lastapplied {
DPrintf("S%d discard outdate msg", kv.me)
kv.mu.Unlock()
continue
}
kv.lastapplied = msg.CommandIndex
var respond CommandRespond
op := msg.Command.(Op)
//apply msg -> kv machine
if kv.isRedundantRequest(op.ClientId, op.CommandId) {
respond = kv.Session[op.ClientId].Respond
} else {
respond = kv.sm.ApplyToStateMachine(op)
_, isLeader := kv.rf.GetState()
if isLeader && op.opt != "Get" {
kv.Session[op.ClientId] = LastRespond{op.CommandId, respond}
DPrintf("S%d -> C%d LastRequest type:%v K:%v V:%v CmdId:%d CMI:%d T:%d",
kv.me, op.ClientId%50, op.Opt, op.Key, op.Value,
op.CommandId, msg.CommandIndex, op.CommandTerm)
}
}
kv.mu.Unlock()

if currentTerm, isLeader := kv.rf.GetState(); isLeader &&
currentTerm == op.CommandTerm {
DPrintf("S%d Notify%v to C%d index %d", kv.me, respond,
op.ClientId%50, msg.CommandIndex)
ch := kv.GetNotifyChan(msg.CommandIndex)
ch <- respond
}

} else if msg.SnapshotValid {
}
}
}

读写操作:

读写操作需要满足线性一致性:[[ZooKeeper 论文笔记#线性一致性:Linearizability]]

  • PutAppend():处理client的修改请求
  1. Client请求该方法

  2. 冗余检测

  3. 调用Start将Command传递到下层达到共识

  4. 创建Notifychan等待applier将response消息push

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
_, is_leader := kv.rf.GetState()
if !is_leader {
reply.Err = ErrWrongLeader
return
}

kv.mu.Lock()
if kv.isRedundantRequest(args.ClientId, args.CommandId) {
err := kv.Session[args.ClientId].respond.Err
reply.Err = err
kv.mu.Unlock()
DPrintf("C%d PutAppend request is redundant(PA)", args.ClientId%5)
return
}
kv.mu.Unlock()

op := Op{
Opt: args.Op,
Key: args.Key,
Value: args.Value,
ClientId: args.ClientId,
CommandId: args.CommandId,
}

index, _, is_leader := kv.rf.Start(op)
if !is_leader {
reply.Err = ErrWrongLeader
return
}
DPrintf("S%d <- C%d PutAppendRequest CmdId %d", kv.me, args.ClientId%50, args.CommandId)

ch := kv.GetNotifyChan(index)

select {
case cr := <-ch:
reply.Err = cr.Err
case <-time.After(Timeout):
reply.Err = ErrTimeOut
}
    go kv.RecycleOutdateChan(ch)
}
  • Get():将read操作记录至log,主要是防止分区时向minority partition的leader请求返回stale data导致线性不一致,缺陷:将read操作同步会导致浪费磁盘空间以及同步写入read log的时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
_, is_leader := kv.rf.GetState()
if !is_leader {
//DPrintf("S%d is not KVLeader cmdID%d", kv.me, args.CommandId)
reply.Err = ErrWrongLeader
return
}
if args.Key == "" {
return
}

kv.mu.Lock()
if kv.isRedundantRequest(args.ClientId, args.CommandId) {
err := kv.Session[args.ClientId].respond.Err
value := kv.Session[args.ClientId].respond.Value
reply.Err, reply.Value = err, value
kv.mu.Unlock()
DPrintf("C%d Get request is redundant(Get) cmdId %d", args.ClientId%5, args.CommandId)
return
}
kv.mu.Unlock()

op := Op{
Opt: "Get",
Key: args.Key,
ClientId: args.ClientId,
CommandId: args.CommandId,
}
index, _, is_leader := kv.rf.Start(op)
if !is_leader {
reply.Err = ErrWrongLeader
return
}

DPrintf("S%d <- C%d GetRequest LI:%d CmdId:%d", kv.me, args.ClientId%50, index, args.CommandId)

ch := kv.GetNotifyChan(index)

select {
case cr := <-ch:
reply.Err, reply.Value = cr.Err, cr.Value
case <-time.After(Timeout):
reply.Err = ErrTimeout
}
    go kv.RecycleOutdateChan(ch)
}

Read-Only Query:

Read-only Query不讲读日志写入磁盘,因为read本就是幂等操作(idempotent),不会影响状态机的状态,因此可以不用写入磁盘同步

challenge:共识

  • 分区容错性(Partition-torlerance):由于不写入只读日志到raft层,无法使server到达共识(强一致性),因此当前leader无法知道是否处于大多数分区中,可能会导致分区时少部分区域的leader回复给client旧的数据。

    解决方案: 因此在回复client时需要确认自己的leader状态,发送heartbeat能否获得大多数的server响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
-------------raft.go----------------
func (rf *Raft) KV_Sendheartbeat() bool {
num := 1

var mu sync.Mutex
var wg sync.WaitGroup
rf.mu.Lock()
args := &AppendEntriesArgs{
Term: rf.currentTerm,
LeadId: rf.me,
LeaderCommit: rf.commitIndex,
}
rf.mu.Unlock()
for i, p := range rf.peers {
if p == rf.peers[rf.me] {
continue
}
wg.Add(1)
go func(server int) {
_, ok := rf.CallAE(server, args)
mu.Lock()
defer mu.Unlock()
if !ok {
wg.Done()
return
}
wg.Done()
num++
}(i)
}
var isMajority bool
wg.Wait()
mu.Lock()
//majority of peers can rececive
if num > len(rf.peers)/2 {
isMajority = true
} else {
isMajority = false
}
mu.Unlock()

return isMajority
}
1
2
3
4
5
6
7
8
---------kvraft.go-----------
func (kv *KVServer) ConfirmLeadership() bool {
var isMajority bool
if _, isLeader := kv.rf.GetState(); isLeader {
isMajority = kv.rf.KV_Sendheartbeat()
}
return isMajority
}
  • Leader日志applied到最新(Leader completeness guarantee) :出现分区等问题时新的Leader没有收到当前任期的Log因此之前的任期的log也不会提交并应用,导致当leader进行在处理只读操作时leader并没有同步所有日志,例如:Leader的currentTerm = 5,自己含有term=3或4的log没有applied,导致此时leader会返回旧数据。

    解决方案:当任期更改添加no-op,添加一个空的log使peers达到共识,并使leader的日志同步到达最新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//implement Read-Only in raft , read operation is idempotent
func (kv *KVServer) Get_RO(args *GetArgs, reply *GetReply) { //read-only
.......
kv.mu.Lock()
//Completeness guarantee:send no-op make leader applied all log
if term > kv.Term {
kv.Term = term
        kv.mu.Unlock()
isLeader := kv.getCompleteness(args.CommandId, kv.Term)
if !isLeader {
reply.Err = ErrWrongLeader
return
}
} else {
kv.mu.Unlock()
}
.....
}

//Keep Log Completeness:Send no-op log let leader applied all log
func (kv *KVServer) getCompleteness(CommandId int64, term int) bool {
op := Op{
CommandId: CommandId,
CommandTerm: term,
Opt: "no-op",
}
index, _, is_leader := kv.rf.Start(op)
if !is_leader {
return false
}
ch := kv.GetNotifyChan(index)
DPrintf("S%d send no-op index %d ", kv.me, index)
select {
case <-ch:
return true
case <-time.After(Timeout):
DPrintf("no-op Channel is timeout")
go kv.RecycleOutdateChan(ch)
return false
}
}
  • recovery的时候进行Read-only操作(Recovery Reading):因为read-only操作不写入log(正在recovery的leader也可以返回只读操作),所以不会等待其applied后进行返回,会导致recovery的leader将正在replay的数据返回给client。

    解决方案:加入一个isRecovery变量,当service重启时赋值,只读操作需要让正在恢复的leader应用到所有的log,才能执行只读操作,因此当检测到本次只读操作时leader正在recovery就需要发送no-op让leader强制同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//implement Read-Only in raft , read operation is idempotent
func (kv *KVServer) Get_RO(args *GetArgs, reply *GetReply) { //read-only
.......
kv.mu.Lock()
//Completeness guarantee:send no-op make leader applied all log
if term > kv.Term || kv.isRecovery{

kv.mu.Unlock()
isLeader := kv.getCompleteness(args.CommandId, term)
kv.mu.Lock()
//将isRecovery、term放置在getCompletness
//防止并发客户端的只读请求跳过no-op操作
//例如:讲Term与Recovery放置在
kv.Term = term
kv.isRecovery = false
kv.mu.Unlock()
if !isLeader {
reply.Err = ErrWrongLeader
return
}
} else {
kv.mu.Unlock()
}
.....
}

Read-Only Query More efficiency:

SOFAJRaft 线性一致读实现剖析 | SOFAJRaft 实现原理 · SOFAStack

实现线性一致读最常规的办法是走 Raft 协议,将读请求同样按照 Log 处理,通过 Log 复制和状态机执行来获取读结果,然后再把读取的结果返回给 Client。因为 Raft 本来就是一个为了实现分布式环境下线性一致性的算法,所以通过 Raft 非常方便的实现线性 Read,也就是将任何的读请求走一次 Raft Log,等此 Log 提交之后在 apply 的时候从状态机里面读取值,一定能够保证这个读取到的值是满足线性要求的。

因为每次 Read 都需要走 Raft 流程,Raft Log 存储、复制带来刷盘开销、存储开销、网络开销,走 Raft Log不仅仅有日志落盘的开销,还有日志复制的网络开销,另外还有一堆的 Raft “读日志” 造成的磁盘占用开销,导致 Read 操作性能是非常低效的,所以在读操作很多的场景下对性能影响很大,在读比重很大的 系统中是无法被接受的,通常都不会使用。

在 Raft 里面,节点有三个状态:Leader,Candidate 和 Follower,任何 Raft 的写入操作都必须经过 Leader,只有 Leader 将对应的 Raft Log 复制到 Majority 的节点上面认为此次写入是成功的。所以如果当前 Leader 能确定一定是 Leader,那么能够直接在此 Leader 上面读取数据,因为对于 Leader 来说,如果确认一个 Log 已经提交到大多数节点,在 t1 的时候 apply 写入到状态机,那么在 t1 后的 Read 就一定能读取到这个新写入的数据。

那么如何确认 Leader 在处理这次 Read 的时候一定是 Leader 呢?在 Raft 论文里面,提到两种方法:

  • ReadIndex Read
  • Lease Read

ReadIndex Read

第一种是 ReadIndex Read,当 Leader 需要处理 Read 请求时,Leader 与过半机器交换心跳信息确定自己仍然是 Leader 后可提供线性一致读:

  1. Leader 将自己当前 Log 的 commitIndex 记录到一个 Local 变量 ReadIndex 里面;
  2. 接着向 Followers 节点发起一轮 Heartbeat,如果半数以上节点返回对应的 Heartbeat Response,那么 Leader就能够确定现在自己仍然是 Leader;
  3. Leader 等待自己的 StateMachine 状态机执行,至少应用到 ReadIndex 记录的 Log,直到 applyIndex 超过 ReadIndex,这样就能够安全提供 Linearizable Read,也不必管读的时刻是否 Leader 已飘走;
  4. Leader 执行 Read 请求,将结果返回给 Client。

使用 ReadIndex Read 提供 Follower Read 的功能,很容易在 Followers 节点上面提供线性一致读,Follower 收到 Read 请求之后:

  1. Follower 节点向 Leader 请求最新的 ReadIndex;
  2. Leader 仍然走一遍之前的流程,执行上面前 3 步的过程(确定自己真的是 Leader),并且返回 ReadIndex 给 Follower;
  3. Follower 等待当前的状态机的 applyIndex 超过 ReadIndex;
  4. Follower 执行 Read 请求,将结果返回给 Client。

不同于通过 Raft Log 的 Read,ReadIndex Read 使用 Heartbeat 方式来让 Leader 确认自己是 Leader,省去 Raft Log 流程。相比较于走 Raft Log 方式,ReadIndex Read 省去磁盘的开销,能够大幅度提升吞吐量。虽然仍然会有网络开销,但是 Heartbeat 本来就很小,所以性能还是非常好的。

Lease Read

虽然 ReadIndex Read 比原来的 Raft Log Read 快很多,但毕竟还是存在 Heartbeat 网络开销,所以考虑做更进一步的优化。Raft 论文里面提及一种通过 Clock + Heartbeat 的 Lease Read 优化方法,也就是 Leader 发送 Heartbeat 的时候首先记录一个时间点 Start,当系统大部分节点都回复 Heartbeat Response,由于 Raft 的选举机制,Follower 会在 Election Timeout 的时间之后才重新发生选举,下一个 Leader 选举出来的时间保证大于 Start+Election Timeout/Clock Drift Bound,所以可以认为 Leader 的 Lease 有效期可以到 Start+Election Timeout/Clock Drift Bound 时间点。Lease Read 与 ReadIndex 类似但更进一步优化,不仅节省 Log,而且省掉网络交互,大幅提升读的吞吐量并且能够显著降低延时。

Lease Read 基本思路是 Leader 取一个比 Election Timeout 小的租期(最好小一个数量级),在租约期内不会发生选举,确保 Leader 不会变化,所以跳过 ReadIndex 的第二步也就降低延时。由此可见 Lease Read 的正确性和时间是挂钩的,依赖本地时钟的准确性,因此虽然采用 Lease Read 做法非常高效,但是仍然面临风险问题,也就是存在预设的前提即各个服务器的 CPU Clock 的时间是准的,即使有误差,也会在一个非常小的 Bound 范围里面,时间的实现至关重要,如果时钟漂移严重,各个服务器之间 Clock 走的频率不一样,这套 Lease 机制可能出问题。

Lease Read 实现方式包括:

  1. 定时 Heartbeat 获得多数派响应,确认 Leader 的有效性;
  2. 在租约有效时间内,可以认为当前 Leader 是 Raft Group 内的唯一有效 Leader,可忽略 ReadIndex 中的 Heartbeat 确认步骤(2);
  3. Leader 等待自己的状态机执行,直到 applyIndex 超过 ReadIndex,这样就能够安全的提供 Linearizable Read。

PartB: Key/value service with snapshots

服务层使用快照(Snapshot):

执行流程:

SnapShot正常执行

  1. 调用Snapshot() :KV-Server在applier中应用日志时要判断,raft_State是否超过规定值,超过maxraftstate使用snapshot(sm_state) 拍摄状态机的状态快照,也就是将服务器的数据库、Session最后一次响应进行快照拍摄
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (kv *KVServer) needSnapshot() bool {
size := kv.rf.Persister.RaftStateSize()
if kv.maxraftstate == -1 {
return false
}
return kv.maxraftstate <= size
}
func (kv *KVServer) takeSnapshot(CommandIndex int) {
DPrintf("S%d Taking Snapshot CommandIndex %d", kv.me, CommandIndex)
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(kv.sm.mkv)
e.Encode(kv.Session)
data := w.Bytes()
kv.rf.Snapshot(CommandIndex, data)
}

Think about when a kvserver should snapshot its state and what should be included in the snapshot.

Your kvserver must be able to detect duplicated operations in the log across checkpoints, so any state you are using to detect them must be included in the snapshots.

snapshot包含的信息:数据库+Session

  • 数据库:键值表

  • Session:客户端与服务端会话的最后一次request的响应

  1. 压缩日志持久化snapshot: snapshot中会对当前commitIndex进行压缩日志,将raft_state(压缩后的日志)

Crash后恢复

  1. 当所有Server重启后,服务端需要读取snapshot,调用ReadSnapshot(),快速恢复状态机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (kv *KVServer) recovery() {
data := kv.rf.Persister.ReadSnapshot()
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
var mkv *MemoryKV
var session map[int64]LastRespond
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
if d.Decode(&mkv) != nil ||
d.Decode(&session) != nil {
log.Fatal("read error")
} else {
kv.sm.mkv = mkv
kv.Session = session
}
}

处理滞后Follower

  1. raft层: Leader调用InstallSnapshot()发送snapshot给滞后的follower,follower处理此条消息用将snapshot消息 push到channel中

  2. 服务层: 检测本条channel的snap消息,使用CondInstallSnapshot() 对raft的日志进行调整,返回正确后,读取snapshot,将已有的SM_State覆盖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (kv *KVServer) applier() {
//for !kv.killed() {
for msg := range kv.applyCh {
if msg.CommandValid {
...apply log to SM_State
kv.mu.Lock()
if kv.needSnapshot() {
kv.takeSnapshot(msg.CommandIndex)
}
kv.mu.Unlock()
} else if msg.SnapshotValid {
if kv.rf.CondInstallSnapshot(msg.SnapshotTerm, msg.SnapshotIndex, msg.Snapshot) {
kv.lastapplied = msg.SnapshotIndex
//updata snapshot to Sm_machine
kv.recovery()
}
}
}
}

调试中的bug

问题描述

服务层与raft层ch通讯延时导致服务层applied信息过慢(可能是锁的原因),而raft_State的logs已经增长的过多。

导致服务层对于raft 通过channel传递上来的消息都要调用一次Snapshot(),但是此时的索引却又很小,一次压缩可能就是压缩一个长度的log,因此下一条命令处理后raft_State仍是很大,又是重复此流程,程序的运行的则会报出logs were not trimmed 日志未裁剪的错误,改报错是超过了指定的测试指定的大小

总结

本次实验完成了KVRaft的所有测试点,对于Raft的博士论文中的讲解对于只读操作进行了实现,但是对于只读操作的调试过程是相对于比较困难的,还好最后完成了只读操作的实现,对于follower读取(Read-Only Query More effieciency)并没有打算进行实现(主要是只读操作花费太多精力了),如果你想要对该操作进行实现可以参考SOFAJRaft 线性一致读实现剖析 | SOFAJRaft 实现原理 · SOFAStack.