Lab1:MAPREDUCE

一、基础框架

paper中的mapreduce概念模型

image-20220521154605632

image-20220521154644882

程序调用

coordinator(master)及worker的调用框架

lab1中的主要是把单进程的map-reduce分开执行并且使用coordinator作为master进行调用

1.工具链(静态库)

将map与reduce的方法放置于wc.go中,然后生成工具链

1
go build -race -buildmode=plugin ../mrapps/wc.go

2.挂起coordinator例程

coordinator运行时挂起,打开监听socket,等待worker的请求,并作出响应(返回文件名)

1
go run -race mrcoordinator.go pg-*.txt

第二个参数为pg-*.txt,为输入文件

mrcoordinator中调用mr/coordinator.go中的Makecoordinator并传入input文件名与reduce的任务数量

3.打开worker例程(可以打开多个)

mrworker调用mr/worker.go ,mrworker中获取工具链中的mapf-reducef传入worker.go

1
go run -race mrworker.go wc.so

worker.go中需要向coordinator发出请求

worker.go 调用map程序输出intermediate文件

将intermediate文件传递给reduce完成工作

二、实验分析

上述为一部分网络上的mapduce的流程

实验心得

刚开始做本次实验,由于没有go语言基础以及做项目的基础,导致做本次实验十分的吃力,一共完成了三版

  1. 通过上述思路图的实验第一版较为粗糙以及复杂(结构体套结构体的版本,传递一些无效参数),但是没有处理并发问题,导致很多测试点过不了(踩了Struct成员需要首字母大写的坑)
  2. 实验第二版,实验简化了很多无效使调用关系以及无效参数,调用关系更加明确,但是没有处理并行问题导致出现了任务量达超标的问题。但是莫名其妙的过了Crash测试
  3. 实验第三版,参考了一点别人的代码,修改了请求任务阶段(使用了Channel),并行问题导致了超标的任务量

并行问题(实验第二版)

并发实现

在Coordianor中将每个Worker申请的服务例程放入goroutine中,多个Worker就是多线程

go http.Serve(l, nil)

在分配Map以及Reduce任务时Coordinate的并行调度问题

用GetMapTask进行示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (c *Coordinator) GetMapTask(args *MapArgs, reply *MapReply) error {

for _, m := range c.Mt {
if !m.State {
c.MapMu.Lock()
defer c.MapMu.Unlock()
reply.Inputfile = m.Inputfile
reply.ID = m.ID

reply.Nreduce = c.Nreduce
return nil
}
}
return nil
}

初始想法

遍历MapTask结构体的数组,去判断哪些任务还未被分配,然后将输入文件和文件ID传给Worker

这个方法偶尔会通过Crash测试,分析应该是任务未完成,State是未被置为true导致,因为Coordinator不断去遍历MapTask数组,就能够发现哪些任务未被完成

Jobcount测试

在该测试中,发现了原本为只需要8个Worker去完成的工作,居然出现了16个!通过查看测试脚本发现了,Jobcount中开启了两个Worker,那么回想并行的状态,一定是两个Worker去读了MapTask数组,又同时去更新了任务,导致了出现了两倍的任务量。

解决并行问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
for {
select {
case reply.ID = <-c.MapID:
reply.Inputfile = c.Mt[reply.ID].Inputfile
reply.Nreduce = c.Nreduce
/*go func(ID int) {//Crash测试
time.Sleep(10 * time.Second)
c.MapMu.Lock()
defer c.MapMu.Unlock()
if !c.Mt[reply.ID].State {
c.MapID <- ID
}
}(reply.ID)*/
return nil
default:
reply.ID = -1
return nil
}
}
1
2
3
4
5
6
7
//主线程初始化Coordinator
c.MapID = make(chan int, c.Mapnum)
for i := range c.Mt {
c.Mt[i].Inputfile = files[i]
c.Mt[i].State = false
c.MapID <- i
}

分析

在实验的第二版中出现了并行问题在于同时这个点,解决方案就是在于同步的问题,当一个线程读取了数据后,另一个线程不能访问,或是及时将另一个线程阻塞或是告知它读取下个任务

使用了Go语言中的Channel的语法,主例程发送一个ID,就会导致阻塞,然后当有Worker调用GetMapTasK将ID读入,才会又下一个ID发送。读写会相互阻塞,这导致在多个Worker中不会去同时读入一个ID,解决了并发问题

Worker与Coordinator之间的关系

Coordinator(RPC的Server机)

  1. 在Coordinator中需要完成的是定义RPC的方法来完成Worker的请求
  2. Coordinator结构体中是与Worker通信中需要传递的参数,以及分配任务的一些判断依据,以及多线程中的保护共享数据需的互斥锁(Mutex)
  3. 由于是多线程并行问题,Coordiantor需要合适去调度任务

Worker(RPC的客户机)

  1. 在实验中我们可以用一个或则多个(并行)的去向Coordinator申请任务(在多个终端中开启Worker)

go run -race mrworker.go wc.so

  1. Worker需要去完成Map与Reduce两种任务,Map与Reduce的工具在mrapps中有所定义,所以我们只需定义一些请求任务所需的参数(需要在rpc.go中定义),将这许传回的参数进行相应的任务
  2. 一个Worker是双线程去完成相应的任务,Worker需要判断任务是否完成,执行怎样的任务,不断的向Coordinator申请任务
  3. 多个Worker是多线线程并行的去完成相应的任务(任务量测试点)

注意事项:

结构体成员需要以大写字母开始,否则会出现定义为private对象,无法初始化字

三、实验过程

定义对象

rpc中的参数以及回复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//请求Map
type MapArgs struct {
ID int
}
type MapReply struct {
ID int
Inputfile string
Nreduce int
}
//请求reduce
type ReduceArgs struct {
ID int
}
type ReduceReply struct {
ID int
Mapnum int
}

//检查map任务的是否全部完成
type MapfinishArgs struct {
}
type MapfinishReply struct {
Finished bool
}

//check state of map task
type FinishArgs struct {
}
type FinishReply struct {
Finished bool
}

coordinator对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type Coordinator struct {
Nreduce int
MapID chan int
Mt []MapTask
Mapnum int //the number of map task
ReduceID chan int
Rt []ReduceTask
Finished bool
MapMu sync.Mutex //map task mutex
ReduceMu sync.Mutex //reduce task mutex
FinishedMu sync.Mutex //Done Mutex
}

type MapTask struct {
Inputfile string //bind file and ID
State bool
}

type ReduceTask struct {
State bool
}

Worker

主要框架

  1. 一个for死循环,不断的去向主例程申请任务
  2. 先判断所有任务是否完成也就是判断(向Coordinator询问Reduce任务是否全部完成),完成的话则执行break结束Worker例程,
  3. 否则进入Map或者Reduce任务,(向Coordinator询问所有Map任务是否完成),Map所有任务完成的话就执行Reduce任务,否则执行Map剩余的Map任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for {
//finished?
if !AllFinished() {
if !Mapfinished() { //tasktype
//do map
//update worker
} else {
//do reduce
//update task
}
} else {
break
}
}
}

Coordinator

定义Worker例程的调用方法

1
2
3
4
5
6
7
8
9
//MapTask
func (c *Coordinator) GetMapTask(args *MapArgs, reply *MapReply) error
func (c *Coordinator) UpdateMapTask(args *MapArgs, reply *MapReply) error
func (c *Coordinator) MapTaskFinished(args *FinishArgs, reply *FinishReply) error
//RedueceTask
func (c *Coordinator) GetReduceTask(args *ReduceArgs, reply *ReduceReply) error
func (c *Coordinator) UpdateReduceTask(args *ReduceArgs, reply *ReduceReply) error
func (c *Coordinator) ReduceTaskFinished(args *FinishArgs, reply *FinishReply) error
//Init Coordinator Object

Crash开启判断例程(GetMapTask)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
select {
case reply.ID = <-c.MapID:
reply.Inputfile = c.Mt[reply.ID].Inputfile
reply.Nreduce = c.Nreduce
go func(ID int) {
time.Sleep(10 * time.Second)
c.MapMu.Lock()
defer c.MapMu.Unlock()
if !c.Mt[reply.ID].State {
c.MapID <- ID//未完成重复读入
}
}(reply.ID)
return nil
default:
reply.Inputfile = ""
reply.Nreduce = 0
reply.ID = -1
return nil
}

分析

首先知道Worker是一个并行的线程,而这个GetMapTask可以作为Worker线程的一部分,于是Channel可以当作通信的一部分。

设置一个协程,当一个任务Crash掉后,我们要知道是那个任务失败,还要设置一个判断时间,当这个时间过去后判断该状态还未完成,就重新将ID发送,让GetTask能够再次读入并重新执行