大数据开发之Flume篇----Flume入门(1)

mac2024-05-11  30

Flume简介 Flume官网:http://flume.apache.org/ 从官网上我们可以知道,其实Flume是就是一种分布式,高可靠数据采集聚合与移动的服务。对于流式数据,其有这简单灵活的特性,支持容错横向扩张。如何体现Flume的简单灵活呢?Flume只有三中类型的组件,分别是:source,channel和sink。我们只需要写好配置文件在运行的时候指定我们的配置文件就可以了。 source:就是我们要收集数据的源端,可以是一个文件也可以是一堆的文件,甚至是一个允许监听的端口或者是一个输入界面。 channel:连接源端和目标端的通道,可以是我们的内存或者磁盘的抽象,甚至是其他的组件。我们可以将其理解成Java中NIO的channel。 sink:数据的目标端,我们可以将数据存储到HDFS上面,也可以是下一个source端或者是我们消息中间件,甚至是控制台。 一般我们使用flume的版本是:flume-ng-1.6.0-cdh5.7.0 所以我们只要去cdh5那下载flume-ng-1.6.0-cdh5.7.0.tar.gz,并解压到指定目录就可以了。 同时,我们还要修改其配置文件,我们先到Flume目录下的conf目录中,输入命令; cp  flume-env.sh.template flume-env.sh vi flume-env.sh 将里面的JAVA_HOME变量指定好就行了。 使用Flume 我们先写一个简单的配置文件看看,里面需要写一些什么内容: #指定我们的agent以及里面的source,channel和sink这3个组件的名字,这里a1是agent的名字 a1.sources = e1 a1.channels = c1 a1.sinks = l1 #指定e1的类型,这里我们使用exec,这个是监控我们一个命令的结果,所以后面还要给出命令tail -F /home/test/data/log a1.sources.e1.type = exec a1.sources.e1.command = tail -F /home/test/data/log #这里指定channel的类型,这里使用内存,要注意如果源端和目标端的读写速度差很远的话,就会有大量数据滞留channel了 #这个时候如果用内存的话,和可能扛不住,我们要换成File channel a1.channels.c1.type = memory #指定sink的类型,这里使用logger,打印到控制台上面 a1.sinks.l1.type = logger #最后将各个组件拼在一次 a1.sources.e1.channels = c1 a1.sinks.l1.channel = c1 有了这个配置文件以后,我们就可以运行了: cd $FLUME_HOME/bin ./flume-ng agent --conf $FLUME_HOME/conf --conf-file {这里是你放你写的配置文件的路径} --name a1 -Dflume.root.logger=INFO,console 3个组件 反正配置文件就是这样写的,写好就按照上面的做法来执行就好了 现在,我们来介绍3个组件在生产上常用的类型。 Source 其实生产上我们常用的就是avro和taildir这两种。 avro是我们需要将多个agent拼接在一起的时候,这个前一个agent时候sink端使用了avro,下一个agent的source端也要使用avro。我们只需要为这两个avro规划好host和port就行。 taildir是我们要读取一个文件夹里面的一堆文件的时候使用的,同时它会为已经读取过的文件打上一个标签,记录在一个json文件里面,这样当我们的Flume重启的时候就不会再去重读已经读取过的文件了(如果使用exec类型作为source的话,一旦重启就又把全部文件又重读一遍了,而spooling这种方式的话,Flume会直接将要读取的文件加个后缀) channel 主要是memory和file两种,前者就是使用内存来充当渠道,这个时候channel将会和JVM公用内存,所以我们有必要在 flume-env.sh里面为JVM配置更高的内存的。而后者File channel就是会有数据落盘到磁盘上,这种是当我们的source端的读入数据的速度很快,但是sink的写出速度很低时,大量数据被滞留在channel的时候使用的。 sink 这个可以用的类型就多了,一般是avro和HDFS以及一些消息中间件(Kafka)。如果是写入到HDFS上面的话,我们要注意写入的间隔,多少数据以及时间生成一个文件(如果文件后缀是tmp的话说明这个文件正在被写入数据,当后缀没了以后就表明这个文件已经被写完数据了),如果我们是按时间来生成文件的话,因为在生产上我们一般都有时间上的延时,所以这个时间批次的文件里面可能含有上一个时间批次的数据,如果我们后面有处理过程就要注意多久处理每一个时间批次的数据了。另外再提示一点就是使用HDFS的话,可以使用压缩格式的,可以根据压缩比来定义rollsize这个参数的大小了。 接Kafka的话,一般是Flume + Kafka + SparkStreaming这种方式,这种方式我们后面接着介绍。 Flume接Kafka的坑 根据官网的话,如果我们的sink是Kafka的话,我们最起码需要配置type,kafka.bootstrap.servers,kafka.topic。这几个参数的,然而,这样写是有问题的。 我们应该写成: a1.sinks.l1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.l1.brokerList = master:9092 a1.sinks.l1.topic = xxx 所以我们一定要配置的是type,brokerList,topic这三个参数

最新回复(0)