We can use the toDebugString method to see how many RDDs are produced:
```scala
val rdd = sc.textFile("file:///home/hadoop/data/wc.dat")
rdd.toDebugString
```

From the lineage printed by toDebugString we can see that two RDDs are produced: a HadoopRDD and a MapPartitionsRDD.
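For reference, the printed lineage typically looks roughly like the following (a sketch from spark-shell; the RDD ids, the partition count in parentheses, and the console line numbers depend on the file and environment):

```scala
println(rdd.toDebugString)
// Illustrative output (ids and partition counts vary):
// (2) file:///home/hadoop/data/wc.dat MapPartitionsRDD[1] at textFile at <console>:24 []
//  |  file:///home/hadoop/data/wc.dat HadoopRDD[0] at textFile at <console>:24 []
```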
Let's step into the textFile source code and take a look:
```scala
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  // Step 1
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions)
    // Step 2
    .map(pair => pair._2.toString).setName(path)
}
```

The source first calls hadoopFile and then map. Looking at the hadoopFile source (shown further below), it ultimately returns a HadoopRDD. The arguments passed in are TextInputFormat, LongWritable (the byte offset of each line) and Text (the content of each line), which are exactly the input types of a MapReduce mapper. The records it yields therefore look like (0, xxxx), (7, yyyy), and so on. Since the offset is of no use to us, the subsequent map(pair => pair._2.toString) keeps only the content of each line.
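To make these two steps concrete, here is a small sketch for spark-shell that calls hadoopFile directly and then applies the same map that textFile applies. The file path is just an example, and the printed offsets depend on the actual file contents:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Step 1: a HadoopRDD of (byte offset, line) pairs
val pairs = sc.hadoopFile("file:///home/hadoop/data/wc.dat",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])

// Step 2: a MapPartitionsRDD that drops the offset and keeps only the line text
val lines = pairs.map(pair => pair._2.toString)

// Convert the Hadoop Writables to plain values before collecting, then peek at
// a few records, e.g. (0,hello world) and (12,hello spark)
pairs.map { case (k, v) => (k.get, v.toString) }.take(2).foreach(println)
```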
So two RDDs are produced: a HadoopRDD and a MapPartitionsRDD. Here is the hadoopFile source referenced above:
```scala
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()

  FileSystem.getLocal(hadoopConfiguration)

  // Broadcast the Hadoop configuration so every task can reuse it
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)

  // The key line: a HadoopRDD is created and returned here
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
```
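As a final check (a sketch for spark-shell; the concrete class names are Spark internals and could change between versions), the two-RDD chain can also be inspected programmatically through the dependencies of the RDD that textFile returns:

```scala
val wordLines = sc.textFile("file:///home/hadoop/data/wc.dat")

// The RDD handed back to the caller is the MapPartitionsRDD produced by the map(...)
println(wordLines.getClass.getSimpleName)                        // MapPartitionsRDD

// Its single parent in the lineage is the HadoopRDD created inside hadoopFile
println(wordLines.dependencies.head.rdd.getClass.getSimpleName)  // HadoopRDD
```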