108 0

[作业] 实战Flink+Doris实时数仓教程 [推广有奖]

  • 0关注
  • 1粉丝

硕士生

21%

还不是VIP/贵宾

-

威望
0
论坛币
3 个
通用积分
20.6178
学术水平
0 点
热心指数
0 点
信用等级
0 点
经验
1306 点
帖子
69
精华
0
在线时间
60 小时
注册时间
2021-5-4
最后登录
2024-4-26

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币
参考资料:pan---baidu---com/s/1KoaL_soYaB4JMFAI0aDyXg 提取码: pr5c



一、什么是实时数仓
实时数据仓库(Real-time Data Warehouse)是指能够实时地处理和分析数据,使得数据仓库中的数据是最新的、最准确的,并且可以实时响应用户的查询和分析需求的一种数据仓库系统。

与传统的数据仓库相比,实时数据仓库更加注重数据的实时性和对业务的实时响应能力。传统数据仓库通常是每日、每周或每月定期进行数据的抽取、转换和加载(ETL),更新的速度较慢,一般不支持实时查询和分析。而实时数据仓库则更加注重数据的实时性和对业务的实时响应能力,能够在数据发生变化时及时响应用户的查询和分析需求。

二、安装Flink
步骤 1:下载 #
为了运行Flink,只需提前安装好 Java 11。你可以通过以下命令来检查 Java 是否已经安装正确。

java -version
下载 release 1.20-SNAPSHOT 并解压。

$ tar -xzf flink-1.20-SNAPSHOT-bin-scala_2.12.tgz
$ cd flink-1.20-SNAPSHOT-bin-scala_2.12
步骤 2:启动集群 #
Flink 附带了一个 bash 脚本,可以用于启动本地集群。

$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.

步骤 3:提交作业(Job) #
Flink 的 Releases 附带了许多的示例作业。你可以任意选择一个,快速部署到已运行的集群上。

$ ./bin/flink run examples/streaming/WordCount.jar
$ tail log/flink-*-taskexecutor-*.out
  (nymph,1)
  (in,3)
  (thy,1)
  (orisons,1)
  (be,4)
  (all,2)
  (my,1)
  (sins,1)
  (remember,1)
  (d,4)
另外,你可以通过 Flink 的 Web UI 来监视集群的状态和正在运行的作业。

三、安装Doris
下载安装包
低版本(V1.0之前的版本):安装 Doris,需要先通过源码编译,主要有两种方式:使用 Docker 开发镜像编译(推荐)、直接编译。
高版本:直接下载官网的tar包即可,无需再手动编译;本文安装的是doris-1.1.5版本
前置准备
创建目录作为doris的安装目录
[whybigdata@node01 ~]# mkdir /opt/module/doris-1.1.5
修改可打开文件数(每个节点都要修改)
[whybigdata@node01 ~]# sudo vim /etc/security/limits.conf
*        soft nofile 65535
*        hard nofile 65535
*        soft nproc 65535
*        hard nproc 65535
安装部署FE
创建 fe 元数据存储的目录
[whybigdata@node01 ~]# mkdir /opt/module/doris-1.1.5/doris-meta
注意:此处如果没有提前创建该目录(配置文件中指定元数据路径),则启动fe后执行 jps 或者 ps –ef | grep doris 或者 ps –ef | grep fe 都找不到相应的进程。

四、实战Flink+Doris实时数仓
建表
因业务数据经常伴随有 UPDATE,DELETE 等操作,为了保持实时数仓的数据粒度与业务库一致,所以选择 Doris Unique 模型(数据模型在下文有重点介绍)具体建表语句如下:
CREATE TABLE IF NOT EXISTS table_1
(
key1 varchar(32),
key2 varchar(32),
key3 varchar(32),
value1 int,
value2 varchar(128),
value3 Decimal(20, 6),
data_deal_datetime DateTime COMMENT '数据处理时间',
data_status INT COMMENT '数据是否删除,1表示正常,-1表示数据已经删除'
)
ENGINE=OLAP
UNIQUE KEY(`key1`,`key2`,`key3`)
COMMENT "xxx"
DISTRIBUTED BY HASH(`key2`) BUCKETS 32
PROPERTIES (
"storage_type"="column",
"replication_num" = "3",
"function_column.sequence_type" = 'DateTime'
);
可以看到,表结构中有两个字段分别是 data_deal_datetime,data_status。

data_deal_datetime 主要是相同 key 情况下数据覆盖的判断依据
data_status 用来兼容业务库对数据的删除操作
数据导入任务
Doris 提供了主动拉取 Kafka 数据的功能,配置如下:
CREATE ROUTINE LOAD database.table1 ON table1
COLUMNS(key1,key2,key3,value1,value2,value3,data_deal_datetime,data_status),
ORDER BY data_deal_datetime
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "10",
"max_batch_rows" = "500000",
"max_batch_size" = "209715200",
"format" = "json",
"json_root" = "$.data",
"jsonpaths"="[\"$.key1\",\"$.key2\",\"$.key3\",\"$.value1\",\"$.value2\",
            \"$.value3\",\"$.data_deal_datetime\",\"$.data_status\"]"
)FROM KAFKA
(
"kafka_broker_list"="broker1_ip:port1,broker2_ip:port2,broker3_ip:port3",
"kafka_topic"="topic_name",
"property.group.id"="group_id",
"property.kafka_default_offsets"="OFFSET_BEGINNING"
);


五、未来规划
实时 Schema Change
目前通过 Flink CDC 实时接入数据时,当上游业务表进行 Schema Change 操作时,必须先手动修改 Doris 中的 Schema 和 Flink 任务中的 Schema,最后再重启任务,新的 Schema 的数据才可以同步过来。这样使用方式需要人为的介入,会给用户带来极大的运维负担。后续会针对 CDC 场景做到支持 Schema 实时变更,上游的 Schema Change 实时同步到下游,全面提升 Schema Change 的效率。

Doris 多表写入
目前 Doris Sink 算子仅支持同步单张表,所以对于整库同步的操作,需要手动在 Flink 层面进行分流,写到多个 Doris Sink 中,这无疑增加了开发者的难度,在后续版本中我们也将支持单个 Doris Sink 同步多张表,这样就大大的简化了用户的操作。


二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:doris link RIS Lin Replication

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群

京ICP备16021002-2号 京B2-20170662号 京公网安备 11010802022788号 论坛法律顾问:王进律师 知识产权保护声明   免责及隐私声明

GMT+8, 2024-4-28 14:41