(Real-Time Dynamic Big Data Processing) flume + kafka + storm; zookeeper + hadoop + hbase


flume + kafka + storm; zookeeper + hadoop + hbase (integrated startup)


Contents

1. Start zookeeper + hadoop + HBase
2. Start flume + kafka + storm
3. Test code
4. Test the Topology

Installation guides: Big data cluster setup (zookeeper, HA hadoop, HA hbase) · Big data cluster setup: flume · Big data cluster setup: kafka (cluster) · Big data cluster setup: storm (cluster)


1. Start zookeeper + hadoop + HBase

1. zookeeper (all three nodes)
cd /opt/zookeeper/zookeeper-3.4.12/bin/
./zkServer.sh start
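To confirm the ensemble came up, each node can report its role; on a healthy three-node ensemble one node shows Mode: leader and the other two Mode: follower (a quick optional check):

./zkServer.sh status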

2. hadoop (master node)

cd /opt/hadoop/hadoop-2.7.3/sbin/
./start-all.sh

The ResourceManager on zhiyou003 may not start properly; if so, start YARN on zhiyou003:
cd /opt/hadoop/hadoop-2.7.3/sbin/
./start-yarn.sh
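A quick way to verify the Hadoop daemons on each node is jps (a rough check; the exact process list depends on how the HA roles are spread across zhiyou001-003):

jps
# expect NameNode/DataNode on the HDFS hosts, ResourceManager/NodeManager on the YARN hosts,
# plus QuorumPeerMain for zookeeper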

3. HBase (master node)
cd /opt/hbase/hbase-2.0.0/bin/
./start-hbase.sh
./hbase shell
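Bolt2 in section 3 below writes its running totals into an HBase table named sum_money_date with a column family info. If that table does not exist yet, it can be created from the HBase shell just opened above (a small sketch; the names are taken from the code below):

create 'sum_money_date', 'info'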


2. Start flume + kafka + storm

1. Create the Flume configuration flume_kafka.conf; the Kafka topic used here is total.

##########################################################
## Watches a file for new data and forwards it to Kafka.
## A Flume agent is configured as source + channel + sink:
## the agent below is a1, its source r1, channel c1, sink k1.
#########################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source: tail the order log
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /orders.log

# Sink: write to Kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka topic
a1.sinks.k1.kafka.topic = total
# Kafka broker addresses and ports
a1.sinks.k1.kafka.bootstrap.servers = zhiyou001:9092,zhiyou002:9092,zhiyou003:9092
# Producer batching and compression
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Channel: file-backed buffering, safer than a memory channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

# Wire source r1 and sink k1 together through channel c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
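Optionally, before starting the agent, make sure the file-channel directories and the tailed log file exist and are writable (paths taken from the config above; Flume and tail -F can usually cope if they are created later):

mkdir -p /home/uplooking/data/flume/checkpoint /home/uplooking/data/flume/data
touch /orders.log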

2. Start Kafka (all three nodes)
cd /opt/kafka/kafka_2.12-1.1.0
./bin/kafka-server-start.sh -daemon config/server.properties

Create the topic (on the master node; set --replication-factor and --partitions to the number of brokers you started):
cd /opt/kafka/kafka_2.12-1.1.0/bin/
./kafka-topics.sh --create --zookeeper zhiyou001:2181,zhiyou002:2181,zhiyou003:2181 --replication-factor 3 --partitions 3 --topic total

Start a console consumer:
./kafka-console-consumer.sh --bootstrap-server zhiyou001:9092,zhiyou002:9092,zhiyou003:9092 --from-beginning --topic total
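To confirm the topic was created with the expected partitions and replicas, it can be described against the same zookeeper list:

cd /opt/kafka/kafka_2.12-1.1.0/bin/
./kafka-topics.sh --describe --zookeeper zhiyou001:2181,zhiyou002:2181,zhiyou003:2181 --topic total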

3. Start Flume (master node)
cd /opt/flume/apache-flume-1.8.0-bin/
bin/flume-ng agent --conf conf --conf-file conf/flume_kafka.conf --name a1 -Dflume.root.logger=INFO,console

4. Test (in a new terminal)
cd /
echo "1" >> orders.log
The line should show up in the console consumer started above.

3. Test code

HbaseUtil

package com.zhiyou.total;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

public class HbaseUtil {

    private static Configuration conf;
    private static Connection conn;
    private static Table table;

    static {
        // 1. Point the HBase client at the zookeeper ensemble
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zhiyou001:2181,zhiyou002:2181,zhiyou003:2181");
        // 2. Open the connection
        try {
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static Table getTable(String tableName) {
        // 3. Get a handle on the requested table
        try {
            table = conn.getTable(TableName.valueOf(tableName));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return table;
    }
}

Spout

package com.zhiyou.total;

import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;

import kafka.api.OffsetRequest;

public class Spout {

    public KafkaSpout createKafkaSpout() {
        // Zookeeper ensemble that Kafka registers its brokers in
        String brokerZKStr = "zhiyou001:2181,zhiyou002:2181,zhiyou003:2181";
        BrokerHosts brokerHosts = new ZkHosts(brokerZKStr);
        String topic = "total";       // topic to consume
        String zkRoot = "/" + topic;  // zkRoot for storing consumer offsets
        String id = "id002";          // consumer id
        SpoutConfig config = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        // Start from the latest messages
        config.startOffsetTime = OffsetRequest.LatestTime();
        return new KafkaSpout(config);
    }
}

Bolt1

package com.zhiyou.total;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class Bolt1 extends BaseBasicBolt {

    public void execute(Tuple input, BasicOutputCollector collector) {
        // The KafkaSpout emits the raw message bytes
        byte[] bytes = input.getBinary(0);
        // Sample line: 192.168.53.123 0-001 100 手机 小米8 99 2 20181112
        String line = new String(bytes);
        String[] str = line.split(" ");
        long price = Long.valueOf(str[5]); // 6th field: price
        String date = str[7];              // 8th field: date (yyyyMMdd)
        System.out.println(line);
        collector.emit(new Values(date, price));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date", "price"));
    }
}

Bolt2

package com.zhiyou.total;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Table;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class Bolt2 extends BaseBasicBolt {

    public void execute(Tuple input, BasicOutputCollector collector) {
        String date = input.getStringByField("date");
        long price = input.getLongByField("price");
        Table table = HbaseUtil.getTable("sum_money_date");
        try {
            // Accumulate the daily total as an HBase counter:
            // row key = sum_money_<date>, column = info:summoney
            table.incrementColumnValue(("sum_money_" + date).getBytes(), "info".getBytes(),
                    "summoney".getBytes(), price);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // End of the stream: nothing to declare
    }
}

Topology

package com.zhiyou.total;

import java.util.HashMap;

import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;

public class Topology {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Spout", new Spout().createKafkaSpout());
        builder.setBolt("Bolt1", new Bolt1()).shuffleGrouping("Spout");
        builder.setBolt("Bolt2", new Bolt2()).shuffleGrouping("Bolt1");
        StormTopology stormTopology = builder.createTopology();
        // Run the topology in local mode
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("a", new HashMap<String, Object>(), stormTopology);
    }
}
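The Topology class above uses LocalCluster, so it runs in-process and can be started directly from the IDE by running its main method. To run it on the Storm cluster instead, LocalCluster would be swapped for StormSubmitter.submitTopology and the packaged jar submitted roughly like this (a sketch; the jar name is a placeholder and the storm command is assumed to be on the PATH):

storm jar total-topology.jar com.zhiyou.total.Topology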

4. Test the Topology

Append a sample order to the log (new terminal):
cd /
echo "192.168.53.123 0-001 100 手机 小米8 99 2 20181112" >> orders.log
The running Topology prints the line to its console, and the accumulated total can then be checked in HBase (see the sketch below).
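One way to read the counter back is get_counter in the HBase shell. The row key is sum_money_ plus the date field of the log line and the column is info:summoney, matching what Bolt2 writes (a minimal check, assuming the sample line above):

cd /opt/hbase/hbase-2.0.0/bin/
./hbase shell
get_counter 'sum_money_date', 'sum_money_20181112', 'info:summoney'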