楼主: 7112_1566899082
319 0

[学习笔记] 【学习笔记】# 以下为数据中心的两个数据传感器检测到的两个机架的温度数据 / ... [推广有奖]

  • 0关注
  • 0粉丝

高中生

37%

还不是VIP/贵宾

-

威望
0
论坛币
0 个
通用积分
9.3699
学术水平
0 点
热心指数
0 点
信用等级
0 点
经验
253 点
帖子
21
精华
0
在线时间
10 小时
注册时间
2019-8-27
最后登录
2025-3-6

楼主
7112_1566899082 发表于 2019-11-26 18:22:36 来自手机 |AI写论文

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币
# 以下为数据中心的两个数据传感器检测到的两个机架的温度数据
/*
file1.json:
{\"rack\":\"rack1\",\"temperature\":99.5,\"ts\":\"2017-06-02T08:01:01\"}
{\"rack\":\"rack1\",\"temperature\":100.5,\"ts\":\"2017-06-02T08:06:02\"}
{\"rack\":\"rack1\",\"temperature\":101.0,\"ts\":\"2017-06-02T08:11:03\"}
{\"rack\":\"rack1\",\"temperature\":102.0,\"ts\":\"2017-06-02T08:16:04\"}

file2.json:
{\"rack\":\"rack2\",\"temperature\":99.5,\"ts\":\"2017-06-02T08:01:02\"}
{\"rack\":\"rack2\",\"temperature\":105.5,\"ts\":\"2017-06-02T08:06:04\"}
{\"rack\":\"rack2\",\"temperature\":104.0,\"ts\":\"2017-06-02T08:11:06\"}
{\"rack\":\"rack2\",\"temperature\":108.0,\"ts\":\"2017-06-02T08:16:08\"}
*/

# 所有计算机机架在一个滑动窗口上的平均温度
# 每5分钟,统计一下最近十分钟的平均温度
from pyspark.sql.types import *
from pyspark.sql.functions import *

# 指定一个Schema(模式)
fields = [StructField(\"rack\", StringType(), False),
          StructField(\"temperature\", DoubleType(), False),
          StructField(\"ts\", TimestampType(), False)]
iotDataSchema = StructType(fields)

# 读取温度数据
dataPath = \"/data/spark_demo/streaming/iot-input\"
iotSSDF = spark.readStream.schema(iotDataSchema).json(dataPath)

# group by一个滑动窗口,并在temperature列上求平均值
iotAvgDF = iotSSDF.groupBy(window(col(\"ts\"), \"10 minutes\", \"5 minutes\")).agg(avg(\"temperature\").alias(\"avg_temp\"))

# 将数据写出到memory data sink,使用查询名称iot
iotMemorySQ = iotAvgDF.writeStream.format(\"memory\").queryName(\"iot\").outputMode(\"complete\").start()

# 显示数据,以start时间排序
spark.sql(\"select * from iot\").orderBy(col(\"window.start\")).show(truncate=False)

# 停止该流查询
iotMemorySQ.stop()

----------------------------------------------------------------------------------------------
重构:找出是哪些机架的温度在上升
# 每个机架在一个滑动窗口上的平均温度
# group by一个滑动窗口和rack列
iotAvgByRackDF = iotSSDF.groupBy(window(col(\"ts\"), \"10 minutes\", \"5 minutes\"), col(\"rack\"))
.agg(avg(\"temperature\").alias(\"avg_temp\"))

# 写出到memory data sink,使用iot_rack查询名称
iotByRackConsoleSQ = iotAvgByRackDF.writeStream.format(\"memory\").queryName(\"iot_rack\").outputMode(\"complete\").start()

spark.sql(\"select * from iot_rack\").orderBy(col(\"rack\"), col(\"window.start\")).show(truncate=False)
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:数据中心 学习笔记 传感器 习笔记 streaming

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
扫码
拉您进交流群
GMT+8, 2026-1-28 07:42