[flink]#14

mac2024-05-15  30

需要的依赖

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table_2.11</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.6.1</version> </dependency>

Table API 和 SQL

创建TableEnvironment //流数据查询 StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv); //批数据查询 ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv); ------------------------------------------------------------------------------ //流数据查询 val sEnv = StreamExecutionEnvironment.getExecutionEnvironment val sTableEnv = TableEnvironment.getTableEnvironment(sEnv) //批数据查询 val bEnv = ExecutionEnvironment.getExecutionEnvironment val bTableEnv = TableEnvironment.getTableEnvironment(bEnv) 注册Table StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); //创建一个TableSource TableSource csvSource = new CsvTableSource("/path/to/file", ...); //注册一个TableSource,称为CsvTable tableEnv.registerTableSource("CsvTable", csvSource); ----------------------------------------------------------------- val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) //创建一个TableSource val csvSource: TableSource = new CsvTableSource("/path/to/file", ...) 
//注册一个TableSource,称为CsvTable tableEnv.registerTableSource("CsvTable", csvSource) TableSink把数据写到外部存储介质 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); //创建一个TableSink TableSink csvSink = new CsvTableSink("/path/to/file", ...); //定义字段名称和类型 String[] fieldNames = {"a", "b", "c"}; TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG}; //注册一个TableSink,称为CsvSinkTable tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink); ------------------------------------------------------------------ val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) //创建一个TableSink val csvSink: TableSink = new CsvTableSink("/path/to/file", ...) //定义字段名称和类型 val fieldNames: Array[String] = Array("a", "b", "c") val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG) //注册一个TableSink,称为CsvSinkTable tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink) 使用Table API StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); //注册一个Orders表 ... //通过scan操作获取到一个Table对象 Table orders = tableEnv.scan("Orders"); //计算所有来自法国的收入 Table revenue = orders .filter("cCountry === 'FRANCE'") .groupBy("cID, cName") .select("cID, cName, revenue.sum AS revSum"); ----------------------------------------------------------- val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) //注册一个Orders表 ... 
//通过scan操作获取到一个Table对象 val orders = tableEnv.scan("Orders") //计算所有来自法国的收入 val revenue = orders .filter('cCountry === "FRANCE") .groupBy('cID, 'cName) .select('cID, 'cName, 'revenue.sum AS 'revSum) 使用SQL StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); //注册一个Orders表 ... //计算所有来自法国的收入 Table revenue = tableEnv.sqlQuery( "SELECT cID, cName, SUM(revenue) AS revSum " + "FROM Orders " + "WHERE cCountry = 'FRANCE' " + "GROUP BY cID, cName" ); ------------------------------------------------------------- val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) //注册一个Orders表 ... //计算所有来自法国的收入 val revenue = tableEnv.sqlQuery(""" |SELECT cID, cName, SUM(revenue) AS revSum |FROM Orders |WHERE cCountry = 'FRANCE' |GROUP BY cID, cName """.stripMargin)

DataStream、DataSet 和 Table 的转换

DataStream/DataSet 注册为Table对象 注册实现 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); //获取DataStream DataStream<Tuple2<Long, String>> stream = ... //把DataStream注册为Table,称为myTable,表中的字段为f0,f1 tableEnv.registerDataStream("myTable", stream); //在注册Table的时候也可以手工指定字段的名称 tableEnv.registerDataStream("myTable2", stream, "myLong, myString"); ----------------------------------------------------------------- val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) //获取DataStream val stream: DataStream[(Long, String)] = ... //把DataStream注册为Table,称为myTable,表中的字段为_1,_2 tableEnv.registerDataStream("myTable", stream) //在注册Table的时候也可以手工指定字段的名称 import org.apache.flink.table.api.scala._ tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString) 直接转化 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); //获取DataStream DataStream<Tuple2<Long, String>> stream = ... //把DataStream转化为Table,使用默认的字段名称f0,f1 Table table1 = tableEnv.fromDataStream(stream); //把DataStream转化为Table,使用指定的字段名称"myLong", "myString" Table table2 = tableEnv.fromDataStream(stream, "myLong, myString"); ----------------------------------------------------------------- val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) //获取DataStream val stream: DataStream[(Long, String)] = ... 
//把DataStream转化为Table,使用默认的字段名称'_1, '_2 val table1: Table = tableEnv.fromDataStream(stream) //把DataStream转化为Table,使用指定的字段名称'myLong, 'myString import org.apache.flink.table.api.scala._ val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString) Table转化为DataStream/DataSet Table转为DataStream StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); //Table中有两个字段(String name, Integer age) Table table = ... //把Table中的数据转成DataStream<Row> DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class); //或者把Table中的数据转成DataStream<Tuple2> TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( Types.STRING(), Types.INT()); DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType); //将Table转化成Retract形式的DataStream<Row> //类型为X的Retract Stream对应DataStream<Tuple2<Boolean, X>> //Boolean字段指定了更改的类型 //true表示INSERT,false表示DELETE DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class); ------------------------------------------------------------- val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) //Table中有两个字段(String name, Integer age) val table: Table = ... //把Table中的数据转成DataStream<Row> val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table) //或者把Table中的数据转成DataStream<Tuple2> val dsTuple: DataStream[(String, Int)] = tableEnv.toAppendStream[(String, Int)](table) val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table) Table转为DataSet ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); //Table中有两个字段(String name, Integer age) Table table = ... 
//把Table中的数据转成DataSet<Row> DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class); //或者把Table中的数据转成DataSet<Tuple2> TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( Types.STRING(), Types.INT()); DataSet<Tuple2<String, Integer>> dsTuple = tableEnv.toDataSet(table, tupleType); ---------------------------------------------------------------- val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) //Table中有两个字段(String name, Integer age) val table: Table = ... //把Table中的数据转成DataSet<Row> val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table) //或者把Table中的数据转成DataSet<Tuple2> val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
最新回复(0)