Flink Learning Series Part 4: DataStream API Transformations & Partitioning


1 Overview of the Transformations API

The main DataStream transformations:

- map: takes one element and returns one element; a typical place to do cleaning and conversion.
- flatMap: takes one element and returns zero, one, or more elements.
- filter: applies a predicate to each element; only elements that satisfy the condition are kept.
- keyBy: groups the stream by the given key; elements with the same key go to the same partition. Two typical usages:
  dataStream.keyBy("someKey") // use the "someKey" field of the object as the grouping key
  dataStream.keyBy(0)         // use the first element of the Tuple as the grouping key
  Note: the following types cannot be used as keys:
  1. a POJO that does not override hashCode() and relies on Object.hashCode()
  2. an array of any type
- reduce: aggregates the stream by combining the current element with the previously reduced value and emitting a new value (a keyBy/reduce sketch follows this list).
- aggregations: sum(), min(), max(), and so on.
- window: covered in detail in a later post.
- union: merges multiple streams; the new stream contains the elements of all of them, with the restriction that every merged stream must have the same type.
- connect: similar to union, but it connects exactly two streams, the two streams may have different types, and a different processing function is applied to each stream.
- CoMap, CoFlatMap: the functions used on ConnectedStreams, analogous to map and flatMap.
- split: splits one stream into several streams according to a rule.
- select: used together with split to pick out the resulting streams.
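keyBy and reduce have no dedicated demo below, so here is a minimal sketch of the two together. It assumes the same legacy Flink DataStream API version used by the demos in this post; the class name KeyByReduceSketch and the sample tuples are made up for illustration.

package com.caozg.stream;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByReduceSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 2L), Tuple2.of("a", 3L))
                // group by the first Tuple field
                .keyBy(0)
                // combine the current element with the last reduced value for the key
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> v1,
                                                       Tuple2<String, Long> v2) throws Exception {
                        return Tuple2.of(v1.f0, v1.f1 + v2.f1);
                    }
                })
                .print().setParallelism(1);

        env.execute("KeyByReduceSketch");
    }
}

Each incoming element makes reduce emit the updated running value for its key, so this prints (a,1), (b,2) and finally (a,4).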

2 Transformation demos

flatMap, map, and keyBy have already been demonstrated in my earlier posts:

https://blog.csdn.net/GoSaint/article/details/102827822, https://blog.csdn.net/GoSaint/article/details/102788390,

https://blog.csdn.net/GoSaint/article/details/102731991. Below we demonstrate filter: it drops unwanted elements and keeps only the data that matches the business rule.

Demo ①: filter

package com.caozg.stream;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * filter demo
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.stream
 * @ClassName: DefinitionSerialFilterSource
 * @Author: GoSaint
 * @Version: 1.0
 */
public class DefinitionSerialFilterSource {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> text = env.addSource(new SelfdDefinitionSerialSourceFunction());

        DataStream<Long> res = text.map(new MapFunction<Long, Long>() {
            public Long map(Long out) throws Exception {
                System.out.println("Received raw data: " + out);
                return out;
            }
        });

        res = res.filter(new FilterFunction<Long>() {
            public boolean filter(Long out) throws Exception {
                if (out % 2 == 0) {
                    System.out.println("Data passing the filter: " + out);
                    return true;
                }
                return false;
            }
        });

        // collect data every 2 seconds and sum it; the result must be assigned,
        // otherwise the windowed sum would be computed but never printed
        res = res.timeWindowAll(Time.seconds(2)).sum(0);
        res.print().setParallelism(1);

        try {
            env.execute("DefinitionSerialSource");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Here SelfdDefinitionSerialSourceFunction is a custom source with parallelism 1; see https://blog.csdn.net/GoSaint/article/details/102827822. The output is shown below:

Received raw data: 0
Data passing the filter: 0
0
Received raw data: 1
Received raw data: 2
Data passing the filter: 2
2
Received raw data: 3
Received raw data: 4
Data passing the filter: 4
4

Only the even numbers are kept; the odd numbers are dropped.

Demo ②: union

union merges data sources, provided that all merged sources have the same data type.

package com.caozg.stream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * union demo
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.stream
 * @ClassName: DefinitionSerialUnionSource
 * @Author: GoSaint
 * @Version: 1.0
 */
public class DefinitionSerialUnionSource {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> text1 = env.addSource(new SelfdDefinitionSerialSourceFunction());
        DataStreamSource<Long> text2 = env.addSource(new SelfdDefinitionSerialSourceFunction());

        // merge the two sources into a single stream
        DataStream<Long> text = text1.union(text2);

        DataStream<Long> res = text.map(new MapFunction<Long, Long>() {
            public Long map(Long out) throws Exception {
                System.out.println("Received raw data: " + out);
                return out;
            }
        // collect data every 2 seconds, then sum
        }).timeWindowAll(Time.seconds(2)).sum(0);

        res.print().setParallelism(1);

        try {
            env.execute("DefinitionSerialUnionSource");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Demo ③: connect

connect can only combine two data sources; their data types may be the same or different.

package com.caozg.stream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * connect demo
 * connect combines two streams whose data types may be the same or different.
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.stream
 * @ClassName: DefinitionSerialConnectSource
 * @Author: GoSaint
 * @CreateDate: 19-10-31 3:05 PM
 * @UpdateDate: 19-10-31 3:05 PM
 * @Version: 1.0
 */
public class DefinitionSerialConnectSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> text1 = env.addSource(new SelfdDefinitionSerialSourceFunction());
        DataStreamSource<Long> text2 = env.addSource(new SelfdDefinitionSerialSourceFunction());

        // convert the output of text2 to String
        SingleOutputStreamOperator<String> outputStreamOperator = text2.map(new MapFunction<Long, String>() {
            public String map(Long value) throws Exception {
                return "str_" + value;
            }
        });

        ConnectedStreams<Long, String> connectedStreams = text1.connect(outputStreamOperator);

        SingleOutputStreamOperator<Object> res = connectedStreams.map(new CoMapFunction<Long, String, Object>() {
            public Object map1(Long value) throws Exception {
                return value; // applied to elements of the first stream
            }

            public Object map2(String value) throws Exception {
                return value; // applied to elements of the second stream
            }
        });

        res.print().setParallelism(1);
        env.execute("DefinitionSerialConnectSource");
    }
}

The output is shown below:

str_0
0
1
str_1
2
str_2
str_3
3

Demo ④: split

Note that SplitStream is deprecated. There is some background in https://github.com/apache/flink/commit/e0efabe8884f22b4a1c7ab9df3274b3fca03dcfb#diff-163a9f7f1b682eab706eb5e2a94faabcR36 for reference; a side-output sketch of the modern replacement follows the demo output below.

package com.caozg.stream;

import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

/**
 * split demo: splits a single stream into several streams
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.stream
 * @ClassName: SplitStream
 * @Author: GoSaint
 * @CreateDate: 19-10-31 3:24 PM
 * @UpdateDate: 19-10-31 3:24 PM
 * @Version: 1.0
 */
public class SplitStream {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> text = env.addSource(new SelfdDefinitionSerialSourceFunction());

        // the fully qualified name is needed because this demo class
        // is itself called SplitStream
        org.apache.flink.streaming.api.datastream.SplitStream<Long> splitStream =
                text.split(new OutputSelector<Long>() {
                    public Iterable<String> select(Long o) {
                        // split rule: separate odd and even numbers
                        List<String> list = new ArrayList<String>();
                        if (o % 2 == 0) {
                            list.add("even");
                        } else {
                            list.add("odd");
                        }
                        return list;
                    }
                });

        // select the even-number stream
        DataStream<Long> even = splitStream.select("even");
        even.print().setParallelism(1);
        env.execute(SplitStream.class.getSimpleName());
    }
}

The output is:

0
2
4
6
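Since SplitStream is deprecated, the recommended way to express the same odd/even split is with side outputs. Below is a minimal sketch under that assumption: OutputTag, ProcessFunction, and getSideOutput are the documented replacement, while the class name SideOutputSketch and the inline element source are just for illustration.

package com.caozg.stream;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {

    // tag for the odd-number side output; the anonymous subclass keeps the type information
    private static final OutputTag<Long> ODD = new OutputTag<Long>("odd") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Long> even = env
                .fromElements(0L, 1L, 2L, 3L, 4L, 5L, 6L)
                .process(new ProcessFunction<Long, Long>() {
                    public void processElement(Long value, Context ctx, Collector<Long> out) {
                        if (value % 2 == 0) {
                            out.collect(value);     // even numbers stay on the main stream
                        } else {
                            ctx.output(ODD, value); // odd numbers go to the side output
                        }
                    }
                });

        DataStream<Long> odd = even.getSideOutput(ODD);

        even.print().setParallelism(1); // 0 2 4 6
        env.execute("SideOutputSketch");
    }
}

The odd stream is retrieved via getSideOutput and can be processed or printed separately.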

3 DataStream API: partitioning

- Random partitioning: randomly distributes elements: dataStream.shuffle()
- Rebalancing: re-balances (repartitions) the dataset to remove data skew: dataStream.rebalance()
- Rescaling: see the note below: dataStream.rescale()
- Custom partitioning: implement the Partitioner interface: dataStream.partitionCustom(partitioner, "someKey") or dataStream.partitionCustom(partitioner, 0)
- Broadcasting: covered in detail in a later post

Note on Rescaling: for example, if the upstream operator has a parallelism of 2 and the downstream operator has a parallelism of 4, one upstream subtask distributes its elements to two downstream subtasks and the other upstream subtask distributes to the remaining two. Conversely, if the upstream operator has a parallelism of 4 and the downstream one 2, two upstream subtasks feed one downstream subtask and the other two feed the other. The difference from Rebalancing: rebalancing performs a full repartition across all subtasks, while rescaling only redistributes within these local groups (a sketch follows).
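As a minimal sketch of the difference in code (assuming the same environment as the other demos; the element values and the map parallelism of 4 are arbitrary), both calls are plain DataStream API:

package com.caozg.stream.partition;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RescaleVsRebalance {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // rebalance(): round-robin over ALL downstream subtasks (full repartition)
        env.fromElements(1L, 2L, 3L, 4L, 5L, 6L)
                .rebalance()
                .map(new MapFunction<Long, Long>() {
                    public Long map(Long value) throws Exception {
                        System.out.println("rebalance -> thread "
                                + Thread.currentThread().getId() + ": " + value);
                        return value;
                    }
                }).setParallelism(4)
                .print();

        // rescale(): each upstream subtask round-robins only over its local
        // group of downstream subtasks, avoiding a full network shuffle
        env.fromElements(1L, 2L, 3L, 4L, 5L, 6L)
                .rescale()
                .map(new MapFunction<Long, Long>() {
                    public Long map(Long value) throws Exception {
                        System.out.println("rescale -> thread "
                                + Thread.currentThread().getId() + ": " + value);
                        return value;
                    }
                }).setParallelism(4)
                .print();

        env.execute("RescaleVsRebalance");
    }
}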

Custom partitioning demo

package com.caozg.stream.partition;

import org.apache.flink.api.common.functions.Partitioner;

/**
 * Custom partitioner
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.stream.partition
 * @ClassName: SelfDefinitionPartition
 * @Author: GoSaint
 * @CreateDate: 19-10-31 4:30 PM
 * @UpdateDate: 19-10-31 4:30 PM
 * @Version: 1.0
 */
public class SelfDefinitionPartition implements Partitioner<Long> {

    public int partition(Long key, int numPartitions) {
        System.out.println("Total number of partitions: " + numPartitions);
        if (key % 2 == 0) {
            return 0;
        }
        return 1;
    }
}

This uses two partitions: even numbers go to partition 0, odd numbers to partition 1.

package com.caozg.stream.partition;

import com.caozg.stream.SelfdDefinitionSerialSourceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionSelfStream {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> text = env.addSource(new SelfdDefinitionSerialSourceFunction());

        // wrap each Long in a Tuple1 so that field 0 can serve as the partition key
        DataStream<Tuple1<Long>> tuple1DataStream = text.map(new MapFunction<Long, Tuple1<Long>>() {
            public Tuple1<Long> map(Long value) throws Exception {
                return new Tuple1<Long>(value);
            }
        });

        // apply the custom partitioner on field 0
        DataStream<Tuple1<Long>> dataStream =
                tuple1DataStream.partitionCustom(new SelfDefinitionPartition(), 0);

        SingleOutputStreamOperator<Long> res = dataStream.map(new MapFunction<Tuple1<Long>, Long>() {
            public Long map(Tuple1<Long> value) throws Exception {
                System.out.println("current thread id: " + Thread.currentThread().getId()
                        + " value is: " + value);
                return value.getField(0);
            }
        });

        res.print().setParallelism(1);
        env.execute(PartitionSelfStream.class.getSimpleName());
    }
}

The output is:

Total number of partitions: 4
current thread id: 63 value is: (0)
0
Total number of partitions: 4
current thread id: 66 value is: (1)
1
Total number of partitions: 4
current thread id: 63 value is: (2)
2
Total number of partitions: 4
current thread id: 66 value is: (3)
3
Total number of partitions: 4
current thread id: 63 value is: (4)

