楼主: 时光永痕
429 0

[数据挖掘新闻] 针对数据工程初学者的8个必备知识Spark优化技巧 [推广有奖]

  • 0关注
  • 14粉丝

svip3

学术权威

12%

(VIP/贵宾)三级

66%

威望
0
论坛币
26 个
通用积分
49.7576
学术水平
4 点
热心指数
4 点
信用等级
4 点
经验
34070 点
帖子
2731
精华
0
在线时间
316 小时
注册时间
2020-7-21
最后登录
2024-4-28

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币
针对数据工程初学者的8个必备知识Spark优化技巧
使用大数据时遇到的最大障碍不是完成任务,而是在最短的时间内用最少的资源完成任务。这就是Apache Spark带来的惊人灵活性,可以优化您的代码,从而使您物有所值!
火花优化
在本文中,我们将讨论每个数据工程初学者都应了解的8个Spark优化技巧。其中大多数都是简单的技术,您需要将它们与可能在不知不觉中使用的低效代码互换。虽然其他一些小的调整,但您需要对当前的代码进行调整才能成为Spark超级巨星。因此,让我们开始吧!
火花优化模因
如果您是个初学者,不知道什么是Spark及其基本组成部分,建议您先阅读以下文章:
PySpark初学者
火花转换和动作
目录
不收集数据
坚持是关键
避免使用Groupbykey
与累加器合计
广播大变量
分区精明
重新分区您的数据
不要对数据进行分区-合并数据
1.不收集数据
作为数据工程师的初学者,即使我们着手处理大数据,我们也从小数据入手,习惯了一些命令并坚持使用。这样的命令之一就是Spark中的collect()操作。
当我们调用collect操作时,结果将返回到驱动程序节点。起初,这似乎无害。但是,如果您要处理大量数据,则驱动程序节点可能很容易耗尽内存。
#加载数据
df = spark.read.csv(“ / FileStore / tables / train.csv”,标题为True)
# 搜集
df.collect()
Spark优化-不收集数据
那么,我们应该使用什么呢?
不收集数据模因
逃脱的一种好方法是使用take()操作。它扫描找到的第一个分区并返回结果。就如此容易!例如,如果您只是想感受一下数据,那么请使用take(1)行数据。
df.take(1)
火花优化-df.take
这比使用collect更有效!
2.坚持是关键
当您开始使用Spark时,您要了解的第一件事是Spark是一个懒惰的评估者,这是一件好事。但是,如果您不能顺利驾驭水域,那也可能是衰退的开始。我什么意思
好吧,假设您已经编写了一些要在RDD上执行的转换。现在,每次在RDD上调用操作时,Spark都会重新计算RDD及其所有依赖项。结果可能很昂贵。
l1 = [1、2、3、4]
rdd1 = sc.parallelize(l1)
rdd2 = rdd1.map(lambda x:x * x)
rdd3 = rdd2.map(lambda x:x + 2)
打印(rdd3.count())
打印(rdd3.collect())
当我调用count()时,将执行所有转换,并且需要0.1秒才能完成任务。
火花优化-计数
当我调用collect()时,再次调用了所有转换,并且仍然需要0.1 s的时间来完成任务。
火花优化-持久性
那么,我们如何摆脱这种恶性循环?坚持!
持久性模因
从pyspark import StorageLevel
#默认情况下缓存到内存和磁盘
rdd3.persist(StorageLevel.MEMORY_AND_DISK)
#在rdd保留之前
打印(rdd3.count())
#rdd保留后
打印(rdd3.collect())
在我们之前的代码中,我们要做的只是保留在最终的RDD中。这样,当我们第一次在RDD上调用动作时,生成的最终数据将存储在集群中。现在,由于我们已经存储了先前的结果,因此对同一RDD进行任何后续动作的使用都会更快。
RDD持续
注–在这里,我们已将数据保留在内存和磁盘中。但是还有其他选择可以保留数据。
坚持资料来源:Learning Spark
3.避免使用Groupbykey
当您开始数据工程之旅时,您肯定会碰到单词计数示例。
#例句列表
text_list = [“这是一个示例句子”,“这是另一个示例句子”,“用于示例测试的示例”]
#创建一个RDD
rdd = sc.parallelize(文本列表)
#使用flatMap将句子分割成单词
rdd_word = rdd.flatMap(lambda x:x.split(“”))
#创建一个成对的rdd
rdd_pair = rdd_word.map(lambda x:(x,1))
#使用groupbykey()计算每个单词的出现次数
rdd_group = rdd_pair.groupByKey()
rdd_group_count = rdd_group.map(lambda x:(x [0],len(x [1])))
rdd_group_count.collect()
但是为什么要把它带到这里呢?好吧,这是强调在使用pair-rdds时groupbykey()转换效率低下的最佳方法。
火花优化-groupkey()
Groupbykey在网络上随机组合键值对,然后将它们组合在一起。有了更大的数据,改组将更加夸张。那么,我们该如何处理呢?Reducebykey!
另一方面,Reducebykey首先将键合并在同一分区中,然后才对数据进行随机排序。
火花优化-Resucebykey()
这是使用reducebykey()计算单词数的方法
#使用reducebykey()计算每个单词的出现次数
rdd_reduce = rdd_pair.reduceByKey(lambda x,y:x + y)
rdd_reduce.collect()
这导致整个网络上的数据混排量大大降低。
火花优化-groupbykey()
如您所见,在reducebykey的情况下,要重排的数据量比在groupbykey的情况下要低得多。
reduceby和groupby meme
4.与累加器聚合
假设您想汇总一些价值。这可以通过使用简单的编程(使用计数器变量)来完成。
文件= sc.textFile(“ / FileStore / tables / log.txt”)
#可变计数器
warningCount = 0
def extractWarning(行):
    全局警告计数
    如果(“警告”行):
        warningCount + = 1
行数= file.flatMap(lambda x:x.split(“,”))
lines.foreach(extractWarning)
#输出变量
warningCount
但是这里有一个警告。
当我们尝试在驱动程序节点上查看结果时,我们得到一个0值。这是因为在工作节点上实现代码时,变量在该节点上变为本地。这意味着更新后的值不会发送回驱动程序节点。为了克服这个问题,我们使用了累加器。
累加模因
累加器具有Spark提供的共享变量。它们用于关联和交换任务。例如,如果您要计算文本文件中的空白行数或确定损坏的数据量,则累加器可能会非常有用。
文件= sc.textFile(“ / FileStore / tables / log.txt”)
#累加器
warningCount = sc.accumulator(0)
def extractWarning(行):
    全局警告计数
    如果(“警告”行):
        warningCount + = 1
行数= file.flatMap(lambda x:x.split(“,”))
lines.foreach(extractWarning)
#累加器值
warningCount.value#输出4
使用累加器时要记住的一件事是,工作节点只能写入累加器。但是只有驱动程序节点可以读取该值。
5.广播大变量
就像累加器一样,Spark具有另一个共享变量,称为Broadcast变量。它们仅用于读取目的,该目的已缓存在群集中的所有工作程序节点中。当您必须向所有节点发送大型查询表时,这非常方便。
假设一个文件包含的数据包含其他国家/地区的速记代码(如印度的IND)以及其他类型的信息。您必须将这些代码转换为国家名称。这是广播变量派上用场的地方,我们可以使用它在工作节点中缓存查找表。
# 抬头
国家= {“ IND”:“印度”,“ USA”:“美利坚合众国”,“ SA”:“南非”}
#广播
广播者= sc.broadcast(国家/地区)
#个数据
userData = [(“ Johnny”,“ USA”),(“ Faf”,“ SA”),(“ Sachin”,“ IND”)]
#创建rdd
rdd_data = sc.parallelize(userData)
#使用广播变量
def convert(代码):
    返回broadcaster.value [code]
# 转型
输出= rdd_data.map(lambda x:(x [0],convert(x [1])))
#动作
output.collect()
广播大变量
广播大变量模因
6.精通分区
Spark的基石之一是其以并行方式处理数据的能力。Spark将数据分为几个分区,每个分区包含完整数据的某些子集。例如,如果一个数据帧包含10
群集中的分区数取决于群集中的核心数,并由驱动程序节点控制。当Spark运行任务时,它将在集群中的单个分区上运行。您可以按如下所示签出为数据框创建的分区数:
df.rdd.getNumPartitions()
但是,此数字是可调整的,应该进行调整以获得更好的优化。
选择的分区太少,您有许多资源处于空闲状态。
选择太多的分区,就会有大量的小分区频繁地对数据进行混洗,这会导致效率低下。
那么正确的号码是多少?
模因
根据Spark的说法,128 MB是您应打包到单个分区中的最大字节数。因此,如果我们有128000 MB的数据,则应该有1000个分区。但是这个数字并不严格,正如我们将在下一个技巧中看到的那样。
7.重新分区您的数据
整个Spark应用程序中的分区数将需要更改。如果从100个分区开始,则可能必须将其降低到50个。但是为什么我们必须这样做呢?
例如,您读取一个数据框并创建100个分区。接下来,过滤数据框以仅存储某些行。现在,存储在分区中的数据量已经有所减少。因此,应谨慎选择减少分区的数量,以便充分利用资源。
但是如何调整分区数呢?
的重新分配()变换可以被用来增加或减少在集群中的分区的数目。
将numpy导入为np
#个数据
l1 = np.arange(13)
#rdd
rdd = sc.parallelize(l1)
#检查每个分区的数据
打印(rdd.glom()。collect())
#增加分区
打印(rdd.repartition(10).glom()。collect())
重新分区
模因
当repartition()将数据调整为定义的分区数时,它必须在网络中重新整理整个数据。尽管在增加分区时不可避免地要进行过多的改组,但是在减少分区数时有更好的方法。
8.不要重新分区您的数据-合并它
在上一个技巧中,我们讨论了通过重新分区来减少分区数量并不是实现此目的的最佳方法。为什么?
模因合并
重分区会重排数据以计算分区数。但是,我们不想这样做。为了避免这种情况,我们使用coalesce()。当减少分区数时,它减少了需要执行的分区数。
打印(rdd.glom()。collect())
#重新分区
打印(rdd.repartition(4).glom()。collect())
#合并
打印(rdd.coalesce(4).glom()。collect())
合并
重新分区算法会进行完整的数据混洗,并在分区之间平均分配数据。它不会像合并算法那样尝试最小化数据移动。
题库
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:Spark SPAR 数据工程 必备知识 Park

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群

京ICP备16021002-2号 京B2-20170662号 京公网安备 11010802022788号 论坛法律顾问:王进律师 知识产权保护声明   免责及隐私声明

GMT+8, 2024-5-4 22:37