flume + kafka + storm; zookeeper + hadoop + hbase (integrated startup)
Installation guides: Big Data Cluster Setup (zookeeper, HA hadoop, HA hbase) · Big Data Cluster Setup — flume · Big Data Cluster Setup — kafka (cluster) · Big Data Cluster Setup — storm (cluster)
1. Start zookeeper + hadoop + HBase
zookeeper (all three nodes)
cd /opt/zookeeper/zookeeper-3.4.12/bin/
./zkServer.sh start
hadoop (master)
cd /opt/hadoop/hadoop-2.7.3/sbin/
./start-all.sh
The ResourceManager on zhiyou003 may fail to start; if so, start yarn on zhiyou003:
cd /opt/hadoop/hadoop-2.7.3/sbin/
./start-yarn.sh
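Once zookeeper and hadoop are up, jps on each node is a quick health check (the exact daemon list depends on the HA layout from the setup guides above):
jps
# roughly expected: QuorumPeerMain (zookeeper); NameNode / DataNode / JournalNode / DFSZKFailoverController (HA hadoop); ResourceManager / NodeManager (yarn)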
HBase (master)
cd /opt/hbase/hbase-2.0.0/bin/
./start-hbase.sh
./hbase shell
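The topology in section 3 writes its totals into an HBase table called sum_money_date with column family info (see Bolt2 below), so create it from the hbase shell if it does not exist yet:
create 'sum_money_date', 'info'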
2. Start flume + kafka + storm
Create the flume configuration
# flume_kafka.conf — the kafka topic used here is total
##########################################################
##
## Watches the log for new data and forwards it to kafka.
## Note: a Flume agent is configured mainly through its source, channel and sink.
## Below, a1 is the agent name; the source is r1, the channel is c1, the sink is k1.
#########################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Define the source: tail the log file and take each new line as an event
a1.sources.r1.type = exec
# File to tail
a1.sources.r1.command = tail -F /orders.log
# Sink the events into kafka
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka topic
a1.sinks.k1.kafka.topic = total
# Kafka broker addresses and ports
a1.sinks.k1.kafka.bootstrap.servers = zhiyou001:9092,zhiyou002:9092,zhiyou003:9092
# Producer batching settings
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Channel: use a file-backed channel as the temporary buffer (safer than an in-memory channel)
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data
# Wire source r1 and sink k1 together through channel c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
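The file channel keeps its checkpoint and data under /home/uplooking/data/flume; make sure those directories exist and are writable by the user that runs flume, for example:
mkdir -p /home/uplooking/data/flume/checkpoint /home/uplooking/data/flume/data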
Start kafka (all three nodes)
cd /opt/kafka/kafka_2.12-1.1.0
./bin/kafka-server-start.sh -daemon config/server.properties
Create the topic (on the master; set --replication-factor and --partitions to the number of brokers you started)
cd /opt/kafka/kafka_2.12-1.1.0/bin/
./kafka-topics.sh --create --zookeeper zhiyou001:2181,zhiyou002:2181,zhiyou003:2181 --replication-factor 3 --partitions 3 --topic total
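To verify the topic was created with the expected partitions and replicas:
./kafka-topics.sh --describe --zookeeper zhiyou001:2181,zhiyou002:2181,zhiyou003:2181 --topic total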
# Start a console consumer to watch the topic
./kafka-console-consumer.sh --bootstrap-server zhiyou001:9092,zhiyou002:9092,zhiyou003:9092 --from-beginning --topic total
Start flume (master)
cd /opt/flume/apache-flume-1.8.0-bin/
bin/flume-ng agent --conf conf --conf-file conf/flume_kafka.conf --name a1 -Dflume.root.logger=INFO,console
Test (in a new terminal)
cd /
echo "1">>orders.log
3. Code test
HbaseUtil
package com.zhiyou.total;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

public class HbaseUtil {

    private static Configuration conf;
    private static Connection conn;

    static {
        // 1. Point the client at the zookeeper quorum
        //    (HBaseConfiguration.create() loads hbase-default/hbase-site on top of the Hadoop defaults)
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zhiyou001:2181,zhiyou002:2181,zhiyou003:2181");
        // 2. Open the connection
        try {
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static Table getTable(String tableName) {
        // 3. Get a handle to the table
        try {
            return conn.getTable(TableName.valueOf(tableName));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}
Spout
package com.zhiyou.total;

import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;

import kafka.api.OffsetRequest;

public class Spout {

    public KafkaSpout createKafkSpout() {
        // Zookeeper quorum that the kafka brokers register in
        String brokerZKStr = "zhiyou001:2181,zhiyou002:2181,zhiyou003:2181";
        BrokerHosts brokerHosts = new ZkHosts(brokerZKStr);
        String topic = "total";      // topic to consume
        String zkRoot = "/" + topic; // zk root used to store consumer offsets
        String id = "id002";         // consumer id
        SpoutConfig config = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        // Start from the latest messages (OffsetRequest.EarliestTime() would replay the topic)
        config.startOffsetTime = OffsetRequest.LatestTime();
        return new KafkaSpout(config);
    }
}
Bolt1
package com.zhiyou.total;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class Bolt1 extends BaseBasicBolt {

    public void execute(Tuple input, BasicOutputCollector collector) {
        // The kafka spout emits the raw message bytes
        byte[] bytes = input.getBinary(0);
        // byte[] -> String; a line looks like:
        // 192.168.53.123 0-001 100 手机 小米8 99 2 20181112(timeStamp)
        String line = new String(bytes);
        String[] str = line.split(" ");
        long price = Long.valueOf(str[5]);
        String date = str[7];
        System.out.println(line);
        collector.emit(new Values(date, price));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date", "price"));
    }
}
Bolt2
package com.zhiyou.total;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Table;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class Bolt2 extends BaseBasicBolt {

    public void execute(Tuple input, BasicOutputCollector collector) {
        String date = input.getStringByField("date");
        long price = input.getLongByField("price");
        // Accumulate the price into an HBase counter, one row per day
        Table table = HbaseUtil.getTable("sum_money_date");
        try {
            table.incrementColumnValue(("sum_money_" + date).getBytes(), "info".getBytes(),
                    "summoney".getBytes(), price);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // End of the chain: this bolt emits nothing
    }
}
Topology
package com.zhiyou.total;

import java.util.HashMap;

import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;

public class Topology {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Spout", new Spout().createKafkSpout());
        builder.setBolt("Bolt1", new Bolt1()).shuffleGrouping("Spout");
        builder.setBolt("Bolt2", new Bolt2()).shuffleGrouping("Bolt1");
        StormTopology stormTopology = builder.createTopology();
        // Run the topology in-process for testing
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("a", new HashMap<String, Object>(), stormTopology);
    }
}
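LocalCluster runs the topology inside the current JVM, which is enough for this test. A minimal sketch of submitting the same topology to the storm cluster instead (assumptions: the job is packaged as a jar and launched with storm jar, the name total-topology is just an example, imports org.apache.storm.Config and org.apache.storm.StormSubmitter, and main declares throws Exception):
// Hypothetical cluster submission, replacing the LocalCluster lines above
Config conf = new Config();
conf.setNumWorkers(2);  // example worker count
StormSubmitter.submitTopology("total-topology", conf, stormTopology);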
4. Test the Topology
Append to the log
cd /
echo "192.168.53.123 0-001 100 手机 小米8 99 2 20181112">>orders.log
The line is printed in the console; check the result in hbase.
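For example, the counter written for the sample line above can be read back from the hbase shell (the row key is sum_money_ plus the date field, as built in Bolt2):
get_counter 'sum_money_date', 'sum_money_20181112', 'info:summoney'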