Flink Learning Series, Part 6: DataSet API


1 DataSet API: Data Sources

File-based: readTextFile(path)
Collection-based: fromCollection(Collection)

Both of these APIs were already used earlier in this series, so they are not covered in detail again; a quick refresher sketch follows.
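A minimal sketch of both source types (the file path below is just a placeholder):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

import java.util.Arrays;

public class SourcesSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // File-based source: each line of the file becomes one String element.
        // The path is a placeholder; point it at a real file before running.
        DataSource<String> fromFile = env.readTextFile("file:///tmp/input.txt");
        // Collection-based source: the elements of a local Java collection.
        DataSource<String> fromCollection =
                env.fromCollection(Arrays.asList("hello you", "hello me"));
        fromCollection.print();
    }
}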

2 Transformations in Detail

Map: takes one element and returns one element; typically used for cleaning and transformation
FlatMap: takes one element and returns zero, one, or more elements
MapPartition: like Map, but processes one partition of data at a time (if the processing needs a third-party resource connection, MapPartition is recommended)
Filter: a predicate over incoming elements; elements that match the condition are kept
Reduce: aggregates the data by combining the current element with the value returned by the previous reduce call, producing a new value
Aggregate: sum, max, min, and so on
Distinct: returns the deduplicated elements of a dataset, data.distinct()
Join: inner join
OuterJoin: outer join
Cross: the Cartesian product of two datasets
Union: the union of two datasets; the element types must match
First-n: the first N elements of a dataset
Sort Partition: sorts all partitions of the dataset locally; sorting on multiple fields is done by chaining sortPartition() calls

Of the APIs above, the ones demonstrated here are MapPartition, Distinct, Join, OuterJoin, Cross, Union, First-n, and Sort Partition; the rest were essentially covered in the DataStream API articles.
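Aggregate is not demonstrated below either, so here is a minimal sketch (the data is made up): on a tuple DataSet, aggregations are selected via the Aggregations constants and can be chained with and().

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;

public class AggregateSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Integer, Integer>> data = env.fromElements(
                new Tuple2<>(1, 10), new Tuple2<>(2, 20), new Tuple2<>(3, 30));
        // SUM over field 0 and MAX over field 1, chained with and();
        // this prints a single tuple: (6,30)
        data.aggregate(Aggregations.SUM, 0).and(Aggregations.MAX, 1).print();
    }
}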

2.1 MapPartition

package com.caozg.batch;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * MapPartition: like map, but processes one partition of data at a time.
 * If the processing needs a third-party resource connection, MapPartition is recommended.
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.batch
 * @ClassName: MapPartitionHandler
 * @Author: GoSaint
 * @CreateDate: 19-11-1 7:32 PM
 * @UpdateDate: 19-11-1 7:32 PM
 * @Version: 1.0
 */
public class MapPartitionHandler {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        List<String> data = new ArrayList<>();
        data.add("hello you");
        data.add("hello me");
        DataSource<String> dataSource = environment.fromCollection(data);
        DataSet<String> dataSet = dataSource.mapPartition(new MapPartitionFunction<String, String>() {
            @Override
            public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
                // A resource connection acquired here is acquired once per partition;
                // with map it would have to be acquired once per element.
                for (Iterator<String> iterator = values.iterator(); iterator.hasNext(); ) {
                    String[] valueArray = iterator.next().split("\\W+");
                    for (String value : valueArray) {
                        out.collect(value);
                    }
                }
            }
        });
        dataSet.print();
    }
}

The output is:

hello
you
hello
me
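To make the third-party-resource advice concrete, here is a hedged sketch using RichMapPartitionFunction, whose open() and close() run once per parallel instance; DummyConnection is a hypothetical stand-in for a real client such as a database connection:

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class MapPartitionResourceSketch {

    // Hypothetical third-party client; stands in for e.g. a database connection.
    static class DummyConnection {
        String lookup(String key) { return key.toUpperCase(); }
        void close() { }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("hello", "flink")
                .mapPartition(new RichMapPartitionFunction<String, String>() {
                    private transient DummyConnection connection;

                    @Override
                    public void open(Configuration parameters) {
                        // Runs once per parallel instance, not once per element.
                        connection = new DummyConnection();
                    }

                    @Override
                    public void mapPartition(Iterable<String> values, Collector<String> out) {
                        for (String value : values) {
                            out.collect(connection.lookup(value));
                        }
                    }

                    @Override
                    public void close() {
                        connection.close();
                    }
                })
                .print();
    }
}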

2.2 Distinct

package com.caozg.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/**
 * Distinct: returns the deduplicated elements of a dataset, data.distinct()
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.batch
 * @ClassName: MapDistinctHandler
 * @Author: GoSaint
 * @CreateDate: 19-11-1 7:32 PM
 * @UpdateDate: 19-11-1 7:32 PM
 * @Version: 1.0
 */
public class MapDistinctHandler {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        List<String> data = new ArrayList<>();
        data.add("hello you");
        data.add("hello me");
        DataSource<String> dataSource = environment.fromCollection(data);
        dataSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.split("\\W+");
                for (String val : split) {
                    System.out.println("word: " + val);
                    out.collect(val);
                }
                System.out.println("===============>");
            }
        }).distinct().print();
    }
}

The output is:

word: hello
word: you
===============>
word: hello
word: me
===============>
you
me
hello
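For tuple datasets, distinct() can also deduplicate on selected fields; a minimal sketch (the data is made up):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DistinctFieldSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // distinct(0) keeps one element per distinct value of field 0;
        // which of the duplicates survives is not specified.
        env.fromElements(new Tuple2<>(1, "a"), new Tuple2<>(1, "b"), new Tuple2<>(2, "c"))
                .distinct(0)
                .print();
    }
}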

2.3 Join

package com.caozg.batch;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

import java.util.ArrayList;
import java.util.List;

/**
 * Join: analogous to a join query in SQL.
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.batch
 * @ClassName: MapJoinHandler
 * @Author: GoSaint
 * @CreateDate: 19-11-1 7:32 PM
 * @UpdateDate: 19-11-1 7:32 PM
 * @Version: 1.0
 */
public class MapJoinHandler {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment e = ExecutionEnvironment.getExecutionEnvironment();
        List<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<>(1, "小明"));
        data1.add(new Tuple2<>(2, "丽丽"));
        data1.add(new Tuple2<>(3, "陆瓷"));
        List<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<>(1, "江西"));
        data2.add(new Tuple2<>(2, "纽约"));
        data2.add(new Tuple2<>(3, "成都"));
        DataSource<Tuple2<Integer, String>> t1 = e.fromCollection(data1);
        DataSource<Tuple2<Integer, String>> t2 = e.fromCollection(data2);
        // where() selects the join key field of the first dataset;
        // equalTo() selects the matching key field of the second dataset.
        t1.join(t2).where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first,
                                                                Tuple2<Integer, String> second) throws Exception {
                        return new Tuple3<>(first.f0, first.f1, second.f1);
                    }
                }).print();
    }
}

The output is:

(3,陆瓷,成都)
(1,小明,江西)
(2,丽丽,纽约)
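When the two sides differ greatly in size, the DataSet API also accepts size hints; a minimal sketch (the datasets here are made up): joinWithTiny hints that the right side is small enough to broadcast, and joinWithHuge is the symmetric hint for a large right side.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class JoinHintSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Integer, String>> big = env.fromElements(
                new Tuple2<>(1, "小明"), new Tuple2<>(2, "丽丽"));
        DataSet<Tuple2<Integer, String>> small = env.fromElements(new Tuple2<>(1, "江西"));
        // Without a with() function, the result elements are Tuple2 pairs
        // of the matching left and right records.
        big.joinWithTiny(small).where(0).equalTo(0).print();
    }
}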

2.4 OuterJoin

This works much like the join queries in SQL, covering left outer, right outer, and full outer joins. The demo is as follows:

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

import java.util.ArrayList;
import java.util.List;

/**
 * OuterJoin: left, right, and full outer joins.
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.batch
 * @ClassName: OuterHandler
 * @Author: GoSaint
 * @CreateDate: 19-11-1 9:07 PM
 * @UpdateDate: 19-11-1 9:07 PM
 * @Version: 1.0
 */
public class OuterHandler {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment e = ExecutionEnvironment.getExecutionEnvironment();
        List<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<>(1, "小明"));
        data1.add(new Tuple2<>(2, "丽丽"));
        data1.add(new Tuple2<>(3, "陆瓷"));
        List<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<>(1, "江西"));
        data2.add(new Tuple2<>(2, "纽约"));
        data2.add(new Tuple2<>(4, "兰州"));
        DataSource<Tuple2<Integer, String>> t1 = e.fromCollection(data1);
        DataSource<Tuple2<Integer, String>> t2 = e.fromCollection(data2);

        // Left outer join: second is null for left records without a match.
        t1.leftOuterJoin(t2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                if (second == null) {
                    return new Tuple3<>(first.f0, first.f1, "null");
                }
                return new Tuple3<>(first.f0, first.f1, second.f1);
            }
        }).print();
        System.out.println("======left join is end=========");

        // Right outer join: first is null for right records without a match.
        t1.rightOuterJoin(t2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                if (first == null) {
                    return new Tuple3<>(second.f0, "null", second.f1);
                }
                return new Tuple3<>(first.f0, first.f1, second.f1);
            }
        }).print();
        System.out.println("======right join is end=========");

        // Full outer join: either side may be null.
        t1.fullOuterJoin(t2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                if (first == null) {
                    return new Tuple3<>(second.f0, "null", second.f1);
                } else if (second == null) {
                    return new Tuple3<>(first.f0, first.f1, "null");
                }
                return new Tuple3<>(first.f0, first.f1, second.f1);
            }
        }).print();
        System.out.println("======full join is end=========");
    }
}

The output is:

(3,陆瓷,null)
(1,小明,江西)
(2,丽丽,纽约)
======left join is end=========
(1,小明,江西)
(2,丽丽,纽约)
(4,null,兰州)
======right join is end=========
(3,陆瓷,null)
(1,小明,江西)
(2,丽丽,纽约)
(4,null,兰州)
======full join is end=========

2.5 Cross (Cartesian Product)

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

import java.util.Arrays;
import java.util.List;

/**
 * Cross: the Cartesian product of two datasets.
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.batch
 * @ClassName: CrossHandler
 * @Author: GoSaint
 * @CreateDate: 19-11-1 9:07 PM
 * @UpdateDate: 19-11-1 9:07 PM
 * @Version: 1.0
 */
public class CrossHandler {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment e = ExecutionEnvironment.getExecutionEnvironment();
        List<Integer> s1 = Arrays.asList(10, 20);
        List<String> s2 = Arrays.asList("hello", "flink");
        DataSource<Integer> data1 = e.fromCollection(s1);
        DataSource<String> data2 = e.fromCollection(s2);
        data1.cross(data2).print();
    }
}

The output is:

(10,hello)
(10,flink)
(20,hello)
(20,flink)
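cross() can also be combined with a CrossFunction via with() to shape each output pair directly instead of producing Tuple2 records; a minimal sketch:

import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class CrossWithSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(10, 20)
                .cross(env.fromElements("hello", "flink"))
                // Combine each pair into a single string instead of a Tuple2.
                .with(new CrossFunction<Integer, String, String>() {
                    @Override
                    public String cross(Integer left, String right) {
                        return left + "-" + right;
                    }
                })
                .print();
    }
}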

2.6 Union

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

import java.util.Arrays;
import java.util.List;

public class UnionHandler {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment e = ExecutionEnvironment.getExecutionEnvironment();
        List<Integer> s1 = Arrays.asList(10, 20);
        List<Integer> s2 = Arrays.asList(30, 40);
        DataSource<Integer> data1 = e.fromCollection(s1);
        DataSource<Integer> data2 = e.fromCollection(s2);
        data1.union(data2).print();
    }
}

This merges the two data sources into one; print() outputs all four elements (10, 20, 30, and 40). As noted above, both datasets must have the same element type.

2.7 First-N and Sort Partition

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * First-N and Sort Partition.
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.batch
 * @ClassName: FirstNHandler
 * @Author: GoSaint
 * @CreateDate: 19-11-1 9:07 PM
 * @UpdateDate: 19-11-1 9:07 PM
 * @Version: 1.0
 */
public class FirstNHandler {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment e = ExecutionEnvironment.getExecutionEnvironment();
        List<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "xa"));
        data.add(new Tuple2<>(3, "aa"));
        data.add(new Tuple2<>(4, "pa"));
        data.add(new Tuple2<>(2, "oa"));
        data.add(new Tuple2<>(2, "na"));
        data.add(new Tuple2<>(2, "ma"));
        data.add(new Tuple2<>(7, "ba"));
        DataSource<Tuple2<Integer, String>> source = e.fromCollection(data);

        // Take the first 3 elements.
        source.first(3).print();
        /* Output:
        (1,xa)
        (3,aa)
        (4,pa)
        */
        System.out.println("=====================");

        // Group first, then take the first 2 elements of each group.
        source.groupBy(0).first(2).print();
        /* Output:
        (3,aa)
        (7,ba)
        (1,xa)
        (2,oa)
        (2,na)
        (4,pa)
        */
        System.out.println("=====================");

        // Group by the first field, sort each group by the second field,
        // and take the first 2 elements per group.
        source.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
        /* Output:
        (3,aa)
        (7,ba)
        (1,xa)
        (2,ma)
        (2,na)
        (4,pa)
        */
        System.out.println("=====================");

        // sortPartition sorts each partition locally (only a true global sort
        // with parallelism 1); chained calls sort on multiple fields.
        source.sortPartition(0, Order.ASCENDING).sortPartition(1, Order.DESCENDING).first(3).print();
        /* Output:
        (1,xa)
        (2,oa)
        (2,na)
        */
    }
}

3 DataSet API: Partitioning

Rebalance: rebalances (repartitions) the dataset to eliminate data skew
Hash-Partition: partitions the dataset by the hash of the given key, partitionByHash()
Range-Partition: partitions the dataset into ranges of the given key, partitionByRange()
Custom Partitioning: custom partitioning rules; implement the Partitioner interface and call partitionCustom(partitioner, "someKey") or partitionCustom(partitioner, 0) (a sketch follows after the demo below)

The following demonstrates the hash and range partitioning strategies:

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Hash and range partitioning.
 *
 * @ProjectName: FlinkLearning
 * @Package: com.caozg.batch
 * @ClassName: PartionHandler
 * @Author: GoSaint
 * @CreateDate: 19-11-1 9:07 PM
 * @UpdateDate: 19-11-1 9:07 PM
 * @Version: 1.0
 */
public class PartionHandler {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment e = ExecutionEnvironment.getExecutionEnvironment();
        List<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "xa"));
        data.add(new Tuple2<>(2, "aa"));
        data.add(new Tuple2<>(3, "pa"));
        data.add(new Tuple2<>(3, "oa"));
        data.add(new Tuple2<>(3, "na"));
        data.add(new Tuple2<>(3, "ma"));
        data.add(new Tuple2<>(4, "ba"));
        data.add(new Tuple2<>(4, "ba"));
        data.add(new Tuple2<>(4, "ba"));
        data.add(new Tuple2<>(4, "ba"));
        data.add(new Tuple2<>(5, "ba"));
        data.add(new Tuple2<>(5, "ba"));
        data.add(new Tuple2<>(5, "ba"));
        data.add(new Tuple2<>(5, "ba"));
        data.add(new Tuple2<>(5, "ba"));
        data.add(new Tuple2<>(6, "ba"));
        data.add(new Tuple2<>(6, "ba"));
        data.add(new Tuple2<>(6, "ba"));
        data.add(new Tuple2<>(6, "ba"));
        data.add(new Tuple2<>(6, "ba"));
        data.add(new Tuple2<>(6, "ba"));
        DataSource<Tuple2<Integer, String>> source = e.fromCollection(data);

        // Hash partitioning: records with the same key hash land in the same partition.
        /*
        source.partitionByHash(0).mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
            @Override
            public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                Iterator<Tuple2<Integer, String>> iterator = values.iterator();
                while (iterator.hasNext()) {
                    Tuple2<Integer, String> next = iterator.next();
                    System.out.println("current thread id: " + Thread.currentThread().getId() + " value is " + next);
                }
            }
        }).print();

        Output:
        current thread id: 61 value is (3,pa)
        current thread id: 61 value is (3,oa)
        current thread id: 61 value is (3,na)
        current thread id: 61 value is (3,ma)
        current thread id: 62 value is (1,xa)
        current thread id: 62 value is (5,ba)
        current thread id: 62 value is (5,ba)
        current thread id: 62 value is (5,ba)
        current thread id: 62 value is (5,ba)
        current thread id: 62 value is (5,ba)
        current thread id: 62 value is (6,ba)
        current thread id: 62 value is (6,ba)
        current thread id: 62 value is (6,ba)
        current thread id: 62 value is (6,ba)
        current thread id: 62 value is (6,ba)
        current thread id: 62 value is (6,ba)
        current thread id: 63 value is (2,aa)
        current thread id: 63 value is (4,ba)
        current thread id: 63 value is (4,ba)
        current thread id: 63 value is (4,ba)
        current thread id: 63 value is (4,ba)
        */

        // Range partitioning: keys are assigned to partitions by contiguous ranges.
        source.partitionByRange(0).mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
            @Override
            public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                Iterator<Tuple2<Integer, String>> iterator = values.iterator();
                while (iterator.hasNext()) {
                    Tuple2<Integer, String> next = iterator.next();
                    System.out.println("current thread id: " + Thread.currentThread().getId() + " value is " + next);
                }
            }
        }).print();
        /* Output:
        current thread id: 64 value is (1,xa)
        current thread id: 64 value is (2,aa)
        current thread id: 64 value is (3,pa)
        current thread id: 64 value is (3,oa)
        current thread id: 64 value is (3,na)
        current thread id: 64 value is (3,ma)
        current thread id: 65 value is (4,ba)
        current thread id: 65 value is (4,ba)
        current thread id: 65 value is (4,ba)
        current thread id: 65 value is (4,ba)
        current thread id: 65 value is (5,ba)
        current thread id: 65 value is (5,ba)
        current thread id: 65 value is (5,ba)
        current thread id: 65 value is (5,ba)
        current thread id: 65 value is (5,ba)
        current thread id: 66 value is (6,ba)
        current thread id: 66 value is (6,ba)
        current thread id: 66 value is (6,ba)
        current thread id: 66 value is (6,ba)
        current thread id: 66 value is (6,ba)
        current thread id: 66 value is (6,ba)
        */
    }
}
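Custom partitioning, listed above but not demonstrated, works by implementing the Partitioner interface; below is a minimal sketch (the data is made up), with rebalance() shown for comparison:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CustomPartitionSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Integer, String>> source = env.fromElements(
                new Tuple2<>(1, "xa"), new Tuple2<>(2, "aa"), new Tuple2<>(3, "pa"));
        // Custom rule: route each record by its key modulo the number of partitions.
        source.partitionCustom(new Partitioner<Integer>() {
            @Override
            public int partition(Integer key, int numPartitions) {
                return key % numPartitions;
            }
        }, 0).print();
        // Rebalance: round-robin redistribution to even out skewed partitions.
        source.rebalance().print();
    }
}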
