flatMap, map, and keyBy have already been demonstrated; see my earlier posts:
https://blog.csdn.net/GoSaint/article/details/102827822 , https://blog.csdn.net/GoSaint/article/details/102788390 ,
https://blog.csdn.net/GoSaint/article/details/102731991 . Next up is filter, which filters a stream: it discards unwanted records and keeps only the data your business rules require.
Here SelfdDefinitionSerialSourceFunction is a custom source with parallelism 1; see https://blog.csdn.net/GoSaint/article/details/102827822 for details. The result is as follows:
```
Raw data received: 0
Data after filter: 0
0
Raw data received: 1
Raw data received: 2
Data after filter: 2
2
Raw data received: 3
Raw data received: 4
Data after filter: 4
4
```

Only the even numbers were kept; the odd numbers were filtered out.
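The filter program itself is not listed in this post; its core is just an even-number predicate. Below is a minimal sketch of that rule in plain Java so it can be run without a Flink runtime (the class name `EvenFilter` is made up for this sketch; in the real job the same rule would live in a `FilterFunction<Long>`):

```java
// The odd/even rule behind the filter step above, extracted as a plain
// predicate so it can be exercised without a Flink cluster.
public class EvenFilter {

    // Mirrors FilterFunction<Long>.filter: return true to keep the record.
    public static boolean keep(long value) {
        return value % 2 == 0;
    }

    public static void main(String[] args) {
        for (long v = 0; v <= 4; v++) {
            System.out.println("Raw data received: " + v);
            if (keep(v)) {
                System.out.println("Data after filter: " + v);
            }
        }
    }
}
```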
union merges multiple data streams, provided the streams being merged have the same data type.
```java
package com.caozg.stream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Union demo
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.stream
 * @ClassName: DefinitionSerialUnionSource
 * @Author: GoSaint
 * @Version: 1.0
 */
public class DefinitionSerialUnionSource {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> text1 = env.addSource(new SelfdDefinitionSerialSourceFunction());
        DataStreamSource<Long> text2 = env.addSource(new SelfdDefinitionSerialSourceFunction());
        // Merge the two streams; both must carry the same data type.
        DataStream<Long> text = text1.union(text2);
        DataStream<Long> res = text.map(new MapFunction<Long, Long>() {
            public Long map(Long out) throws Exception {
                System.out.println("Raw data received: " + out);
                return out;
            }
            // Collect data every 2 seconds, then sum it.
        }).timeWindowAll(Time.seconds(2)).sum(0);
        res.print().setParallelism(1);
        try {
            env.execute("DefinitionSerialUnionSource");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

connect can only merge two streams, but their data types may be the same or different.
```java
package com.caozg.stream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * Connect demo
 * connect merges two streams whose data types may be the same or different.
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.stream
 * @ClassName: DefinitionSerialConnectSource
 * @Author: GoSaint
 * @CreateDate: 19-10-31 3:05 PM
 * @UpdateDate: 19-10-31 3:05 PM
 * @Version: 1.0
 */
public class DefinitionSerialConnectSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> text1 = env.addSource(new SelfdDefinitionSerialSourceFunction());
        DataStreamSource<Long> text2 = env.addSource(new SelfdDefinitionSerialSourceFunction());
        // Convert text2's output to String so the two streams have different types.
        SingleOutputStreamOperator<String> outputStreamOperator = text2.map(new MapFunction<Long, String>() {
            public String map(Long value) throws Exception {
                return "str_" + value;
            }
        });
        ConnectedStreams<Long, String> connectedStreams = text1.connect(outputStreamOperator);
        // CoMapFunction applies map1 to the first stream and map2 to the second.
        SingleOutputStreamOperator<Object> res = connectedStreams.map(new CoMapFunction<Long, String, Object>() {
            public Object map1(Long value) throws Exception {
                return value;
            }

            public Object map2(String value) throws Exception {
                return value;
            }
        });
        res.print().setParallelism(1);
        env.execute("DefinitionSerialConnectSource");
    }
}
```

The result is as follows:
```
str_0
0
1
str_1
2
str_2
str_3
3
```

Note that SplitStream is now deprecated. There is some background on this at https://github.com/apache/flink/commit/e0efabe8884f22b4a1c7ab9df3274b3fca03dcfb#diff-163a9f7f1b682eab706eb5e2a94faabcR36 for reference.
```java
package com.caozg.stream;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

/**
 * split demo: splits one stream into several
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.stream
 * @ClassName: SplitStream
 * @Author: GoSaint
 * @CreateDate: 19-10-31 3:24 PM
 * @UpdateDate: 19-10-31 3:24 PM
 * @Version: 1.0
 */
public class SplitStream {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> text = env.addSource(new SelfdDefinitionSerialSourceFunction());
        DataStream<Long> splitStream = text.split(new OutputSelector<Long>() {
            public Iterable<String> select(Long o) {
                // Split rule: separate odd and even numbers.
                List<String> list = new ArrayList<String>();
                if (o % 2 == 0) {
                    list.add("even");
                } else {
                    list.add("odd");
                }
                return list;
            }
        });
        // Select the even-number stream. The fully qualified cast is needed
        // because this demo class shares its name with Flink's SplitStream.
        DataStream<Long> even = ((org.apache.flink.streaming.api.datastream.SplitStream<Long>) splitStream).select("even");
        even.print().setParallelism(1);
        env.execute(SplitStream.class.getSimpleName());
    }
}
```

The result is as follows:
```
0
2
4
6
```

A note on Rescaling. For example: if the upstream operator has parallelism 2 and the downstream operator has parallelism 4, one upstream subtask distributes its output to two of the downstream subtasks, and the other upstream subtask distributes to the remaining two. Conversely, if the downstream has parallelism 2 and the upstream has parallelism 4, two upstream subtasks send their output to one downstream subtask, and the other two upstream subtasks send to the other. The difference between Rescaling and Rebalancing: rebalance repartitions across all subtasks (a full redistribution), while rescale only redistributes within these local groups.
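The local-group mapping described above can be modeled in plain Java. This is only a toy illustration of that description for the case where the downstream parallelism is a multiple of the upstream parallelism, not Flink's actual rescale implementation; the class and method names are made up for this sketch:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the rescale mapping described above: with U upstream and
// D downstream subtasks (D a multiple of U), upstream subtask i feeds the
// contiguous group of D/U downstream subtasks starting at i * (D/U).
public class RescaleModel {

    public static List<Integer> targets(int upstreamIndex, int up, int down) {
        int groupSize = down / up;
        List<Integer> result = new ArrayList<>();
        for (int j = 0; j < groupSize; j++) {
            result.add(upstreamIndex * groupSize + j);
        }
        return result;
    }

    public static void main(String[] args) {
        // 2 upstream subtasks, 4 downstream subtasks:
        System.out.println(targets(0, 2, 4)); // [0, 1]
        System.out.println(targets(1, 2, 4)); // [2, 3]
    }
}
```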
Next, custom partitioning. Here there are two logical partitions: one handles the odd numbers and one handles the even numbers.
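The SelfDefinitionPartition class used in the program below is not listed in this post. Here is a sketch of its core rule in plain Java; the real class would implement Flink's `Partitioner<Long>` interface with this logic in `partition(Long key, int numPartitions)` (and, judging by the output, it also prints the total partition count). The class name `OddEvenRule` is made up for this sketch:

```java
// Core rule of a custom partitioner that routes even keys to partition 0
// and odd keys to partition 1.
public class OddEvenRule {

    public static int partition(long key) {
        return key % 2 == 0 ? 0 : 1;
    }

    public static void main(String[] args) {
        for (long k = 0; k < 5; k++) {
            System.out.println(k + " -> partition " + partition(k));
        }
    }
}
```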
```java
package com.caozg.stream.partition;

import com.caozg.stream.SelfdDefinitionSerialSourceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionSelfStream {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> text = env.addSource(new SelfdDefinitionSerialSourceFunction());
        // Wrap each Long in a Tuple1 so field 0 can serve as the partition key.
        DataStream<Tuple1<Long>> tuple1DataStream = text.map(new MapFunction<Long, Tuple1<Long>>() {
            public Tuple1 map(Long value) throws Exception {
                return new Tuple1(value);
            }
        });
        // Partition on field 0 with the custom partitioner.
        DataStream<Tuple1<Long>> dataStream = tuple1DataStream.partitionCustom(new SelfDefinitionPartition(), 0);
        SingleOutputStreamOperator<Long> res = dataStream.map(new MapFunction<Tuple1<Long>, Long>() {
            public Long map(Tuple1<Long> value) throws Exception {
                System.out.println("Current thread id:" + Thread.currentThread().getId() + " value is:" + value);
                return value.getField(0);
            }
        });
        res.print().setParallelism(1);
        env.execute(PartitionSelfStream.class.getSimpleName());
    }
}
```

The result is as follows:
```
Total partitions: 4
Current thread id:63 value is:(0)
0
Total partitions: 4
Current thread id:66 value is:(1)
1
Total partitions: 4
Current thread id:63 value is:(2)
2
Total partitions: 4
Current thread id:66 value is:(3)
3
Total partitions: 4
Current thread id:63 value is:(4)
```