参考资料:pan---baidu---com/s/1KoaL_soYaB4JMFAI0aDyXg 提取码: pr5c
一、什么是实时数仓
实时数据仓库(Real-time Data Warehouse)是指能够实时地处理和分析数据,使得数据仓库中的数据是最新的、最准确的,并且可以实时响应用户的查询和分析需求的一种数据仓库系统。
与传统的数据仓库相比,实时数据仓库更加注重数据的实时性和对业务的实时响应能力。传统数据仓库通常是每日、每周或每月定期进行数据的抽取、转换和加载(ETL),更新的速度较慢,一般不支持实时查询和分析。而实时数据仓库则更加注重数据的实时性和对业务的实时响应能力,能够在数据发生变化时及时响应用户的查询和分析需求。
二、安装Flink
步骤 1:下载 #
为了运行Flink,只需提前安装好 Java 11。你可以通过以下命令来检查 Java 是否已经安装正确。
java -version
下载 release 1.20-SNAPSHOT 并解压。
$ tar -xzf flink-1.20-SNAPSHOT-bin-scala_2.12.tgz
$ cd flink-1.20-SNAPSHOT-bin-scala_2.12
步骤 2:启动集群 #
Flink 附带了一个 bash 脚本,可以用于启动本地集群。
$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.
步骤 3:提交作业(Job) #
Flink 的 Releases 附带了许多的示例作业。你可以任意选择一个,快速部署到已运行的集群上。
$ ./bin/flink run examples/streaming/WordCount.jar
$ tail log/flink-*-taskexecutor-*.out
(nymph,1)
(in,3)
(thy,1)
(orisons,1)
(be,4)
(all,2)
(my,1)
(sins,1)
(remember,1)
(d,4)
另外,你可以通过 Flink 的 Web UI 来监视集群的状态和正在运行的作业。
三、安装Doris
下载安装包
低版本(V1.0之前的版本):安装 Doris,需要先通过源码编译,主要有两种方式:使用 Docker 开发镜像编译(推荐)、直接编译。
高版本:直接下载官网的tar包即可,无需再手动编译;本文安装的是doris-1.1.5版本
前置准备
创建目录作为doris的安装目录
[whybigdata@node01 ~]# mkdir /opt/module/doris-1.1.5
修改可打开文件数(每个节点都要修改)
[whybigdata@node01 ~]# sudo vim /etc/security/limits.conf
* soft nofile 65535
* hard nofile 65535
* soft nproc 65535
* hard nproc 65535
安装部署FE
创建 fe 元数据存储的目录
[whybigdata@node01 ~]# mkdir /opt/module/doris-1.1.5/doris-meta
注意:此处如果没有提前创建该目录(配置文件中指定元数据路径),则启动fe后执行 jps 或者 ps –ef | grep doris 或者 ps –ef | grep fe 都找不到相应的进程。
四、实战Flink+Doris实时数仓
建表
因业务数据经常伴随有 UPDATE,DELETE 等操作,为了保持实时数仓的数据粒度与业务库一致,所以选择 Doris Unique 模型(数据模型在下文有重点介绍)具体建表语句如下:
CREATE TABLE IF NOT EXISTS table_1
(
key1 varchar(32),
key2 varchar(32),
key3 varchar(32),
value1 int,
value2 varchar(128),
value3 Decimal(20, 6),
data_deal_datetime DateTime COMMENT '数据处理时间',
data_status INT COMMENT '数据是否删除,1表示正常,-1表示数据已经删除'
)
ENGINE=OLAP
UNIQUE KEY(`key1`,`key2`,`key3`)
COMMENT "xxx"
DISTRIBUTED BY HASH(`key2`) BUCKETS 32
PROPERTIES (
"storage_type"="column",
"replication_num" = "3",
"function_column.sequence_type" = 'DateTime'
);
可以看到,表结构中有两个字段分别是 data_deal_datetime,data_status。
data_deal_datetime 主要是相同 key 情况下数据覆盖的判断依据
data_status 用来兼容业务库对数据的删除操作
数据导入任务
Doris 提供了主动拉取 Kafka 数据的功能,配置如下:
CREATE ROUTINE LOAD database.table1 ON table1
COLUMNS(key1,key2,key3,value1,value2,value3,data_deal_datetime,data_status),
ORDER BY data_deal_datetime
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "10",
"max_batch_rows" = "500000",
"max_batch_size" = "209715200",
"format" = "json",
"json_root" = "$.data",
"jsonpaths"="[\"$.key1\",\"$.key2\",\"$.key3\",\"$.value1\",\"$.value2\",
\"$.value3\",\"$.data_deal_datetime\",\"$.data_status\"]"
)FROM KAFKA
(
"kafka_broker_list"="broker1_ip:port1,broker2_ip:port2,broker3_ip:port3",
"kafka_topic"="topic_name",
"property.group.id"="group_id",
"property.kafka_default_offsets"="OFFSET_BEGINNING"
);
五、未来规划
实时 Schema Change
目前通过 Flink CDC 实时接入数据时,当上游业务表进行 Schema Change 操作时,必须先手动修改 Doris 中的 Schema 和 Flink 任务中的 Schema,最后再重启任务,新的 Schema 的数据才可以同步过来。这样使用方式需要人为的介入,会给用户带来极大的运维负担。后续会针对 CDC 场景做到支持 Schema 实时变更,上游的 Schema Change 实时同步到下游,全面提升 Schema Change 的效率。
Doris 多表写入
目前 Doris Sink 算子仅支持同步单张表,所以对于整库同步的操作,需要手动在 Flink 层面进行分流,写到多个 Doris Sink 中,这无疑增加了开发者的难度,在后续版本中我们也将支持单个 Doris Sink 同步多张表,这样就大大的简化了用户的操作。