Flink Table API Example

mac 2024-06-27

Source&Sink

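We take the Bounded EventTime Tumble Window as an example and write a complete Apache Flink Table API job, including the Source and Sink definitions. Suppose there is a Taobao page-access table (PageAccess_tab) with a region, a user ID, and an access time. We want to count the page views (PV) of the Taobao homepage per region in every 2-minute window. The data is as follows: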

| region | userId | accessTime |
|---|---|---|
| ShangHai | U0010 | 2017-11-11 10:01:00 |
| BeiJing | U1001 | 2017-11-11 10:01:00 |
| BeiJing | U2032 | 2017-11-11 10:10:00 |
| BeiJing | U1100 | 2017-11-11 10:11:00 |
| ShangHai | U0011 | 2017-11-11 12:10:00 |
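Before writing any Flink code, it helps to work out the expected answer by hand. The plain-Scala sketch below (no Flink involved) assigns each access to an epoch-aligned 2-minute tumbling window and counts PV per (region, window). `TumbleCount` is a hypothetical helper, and reading the table's accessTime as Beijing time (UTC+8) is an assumption that the epoch timestamps used later in the source agree with.

```scala
import java.time.{LocalDateTime, ZoneOffset}

object TumbleCount {
  // Assumption: the accessTime values in the table are Beijing time (UTC+8)
  val beijing: ZoneOffset = ZoneOffset.ofHours(8)
  // 2-minute tumbling window, in milliseconds
  val windowSizeMs: Long = 2 * 60 * 1000L

  final case class Access(region: String, userId: String, accessTime: String)

  // The PageAccess_tab rows from the table above (ISO-8601 local times)
  val accesses: Seq[Access] = Seq(
    Access("ShangHai", "U0010", "2017-11-11T10:01:00"),
    Access("BeiJing", "U1001", "2017-11-11T10:01:00"),
    Access("BeiJing", "U2032", "2017-11-11T10:10:00"),
    Access("BeiJing", "U1100", "2017-11-11T10:11:00"),
    Access("ShangHai", "U0011", "2017-11-11T12:10:00")
  )

  // Epoch milliseconds of a Beijing-time timestamp string
  def toMillis(localTime: String): Long =
    LocalDateTime.parse(localTime).toInstant(beijing).toEpochMilli

  // Tumbling windows are aligned to the epoch: round the timestamp down
  // to a multiple of the window size to get the window start
  def windowStart(ts: Long): Long = ts - Math.floorMod(ts, windowSizeMs)

  // PV per (region, window start in epoch ms)
  def pvPerWindow: Map[(String, Long), Int] =
    accesses
      .groupBy(a => (a.region, windowStart(toMillis(a.accessTime))))
      .map { case (key, rows) => key -> rows.size }
}
```

The two BeiJing visits at 10:10 and 10:11 share the window starting at 1510366200000L, so that entry has a PV of 2. Every other (region, window) pair has a PV of 1.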

Source Definition

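A custom Apache Flink stream source has to implement StreamTableSource. A StreamTableSource obtains its DataStream through the addSource method of StreamExecutionEnvironment, so we also need a custom SourceFunction. The SourceFunction emits the watermarks itself. To tell Flink which field is the event-time (rowtime) attribute and how those watermarks are handled, the table source must also implement the DefinedRowtimeAttributes interface.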

SourceFunction Definition

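The SourceFunction receives a collection of records that carry event times. Each element is an Either, where Left holds a (timestamp, record) pair and Right holds a watermark: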

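```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.watermark.Watermark

// Emits a fixed, bounded sequence of elements:
// Left((timestamp, record)) -> record with its event time
// Right(watermark)          -> watermark
class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]])
  extends SourceFunction[T] {

  override def run(ctx: SourceContext[T]): Unit = {
    dataWithTimestampList.foreach {
      case Left((timestamp, record)) => ctx.collectWithTimestamp(record, timestamp)
      case Right(watermark)          => ctx.emitWatermark(new Watermark(watermark))
    }
  }

  // Bounded source: run() returns after emitting all elements,
  // so there is no loop to stop on cancellation.
  override def cancel(): Unit = {}
}
```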
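The Left/Right dispatch in `run` can be tried out without a Flink cluster. In the sketch below, `RecordingContext` is a hypothetical stand-in for Flink's `SourceContext`. It only records what would have been emitted, so the example shows the order of records and watermarks. It is not Flink's own implementation.

```scala
import scala.collection.mutable.ArrayBuffer

object EitherSourceDemo {
  sealed trait Emitted
  final case class Record[T](value: T, timestamp: Long) extends Emitted
  final case class WatermarkAt(timestamp: Long) extends Emitted

  // Hypothetical stand-in for SourceContext: records emissions in order
  class RecordingContext[T] {
    val emitted: ArrayBuffer[Emitted] = ArrayBuffer.empty[Emitted]
    def collectWithTimestamp(value: T, timestamp: Long): Unit = emitted += Record(value, timestamp)
    def emitWatermark(timestamp: Long): Unit = emitted += WatermarkAt(timestamp)
  }

  // Same dispatch as MySourceFunction.run: Left = (timestamp, record), Right = watermark
  def run[T](data: Seq[Either[(Long, T), Long]], ctx: RecordingContext[T]): Unit =
    data.foreach {
      case Left((timestamp, value)) => ctx.collectWithTimestamp(value, timestamp)
      case Right(watermark)         => ctx.emitWatermark(watermark)
    }
}
```

Feeding it `Seq(Left((1000L, "a")), Right(1000L), Left((2000L, "b")))` yields a record, a watermark, and another record, in exactly that order. This is why each watermark in the test data is placed right after the record it should follow.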

Defining the StreamTableSource

Our custom source carries the test data together with the corresponding watermarks:

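```scala
import java.lang.{Long => JLong}
import java.util
import java.util.Collections

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.{DefinedRowtimeAttributes, RowtimeAttributeDescriptor, StreamTableSource}
import org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks
import org.apache.flink.types.Row

class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

  val fieldNames = Array("accessTime", "region", "userId")
  // Logical schema: accessTime is exposed as a timestamp (the rowtime attribute)
  val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
  // Physical rows produced by the source: accessTime is epoch milliseconds (Long)
  val rowType = new RowTypeInfo(
    Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
    fieldNames)

  // Page-access rows with event timestamps, each followed by a watermark
  val data: Seq[Either[(Long, Row), Long]] = Seq(
    Left((1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010"))),
    Right(1510365660000L),
    Left((1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001"))),
    Right(1510365660000L),
    Left((1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032"))),
    Right(1510366200000L),
    Left((1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100"))),
    Right(1510366260000L),
    Left((1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011"))),
    Right(1510373400000L)
  )

  // Use the existing accessTime field as rowtime and keep the watermarks emitted by the source
  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
    Collections.singletonList(new RowtimeAttributeDescriptor(
      "accessTime",
      new ExistingField("accessTime"),
      PreserveWatermarks.INSTANCE))
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    execEnv.addSource(new MySourceFunction[Row](data)).returns(rowType).setParallelism(1)
  }

  override def getReturnType: TypeInformation[Row] = rowType

  override def getTableSchema: TableSchema = schema
}
```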
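The hard-coded timestamps in MyTableSource are epoch milliseconds. One way to check them against the table is to render them in both UTC and UTC+8 with `java.time`. `TimestampCheck` is a hypothetical helper. Reading the table's times as Beijing time (UTC+8) is an assumption, and the rendered values support it.

```scala
import java.time.{Instant, ZoneId, ZoneOffset}
import java.time.format.DateTimeFormatter

object TimestampCheck {
  private val pattern = "yyyy-MM-dd HH:mm:ss"

  // Format epoch milliseconds as wall-clock time in the given zone
  def render(epochMillis: Long, zone: ZoneId): String =
    DateTimeFormatter.ofPattern(pattern).withZone(zone).format(Instant.ofEpochMilli(epochMillis))

  // Event timestamps used in MyTableSource
  val timestamps: Seq[Long] = Seq(1510365660000L, 1510366200000L, 1510366260000L, 1510373400000L)

  def main(args: Array[String]): Unit =
    timestamps.foreach { ms =>
      println(s"$ms  UTC: ${render(ms, ZoneOffset.UTC)}  UTC+8: ${render(ms, ZoneOffset.ofHours(8))}")
    }
}
```

For example, 1510365660000L renders as 2017-11-11 02:01:00 in UTC and as 2017-11-11 10:01:00 in UTC+8, which matches the first row of the table.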

Sink Definition

For simplicity, we write the results to CsvTableSink, which Apache Flink supports out of the box. The sink is defined as follows:

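```scala
import java.io.File

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row

def getCsvTableSink: TableSink[Row] = {
  val tempFile = File.createTempFile("csv_sink_", ".tmp")
  // Print the sink file path so we can inspect the results afterwards
  println("Sink path : " + tempFile)
  // createTempFile only reserves a unique path; remove the empty file so the sink can write there
  if (tempFile.exists()) {
    tempFile.delete()
  }
  new CsvTableSink(tempFile.getAbsolutePath).configure(
    Array[String]("region", "winStart", "winEnd", "pv"),
    Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
}
```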
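Each result row becomes one text line in the sink file. The sketch below reproduces that layout for the four configured columns, and it makes two assumptions: fields are joined with CsvTableSink's default "," delimiter, and window bounds are rendered in UTC with one fractional-second digit, as in the results printed at the end of this article. `CsvLine` is a hypothetical helper and is not part of Flink.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object CsvLine {
  // Assumption: window bounds rendered in UTC with one fractional digit, e.g. "2017-11-11 02:00:00.0"
  private val timestampFormat =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S").withZone(ZoneOffset.UTC)

  private def ts(epochMillis: Long): String = timestampFormat.format(Instant.ofEpochMilli(epochMillis))

  // Join the four sink columns (region, winStart, winEnd, pv) with ','
  def format(region: String, winStartMs: Long, winEndMs: Long, pv: Long): String =
    Seq(region, ts(winStartMs), ts(winEndMs), pv.toString).mkString(",")
}
```

For example, `CsvLine.format("BeiJing", 1510366200000L, 1510366320000L, 2L)` produces the third line of the job's output.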

Building the Main Program

The main program sets up the execution environment, registers the source and the sink, and runs the windowed aggregation query:

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

def main(args: Array[String]): Unit = {
  // Streaming environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  // Use event time
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  // Parallelism 1 makes the output easy to inspect
  env.setParallelism(1)

  val sourceTableName = "mySource"
  // Create the custom source
  val tableSource = new MyTableSource

  val sinkTableName = "csvSink"
  // Create the CSV sink
  val tableSink = getCsvTableSink

  // Register the source
  tEnv.registerTableSource(sourceTableName, tableSource)
  // Register the sink
  tEnv.registerTableSink(sinkTableName, tableSink)

  // Count page views per region in 2-minute tumbling event-time windows
  val result = tEnv.scan(sourceTableName)
    .window(Tumble over 2.minute on 'accessTime as 'w)
    .groupBy('w, 'region)
    .select('region, 'w.start, 'w.end, 'region.count as 'pv)

  result.insertInto(sinkTableName)
  env.execute()
}
```
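Because the query runs on event time, a window's result is only emitted once the watermarks show that the window is complete. The plain-Scala simulation below replays the records and watermarks produced by MyTableSource and reports which watermark closes each window. It rests on two assumptions. First, the firing rule is modelled on Flink's event-time trigger: a window [start, end) fires when a watermark >= end - 1 arrives. Second, as Flink does for a finished source, a final Long.MaxValue watermark is sent at the end of the input. `WindowFiring` is a hypothetical helper, not Flink's window operator.

```scala
import scala.collection.mutable

object WindowFiring {
  val sizeMs: Long = 2 * 60 * 1000L

  sealed trait Event
  final case class Rec(ts: Long, region: String) extends Event
  final case class Wm(ts: Long) extends Event

  // One emitted window result and the watermark that triggered it
  final case class Fired(region: String, start: Long, end: Long, pv: Int, firedAt: Long)

  // Records and watermarks in the order MyTableSource emits them
  val sourceEvents: Seq[Event] = Seq(
    Rec(1510365660000L, "ShangHai"), Wm(1510365660000L),
    Rec(1510365660000L, "BeiJing"), Wm(1510365660000L),
    Rec(1510366200000L, "BeiJing"), Wm(1510366200000L),
    Rec(1510366260000L, "BeiJing"), Wm(1510366260000L),
    Rec(1510373400000L, "ShangHai"), Wm(1510373400000L))

  def simulate(events: Seq[Event]): Seq[Fired] = {
    // Open windows keyed by (region, window start), in insertion order
    val pending = mutable.LinkedHashMap.empty[(String, Long), Int]
    val out = mutable.ArrayBuffer.empty[Fired]

    // Emit and close every window whose last millisecond (end - 1) is covered by the watermark
    def fire(watermark: Long): Unit = {
      val ready = pending.keys.filter { case (_, start) => start + sizeMs - 1 <= watermark }.toList
      ready.foreach { key =>
        out += Fired(key._1, key._2, key._2 + sizeMs, pending(key), watermark)
        pending.remove(key)
      }
    }

    events.foreach {
      case Rec(ts, region) =>
        val start = ts - Math.floorMod(ts, sizeMs)
        pending((region, start)) = pending.getOrElse((region, start), 0) + 1
      case Wm(ts) => fire(ts)
    }
    // End of the bounded input: a final Long.MaxValue watermark closes all remaining windows
    fire(Long.MaxValue)
    out.toList
  }
}
```

In this simulation, the watermark at 1510366200000L (10:10 Beijing time) closes both 10:00 windows. The BeiJing 10:10 window (pv 2) waits for the 12:10 watermark, and the last ShangHai window only closes at end of input.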

Running the Job and Checking the Results

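After the main program runs, the console prints the sink file path. View the results with `cat`:

```
ShangHai,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:10:00.0,2017-11-11 02:12:00.0,2
ShangHai,2017-11-11 04:10:00.0,2017-11-11 04:12:00.0,1
```

The window bounds are printed in UTC. For example, the event timestamp 1510365660000L is 2017-11-11 02:01:00 UTC, which is 10:01:00 Beijing time (UTC+8), the time shown in the input table. The output window 02:00:00 to 02:02:00 is therefore the 10:00 to 10:02 window in Beijing time. BeiJing's visits at 10:10 and 10:11 fall into the same window (10:10 to 10:12), which is why that row has pv = 2.

This end-to-end example also carries over to the other operator examples. Only the Source and Sink need to be rebuilt to match each example's schema.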
