0604-6.1.0-如何使用StreamSets实时采集指定数据目录文件并写入库Kudu

mac2022-10-04  39

Fayson的github: https://github.com/fayson/cdhproject

推荐关注微信公众号:“Hadoop实操”,ID:gh_c4c535955d0f

1 文档编写目的

Fayson在前面写过多篇StreamSets的文章,本篇文章主要介绍通过StreamSets实时的方式读取本地的数据文件,通过解析处理将文件中的内容写入到Kudu中。在进行本篇文章学习前你还需要了解:

《如何在CDH中安装和使用StreamSets》

内容概述

1.测试环境准备

2.准备测试数据

3.配置StreamSets

4.流程测试及数据验证

测试环境

1.RedHat7.4

2.CM和CDH版本为6.1.0

3.Kudu 1.8.0

2 测试环境准备

1.通过Hue使用Impala创建一个Kudu表,创建脚本如下:

CREATE TABLE user_info_kudu ( id STRING COMPRESSION snappy, name STRING COMPRESSION snappy, sex STRING COMPRESSION snappy, city STRING COMPRESSION snappy, occupation STRING COMPRESSION snappy, mobile_phone_num STRING COMPRESSION snappy, fix_phone_num STRING COMPRESSION snappy, bank_name STRING COMPRESSION snappy, address STRING COMPRESSION snappy, marriage STRING COMPRESSION snappy, child_num INT COMPRESSION snappy, PRIMARY KEY (id) ) PARTITION BY HASH PARTITIONS 16 STORED AS KUDU TBLPROPERTIES ('kudu.master_addresses'='master,hadoop13' );

在创建Kudu表的时候增加了kudu.master的配置参数,如果Impala中未集成kudu则需要增加该参数,集成方式如下:

2.准备测试数据文件

[root@hadoop13 data]# cat user_infoaa.txt 411025200708151236,濮敬才,1,竹山县,生产工作、运输工作和部分体力劳动者,13702734056,15103111241,广州银行48,台东东二路21号-20-7,0,2 653000199408254560,人思巧,0,怀化,商业工作人员,15305590235,15306212544,广州银行17,台东东二路21号-20-7,0,0 500000195305076075,詹致,1,商丘,企事业单位的负责人,13507721161,15105419035,广州银行81,台东东二路21号-20-2,0,4 130522198207211990,和东,1,阳泉,商业工作人员,13205104083,13105301541,广州银行6,台东东二路21号-2-9,0,0

准备了两个数据文件共100条测试数据,数据的id是唯一的。

3.在StreamSets服务所在节点上创建一个/data1/tmp的数据目录,用于配置StreamSets的采集目录

3 创建Pipline

1.登录StreamSets,创建一个directory2kudu的Pipline

2.在Pipline流程中添加Directory作为源并配置基础信息

3.配置Kafka相关信息,如Broker、ZK及Topic

配置采集的数据目录及文件读取方式

配置数据格式化方式,由于数据文件是以“,”分割因此选择CSV方式

Root Field Type选择为List,为会每行数据转化成List<Map<String, String>>格式的数据。

4.配置数据解析模块,这里选择使用“JavaScript Evaluator”

在JavaScript配置项选择处理数据的方式为Batch by Batch

配置数据解析代码,在Script配置项增加如下代码片段

for(var i = 0; i < records.length; i++) { try { var info = records[i]; var newRecord = sdcFunctions.createRecord(true); var userInfoMap = sdcFunctions.createMap(true); userInfoMap.id = info.value[0]['value']; userInfoMap.name = info.value[1]['value']; userInfoMap.sex = info.value[2]['value']; userInfoMap.city = info.value[3]['value']; userInfoMap.occupation = info.value[4]['value']; userInfoMap.tel = info.value[5]['value']; userInfoMap.fixPhoneNum = info.value[5]['value']; userInfoMap.bankName = info.value[7]['value']; userInfoMap.address = info.value[8]['value']; userInfoMap.marriage = info.value[9]['value']; userInfoMap.childNum = info.value[10]['value']; newRecord.value = userInfoMap; output.write(newRecord); } catch (e) { // Send record to error error.write(records[i], e); } }

5.添加Kudu模块及配置基本信息

6.配置Kudu的Master、Table、Operation等

Kudu Masters:可以配置多个,多个地址以“,”分割

Table Name:如果使用Impala创建的Kudu表则需要添加impala::前缀

Field to Column Mapping:配置Json中key与Kudu表的column的映射关系,如果字段名称一致则不需要配置。

Default Opertation:设置操作类型如:insert、upsert、delete

4 流程测试验证

1.启动的directory2kudu,启动成功如下图显示

2.向/data1/tmp目录下拷贝一个准备好的数据文件

可以看到Pipline监控数据的变化,采集到50条数据

user_info_kudu表数据显示有50条记录

3.再次向/data1/tmp目录拷贝一个数据文件

可以看到Pipline监控数据的变化,采集到100条数据

user_info_kudu表数据显示有100条记录

入库的数据总条数

5 总结

1.通过StreamSets可以方便的监听指定的数据目录进行数据采集,可以在Directory模块上配置文件的过滤规则、采集频率以及数据的格式化方式。

2.StreamSets的Directory模块会将数据文件的数据以行为单位解析传输,通过List或着Map的方式封装

3.通过Process提供的JavaScript Evaluator模块来进行数据解析转换为能Kudu接收大数据格式。

最新回复(0)