package com.caimh.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

/**
 * Custom Spark partitioner that assigns each key to a partition based on
 * the hash code of the key's string representation.
 *
 * Created by caimh on 2019/11/2.
 *
 * @param numPart total number of partitions (must be positive)
 */
class CustomPartitioner(numPart: Int) extends Partitioner {

  override def numPartitions: Int = numPart

  /**
   * Custom partitioning logic.
   *
   * BUG FIX: the original `key.toString.hashCode % numPart` can be negative,
   * because the JVM's `%` operator keeps the sign of the dividend. Spark
   * requires partition ids in [0, numPartitions), so a key whose string
   * hashCode is negative would crash the shuffle. Normalize negative
   * remainders into the valid range (same technique as Spark's own
   * HashPartitioner / Utils.nonNegativeMod).
   */
  override def getPartition(key: Any): Int = {
    val rawMod = key.toString.hashCode % numPart
    if (rawMod < 0) rawMod + numPart else rawMod
  }

  // Partitioner equality lets Spark detect that two RDDs are already
  // co-partitioned and skip an unnecessary shuffle.
  override def equals(other: Any): Boolean = other match {
    case p: CustomPartitioner => p.numPartitions == numPartitions
    case _                    => false
  }

  override def hashCode: Int = numPartitions
}

object CustomPartitionerTest {

  def main(args: Array[String]): Unit = {
    // Create the SparkConf; local[*] runs with all available cores.
    val conf: SparkConf = new SparkConf().setAppName("CustomPartitioner").setMaster("local[*]")
    // Create the SparkContext.
    val sc: SparkContext = new SparkContext(conf)

    // Create an RDD with 2 initial partitions.
    val rdd1: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 2)

    // Print the original partition layout as "partitionIndex:elements".
    rdd1
      .mapPartitionsWithIndex((idx, it) => Iterator(idx + ":" + it.mkString("-")))
      .foreach(println)

    // Print the layout after repartitioning with the custom partitioner.
    rdd1
      .map((_, 1))
      .partitionBy(new CustomPartitioner(5))
      .mapPartitionsWithIndex((idx, it) => Iterator(idx + ":" + it.mkString("-")))
      .foreach(println)

    // Release resources.
    sc.stop()
  }
}
Test run output:
0:1-2-3 1:4-5-6
0:(2,1) 1:(3,1) 2:(4,1) 3:(5,1) 4:(1,1)-(6,1)
