楼主: 时光永痕
534 0

[数据挖掘新闻] 了解如何在Spark结构化流上编码和部署机器学习模型 [推广有奖]

  • 0关注
  • 14粉丝

svip3

学术权威

12%

(VIP/贵宾)五级

77%

威望
0
论坛币
26 个
通用积分
57.2086
学术水平
4 点
热心指数
4 点
信用等级
4 点
经验
34190 点
帖子
2733
精华
0
在线时间
321 小时
注册时间
2020-7-21
最后登录
2024-8-1

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币
了解如何在Spark结构化流上编码和部署机器学习模型
在过去的几个月中,我一直在进行我的一个副项目,以开发基于流数据的机器学习应用程序。这是一次很棒的学习经历,带来了无数挑战和大量学习经验,其中一些我曾尝试在此分享。
这篇文章重点介绍如何在流数据上部署机器学习模型,并涵盖成功的生产应用程序的所有三个必要领域:基础架构,技术和监视。
用SPARK-ML和结构化流水线开发机器学习模型
成功应用程序的第一步是根据业务需求确定应将其编写在其中的技术堆栈。通常,当数据量很大时,请使用Spark。
使用Spark的主要好处是其在大数据处理中的可靠能力以及内置分布式机器学习库的可用性。使用Spark的最初挑战是RDD的使用,这是直觉的,并且与数据科学家习惯使用的Dataframe有很大的不同。但是,随着Spark2.0中数据集API的引入,编码机器学习算法变得比以前更加容易。
在我的经验中,我发现通过正确使用“ Pipeline”框架,使用机器学习模型变得非常容易。管道的作用是提供一个结构,其中包括处理和清理数据,训练模型,然后将其写为对象所需的所有步骤。
然后可以直接导入该对象以处理新数据,并获得结果,从而使开发人员无需重新编写代码,并维护新数据的处理步骤的精确副本,然后使用这些数据来构建训练数据模型。
在下面的代码段中,我尝试介绍了如何使用此API来构建,保存和使用模型进行预测。要构建和保存模型,可以遵循以下代码结构。
//创建一个sqlContext
var sqlContext = new SQLContext(sparkContext)
//从源中读取训练数据。在这种情况下,我正在从s3位置读取它
var data = sqlContext.read.format(“ csv”)。option(“ header”,“ true”)。option(“ inferSchema”,“ true”)。load(“ pathToFile”)
//选择所需的消息
data = data.select(“ field1”,“ field2”,“ field3”)
//对数据执行预处理
val process1 =…一些进程…
val process2 =…一些进程…
//定义一个评估器
val evaluator =…您选择的评估器...
//将数据拆分为训练数据并测试
val Array(trainingData,testData)= data.randomSplit(Array(ratio1,ratio2))
//定义要训练的算法。例如决策树
val dt = new DecisionTree().
setFeaturesCol(featureColumn).setLabelCol(labelColumn)
//定义线性管道。管道中指定的方法以线性顺序执行。步骤的顺序是绑定
valpipelineDT = new Pipeline()。setStages(Array(process1,process2,dt))
//定义用于执行管道和执行交叉验证的交叉验证器。
val cvLR = new CrossValidator().
setEstimator(pipelineDT)
.setEvaluator(evaluator)
.setNumFolds(3)//在实践中使用3+
//使模型适合训练数据
val cvModelLR = cvLR.fit(trainingData)
//从交叉验证器中提取训练有素的流水线模型。
val bestPipelineModel = cvModelLR.bestModel.asInstanceOf [PipelineModel]
//将模型保存在s3存储桶中
cvModelLR.write.overwrite()。save(mlModelPath)
保存模型后,可通过以下步骤轻松地将其用于流数据预测。
1.从Kafka主题读取数据
//创建一个
Spark会话对象val ss = SparkSession.builder.getOrCreate()
//定义要使用的主题的
架构val schema = StructType(Seq(
StructField(“ Field1”,StringType,true),
StructField(“ Field2”,IntType,true)

))
//从Kafka主题开始阅读
val records = ss.readStream
.format(“ kafka”).
option(“ kafka.bootstrap.servers”,kafkaServer)
.option(“ subscribe”,kafkaTopic

.load(). selectExpr( “将(作为字符串的值作为字符串)转换为json”).
select(from_json($“ json”,schema).as(“ data”)).
select(“ data。*”)
2.加载保存的ML模型并将其用于预测
//从保存的位置加载分类模型
val分类模型= CrossValidatorModel.read.load(mlModelPath)
//使用模型执行预测。默认情况下使用最佳模型
val结果= categoryModelModel.transform(records)
3.将结果保存到s3或其他位置
在CSV格式
//将结果保存为csv
results.writeStream.format(“ csv”)。outputMode(“ append”).
option(“ path”,destination_path).option(“ checkpointLocation”,checkpointPath)
.start()
在实木复合地板格式
//将结果保存为拼花结果到一个位置
。writeStream.format(“ parquet”)。outputMode(“ append”).
option(“ path”,destination_path).option(“ checkpointLocation”,checkpointPath)
.start()
或者,如果我们要将结果发送到某些数据库或任何其他扩展
val writer =新的JDBCSink(URL,用户,密码)
results.writeStream
.foreach(writer)
.outputMode(“ append”).
option(“ checkpointLocation”,checkpointPath)
.start()
为此,需要通过扩展Sp??ark结构化流提供的ForeachWriter接口来实现单独的编写器。下面显示了jdbc的示例代码,摘自https://docs.databricks.com/_static/notebooks/structured-streaming -...
导入java.sql。_
类JDBCSink(url:String,user:String,pwd:String)扩展了ForeachWriter [(String,String)] {
val驱动程序=“ com.mysql.jdbc.Driver”
var连接:Connection = _
var语句:Statement = _
def open(partitionId:Long,version:Long):布尔值= {
Class.forName(驱动程序)
connection = DriverManager.getConnection(URL,用户,pwd)
语句= connection.createStatement
true
}
def process(value:(String,String)):Unit = {
statement.executeUpdate(“ INSERT INTO zip_test” +
“ VALUES(” + value._1 +“,” + value._2 +“)”)
}}
def close(errorOrNull:可抛出):单位= {
connection.close
}
}
}
}
监视,记录和警报
下一步是将监视,警报和日志记录服务集成到应用程序中,以便获得即时警报并保持对应用程序工作方式的了解。AWS堆栈中有很多可用的工具。经常使用的其中几个是用于监视的CloudWatch和用于日志记录的弹性搜索。
样例监视仪表板将如下所示
图片提供:https://github.com/amazon-archives/cloudwatch-logs-subscription-con ...
基础设施
一旦代码准备好进行部署,就可以选择合适的基础结构进行部署了。我发现最好的基础架构是Kafka(主要是因为它具有多发布者/消费者架构,并且能够设置不同主题的保留期限),以及AWS EMR作为运行应用程序的核心基础架构
由于具有预先安装的spark和内部资源管理的群集的可用性,AWS EMR成为显而易见的选择。能够在短时间内全面部署新集群的能力也是一大优势。
简化的架构图将看起来像。
图片提供-https://dmhnzl5mp9mj6.cloudfront.net/bigdata_awsblog/images/Spark_S ...
调整火花作业
最后,与其他任何火花作业一样,在流作业的情况下也有必要进行调整,以实现最大效率。调整Spark作业的第一步是为该作业选择适当的实例。在对M4(通用)和C4(计算繁重)实例类型进行多次实验后,我发现M4的性能更好,主要是因为它也提供虚拟内核。
spark中的DynamicAllocation属性在以稳定方式最大化利用率方面也非常有用。我发现还有许多其他参数可用于调整性??能:
a)— conf spark.yarn.executor.memoryOverhead = 1024:为作业定义的内存开销量
b)— conf spark.yarn.maxAppAttempts = 4:此属性定义提交申请的最大尝试次数。对于将多个Spark作业提交到单个群集而有时由于缺少可用资源而导致提交作业失败的情况,这非常有用。
c)— conf spark.task.maxFailures = 8:此属性设置任务在spark作业自身失败之前可以失败的最大次数。默认值为2。始终将此数字保持较高是个好主意。
d)— conf spark.speculation = false:将此属性设置为true时,yarn会根据其消耗的时间自动终止并重新分配任务(如果yarn认为它们被卡住了)。在我们的案例中,我们没有发现这对性能有很大贡献,但是在处理倾斜的数据集时要寻找它是一个很好的属性
e)— conf spark.yarn.max.executor.failures = 15:应用程序失败之前执行程序失败的最大次数。始终将其设置为更大的数字。
f)— conf spark.yarn.executor.failuresValidityInterval = 1h:定义执行程序故障有效性的时间间隔。与上述属性相结合,基本上每小时最多有15位执行者在工作死亡之前会失败。
g)—驱动程序内存10g:提供足够高的驱动程序内存,以确保在要处理大量消息时不会失败。
题库
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:Spark 机器学习 SPAR Park SPA

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群

京ICP备16021002-2号 京B2-20170662号 京公网安备 11010802022788号 论坛法律顾问:王进律师 知识产权保护声明   免责及隐私声明

GMT+8, 2024-9-10 06:49