Thread starter: Lisrelchen

[Apache Spark] Thunder: Large-scale neural data analysis with Spark


#1 (OP)
Lisrelchen posted on 2017-4-18 03:46:14

  1. thunder (homepage): Large-scale neural data analysis with Spark

  2. Thunder is a library for analyzing large-scale spatial and temporal neural data. It includes utilities for loading and saving data in a variety of input formats, classes for working with distributed spatial and temporal data, and modular functions for time series analysis, image processing, factorization, and model fitting. Thunder is written against Spark's Python API (PySpark) and makes extensive use of NumPy and SciPy. It is pip-installable and requires a working installation of Spark (currently supporting versions 1.0 and 1.1).



Tags
  • neuroscience
  • machine learning
  • python
  • mllib


How to

This package has no releases published in the Spark Packages repo and no Maven coordinates supplied. You may have to build it from source, or it may simply be a script. To use this Spark Package, follow the instructions in the README.
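With no Maven coordinates, the Scala examples in the replies below can only be submitted against a jar built from source. As a rough orientation, a minimal sbt build definition consistent with the jar path those examples reference (target/scala-2.10/thunder_2.10-0.1.0.jar) might look like the sketch below; the dependency names and versions are assumptions, not taken from the project's README, which remains the authoritative source.

// Hypothetical build.sbt sketch; versions are assumptions, see the project README for the real build
name := "thunder"

version := "0.1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-mllib"     % "1.1.0" % "provided",
  "colt"             %  "colt"            % "1.2.0"   // cern.colt matrices used in the streaming examples
)

// "sbt package" would then produce target/scala-2.10/thunder_2.10-0.1.0.jar for use with setJars(...)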

Releases

Version: 0.4.1 (bd268a | zip) / Date: 2014-11-27 / License: BSD 3-Clause







#2
Lisrelchen posted on 2017-4-18 03:49:29
package thunder.examples

import org.apache.spark.{SparkContext, SparkConf}
import thunder.util.Load

object ExampleLoad {

  def main(args: Array[String]) {

    val master = args(0)
    val file = args(1)

    val conf = new SparkConf().setMaster(master).setAppName("ExampleLoad")

    if (!master.contains("local")) {
      conf.setSparkHome(System.getenv("SPARK_HOME"))
        .setJars(List("target/scala-2.10/thunder_2.10-0.1.0.jar"))
        .set("spark.executor.memory", "100G")
    }

    val sc = new SparkContext(conf)

    // Load keyed binary records: 3 integer keys per record, values stored as shorts
    val data = Load.fromBinaryWithKeys(sc, file, recordLength = args(2).toInt, nKeys = 3, format = "short")

    // Print the keys and values of the first 10 records
    data.take(10).foreach { x =>
      println(x._1.mkString(" "))
      println(x._2.mkString(" "))
    }

    println(data.count())
  }
}

#3
Lisrelchen posted on 2017-4-18 03:50:30
package thunder.examples

import thunder.util.LoadParam

object ExampleLoadParam {

  def main(args: Array[String]) {

    val file = args(0)

    val param = LoadParam.fromText(file)

    println(param.toDebugString)
  }
}

#4
Lisrelchen posted on 2017-4-18 03:51:02
package thunder.examples

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import thunder.util.LoadStreaming

object ExampleLoadStreaming {

  def main(args: Array[String]) {

    val master = args(0)
    val file = args(1)
    val batchTime = args(2).toLong

    val conf = new SparkConf().setMaster(master).setAppName("ExampleLoadStreaming")

    if (!master.contains("local")) {
      conf.setSparkHome(System.getenv("SPARK_HOME"))
        .setJars(List("target/scala-2.10/thunder_2.10-0.1.0.jar"))
        .set("spark.executor.memory", "100G")
    }

    val ssc = new StreamingContext(conf, Seconds(batchTime))

    // Stream binary records from the given location as new files arrive
    val data = LoadStreaming.fromBinary(ssc, file)

    // Print the records in each batch, and the per-batch record count
    data.map(v => v.mkString(" ")).print()
    data.foreachRDD(rdd => println(rdd.count()))

    ssc.start()
    ssc.awaitTermination()
  }
}

#5
Lisrelchen posted on 2017-4-18 03:51:40
package thunder.examples

import org.apache.spark.{SparkContext, SparkConf}
import thunder.util.Save

object ExampleSave {

  def main(args: Array[String]) {

    val master = args(0)
    val file = args(1)
    val directory = args(2)

    val conf = new SparkConf().setMaster(master).setAppName("ExampleSave")

    if (!master.contains("local")) {
      conf.setSparkHome(System.getenv("SPARK_HOME"))
        .setJars(List("target/scala-2.10/thunder_2.10-0.1.0.jar"))
        .set("spark.executor.memory", "100G")
    }

    val sc = new SparkContext(conf)

    // A small keyed dataset: (key array, value array) pairs
    val data = sc.parallelize(Seq((Array(3), Array(1.5)), (Array(2), Array(2.0)), (Array(1), Array(3.0))))

    // Write the keyed records as text into the target directory
    Save.asTextWithKeys(data, directory, Seq(file))
  }
}

#6
Lisrelchen posted on 2017-4-18 03:52:27
package thunder.streaming

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

import thunder.util.LoadStreaming
import thunder.util.Save
import thunder.util.io.Keys
import org.apache.spark.Logging
import scala.math.sqrt
import scala.Some
import cern.colt.matrix.DoubleFactory2D
import cern.colt.matrix.DoubleFactory1D
import cern.colt.matrix.DoubleMatrix2D
import cern.colt.matrix.DoubleMatrix1D
import cern.jet.math.Functions.{plus, minus, bindArg2, pow}
import cern.colt.matrix.linalg.Algebra.DEFAULT.{inverse, mult, transpose}

/** Class for representing parameters and sufficient statistics for a running linear regression model */
class FittedModel(
   var count: Double,
   var mean: Double,
   var sumOfSquaresTotal: Double,
   var sumOfSquaresError: Double,
   var XX: DoubleMatrix2D,
   var Xy: DoubleMatrix1D,
   var beta: DoubleMatrix1D) extends Serializable {

  def variance = sumOfSquaresTotal / (count - 1)

  def std = sqrt(variance)

  def R2 = 1 - sumOfSquaresError / sumOfSquaresTotal

  def intercept = beta.toArray()(0)

  def weights = beta.toArray.drop(1)

}

/**
 * Stateful linear regression on streaming data
 *
 * The underlying model is that every batch of streaming
 * data contains a set of records with unique keys,
 * one of which is the feature, and the rest can be predicted
 * by the feature. We estimate the sufficient statistics
 * of the features, and of each of the data points,
 * to compute a running estimate of the linear regression
 * model for each key. Returns a state stream of fitted models.
 *
 * Features and labels from different batches
 * can have different lengths.
 *
 * See also: StreamingLinearRegression
 */
class StatefulLinearRegression (
  var featureKeys: Array[Int])
  extends Serializable with Logging
{

  def this() = this(Array(0))

  /** Set which indices correspond to features. Default: Array(0). */
  def setFeatureKeys(featureKeys: Array[Int]): StatefulLinearRegression = {
    this.featureKeys = featureKeys
    this
  }

  val runningLinearRegression = (input: Seq[Array[Double]], state: Option[FittedModel], features: Array[Double]) => {

    val y = input.foldLeft(Array[Double]()) { (acc, i) => acc ++ i }
    val currentCount = y.size
    val n = features.size

    val updatedState = state.getOrElse(new FittedModel(0.0, 0.0, 0.0, 0.0, DoubleFactory2D.dense.make(2, 2),
      DoubleFactory1D.dense.make(2), DoubleFactory1D.dense.make(2)))

    if ((currentCount != 0) & (n != 0)) {

      // append column of 1s
      val X = DoubleFactory2D.dense.make(n, 2)
      for (i <- 0 until n) {
        X.set(i, 0, 1)
      }
      for (i <- 0 until n; j <- 1 until 2) {
        X.set(i, j, features(i))
      }

      // create matrix version of y
      val ymat = DoubleFactory1D.dense.make(currentCount)
      for (i <- 0 until currentCount) {
        ymat.set(i, y(i))
      }

      // store values from previous iteration (needed for update equations)
      val oldCount = updatedState.count
      val oldMean = updatedState.mean
      val oldXy = updatedState.Xy.copy
      val oldXX = updatedState.XX.copy
      val oldBeta = updatedState.beta

      // compute current estimates of all statistics
      val currentMean = y.foldLeft(0.0)(_ + _) / currentCount
      val currentSumOfSquaresTotal = y.map(x => pow(x - currentMean, 2)).foldLeft(0.0)(_ + _)
      val currentXy = mult(transpose(X), ymat)
      val currentXX = mult(transpose(X), X)

      // compute new values for X*y (the sufficient statistic) and new beta (needed for update equations)
      val newXX = oldXX.copy.assign(currentXX, plus)
      val newXy = updatedState.Xy.copy.assign(currentXy, plus)
      val newBeta = mult(inverse(newXX), newXy)

      // compute terms for update equations
      val delta = currentMean - oldMean
      val term1 = ymat.copy.assign(mult(X, newBeta), minus).assign(bindArg2(pow, 2)).zSum
      val term2 = mult(mult(oldXX, newBeta), newBeta)
      val term3 = mult(mult(oldXX, oldBeta), oldBeta)
      val term4 = 2 * mult(oldBeta.copy.assign(newBeta, minus), oldXy)

      // update all statistics of the fitted model
      updatedState.count += currentCount
      updatedState.mean += (delta * currentCount / (oldCount + currentCount))
      updatedState.Xy = newXy
      updatedState.XX = newXX
      updatedState.beta = newBeta
      updatedState.sumOfSquaresTotal += currentSumOfSquaresTotal + delta * delta * (oldCount * currentCount) / (oldCount + currentCount)
      updatedState.sumOfSquaresError += term1 + term2 - term3 + term4
    }

    Some(updatedState)
  }

  def runStreaming(data: DStream[(Int, Array[Double])]): DStream[(Int, FittedModel)] = {

    var features = Array[Double]()

    // collect the feature values from each batch into a driver-side variable
    data.filter{case (k, _) => featureKeys.contains(k)}.foreachRDD{ rdd =>
      val batchFeatures = rdd.values.collect().flatten
      features = batchFeatures.size match {
        case 0 => Array[Double]()
        case _ => batchFeatures
      }
    }

    // update a running regression model for every non-feature key
    data.filter{case (k, _) => !featureKeys.contains(k)}.updateStateByKey{
      (x, y) => runningLinearRegression(x, y, features)}
  }

}

/**
 * Top-level methods for calling Stateful Linear Regression.
 */
object StatefulLinearRegression {

  /**
   * Train a Stateful Linear Regression model.
   * We assume that in each batch of streaming data we receive
   * one or more features and several vectors of labels, each
   * with a unique key, and a subset of keys indicate the features.
   * We fit separate linear models that relate the common features
   * to the labels associated with each key.
   *
   * @param input DStream of (Int, Array[Double]) keyed data points
   * @param featureKeys Array of keys associated with features
   * @return DStream of (Int, FittedModel) with fitted regression models
   */
  def trainStreaming(input: DStream[(Int, Array[Double])],
            featureKeys: Array[Int]): DStream[(Int, FittedModel)] =
  {
    new StatefulLinearRegression().setFeatureKeys(featureKeys).runStreaming(input)
  }

  def main(args: Array[String]) {
    if (args.length != 6) {
      System.err.println(
        "Usage: StatefulLinearRegression <master> <directory> <batchTime> <outputDirectory> <dims> <featureKeys>")
      System.exit(1)
    }

    val (master, directory, batchTime, outputDirectory, dims, features) = (
      args(0), args(1), args(2).toLong, args(3),
      args(4).drop(1).dropRight(1).split(",").map(_.trim.toInt),
      Array(args(5).drop(1).dropRight(1).split(",").map(_.trim.toInt)))

    val conf = new SparkConf().setMaster(master).setAppName("StatefulLinearRegression")

    if (!master.contains("local")) {
      conf.setSparkHome(System.getenv("SPARK_HOME"))
          .setJars(List("target/scala-2.10/thunder_2.10-0.1.0.jar"))
          .set("spark.executor.memory", "100G")
          .set("spark.default.parallelism", "100")
    }

    /** Get feature keys with linear indexing */
    val featureKeys = Keys.subToInd(features, dims)

    /** Create Streaming Context */
    val ssc = new StreamingContext(conf, Seconds(batchTime))
    ssc.checkpoint(System.getenv("CHECKPOINT"))

    /** Load streaming data */
    val data = LoadStreaming.fromTextWithKeys(ssc, directory, dims.size, dims)

    /** Train Linear Regression models */
    val state = StatefulLinearRegression.trainStreaming(data, featureKeys)

    /** Print output (for testing) */
    state.mapValues(x => "\n" + "mean: " + "%.5f".format(x.mean) +
                         "\n" + "count: " + "%.5f".format(x.count) +
                         "\n" + "variance: " + "%.5f".format(x.variance) +
                         "\n" + "beta: " + x.beta.toArray.mkString(",") +
                         "\n" + "R2: " + "%.5f".format(x.R2) +
                         "\n" + "SSE: " + "%.5f".format(x.sumOfSquaresError) +
                         "\n" + "SST: " + "%.5f".format(x.sumOfSquaresTotal) +
                         "\n" + "Xy: " + x.Xy.toArray.mkString(",")).print()

    ///** Save output (for production) */
    //val out = state.mapValues(x => Array(x.R2))
    //Save.saveStreamingDataAsText(out, outputDirectory, Seq("r2"))

    ssc.start()
    ssc.awaitTermination()
  }

}
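A short recap of the update equations the runningLinearRegression closure above implements (notation mine, read off the code rather than taken from the Thunder docs): for each key, with batch design matrix X_t (a column of ones plus the shared feature), batch labels y_t of size n_t, batch mean \bar{y}_t, and the previously accumulated statistics,

$$
\begin{aligned}
(X^\top X)_{\mathrm{new}} &= (X^\top X)_{\mathrm{old}} + X_t^\top X_t,
\qquad (X^\top y)_{\mathrm{new}} = (X^\top y)_{\mathrm{old}} + X_t^\top y_t,\\
\beta_{\mathrm{new}} &= (X^\top X)_{\mathrm{new}}^{-1}\,(X^\top y)_{\mathrm{new}},\\
\mu_{\mathrm{new}} &= \mu_{\mathrm{old}} + \delta\,\frac{n_t}{n_{\mathrm{old}} + n_t},
\qquad \delta = \bar{y}_t - \mu_{\mathrm{old}},\\
\mathrm{SST}_{\mathrm{new}} &= \mathrm{SST}_{\mathrm{old}} + \sum_i \left(y_{t,i} - \bar{y}_t\right)^2
+ \delta^2\,\frac{n_{\mathrm{old}}\,n_t}{n_{\mathrm{old}} + n_t},\\
\mathrm{SSE}_{\mathrm{new}} &= \mathrm{SSE}_{\mathrm{old}}
+ \lVert y_t - X_t\,\beta_{\mathrm{new}} \rVert^2
+ \beta_{\mathrm{new}}^\top (X^\top X)_{\mathrm{old}}\,\beta_{\mathrm{new}}
- \beta_{\mathrm{old}}^\top (X^\top X)_{\mathrm{old}}\,\beta_{\mathrm{old}}
+ 2\,(\beta_{\mathrm{old}} - \beta_{\mathrm{new}})^\top (X^\top y)_{\mathrm{old}}.
\end{aligned}
$$

So R2 = 1 - SSE/SST and the coefficients beta are available for every key after each batch, without revisiting earlier data.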

#7
Lisrelchen posted on 2017-4-18 03:53:26
package thunder.streaming

import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.streaming.StreamingKMeansModel

import scala.util.Random.nextDouble
import scala.util.Random.nextGaussian

import thunder.util.LoadStreaming

/**
 * K-means clustering on streaming data with support for
 * sequential and forgetful algorithms.
 *
 * The underlying assumption is that all streaming data points
 * belong to one of several clusters, and we want to
 * learn the identity of those clusters (the "KMeans Model")
 * as new data arrive. All records MUST have the same dimensionality.
 *
 * For sequential algorithms, we update the underlying
 * cluster identities once for each batch of data, and keep
 * a count of the number of data points per cluster.
 * The number of data points per batch can be arbitrary.
 * This implementation is based on the sequential k-means algorithm
 * (see https://www.cs.princeton.edu/courses/archive/fall08/cos436/Duda/C/sk_means.htm)
 * except that each update is based on a batch of data rather than
 * a single data point. It is also similar to the offline mini-batch
 * algorithm (see http://www.eecs.tufts.edu/~dsculley/papers/fastkmeans.pdf)
 * except that batches arrive over time, rather than through sampling.
 *
 * For forgetful algorithms, each new batch of data is weighted in
 * its contribution so that more recent data is weighted more strongly
 * (see https://www.cs.princeton.edu/courses/archive/fall08/cos436/Duda/C/sk_means.htm).
 * The weighting is per batch (i.e. per time window), rather than per data point,
 * so for meaningful interpretation, the number of data points per batch
 * should be approximately constant.
 *
 */
class StreamingKMeans (
  var k: Int,
  var d: Int,
  var a: Double,
  var maxIterations: Int,
  var initializationMode: String)
  extends Serializable with Logging
{

  private type ClusterCentersAndCounts = Array[(Array[Double], Int)]

  /** Construct a StreamingKMeans object with default parameters */
  def this() = this(2, 5, 1.0, 1, "gauss")

  /** Set the number of clusters to create (k). Default: 2. */
  def setK(k: Int): StreamingKMeans = {
    this.k = k
    this
  }

  /** Set the dimensionality of the data (d). Default: 5
    * TODO: if possible, set this automatically based on first data point
    */
  def setD(d: Int): StreamingKMeans = {
    this.d = d
    this
  }

  /**
   * Set the parameter alpha to determine the update rule.
   * If alpha = 1, perform sequential or "mini batch" KMeans, treating all
   * data equivalently. If alpha < 1, perform forgetful KMeans,
   * which uses a constant to weight old data
   * less strongly (with exponential weighting), e.g. 0.9 will
   * favor only recent data, whereas 0.1 will update slowly.
   * Weighting over time is per batch, so this algorithm implicitly
   * assumes an approximately constant number of data points per batch.
   * Default: 1 (sequential)
   */
  def setAlpha(a: Double): StreamingKMeans = {
    this.a = a
    this
  }

  // TODO: characterize the effect of max iterations in forgetful version
  /** Set the maximum number of iterations per batch of data. */
  def setMaxIterations(maxIterations: Int): StreamingKMeans = {
    this.maxIterations = maxIterations
    this
  }

  /**
   * Set the initialization algorithm. Unlike batch KMeans, we
   * initialize randomly before we have seen any data. Options are "gauss"
   * for random Gaussian centers, and "pos" for random positive uniform centers.
   * Default: gauss
   */
  def setInitializationMode(initializationMode: String): StreamingKMeans = {
    if (initializationMode != "gauss" && initializationMode != "pos") {
      throw new IllegalArgumentException("Invalid initialization mode: " + initializationMode)
    }
    this.initializationMode = initializationMode
    this
  }

  /** Initialize random points for KMeans clustering */
  def initRandom(): StreamingKMeansModel = {

    val clusters = new Array[(Array[Double], Int)](k)
    for (ik <- 0 until k) {
      clusters(ik) = initializationMode match {
        case "gauss" => (Array.fill(d)(nextGaussian()), 0)
        case "pos" => (Array.fill(d)(nextDouble()), 0)
      }
    }
    new StreamingKMeansModel(clusters.map(_._1).map(x => Vectors.dense(x)), clusters.map(_._2))
  }

  // TODO: stop iterating if clusters have converged
  /** Update KMeans clusters by doing training passes over an RDD */
  def update(data: RDD[Vector], model: StreamingKMeansModel): StreamingKMeansModel = {

    val centers = model.clusterCenters
    val counts = model.clusterCounts

    // do iterative KMeans updates on a batch of data
    for (i <- Range(0, maxIterations)) {
      // find nearest cluster to each point
      val closest = data.map(point => (model.predict(point), (point, 1)))

      // get sums and counts for updating each cluster
      val pointStats = closest.reduceByKey{
        case ((x1, y1), (x2, y2)) => (Vectors.dense(x1.toArray.zip(x2.toArray).map{case (x, y) => x + y}), y1 + y2)}
      val newPoints = pointStats.map{
        pair => (pair._1, (pair._2._1, pair._2._2))}.collectAsMap()

      a match {
        case 1 => for (newP <- newPoints) {
          // remove previous count scaling
          centers(newP._1) = Vectors.dense(centers(newP._1).toArray.map(x => x * counts(newP._1)))
          // update sums
          centers(newP._1) = Vectors.dense(centers(newP._1).toArray.zip(newP._2._1.toArray).map{case (x, y) => x + y})
          // update counts
          counts(newP._1) += newP._2._2
          // rescale to compute new means (of both old and new points)
          centers(newP._1) = Vectors.dense(centers(newP._1).toArray.map(x => x / counts(newP._1)))
        }
        case _ => for (newP <- newPoints) {
          // update centers with forgetting factor a
          centers(newP._1) = Vectors.dense(centers(newP._1).toArray.zip(newP._2._1.toArray.map(x => x / newP._2._2)).map{
            case (x, y) => x + a * (y - x)})
        }
      }

      // centers aliases model.clusterCenters and is updated in place, so the model
      // used by predict in the next iteration already reflects the new centers
    }

    // log the cluster centers
    centers.zip(Range(0, centers.length)).foreach{
      case (x, ix) => logInfo("Cluster center " + ix.toString + ": " + x.toString)}

    new StreamingKMeansModel(centers, counts)

  }

  /**
   * Main streaming operation: initialize the KMeans model
   * and then update it based on new data from the stream.
   */
  def runStreaming(data: DStream[Vector]): DStream[Int] = {
    var model = initRandom()
    data.foreachRDD(rdd => model = update(rdd, model))
    data.map(point => model.predict(point))
  }

}

/** Top-level methods for calling Streaming KMeans clustering. */
object StreamingKMeans {

  /**
   * Train a Streaming KMeans model. We initialize a set of
   * cluster centers randomly and then update them
   * after receiving each batch of data from the stream.
   * If a = 1 this is equivalent to sequential or mini-batch KMeans,
   * where each batch of data from the stream is treated as a different
   * mini-batch. If a < 1, perform forgetful KMeans, which
   * weights more recent data points more strongly.
   *
   * @param input Input DStream of (Array[Double]) data points
   * @param k Number of clusters to estimate.
   * @param d Number of dimensions per data point.
   * @param a Update rule (1 mini batch, < 1 forgetful).
   * @param maxIterations Maximum number of iterations per batch.
   * @param initializationMode Random initialization of cluster centers.
   * @return Output DStream of (Int) assignments of data points to clusters.
   */
  def trainStreaming(input: DStream[Vector],
      k: Int,
      d: Int,
      a: Double,
      maxIterations: Int,
      initializationMode: String)
    : DStream[Int] =
  {
    new StreamingKMeans().setK(k)
                         .setD(d)
                         .setAlpha(a)
                         .setMaxIterations(maxIterations)
                         .setInitializationMode(initializationMode)
                         .runStreaming(input)
  }

  def main(args: Array[String]) {
    if (args.length != 8) {
      System.err.println("Usage: StreamingKMeans <master> <directory> <batchTime> <k> <d> <a> <maxIterations> <initializationMode>")
      System.exit(1)
    }

    val (master, directory, batchTime, k, d, a, maxIterations, initializationMode) = (
      args(0), args(1), args(2).toLong, args(3).toInt, args(4).toInt, args(5).toDouble, args(6).toInt, args(7))

    val conf = new SparkConf().setMaster(master).setAppName("StreamingKMeans")

    if (!master.contains("local")) {
      conf.setSparkHome(System.getenv("SPARK_HOME"))
        .setJars(List("target/scala-2.10/thunder_2.10-0.1.0.jar"))
        .set("spark.executor.memory", "100G")
    }

    /** Create Streaming Context */
    val ssc = new StreamingContext(conf, Seconds(batchTime))

    /** Train KMeans model */
    val data = LoadStreaming.fromText(ssc, directory).map(x => Vectors.dense(x))
    val assignments = StreamingKMeans.trainStreaming(data, k, d, a, maxIterations, initializationMode)

    /** Print assignments (for testing) */
    assignments.print()

    ssc.start()
    ssc.awaitTermination()
  }

}
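The two branches of the update method above amount to the following per-batch center updates (notation mine, derived from the code): for each cluster j that receives points in the current batch, with batch sum s_j, batch count m_j, running count n_j, and center c_j,

$$
c_j \leftarrow \frac{n_j\, c_j + s_j}{n_j + m_j}, \qquad n_j \leftarrow n_j + m_j
\qquad (a = 1,\ \text{sequential / mini-batch}),
$$

$$
c_j \leftarrow c_j + a\left(\frac{s_j}{m_j} - c_j\right)
\qquad (a < 1,\ \text{forgetful}).
$$

In the forgetful case each batch mean pulls its center toward it by a constant fraction a, which is what gives older batches exponentially decaying influence.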

#8
MouJack007 posted on 2017-4-18 07:09:49
Thanks to the OP for sharing!

#9
MouJack007 posted on 2017-4-18 07:10:08

#10
franky_sas posted on 2017-4-18 09:57:53
