Extract Fact Table Distinct Columns(MR) 流程和源码解析

mac2025-11-06  13

Extract Fact Table Distinct Columns job 进行统计估算和字典编码、范围计算,具体主要做3件事情:1、HLL估算统计每个cuboid的结果条数;2、所有非Derived维度列的范围(min-max);3、所有需要字典编码的列进行字典编码;这3部分内容都会存储到HDFS上,共后续步骤使用。本文所有的讲解根据MR实现进行讲解,Spark大家对照思路进行分析。

执行入口类: MapReduceExecutable.doWork 中调用job类的run方法即调用FactDistinctColumnsJob.run

输入数据: (1)、采样的数据比例,kylin.job.sampling-percentage,default 100,已经第一步flatTable中的数据。

输出目录: (1)、/user/prod_kylin/kylin_engine/BIGDATA_KYLIN-kylin_meadata_test/kylin-jobId/cubeName/fact_distinct_columns/

MR Mapper: (1)、FactDistinctColumnsMapper.java (2)、inputFormat 为flat table(flatTableInputFormat) (3)、Mapper输出: a、需要字典列的每列在此mapper中不同的值,输出一个selfKey(1个字节的reduce编号+某行此列的值+数据类型),value为null; b、每个mapper中所有非Derived维度列且不属于a中的列的每列出现的最大值和最小值,输出一个selfKey(1个字节的reduce编号+此列的max值(会min值)+数据类型),value为null,每个mapper会输出某列两个此类型输出,一个max一个min; c、每个mapper对每个cubid的估算值,输出一个selfKey(固定第一个字节为输出的类型(-1表示为 HLL(HyperLogLog)类型的Reduce编号计算规则,具体参见partitioner计算reduce编号) + 某个cubid(8个字节) +数据类型(此处固定为0)),value为此mapper对此cubid的估算值;

MR Combiner: CombinerFactDistinctColumnsCombiner 由于Mapper的3种输出,其中dict列的value为空,只需要去重key值即可;hll因为每个cubid就一个所以在combiner没有做任何处理; max和min每个mapper也只有一个也没有在commbiner做任何处理;

MR Partitioner: (1)、 PartitionerClass:FactDistinctColumnPartitioner.java 根据mapper key的第一个字节除hll外其他的就是存储的实际的reduce编号直接返回即可,hll类型的则根据这个cubid%hllreducecount(计算hll总的reduce数),然后用nDimReducers的个数加上cubid%hllreducecount值即为此cubid的reduce编号,通常hllreducecount为1,即所有的cubid的hll计算均在一个reduce中完成,同一个cuboid总会分配到同一个reduce计算hll。

计算Reduce编号源码:

@Override public int getPartition(SelfDefineSortableKey skey, Text value, int numReduceTasks) { Text key = skey.getText(); if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {//key的第一个字节如果是-1的情况表示是HLL Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG); return reducerMapping.getReducerIdForCuboidRowCount(cuboidId); } else { return BytesUtil.readUnsigned(key.getBytes(), 0, 1); } }

计算某个cuboid HLL Reduce 编号源码:

public int getReducerIdForCuboidRowCount(long cuboidId) { int rowCounterId = (int) (Math.abs(cuboidId) % nCuboidRowCounters); return nDimReducers + rowCounterId; }

MR Reducer: 根据Mapper的输入进行需要字典列的字典构建、计算所有非derived维度列max和Min、HLL估算的统计值。 (1)、FactDistinctColumnsReducer.java (2)、MultipleOutputs: HLL估算统计结果输出: 输出路径:baseDir/statistics/statistics-r-0000x 输出内容:共有3+cuboid.length行记录,其中==前3行的内容分别为: == key=-1(表示mapper重叠率),value=合并前的每个mapper对每个cuboid 的估算的rowcount和/合并每个mapper对每个cuboid估算的rowcount后的和;key=-2(表示此reduce的输入的mapper数量),value即mapper的数量; key=0(表示抽样的百分比),value为抽样的百分比,这是咱们从配置文件中配置的默认是14%。 其他行的内容为每个cuboid及其对应的估算结果。 所有非Derived维度列的范围: 输出路径:baseDir/colName/colName.dci-r-0000x 输出内容: 每个reduce只有两行内容,某列此reduce max和min值,第一行为Min值,第二行为Max值,key为空即只有一个字段。 所有需要字典编码列字典编码内容: 输出路径:baseDir/colName/colName.rldict-r-0000x 输出内容: 只有一行记录,即某列此reduce的字典值,key为空即只有一个字段。

名词解释: 某个cube的所有需要编码的列: (1)、此cube维度列的编码类型为dict的列(Rowkeys中Encoding,同时需要排除掉此列为MR HIVE类型的列或derived的列、Extended的列); (2)、度量的类型为Bitmap(bitmap且列类型为bigint类型,如果此列是MR HIVE类型则也不包含)、Raw、TopN(topN measuer且此列为dict列或者未配置encoding);排除MR HIVE的列的度量类型为Bitmap且该列类型为bigint类型的列或Raw或TopN且此列为dict列或者未配置encoding。

ShardByColumns: Global Dictionaries 全局字典列且此列的Build Class类型为GlobalDictionaryBuilder或SegmentAppendTrieDictBuilder类型。

最新回复(0)