MapOutputTrackerMaster

mac2025-08-10  10

MapOutputTrackerMaster diver的map任务输出跟踪器

MapOutputTrackerMaster 继承了MapOutputTracker MapOutputTrackerMaster 是在diver端的MapOutputTracker,在diver端MapOutputTracker要负责维护跟踪各个map任务的输出状态,所以会为存储映射状态和序列化状态各创建一个hashmap,并且还要处理shuffle的请求,在shuffle完成还会再缓存中清除关于shuffle的序列化版本以及信息,为了提高效率使用了多线程的方法。而在excutor端的MapOutputTracker只用作缓存,对输出状态进行查询。

源码清单和我的理解注释

将shuffle 输出状态的序列化后缓存下来,并在后续操作中更新 cacheEpoch 记录缓存中shuffle 输出状态的编号,epoch记录当前shuffle 输出状态的编号,每处理完一个shuffle ,epoch+1

/** Cache a serialized version of the output statuses for each shuffle to send them out faster */ private var cacheEpoch = epoch // Kept in sync with cachedSerializedStatuses explicitly // This is required so that the Broadcast variable remains in scope until we remove // the shuffleId explicitly or implicitly. //缓存中序列化的广播变量 private val cachedSerializedBroadcast = new HashMap[Int, Broadcast[Array[Byte]]]() def incrementEpoch() { epochLock.synchronized { epoch += 1 logDebug("Increasing epoch to " + epoch) } } // Check to see if we have a cached version, returns true if it does // and has side effect of setting retBytes. If not returns false // with side effect of setting statuses //若epoch > cacheEpoch为真,清空缓存中的序列化状态以及清空与上一个shuffle的广播变量 def checkCachedStatuses(): Boolean = { epochLock.synchronized { if (epoch > cacheEpoch) { cachedSerializedStatuses.clear() clearCachedBroadcast() cacheEpoch = epoch } cachedSerializedStatuses.get(shuffleId) match { case Some(bytes) => retBytes = bytes true case None => logDebug("cached status not found for : " + shuffleId) statuses = mapStatuses.getOrElse(shuffleId, Array.empty[MapStatus]) epochGotten = epoch false } } } if (checkCachedStatuses()) return retBytes //清空与上一个shuffle有关的广播变量 private def clearCachedBroadcast(): Unit = { for (cached <- cachedSerializedBroadcast) removeBroadcast(cached._2) cachedSerializedBroadcast.clear() }

minSizeForBroadcast 为用广播将映射的输出状态发送到执行器的最小值,若不设置则为512k shuffleLocalityEnabled 为是否计算局部首选项以减少任务,若不设置则为true SHUFFLE_PREF_MAP_THRESHOLD 为map任务的数量 SHUFFLE_PREF_REDUCE_THRESHOLD 为reduce任务的数量 REDUCER_PREF_LOCS_FRACTION 为Map output输出到本地性的比率,是一个可以优化的参数

// The size at which we use Broadcast to send the map output statuses to the executors private val minSizeForBroadcast = conf.getSizeAsBytes("spark.shuffle.mapOutput.minSizeForBroadcast", "512k").toInt /** Whether to compute locality preferences for reduce tasks */ private val shuffleLocalityEnabled = conf.getBoolean("spark.shuffle.reduceLocality.enabled", true) // Number of map and reduce tasks above which we do not assign preferred locations based on map // output sizes. We limit the size of jobs for which assign preferred locations as computing the // top locations by size becomes expensive. private val SHUFFLE_PREF_MAP_THRESHOLD = 1000 // NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that private val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000 // Fraction of total map output that must be at a location for it to considered as a preferred // location for a reduce task. Making this larger will focus on fewer locations where most data // can be read locally, but may lead to more delay in scheduling if those locations are busy. private val REDUCER_PREF_LOCS_FRACTION = 0.2

mapStatuses 在驱动程序中存储映射状态的hashmap cachedSerializedStatuses 缓存的序列化状态的hashmap shuffleIdLocks 防止shuffle生成多个序列化版本 mapOutputRequests 为映射输出状态请求

// HashMaps for storing mapStatuses and cached serialized statuses in the driver. // Statuses are dropped only by explicit de-registering. protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) // This is to prevent multiple serializations of the same shuffle - which happens when // there is a request storm when shuffle start. private val shuffleIdLocks = new ConcurrentHashMap[Int, AnyRef]() // requests for map output statuses private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]

处理映射输出状态的线程池 若在conf中没设置spark.shuffle.mapOutput.dispatcher.numThreads的数目,则线程数为8

// Thread pool used for handling map output status requests. This is a separate thread pool // to ensure we don't block the normal dispatcher threads. private val threadpool: ThreadPoolExecutor = { val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8) val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher") for (i <- 0 until numThreads) { pool.execute(new MessageLoop) } pool }

广播变量的大小须小于RPC机制发送消息的大小,否则报错

if (minSizeForBroadcast > maxRpcMessageSize) { val msg = s"spark.shuffle.mapOutput.minSizeForBroadcast ($minSizeForBroadcast bytes) must " + s"be <= spark.rpc.message.maxSize ($maxRpcMessageSize bytes) to prevent sending an rpc " + "message that is too large." logError(msg) throw new IllegalArgumentException(msg) }

registerShuffle 是向mapStatuses中注册新的Shuffle registerMapOutput 是在mapStatuses添加指定的shuffleId的状态 registerMapOutputs 是向mapStatuses添加指定的shuffleId多个输出状态 unregisterMapOutput 是在mapStatuses中注销指定的shuffleId的输出信息 unregisterShuffle 是在mapStatuses移除指定的shuffleId并在缓存中删除关于shuffleId的信息

def registerShuffle(shuffleId: Int, numMaps: Int) { if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) { throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice") } // add in advance shuffleIdLocks.putIfAbsent(shuffleId, new Object()) } def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) { val array = mapStatuses(shuffleId) array.synchronized { array(mapId) = status } } /** Register multiple map output information for the given shuffle */ def registerMapOutputs(shuffleId: Int, statuses: Array[MapStatus], changeEpoch: Boolean = false) { mapStatuses.put(shuffleId, statuses.clone()) if (changeEpoch) { incrementEpoch() } } /** Unregister map output information of the given shuffle, mapper and block manager */ def unregisterMapOutput(shuffleId: Int, mapId: Int, bmAddress: BlockManagerId) { val arrayOpt = mapStatuses.get(shuffleId) if (arrayOpt.isDefined && arrayOpt.get != null) { val array = arrayOpt.get array.synchronized { if (array(mapId) != null && array(mapId).location == bmAddress) { array(mapId) = null } } incrementEpoch() } else { throw new SparkException("unregisterMapOutput called for nonexistent shuffle ID") } } /** Unregister shuffle data */ override def unregisterShuffle(shuffleId: Int) { mapStatuses.remove(shuffleId) cachedSerializedStatuses.remove(shuffleId) cachedSerializedBroadcast.remove(shuffleId).foreach(v => removeBroadcast(v)) shuffleIdLocks.remove(shuffleId) }
最新回复(0)