The full code is as follows:
package application;

import com.alibaba.fastjson.JSONObject;
import hbase.HbaseRowKeyUtil;
import operator.*;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

import java.io.IOException;
import java.util.*;

/**
 * todo Batch-write data to HBase
 */
public class SaveDataToHbase {

    public static void main(String[] args) throws Exception {
        // String fileUrl = "D:\\wxgz-local\\resources_yace\\";
        String fileUrl = "/zywa/job/storm/resources_new/";

        // todo 2. Read data from Kafka
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // todo Build the Kafka configuration properties
        args = new String[]{"--input-topic", "topn_test",
                "--bootstrap.servers", "node2.hadoop:9092,node3.hadoop:9092",
                "--zookeeper.connect", "node1.hadoop:2181,node2.hadoop:2181,node3.hadoop:2181",
                "--group.id", "cc2"};
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Properties sendPros = parameterTool.getProperties();
        Properties pros = parameterTool.getProperties();

        // todo Use the Kafka topic as the input source
        DataStream<String> kafkaDstream = env.addSource(new FlinkKafkaConsumer010<String>(
                pros.getProperty("input-topic"),
                new SimpleStringSchema(),
                pros).setStartFromLatest()
        ).setParallelism(4);

        DataStream<JSONObject> json_Dstream = kafkaDstream.map(new MapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String value) throws Exception {
                return JSONObject.parseObject(value);
            }
        }).setParallelism(4);

        // todo After the transformation, store the data in HBase
        DataStream<List<JSONObject>> processHbase = json_Dstream.keyBy(new KeySelector<JSONObject, String>() {
            @Override
            public String getKey(JSONObject jsonObject) throws Exception {
                return jsonObject.getString("interfaceKey");
            }
        }).window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
                .process(new ProcessWindowFunction<JSONObject, List<JSONObject>, String, TimeWindow>() {

                    private org.apache.hadoop.conf.Configuration configuration;
                    private Connection connection = null;
                    private BufferedMutator mutator;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        configuration = HBaseConfiguration.create();
                        configuration.set("hbase.master", "node1.hadoop:60020");
                        configuration.set("hbase.zookeeper.quorum", "node1.hadoop,node2.hadoop,node3.hadoop");
                        configuration.set("hbase.zookeeper.property.clientPort", "2181");
                        try {
                            connection = ConnectionFactory.createConnection(configuration);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("test_spark"));
                        params.writeBufferSize(1 * 1024 * 1024);
                        mutator = connection.getBufferedMutator(params);
                    }

                    @Override
                    public void process(String keyBy, Context context, Iterable<JSONObject> iterable,
                                        Collector<List<JSONObject>> out) throws Exception {
                        ArrayList<Put> putList = new ArrayList<>();
                        Iterator<JSONObject> iterator = iterable.iterator();
                        List<JSONObject> logList = IteratorUtils.toList(iterator);
                        logList.stream().forEach(logJson -> {
                            String interfaceKey = logJson.getString("interfaceKey");
                            String shortAppKey = logJson.getString("shortAppKey");
                            String deviceId = logJson.getString("deviceId");
                            String rowKeyOfStorm = HbaseRowKeyUtil.getRowKeyOfStorm(shortAppKey, deviceId);
                            Put put = new Put(rowKeyOfStorm.getBytes());
                            Set<String> set = logJson.keySet();
                            set.stream().forEach(key -> {
                                String keys = interfaceKey + "_" + key;
                                String value = logJson.getString(key);
                                if (!key.equals(interfaceKey)) {
                                    put.addColumn("cf1".getBytes(), keys.getBytes(), value.getBytes());
                                }
                            });
                            putList.add(put);
                        });
                        // todo Persist the batch
                        if (putList.size() > 0) {
                            mutator.mutate(putList);
                            mutator.flush();
                            putList.clear();
                            // todo Emit the records downstream
                            out.collect(logList);
                        }
                    }
                }).setParallelism(4);

        // todo Flatten the lists back into single records
        DataStream<String> logDstream = processHbase.flatMap(new FlatMapFunction<List<JSONObject>, String>() {
            @Override
            public void flatMap(List<JSONObject> logList, Collector<String> out) throws Exception {
                System.out.println("logList.size() = " + logList.size());
                logList.stream().forEach(log -> {
                    out.collect(log.toJSONString());
                });
            }
        }).disableChaining().setParallelism(4);
        // todo Continue with the next steps after the chain is broken

        // todo Define a side-output tag
        final OutputTag<JSONObject> mutiOutputTag = new OutputTag<JSONObject>("mutiStream") {
        };

        // todo 2. Filter out records that do not match the expected format
        // DataStream<JSONObject> jsonDstream = kafkaDstream.map(new MapOperator_01(fileUrl)).disableChaining().setParallelism(4);
        DataStream<JSONObject> jsonDstream = kafkaDstream.map(new MapOperator_01(fileUrl)).disableChaining().name("MapOperator_01").setParallelism(4);
        // SingleOutputStreamOperator<JSONObject> splitStream = jsonDstream.process(new ProcessOperator_01(mutiOutputTag)).startNewChain().setParallelism(4);
        SingleOutputStreamOperator<JSONObject> splitStream = jsonDstream.process(new ProcessOperator_01(mutiOutputTag)).disableChaining().setParallelism(4);

        // todo 3. The stream that needs to query HBase
        DataStream<JSONObject> mutiStream = splitStream.getSideOutput(mutiOutputTag);
        mutiStream.print();

        // todo 4. Handle the single-condition stream first and match it against the scenario expressions
        DataStream<JSONObject> filterDstream = splitStream.filter(new FilterOperator_01()).disableChaining().setParallelism(4);
        // DataStream<JSONObject> filterDstream = splitStream.filter(new FilterOperator_01()).slotSharingGroup("group_03").setParallelism(4);
        DataStream<JSONObject> mapDstream = filterDstream.map(new MapOperator_02()).name("MapOperator_02").setParallelism(4);

        // todo Push delivery
        mapDstream.filter(new FilterFunction<JSONObject>() {
            @Override
            public boolean filter(JSONObject json) throws Exception {
                // push records
                if (json.containsKey("Payload")) {
                    return true;
                }
                return false;
            }
        }).setParallelism(4)
                .map(new MapFunction<JSONObject, String>() {
                    @Override
                    public String map(JSONObject s) throws Exception {
                        return s.toJSONString();
                    }
                }).setParallelism(4)
                .addSink(new FlinkKafkaProducer010<String>(
                        "dianyou_wx_test3",
                        new SimpleStringSchema(),
                        sendPros)).setParallelism(4);

        // todo Filter the records to be delivered to Kafka
        SingleOutputStreamOperator processDstream = mapDstream.filter(new FilterFunction<JSONObject>() {
            @Override
            public boolean filter(JSONObject json) throws Exception {
                // non-push records
                if (!json.containsKey("Payload")) {
                    return true;
                }
                return false;
            }
        }).setParallelism(4)
                .keyBy(value -> value.getString("appKey"))
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
                .process(new ProcessOperator_02())
                .setParallelism(4);

        // todo Send to Kafka
        processDstream.addSink(new FlinkKafkaProducer010<String>(
                "dianyou_wx_test2",
                new SimpleStringSchema(),
                sendPros))
                .setParallelism(4);

        // todo Match the complex (multi-condition) cases
        DataStream<JSONObject> mutiProcessDstream = mutiStream.keyBy(value -> value.getString("appKey"))
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
                .process(new ProcessOperator_03())
                .setParallelism(4);

        // todo Batch conditions
        DataStream<JSONObject> process = mutiProcessDstream.map(new MapOperator_03())
                .setParallelism(4)
                .filter(new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonObject) throws Exception {
                        if (jsonObject.containsKey("tiaojian")) {
                            return true;
                        }
                        return false;
                    }
                }).setParallelism(4)
                .keyBy(value -> value.getJSONObject("logJson").getString("appKey"))
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(500)))
                .process(new ProcessOperator_04())
                .setParallelism(4);

        // todo When a scenario has already been matched, send it to the topic first
        process.map(new MapOperator_04())
                .setParallelism(4)
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        if (StringUtils.isNotBlank(value)) {
                            return true;
                        }
                        return false;
                    }
                })
                .setParallelism(4)
                .addSink(new FlinkKafkaProducer010<String>(
                        "dianyou_wx_test3",
                        new SimpleStringSchema(),
                        sendPros)).setParallelism(4);

        process.map(new MapOperator_05())
                .setParallelism(4)
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        if (StringUtils.isNotBlank(value)) {
                            return true;
                        }
                        return false;
                    }
                }).setParallelism(4)
                .addSink(new FlinkKafkaProducer010<String>(
                        "dianyou_wx_test2",
                        new SimpleStringSchema(),
                        sendPros)).setParallelism(4);

        env.execute("startExecute");
    }
}
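The HBase row keys come from HbaseRowKeyUtil.getRowKeyOfStorm, whose implementation is not shown here. As a rough illustration only, here is a minimal sketch of what such a utility often looks like, assuming a salted "shortAppKey_deviceId" key to spread writes across regions; the bucket count and format below are assumptions, not the original code:

package hbase;

/**
 * Hypothetical sketch -- the real HbaseRowKeyUtil is not shown in this post.
 * Assumes a salted row key of the form "<salt>_<shortAppKey>_<deviceId>"
 * so writes are spread across regions instead of hot-spotting.
 */
public class HbaseRowKeyUtil {

    private static final int SALT_BUCKETS = 16; // assumed bucket count

    public static String getRowKeyOfStorm(String shortAppKey, String deviceId) {
        String raw = shortAppKey + "_" + deviceId;
        // Non-negative hash modulo the bucket count, used as a two-digit salt prefix
        int salt = (raw.hashCode() & Integer.MAX_VALUE) % SALT_BUCKETS;
        return String.format("%02d_%s", salt, raw);
    }
}

Whatever format the real utility produces, the property the job relies on is that the same (shortAppKey, deviceId) pair always maps to the same row key, so successive window flushes keep updating the same HBase row.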