当前位置:首页 > 技术文章 > 正文内容

7.Flink总结之一文彻底搞懂FlinkSQL

arlanguage1个月前 (03-28)技术文章19

Flink在开发过程中主要还是用sql,使用sql能处理百分之八十左右的开发,其余的场景才需要使用处理函数进行处理,这次就对Flinksql模块进行总结汇总。

Flink除了sql模块还有Table Api模块,Table Api模块用的比较少,不做特殊讲解。

在生产环境中如果使用sql模块尽量使用1.13版本及以上,因为1.9版本合并了阿里巴巴的Blink才有了大的改变,1.12版本做了功能上的完善,但是不稳定,1.13是个版本,说明文档:Release Notes - Flink 1.13

官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/overview/

我们这次就是总结的是最高层语言,sql模块,如下图所示:

1. 配置依赖(IDE中运行程序需要)

Flink依赖明细说明:

table api相关依赖:



org.apache.flink
flink-table-api-java-bridge_2.11
1.13.6
provided



org.apache.flink
flink-table-api-scala-bridge_2.11
1.13.6
provided

IDE中运行程序需要添加依赖如下:



org.apache.flink
flink-table-planner_2.11
1.13.6
provided



org.apache.flink
flink-table-planner-blink_2.11
1.13.6
provided

部分 table 相关的代码是用 Scala 实现的。所以,下面的依赖也需要添加到你的程序里,不管是批式还是流式的程序:


org.apache.flink
flink-streaming-scala_2.11
1.13.6
provided

扩展依赖,实现自定义格式来解析 Kafka 数据,或者自定义函数


org.apache.flink
flink-table-common
1.13.6
provided

拓展模块包括的内容:

  • SerializationSchemaFactory
  • DeserializationSchemaFactory
  • ScalarFunction
  • TableFunction
  • AggregateFunction

2. 开发基本架构

代码测试:

public class FlinkSqlTest {
public static void main(String[] args) throws Exception {
// 创建流式环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建流式表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 获取source
SingleOutputStreamOperator map = env.socketTextStream("hadoop110", 9999).map(t -> {
return new UserVisit(t,System.currentTimeMillis());
});
// sourceStream转换成表
Table table = tableEnv.fromDataStream(map);
// 创建临时表
tableEnv.createTemporaryView("user_visit",table);
// 创建执行sql并执行打印
Table queryResultTanle = tableEnv.sqlQuery("select `userName`,`time` from user_visit");
// 表转换成流并打印
tableEnv.toDataStream(queryResultTanle).print("Stream Print");
env.execute();
}
}

UserVisit代码

public class UserVisit {
public String userName;
public Long time;
public UserVisit() {
}
public UserVisit(String userName, Long time) {
this.userName = userName;
this.time = time;
}
}

测试代码:

3. 流和表的转换

前面两个模块只是案例,重点是仅插入流模块和日志更新流模块。

1. 表转换成流

在上面的例子中,使用了 Table table = tableEnv.fromDataStream(map); 来将Stream转换成Table,而后使用 tableEnv.toDataStream(queryResultTanle).print("Stream Print"); 将表转换成流并打印

// sourceStream转换成表
Table table = tableEnv.fromDataStream(map);
// 表转换成流
DataStream rowDataStream = tableEnv.toDataStream(queryResultTanle);

使用Flink sql统计数据并输出的时候发现,当aa重复第二次的时候控制台的op列输出了-U +U的显示,表面上是将之前的数据删除又增加一条新的数据(回撤流下面会针对说明)。而同样的逻辑,我们将聚合后的table转换成Stream的时候就会报错,如下面两张图所示。

Table执行并打印:

table转换成Stream并打印:

对于这个问题官方给的说明如下:

意思就是我们是在流式环境下工作的,产生的表是一个流式管道,是一个动态表,不仅有新增的数据还会有更新的数据或者删除的数据,那么在表转换成流的情况下就报表不支持消费更新变化,在这种情况下,使用更新日志(change log)。这样一来,对于表的所有更新操作,就变成了一条更新日志的流,我们就可以转换成流打印输出了。

代码中将toDataStream方法换成toChangelogStream即可

2.流转换成表

上面的代码中使用了 tableEnv.fromDataStream(map) 将流转换成表,由于数据类型定义的是Java Bean,所以自动将bean的属性转换成表的列,当然也还可以手动增加参数来指定列,除了Java bean类型,Flink还支持原子类型、Tuple、Row这几种类型,这里使用Tuple进行说明

public class FlinkSqlTest {
public static void main(String[] args) throws Exception {
// 创建流式环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建流式表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 获取source
SingleOutputStreamOperator<Tuple2> map = env.socketTextStream("hadoop102", 9999).map(t -> {
return Tuple2.of(t,System.currentTimeMillis());
})
// 要指定返回值类型。
.returns(TypeInformation.of(new TypeHint<Tuple2>() { }));
// sourceStream转换成表
Table table = tableEnv.fromDataStream(map,$("f0").as("userName"),$("f1").as("time"));
// 创建临时表
tableEnv.createTemporaryView("user_visit",table);
// 创建执行sql并执行打印
tableEnv.sqlQuery("select `userName`,count(1) as cnt from user_visit group by `userName`").execute().print();
env.execute();
}
}

3. 仅插入流(Insert-Only Stream)的说明(重点)

对于流表之间的转换上面两个只是简单的说明,仅插入流和日志更新流都有丰富的方法去调用,在使用Flinksql的时候一样需要去处理水位线,窗口等场景,所以这里会说明更丰富的用法:

  1. fromDataStream(DataStream): 将一个只允许插入的变化和任意类型的流解释为一个表。默认情况下,事件时间和水印不被传播。
  1. fromDataStream(DataStream, Schema): 将一个只需插入的变化流和任意类型的数据解释为一个表。可选的模式允许丰富列的数据类型并添加时间属性、水印策略、其他计算列或主键。
  1. createTemporaryView(String, DataStream): 在一个名字下注册流,以便在SQL中访问它。它是 createTemporaryView(String, fromDataStream(DataStream))的一个快捷方式。
  1. createTemporaryView(String, DataStream, Schema): 在一个名字下注册流,以便在SQL中访问它。它是createTemporaryView(String, fromDataStream(DataStream, Schema))的一个快捷键。
  1. toDataStream(DataStream): 将一个表转换为一个只能插入的变化流。默认的流记录类型是org.apache.flink.types.Row。一个单一的行时属性列被写回DataStream API的记录中。水印也会被传播。
  1. toDataStream(DataStream, AbstractDataType): 将一个表转换为一个仅有插入的变化流。这个方法接受一个数据类型来表达想要的流记录类型。规划器可能会插入隐含的转换和重新排序列,以将列映射到(可能是嵌套的)数据类型的字段。
  1. toDataStream(DataStream, Class): toDataStream(DataStream, DataTypes.of(Class))的一个快捷方式,可以快速创建所需的数据类型的反映。

4.日志更新流(Insert-Only)的说明(重点)

在内部,Flink的表运行时间是一个变化日志处理器。一个StreamTableEnvironment提供了以下方法来暴露这些变化数据捕获(CDC)的功能:

  1. fromChangelogStream(DataStream): 将变更日志条目的流解释为一个表。流记录类型必须是org.apache.flink.types.Row,因为其RowKind标志在运行时被评估。默认情况下,事件时间和水印不会被传播。这个方法期望一个包含各种变化(在org.apache.flink.types.RowKind中列举)的changelog作为默认的changelogMode。
  1. fromChangelogStream(DataStream, Schema): 允许为DataStream定义一个模式,类似于fromDataStream(DataStream, Schema)。否则语义等同于fromChangelogStream(DataStream)。
  1. fromChangelogStream(DataStream, Schema, ChangelogMode): 提供了关于如何将一个流解释为变更日志的完全控制。传递的ChangelogMode可以帮助规划器区分只插入、上移或下移的行为。
  1. toChangelogStream(Table): 与fromChangelogStream(DataStream)相反的操作。它产生一个带有org.apache.flink.types.Row实例的流,并在运行时为每个记录设置RowKind标志。所有种类的更新表都被这个方法所支持。如果输入表包含一个单一的rowtime列,它将被传播到流记录的时间戳中。水印也将被传播。
  1. toChangelogStream(Table, Schema): 与fromChangelogStream(DataStream, Schema)相反的操作。该方法可以丰富产生的列数据类型。如果有必要的话,规划者可能会插入隐式转换。有可能将行时间作为元数据列写出来。
  1. toChangelogStream(Table, Schema, ChangelogMode): 提供关于如何将表转换为变更日志流的完全控制。传递的ChangelogMode可以帮助计划者区分只插入、上移或下移的行为。

4. 流处理中的表

平常在开发过程中用的是hive多一些,不论是Hive还是Mysql,处理的都是有界数据,在流式场景中还是有很大区别的,如下图:

1. 动态表 & 连续查询(Continuous Query)

1. 动态表

当流中有新增的数据,初始表中就会插入一条数据,而基于这个表定义的查询sql应该基于之前的基础上更新结果,这种就被称为动态表

2.持续查询

查询的sql一般是查询完就结束了,在流式环境中,表数据是源源不断到来的,因此查询也不能只查询一次就结束,在动态表上的查询我们就称之为持续查询

2.时间属性

之前的文章中详细说明了Flink中的时间语义,在Stream API和ProcessFunction中可以方便的去设置时间和获取时间,在sql中我们也可以定义时间属性。

1. 在定义表DDL的时候指定

处理时间:

CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

事件时间:

CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

2.在 DataStream 到 Table 转换时定义

处理时间:

DataStream<Tuple2> stream = ...;
// 声明一个额外的字段作为时间属性字段
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());
WindowedTable windowedTable = table.window(
Tumble.over(lit(10).minutes())
.on($("user_action_time"))
.as("userActionWindow"));

事件时间

// Option 1:
// 基于 stream 中的事件产生时间戳和 watermark
DataStream<Tuple2> stream = inputStream.assignTimestampsAndWatermarks(...);
// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());
// Option 2:
// 从第一个字段获取事件时间,并且产生 watermark
DataStream<Tuple3> stream = inputStream.assignTimestampsAndWatermarks(...);
// 第一个字段已经用作事件时间抽取了,不用再用一个新字段来表示事件时间了
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));
// Usage:
WindowedTable windowedTable = table.window(Tumble
.over(lit(10).minutes())
.on($("user_action_time"))
.as("userActionWindow"));

3.使用 TableSource 定义

处理时间:

// 定义一个由处理时间属性的 table source
public class UserActionSource implements StreamTableSource, DefinedProctimeAttribute {
@Override
public TypeInformation getReturnType() {
String[] names = new String[] {"user_name" , "data"};
TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
return Types.ROW(names, types);
}
@Override
public DataStream getDataStream(StreamExecutionEnvironment execEnv) {
// create stream
DataStream stream = ...;
return stream;
}
@Override
public String getProctimeAttribute() {
// 这个名字的列会被追加到最后,作为第三列
return "user_action_time";
}
}
// register table source
tEnv.registerTableSource("user_actions", new UserActionSource());
WindowedTable windowedTable = tEnv
.from("user_actions")
.window(Tumble
.over(lit(10).minutes())
.on($("user_action_time"))
.as("userActionWindow"));

事件时间:

// Option 1:
// 基于 stream 中的事件产生时间戳和 watermark
DataStream<Tuple2> stream = inputStream.assignTimestampsAndWatermarks(...);
// 声明一个额外的逻辑字段作为事件时间属性
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());
// Option 2:
// 从第一个字段获取事件时间,并且产生 watermark
DataStream<Tuple3> stream = inputStream.assignTimestampsAndWatermarks(...);
// 第一个字段已经用作事件时间抽取了,不用再用一个新字段来表示事件时间了
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));
// Usage:
WindowedTable windowedTable = table.window(Tumble
.over(lit(10).minutes())
.on($("user_action_time"))
.as("userActionWindow"));

3. 时区

在 Flink1.13 之前, PROCTIME() 函数返回的类型是 TIMESTAMP , 返回值是UTC时区下的 TIMESTAMP 。 例如: 当上海的时间为 2021-03-01 12:00:00 时, PROCTIME() 显示的时间却是错误的 2021-03-01 04:00:00 。 这个问题在 Flink 1.13 中修复了, 因此用户不用再去处理时区的问题了。

Flink SQL 使用函数 PROCTIME() 来定义处理时间属性, 该函数返回的类型是 TIMESTAMP_LTZ 。

4.时态表

时时态表包含表的一个或多个有版本的表快照,时态表可以是一张跟踪所有变更记录的表(例如数据库表的 changelog,包含多个表快照),也可以是物化所有变更之后的表(例如数据库表,只有最新表快照)。

版本: 时态表可以划分成一系列带版本的表快照集合,表快照中的版本代表了快照中所有记录的有效区间,有效区间的开始时间和结束时间可以通过用户指定,根据时态表是否可以追踪自身的历史版本与否,时态表可以分为 版本表 和 普通表。

版本表: 如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog 可以定义成版本表。

普通表: 如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 的表可以定义成普通表。

5. 窗口

从 1.13 版本开始,Flink 开始使用窗口表值函数(Windowing table-valued functions,

Windowing TVFs)来定义窗口。窗口表值函数是 Flink 定义的多态表函数(PTF),可以将表进行扩展后返回。

官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-tvf/

支持以下几种窗口:

  1. 滚动窗口(Tumbling Windows);
  1. 滑动窗口(Hop Windows,跳跃窗口);
  1. 累积窗口(Cumulate Windows);
  1. 会话窗口(Session Windows,目前尚未完全支持)。

1. 滚动窗口(Tumbling Windows)


TUMBLE(TABLE data, DESCRIPTOR(timecol), size)

data:传入表,可以是任何带有时间属性列的关系

timecol:列描述符,表示数据的哪个时间属性列应该被映射到翻转窗口。

size:窗口大小

举例:基于时间字段 ts,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将

表中的每一行数据,按照它们 ts 的值分配到一个指定的窗口中

TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)

2. 滑动窗口(Hop Windows,跳跃窗口)

HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])

data:传入表,可以是任何带有时间属性列的关系

timecol:列描述符,表示数据的哪个时间属性列应该被映射到翻转窗口。

slide:滑动步长

size:窗口大小

举例:基于时间属性 ts,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟滑动一次。需要注意的是,紧跟在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)

HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));

3. 累积窗口(Cumulate Windows)

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)

data:传入表,可以是任何带有时间属性列的关系

timecol:列描述符,表示数据的哪个时间属性列应该被映射到翻转窗口。

step:持续时间,指定在连续累积窗口结束后增加的窗口大小

size:窗口大小

举例:基于时间属性 ts,在表 EventTable 上定义了一个统计周期为 1 天、累积步长为 1小时的累积窗口。注意第三个参数为步长 step,第四个参数则是最大窗口长度。

CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))

4. 会话窗口(Session Windows,目前尚未完全支持)

暂未支持

6.SQL

基本的sql用法大同小异,这里不做汇总,官方文档有说明:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/overview/

扫描二维码推送至手机访问。

版权声明:本文由AR编程网发布,如需转载请注明出处。

本文链接:http://www.arlanguage.com/post/3635.html

分享给朋友:

“7.Flink总结之一文彻底搞懂FlinkSQL” 的相关文章

nginx开启ssl并把http重定向到https的两种方式

1 简介Nginx是一个非常强大和流行的高性能Web服务器。本文讲解Nginx如何整合https并将http重定向到https。https相关文章如下:(1)Springboot整合https原来这么简单(2)HTTPS之密钥知识与密钥工具Keytool和Keystore-Explorer(3)Sp...

Nginx配置中root和alias分不清?本文3分钟帮你解惑

Nginx在配置映射路径的时候,有时候看到的是root,有时候看到的是alias,这两个有什么区别?本文笔者将带大家详细了解一下。什么是NginxNGINX 是全球最受欢迎的 Web 服务器之一,用于反向代理和负载均衡,还提供 HTTP 服务器功能,主要是为实现最大性能和稳定性而设计的。Ubuntu...

haproxy负载均衡入门到转行

haproxy概述haproxy是一款开源的高性能的反向代理或者说是负载均衡服务软件之一,支持双机热备,虚拟主机基于TCP/HTTP应用代理,具有图形界面等功能。其配置简单,而且拥有很好的对服务器节点的健康检查功能(相当于keepalived健康检查),当其代理的后端服务器出现故障时,haproxy...

Linux系统非root用户下安装Nginx

通常使用Nginx或者Apache作为Web服务器时,默认监听80端口,因此默认会使用root用户去安装,而且,使用yum命令安装时,通常会安装到默认的路径下,默认路径通常是root用户才有执行权限的。如果不需要使用Nginx监听1024以下的端口,且对权限和网络管理比较严格时,能用非root权限解...

推荐一款 Nginx 可视化配置神器

Nginx 是前后端开发工程师必须掌握的神器。该神器有很多使用场景,比如反向代理、负载均衡、动静分离、跨域等等。把 Nginx 下载下来,打开 conf 文件夹的 nginx.conf 文件,Nginx 服务器的基础配置和默认的配置都存放于此。配置是让程序员非常头疼的事,比如 Java 后端框架...

Linux非root用户安装及配置Nginx

该文章用到了2次root权限,其中有一次root权限是没有必要用的,小编会在文章中进行说明。一、安装前置依赖gcc编译器(第一次需要root权限)这里我没有研究非root用户安装(因为太麻烦了),后面我会单独出一篇文章,讲解非root用户安装gcc编译器。直接使用su - 切换到root用户下面,使...