MIT 6.824 lab1: mapreduce


This is a write-up of MIT 6.824 lab1, recording what I learned and the pitfalls I hit along the way.

My environment is Windows 10, so I made some environment-related changes to the lab code. If you are only interested in the code, head over to: github/zouzhitao

mapreduce overview

First, a rough look at what MapReduce actually is.

My own simple understanding is this: MapReduce is a distributed system for processing a user-defined task. Roughly, it works as follows.

The user provides two functions:

mapFunc(k1, v1) -> list(k2, v2)
reduceFunc(k2, list(v2)) -> answer for k2
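In the lab's Go code these correspond to the two functions the application supplies. The sketch below is only illustrative: KeyValue matches the type in the lab's common.go, while MapFunc and ReduceFunc are just names I use here for signatures that the lab actually passes around as plain function arguments.

package mapreduce

// KeyValue is the intermediate pair type handed from map to reduce.
type KeyValue struct {
    Key   string
    Value string
}

// Illustrative names for the two application-supplied functions:
// mapF runs once per input file and emits intermediate key/value pairs;
// reduceF runs once per distinct key with every value emitted for that key.
type (
    MapFunc    func(filename string, contents string) []KeyValue
    ReduceFunc func(key string, values []string) string
)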

The distributed system runs the user's task in a distributed fashion and eventually produces an answer for every k2. Below is a description of how the system does this.

First, there is a master that does the task scheduling.

master

The master:

1. First schedules workers to run the map tasks. Let the total number of map tasks be $M$; the results are stored in intermediate files m-i-j, with $i \in \{0,\dots,M-1\}$ and $j \in \{0,\dots,R-1\}$ (the lab builds these names with small helpers; see the sketch below).
2. Then schedules workers to run the reduce tasks. Let the total number of reduce tasks be $R$; the answer of reduce task $j$ is stored in $r_j$.
3. Finally merges the answers of all the reduce tasks into a single file that is handed back to the user.
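For reference, the intermediate and per-reduce output file names come from helpers in the lab's common.go. The sketch below is reproduced from memory, so the exact prefix may differ from the handout:

package mapreduce

import "strconv"

// reduceName constructs the name of the intermediate file that map task
// mapTask produces for reduce task reduceTask (the "m-i-j" files above).
func reduceName(jobName string, mapTask int, reduceTask int) string {
    return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}

// mergeName constructs the name of the output file of reduce task reduceTask
// (the "r_j" files above), which the master later merges into the final answer.
func mergeName(jobName string, reduceTask int) string {
    return "mrtmp." + jobName + "-res-" + strconv.Itoa(reduceTask)
}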

The details are all in the lab itself.

detail

This part walks through the lab content (by reading the code), though not in the lab's original order. In my view the point of doing the lab is not the lab itself but understanding the MapReduce system.

master

Let's first look at the master's code.

// Master holds all the state that the master needs to keep track of.
type Master struct {
    sync.Mutex

    address     string
    doneChannel chan bool

    // protected by the mutex
    newCond *sync.Cond // signals when Register() adds to workers[]
    workers []string   // each worker's UNIX-domain socket name -- its RPC address

    // Per-task information
    jobName string   // Name of currently executing job
    files   []string // Input files
    nReduce int      // Number of reduce partitions

    shutdown chan struct{}
    l        net.Listener
    stats    []int
}

The master holds all the state needed to execute a job.

master.run

This is what the master actually does.

// Distributed schedules map and reduce tasks on workers that register with the
// master over RPC.
func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
    mr = newMaster(master)
    mr.startRPCServer()
    go mr.run(jobName, files, nreduce,
        func(phase jobPhase) {
            ch := make(chan string) // addresses of registered workers
            go mr.forwardRegistrations(ch)
            schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
        },
        func() {
            mr.stats = mr.killWorkers()
            mr.stopRPCServer()
        })
    return
}

// run executes a mapreduce job on the given number of mappers and reducers.
//
// First, it divides up the input file among the given number of mappers, and
// schedules each task on workers as they become available. Each map task bins
// its output in a number of bins equal to the given number of reduce tasks.
// Once all the mappers have finished, workers are assigned reduce tasks.
//
// When all tasks have been completed, the reducer outputs are merged,
// statistics are collected, and the master is shut down.
//
// Note that this implementation assumes a shared file system.
func (mr *Master) run(jobName string, files []string, nreduce int,
    schedule func(phase jobPhase),
    finish func(),
) {
    mr.jobName = jobName
    mr.files = files
    mr.nReduce = nreduce

    fmt.Printf("%s: Starting Map/Reduce task %s\n", mr.address, mr.jobName)

    schedule(mapPhase)
    schedule(reducePhase)
    finish()
    mr.merge()

    fmt.Printf("%s: Map/Reduce task completed\n", mr.address)

    mr.doneChannel <- true
}

schedule

What we actually have to implement is this schedule, and it is the core piece: schedule assigns tasks to workers. Note that there are $M$ map tasks and $R$ reduce tasks but only $n$ workers; usually $M > n$ and $R > n$, so that the workers are kept as busy as possible and the pipeline stays full.

//
// schedule() starts and waits for all tasks in the given phase (mapPhase
// or reducePhase). the mapFiles argument holds the names of the files that
// are the inputs to the map phase, one per map task. nReduce is the
// number of reduce tasks. the registerChan argument yields a stream
// of registered workers; each item is the worker's RPC address,
// suitable for passing to call(). registerChan will yield all
// existing registered workers (if any) and new ones as they register.
//
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var nOther int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        nOther = nReduce
    case reducePhase:
        ntasks = nReduce
        nOther = len(mapFiles)
    }

    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nOther)

    // All ntasks tasks have to be scheduled on workers. Once all tasks
    // have completed successfully, schedule() should return.
    //
    // Your code here (Part III, Part IV).
    //
    // Part III (the retry loop below also covers Part IV's worker failures)
    var wg sync.WaitGroup
    wg.Add(ntasks)
    for i := 0; i < ntasks; i++ {
        go func(i int) {
            defer wg.Done()
            filename := ""
            if i < len(mapFiles) { // only map tasks have an input file
                filename = mapFiles[i]
            }
            taskArgs := DoTaskArgs{
                JobName:       jobName,
                File:          filename,
                Phase:         phase,
                TaskNumber:    i,
                NumOtherPhase: nOther,
            }
            taskFinished := false
            for !taskFinished {
                workAddr := <-registerChan
                taskFinished = call(workAddr, "Worker.DoTask", taskArgs, nil)
                // hand the worker back to the pool without blocking this goroutine
                go func() { registerChan <- workAddr }()
            }
        }(i)
    }
    wg.Wait()
    fmt.Printf("Schedule: %v done\n", phase)
}

What schedule does is, for each task, use the call function to make an RPC that asks a worker to execute Worker.DoTask. This is the code for Part III/IV.

A few details to note here:

- registerChan is a channel that carries the addresses of available workers, so after a worker finishes a task its address has to be put back into registerChan (the sketch below shows where these addresses come from in the first place).
- The master schedules the two phases serially: it waits for all map tasks to finish before it schedules any reduce task. So schedule must not return early; it has to wait until all of its tasks have completed.
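For context, the worker addresses flowing through registerChan are produced by the skeleton's forwardRegistrations in master.go (started by Distributed above). Quoted from memory, so details may differ slightly, it looks roughly like this:

// forwardRegistrations sends the address of every registered worker, existing
// and newly arriving, down ch so that schedule() can hand out tasks.
func (mr *Master) forwardRegistrations(ch chan string) {
    i := 0
    for {
        mr.Lock()
        if len(mr.workers) > i {
            // a worker that schedule() hasn't been told about yet
            w := mr.workers[i]
            go func() { ch <- w }() // send without holding the lock
            i = i + 1
        } else {
            // wait for Register() to add an entry to workers[]
            mr.newCond.Wait()
        }
        mr.Unlock()
    }
}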

Next, let's see what this call actually does. It simply issues the RPC that invokes Worker.DoTask, so all we really need to look at is what Worker.DoTask does.
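call itself is part of the skeleton (common_rpc.go): it dials the worker's RPC address, invokes the named handler, and returns false if the connection or the RPC fails. A from-memory sketch; in my Windows port the "unix" network becomes "tcp":

package mapreduce

import (
    "fmt"
    "net/rpc"
)

// call sends an RPC to the rpcname handler on server srv with arguments args,
// waits for the reply, and reports whether the RPC succeeded.
func call(srv string, rpcname string, args interface{}, reply interface{}) bool {
    c, errx := rpc.Dial("unix", srv) // "tcp" in my Windows port
    if errx != nil {
        return false
    }
    defer c.Close()

    err := c.Call(rpcname, args, reply)
    if err == nil {
        return true
    }

    fmt.Println(err)
    return false
}

schedule relies on that boolean: a false return makes the loop pick another worker and retry the task, which is exactly what Part IV's worker-failure handling needs.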

worker

// DoTask is called by the master when a new task is being scheduled on this
// worker.
func (wk *Worker) DoTask(arg *DoTaskArgs, _ *struct{}) error {
    //...
    switch arg.Phase {
    case mapPhase:
        doMap(arg.JobName, arg.TaskNumber, arg.File, arg.NumOtherPhase, wk.Map)
    case reducePhase:
        doReduce(arg.JobName, arg.TaskNumber, mergeName(arg.JobName, arg.TaskNumber), arg.NumOtherPhase, wk.Reduce)
    }
    //....
}

At its core it just calls doMap and doReduce.

This is also what Part I covers. Let's see what doMap and doReduce do.

doMap

func doMap(
    jobName string, // the name of the MapReduce job
    mapTask int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce task that will be run ("R" in the paper)
    mapF func(filename string, contents string) []KeyValue,
) {
    //
    // doMap manages one map task: it should read one of the input files
    // (inFile), call the user-defined map function (mapF) for that file's
    // contents, and partition mapF's output into nReduce intermediate files.
    //
    // There is one intermediate file per reduce task. The file name
    // includes both the map task number and the reduce task number. Use
    // the filename generated by reduceName(jobName, mapTask, r)
    // as the intermediate file for reduce task r. Call ihash() (see
    // below) on each key, mod nReduce, to pick r for a key/value pair.
    //
    // mapF() is the map function provided by the application. The first
    // argument should be the input file name, though the map function
    // typically ignores it. The second argument should be the entire
    // input file contents. mapF() returns a slice containing the
    // key/value pairs for reduce; see common.go for the definition of
    // KeyValue.
    //
    // Look at Go's ioutil and os packages for functions to read
    // and write files.
    //
    // Coming up with a scheme for how to format the key/value pairs on
    // disk can be tricky, especially when taking into account that both
    // keys and values could contain newlines, quotes, and any other
    // character you can think of.
    //
    // One format often used for serializing data to a byte stream that the
    // other end can correctly reconstruct is JSON. You are not required to
    // use JSON, but as the output of the reduce tasks *must* be JSON,
    // familiarizing yourself with it here may prove useful. You can write
    // out a data structure as a JSON string to a file using the commented
    // code below. The corresponding decoding functions can be found in
    // common_reduce.go.
    //
    //   enc := json.NewEncoder(file)
    //   for _, kv := ... {
    //     err := enc.Encode(&kv)
    //
    // Remember to close the file after you have written all the values!
    //
    // Your code here (Part I).
    //
    content := safeReadFile(inFile)
    ans := mapF(inFile, string(content))
    jsonEncoder := make([]*json.Encoder, nReduce)
    for i := 0; i < nReduce; i++ {
        f := safeCreaFile(reduceName(jobName, mapTask, i))
        jsonEncoder[i] = json.NewEncoder(f)
        defer f.Close()
    }
    for _, kv := range ans {
        r := ihash(kv.Key) % nReduce
        err := jsonEncoder[r].Encode(&kv)
        if err != nil {
            log.Fatal("jsonEncode err", err)
        }
    }
}

doMap does three things:

1. Read the contents of the input file.
2. Call the user's mapF on it to produce a list of key/value pairs.
3. Hash each key to pick one of the nReduce intermediate files and append the pair to it.

In other words, every map task produces nReduce intermediate files, so $M \times R$ intermediate files are produced in total. And because a pair is routed to a reduce task by hashing its key, the same key is guaranteed to end up at the same reduce task.
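safeReadFile and safeCreaFile are my own thin wrappers (not shown in this post) around reading and creating files that simply abort the task on error; roughly this hypothetical sketch:

package mapreduce

import (
    "io/ioutil"
    "log"
    "os"
)

// safeReadFile reads the whole input file, aborting on error.
// (Hypothetical reconstruction of the helper used in doMap above.)
func safeReadFile(name string) []byte {
    content, err := ioutil.ReadFile(name)
    if err != nil {
        log.Fatal("read file err: ", err)
    }
    return content
}

// safeCreaFile creates (or truncates) a file, aborting on error.
func safeCreaFile(name string) *os.File {
    f, err := os.Create(name)
    if err != nil {
        log.Fatal("create file err: ", err)
    }
    return f
}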

reduce

func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTask int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    //
    // doReduce manages one reduce task: it should read the intermediate
    // files for the task, sort the intermediate key/value pairs by key,
    // call the user-defined reduce function (reduceF) for each key, and
    // write reduceF's output to disk.
    //
    // You'll need to read one intermediate file from each map task;
    // reduceName(jobName, m, reduceTask) yields the file
    // name from map task m.
    //
    // Your doMap() encoded the key/value pairs in the intermediate
    // files, so you will need to decode them. If you used JSON, you can
    // read and decode by creating a decoder and repeatedly calling
    // .Decode(&kv) on it until it returns an error.
    //
    // You may find the first example in the golang sort package
    // documentation useful.
    //
    // reduceF() is the application's reduce function. You should
    // call it once per distinct key, with a slice of all the values
    // for that key. reduceF() returns the reduced value for that key.
    //
    // You should write the reduce output as JSON encoded KeyValue
    // objects to the file named outFile. We require you to use JSON
    // because that is what the merger that combines the output
    // from all the reduce tasks expects. There is nothing special about
    // JSON -- it is just the marshalling format we chose to use. Your
    // output code will look something like this:
    //
    //   enc := json.NewEncoder(file)
    //   for key := ... {
    //     enc.Encode(KeyValue{key, reduceF(...)})
    //   }
    //   file.Close()
    //
    // Your code here (Part I).
    //
    kvs := make(map[string][]string)
    for i := 0; i < nMap; i++ {
        kv := jsonDecode(reduceName(jobName, i, reduceTask))
        for _, v := range kv {
            kvs[v.Key] = append(kvs[v.Key], v.Value)
        }
    }
    f := safeCreaFile(outFile)
    defer f.Close()
    enc := json.NewEncoder(f)
    for k, v := range kvs {
        reduceAns := reduceF(k, v)
        enc.Encode(KeyValue{k, reduceAns})
    }
}

What reduce does is also simple: it first reads all of the intermediate files assigned to it and builds, for each key, the list of its values.

It then calls the user's reduceF for each key and writes the answers, JSON-encoded, to a single output file.
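jsonDecode is another small helper of mine (not shown in this post): it opens one intermediate file and decodes KeyValue records until the decoder reports an error; something like this hypothetical sketch:

package mapreduce

import (
    "encoding/json"
    "log"
    "os"
)

// jsonDecode reads all JSON-encoded KeyValue pairs from one intermediate file.
// (Hypothetical reconstruction of the helper used in doReduce above.)
func jsonDecode(name string) []KeyValue {
    f, err := os.Open(name)
    if err != nil {
        log.Fatal("open file err: ", err)
    }
    defer f.Close()

    var kvs []KeyValue
    dec := json.NewDecoder(f)
    for {
        var kv KeyValue
        if err := dec.Decode(&kv); err != nil {
            break // io.EOF ends the file; any other error also stops here
        }
        kvs = append(kvs, kv)
    }
    return kvs
}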

That's Part I.

Next come the two examples.

example

The two examples here are word count and inverted index.

word count

This task counts how many times each word appears.

//
// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. You should ignore the input file name,
// and look only at the contents argument. The return value is a slice
// of key/value pairs.
//
func mapF(filename string, contents string) []mapreduce.KeyValue {
    // Your code here (Part II).
    var ret []mapreduce.KeyValue
    words := strings.FieldsFunc(contents, func(x rune) bool {
        return !unicode.IsLetter(x)
    })
    for _, w := range words {
        kv := mapreduce.KeyValue{w, ""}
        ret = append(ret, kv)
    }
    return ret
}

//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
//
func reduceF(key string, values []string) string {
    // Your code here (Part II).
    return strconv.Itoa(len(values))
}

That's Part II.

One thing to note: the test compares output with diff, which treats \n and \r\n as different, so make sure everything in your answers uses \n line endings only.
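If some of your output still ends up with Windows line endings, normalizing them before writing is a one-liner (purely illustrative):

package main

import "strings"

// normalizeNewlines converts Windows-style line endings to Unix-style ones
// so that the diff-based tests compare cleanly.
func normalizeNewlines(s string) string {
    return strings.ReplaceAll(s, "\r\n", "\n")
}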

inverted index

// The mapping function is called once for each piece of the input.
// In this framework, the key is the name of the file that is being processed,
// and the value is the file's contents. The return value should be a slice of
// key/value pairs, each represented by a mapreduce.KeyValue.
func mapF(document string, value string) (res []mapreduce.KeyValue) {
    // Your code here (Part V).
    words := strings.FieldsFunc(value, func(x rune) bool {
        return !unicode.IsLetter(x)
    })
    kvmap := make(map[string]string)
    for _, w := range words {
        kvmap[w] = document
    }
    for k, v := range kvmap {
        res = append(res, mapreduce.KeyValue{k, v})
    }
    return
}

// The reduce function is called once for each key generated by Map, with a
// list of that key's string value (merged across all inputs). The return value
// should be a single output value for that key.
func reduceF(key string, values []string) string {
    // Your code here (Part V).
    numberOfDoc := len(values)
    sort.Strings(values)
    res := strconv.Itoa(numberOfDoc) + " " + strings.Join(values, ",")
    return res
}

The point to watch here is to remove duplicate words within the same document; stashing them in a map takes care of that.

Finally, a word about the environment pitfalls.

notes on the Windows environment

The UNIX-domain socket addresses that the lab uses for RPC registration cannot be used on Windows, so I changed them to TCP. Note that after switching to TCP, the worker has to close its TCP listener when it shuts down. A sketch of the change follows.
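Concretely, the change is just swapping the network name in every net.Listen / rpc.Dial call and using a host:port string as the address. A rough sketch of the master side (the concrete address such as "localhost:7777" is my own choice, and the skeleton's shutdown bookkeeping is elided):

// startRPCServer, modified to serve the master's RPCs over TCP instead of a
// UNIX-domain socket; mr.address is now a host:port string such as "localhost:7777".
func (mr *Master) startRPCServer() {
    rpcs := rpc.NewServer()
    rpcs.Register(mr)
    // os.Remove(mr.address)              // only needed to clear a stale socket file; dropped for TCP
    l, e := net.Listen("tcp", mr.address) // was: net.Listen("unix", mr.address)
    if e != nil {
        log.Fatal("RegistrationServer ", mr.address, " error: ", e)
    }
    mr.l = l

    go func() {
        for {
            conn, err := mr.l.Accept() // Accept fails once the listener is closed at shutdown
            if err != nil {
                break
            }
            go func() {
                rpcs.ServeConn(conn)
                conn.Close()
            }()
        }
    }()
}

The same "unix" to "tcp" swap applies to the worker's RPC server and to call's rpc.Dial, and the worker's Shutdown handler has to close its TCP listener so that the master's killWorkers can finish.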

reference

Google MapReduce paper
6.824 lab1
github/zouzhitao code repo

copyright notice

This is an original article by the author, released under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International license.

Author: taotao

If you republish this article, please keep this copyright notice and cite the source.

Originally posted at: https://www.cnblogs.com/zt-zou/p/10661879.html
