楼主: 时光永痕
419 0

[数据挖掘新闻] 在10分钟内实用Apache Spark。第5部分-流式传输 [推广有奖]

  • 0关注
  • 14粉丝

svip3

学术权威

12%

(VIP/贵宾)三级

57%

威望
0
论坛币
26 个
通用积分
49.7576
学术水平
4 点
热心指数
4 点
信用等级
4 点
经验
34070 点
帖子
2731
精华
0
在线时间
316 小时
注册时间
2020-7-21
最后登录
2024-4-28

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币
在10分钟内实用Apache Spark。第5部分-流式传输
Spark是一个功能强大的工具,可用于解决许多有趣的问题。其中一些已经在我们之前的文章中进行了讨论。今天,我们将考虑另一个重要的应用程序,即流媒体。流数据是连续来自不同来源的小记录的数据。 流技术有许多用例,例如工业或科学设备中的传感器监视,服务器日志检查,金融市场监视等。
在这篇文章中,我们将通过传感器温度监控来检查这种情况。例如,我们的设备中有几个传感器(1
首先,使用以下命令将Netcat作为数据服务器运行:
数控-lk 8080
然后,我们向其输入传感器数据:
21/01/2001 1 1 2322/01/2001 2 0 2023/01/2001 3 1 15
24/01/2001 4 1 10
25/01/2001 5 0 5
接下来,我们将在Spark Streaming模块的帮助下,在端口8080上监听此服务器。请注意,我们可以在流式程序执行期间使用nc发送更多数据。
与之前的文章相比,我们不会在Spark shell中执行示例。>我们会将其另存为Spark主目录中的streaming.py文件。文件夹名称可能是spark-2.3.0-bin-hadoop2.7(取决于您使用的Spark版本)。
按值计数
让我们从一个简单的例子开始。我们要计算来自每个传感器的记录数。
首先,我们必须进行一些进口:
从pyspark导入SparkContext从pyspark.streaming导入StreamingContext
然后,我们从公共行开始程序:
如果__name__ ==“ __main__”:
每个流式传输程序的第一个重要步骤是初始化StreamingContext。在程序中添加以下行:
sc = SparkContext(appName =“ PythonStreamingCountByValue”)ssc = StreamingContext(sc,30)
在最后一行中,30是批处理持续时间。
现在,我们应该创建一个DStream将从连接到hostname:port的Netcat接收数据的主机,例如localhost:8080。
ds = ssc.socketTextStream('localhost',8080)
有必要说这DStream是的连续序列RDD。
下一步是将DStream行划分为元素,然后选择流数据的第二列(传感器编号)。这可以通过map(func)转换来实现。在map简单地应用func到流中的每个元素。
数据= ds.map(lambda行:int(line.split(“”)[1]))
然后,通过应用countByValue转换,我们将计算该批次中来自每个传感器的数据有多少次。
data_count = data.countByValue()
最后,让我们打印结果:
data_count.pprint()
通过上面的代码,我们定义了数据转换的算法。要开始执行,我们需要从StreamingContext以下几行开始:
ssc.start()
然后,我们必须设置何时停止Spark Streaming:
ssc.awaitTermination()
如果我们向Netcat输入以下输入,
01/02/2001 1 1 2301/02/2001 2 0 1201/02/2001 3 1 22
2001年2月
2日1 1 25 2001年2月2日2 0 15 2001年2月2日
3 0 10
我们将得到如下输出。
-------------------------------------------时间:2018-06-22 16:30:30 -------------------------------------------
(3,2)
(1,2)
(2,2)
从表中可以看到,每个传感器的数据来自两次。
完整的代码:
从pyspark导入SparkContext从pyspark.streaming导入StreamingContext
如果__name__ ==“ __main__”:
        sc = SparkContext(appName =“ PythonStreamingCountByValue”)
        ssc = StreamingContext(sc,30)
        ds = ssc.socketTextStream('localhost',8080)
        data = ds.map(lambda line:int(line .split(“”)[1]))
        data_count = data.countByValue()
        data_count.pprint()
        ssc.start()
        ssc.awaitTermination()
要运行流式程序,请执行:
./bin/spark-submit stream.py
筛选
让我们继续进行流传输中的另一个常见任务-过滤。假设我们只接受来自在线传感器的消息。该filter转型将帮助我们与此有关。它仅返回先前定义的函数返回true的记录。
现在您可以更改程序或将其另存为另一个文件。无论您选择什么,都应使程序的框架保持相同,除了以下几行:
data = ds.map(lambda行:int(line.split(“”)[1]))data_count = data.countByValue()data_count.pprint()
这些行是程序中的主要行(在此处进行转换并打印结果),因此在下面的示例中,我们将仅更改这些行。
用以下替换它们。(也许您也想更新appName)
数据= ds.map(lambda行:line.split(“”))\。map(lambda l:(l [0],int(l [1]),int(l [2]),int(l [ 3]]))data_filter = data.filter(lambda line:line [2] == 1)
data_filter.pprint()
在下表中,有此任务的输入和输出
01/02/2001 1 1 2302/02/2001 2 0 1003/02/2001 3 1 25
02/02/2001 2 0 10
01/02/2001 1 1 22
04/02/2001 4 1 24
02/02/2001 1 1 19
05/02/2001 5 0 13
06/02/2001 6 1 26
-------------------------------------------时间:2018-07-05 09:08:00 -------------------------------------------
('01 / 02/2001',
1,1,23 )('03 / 02/2001',
3,1,25 )('01 / 02/2001',
1,1,22 )('04 / 02 / 2001',
4,1,24)
('02 / 02/2001',1,1,19 )('06 / 02/2001',6,1,26)
完整的代码:
从pyspark导入SparkContext从pyspark.streaming导入StreamingContext
如果__name__ ==“ __main__”:
        sc = SparkContext(appName =“ PythonStreamingFiltration”)
        ssc = StreamingContext(sc,30)
        ds = ssc.socketTextStream('localhost',8080)
        data = ds.map(lambda line:(line。 split(“”)))\
        .map(lambda l:(l [0],int(l [1]),int(l [2]),int(l [3])))
        data_filter = data.filter (lambda行:line [2] == 1)
        data_filter.pprint()
        ssc.start()
        ssc.awaitTermination()
传感器的最高温度
现在,我们将使我们的程序更加有趣。假设我们需要知道什么是传感器的最高温度。要获得此信息,我们应该使用reduce带有maxfunction的转换 。该max函数为我们提供了最大值,并且reduce转换返回了该值,该值是通过将元素RDD与定义的函数进行汇总而得出的(max在我们的示例中)。
再次,使程序框架保持不变。用以下内容替换发生转换并打印结果的行:
温度= ds.map(lambda行:int(line.split(“”)[3]))结果=温度.reduce(max)result.pprint()
如果输入相同,您将得到结果:
-------------------------------------------时间:2018-06-14 12:42:00 -------------------------------------------
25
完整的代码:
从pyspark导入SparkContext从pyspark.streaming导入StreamingContext
如果__name__ ==“ __main__”:
        sc = SparkContext(appName =“ PythonStreamingNetworkMaxTemperature”)
        ssc = StreamingContext(sc,30)
        ds = ssc.socketTextStream('localhost',8080)   
        temperature = ds.map(lambda line:int(line .split(“”)[3]))
        结果=温度
        。reduce(最大)result.pprint()   
        ssc.start()
        ssc.awaitTermination()
结论
流数据被广泛用于各种实际应用中。最受欢迎的流应用程序之一是以实时模式从不同的传感器接收数据。有许多用于流数据分析的工具(Flink,Storm,Kafka,Spark,Samza,Kinesis等),其中Apache Spark由于其便利性和简单性而成为最受欢迎的工具之一。Apache Spark有一个特殊的流媒体库。在本文中,我们简要概述了Spark Streaming功能,并将一些基本功能应用于我们的数据。我们已经证明,只有几行代码,才可以从数据流中获取有用的信息,进行分析和打印。
题库
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:Apache Spark apache Spark SPAR Park

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群

京ICP备16021002-2号 京B2-20170662号 京公网安备 11010802022788号 论坛法律顾问:王进律师 知识产权保护声明   免责及隐私声明

GMT+8, 2024-5-1 19:21