楼主: ReneeBK
1318 0

Kafka七步实现实时传输 [推广有奖]

  • 1关注
  • 62粉丝

VIP

已卖:4897份资源

学术权威

14%

还不是VIP/贵宾

-

TA的文库  其他...

R资源总汇

Panel Data Analysis

Experimental Design

威望
1
论坛币
49635 个
通用积分
55.6937
学术水平
370 点
热心指数
273 点
信用等级
335 点
经验
57805 点
帖子
4005
精华
21
在线时间
582 小时
注册时间
2005-5-8
最后登录
2023-11-26

楼主
ReneeBK 发表于 2016-9-18 22:07:28 |AI写论文

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币
摘要: 本文是关于Flume成功应用Kafka的研究案例,深入剖析它是如何将RDBMS实时数据流导入到HDFS的Hive表中。对于那些想要把数据快速摄取到Hadoop中的企业来讲,Kafka是一个很好的选择。Kafka是什么?Kafka是一个分布式、可 ...



本文是关于Flume成功应用Kafka的研究案例,深入剖析它是如何将RDBMS实时数据流导入到HDFS的Hive表中。

对于那些想要把数据快速摄取到Hadoop中的企业来讲,Kafka是一个很好的选择。Kafka是什么?Kafka是一个分布式、可伸缩、可信赖的消息传递系统,利用发布-订阅模型来集成应用程序/数据流。同时,Kafka还是Hadoop技术堆栈中的关键组件,能够很好地支持实时数据分析或者货币化的物联网数据。

本文服务于技术人群。下面就图解Kafka是如何把数据流从RDBMS(关系数据库管理系统)导入Hive,同时借助一个实时分析用例加以说明。作为参考,本文中使用的组件版本分别为Hive 1.2.1,Flume 1.6 以及 Kafka 0.9。

Kafka所在位置:解决方案的整体结构
下图显示了解决方案的整体结构:Kafka和Flume的结合,再加上Hive的交易功能,RDBMS的交易数据被成功传递到目标Hive表中。

七步实现Hadoop实时数据导入

现在让我们深入方案细节,并展示如何在几个步骤内将数据流导入Hadoop。

1、从RDBMS中提取数据
所有关系型数据库都有一个日志文件,用来记录最新的交易。解决方案的第一步就是获取这些交易数据,同时要确保这些数据格式是可以被Hadoop所接受的。

2、设置Kafka生产商
发布Kafka话题消息的过程称为“生产商”。“话题”里有各种Kafka所需要维护的信息类别,RDBMS数据也会被转换成Kafka话题。对于这个示例,要求设置一个服务于整个销售团队的数据库,且该数据库中的交易数据均以Kafka话题形式发布。以下步骤都需要设置Kafka 生产商:
  1. $ cd /usr/hdp/2.4.0.0-169/kafka
  2. $ bin/kafka-topics.sh --create --zookeeper sandbox.hortonworks.com:2181 --replication-factor 1 --partitions 1 --topic SalesDBTransactions
  3. Created topic "SalesDBTransactions".
  4. $ bin/kafka-topics.sh --list --zookeeper sandbox.hortonworks.com:2181
  5. SalesDBTransactions
复制代码


3、设置Hive
接下来将创建一个Hive表,准备接收销售团队的数据库交易数据。这个例子中,我们将创建一个用户数据表:
  1. [bedrock@sandbox ~]$ beeline -u jdbc:hive2:// -n hive -p hive
  2. 0: jdbc:hive2://> use raj;
  3. create table customers (id string, name string, email string, street_address string, company string)
  4. partitioned by (time string)
  5. clustered by (id) into 5 buckets stored as orc
  6. location '/user/bedrock/salescust'
  7. TBLPROPERTIES ('transactional'='true');
复制代码


为了确保Hive能够有效处理交易数据,以下设置要求在Hive配置中进行:
  1. hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
复制代码


4、为Kafka到Hive的数据流设置Flume代理
现在来看下如何创建一个Flume代理,用于收集Kafka话题资料并向Hive表发送数据.在启用Flume代理前,要通过这几个步骤设置运行环境:
  1. $ pwd
  2. /home/bedrock/streamingdemo
  3. $ mkdir flume/checkpoint
  4. $ mkdir flume/data
  5. $ chmod 777 -R flume
  6. $ export HIVE_HOME=/usr/hdp/current/hive-server2
  7. $ export HCAT_HOME=/usr/hdp/current/hive-webhcat

  8. $ pwd
  9. /home/bedrock/streamingdemo/flume
  10. $ mkdir logs
复制代码

再如下所示创建一个log4j属性文件:
  1. [bedrock@sandbox conf]$ vi log4j.properties
  2. flume.root.logger=INFO,LOGFILE
  3. flume.log.dir=/home/bedrock/streamingdemo/flume/logs
  4. flume.log.file=flume.log
复制代码

然后为Flume代理配置以下文件:
  1. $ vi flumetohive.conf
  2. flumeagent1.sources = source_from_kafka
  3. flumeagent1.channels = mem_channel
  4. flumeagent1.sinks = hive_sink
  5. # Define / Configure source
  6. flumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
  7. flumeagent1.sources.source_from_kafka.zookeeperConnect = sandbox.hortonworks.com:2181
  8. flumeagent1.sources.source_from_kafka.topic = SalesDBTransactions
  9. flumeagent1.sources.source_from_kafka.groupID = flume
  10. flumeagent1.sources.source_from_kafka.channels = mem_channel
  11. flumeagent1.sources.source_from_kafka.interceptors = i1
  12. flumeagent1.sources.source_from_kafka.interceptors.i1.type = timestamp
  13. flumeagent1.sources.source_from_kafka.consumer.timeout.ms = 1000

  14. # Hive Sink
  15. flumeagent1.sinks.hive_sink.type = hive
  16. flumeagent1.sinks.hive_sink.hive.metastore = thrift://sandbox.hortonworks.com:9083
  17. flumeagent1.sinks.hive_sink.hive.database = raj
  18. flumeagent1.sinks.hive_sink.hive.table = customers
  19. flumeagent1.sinks.hive_sink.hive.txnsPerBatchAsk = 2
  20. flumeagent1.sinks.hive_sink.hive.partition = %y-%m-%d-%H-%M
  21. flumeagent1.sinks.hive_sink.batchSize = 10
  22. flumeagent1.sinks.hive_sink.serializer = DELIMITED
  23. flumeagent1.sinks.hive_sink.serializer.delimiter = ,
  24. flumeagent1.sinks.hive_sink.serializer.fieldnames = id,name,email,street_address,company
  25. # Use a channel which buffers events in memory
  26. flumeagent1.channels.mem_channel.type = memory
  27. flumeagent1.channels.mem_channel.capacity = 10000
  28. flumeagent1.channels.mem_channel.transactionCapacity = 100
  29. # Bind the source and sink to the channel
  30. flumeagent1.sources.source_from_kafka.channels = mem_channel
  31. flumeagent1.sinks.hive_sink.channel = mem_channel
复制代码

5、启用Flume代理
通过以下指令启用Flume代理:
  1. $ /usr/hdp/apache-flume-1.6.0/bin/flume-ng agent -n flumeagent1 -f ~/streamingdemo/flume/conf/flumetohive.conf
复制代码




6、启用Kafka流
作为示例下面是一个模拟交易的消息集,这在实际系统中需要通过源数据库才能生成。例如,以下可能来自Oracle流,在回放被提交到数据库的SQL交易数据,也可能来自GoldenGate。
  1. $ cd /usr/hdp/2.4.0.0-169/kafka
  2. $ bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic SalesDBTransactions
  3. 1,"Nero Morris","porttitor.interdum@Sedcongue.edu","P.O. Box 871, 5313 Quis Ave","Sodales Company"
  4. 2,"Cody Bond","ante.lectus.convallis@antebibendumullamcorper.ca","232-513 Molestie Road","Aenean Eget Magna Incorporated"
  5. 3,"Holmes Cannon","a@metusAliquam.edu","P.O. Box 726, 7682 Bibendum Rd.","Velit Cras LLP"
  6. 4,"Alexander Lewis","risus@urna.edu","Ap #375-9675 Lacus Av.","Ut Aliquam Iaculis Inc."
  7. 5,"Gavin Ortiz","sit.amet@aliquameu.net","Ap #453-1440 Urna. St.","Libero Nec Ltd"
  8. 6,"Ralph Fleming","sociis.natoque.penatibus@quismassaMauris.edu","363-6976 Lacus. St.","Quisque Fringilla PC"
  9. 7,"Merrill Norton","at.sem@elementum.net","P.O. Box 452, 6951 Egestas. St.","Nec Metus Institute"
  10. 8,"Nathaniel Carrillo","eget@massa.co.uk","Ap #438-604 Tellus St.","Blandit Viverra Corporation"
  11. 9,"Warren Valenzuela","tempus.scelerisque.lorem@ornare.co.uk","Ap #590-320 Nulla Av.","Ligula Aliquam Erat Incorporated"
  12. 10,"Donovan Hill","facilisi@augue.org","979-6729 Donec Road","Turpis In Condimentum Associates"
  13. 11,"Kamal Matthews","augue.ut@necleoMorbi.org","Ap #530-8214 Convallis, St.","Tristique Senectus Et Foundation"
复制代码



7、接收Hive数据
如果上面所有的步骤都完成了,那么现在就可以从Kafka发送数据,可以看到数据流在几秒钟内就会被发送到Hive表。


二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:Transactions Transaction Incorporate Replication CORPORATION 关系数据库 解决方案 应用程序 管理系统 数据流

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注jltj
拉您入交流群
GMT+8, 2025-12-26 22:43