Hive执行SQL的主要流程图 看着有很多阶段,实际上很简单。Hive就是把SQL通过AST解析,然后遍历若干次(进行算子替换以及优化),最后再次遍历算子,如果为reduceSink操作符则划分出一个stage,类似Spark中通过shuffle来划分stage,生成MapReduce任务。最后将这些任务按照执行计划的顺序提交到Yarn上执行。
Hive架构图:
由于源码中有很多很多细节,本文在分析时会忽略部分不是非常重要的细节。 执行SQL的主要入口方法为Driver.runInternal,该方法涉及了整个Hive SQL执行流程,从SQL到编译,解析,执行,收集返回结果。
首先该方法中会判断SQL是否经过编译,若未进行编译,则会调用compileInternal->compile方法。 ① SQL->compile->获得plan org/apache/hadoop/hive/ql/Driver.java -> compile方法 进一步详细看下: SQL->AST 这部分比较复杂,笔者暂时也没有仔细研究,先略过。 AST->Task 这部分会用到BaseSemanticAnalyzer.analyze,大致流程是先通过SemanticAnalyzerFactory.get(queryState, tree),初始化BaseSemanticAnalyzer对象,并且确定了该SQL的类型。SQL的类型以及使用了哪些算子都在org/apache/hadoop/hive/ql/parse/HiveParser.g语法文件中枚举出来了。 然后会通过sem.analyze(tree, ctx)调用analyzeInternal 而analyzeInternal是一个抽象方法,有很多种实现: 具体实现还是比较复杂的,大致上的思路就是上面说的,对语法树进行递归,把每个节点用switch枚举匹配,替换成Hive的算子。
Task->QueryPlan 这部分直接实例化了一个QueryPlan对象,实际上就是把上一步的结果给copy出来组成了一个新对象。 其中存放task的属性为; private ArrayList<Task<? extends Serializable>> rootTasks; private FetchTask fetchTask;
QueryPlan->Job 前面部分把编译的流程大致梳理了一遍,现在我们看下任务的提交与执行。 Driver.execute: 开始提交任务了: lauchTask会将任务提交到Yarn,先继续往下看: 看到这边,整个链路都跑完了,在控制台的最后一行,也会看到一个熟悉的OK。
那么最后我们来仔细看一下Driver.launchTask,看看任务是怎么被提交到Yarn的: 无论是否并行执行,都会执行taskRunner.runSequential方法: Task.executeTask: 这边调用的execute是抽象方法,由task类型决定何种实现方法。 这边就以MapRedTask为例,进行分析: 获取reducer数量,如果没有明确,则根据input数据量估算: super.execute即ExecDriver.execute:
向Yarn RM提交任务,之后的逻辑可以参考之前写过的文章MapReduce框架源码解析
断断续续花了2周多,终于把Hive整个执行流程注释完了。看上面整理的代码注释,可以了解整个执行过程核心的一些步骤:
首先SQL进入Driver.compile使用ANTLR进行编译,生成AST; 然后使用BaseSemanticAnalyzer.analyze调用语法文件获得初步的执行计划Plan; 之后调用Driver.execute根据Plan生成Job,Job中包含了MapTask和ReduceTask; 最后就是将任务按stage提交到Yarn执行。