Thread starter: Nicolle

[GITHUB] Scala Data Analysis Cookbook


Nicolle (student verified) posted on 2016-10-11 07:39:37
Notice: the author has been banned or deleted; the content has been automatically hidden.


Nicolle (student verified) posted on 2016-10-11 07:41:58
Notice: the author has been banned or deleted; the content has been automatically hidden.

Nicolle (student verified) posted on 2016-10-11 07:43:05
Notice: the author has been banned or deleted; the content has been automatically hidden.

Nicolle (student verified) posted on 2016-10-11 07:43:34
Notice: the author has been banned or deleted; the content has been automatically hidden.

Nicolle (student verified) posted on 2016-10-11 07:44:36
Notice: the author has been banned or deleted; the content has been automatically hidden.

smartlife (employment verified) posted on 2016-10-11 07:51:50
package com.packt.scalada.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.{Matrix, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

object PCAIris extends App {

  val conf = new SparkConf().setAppName("PCAIris").setMaster("local[2]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  // Parse the four numeric columns of the Iris dataset into dense vectors
  val data = sc.textFile("iris.data").map(line => {
    val dataArray = line.split(",").take(4)
    Vectors.dense(dataArray.map(_.toDouble))
  })

  // Center the data (mean subtraction only, no unit-variance scaling)
  val scaler = new StandardScaler(withMean = true, withStd = false).fit(data)
  val scaledData = scaler.transform(data).cache()

  println("Count " + scaledData.count)

  // Singular value decomposition of the centered data matrix
  val matrix = new RowMatrix(scaledData)
  val svd = matrix.computeSVD(3)

  val sum = svd.s.toArray.sum

  // Print the cumulative share of the spectrum covered by each component
  svd.s.toArray.zipWithIndex.foldLeft(0.0) {
    case (cum, (curr, component)) =>
      val percent = (cum + curr) / sum
      println(s"Component ${component + 1} :: cumulative share: $percent :: singular value: $curr")
      cum + curr
  }

  // Project the data onto the top 3 principal components
  val pcomp: Matrix = matrix.computePrincipalComponents(3)
  val reducedData = matrix.multiply(pcomp).rows

  // Decide the number of clusters by comparing the k-means cost for k = 1..7
  val clusterCost = (1 to 7).map { noOfClusters =>
    val kmeans = new KMeans()
      .setK(noOfClusters)
      .setMaxIterations(5)
      .setInitializationMode(KMeans.K_MEANS_PARALLEL) // "k-means||"

    val model = kmeans.run(reducedData)
    (noOfClusters, model.computeCost(reducedData))
  }

  println("Cluster cost on sample data")
  clusterCost.foreach(println)
}
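The foldLeft above only prints the cumulative share of the spectrum covered by each singular value. A minimal follow-up sketch (not part of the original post; the object wrapper, the helper name, and the 95% threshold are assumptions) that turns the same running sum into a function returning how many components to keep:

object VarianceThreshold {
  // Smallest number of leading components whose cumulative share of the
  // spectrum reaches `threshold`, using the same convention as the snippet
  // above (raw singular values rather than their squares).
  def componentsForThreshold(singularValues: Array[Double], threshold: Double): Int = {
    val total = singularValues.sum
    // Running sums 0, s1, s1+s2, ... divided by the total give cumulative shares
    val cumulativeShares = singularValues.scanLeft(0.0)(_ + _).tail.map(_ / total)
    cumulativeShares.indexWhere(_ >= threshold) match {
      case -1 => singularValues.length // threshold never reached: keep everything
      case i  => i + 1
    }
  }
}

// Example usage against the SVD computed above:
// val k = VarianceThreshold.componentsForThreshold(svd.s.toArray, 0.95)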



hjtoh posted on 2016-10-11 07:53:11 from mobile
package com.packt.scalada.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, MulticlassMetrics}
import org.apache.spark.mllib.feature.{HashingTF, IDF, IDFModel, StandardScaler}
import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.regression.{GeneralizedLinearAlgorithm, GeneralizedLinearModel, LabeledPoint}
import epic.preprocess.{MLSentenceSegmenter, TreebankTokenizer}

object PCASpam extends App {

  val conf = new SparkConf().setAppName("PCASpam").setMaster("local[2]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  case class Document(label: String, content: String)

  // Each line of SMSSpamCollection is "<label>\t<message>"
  val docs = sc.textFile("SMSSpamCollection").map(line => {
    val words = line.split("\t")
    Document(words.head.trim(), words.tail.mkString(" "))
  }).cache()

  val labeledPointsWithTf = getLabeledPoints(docs)
  val lpTfIdf = withIdf(labeledPointsWithTf).cache()

  // Split the dataset 80/20 into training and test, stratified by label
  val spamPoints = lpTfIdf.filter(point => point.label == 1).randomSplit(Array(0.8, 0.2))
  val hamPoints = lpTfIdf.filter(point => point.label == 0).randomSplit(Array(0.8, 0.2))

  println("Spam count: " + spamPoints(0).count + " :: " + spamPoints(1).count)
  println("Ham count: " + hamPoints(0).count + " :: " + hamPoints(1).count)

  val trainingSpamSplit = spamPoints(0)
  val testSpamSplit = spamPoints(1)

  val trainingHamSplit = hamPoints(0)
  val testHamSplit = hamPoints(1)

  val trainingData = trainingSpamSplit ++ trainingHamSplit
  val testSplit = testSpamSplit ++ testHamSplit

  println("Training split count: " + trainingData.count())
  println("Test split count: " + testSplit.count())

  /* Unscaled sample, used only to estimate how many principal components retain
     roughly 95% of the variance. The actual dimensionality reduction below is
     done on scaled data.

  val dimensionDecidingSample = new RowMatrix(trainingData.randomSplit(Array(0.8, 0.2))(1).map(lp => lp.features))
  val svd = dimensionDecidingSample.computeSVD(100, computeU = false)
  val sum = svd.s.toArray.sum

  val placeholder = svd.s.toArray.zipWithIndex.foldLeft(0.0) {
    case (cum, (curr, component)) =>
      val percent = (cum + curr) / sum
      println(s"Component ${component + 1} :: cumulative share: $percent :: singular value: $curr")
      cum + curr
  }
  */

  val unlabeledTrainData = trainingData.map(lpoint => Vectors.dense(lpoint.features.toArray)).cache()

  // Center the data. StandardScaler with withMean = true does not support sparse
  // vectors, hence the conversion to dense vectors above.
  val scaler = new StandardScaler(withMean = true, withStd = false).fit(unlabeledTrainData)
  val scaledTrainingData = scaler.transform(unlabeledTrainData).cache()

  val trainMatrix = new RowMatrix(scaledTrainingData)
  val pcomp: Matrix = trainMatrix.computePrincipalComponents(100)

  println("trainMatrix dimension " + trainMatrix.numRows + " :: " + trainMatrix.numCols)
  println("pcomp dimension " + pcomp.numRows + " :: " + pcomp.numCols)

  // Project training and test features onto the top 100 principal components
  val reducedTrainingData = trainMatrix.multiply(pcomp).rows.cache()
  val reducedTrainingSplit = trainingData.zip(reducedTrainingData).map { case (labeled, reduced) => new LabeledPoint(labeled.label, reduced) }

  val unlabeledTestData = testSplit.map(lpoint => lpoint.features)
  val testMatrix = new RowMatrix(unlabeledTestData)
  val reducedTestData = testMatrix.multiply(pcomp).rows.cache()
  val reducedTestSplit = testSplit.zip(reducedTestData).map { case (labeled, reduced) => new LabeledPoint(labeled.label, reduced) }

  // Train and evaluate logistic regression (L-BFGS) on the reduced data
  val logisticWithBFGS = getAlgorithm(10, 1, 0.001)
  val logisticWithBFGSPredictsActuals = runClassification(logisticWithBFGS, reducedTrainingSplit, reducedTestSplit)
  calculateMetrics(logisticWithBFGSPredictsActuals, "Logistic with BFGS")

  def getAlgorithm(iterations: Int, stepSize: Double, regParam: Double) = {
    val algo = new LogisticRegressionWithLBFGS()
    algo.setIntercept(true).optimizer.setNumIterations(iterations).setRegParam(regParam)
    algo
  }

  def runClassification(algorithm: GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel], trainingData: RDD[LabeledPoint],
    testData: RDD[LabeledPoint]): RDD[(Double, Double)] = {
    val model = algorithm.run(trainingData)
    println("predicting")
    val predicted = model.predict(testData.map(point => point.features))
    val actuals = testData.map(point => point.label)
    val predictsAndActuals: RDD[(Double, Double)] = predicted.zip(actuals)
    println(predictsAndActuals.collect().mkString(", "))
    predictsAndActuals
  }

  def calculateMetrics(predictsAndActuals: RDD[(Double, Double)], algorithm: String) {

    val accuracy = 1.0 * predictsAndActuals.filter(predActs => predActs._1 == predActs._2).count() / predictsAndActuals.count()
    val binMetrics = new BinaryClassificationMetrics(predictsAndActuals)
    println(s"************** Printing metrics for $algorithm ***************")
    println(s"Area under ROC ${binMetrics.areaUnderROC}")
    println(s"Accuracy $accuracy")

    val metrics = new MulticlassMetrics(predictsAndActuals)
    println(s"Precision : ${metrics.precision}")
    println(s"Confusion Matrix \n${metrics.confusionMatrix}")
    println(s"************** ending metrics for $algorithm *****************")
  }

  def getLabeledPoints(docs: RDD[Document]): RDD[LabeledPoint] = {

    // Use the Scala NLP library Epic for sentence segmentation and tokenization
    val labeledPointsUsingEpicRdd: RDD[LabeledPoint] = docs.mapPartitions { docIter =>

      val segmenter = MLSentenceSegmenter.bundled().get
      val tokenizer = new TreebankTokenizer()
      val hashingTf = new HashingTF(5000)

      docIter.map { doc =>
        val sentences = segmenter.apply(doc.content)
        val tokens = sentences.flatMap(sentence => tokenizer(sentence))

        // Keep only tokens made of letters or digits, and drop words shorter than 2 characters
        val features = tokens.toList.filter(token => token.forall(_.isLetterOrDigit)).filter(_.length() > 1)

        new LabeledPoint(if (doc.label.equals("ham")) 0 else 1, hashingTf.transform(features))
      }
    }.cache()

    labeledPointsUsingEpicRdd
  }

  def withIdf(lPoints: RDD[LabeledPoint]): RDD[LabeledPoint] = {
    val hashedFeatures = lPoints.map(lp => lp.features)
    val idf: IDF = new IDF()
    val idfModel: IDFModel = idf.fit(hashedFeatures)

    val tfIdf: RDD[Vector] = idfModel.transform(hashedFeatures)

    val lpTfIdf = lPoints.zip(tfIdf).map {
      case (originalLPoint, tfIdfVector) => new LabeledPoint(originalLPoint.label, tfIdfVector)
    }

    lpTfIdf
  }
}
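One detail worth flagging in the code above: the training features are centered with a StandardScaler before computing the principal components, but the test features are projected without that centering. A minimal sketch (an assumption about intent, not part of the original post; the reducedTestDataScaled and reducedTestSplitScaled names are hypothetical) of applying the same fitted scaler to the test features before projection, so that train and test end up in the same centered space:

// Reuse the scaler fitted on the training features
val unlabeledTestFeatures = testSplit.map(lpoint => Vectors.dense(lpoint.features.toArray))
val scaledTestData = scaler.transform(unlabeledTestFeatures)

// Project the centered test features onto the same principal components
val reducedTestDataScaled = new RowMatrix(scaledTestData).multiply(pcomp).rows.cache()
val reducedTestSplitScaled = testSplit.zip(reducedTestDataScaled).map {
  case (labeled, reduced) => new LabeledPoint(labeled.label, reduced)
}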



Kamize (student verified) posted on 2016-10-11 12:26:20 from mobile
Quoting Nicolle (2016-10-11 07:39):
Scala Data Analysis Cookbook
**** This content has been hidden by the author ****

Thanks to the thread starter for sharing the material.


franky_sas posted on 2016-10-11 13:42:12


frankeml (employment verified) posted on 2016-10-12 13:35:05
{:2_25:}

