这个实验由于时间关系PartB部分只完成了一点,但是我后续还是会更新的,读者可以不介意的话可以观看

ban

Shards实验简述

为何需要Shards?

Shard设计的目的简单来说就是为了并行性(parallel)、多个区域能够有Shards的副本能够加快访问就是是locally

在分布式的事务处理中数据通常情况不会放置到一个副本中,一个事务中通常情况下会包含许多的key分布在不同的副本中,为了保证分区的容错性副本也被设计为一个raft组,这样每个副本中的数据(数据库)就是一个Shard,如何操作这样一个事务具体实现也是和[[Spanner 论文笔记]]中的描述相同。

Sharded KV Service的组成

  • a set of replica groups: 每一个replica Group作为一个副本是Shards的子集,一个副本由一些服务器组成,这些服务器使用Raft来复制组间的Shards。
  • shard controller: shard controller是知道那个Raft Group含有客户请求的键,这些信息叫做Cofiguration,Cofiguration会随着时间进行改变。Client会向Shard controller询问那个replica Group有这个特定的键,replica Group为了查明那个Shards应该用于服务也会询问Shard Controller。为了实现容错shard Controller任然是一个Raft Group。

Challenge

本次实验主要挑战是处理Reconfiguration的问题——组间分片分配的改变

在单一给应该replica Group中,所有的Group members必须同意当一个reconfiguration发生在与Client有关的Put/Append/Get操作。Put可能与reconfigurantion同时到达,reconfigurantion会导致复制组停止对持有Put的key的Shard负责。组中的所有副本必须在Put发生在reconfigurantion之前还是之后达成一致。

reconfigurantion还需要副本组之间的交互。例如,在configuration10中,组G1可能负责分片S1。在configuration11中,组G2可能负责分片S1。在10到11的reconfigurantion过程中,G1和G2必须使用RPC将分片S1的内容(键/值对)从G1移动到G2。

本次实验的通用架构(一个配置服务和一组副本组)遵循与Flat Datacenter Storage、BigTable、Spanner、FAWN、Apache HBase、Rosebud、Spinnaker和许多其他相同的通用模式。然而,这些系统在许多细节上与这个lab不同,而且通常也更复杂和更有能力。

Part A: The Shard controller

实验概述

Shardctrler管理着一个序列的cofiguration,每个configuration描述着一个组的replica Group和分配给replica的分片。每当这个分配需要改变时,Shardctrler就会用新的recofiguration一个新的configuration。K/V客户端和服务器在想要知道当前(或过去)配置时联系shardctrler。

该实验也是一个像[[KVRaft]]一样的设计,需要检测冗余的设置

HINTS: Go maps are references. If you assign one variable of type map to another, both variables refer to the same map. Thus if you want to create a new Config based on a previous one, you need to create a new map object (with make()) and copy the keys and values individually.

本次Part A最重要的是实现分片的移动的问题(建议是自己独立实现),对于新添加group与删除group都应该做出对应的处理,其余的设计与lab3的设计相似,需要在applier中进行判断操作的类型并做出相应的处理。

Configuration

Configuration的参数是每个config的id、Shards一个整数数组存放GID、一个map Group ID对应的servers名称列表。我们将会根据Shards中的GID来分配,意思也是这个Shard最多由10replica Group来分配。例如有3个Group为1,2,3我们就需要将10个shards均匀的分配给这三个group可以是 {1,1,1,2,2,2,3,3,3,3}、 {1,1,1,2,2,2,2,3,3,3}、{1,1,1,1,2,2,2,3,3,3}这三种情况来是Shards的服务达到负载均衡

1
2
3
4
5
6
7
const NShards = 10

type Config struct {
    Num    int              // config number
    Shards [NShards]int     // shard -> gid
    Groups map[int][]string // gid -> servers[]
}

RPC完善

Part A我们需要去完成Join、Leave、Move和Query等RPC,这些RPC都是用于允许管理员(Client)去控制Shard controller:添加replica Group、删除replica Group、在replica Group间移动shards(RPC的参数也是在comm.go中进行添加)

Join RPC

管理者使用这个RPC去添加新的replica group,实参是mappings从唯一的,非零的replica group的标识符(GID)到服务器的名称列表的集合。当包含了新的replica group的时候shardctrler应该更新configuration,新的configuration应该在完整的组集中尽可能均匀地分配碎片,并且应该尽可能少地move shards以实现该目标。shardctrler应该允许重用GID,只要GID不是当前configuration的一部分.

Join分片策略

在设计上我们需要尽可能少的去移动分片以达到分片的服务的负载平衡,如我们在{1,1,1,1,1,2,2,2,2,2} ,G1与G2各有五个分片,此时我们添加有个G3,则我们需要重新分配一下每个组的分片,最大组与最小组的分片个数差不应该大于1,分配应该时G1:4,G2:3,G3:3 这样此时我们需要从G1中分配一个分片给G3,G2中分配两分片给G3,这样便可以到达负载均衡,结果为{1,1,1,1,3,2,2,2,3,3}

NOTE:在go语言中map的遍历是无序的,因此为了保持整个leader与follower的shards的分配是一致,需要将每个从map遍历中获取的gids进行一次排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func (sc *ShardCtrler) MoveShards_Join(new_gids []int, add_gids []int, Shards [NShards]int) [NShards]int {
Assign_shards := make(map[int]int) //gid-> the num of assigning shards
Gshards := len(Shards) / len(new_gids)
Remain_shards := len(Shards) % len(new_gids)

//assign shards to group
//均匀分配给现存的groups
for _, g := range new_gids {
if Remain_shards != 0 {
Remain_shards--
Assign_shards[g] = Gshards + 1
} else {
Assign_shards[g] = Gshards
}
}

move_gid_i := 0 //the index of add_gids
move_gid := add_gids[move_gid_i] //assign the shard to add_id

for i, gid := range Shards {
if Assign_shards[gid] != 0 {
Assign_shards[gid]--
} else {
//当执行多个move操作后导致shard的负载不均衡
//因此需要使用改方法来调节
if Assign_shards[move_gid] == 0 && len(add_gids) <= move_gid_i+1 {
for id, shards := range Assign_shards {
if shards > 0 {
var is_exist bool
for m_i := i; m_i < len(Shards); m_i++ {
if Shards[m_i] == id {
is_exist = true
}
}
if !is_exist {
move_gid = id
break
}
}
}
}
//move shards
Shards[i] = move_gid
Assign_shards[move_gid]--
}
//更换需要添加分片的组
if Assign_shards[move_gid] == 0 && len(add_gids) > move_gid_i+1 {
move_gid_i++
move_gid = add_gids[move_gid_i]
}
}
return Shards
}

各个参数的意思

  1. new_gid: join后的分组,组号数组

  2. add_gid: join加入的分组,可能是多个组,后续用于修改shards对应索引的gid

  3. Assign_shards: 一个map用于组号与分片数量的映射,[Gid]shards

  4. Gshards: 每个组应该获得的“商”分片,也就是总分片/组数

  5. Remain_shards: 总分片%组数 获得的余数,用于均匀的分配分片

Leave RPC

参数是先前加入组的gid列表。shardctrler创建一个不包括这些组的新configuration,并将这些组的碎片分配给其余组。新的配置应该在组中尽可能均匀地分配碎片,并且应该尽可能少地移动碎片以实现该目标。

Leave分片策略

其原理与Join的相关参数相同,都是要尽可能通过少的去移动分片到达平衡。例如{1,1,1,4,2,2,2,3,3,4}分片的策略,此时我们需要移除G1这个组,则我们应该将G1的分片分配给G2,G3,G4。结果为{2,3,4,4,2,2,2,3,3,4}.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func (sc *ShardCtrler) MoveShards_Leave(new_gids []int, leave_gids []int, Shards [NShards]int) [NShards]int {
Assign_shards := make(map[int]int) //gid-> the num of assigning shards
Gshards := len(Shards) / len(new_gids)
Remain_shards := len(Shards) % len(new_gids)

//assign shards to group
//均匀分配分片给现存的group
for _, g := range new_gids {
if Remain_shards != 0 {
Remain_shards--
Assign_shards[g] = Gshards + 1
} else {
Assign_shards[g] = Gshards
}
}

//提取出需要添加分片的Gid
for _, g := range Shards {
if Assign_shards[g] > 0 {
Assign_shards[g]--
}
}
//获得需要移动分片的gid的数组
var move_gids []int
for k, v := range Assign_shards {
if v > 0 {
move_gids = append(move_gids, k)
}
}
//不需要移动分片
if len(move_gids) == 0 {
return Shards
}
//map是无序遍历,对move_gids重新排序
sort.Ints(move_gids)

//load re-balance
move_gid_i := 0 //the index of add_gids
move_gid := move_gids[move_gid_i] //assign the shard to newGroups(gid)

//将leave_group的分片给剩余的groups
for i, g := range Shards {
for _, lg := range leave_gids {
if g == lg {//shards索引对应的id是需要移除的组号
if Assign_shards[move_gid] > 0 {
Shards[i] = move_gid
Assign_shards[move_gid]--
}
}
}
//跟换需要添加分片的组号
if Assign_shards[move_gid] == 0 && len(move_gids) > move_gid_i+1 {
move_gid_i++
move_gid = move_gids[move_gid_i]
}
}
return Shards
}

Move RPC

参数是一个shard号和一个GID。shardctrler应该创建一个新的配置,其中将分片分配给组。Move的目的是让我们能够测试我们的软件。Move后的join或leave可能会取消Move,因为join和leave重新平衡。(也就是将相应的Shard位置的Server改为Move的参数)(逻辑十分简单,可以自行完成)

Query RPC

RPC的参数是一个configuration号。shardctrler回复具有该编号的configuration。如果该编号为-1或大于已知的最大配置编号,则shardctrler应返回最新配置。Query(-1)的结果应该反映shardctrler在接收到Query(-1) RPC之前完成处理的每个Join、Leave或Move RPC。(逻辑十分简单,可以自行完成)

Part B:Sharded Key/Value Server

实验概述

PartB的实验简单来说就是Shards + KVServer,也就是我们有多台KVServer作为数据库的分片存储,因此我们可以将lab3中的代码迁移到lab4中,值得注意的是shardctrler(PartA)与本环节的搭配。

组成成分:

  • Client: 提供接口向ctrler与raft group进行通讯,如一个应用一样

  • shardctrler: 一个Raft Group,拥有指定Shard的对应Replica Group的信息

  • Replica Group: 一个Raft Group,存储了一个KV数据库表示着一个shard

在Client.go中有个接口叫做Key2shard(),通过该函数判断那个分片有我们请求的数据,实现原理简单来说就是根据输入的key进行hash(%10),得到的余数(也就是Shards中索引)作为返回值。在PartA中我们也知道一个索引在shards数组中对应的是一个Gid

主要任务

  • 通过第一个测试点没有配置改变的情形是静态的分片,将KVServer的基础上+shardctrler边可以通过该测试点

  • 处理动态配置的情形,同一个Replica Group中的所有server的需要对数据进行迁移,如join操作后,原来只有一个分组G1的Configuration[1,1,1,1,1,1,1,1,1,1],加入分组G2变为[1,1,1,1,1,2,2,2,2,2] ,因此G1需要检查配置变化并将shard5-shard9的数据迁移到G2。

实验过程交互图: 以3个Replica Group为例。开始系统会创建一个 shardctrler 组来负责配置更新,分片分配等任务,接着系统会创建多个 raft 组来承载所有分片的读写任务。此外,raft 组增删,节点宕机,节点重启,网络分区等各种情况都可能会出现。

interact of ShardKV.png

实现-概述

Relica Group应该周期性(<=100ms)的去轮询shardctrler配置是否发生改变,如果发生改变需要立即去处理迁移问题

Replica Group之间应该提供RPC,以便能够在发现配置改变后去转移分片(也就是传送KV表),shardctrler的Config结构体包含服务器名称,需要一个labrpc.ClientEnd去发送RPC。使用传递给StartServer()的make_end()函数将服务器名转换为客户端。

修改Lab3的内存模型

  • 将KV表转换为多个(NShards=10)Shard便于移动或删除
  • 添加深拷贝接口,map是一个引用
  • 在申请内存时需要先建立NShards个Shard分片,之后给每个分片申请内存
  • 添加Migration的Shard处理接口
    • 服务端: Shard_Migration()对请求的分片进行复制
    • 客户端: Shard_Get()将服务端请求回来的数据进行同步复制(WAL)到follower并应用
  • 添加分片状态:
    1. Pulling:该分片正处理拉取阶段,需要等待拉取成功,Get与PutAppend都需要等待Pulling状态改变
    2. Empty:该分片为空,GET进行读取时也需要等待,PutAppend操作进行写入时可以不用等待
    3. Used:分片已被使用,说明该分片是有数据的,可以进行直接进行读写,不需要任何等待

注意: 在lab3的kvserver中我们的snapshot的设计的快照存储需要进行对应的调整

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
type ShardState int

const (
Pulling = 1
Empty = 2
Used = 3
)

type ShardStateMachine struct {
mShards map[int]*Shard//也可以使用数组替代map
}

type Shard struct {
KV map[string]string
State ShardState
}

func NewShard() *Shard {
return &Shard{make(map[string]string), Empty}
}

func (mShard *Shard) KV_Get(key string) (string, Err) {
if value, ok := mShard.KV[key]; ok {
return value, OK
}
return "", ErrNoKey
}

func (mShard *Shard) KV_Put(key, value string) Err {
mShard.KV[key] = value
return OK
}

func (mShard *Shard) KV_Append(key, value string) Err {
if mShard.KV[key] == "" {
mShard.KV[key] = value
} else {
mShard.KV[key] += value
}
return OK
}

func (mShard *Shard) Shard_Migrate() (map[string]string, Err) {
KV := mShard.DeepCopy().KV
mShard.KV = nil
mShard.KV = make(map[string]string)
return KV, OK
}

func (mShard *Shard) Shard_Get(newKV map[string]string) Err {
for k, v := range newKV {
mShard.KV[k] = v
}
return OK
}

func (mShard *Shard) DeepCopy() *Shard {
newShard := NewShard()
for k, v := range mShard.KV {
newShard.KV[k] = v
}
return newShard
}

func (StateMachine *ShardStateMachine) ApplyToStateMachine(op Op) CommandRespond {
var cr CommandRespond

shard := key2shard(op.Key)

switch op.Opt {
case "Get":
cr.Value, cr.Err = StateMachine.mShards[shard].KV_Get(op.Key)
case "Append":
cr.Err = StateMachine.mShards[shard].KV_Append(op.Key, op.Value)
StateMachine.mShards[shard].State = Used
case "Put":
cr.Err = StateMachine.mShards[shard].KV_Put(op.Key, op.Value)
StateMachine.mShards[shard].State = Used
case "Migrate":
cr.KV, cr.Err = StateMachine.mShards[op.Shard].Shard_Migrate()
StateMachine.mShards[op.Shard].State = Empty
case "SetShard":
cr.Err = StateMachine.mShards[op.Shard].Shard_Get(op.KV)
StateMachine.mShards[op.Shard].State = Used
}
return cr
}

拉取最新的Configuration

获取配置(poll)像applier的异步设计一样,我们开启一个协程进行周期性(不大于100ms)的获取最新配置,而且在获得到新配置后,通过判断其分片状态与组号进行修改分片的状态

Pulling State: 在migration中进行拉取对应的shard的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (kv *ShardKV) poll() {
for !kv.killed() {
kv.mu.Lock()
for iShard := range kv.CurrentCfg.Shards {
//cancel old cfg of the Pulling state
if kv.StateMachine.mShards[iShard].State == Pulling &&
len(kv.StateMachine.mShards[iShard].KV) == 0 {
kv.StateMachine.mShards[iShard].State = Empty
} else if kv.StateMachine.mShards[iShard].State == Pulling &&
len(kv.StateMachine.mShards[iShard].KV) > 0 {
kv.StateMachine.mShards[iShard].State = Used
}
}

newCfg := kv.mck.Query(-1)
//check change cfg
if kv.Is_CfgChanged(newCfg.Num) {
//kv.LastCfg = kv.CurrentCfg
if _, isleader := kv.rf.GetState(); isleader {
DPrintf("G%v{S%v} poll Config change Cfg.Shards:%v CfgNum %d",
kv.gid, kv.me, newCfg.Shards, newCfg.Num)
}
kv.CurrentCfg = newCfg

}
//convert shard state to pull
for iShard, gid := range kv.CurrentCfg.Shards {
if kv.StateMachine.mShards[iShard].State == Empty && gid == kv.gid {
kv.StateMachine.mShards[iShard].State = Pulling
}
}
kv.mu.Unlock()
time.Sleep(75 * time.Millisecond)
}
}

NOTE: 需要增加WrongGroup的检测

分片迁移

思考:

  1. 配置改变后,以何种方式进行迁移分片(push or pull)?
  2. 在何种情况判断需要进行转移分片
  3. 如何设计RPC?
  4. 如何处理RPC?
  5. 如何处理RPC的回复(WAL)?
  1. 迁移分片的方式: 迁移分片有两种策略Pull与Push,两种方案没有具体的难度区别,我的设计采用的是Pull:

    • Push: 当G1的Leader发现配置变化(添加G2),那么G1的Leader向G2的服务器发送RPC(含有Shard的数据),等待RPC的回复。

      • Pull: 当G2的Leader发现配置变化(添加G2),那么G2的Leader的Leader向G1发送RPC,获得Shard的数据。
  2. 迁移分片(Pull)条件: 我们开启一个协程叫做migration,通过周期性的观察观察新旧配置和判断shard的状态来判断是否需要进行pull。

lastCfg=[1,1,1,1,1,1,1,1,1,1](Cfg.Num=1),currentCfg=[1,1,1,1,1,2,2,2,2,2](Cfg.Num=2),每个Replica Group的migration的协程中进行处理。那么我们新加入的G2需要shard5-shard9这五个分片的数据,那Leader需要遍历Cfg.Shards中的gid,若gid = kv.gid并且判断该分片的状态,为Empty说明需要向target group进行pull分片。

发送MigrationRPC:为了提高效率,采用了goroutine与waitgroup的方式进行并发发送RPC。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (kv *ShardKV) migration() {
for !kv.killed() {
var wg sync.WaitGroup
if _, is_leader := kv.rf.GetState(); is_leader {
kv.mu.Lock()
for iShard := range kv.CurrentCfg.Shards {
//current sending pull request
if kv.StateMachine.mShards[iShard].State == Pulling &&
len(kv.StateMachine.mShards[iShard].KV) == 0 {
wg.Add(1)
go func(iShard int) {
kv.mu.Lock()
defer wg.Done()
kv.CommandId++
KV := kv.StartPull(iShard)
//handle the err KV
kv.ReplicateShard(KV, iShard)
kv.mu.Unlock()
}(iShard)
}
}
kv.mu.Unlock()
wg.Wait()
}
time.Sleep(80 * time.Millisecond)
}
}
  1. RPC设计: 我们采用pull的方式,当主机发现了配置改变后,向拥有分片的分组发送请求迁移分片,此时我们的发送RPC的Replica Group是作为客户端进行发送请求(意味着需要像lab3的实验中client一样设计ClientId与CommandId)。

    1
    2
    3
    4
    5
    6
    type MigrateArgs struct {
    ClientId int64
    CommandId int64
    Shard int
    CfgNum int
    }

type MigrateReply struct {
KV map[string]string
Err Err
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
4. **处理RPC:**  接收方作为**服务端**接收RPC并发送了Raft层进行同步(**Migration操作不是幂等操作**)引用,因此这个操作的处理需要进行**冗余判断**。
```go
func (kv *ShardKV) Migrate(args *MigrateArgs, reply *MigrateReply) {
...//一些错误判断

if kv.CurrentCfg.Num < args.CfgNum {
DPrintf("G%d{S%d} outdate Cfg %d Args.Cfg %d", kv.gid, kv.me, kv.CurrentCfg.Num, args.CfgNum)
kv.mu.Unlock()
return
}
if kv.StateMachine.mShards[args.Shard].State == Empty {
reply.Err = ErrEmpty
reply.KV = nil
kv.mu.Unlock()
return
}

DPrintf("G%v{S%v} receive Migrate(pull) message", kv.gid, kv.me)
op := Op{
ClientId: args.ClientId,
CommandId: args.CommandId,
Opt: "Migrate",
CfgNum: args.CfgNum,
Shard: args.Shard,
CommandTerm: term,
}
kv.mu.Unlock()

index, _, is_leader := kv.rf.Start(op)
if !is_leader {
reply.Err = ErrWrongLeader
return
}

//handle the channle reply
}
  1. 处理RPC回复(Replicat Shard):

我们为什么需要通过调用Raft接口(Start)预写日志(WAL)来实现同步呢?

在Lab2、3中我们都深刻的体会到了Raft所提供的容错的作用,主要是在Leader服务器崩溃后实现数据的容错(follower的Shard的数据与leader相同),也能满足线性一致性

NOTE: 我们在migration()中是开启了锁,在处理RPC的回复(也就是复制Shard)是我们是调用了Start接口,因此我们在Raft层与Server层都有锁,造成死锁问题,需要在调用Start时解锁,并在收到applych的消息后进行加锁

分片迁移交互过程图如下:
image.png

  1. 新加入的组,根据配置向有指定分片的组发送Migration RPC
  2. 服务分片的组,处理MigrationRPC将指定的分片的数据进行深拷贝并删除的日志同步到follower并应用。
  3. 服务分片的组,返回MigrationRPC作为回复
  4. 新加入分组根据RPC的信息,写入日志同步到follower并应用
    Lab4B实验添加的流程大致就是如前两幅图所示的情形。接着我们还要继续讨论一些细节,例如并发请求的处理、配置丢失等问题。

细节处理

1.no-op空日志添加

在lab3中,实现了对于Read only query,在其中就使用了no-op的空日志使状态机重演(replay)以提交日志,使状态机的日志达到最新。

在本次实验中我们仍然需要使用no-op去使状态机达到最新,因为对于shard实验,我们需要考虑到shard的状态以及配置号是否匹配的问题,我们需要使状态机的shard恢复到以前的状态,这样才能使Get操作能够正常运行。

2.target_gid的寻找:

我们每次都需要通过查找旧配置与新配置比较,获知哪个组存储了指定分片,获得到一个target_gid后,但是发送MigrationRPC,回复的分片的数据并没有数据。

原因:

实验中PutAppend的操作间隔短,poll、migration周期尚未到达,导致我们在获取最新配置后跳过了很多的配置,因此我们需要进行一次重新获取target_gid

优化:

我们同时可以进行优化,RPC发送到Server回复的时间过长,我们需要记录下已经发送的target_gid避免无效的浪费。

3.MigrationRPC错误回复的处理

通过target_gid进行pull分片,如果跳过了许多配置导致target_gid,只能返回一个ErrEmpty,此时就需要寻找新的target_gid,进行重新pull

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// send Migration RPC
func (kv *ShardKV) SendMigrationRPC(iShard int, Group []string) (map[string]string, Err) {
/*...*/

for {
for _, si := range Group { //find the leader of target group
var reply MigrateReply
srv := kv.make_end(si)
ok := srv.Call("ShardKV.Migrate", &args, &reply)
if !ok || (ok && (reply.Err == ErrWrongLeader ||
reply.Err == ErrTimeout)) {
continue
}
if reply.Err == ErrWrongGroup || reply.Err == ErrOutdate ||
reply.Err == ErrEmpty {
return nil, reply.Err
}
return reply.KV, OK
}
}
}

// find targetid to send RPC
func (kv *ShardKV) StartPull(iShard int) map[string]string {
var KV map[string]string
var target_gid int
var Groups map[int][]string
var Err Err
CfgNum := kv.CurrentCfg.Num
term, _ := kv.rf.GetState()
for !kv.killed() {
// find target gid to pull shard
target_gid, CfgNum, Groups = kv.GetTargetId(iShard, kv.gid, CfgNum)
if target_gid != 0 {
//crecete gid chage shard state
Group := Groups[target_gid]

KV, Err = kv.SendMigrationRPC(iShard, Group)
//err targetid
if Err == ErrEmpty {
continue
}
if Err == ErrOutdate || Err == ErrWrongGroup {
return nil
}
break
} else {
return nil
}
}
return KV
}

4.服务端数据不匹配客户端的请求:

配置增长过大时有可能会导致客户端向请求服务端的数据回复了一个空数据(违背了线性一致),主要原因也是客户端请求的组还未Pull到正确的target_gid。

解决方案

  1. 当我们发现Shard状态为Pulling时,不能只回复一个ErrNoKey

当服务端处理Get请求发现Shard状态不为Used时

  • 我们可以让客户端进行重发Get请求
  • 或者等待一段时间让Shard的数据得到更新(Get是幂等操作是否Apply并不是很重要)

我的设计是将主处理线程暂停100毫秒,让poll与migrate线程去处理分片。如果还是错误则返回客户端,更新完配置后,重新发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/*-------------server.go-------------------*/
if ishard := key2shard(args.Key); kv.StateMachine.mShards[ishard].State != Used {
count := 0
for kv.StateMachine.mShards[ishard].State != Used {
if count == 5 {
reply.Err = ErrOutdate
DPrintf("G%d Shard[%d] return ErrOutdate To Client", kv.gid, ishard)
kv.mu.Unlock()
return
}
count++
time.Sleep(20 * time.Millisecond)
}
}
/*-------------client.go-------------------*/
if ok && (reply.Err == ErrWrongGroup || reply.Err == ErrOutdate) {
if reply.Err == ErrOutdate {
ck.config = ck.sm.Query(-1)
args.CfgNum = ck.config.Num
DPrintf("resend")
}
break
}

当服务端处理PutAppend请求发现Shard状态为Pulling时

  • 在请求Start前进行判断Shard是否为Pulling,来决定是否停止下来等待pull和replicate shard操作完成
  1. 首先在处理Get与PutAppend时,要判断客户端与服务端的配置是否相同 :如果客户端是过期或是比服务器高的配置请求都会导致读写操作出现问题。写操作会导致分片与组号不匹配,读操作会导致一直读一个错误的分片导致死锁。
1
2
3
4
5
6
if kv.CurrentCfg.Num != args.CfgNum {
reply.Err = ErrOutdate
DPrintf("G%d Cfg is Outdate", kv.gid)
kv.mu.Unlock()
return
}