1. Regular Join:灵活性最高但资源消耗较大的连接方式
Regular Join 是最常见的一种 Join 类型,语法形式如下:
SELECT * FROM Orders INNER JOIN Product ON Orders.product_id = Product.id;
虽然在写法上与离线 SQL 几乎一致,但在 Flink 的流处理环境中,其执行机制存在显著差异。
1.1 流场景下 Regular Join 的性能开销
由于流式数据是无界的,Regular Join 需要满足这样的语义:
- 任意一侧的数据流只要出现新记录或更新,就必须能与另一侧的所有历史乃至未来的匹配记录完成关联。
为实现这一目标,Flink 必须将两个参与 Join 的表的全部数据都保存在状态(State)中。这导致:
- 状态数据可能持续增长,理论上没有上限;
- 实际状态大小取决于两方面:
- 两张表各自的去重后行数;
- 多表链式连接时产生的中间结果规模。
因此,官方文档多次提醒用户:必须根据业务逻辑合理设置 State TTL(Time-To-Live),否则极易因状态膨胀引发内存溢出(OOM)问题。
1.2 支持的等值连接类型
Flink 中的 Regular Join 当前支持以下几种连接模式:
- Inner Join
- Left Join
- Right Join
- Full Outer Join
但对 Join 条件有严格限制:
- 必须包含至少一个等值条件(Equi-Join),例如:
Orders.product_id = Product.id; - 不支持纯笛卡尔积;
- 不允许仅使用非等值条件(如大于、小于)作为唯一 Join 判据(即 Theta Join 不被允许)。
a.col = b.col
a.x > b.y
常见的合法写法包括:
-- 内连接 SELECT * FROM Orders INNER JOIN Product ON Orders.product_id = Product.id; -- 左外连接 SELECT * FROM Orders LEFT JOIN Product ON Orders.product_id = Product.id; -- 右外连接 SELECT * FROM Orders RIGHT JOIN Product ON Orders.product_id = Product.id; -- 全外连接 SELECT * FROM Orders FULL OUTER JOIN Product ON Orders.product_id = Product.id;
1.3 多表连接建议使用 MultiJoin 优化
当涉及多个表进行链式连接时,若按顺序逐个执行 Join,容易造成中间状态急剧膨胀。
Flink 提供了 MultiJoin 算子来合并和优化多表连接操作。实践建议如下:
- 尽量一次性写出所有参与连接的表,以便优化器能够识别并启用 MultiJoin;
- 明确指定所有 Join 条件,避免产生隐式的笛卡尔积。
A JOIN B JOIN C JOIN D
2. Interval Join:基于时间窗口的事件关联机制
Interval Join 主要用于解决“在一定时间范围内关联两个事件”的需求。典型应用场景包括:
- 将订单与其对应的支付或发货记录进行匹配,但仅限于特定的时间区间内。
示例语句:
SELECT * FROM Orders o, Shipments s WHERE o.id = s.order_id AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time;
该查询的含义是:
- 订单 ID 相同;
- 订单发生的时间落在发货时间前 4 小时至发货时刻之间。
[ship_time - 4 小时, ship_time]
2.1 Interval Join 的使用限制
相比 Regular Join,Interval Join 更加受限但也更高效:
- 仅适用于具有时间属性且数据为追加模式(append-only)的表;
- Join 条件必须同时满足:
- 至少一个等值匹配条件;
- 一个有效的时间区间谓词。
合法的时间条件示例如下:
ltime = rtimeltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTEltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
得益于时间字段的单调递增特性(或结合 Watermark 处理乱序数据),Flink 可以:
- 利用 Watermark 判断哪些记录不会再有匹配机会;
- 安全地从状态中清除过期数据。
简而言之,Interval Join 相当于一种带有时间边界约束的 Join 操作,在保障正确性的同时显著降低了状态存储的压力。
3. Temporal Join:实现事实流的“时点维度填充”
Temporal Join(时态连接)是构建 Flink 流式数据仓库的核心功能之一。其实质是:
- 将一条不断流入的事实数据流,与一张随时间变化的维度表,在事件发生的那个确切时间点上进行快照匹配。
在处理不断变化的维度表时,若需在特定时间点还原出“当时的快照版本”并将其与事实数据流进行关联(Join),这一需求在实际业务中十分普遍。例如:
- 币种汇率表
- 商品价格表
- 用户等级信息表
同一条订单或行为记录,在当天查看和隔日查看时,所关联的维度信息可能存在差异——这正是时态 Join 发挥作用的场景。
Flink 提供了两种类型的时态 Join 操作,分别基于不同的时间语义:
- 事件时间时态 Join:依据事件发生时刻的状态进行关联,用于还原历史真实情况;
- 处理时间时态 Join:依据当前系统处理时刻的最新状态进行关联,更接近实时查询语义。
3.1 事件时间时态 Join:还原“当时”的维度状态
一个典型的使用案例是“订单金额 + 汇率”转换:
订单金额应按照下单那一刻的汇率换算为美元(USD),而非采用后续更新后的最新汇率。
核心 SQL 示例(DDL 已省略):
SELECT order_id, price, orders.currency, conversion_rate, order_time FROM orders LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time ON orders.currency = currency_rates.currency;
语义说明:
- 左侧为订单流表,包含 watermark 机制以支持事件时间处理;
order_time - 右侧为带有版本管理能力的汇率维表,具备主键定义;
currencyupdate_time - 对于每条订单记录,系统会利用其事件时间
去查找该时刻对应的汇率版本;order_time - 一旦完成关联,即使后续汇率发生变化,历史结果也不会被重新计算或更新。
注意事项:
- Join 条件必须包含对右表主键的等值匹配条件;
orders.currency = currency_rates.currency - Watermark 的设定决定了两个方面:
- 多长时间后可以安全清理某个时间之前的旧版本数据;
- 系统对乱序到达事件的容忍窗口。
3.2 处理时间时态 Join:以“当前最新值”补全维度字段
这种 Join 方式更贴近“Lookup”语义:
每当一条事实数据进入流中,即刻使用此时维度表中的最新可用数据来填充相关信息。
典型语法示例:
SELECT o_amount, r_rate FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency;
主要特点包括:
- 不保存维度表的历史变更版本;
- 相同 key 在不同时间点执行 Join,可能返回不同结果(总是取最新值);
- 结果具有非确定性,适用于“实时监控当前状态”类场景,而不适合严格审计或回溯分析。
4. Lookup Join:最常用的“流 + 外部维表”字段补全方式
Lookup Join 可视为“处理时间时态 Join”与“外部维表连接器”的结合体。
常见应用场景包括:
- 实时订单流或用户行为流
- 结合存储于 MySQL、HBase 或 Redis 中的维度信息进行字段扩展
DDL 定义示例:
CREATE TEMPORARY TABLE Customers ( id INT, name STRING, country STRING, zip STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://mysqlhost:3306/customerdb', 'table-name' = 'customers' );
查询语句:
SELECT o.order_id, o.total, c.country, c.zip FROM Orders AS o JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id;
关键要素解析:
- Orders 表作为主数据流,携带处理时间属性;
proc_time - Customers 为外部维表,由指定的 connector 实现访问;
jdbc - FOR SYSTEM_TIME AS OF 子句表示:在处理当前订单的瞬间,从 MySQL 中读取对应的一行数据;
FOR SYSTEM_TIME AS OF o.proc_time - 此后即使数据库中的客户信息发生变更,也不会触发对已生成结果的回溯更新。
Lookup Join 的典型特征:
- 延迟受外部系统响应速度影响;
- 仅使用当前最新记录,不保留历史版本;
- Join 条件必须为等值条件,通常基于主键或索引字段。
5. UNNEST / 数组 & Map & Multiset 展开
在实际业务建模中,常出现“单行数据内嵌数组或 Map 结构”的情形,例如:
- 一条订单记录包含多个商品项
- 一条日志中包含统计用的 Map 或多重集合(Multiset)结构
UNNEST 展示了如何将这类嵌套结构展开为扁平化的多行记录,UNNEST 正是解决此类问题的核心工具。
5.1 数组展开
示例一:常量数组展开
SELECT * FROM (VALUES('order_1')), UNNEST(ARRAY['shirt', 'pants', 'hat']);
示例二:基于表字段的数组展开
SELECT order_id, product_name FROM Orders
5.2 WITH ORDINALITY:同时获取元素下标
在使用 UNNEST 展开结构时,可以通过添加 WITH ORDINALITY 来一并返回元素的序号。例如:
SELECT *
FROM (VALUES ('order_1'), ('order_2'))
CROSS JOIN UNNEST(ARRAY['shirt', 'pants', 'hat'])
WITH ORDINALITY AS t(product_name, index);
需要注意的是:
- 下标从 1 开始计数;
- 对于数组类型,展开顺序是确定且有保证的;
- 而对于 Map 或 multiset 类型,遍历顺序则不作保证。
以 Map 为例,进行展开操作:
SELECT *
FROM
(VALUES('order_1'))
CROSS JOIN UNNEST(MAP['shirt', 2, 'pants', 1, 'hat', 1]) WITH ORDINALITY;
当处理 multiset 时,若某元素的出现次数(multiplicity)为 2,则会展开为两行相同记录。
index
6. 表函数 Join:实现“一行变多行”的动态扩展
表函数 Join 结合 LATERAL 关键字,其核心语义是:
将左表的每一行作为输入参数,调用一次表函数,将其返回的多行结果与该行进行连接。
LATERAL TABLE
6.1 内连接(Inner Join)表函数
如果表函数执行后未返回任何结果,则对应的左表行将被过滤掉:
SELECT order_id, res
FROM Orders,
LATERAL TABLE(table_func(order_id)) t(res);
6.2 左外连接(Left Outer Join)表函数
即使表函数无返回结果,左表记录依然保留,右侧字段填充 NULL:
SELECT order_id, res
FROM Orders
LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
ON TRUE;
NULL
常见应用场景包括:
- 对字段内容进行拆分或解析,如将 JSON 字符串转换为多行多列数据;
- 对文本内容进行分词处理,输出多个关键词行;
- 调用外部服务接口,基于单行输入返回多个候选结果行。
7. 如何选择合适的 Join 类型?一表归纳清晰对比
根据不同的业务需求,应选用最匹配的 Join 方式。以下是典型场景与推荐方案对照:
| 场景诉求 | 推荐 Join 类型 |
|---|---|
| 任意两个动态表全量关联,语义最贴近离线 SQL | Regular Join |
| 两个事件流基于 ID 和时间区间进行匹配 | Interval Join |
| 事实表回放某一时刻维度表的历史版本 | Event Time Temporal Join |
| 事实表实时补全当前最新的维度信息 | Processing Time Temporal Join / Lookup Join |
| 将数组、Map 或 multiset 展平为多行 | UNNEST +(可选)WITH ORDINALITY |
| 每行触发一个 UDTF 并将其多行结果 Join | LATERAL TABLE / 表函数 Join |
一条关键实战经验提醒:
Regular Join 和无限制的多表 Join 是生产环境中事故高发区。
在流式计算中必须重点考虑以下因素:
- 状态数据的存储规模;
- watermark 机制与状态 TTL 设置;
- 是否可以改写为 Interval Join、Temporal Join 或 Lookup Join;
- 是否可通过引入窗口约束,将 Join 范围限定在合理的业务时间段内。
8. 总结
本文从语义逻辑、资源代价和典型应用三个维度,系统梳理了 Flink SQL 中各类 Join 的特性:
- Regular Join:行为最接近传统离线 SQL,但状态管理压力最大;
- Interval Join:通过时间区间限定关联范围,有效控制状态增长;
- Temporal Join:依据时间点查找维度表对应版本值,是流式数仓的核心能力之一;
- Lookup Join:用于实时补充最新维度信息,适用于低延迟查询场景;
- UNNEST 与表函数 Join:专用于处理嵌套或半结构化数据,实现“摊平”展开。


雷达卡


京公网安备 11010802022788号







