在《深入理解spark》书中对MapOutputTracker的解释是:mapOutputTracker用于跟踪map阶段任务的输出状态,此状态便于reduce阶段任务获取地址以及中间输出结果。每个map任务或者 reduce任务都会有唯一的标识。分别为mapId和reduceId.每个reduce任务的输入可能是多个map任务的输出,reduce会到各个map任务的所有节点上拉去Block,这一过程交shuffle,每批shuffle过程都有唯一的表示shuffleId。
由sparkEnv代码可以看到,如果是Diver,则创建 MapOutputTrackerMaster,然后创建 MapOutputTrackerMasterEndpoint,并且注册到 Dispatcher 中,注册名为 MapOutputTracker; 若是Excutor,则创建 MapOutputTrackerWorker,并从远端 Driver 实例的 NettyRpcEnv 的 Dispatcher 中查找 MapOutputTrackerMasterEndpoint 的引用。
初始化变量
/** Set to the MapOutputTrackerMasterEndpoint living on the driver. */ //创建RpcEndpoint的引用,RpcEndpoint是rpc的一个端点,在收到特定消息会触发特定的函数 var trackerEndpoint: RpcEndpointRef = _ /** * This HashMap has different behavior for the driver and the executors. * * On the driver, it serves as the source of map outputs recorded from ShuffleMapTasks. * On the executors, it simply serves as a cache, in which a miss triggers a fetch from the * driver's corresponding HashMap. * * Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a * thread-safe map. */ //对于driver和executors这个HashMap 会有不同的作用 //对于driver,它作为shufflemaptask记录的map输出源 //对于executors,一个错误会触发驱动器从相应的HashMap获取值 protected val mapStatuses: Map[Int, Array[MapStatus]] /** * Incremented every time a fetch fails so that client nodes know to clear * their cache of map output locations if this happens. */ protected var epoch: Long = 0 protected val epochLock = new AnyRef /** Remembers which map output locations are currently being fetched on an executor. */ //记住当前在执行器上获取的映射输出位置 private val fetching = new HashSet[Int] /** * Send a message to the trackerEndpoint and get its result within a default timeout, or * throw a SparkException if this fails. */ //发送信息到trackerEndpoint,并在规定时间内得到结果,若失败则抛出异常 protected def askTracker[T: ClassTag](message: Any): T = { try { trackerEndpoint.askWithRetry[T](message) } catch { case e: Exception => logError("Error communicating with MapOutputTracker", e) throw new SparkException("Error communicating with MapOutputTracker", e) } } /** Send a one-way message to the trackerEndpoint, to which we expect it to reply with true. */ //向trackerEndpoint发送单向信息,若返回false抛出异常 protected def sendTracker(message: Any) { val response = askTracker[Boolean](message) if (response != true) { throw new SparkException( "Error reply received from MapOutputTracker. Expecting true, got " + response.toString) } } /** * Called from executors to get the server URIs and output sizes for each shuffle block that * needs to be read from a given reduce task. * * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, * and the second item is a sequence of (shuffle block id, shuffle block size) tuples * describing the shuffle blocks that are stored at that block manager. */ //获取指定reduce任务的中需要读取的shuffle块号以及块的大小 def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int) : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = { getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1) } /** * Called from executors to get the server URIs and output sizes for each shuffle block that * needs to be read from a given range of map output partitions (startPartition is included but * endPartition is excluded from the range). * * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, * and the second item is a sequence of (shuffle block id, shuffle block size) tuples * describing the shuffle blocks that are stored at that block manager. */ //获取指定reduce任务的中需要读取的shuffle块号以及块的大小,块需要在给定范围的映射输出分区读取 def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") val statuses = getStatuses(shuffleId) // Synchronize on the returned array because, on the driver, it gets mutated in place statuses.synchronized { return MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses) } } ```map任务输出跟踪器 获取指定shuffle的输出统计信息以及状态 ```java /** * Return statistics about all of the outputs for a given shuffle. */ //获取所有关于指定shuffle的输出统计信息 def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = { val statuses = getStatuses(dep.shuffleId) // Synchronize on the returned array because, on the driver, it gets mutated in place statuses.synchronized { val totalSizes = new Array[Long](dep.partitioner.numPartitions) for (s <- statuses) { for (i <- 0 until totalSizes.length) { totalSizes(i) += s.getSizeForBlock(i) } } new MapOutputStatistics(dep.shuffleId, totalSizes) } } /** * Get or fetch the array of MapStatuses for a given shuffle ID. NOTE: clients MUST synchronize * on this array when reading it, because on the driver, we may be changing it in place. * * (It would be nice to remove this restriction in the future.) */ //获取所有关于指定shuffle的状态 private def getStatuses(shuffleId: Int): Array[MapStatus] = { val statuses = mapStatuses.get(shuffleId).orNull if (statuses == null) { logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them") val startTime = System.currentTimeMillis var fetchedStatuses: Array[MapStatus] = null fetching.synchronized { // Someone else is fetching it; wait for them to be done while (fetching.contains(shuffleId)) { try { fetching.wait() } catch { case e: InterruptedException => } } // Either while we waited the fetch happened successfully, or // someone fetched it in between the get and the fetching.synchronized. fetchedStatuses = mapStatuses.get(shuffleId).orNull if (fetchedStatuses == null) { // We have to do the fetch, get others to wait for us. fetching += shuffleId } } if (fetchedStatuses == null) { // We won the race to fetch the statuses; do so logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint) // This try-finally prevents hangs due to timeouts: try { val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId)) fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes) logInfo("Got the output locations") mapStatuses.put(shuffleId, fetchedStatuses) } finally { fetching.synchronized { fetching -= shuffleId fetching.notifyAll() } } } logDebug(s"Fetching map output statuses for shuffle $shuffleId took " + s"${System.currentTimeMillis - startTime} ms") if (fetchedStatuses != null) { return fetchedStatuses } else { logError("Missing all output locations for shuffle " + shuffleId) throw new MetadataFetchFailedException( shuffleId, -1, "Missing all output locations for shuffle " + shuffleId) } } else { return statuses } }