Flink-cep 动态改变规则案例

mac2024-04-08  29

 

 

1,

2,编译

编译成功之后将cep的包替换maven库对应的cep jar

 1)我们编译成功之后得到了cep的包:

2)去本地库替换maven库里面的这个cep包

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_2.11</artifactId> <version>${project.version}</version> </dependency>

3,运行案例代码:

 

package org.apache.flink.streaming.examples.cep;

import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.cep.CEP; import org.apache.flink.cep.PatternStream; import org.apache.flink.cep.RichPatternSelectFunction; import org.apache.flink.cep.listern.CepListener; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.RichIterativeCondition; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time;

import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Map;

/**  * @Description:  * @Author: greenday  * @Date: 2019/8/15 14:05  */ public class Driver {     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//        数据源         SingleOutputStreamOperator<Tuple3<String, Long, String>> source = env.fromElements(               new Tuple3<String, Long, String>("a", 1000000001000L, "22")             , new Tuple3<String, Long, String>("b", 1000000002000L, "23")             , new Tuple3<String, Long, String>("c", 1000000003000L, "23")             , new Tuple3<String, Long, String>("d", 1000000003000L, "23")             , new Tuple3<String, Long, String>("change", 1000000003001L, "23")             , new Tuple3<String, Long, String>("e", 1000000004000L, "24")             , new Tuple3<String, Long, String>("f", 1000000005000L, "23")             , new Tuple3<String, Long, String>("g", 1000000006000L, "23")         ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple3<String, Long, String>>() {             long maxTimsStamp;             @Nullable             public Watermark checkAndGetNextWatermark(Tuple3<String, Long, String> stringLongStringTuple3, long l) {                 return new Watermark(maxTimsStamp - 1000);             }

            public long extractTimestamp(Tuple3<String, Long, String> stringLongStringTuple3, long per) {                 long elementTime = stringLongStringTuple3.f1;                 if (elementTime > maxTimsStamp) {                     maxTimsStamp = elementTime;                 }                 return elementTime;             }         });

        Pattern<Tuple3<String, Long, String>,?> pattern = Pattern             .<Tuple3<String, Long, String>>begin("start").where(new RichIterativeCondition<Tuple3<String, Long, String>>() {                 @Override                 public boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx) throws Exception {                     return value.f0.equals("a");                 }             })             .next("middle").where(new RichIterativeCondition<Tuple3<String, Long, String>>() {                 @Override                 public boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx) throws Exception {                     return value.f0.equals("b");                 }             })             .within(Time.minutes(5));

        PatternStream patternStream = CEP.pattern(source, pattern);

        PatternStream patternstream = patternStream //            更新逻辑             .registerListener(new CepListener<Tuple3<String, Long, String>>(){                 @Override                 public Boolean needChange(Tuple3<String, Long, String> element) {                     return element.f0.equals("change");                 }                 @Override                 public Pattern returnPattern() {                     Pattern<Tuple3<String, Long, String>, ?> pattern = Pattern                         .<Tuple3<String, Long, String>>begin("start").where(new RichIterativeCondition<Tuple3<String, Long, String>>() {                             @Override                             public boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx) throws Exception {                                 return value.f0.equals("e");                             }                         })                         .next("middle").where(new RichIterativeCondition<Tuple3<String, Long, String>>() {                             @Override                             public boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx) throws Exception {                                 return value.f0.equals("f");                             }                         })                         .next("end").where(new RichIterativeCondition<Tuple3<String, Long, String>>() {                             @Override                             public boolean filter(Tuple3<String, Long, String> value, Context<Tuple3<String, Long, String>> ctx) throws Exception {                                 return value.f0.equals("g");                             }                         })                         .within(Time.minutes(5));                     return pattern;                 }             });

        patternstream.select(new RichPatternSelectFunction<Tuple3<String, Long, String>, Tuple3<String,String,String>>() {             @Override             public Tuple3 select(Map pattern) throws Exception {                 String start =  pattern.containsKey("start") ? ((ArrayList)pattern.get("start")).get(0).toString() : "" ;                 String middle = pattern.containsKey("middle") ? ((ArrayList)pattern.get("middle")).get(0).toString() : "" ;                 String end = pattern.containsKey("end") ? ((ArrayList)pattern.get("end")).get(0).toString() : "" ;                 return new Tuple3<String,String,String>(start,middle,end);             }         }).print();

        env.execute("cep");     } }

4,结果展示:

最新回复(0)