一、全量抽取
#!/bin/bash
# Full extraction: truncate the target ODS table, then reload it in full via
# DataX. The script's file name doubles as the table name, so one copy of
# this script per table is all that is needed.
set -e

# Table name = this script's file name without the .sh suffix.
file_name=$(basename "$0" .sh)
# DataX job config file, named after the table.
json_name=${file_name}.json
# Run from the script's own directory so the relative JSON path resolves.
dir_path=$(dirname "$0")
cd "${dir_path}"

# Empty the ODS table, then let DataX reload it completely.
hive -e "truncate table bigdata_ods.${file_name};"
datax.py "${json_name}"
二、增量抽取
#!/bin/bash # 适用于常规增量抽取,通过创建和更新时间抽取 # 手工填写以下参数: # 分区表达式,非分区表可以写创建时间或者其他不为空的字段表达式,hive语法:pt_expr # 主键字段,联合主键写id1,id2,id3:pk set -e timer_start=`date "+%Y-%m-%d %H:%M:%S"` timediff(){ duration=$(($(date +%s -d "${3} ${4}") - $(date +%s -d "${1} ${2}"))) echo "开始时间:${1} ${2}" echo "结束时间:${3} ${4}" if [[ $duration -lt 60 ]] then echo "执行耗时:${duration}s" elif [[ $duration -lt $((60*60)) ]] then m=$(($duration/60)) s=$(($duration%60)) echo "执行耗时:${m}m${s}s" else h=$(($duration/60/60)) m=$(($(($duration%3600))/60)) s=$(($duration%60)) echo "执行耗时:${h}h${m}m${s}s" fi } table_name=`basename $0 .sh` tmp_name=${table_name/#ods_/tmp_} json_name=${table_name}.json dir_path=`dirname $0` # 分区表达式,非分区表可以写创建时间或者其他不为空的字段表达式,hive语法 pt_expr="from_unixtime(unix_timestamp(createdate,'yyyyMMdd'),'yyyy-MM-dd')" # 主键字段,联合主键写id1,id2,id3 pk='pk1,pk2,pk3' cd $dir_path # 1、建临时表装增量数据; echo "判断是否分区表" is_pt=`hive -e "desc bigdata_ods.${table_name}"|grep -e '^# Partition Information'|wc -l` if [[ $is_pt == 0 ]]; then echo "创建临时表:drop table if exists bigdata_tmp.${tmp_name};create table bigdata_tmp.${tmp_name} like bigdata_ods.${table_name}" hive -e "drop table if exists bigdata_tmp.${tmp_name};create table bigdata_tmp.${tmp_name} like bigdata_ods.${table_name}" else echo "创建临时表:drop table if exists bigdata_tmp.${tmp_name};create table bigdata_tmp.${tmp_name} like bigdata_ods.${table_name};alter table bigdata_tmp.${tmp_name} add partition (ds='tmp')" hive -e "drop table if exists bigdata_tmp.${tmp_name};create table bigdata_tmp.${tmp_name} like bigdata_ods.${table_name};alter table bigdata_tmp.${tmp_name} add partition (ds='tmp')" fi # alter table bigdata_tmp.${tmp_name} set SERDEPROPERTIES('field.delim'='\001') if [[ $# == 2 ]]; then # 手工执行不监控数据量 v_dt1=`date -d "$1" "+%Y-%m-%d"` v_dt2='2099-12-31' v_dt3=`date -d "$2" "+%Y-%m-%d"` elif [[ $# == 1 ]]; then #statements v_dt1=`date -d "$1" "+%Y-%m-%d"` v_dt2=`date -d "$1" "+%Y-%m-%d"` v_dt3='2099-12-31' else v_dt1=`date -d "-1 day" "+%Y-%m-%d"` 
v_dt2=`date -d "-1 day" "+%Y-%m-%d"` v_dt3='2099-12-31' fi datax.py -p "-Dt1=${v_dt1} -Dt2=${v_dt2} -Dt3=${v_dt3} -Dtmp_name=${tmp_name}" ${json_name} # 4、合并增量数据。 p1=`echo $pk|awk -F, '{print $1}'` join_str=`echo $pk|awk -v RS="," '{printf "%s","o."$1"=t."$1" and "}'` join_str=${join_str%and } if [[ $is_pt == 0 ]]; then #statements sql_str="insert overwrite table bigdata_ods.${table_name} select a.* from bigdata_tmp.${tmp_name} a union all select o.* from bigdata_ods.${table_name} o left join bigdata_tmp.${tmp_name} t on $join_str where t.$p1 is null" else # 获取表字段 echo "获取表字段" fields_all=`hive -e "set hive.cli.print.header=true; set hive.resultset.use.unique.column.names=false;select * from bigdata_ods.${table_name} where 1=2 limit 1"` fields=`echo $fields_all|awk -v OFS="," '{NF-=1;print}'` echo "表字段:$fields" echo "记录更新的分区到${dir_path}/${tmp_name}_ds" hive -e "select distinct $pt_expr from bigdata_tmp.${tmp_name}" > ${tmp_name}_ds sds=`awk '{printf "%s|",$1}' ${tmp_name}_ds` pts=${sds%|} sql_str="set hive.exec.dynamic.partition = true;set hive.exec.dynamic.partition.mode = nonstrict;set hive.exec.max.dynamic.partitions = 100000;set hive.exec.max.dynamic.partitions.pernode = 100000;insert overwrite table bigdata_ods.${table_name} partition(ds) select ${fields},$pt_expr as ds from bigdata_tmp.${tmp_name} a union all select o.* from bigdata_ods.${table_name} o left join bigdata_tmp.${tmp_name} t on $join_str where o.ds regexp '$pts' and t.$p1 is null" fi echo "合并增量数据" echo "hive -e ${sql_str};" hive -e "${sql_str}"; echo "***SUCCESS***" timer_end=`date "+%Y-%m-%d %H:%M:%S"` timediff $timer_start $timer_end对应的datax配置的JSON改动,reader.parameter.connection[0].querySql改为select c1,c2,...cn from schema.table where (create_time>='$t1' or update_time>='$t2') and create_time<adddate('$t3',1)
writer.parameter.path 根据表是否为分区表修改:分区表填 /user/hive/warehouse/bigdata_tmp.db/${tmp_name}/ds=tmp,非分区表填 /user/hive/warehouse/bigdata_tmp.db/${tmp_name}。
通过以上 shell,只需要为每张表复制一份脚本并修改其中的分区表达式和主键参数,同时修改对应的 DataX 配置 JSON,就能够完成增量抽取。