7.Flink总结之一文彻底搞懂FlinkSQL
Flink在开发过程中主要还是用sql,使用sql能处理百分之八十左右的开发,其余的场景才需要使用处理函数进行处理,这次就对Flinksql模块进行总结汇总。
Flink除了sql模块还有Table Api模块,Table Api模块用的比较少,不做特殊讲解。
在生产环境中如果使用sql模块尽量使用1.13版本及以上,因为1.9版本合并了阿里巴巴的Blink才有了大的改变,1.12版本做了功能上的完善,但是不稳定,1.13是个版本,说明文档:Release Notes - Flink 1.13
官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/overview/
我们这次就是总结的是最高层语言,sql模块,如下图所示:
1. 配置依赖(IDE中运行程序需要)
Flink依赖明细说明:
table api相关依赖:
IDE中运行程序需要添加依赖如下:
部分 table 相关的代码是用 Scala 实现的。所以,下面的依赖也需要添加到你的程序里,不管是批式还是流式的程序:
扩展依赖,实现自定义格式来解析 Kafka 数据,或者自定义函数
拓展模块包括的内容:
- SerializationSchemaFactory
- DeserializationSchemaFactory
- ScalarFunction
- TableFunction
- AggregateFunction
2. 开发基本架构
代码测试:
public class FlinkSqlTest {
public static void main(String[] args) throws Exception {
// 创建流式环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建流式表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 获取source
SingleOutputStreamOperator map = env.socketTextStream("hadoop110", 9999).map(t -> {
return new UserVisit(t,System.currentTimeMillis());
});
// sourceStream转换成表
Table table = tableEnv.fromDataStream(map);
// 创建临时表
tableEnv.createTemporaryView("user_visit",table);
// 创建执行sql并执行打印
Table queryResultTanle = tableEnv.sqlQuery("select `userName`,`time` from user_visit");
// 表转换成流并打印
tableEnv.toDataStream(queryResultTanle).print("Stream Print");
env.execute();
}
}
UserVisit代码
public class UserVisit {
public String userName;
public Long time;
public UserVisit() {
}
public UserVisit(String userName, Long time) {
this.userName = userName;
this.time = time;
}
}
测试代码:
3. 流和表的转换
前面两个模块只是案例,重点是仅插入流模块和日志更新流模块。
1. 表转换成流
在上面的例子中,使用了 Table table = tableEnv.fromDataStream(map); 来将Stream转换成Table,而后使用 tableEnv.toDataStream(queryResultTanle).print("Stream Print"); 将表转换成流并打印
// sourceStream转换成表
Table table = tableEnv.fromDataStream(map);
// 表转换成流
DataStream
使用Flink sql统计数据并输出的时候发现,当aa重复第二次的时候控制台的op列输出了-U +U的显示,表面上是将之前的数据删除又增加一条新的数据(回撤流下面会针对说明)。而同样的逻辑,我们将聚合后的table转换成Stream的时候就会报错,如下面两张图所示。
Table执行并打印:
table转换成Stream并打印:
对于这个问题官方给的说明如下:
意思就是我们是在流式环境下工作的,产生的表是一个流式管道,是一个动态表,不仅有新增的数据还会有更新的数据或者删除的数据,那么在表转换成流的情况下就报表不支持消费更新变化,在这种情况下,使用更新日志(change log)。这样一来,对于表的所有更新操作,就变成了一条更新日志的流,我们就可以转换成流打印输出了。
代码中将toDataStream方法换成toChangelogStream即可
2.流转换成表
上面的代码中使用了 tableEnv.fromDataStream(map) 将流转换成表,由于数据类型定义的是Java Bean,所以自动将bean的属性转换成表的列,当然也还可以手动增加参数来指定列,除了Java bean类型,Flink还支持原子类型、Tuple、Row这几种类型,这里使用Tuple进行说明
public class FlinkSqlTest {
public static void main(String[] args) throws Exception {
// 创建流式环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建流式表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 获取source
SingleOutputStreamOperator<Tuple2> map = env.socketTextStream("hadoop102", 9999).map(t -> {
return Tuple2.of(t,System.currentTimeMillis());
})
// 要指定返回值类型。
.returns(TypeInformation.of(new TypeHint<Tuple2>() { }));
// sourceStream转换成表
Table table = tableEnv.fromDataStream(map,$("f0").as("userName"),$("f1").as("time"));
// 创建临时表
tableEnv.createTemporaryView("user_visit",table);
// 创建执行sql并执行打印
tableEnv.sqlQuery("select `userName`,count(1) as cnt from user_visit group by `userName`").execute().print();
env.execute();
}
}
3. 仅插入流(Insert-Only Stream)的说明(重点)
对于流表之间的转换上面两个只是简单的说明,仅插入流和日志更新流都有丰富的方法去调用,在使用Flinksql的时候一样需要去处理水位线,窗口等场景,所以这里会说明更丰富的用法:
- fromDataStream(DataStream): 将一个只允许插入的变化和任意类型的流解释为一个表。默认情况下,事件时间和水印不被传播。
- fromDataStream(DataStream, Schema): 将一个只需插入的变化流和任意类型的数据解释为一个表。可选的模式允许丰富列的数据类型并添加时间属性、水印策略、其他计算列或主键。
- createTemporaryView(String, DataStream): 在一个名字下注册流,以便在SQL中访问它。它是 createTemporaryView(String, fromDataStream(DataStream))的一个快捷方式。
- createTemporaryView(String, DataStream, Schema): 在一个名字下注册流,以便在SQL中访问它。它是createTemporaryView(String, fromDataStream(DataStream, Schema))的一个快捷键。
- toDataStream(DataStream): 将一个表转换为一个只能插入的变化流。默认的流记录类型是org.apache.flink.types.Row。一个单一的行时属性列被写回DataStream API的记录中。水印也会被传播。
- toDataStream(DataStream, AbstractDataType): 将一个表转换为一个仅有插入的变化流。这个方法接受一个数据类型来表达想要的流记录类型。规划器可能会插入隐含的转换和重新排序列,以将列映射到(可能是嵌套的)数据类型的字段。
- toDataStream(DataStream, Class): toDataStream(DataStream, DataTypes.of(Class))的一个快捷方式,可以快速创建所需的数据类型的反映。
4.日志更新流(Insert-Only)的说明(重点)
在内部,Flink的表运行时间是一个变化日志处理器。一个StreamTableEnvironment提供了以下方法来暴露这些变化数据捕获(CDC)的功能:
- fromChangelogStream(DataStream): 将变更日志条目的流解释为一个表。流记录类型必须是org.apache.flink.types.Row,因为其RowKind标志在运行时被评估。默认情况下,事件时间和水印不会被传播。这个方法期望一个包含各种变化(在org.apache.flink.types.RowKind中列举)的changelog作为默认的changelogMode。
- fromChangelogStream(DataStream, Schema): 允许为DataStream定义一个模式,类似于fromDataStream(DataStream, Schema)。否则语义等同于fromChangelogStream(DataStream)。
- fromChangelogStream(DataStream, Schema, ChangelogMode): 提供了关于如何将一个流解释为变更日志的完全控制。传递的ChangelogMode可以帮助规划器区分只插入、上移或下移的行为。
- toChangelogStream(Table): 与fromChangelogStream(DataStream)相反的操作。它产生一个带有org.apache.flink.types.Row实例的流,并在运行时为每个记录设置RowKind标志。所有种类的更新表都被这个方法所支持。如果输入表包含一个单一的rowtime列,它将被传播到流记录的时间戳中。水印也将被传播。
- toChangelogStream(Table, Schema): 与fromChangelogStream(DataStream, Schema)相反的操作。该方法可以丰富产生的列数据类型。如果有必要的话,规划者可能会插入隐式转换。有可能将行时间作为元数据列写出来。
- toChangelogStream(Table, Schema, ChangelogMode): 提供关于如何将表转换为变更日志流的完全控制。传递的ChangelogMode可以帮助计划者区分只插入、上移或下移的行为。
4. 流处理中的表
平常在开发过程中用的是hive多一些,不论是Hive还是Mysql,处理的都是有界数据,在流式场景中还是有很大区别的,如下图:
1. 动态表 & 连续查询(Continuous Query)
1. 动态表
当流中有新增的数据,初始表中就会插入一条数据,而基于这个表定义的查询sql应该基于之前的基础上更新结果,这种就被称为动态表
2.持续查询
查询的sql一般是查询完就结束了,在流式环境中,表数据是源源不断到来的,因此查询也不能只查询一次就结束,在动态表上的查询我们就称之为持续查询
2.时间属性
之前的文章中详细说明了Flink中的时间语义,在Stream API和ProcessFunction中可以方便的去设置时间和获取时间,在sql中我们也可以定义时间属性。
1. 在定义表DDL的时候指定
处理时间:
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
事件时间:
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
2.在 DataStream 到 Table 转换时定义
处理时间:
DataStream<Tuple2> stream = ...;
// 声明一个额外的字段作为时间属性字段
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());
WindowedTable windowedTable = table.window(
Tumble.over(lit(10).minutes())
.on($("user_action_time"))
.as("userActionWindow"));
事件时间:
// Option 1:
// 基于 stream 中的事件产生时间戳和 watermark
DataStream<Tuple2> stream = inputStream.assignTimestampsAndWatermarks(...);
// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());
// Option 2:
// 从第一个字段获取事件时间,并且产生 watermark
DataStream<Tuple3> stream = inputStream.assignTimestampsAndWatermarks(...);
// 第一个字段已经用作事件时间抽取了,不用再用一个新字段来表示事件时间了
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));
// Usage:
WindowedTable windowedTable = table.window(Tumble
.over(lit(10).minutes())
.on($("user_action_time"))
.as("userActionWindow"));
3.使用 TableSource 定义
处理时间:
// 定义一个由处理时间属性的 table source
public class UserActionSource implements StreamTableSource, DefinedProctimeAttribute {
@Override
public TypeInformation getReturnType() {
String[] names = new String[] {"user_name" , "data"};
TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
return Types.ROW(names, types);
}
@Override
public DataStream getDataStream(StreamExecutionEnvironment execEnv) {
// create stream
DataStream stream = ...;
return stream;
}
@Override
public String getProctimeAttribute() {
// 这个名字的列会被追加到最后,作为第三列
return "user_action_time";
}
}
// register table source
tEnv.registerTableSource("user_actions", new UserActionSource());
WindowedTable windowedTable = tEnv
.from("user_actions")
.window(Tumble
.over(lit(10).minutes())
.on($("user_action_time"))
.as("userActionWindow"));
事件时间:
// Option 1:
// 基于 stream 中的事件产生时间戳和 watermark
DataStream<Tuple2> stream = inputStream.assignTimestampsAndWatermarks(...);
// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());
// Option 2:
// 从第一个字段获取事件时间,并且产生 watermark
DataStream<Tuple3> stream = inputStream.assignTimestampsAndWatermarks(...);
// 第一个字段已经用作事件时间抽取了,不用再用一个新字段来表示事件时间了
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));
// Usage:
WindowedTable windowedTable = table.window(Tumble
.over(lit(10).minutes())
.on($("user_action_time"))
.as("userActionWindow"));
3. 时区
在 Flink1.13 之前, PROCTIME() 函数返回的类型是 TIMESTAMP , 返回值是UTC时区下的 TIMESTAMP 。 例如: 当上海的时间为 2021-03-01 12:00:00 时, PROCTIME() 显示的时间却是错误的 2021-03-01 04:00:00 。 这个问题在 Flink 1.13 中修复了, 因此用户不用再去处理时区的问题了。
Flink SQL 使用函数 PROCTIME() 来定义处理时间属性, 该函数返回的类型是 TIMESTAMP_LTZ 。
4.时态表
时时态表包含表的一个或多个有版本的表快照,时态表可以是一张跟踪所有变更记录的表(例如数据库表的 changelog,包含多个表快照),也可以是物化所有变更之后的表(例如数据库表,只有最新表快照)。
版本: 时态表可以划分成一系列带版本的表快照集合,表快照中的版本代表了快照中所有记录的有效区间,有效区间的开始时间和结束时间可以通过用户指定,根据时态表是否可以追踪自身的历史版本与否,时态表可以分为 版本表 和 普通表。
版本表: 如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog 可以定义成版本表。
普通表: 如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 的表可以定义成普通表。
5. 窗口
从 1.13 版本开始,Flink 开始使用窗口表值函数(Windowing table-valued functions,
Windowing TVFs)来定义窗口。窗口表值函数是 Flink 定义的多态表函数(PTF),可以将表进行扩展后返回。
官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-tvf/
支持以下几种窗口:
- 滚动窗口(Tumbling Windows);
- 滑动窗口(Hop Windows,跳跃窗口);
- 累积窗口(Cumulate Windows);
- 会话窗口(Session Windows,目前尚未完全支持)。
1. 滚动窗口(Tumbling Windows)
TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
data:传入表,可以是任何带有时间属性列的关系
timecol:列描述符,表示数据的哪个时间属性列应该被映射到翻转窗口。
size:窗口大小
举例:基于时间字段 ts,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将
表中的每一行数据,按照它们 ts 的值分配到一个指定的窗口中
TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)
2. 滑动窗口(Hop Windows,跳跃窗口)
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
data:传入表,可以是任何带有时间属性列的关系
timecol:列描述符,表示数据的哪个时间属性列应该被映射到翻转窗口。
slide:滑动步长
size:窗口大小
举例:基于时间属性 ts,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟滑动一次。需要注意的是,紧跟在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)
HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));
3. 累积窗口(Cumulate Windows)
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
data:传入表,可以是任何带有时间属性列的关系
timecol:列描述符,表示数据的哪个时间属性列应该被映射到翻转窗口。
step:持续时间,指定在连续累积窗口结束后增加的窗口大小
size:窗口大小
举例:基于时间属性 ts,在表 EventTable 上定义了一个统计周期为 1 天、累积步长为 1小时的累积窗口。注意第三个参数为步长 step,第四个参数则是最大窗口长度。
CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))
4. 会话窗口(Session Windows,目前尚未完全支持)
暂未支持
6.SQL
基本的sql用法大同小异,这里不做汇总,官方文档有说明:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/overview/