深入join的过程 虽然我们在Spark篇的SparkSQL入门中也提到了一些join的东西,但是还是不够详细。这里我们将在Hive中执行join操作,看看在执行计划中究竟是怎么样的。 现在我们有这么一个SQL语句,里面是对两个表进行join: select a.empno,a.ename,a.deptno,b.dname from emp a join dept b on a.deptno = b.deptno; 这里是执行这个操作的日志截图: 我们可以从这个日志上看到,我们将两个表进行了join操作却没有reduce操作,那么就是没有shuffle操作呗,这是不是很棒呀。但是我们现在需要把shuffle弄出来才方便我们理解下面的东西,所以我们设置一下这个参数:hive.auto.convert.join,现在这个值是true,我们将它设置成false在走一次看看情况是什么,然后再开始将。 通过上面的两次操作,我们知道原来join还可以有两种的,一种是没有reduce的一种是有reduce的。 一般正常的join我们其实可以把其定义成一中CommonJoin/ReduceJoin,这种join就是在reduce端执行的,所以过程肯定是伴随着shuffle的;而另外一种join就是没有shuffle的,这个join是发生在map端的,所以我们可以定义成为MapJoin(其实就是使用了类似Spark过程呃广播变量) CommonJoin 我们先来介绍CommonJoin,同时结合执行计划来观察 explain select a.empno,a.ename,a.deptno,b.dname from emp a join dept b on a.deptno = b.deptno; 得到的结果是: STAGE DEPENDENCIES: Stage-1 is a root stage Stage-0 depends on stages: Stage-1 STAGE PLANS: Stage: Stage-1 Map Reduce Map Operator Tree: TableScan alias: a Statistics: Num rows: 6 Data size: 701 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: deptno is not null (type: boolean) Statistics: Num rows: 3 Data size: 350 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator key expressions: deptno (type: int) sort order: + Map-reduce partition columns: deptno (type: int) Statistics: Num rows: 3 Data size: 350 Basic stats: COMPLETE Column stats: NONE value expressions: empno (type: int), ename (type: string) TableScan alias: b Statistics: Num rows: 1 Data size: 80 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: deptno is not null (type: boolean) Statistics: Num rows: 1 Data size: 80 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator key expressions: deptno (type: int) sort order: + Map-reduce partition columns: deptno (type: int) Statistics: Num rows: 1 Data size: 80 Basic stats: COMPLETE Column stats: NONE value expressions: dname (type: string) Reduce Operator Tree: Join Operator condition map: Inner Join 0 to 1 keys: 0 deptno (type: int) 1 deptno (type: int) outputColumnNames: _col0, _col1, _col7, _col12 Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: _col0 (type: int), _col1 (type: string), _col7 (type: int), _col12 (type: string) outputColumnNames: _col0, _col1, _col2, _col3 Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Stage: Stage-0 Fetch Operator limit: -1 Processor Tree: ListSink 这里有两个stage stage-1中先是一个map阶段,里面先扫描表a,然后执行一个filter阶段(这个个阶段只用你的SQL里有join都会有的),并得到所需要的字段:a.empno,a.ename,a.deptno,其中a.deptno是key用来进行join操作的。然后即使扫描表b,同上的操作。 然后就是一个reduce阶段了,在这个阶段就会执行join操作了。 就是先对emp表进行一个map得到一堆mappers,同样的操作也对dept表执行了一次也是得到一堆的mappers,接着一个shuffle过程将相同的key的数据重新分配到相同的位置上,最后执行reduce操作。 MapJoin 我们修改hive.auto.convert.join的值为true,在来看看执行计划是怎么样: STAGE DEPENDENCIES: Stage-4 is a root stage Stage-3 depends on stages: Stage-4 Stage-0 depends on stages: Stage-3 STAGE PLANS: Stage: Stage-4 Map Reduce Local Work Alias -> Map Local Tables: b Fetch Operator limit: -1 Alias -> Map Local Operator Tree: b TableScan alias: b Statistics: Num rows: 1 Data size: 80 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: deptno is not null (type: boolean) Statistics: Num rows: 1 Data size: 80 Basic stats: COMPLETE Column stats: NONE HashTable Sink Operator keys: 0 deptno (type: int) 1 deptno (type: int) Stage: Stage-3 Map Reduce Map Operator Tree: TableScan alias: a Statistics: Num rows: 6 Data size: 701 Basic stats: COMPLETE Column stats: NONE Filter Operator predicate: deptno is not null (type: boolean) Statistics: Num rows: 3 Data size: 350 Basic stats: COMPLETE Column stats: NONE Map Join Operator condition map: Inner Join 0 to 1 keys: 0 deptno (type: int) 1 deptno (type: int) outputColumnNames: _col0, _col1, _col7, _col12 Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: _col0 (type: int), _col1 (type: string), _col7 (type: int), _col12 (type: string) outputColumnNames: _col0, _col1, _col2, _col3 Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE table: input format: org.apache.hadoop.mapred.TextInputFormat output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Local Work: Map Reduce Local Work Stage: Stage-0 Fetch Operator limit: -1 Processor Tree: ListSink 这次stage就有3个了,首先执行一个Map Reduce Local Work工作。将b表转换成一个Map Local Table,就是广播到各个节点上去,再扫描b表定义好需要的key。就到下一个stage,在这个stage上面,就是扫描a表,定义好key,然后就执行了join操作了。最后就是一个stage-0,也没有了reduce操作。 具体就是,将一个小的表生成一个HashTable,然后加载到分布式缓存上面,接着为大表生成一堆的mappers,然后从分布式缓存里面将小表生成的HashTable和大表的每一个mappers进行join,得到我们需要的输出。 从运行日志上看: 2019-01-09 05:33:59 Starting to launch local task to process map join; maximum memory = 477626368 2019-01-09 05:34:00 Dump the side-table for tag: 1 with group count: 4 into file: file:/tmp/test/5180d098-6132-43ab-a32a-37688d2de617/hive_2019-01-09_17-33-56_429_8137188178107053233-1/-local-10003/HashTable-Stage-3/MapJoin-mapfile31–.hashtable 2019-01-09 05:34:00 Uploaded 1 File to: file:/tmp/test/5180d098-6132-43ab-a32a-37688d2de617/hive_2019-01-09_17-33-56_429_8137188178107053233-1/-local-10003/HashTable-Stage-3/MapJoin-mapfile31–.hashtable (373 bytes) 2019-01-09 05:34:00 End of local task; Time Taken: 1.22 sec. Execution completed successfully Launching Job 1 out of 1 Number of reduce tasks is set to 0 since there’s no reduce operator Starting Job = job_1547022064410_0004, Tracking URL = http://doudou:8288/proxy/application_1547022064410_0004/ Kill Command = /home/test/app/hadoop/bin/hadoop job -kill job_1547022064410_0004 Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 0 2019-01-09 17:34:07,909 Stage-3 map = 0%, reduce = 0% 2019-01-09 17:34:13,193 Stage-3 map = 100%, reduce = 0%, Cumulative CPU 1.91 sec MapReduce Total cumulative CPU time: 1 seconds 910 msec Ended Job = job_1547022064410_0004 MapReduce Jobs Launched: Stage-Stage-3: Map: 1 Cumulative CPU: 1.91 sec HDFS Read: 6665 HDFS Write: 310 SUCCESS Total MapReduce CPU Time Spent: 1 seconds 910 mse 先执行了一个local task(先定义好一个小表,这个的小表是小于一个具体的内存值,这个值可以我们自己手动设置的),然后Dump the side-table for tag(就是生辰一个hashtable),接着Uploaded 1 File to(这里这个file就是hashtable,看第三条日志),最后完成local task后才开始执行map操作,没有reduce操作。 UDF注册 一般我们需要注册一个UDF的话需要现在idea上以java代码写好我们自定义的函数的实体,所以我们需要一个依赖: org.apache.hive hive-exec ${hive.version} 代码主体 //类名可以自定义,但一定要继承UDF这个类 public class doudou extends UDF{ //方法名一定要是evaluate,但是返回值类型和参数列表可以自定义,但是因为是UDF所以返回值不要是集合类型的 public String evaluate(String str){ return "xxx" + str; } } 然后打成jar报,因为我们在Hive上面使用这个方法需要到这个jar包 注册UDF方法 注册UDF方法有三种方法,前两中是临时版的,当前会话没了就没了 方法一: 先找个文件加=夹放我们的jar包,比如:/home/doudou/app/hive/lib/doudou-udf.jar(jar包的全路径) 在Hive窗口里面: add jar /home/doudou/app/hive/lib/doudou-udf.jar create tempoary function douUDF as 'udfCreate.doudou'; 这里as后面跟的是你的包名加类名 方法二: 如果不想输入这么一句话:add jar /home/doudou/app/hive/lib/doudou-udf.jar 可以在hive目录下创建一个目录:auxlib(一定要是这个目录名哦,想改的话就要修改配置文件里面的参数了),把你的jar包放到里面去,那么每一次就是输入:create tempoary function douUDF as ‘udfCreate.doudou’;这句就好了。 方法三: 不想每一次都创建的话,可以使用下面这个方法,将这个函数注册到metastore里面 create function [db_name.]function_name as 'package_name.class_name' using jar 'hdfs://ip:port/some/path...' 这个方法就是将你的jar包放到HDFS上面,然后注册的时候使用using jar '…'来指定HDFS上的路径,然后你就能在你的MySQL上面的元数据地址看到这个函数了,可以直接使用了,只是每次打开一个会话时第一次使用这个函数都会自动加载一下的。如果哪位小伙伴不想这样的话,可以修改源码把方法写到源码上面去,然后重新编译一下,或者替换一下你现在使用的Hive目录里面lib目录下的hive-exec-1.1.0-cdh5.7.0.jar这个表,就是我们用idea编写我们自定义方法体的那个依赖,在里面添加你的函数。