1. 什么是 Versioned Table?
在深入理解 Versioned Table(版本表)之前,先明确三个基础概念:
- 动态表(Dynamic Table):表示一个随时间不断变化的数据关系,其底层由 changelog 构成,包含 INSERT、UPDATE 和 DELETE 操作;
- 主键约束(PRIMARY KEY):确保表中某列或列组合在任意时刻都具有唯一且非空的值;
- 时间属性:通常指事件时间(Event Time)字段,用于标记每条变更发生的具体时间。
当一张动态表同时满足以下两个条件时:
- 定义了主键约束(可以是单列或多列);
- 具备明确的事件时间属性;
Flink 即可将其识别为版本表(Versioned Table)。这意味着,对于某个主键 key,Flink 能够追踪它在不同时刻的取值情况,清楚地知道“该 key 在什么时间段内对应哪个值”。
换句话说,Versioned Table 不仅反映当前状态,还完整保留了从过去到现在的所有状态演变过程。这种能力对处理 Slowly Changing Dimension(SCD,缓慢变化维度)场景至关重要,同时也是实现Temporal Join(时态关联)的前提基础。
2. 实例解析:商品价格如何体现“版本化”特性?
以常见的业务场景——商品价格变动为例进行说明。
假设我们有一张记录商品价格随时间更新的表:
products
其底层数据流以 changelog 形式呈现如下:
(changelog kind) update_time product_id product_name price
================= =========== ========== ============ =====
+(INSERT) 00:01:00 p_001 scooter 11.11
+(INSERT) 00:02:00 p_002 basketball 23.11
-(UPDATE_BEFORE) 12:00:00 p_001 scooter 11.11
+(UPDATE_AFTER) 12:00:00 p_001 scooter 12.99
-(UPDATE_BEFORE) 12:00:00 p_002 basketball 23.11
+(UPDATE_AFTER) 12:00:00 p_002 basketball 19.99
-(DELETE) 18:00:00 p_001 scooter 12.99
通过这条变更序列可以看出:
p_001
- 滑板车于 00:01:00 上架,初始价格为 11.11;
- 在 12:00:00 价格上涨至 12.99;
- 到 18:00:00 被删除(即下架);
p_002
- 篮球的价格从 23.11 下调至 19.99。
如果我们在不同时间点对该动态表进行快照查询,所看到的结果将有所不同:
在 10:00:00 查询时:
update_time product_id product_name price
=========== ========== ============ =====
00:01:00 p_001 scooter 11.11
00:02:00 p_002 basketball 23.11
在 13:00:00 查询时:
update_time product_id product_name price
=========== ========== ============ =====
12:00:00 p_001 scooter 12.99
12:00:00 p_002 basketball 19.99
这正是版本表的核心语义所在:
同一个主键(如商品 ID)在不同时间可能拥有不同的有效值,而 Flink 可自动维护这一整条时间线上的状态演化过程。
product_id
3. Versioned Table Source:源数据天然支持版本化
许多 CDC(Change Data Capture)或 Upsert 类型的数据源本身就携带 changelog 语义,例如:
- 使用 Upsert 模式的 Kafka 表(基于 key 的 changelog 流);
- 数据库的 CDC 输出格式,如 Debezium、Canal 等。
只要满足以下两点:
- 在 DDL 中声明了 PRIMARY KEY;
- 定义了有效的 事件时间属性(Event Time);
Flink 就能将这类数据源作为 Versioned Table 直接使用。
典型建表语句示例如下:
CREATE TABLE products ( product_id STRING, product_name STRING, price DECIMAL(32, 2), update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, PRIMARY KEY (product_id) NOT ENFORCED, WATERMARK FOR update_time AS update_time ) WITH (...);
其中:
product_id
- product_id 作为主键,表明该表为 upsert 模式;
update_time
- update_time 是事件时间字段,通常来源于 CDC 记录中的 source timestamp 元数据;
由此,Flink 可以记录每个
product_id
在各个
update_time
时刻对应的值,从而构建出完整的版本历史。
4. Versioned View:从 Append-only 表推导版本表
上述情况适用于数据源本身即为 changelog 格式。但在实际应用中,也常遇到另一种情形:
数据源为纯追加模式(Append-only),无显式 UPDATE 或 DELETE 操作,但从业务逻辑上看,多条记录实则代表同一实体的不同版本。
例如下面这张汇率记录表:
CREATE TABLE currency_rates ( currency STRING, rate DECIMAL(32, 10), update_time TIMESTAMP(3), WATERMARK FOR update_time AS update_time ) WITH ( 'connector' = 'kafka', 'topic' = 'rates', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json' );
其数据形式如下(JSON 格式仅包含插入记录,无 changelog 语义):
(changelog kind) update_time currency rate
================ =========== ======== ====
+(INSERT) 09:00:00 Yen 102
+(INSERT) 09:00:00 Euro 114
+(INSERT) 09:00:00 USD 1
+(INSERT) 11:15:00 Euro 119
+(INSERT) 11:45:00 Pounds 107
+(INSERT) 11:49:00 Pounds 108
对 Flink 来说:
- 这是一个 Append-only 表;
- 无法直接定义
PRIMARY KEY(currency)
但我们从业务角度知道:
- 同一 currency 的多条记录实际上是其“历史版本”;
- 我们只关心在任一时间点上,各货币的最新有效汇率。
此时可通过创建 Versioned View 的方式,将原始 Append-only 表转换为逻辑上的版本表,使 Flink 能按时间线管理每个键的最新状态,进而支持时态查询与关联操作。
4.1 使用 ROW_NUMBER 实现去重并构建版本视图
目标是创建一个视图,满足以下条件:
- 对每一个
,仅保留最新的一条记录;currency - 将
作为推导出的主键(PRIMARY KEY);currency - 使用
作为事件时间字段;update_time - 输出的 changelog 流符合 Versioned Table 所需的 upsert 格式(即 INSERT 或 UPDATE_AFTER 操作)。
对应的 SQL 语句如下:
-- 定义一个版本化视图
CREATE VIEW versioned_rates AS
SELECT currency, rate, update_time -- (1) 保留事件时间字段 update_time
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY currency -- (2) 按 currency 分组,用于推断唯一键
ORDER BY update_time DESC
) AS rownum
FROM currency_rates
)
WHERE rownum = 1;
versioned_rates
该视图生成的 changelog 会被 Flink 自动识别为:
(changelog kind) update_time currency rate ================ =========== ========= ==== +(INSERT) 09:00:00 Yen 102 +(INSERT) 09:00:00 Euro 114 +(INSERT) 09:00:00 USD 1 +(UPDATE_AFTER) 11:15:00 Euro 119 +(INSERT) 11:45:00 Pounds 107 +(UPDATE_AFTER) 11:49:00 Pounds 108
注意:在此模式下,针对相同 和 Euro 的后续更新被视为 UPDATE_AFTER 类型的变更,而非新的 INSERT 操作。Pounds
4.2 Flink 如何识别“这是一个版本视图”?
Flink 内部具备一种优化机制,能够自动识别特定查询结构为“版本表构造”模式。当 SQL 查询符合如下结构时:
SELECT [column_list]
FROM table_name
QUALIFY ROW_NUMBER() OVER (
PARTITION BY col1[, col2...]
ORDER BY time_attr [ASC|DESC]
) = 1;
就会被解析为版本表语义。这种写法与我们前面使用的子查询 + ROW_NUMBER 形式等价,而 提供了更清晰的对照说明:ROW_NUMBER() + WHERE rownum = 1
:为每个分组内的行分配序号,从 1 开始递增;ROW_NUMBER():定义分组依据,即逻辑上的去重键 —— 此列将作为版本表的 PRIMARY KEY;PARTITION BY:排序所用字段,必须是具有事件时间语义(Event Time)的时间列;ORDER BY time_attr:筛选出每组中排名第一的记录(根据排序方向决定是最新的或最旧的)。WHERE rownum = 1
采用 的 QUALIFY 写法更为简洁且可读性强,示例如下:QUALIFY
SELECT currency, rate, update_time
FROM currency_rates
QUALIFY
ROW_NUMBER() OVER (
PARTITION BY currency
ORDER BY update_time DESC
) = 1;
一旦满足上述模式,Flink 即可进行内部优化,并将该查询的下游使用者视为在访问一个 Versioned Table。
5. 版本表的应用价值与典型场景
引入 Versioned Table 后,可以支持大量具有强时态语义的数据处理需求,尤其适用于需要回溯“历史某一时刻维度状态”的场景。
5.1 汇率维度表:订单金额按发生时汇率折算
设有以下两个核心表:
- 订单事实表:
,包含orders、order_time、currency等字段;amount - 汇率版本表:
,记录各币种随时间变化的汇率。versioned_rates
通过 Temporal Join 技术,可对任意时间点的订单,使用其下单时刻的有效汇率进行金额换算 —— 这正是 Versioned Table 设计的核心用途之一(尽管原文未明确提及 Temporal Join,但其底层支撑正是此类结构)。
5.2 商品价格维度表:分析历史价格变动影响
构建商品价格的版本表 ,并与订单表 products 关联,可用于实现多种分析功能:order_items
- 统计不同价格区间在特定时间段内的销量分布;
- 评估每次调价后整体 GMV 的变化趋势;
- 判断用户是否倾向于在促销期间完成购买行为。
5.3 用户等级与会员权益变更追踪
利用 changelog 数据结合 Event Time,构建用户等级和权益信息的版本表 。随后,在处理用户行为日志时,可根据每条日志的发生时间,关联查询“当时”用户的实际等级和享有的权益内容,从而实现精准的历史状态还原。user_id + level + valid_from + valid_to
以上均为“将维度表赋予时间维度、支持历史追溯”的经典应用案例。
6. 实践建议与关键注意事项
在实际落地过程中,以下几个细节极易被忽视,但至关重要:
确保 Versioned Source 具备主键和事件时间
对于 CDC 或 Upsert 类型的数据源,务必在 DDL 中声明 ;PRIMARY KEY (...) NOT ENFORCED
同时使用 显式标记时间列为 Event Time 属性。WATERMARK
Append-only 源需借助 ROW_NUMBER 构建版本视图
若原始数据为仅追加流(append-only),则需通过以下方式构造版本化视图:
- 按业务主键进行分区;
- 按时间字段降序排序;
- 配合
取每组首行。ROW_NUMBER() = 1
推荐使用 的 QUALIFY 写法,语法紧凑且易于理解。QUALIFY
排序字段必须为时间属性
用于 ORDER BY 的列 必须是已定义的 Event Time 字段;time_attr
普通的 TIMESTAMP 类型无法满足要求,必须通过 明确赋予时间语义。WATERMARK
版本表本质是语义层面的逻辑视图
通常无需将完整的版本历史物化存储到外部系统中;
Flink 会在运行时利用自身状态机制,维护“每个 key 当前有效版本”以及必要的历史上下文,以支持时态操作。
谨慎选择主键
主键的选择直接影响去重效果和状态管理效率,应基于业务含义准确选取,避免因键值设计不当导致数据错误或性能瓶颈。
Versioned Table 的核心语义建立在用户指定的 PRIMARY KEY(或去重键)之上;一旦该键选择不当,所有后续基于此表生成的历史视图都将出现偏差,导致结果不准确。
可以这样一句话概括 Versioned Tables 的本质:
Versioned Table = 具备主键和时间属性的动态表,使得 Flink 能够追踪每个键在时间维度上的值的变化过程。
products
它所应对的是如下典型场景:用户不仅需要获取当前最新的数据状态,还希望回溯特定时刻的历史状态——即不仅要了解“现在是什么”,还要明确“过去某一时刻曾是什么”。
随着此类需求在实际业务中日益普遍——如数据审计、风险控制、操作追溯、A/B 测试分析以及实时数仓构建等场景——掌握 Versioned Tables 已成为深入运用 Flink SQL 时不可或缺的一项高级技能。


雷达卡


京公网安备 11010802022788号







