楼主: gbyf
180 0

[其他] Flink CDC系列之:数据接收器(Sink)的配置选项类DorisDataSinkOptions [推广有奖]

  • 0关注
  • 0粉丝

等待验证会员

学前班

80%

还不是VIP/贵宾

-

威望
0
论坛币
0 个
通用积分
0
学术水平
0 点
热心指数
0 点
信用等级
0 点
经验
30 点
帖子
2
精华
0
在线时间
0 小时
注册时间
2018-7-16
最后登录
2018-7-16

楼主
gbyf 发表于 2025-11-27 20:21:20 |AI写论文

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币

DorisDataSinkOptions 类详解:Flink CDC 中的数据接收器配置

该类为 Apache Flink CDC Connector 提供了向 Apache Doris 数据库写入数据时所需的全部配置参数,广泛应用于基于 Flink CDC 的数据同步任务中。通过此类定义的配置项,用户可以灵活控制与 Doris 集群的连接方式、流式加载行为以及容错机制。

[此处为图片1]

基础连接参数设置

以下配置用于建立与 Doris 集群的基本通信连接,确保 Flink 能够正确访问目标数据库。

  • FENODES:Doris 前端节点(FE)的 HTTP 地址列表
  • BENODES:后端节点(BE)的 HTTP 地址
  • USERNAME:连接 Doris 所需的用户名
  • PASSWORD:对应用户的登录密码
  • JDBC_URL:JDBC 连接字符串,用于元数据操作或查询路由
  • AUTO_REDIRECT:是否允许自动重定向请求至正确的 FE 节点

代码实现如下:

public static final ConfigOption<String> FENODES =
    ConfigOptions.key("fenodes")
        .stringType()
        .noDefaultValue()
        .withDescription("doris fe http address.");

public static final ConfigOption<String> BENODES =
    ConfigOptions.key("benodes")
        .stringType()
        .noDefaultValue()
        .withDescription("doris be http address.");

public static final ConfigOption<String> USERNAME =
    ConfigOptions.key("username")
        .stringType()
        .noDefaultValue()
        .withDescription("the doris user name.");

public static final ConfigOption<String> PASSWORD =
    ConfigOptions.key("password")
        .stringType()
        .noDefaultValue()
        .withDescription("the doris password.");

public static final ConfigOption<String> JDBC_URL =
    ConfigOptions.key("jdbc-url")
        .stringType()
        .noDefaultValue()
        .withDescription("doris jdbc url address.");

流式加载相关配置

此类参数主要用于优化和控制数据在 Flink 到 Doris 之间的流式写入过程,提升稳定性与一致性保障能力。

  • SINK_ENABLE_2PC:启用两阶段提交,确保精确一次(exactly-once)语义
  • SINK_CHECK_INTERVAL:异常状态检查的时间间隔(单位:毫秒)
  • SINK_MAX_RETRIES:写入失败后的最大重试次数
  • SINK_BUFFER_SIZE:单个缓冲区的大小,影响内存使用与传输效率
  • SINK_BUFFER_COUNT:并发使用的缓冲区数量
  • SINK_LABEL_PREFIX:为每次导入生成唯一标签的前缀标识
  • SINK_ENABLE_DELETE:是否开启物理删除功能以支持 DELETE 操作

部分关键配置的代码定义如下:

public static final ConfigOption<Boolean> SINK_ENABLE_2PC =
    ConfigOptions.key("sink.enable-2pc")
        .booleanType()
        .defaultValue(false)
        .withDescription("enable 2PC while loading");

public static final ConfigOption<Integer> SINK_CHECK_INTERVAL =
    ConfigOptions.key("sink.check-interval")
        .intType()
        .defaultValue(10000)
        .withDescription("check exception with the interval while loading");

public static final ConfigOption<Integer> SINK_MAX_RETRIES =
    ConfigOptions.key("sink.max-retries")
        .intType()
        .defaultValue(3)
        .withDescription("max retry times for sink writer");
[此处为图片1]
// 流式加载缓冲区设置
public static final ConfigOption<Integer> SINK_BUFFER_SIZE =
    ConfigOptions.key("sink.buffer-size")
        .intType()
        .defaultValue(1024 * 1024)  // 默认缓存大小为1MB
        .withDescription("用于流式加载时缓存数据的缓冲区大小");

public static final ConfigOption<Integer> SINK_BUFFER_COUNT =
    ConfigOptions.key("sink.buffer-count")
        .intType()
        .defaultValue(3)
        .withDescription("用于缓存流加载数据的缓冲区数量");

[此处为图片1]

// 批量写入相关参数配置
public static final ConfigOption<Boolean> SINK_ENABLE_BATCH_MODE =
    ConfigOptions.key("sink.enable.batch-mode")
        .booleanType()
        .defaultValue(true)
        .withDescription("是否开启批量写入模式");

// 每批次刷新的最大行数限制
public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
    ConfigOptions.key("sink.buffer-flush.max-rows")
        .intType()
        .defaultValue(50000)  // 单批最多处理5万行
        .withDescription("每个批次中最大可刷新的数据条数");

// 每批次刷新的最大字节数限制
public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_BYTES =
    ConfigOptions.key("sink.buffer-flush.max-bytes")
        .intType()
        .defaultValue(10 * 1024 * 1024)  // 限制为10MB
        .withDescription("每个批次中允许刷新的最大字节量");

// 缓冲区自动刷新的时间间隔
public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL =
    ConfigOptions.key("sink.buffer-flush.interval")
        .durationType()
        .defaultValue(Duration.ofSeconds(10))
        .withDescription("缓冲区刷新操作的执行周期,单位毫秒");

[此处为图片2]

// 异步流加载队列容量(批量模式下使用)
public static final ConfigOption<Integer> SINK_FLUSH_QUEUE_SIZE =
    ConfigOptions.key("sink.flush-queue.size")
        .intType()
        .defaultValue(1)
        .withDescription("异步流加载过程中使用的队列大小");

// CDC 特性相关配置项

// 控制是否忽略更新前的旧值记录
public static final ConfigOption<Boolean> SINK_IGNORE_UPDATE_BEFORE =
    ConfigOptions.key("sink.ignore.update-before")
        .booleanType()
        .defaultValue(true)
        .withDescription("在处理CDC事件时,是否跳过update-before类型的数据");

// 启用缓存机制以支持断点续传功能
public static final ConfigOption<Boolean> SINK_USE_CACHE =
    ConfigOptions.key("sink.use-cache")
        .booleanType()
        .defaultValue(false)
        .withDescription("是否启用本地缓存来实现写入过程中的断点恢复能力");

public static final ConfigOption<Boolean> SINK_USE_CACHE =
    ConfigOptions.key("sink.use-cache")
        .booleanType()
        .defaultValue(false)
        .withDescription("Whether to use buffer cache for breakpoint resume");

// BE 节点 HTTP 地址配置项
public static final ConfigOption<String> BENODES =
    ConfigOptions.key("benodes")
        .stringType()
        .noDefaultValue()
        .withDescription("doris be http address.");

该配置类为 Flink CDC 与 Doris 系统的集成提供了全面的参数支持,覆盖了连接管理、流式和批量数据写入、容错机制以及 CDC 数据同步等多个关键功能模块。

表结构创建相关配置

定义了一系列用于控制表创建行为的常量前缀,便于在运行时动态解析和应用配置属性:

  • TABLE_CREATE_PROPERTIES_PREFIX:普通表创建时使用的属性前缀。
  • TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX:针对自动分区表的创建属性前缀。
  • TABLE_CREATE_PARTITION_KEY:指定分区字段的键名。
  • TABLE_CREATE_PARTITION_UNIT:定义分区单位大小。

// StreamLoad 配置项前缀 public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";

// 表创建配置属性前缀 public static final String TABLE_CREATE_PROPERTIES_PREFIX = "table.create.properties.";

// 自动分区表创建配置前缀 public static final String TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX = "table.create.auto-partition.properties."; [此处为图片1]

工具方法说明

getPropertiesByPrefix(Configuration tableOptions, String prefix) 此方法的作用是从传入的配置对象中提取出以指定字符串为前缀的所有属性,并将这些属性构造成一个新的键值对映射。该功能特别适用于分离 Doris 的 StreamLoad 参数或表初始化参数等场景。


public static Map<String, String> getPropertiesByPrefix(
    Configuration tableOptions, String prefix) {
    final Map<String, String> props = new HashMap<>();
    for (Map.Entry<String, String> entry : tableOptions.toMap().entrySet()) {
        if (entry.getKey().startsWith(prefix)) {
            // 移除前缀部分,仅保留实际属性名称
            String subKey = entry.getKey().substring(prefix.length());
            props.put(subKey, entry.getValue());
        }
    }
    return props;
}

使用示例

以下是在 Flink 作业中配置 Doris 写入参数的典型用法:


Configuration config = Configuration.create();
config.set(DorisDataSinkOptions.FENODES, "fe1:8030,fe2:8030");
config.set(DorisDataSinkOptions.USERNAME, "admin");
config.set(DorisDataSinkOptions.PASSWORD, "password");
config.set(DorisDataSinkOptions.SINK_BUFFER_FLUSH_MAX_ROWS, 100000);

// 提取带有 sink.properties. 前缀的 StreamLoad 相关参数
Map<String, String> streamLoadProps =
    DorisDataSinkOptions.getPropertiesByPrefix(config,
    DorisDataSinkOptions.STREAM_LOAD_PROP_PREFIX);

通过上述方式,可以灵活地将不同类型的配置进行归类处理,提升代码可读性与维护性。

这是一个用于定义 Flink Connector 配置项的典型代码片段,通过 Flink 提供的 ConfigOptions 工具类来声明一个名为 BENODES 的配置参数。

public static final:表示该配置项是公共、静态且不可变的常量。这意味着它可以在不实例化类的情况下被访问,并且其值在程序运行期间不会改变。

ConfigOption<String>:声明了 BENODES 是一个类型为字符串(String)的配置选项,确保类型安全。

ConfigOptions.key("benodes"):这是创建配置项的起点,指定该选项在配置文件或属性映射中的唯一键名。例如,用户可在配置中写入如下内容:
benodes=be_host1:8040,be_host2:8040

.stringType():明确限定此配置项的取值必须为字符串类型,增强了配置解析时的安全性与准确性。

.noDefaultValue():表明该配置没有默认值,属于必填项。如果用户未显式设置该参数,在初始化或校验阶段将可能触发异常或验证失败。

.withDescription("doris be http address."):为该配置项添加描述信息,说明其用途——“Doris BE 节点的 HTTP 服务地址”。其中,BE 指的是 Doris 架构中的后端节点,负责数据存储和查询执行任务。
[此处为图片1]

功能目的
该代码段的核心作用是定义一个必需的配置项 BENODES,使用户在使用 Flink-Doris Connector 时能够指定 Doris 集群中所有 Backend 节点的 HTTP 地址列表。这些地址通常以逗号分隔,如:"192.168.1.10:8040,192.168.1.11:8040"。Flink 程序将利用这些地址与 Doris 进行通信,完成数据写入等操作。 使用方法
用户在配置连接器时,需在 Flink 的配置文件(如 flink-conf.yaml)中,或在作业的 DDL 语句里显式设置该键值对,确保系统能正确识别并连接目标 Doris 集群。 简而言之,这段代码的作用就是:
定义一个强制性的配置入口,让用户明确告知程序:Doris 数据库的后端服务运行于哪些地址。 [此处为图片2] 主要应用场景
  • 实时数据同步:支持 CDC(Change Data Capture)数据的实时写入 Doris,实现低延迟的数据更新。
  • 批量数据导入:适用于大批量数据加载场景,通过批量模式提升写入性能。
  • 表结构管理:可根据源数据自动创建或维护 Doris 中的表结构,减少手动干预。
  • 容错处理机制:内置重试策略、本地缓存等功能,增强数据写入过程中的稳定性与可靠性。
设计特性分析
  • 模块化设计:配置项按功能进行归类划分,结构清晰,便于理解与维护。
  • 灵活性强:同时支持流式处理与批量写入两种模式,适应多种业务需求。
  • 高容错性:集成自动重试、错误恢复、缓冲机制,保障数据不丢失。
  • 良好扩展性:采用前缀机制,可无缝接入 Doris 原生支持的其他配置项,便于未来功能拓展。
综上所述,此类配置组件为 Flink CDC 与 Doris 数据库之间的高效集成提供了全面而可靠的支撑,是构建稳定数据同步链路的关键环节。
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:options Option DataS doris tions

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群
GMT+8, 2025-12-5 18:25