SQL客户端旨在提供一种简单的方式,无需一行Java或Scala代码,即可将表程序编写、调试和提交到Flink集群。SQL客户机CLI允许检索和可视化命令行上运行的分布式应用程序的实时结果。 SQL客户机绑定在常规Flink发行版中,因此可以开箱即用。它只需要一个正在运行的Flink集群,其中可以执行表程序。
可以自己安装flink环境,也可以通过docker一键安装 : github 下载sql-raining https://github.com/ververica/sql-training 使用docker执行: docker-compose up -d 我这里是在Linux虚拟机上安装的Docker,建议Docker内存4G左右。 使用 Docker Compose 来安装,包含了所需的各种服务的容器,包括:
Flink SQL Client:用来提交query,以及可视化结果Flink JobManager 和 TaskManager:用来运行 Flink SQL 任务。Apache Kafka:用来生成输入流和写入结果流。Apache Zookeeper:Kafka 的依赖项ElasticSearch:用来写入结果 docker-compose 命令会启动所有所需的容器。第一次运行的时候,Docker 会自动地从 Docker Hub 下载镜像,这可能会需要一段时间(将近 2.3GB)。 运行成功的话,可以在 http://localhost:8081 访问到 Flink Web UIdocker环境中运行方式:
docker-compose exec sql-client ./sql-client.sh该命令会在容器中启动 Flink SQL CLI 客户端.
自己安装的环境(我这里使用自己安装的flink环境):
./bin/start-cluster.sh ./bin/sql-client.sh embedded默认情况下,SQL客户端将读取环境文件./conf/sql-client-defaults.yaml中的配置
使用HELP命令列出所有可用的SQL语句
该查询不需要表源,只生成一行结果。CLI将从集群中检索结果并将其可视化。您可以通过按Q键关闭结果视图。 CLI支持两种模式来维护和可视化结果。 表模式在内存中实现结果,并以规则的分页表表示将结果可视化。它可以通过在CLI中执行以下命令来启用: SET execution.result-mode=table;
变更日志模式不会物化结果,而是可视化由插入(+)和撤销(-)组成的连续查询生成的结果流。 SET execution.result-mode=changelog;
使用如下代码验证,该查询执行一个有界word count示例:
SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;表模式下结果:
变更日志模式结果: 在这两种模式中,结果都存储在SQL客户机的Java堆内存中。为了保持CLI接口的响应,变更日志模式只显示最新的1000个更改。table模式允许在较大的结果中导航,这些结果只受可用主内存和配置的最大行数(max-table-result-rows)的限制。
在批处理环境中执行的查询只能使用表结果模式检索。
可以使用以下可选CLI命令启动SQL客户机。 通过help查看命令选项:
./bin/sql-client.sh embedded --help我们可以看到,可以通过-e指定配置环境等。
SQL查询需要一个执行它的配置环境。所谓的环境文件定义了可用的表源和接收、外部目录、用户定义的函数以及执行和部署所需的其他属性。 每个环境文件都是一个常规YAML文件。之前说了默认会选择读取环境文件./conf/sql-client-defaults.yaml中的配置。我们可以自己设置yaml文件。 文件的示例:
# Define tables here such as sources, sinks, views, or temporal tables. tables: - name: MyTableSource type: source-table update-mode: append connector: type: filesystem path: "/path/to/something.csv" format: type: csv fields: - name: MyField1 type: INT - name: MyField2 type: VARCHAR line-delimiter: "\n" comment-prefix: "#" schema: - name: MyField1 type: INT - name: MyField2 type: VARCHAR - name: MyCustomView type: view query: "SELECT MyField2 FROM MyTableSource" # Define user-defined functions here. functions: - name: myUDF from: class class: foo.bar.AggregateUDF constructor: - 7.6 - false # Execution properties allow for changing the behavior of a table program. execution: type: streaming # required: execution mode either 'batch' or 'streaming' result-mode: table # required: either 'table' or 'changelog' max-table-result-rows: 1000000 # optional: maximum number of maintained rows in # 'table' mode (1000000 by default, smaller 1 means unlimited) time-characteristic: event-time # optional: 'processing-time' or 'event-time' (default) parallelism: 1 # optional: Flink's parallelism (1 by default) periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default) max-parallelism: 16 # optional: Flink's maximum parallelism (128 by default) min-idle-state-retention: 0 # optional: table program's minimum idle state time max-idle-state-retention: 0 # optional: table program's maximum idle state time restart-strategy: # optional: restart strategy type: fallback # "fallback" to global restart strategy by default # Deployment properties allow for describing the cluster to which table programs are submitted to. deployment: response-timeout: 5000配置: 使用从CSV文件读取的表源mytables源定义一个环境 定义使用SQL查询声明虚拟表的视图MyCustomView 定义一个用户定义的函数myUDF,该函数可以使用类名和两个构造函数参数实例化 指定在此流环境中执行的查询的并行度为1 指定事件时间特征 在表结果模式运行查询。
根据用例的不同,配置可以分为多个文件。因此,环境文件可以用于一般目的(使用缺省环境文件—缺省环境),也可以基于每个会话创建(使用会话环境文件—环境)。每个CLI会话都使用默认属性和会话属性初始化。例如,默认环境文件可以指定在每个会话中查询的所有表源,而会话环境文件仅声明特定的状态保持时间和并行性。启动CLI应用程序时,可以传递缺省环境文件和会话环境文件。 如果没有指定默认环境文件,SQL客户机将在Flink的配置目录中搜索./conf/sql-client-defaults.yaml文件。 在CLI会话中设置的属性(例如使用set命令)具有最高的优先级: CLI commands > session environment file > defaults environment file
重启策略控制在失败的情况下Flink作业如何重新启动。与Flink集群的全局重启策略类似,可以在环境文件中声明更细粒度的重启配置。 支持以下策略:
execution: # falls back to the global strategy defined in flink-conf.yaml restart-strategy: type: fallback # job fails directly and no restart is attempted restart-strategy: type: none # attempts a given number of times to restart the job restart-strategy: type: fixed-delay attempts: 3 # retries before job is declared as failed (default: Integer.MAX_VALUE) delay: 10000 # delay in ms between retries (default: 10 s) # attempts as long as the maximum number of failures per time interval is not exceeded restart-strategy: type: failure-rate max-failures-per-interval: 1 # retries in interval until failing (default: 1) failure-rate-interval: 60000 # measuring interval in ms for failure rate delay: 10000 # delay in ms between retries (default: 10 s)SQL客户机不需要使用Maven或SBT设置Java项目。相反,可以将依赖项作为常规JAR文件传递到集群。 可以单独指定每个JAR文件(使用–jar),也可以定义整个库目录(使用–library)。对于外部系统(如Apache Kafka)和相应数据格式(如JSON)的连接器,Flink提供了现成的JAR包。这些JAR文件以sql-jar作为后缀,可以从Maven中央存储库下载每个版本的JAR文件。
下面的示例显示了一个环境文件,该文件定义了一个从Apache Kafka读取JSON数据的表源。
tables: - name: TaxiRides type: source-table update-mode: append connector: property-version: 1 type: kafka version: "0.11" topic: TaxiRides startup-mode: earliest-offset properties: - key: zookeeper.connect value: localhost:2181 - key: bootstrap.servers value: localhost:9092 - key: group.id value: testGroup format: property-version: 1 type: json schema: "ROW<rideId LONG, lon FLOAT, lat FLOAT, rideTime TIMESTAMP>" schema: - name: rideId type: LONG - name: lon type: FLOAT - name: lat type: FLOAT - name: rowTime type: TIMESTAMP rowtime: timestamps: type: "from-field" from: "rideTime" watermarks: type: "periodic-bounded" delay: "60000" - name: procTime type: TIMESTAMP proctime: trueSQL客户机允许用户创建用于SQL查询的自定义用户定义函数。可以使用Java/Scala以编程模式定义。 为了提供用户定义的函数,首先需要实现和编译扩展ScalarFunction、AggregateFunction或TableFunction的函数类(参见用户定义的函数)。然后可以将一个或多个函数打包到SQL客户机的依赖项JAR中。 所有函数在调用之前必须在环境文件中声明。对于函数列表中的每个项,必须指定: name:函数注册时使用的名称, from:指定函数的源函数(目前限制为class) class:指示函数的完全限定类名和用于实例化的可选构造函数参数列表。
functions: - name: ... # required: name of the function from: class # required: source of the function (can only be "class" for now) class: ... # required: fully qualified class name of the function constructor: # optimal: constructor parameters of the function class - ... # optimal: a literal parameter with implicit type - class: ... # optimal: full class name of the parameter constructor: # optimal: constructor parameters of the parameter's class - type: ... # optimal: type of the literal parameter value: ... # optimal: value of the literal parameter确保指定参数的顺序和类型严格匹配函数类的一个构造函数。
根据用户定义的函数,可能需要在SQL语句中使用实现之前对其进行参数化。 如上例所示,在声明用户定义的函数时,可以通过以下三种方法之一使用构造函数参数来配置类: 隐式类型的字面值:SQL客户机将根据文字值本身自动派生类型。目前,这里只支持布尔值、INT值、DOUBLE**值和VARCHAR值。**如果自动派生没有按照预期工作(例如,您需要一个VARCHAR false),则使用显式类型。
true # -> BOOLEAN (case sensitive)42 # -> INT1234.222 # -> DOUBLEfoo # -> VARCHAR **具有显式类型的字面值:**为了类型安全,显式声明具有类型和值属性的参数。type: DECIMAL value: 11111111111111111 下表说明了受支持的Java参数类型和相应的SQL类型字符串。 Java类型 SQL类型 java.math.BigDecimal DECIMAL java.lang.Boolean BOOLEAN java.lang.Byte TINYINT java.lang.Double DOUBLE java.lang.Float REAL,FLOAT java.lang.Integer INTEGER,INT java.lang.Long BIGINT java.lang.Short SMALLINT java.lang.String VARCHAR 还不支持其他更多类型(例如,TIMESTAMP或ARRAY)、基本类型和null。 (嵌套)类实例:除字面值外,您还可以通过指定class和constructor属性为构造函数参数创建(嵌套)类实例。可以递归地执行此过程,直到所有构造函数参数都使用字面值表示。 - class: foo.bar.paramClass constructor: - StarryName - class: java.lang.Integer constructor: - class: java.lang.String constructor: - type: VARCHAR value: 3为了定义端到端SQL管道,可以使用SQL的INSERT INTO语句向Flink集群提交长时间运行的分离查询。这些查询将结果生成到外部系统,而不是SQL客户机。这允许处理更高的并行性和更大数量的数据。CLI本身对提交后分离的查询没有任何控制。
INSERT INTO MyTableSink SELECT * FROM MyTableSource必须在环境文件中声明表sink MyTableSink。下面是Apache Kafka表接收器的一个例子。
tables: - name: MyTableSink type: sink-table update-mode: append connector: property-version: 1 type: kafka version: "0.11" topic: OutputTopic properties: - key: zookeeper.connect value: localhost:2181 - key: bootstrap.servers value: localhost:9092 - key: group.id value: testGroup format: property-version: 1 type: json derive-schema: true schema: - name: rideId type: LONG - name: lon type: FLOAT - name: lat type: FLOAT - name: rideTime type: TIMESTAMPSQL Client在提交后不会跟踪正在运行的Flink作业的状态。提交后可以关闭CLI进程,而不会影响分离的查询。 Flink的重启策略负责容错。可以使用Flink的Web界面,命令行或REST API取消查询。
视图允许从SQL查询中定义虚拟表。视图定义立即被解析和验证。但是,实际执行是在提交一般INSERT INTO或SELECT语句时访问视图。 视图可以在环境文件中定义,也可以在CLI会话中定义。 下面的例子展示了如何在一个文件中定义多个视图。这些视图是按照在环境文件中定义它们的顺序注册的。支持像视图A依赖于视图B依赖于视图C这样的引用链。
tables: - name: MyTableSource # ... - name: MyRestrictedView type: view query: "SELECT MyField2 FROM MyTableSource" - name: MyComplexView type: view query: > SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR) FROM MyTableSource WHERE MyField2 > 200还可以使用CREATE VIEW语句在CLI会话中创建视图:
CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource;在CLI会话中创建的视图也可以使用DROP VIEW语句再次删除:
DROP VIEW MyNewView;与表source和sink类似,在会话环境文件中定义的视图具有最高的优先级。
