Thread starter: Lisrelchen

[Code Collection] Spark using Scala, Java and Python


Lisrelchen posted on 2016-3-29 10:08:13

[code]
#
# Spark for Python Developers
# Chapter 1 - Code Word Count
#

# Word count on manuscript using PySpark

# import regex module
import re
# import add from operator module
from operator import add

# read input file
file_in = sc.textFile('/home/an/Documents/A00_Documents/Spark4Py 20150315')

# count lines
print('number of lines in file: %s' % file_in.count())

# add up lengths of each line
chars = file_in.map(lambda s: len(s)).reduce(add)
print('number of characters in file: %s' % chars)

# get words from the input file
words = file_in.flatMap(lambda line: re.split(r'\W+', line.lower().strip()))

# keep words of more than 3 characters
words = words.filter(lambda x: len(x) > 3)

# set count 1 per word
words = words.map(lambda w: (w, 1))

# reduce phase - sum the counts for each word
words = words.reduceByKey(add)

# create tuple (count, word) and sort in descending order
words = words.map(lambda x: (x[1], x[0])).sortByKey(False)

# take top 20 words by frequency
words.take(20)

# create function for histogram of most frequent words
%matplotlib inline
import matplotlib.pyplot as plt

def histogram(words):
    count = list(map(lambda x: x[1], words))
    word = list(map(lambda x: x[0], words))
    plt.barh(range(len(count)), count, color='grey')
    plt.yticks(range(len(count)), word)

# change order of tuple from (count, word) back to (word, count)
words = words.map(lambda x: (x[1], x[0]))
words.take(25)

# display histogram
histogram(words.take(25))

# word count in one summarised statement
words = (sc.textFile('/home/an/Documents/A00_Documents/Spark4Py 20150315')
           .flatMap(lambda line: re.split(r'\W+', line.lower().strip()))
           .filter(lambda x: len(x) > 3)
           .map(lambda w: (w, 1))
           .reduceByKey(add)
           .map(lambda x: (x[1], x[0])).sortByKey(False))
words.take(20)
[/code]
Spark for Python Developers
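
The listing above assumes an existing SparkContext `sc`, as provided by the PySpark shell or an IPython notebook launched with PySpark. If you instead run it as a standalone script, a minimal sketch for creating the context yourself (the app name here is arbitrary) would be:

[code]
from pyspark import SparkContext

# create the context explicitly when not running inside the PySpark shell
sc = SparkContext(appName="WordCount")
[/code]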

Keywords: Spark, Python, Scala, Java

Lisrelchen posted on 2016-3-29 10:15:38
[code]##
#
# Spark for Python - Chapter 5 - Code
#
##

#
# Spark Streaming Wordcount - netcat client
#
# Spark example code  from:
# https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/network_wordcount.py
#
"""
Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
Usage: network_wordcount.py <hostname> <port>
   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
To run this on your local machine, you need to first run a Netcat server
    `$ nc -lk 9999`
and then run the example
    `$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999`
"""
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

#
#
# Twitter Streaming API Spark Streaming into an RDD-Queue to process tweets live
#
#

"""
Twitter Streaming API Spark Streaming into an RDD-Queue to process tweets live


Create a queue of RDDs that will be mapped/reduced one at a time in
1 second intervals.

To run this example use
    '$ bin/spark-submit examples/AN_Spark/AN_Spark_Code/twitterstreaming.py'

"""
#
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import twitter
import dateutil.parser
import json

#
#
# Connecting Streaming Twitter with Streaming Spark via Queue
#


class Tweet(dict):
    def __init__(self, tweet_in):
        super(Tweet, self).__init__(self)
        if tweet_in and 'delete' not in tweet_in:
            self['timestamp'] = dateutil.parser.parse(tweet_in[u'created_at']
                                ).replace(tzinfo=None).isoformat()
            self['text'] = tweet_in['text'].encode('utf-8')
            #self['text'] = tweet_in['text']
            self['hashtags'] = [x['text'].encode('utf-8') for x in tweet_in['entities']['hashtags']]
            #self['hashtags'] = [x['text'] for x in tweet_in['entities']['hashtags']]
            self['geo'] = tweet_in['geo']['coordinates'] if tweet_in['geo'] else None
            self['id'] = tweet_in['id']
            self['screen_name'] = tweet_in['user']['screen_name'].encode('utf-8')
            #self['screen_name'] = tweet_in['user']['screen_name']
            self['user_id'] = tweet_in['user']['id']

def connect_twitter():
    twitter_stream = twitter.TwitterStream(auth=twitter.OAuth(
        token = "get_your_own_credentials",
        token_secret = "get_your_own_credentials",
        consumer_key = "get_your_own_credentials",
        consumer_secret = "get_your_own_credentials"))
    return twitter_stream

def get_next_tweet(twitter_stream):
    stream = twitter_stream.statuses.sample(block=True)
    # testing = stream.next() # This is just to make sure the stream is emitting data.
    tweet_in = None
    while not tweet_in or 'delete' in tweet_in:
        tweet_in = stream.next()
        tweet_parsed = Tweet(tweet_in)
        # print(json.dumps(tweet_in, indent=2, sort_keys=True))
    # return json.dumps(tweet_in, indent=2, sort_keys=True)
    return json.dumps(tweet_parsed)

def process_rdd_queue(twitter_stream):
    # Create the queue through which RDDs can be pushed to
    # a QueueInputDStream
    rddQueue = []
    for i in range(3):
        rddQueue += [ssc.sparkContext.parallelize([get_next_tweet(twitter_stream)], 5)]

    lines = ssc.queueStream(rddQueue)
    lines.pprint()
   
if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingQueueStream")
    ssc = StreamingContext(sc, 1)
   
    # Instantiate the twitter_stream
    twitter_stream = connect_twitter()
    # Get RDD queue of the streams json or parsed
    process_rdd_queue(twitter_stream)
   
    ssc.start()
    time.sleep(2)
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

#
#
# Kafka and Spark Streaming
#
#

#
# kafka producer
#
#
import time
from kafka.common import LeaderNotAvailableError
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
from datetime import datetime

def print_response(response=None):
    if response:
        print('Error: {0}'.format(response[0].error))
        print('Offset: {0}'.format(response[0].offset))

def main():
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    try:
        time.sleep(5)
        topic = 'test'
        for i in range(5):
            time.sleep(1)
            msg = 'This is a message sent from the kafka producer: ' \
                  + str(datetime.now().time()) + ' -- '\
                  + str(datetime.now().strftime("%A, %d %B %Y %I:%M%p"))
            print_response(producer.send_messages(topic, msg))
    except LeaderNotAvailableError:
        # https://github.com/mumrah/kafka-python/issues/249
        time.sleep(1)
        print_response(producer.send_messages(topic, msg))

    kafka.close()

if __name__ == "__main__":
    main()

#
# Kafka consumer
# consumes messages from a Kafka topic
[/code]
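
The consumer listing breaks off above. A minimal sketch of what such a consumer might look like, assuming the same legacy kafka-python API as the producer (KafkaClient and SimpleConsumer); the group name "spark-group" and topic "test" are placeholders:

[code]
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

def main():
    # connect to the same local broker as the producer
    kafka = KafkaClient("localhost:9092")
    # "spark-group" and "test" are placeholder group and topic names
    consumer = SimpleConsumer(kafka, "spark-group", "test")
    # iterate over incoming messages, printing offset and payload
    for message in consumer:
        print('Offset: {0}'.format(message.offset))
        print('Message: {0}'.format(message.message.value))
    kafka.close()

if __name__ == "__main__":
    main()
[/code]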


Lisrelchen posted on 2016-3-29 10:19:28
To Be Continued


Lisrelchen posted on 2016-3-29 10:24:07
[code]
package com.github.giocode.graphxbook

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object SimpleGraphApp {
  def main(args: Array[String]) {
    // Configure the program
    val conf = new SparkConf()
      .setAppName("Tiny Social")
      .setMaster("local")
      .set("spark.driver.memory", "2G")
    val sc = new SparkContext(conf)

    // Load some data into RDDs
    val people = sc.textFile("../../data/people.csv")
    val links = sc.textFile("../../data/links.csv")

    // Parse the csv files into new RDDs
    case class Person(name: String, age: Int)
    type Connexion = String
    val peopleRDD: RDD[(VertexId, Person)] = people map { line =>
      val row = line split ','
      (row(0).toInt, Person(row(1), row(2).toInt))
    }
    val linksRDD: RDD[Edge[Connexion]] = links map { line =>
      val row = line split ','
      Edge(row(0).toInt, row(1).toInt, row(2))
    }

    // Create the social graph of people
    val tinySocial: Graph[Person, Connexion] = Graph(peopleRDD, linksRDD)
    tinySocial.cache()
    tinySocial.vertices.collect()
    tinySocial.edges.collect()

    // Extract links between coworkers and print their professional relationship
    val profLinks: List[Connexion] = List("coworker", "boss", "employee", "client", "supplier")
    tinySocial.subgraph(t => profLinks contains t.attr).
      triplets.foreach(t => println(t.srcAttr.name + " is a " + t.attr + " of " + t.dstAttr.name))
  }
}
[/code]

Apache Spark Graph Processing


Lisrelchen posted on 2016-3-29 10:25:00
[code]
import org.apache.spark.graphx._
import org.apache.spark.rdd._
import breeze.linalg.SparseVector
import scala.io.Source
import scala.math.abs

type Feature = breeze.linalg.SparseVector[Int]

// This takes the ego.feat file and creates a Map of features
// The vertexId is created by hashing the string identifier to a Long
val featureMap: Map[Long, Feature] =
  Source.fromFile("./data/ego.feat").
    getLines().
    map { line =>
      val row = line split ' '
      val key = abs(row.head.hashCode.toLong)
      val feat: Feature = SparseVector(row.tail.map(_.toInt))
      (key, feat)
    }.toMap

val edges: RDD[Edge[Int]] =
  sc.textFile("./data/ego.edges").
    map { line =>
      val row = line split ' '
      val srcId = abs(row(0).hashCode.toLong)
      val dstId = abs(row(1).hashCode.toLong)
      val numCommonFeats = featureMap(srcId) dot featureMap(dstId)
      Edge(srcId, dstId, numCommonFeats)
    }

val vertices: RDD[(VertexId, Feature)] =
  sc.textFile("./data/ego.edges").
    map { line =>
      val key = abs(line.hashCode.toLong)
      (key, featureMap(key))
    }

val egoNetwork: Graph[Int, Int] = Graph.fromEdges(edges, 1)

egoNetwork.edges.filter(_.attr == 3).count()
egoNetwork.edges.filter(_.attr == 2).count()
egoNetwork.edges.filter(_.attr == 1).count()
[/code]

Apache Spark Graph Processing


Lisrelchen posted on 2016-3-29 10:26:41
[code]
package com.github.giocode.graphxbook

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

import org.graphstream.graph.{Graph => GraphStream}
import org.graphstream.graph.implementations._
import org.graphstream.ui.j2dviewer.J2DGraphRenderer

import org.jfree.chart.axis.ValueAxis
import breeze.linalg.{SparseVector, DenseVector}
import breeze.plot._
import scala.io.Source
import scala.math.abs

object SimpleGraphVizApp {
  def main(args: Array[String]) {
    // Configure the program
    val conf = new SparkConf()
      .setAppName("Tiny Social Viz")
      .setMaster("local")
      .set("spark.driver.memory", "2G")
    val sc = new SparkContext(conf)

    type Feature = breeze.linalg.SparseVector[Int]

    // This takes the ego.feat file and creates a Map of features
    // The vertexId is created by hashing the string identifier to a Long
    val featureMap: Map[Long, Feature] =
      Source.fromFile("../../../data/ego.feat").
        getLines().
        map { line =>
          val row = line split ' '
          val key = abs(row.head.hashCode.toLong)
          val feat: Feature = SparseVector(row.tail.map(_.toInt))
          (key, feat)
        }.toMap

    val edges: RDD[Edge[Int]] =
      sc.textFile("../../../data/ego.edges").
        map { line =>
          val row = line split ' '
          val srcId = abs(row(0).hashCode.toLong)
          val dstId = abs(row(1).hashCode.toLong)
          val numCommonFeats = featureMap(srcId) dot featureMap(dstId)
          Edge(srcId, dstId, numCommonFeats)
        }

    val vertices: RDD[(VertexId, Feature)] =
      sc.textFile("../../../data/ego.edges").
        map { line =>
          val key = abs(line.hashCode.toLong)
          (key, featureMap(key))
        }

    val egoNetwork: Graph[Int, Int] = Graph.fromEdges(edges, 1)

    // Setup GraphStream settings
    System.setProperty("org.graphstream.ui.renderer", "org.graphstream.ui.j2dviewer.J2DGraphRenderer")

    // Create a SingleGraph class for GraphStream visualization
    val graph: SingleGraph = new SingleGraph("EgoSocial")

    // Set up the visual attributes for graph visualization
    graph.addAttribute("ui.stylesheet", "url(file:../../..//style/stylesheet)")
    graph.addAttribute("ui.quality")
    graph.addAttribute("ui.antialias")

    // Load the GraphX vertices into GraphStream nodes
    for ((id, _) <- egoNetwork.vertices.collect()) {
      val node = graph.addNode(id.toString).asInstanceOf[SingleNode]
    }
    // Load the GraphX edges into GraphStream edges
    for (Edge(x, y, _) <- egoNetwork.edges.collect()) {
      val edge = graph.addEdge(x.toString ++ y.toString, x.toString, y.toString, true).asInstanceOf[AbstractEdge]
    }

    // Display the graph
    graph.display()

    // Function for computing the degree distribution
    def degreeHistogram(net: Graph[Int, Int]): Array[(Int, Int)] =
      net.degrees.map(t => (t._2, t._1)).
        groupByKey.map(t => (t._1, t._2.size)).
        sortBy(_._1).collect()

    val nn = egoNetwork.numVertices
    val egoDegreeDistribution = degreeHistogram(egoNetwork).map({ case (d, n) => (d, n.toDouble / nn) })

    // Plot degree distribution with breeze-viz
    val f = Figure()
    val p = f.subplot(0)
    val x = new DenseVector(egoDegreeDistribution map (_._1.toDouble))
    val y = new DenseVector(egoDegreeDistribution map (_._2))
    p.xlabel = "Degrees"
    p.ylabel = "Degree distribution"
    p += plot(x, y)
    // f.saveas("/output/degrees-ego.png")
  }
}
[/code]

Apache Spark Graph Processing


Lisrelchen posted on 2016-3-29 10:29:21
[code]
import org.apache.spark.SparkContext
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.SparkConf

object MLlib01 {
  //
  def getCurrentDirectory = new java.io.File(".").getCanonicalPath
  //
  def parseCarData(inpLine: String): Array[Double] = {
    val values = inpLine.split(',')
    val mpg = values(0).toDouble
    val displacement = values(1).toDouble
    val hp = values(2).toInt
    val torque = values(3).toInt
    val CRatio = values(4).toDouble
    val RARatio = values(5).toDouble // Rear Axle Ratio
    val CarbBarrells = values(6).toInt
    val NoOfSpeed = values(7).toInt
    val length = values(8).toDouble
    val width = values(9).toDouble
    val weight = values(10).toDouble
    val automatic = values(11).toInt
    return Array(mpg, displacement, hp,
      torque, CRatio, RARatio, CarbBarrells,
      NoOfSpeed, length, width, weight, automatic)
  }
  //
  def carDataToLP(inpArray: Array[Double]): LabeledPoint = {
    return new LabeledPoint(inpArray(0), Vectors.dense(inpArray(1), inpArray(2), inpArray(3),
      inpArray(4), inpArray(5), inpArray(6), inpArray(7), inpArray(8), inpArray(9),
      inpArray(10), inpArray(11)))
  }
  //
  def main(args: Array[String]) {
    println(getCurrentDirectory)
    val conf = new SparkConf(false) // skip loading external settings
      .setMaster("local") // could be "local[4]" for 4 threads
      .setAppName("Chapter 9")
      .set("spark.logConf", "true")
    val sc = new SparkContext(conf) // ("local","Chapter 9") if using directly
    println(s"Running Spark Version ${sc.version}")
    //
    val dataFile = sc.textFile("/Users/ksankar/fdps-vii/data/car-milage-no-hdr.csv")
    val carRDD = dataFile.map(line => parseCarData(line))
    //
    // Let us find summary statistics
    //
    val vectors: RDD[Vector] = carRDD.map(v => Vectors.dense(v))
    val summary = Statistics.colStats(vectors)
    carRDD.foreach(ln => { ln.foreach(no => print("%6.2f | ".format(no))); println() })
    print("Count :"); println(summary.count)
    print("Max  :"); summary.max.toArray.foreach(m => print("%5.1f | ".format(m))); println
    print("Min  :"); summary.min.toArray.foreach(m => print("%5.1f | ".format(m))); println
    print("Mean :"); summary.mean.toArray.foreach(m => print("%5.1f | ".format(m))); println
    //
    // correlations
    //
    val hp = vectors.map(x => x(2))
    val weight = vectors.map(x => x(10))
    var corP = Statistics.corr(hp, weight, "pearson") // default
    println("hp to weight : Pearson Correlation = %2.4f".format(corP))
    var corS = Statistics.corr(hp, weight, "spearman") // Need to specify
    println("hp to weight : Spearman Correlation = %2.4f".format(corS))
    //
    val raRatio = vectors.map(x => x(5))
    val width = vectors.map(x => x(9))
    corP = Statistics.corr(raRatio, width, "pearson") // default
    println("Rear Axle Ratio to width : Pearson Correlation = %2.4f".format(corP))
    corS = Statistics.corr(raRatio, width, "spearman") // Need to specify
    println("Rear Axle Ratio to width : Spearman Correlation = %2.4f".format(corS))
    //
    // Linear Regression
    //
    val carRDDLP = carRDD.map(x => carDataToLP(x)) // create a labeled point RDD
    println(carRDDLP.count())
    println(carRDDLP.first().label)
    println(carRDDLP.first().features)
    //
    // Let us split the data set into training & test set using a very simple filter
    //
    val carRDDLPTrain = carRDDLP.filter(x => x.features(9) <= 4000)
    val carRDDLPTest = carRDDLP.filter(x => x.features(9) > 4000)
    println("Training Set : " + "%3d".format(carRDDLPTrain.count()))
    println("Test Set     : " + "%3d".format(carRDDLPTest.count()))
    //
    // Train a Linear Regression Model
    // numIterations = 100, stepsize = 0.000000001
    // without such a small step size the algorithm will diverge
    //
    val mdlLR = LinearRegressionWithSGD.train(carRDDLPTrain, 100, 0.000000001)
    println(mdlLR.intercept) // Intercept is turned off when using LinearRegressionWithSGD,
                             // so the intercept will always be 0 for this code
    println(mdlLR.weights)
    //
    // Now let us use the model to predict our test set
    //
    val valuesAndPreds = carRDDLPTest.map(p => (p.label, mdlLR.predict(p.features)))
    val mse = valuesAndPreds.map(vp => math.pow((vp._1 - vp._2), 2)).
      reduce(_ + _) / valuesAndPreds.count()
    println("Mean Squared Error      = " + "%6.3f".format(mse))
    println("Root Mean Squared Error = " + "%6.3f".format(math.sqrt(mse)))
    // Let us print what the model predicted
    valuesAndPreds.take(20).foreach(m => println("%5.1f | %5.1f |".format(m._1, m._2)))
  }
}
[/code]

Fast Data Processing with Spark - Second Edition


Lisrelchen posted on 2016-3-29 10:30:27
[code]
import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.tree.DecisionTree

object MLlib02 {
  //
  def getCurrentDirectory = new java.io.File(".").getCanonicalPath
  //
  //  0 pclass,1 survived,2 l.name,3 f.name,4 sex,5 age,6 sibsp,7 parch,8 ticket,9 fare,10 cabin,
  // 11 embarked,12 boat,13 body,14 home.dest
  //
  def str2Double(x: String): Double = {
    try {
      x.toDouble
    } catch {
      case e: Exception => 0.0
    }
  }
  //
  def parsePassengerDataToLP(inpLine: String): LabeledPoint = {
    val values = inpLine.split(',')
    //println(values)
    //println(values.length)
    //
    val pclass = str2Double(values(0))
    val survived = str2Double(values(1))
    // skip last name, first name
    var sex = 0
    if (values(4) == "male") {
      sex = 1
    }
    var age = 0.0 // a better choice would be the average of all ages
    age = str2Double(values(5))
    //
    var sibsp = 0.0
    sibsp = str2Double(values(6))
    //
    var parch = 0.0
    parch = str2Double(values(7))
    //
    var fare = 0.0
    fare = str2Double(values(9))
    return new LabeledPoint(survived, Vectors.dense(pclass, sex, age, sibsp, parch, fare))
  }
  //
  def main(args: Array[String]): Unit = {
    println(getCurrentDirectory)
    val sc = new SparkContext("local", "Chapter 8")
    println(s"Running Spark Version ${sc.version}")
    //
    val dataFile = sc.textFile("/Users/ksankar/fdps-vii/data/titanic3_01.csv")
    val titanicRDDLP = dataFile.map(_.trim).filter(_.length > 1).
      map(line => parsePassengerDataToLP(line))
    //
    println(titanicRDDLP.count())
    //titanicRDDLP.foreach(println)
    //
    println(titanicRDDLP.first().label)
    println(titanicRDDLP.first().features)
    //
    val categoricalFeaturesInfo = Map[Int, Int]()
    val mdlTree = DecisionTree.trainClassifier(titanicRDDLP, 2, // numClasses
      categoricalFeaturesInfo, // all features are continuous
      "gini", // impurity
      5, // maxDepth
      32) // maxBins
    //
    //println(mdlTree.depth)
    println(mdlTree)
    //
    // Let us predict on the data set and see how well it works
    // In the real world, we should split the data into train & test sets and then predict on the test set
    //
    val predictions = mdlTree.predict(titanicRDDLP.map(x => x.features))
    val labelsAndPreds = titanicRDDLP.map(x => x.label).zip(predictions)
    //
    val mse = labelsAndPreds.map(vp => math.pow((vp._1 - vp._2), 2)).
      reduce(_ + _) / labelsAndPreds.count()
    println("Mean Squared Error = " + "%6f".format(mse))
    //
    // labelsAndPreds.foreach(println)
    //
    val correctVals = labelsAndPreds.aggregate(0.0)((x, rec) => x + (rec._1 == rec._2).compare(false), _ + _)
    val accuracy = correctVals / labelsAndPreds.count()
    println("Accuracy = " + "%3.2f%%".format(accuracy * 100))
    //
    println("*** Done ***")
  }
}
[/code]

Fast Data Processing with Spark - Second Edition


Lisrelchen posted on 2016-3-29 10:32:35
[code]
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.clustering.KMeans

object MLlib03 {
  def parsePoints(inpLine: String): Vector = {
    val values = inpLine.split(',')
    val x = values(0).toInt
    val y = values(1).toInt
    return Vectors.dense(x, y)
  }
  //
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "Chapter 8")
    println(s"Running Spark Version ${sc.version}")
    //
    val dataFile = sc.textFile("/Users/ksankar/fdps-vii/data/cluster-points.csv")
    val points = dataFile.map(_.trim).filter(_.length > 1).map(line => parsePoints(line))
    //
    println(points.count())
    //
    var numClusters = 2
    val numIterations = 20
    var mdlKMeans = KMeans.train(points, numClusters, numIterations)
    //
    mdlKMeans.clusterCenters.foreach(println)
    //
    var clusterPred = points.map(x => mdlKMeans.predict(x))
    var clusterMap = points.zip(clusterPred)
    //
    clusterMap.foreach(println)
    //
    clusterMap.saveAsTextFile("/Users/ksankar/fdps-vii/data/3x-cluster.csv")
    //
    // Now let us try 4 centers
    //
    numClusters = 4
    mdlKMeans = KMeans.train(points, numClusters, numIterations)
    clusterPred = points.map(x => mdlKMeans.predict(x))
    clusterMap = points.zip(clusterPred)
    clusterMap.saveAsTextFile("/Users/ksankar/fdps-vii/data/5x-cluster.csv")
    clusterMap.foreach(println)
  }
}
[/code]

Fast Data Processing with Spark - Second Edition


Lisrelchen posted on 2016-3-29 10:34:12
[code]
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // for implicit conversions
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.recommendation.ALS

object MLlib04 {

  def parseRating1(line: String): (Int, Int, Double, Int) = {
    val x = line.split("::")
    val userId = x(0).toInt
    val movieId = x(1).toInt
    val rating = x(2).toDouble
    val timeStamp = x(3).toInt / 10
    return (userId, movieId, rating, timeStamp)
  }
  //
  def parseRating(x: (Int, Int, Double, Int)): Rating = {
    val userId = x._1
    val movieId = x._2
    val rating = x._3
    val timeStamp = x._4 // ignore
    return new Rating(userId, movieId, rating)
  }
  //
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "Chapter 8")
    println(s"Running Spark Version ${sc.version}")
    //
    val moviesFile = sc.textFile("/Users/ksankar/fdps-vii/data/medium/movies.dat")
    val moviesRDD = moviesFile.map(line => line.split("::"))
    println(moviesRDD.count())
    //
    val ratingsFile = sc.textFile("/Users/ksankar/fdps-vii/data/medium/ratings.dat")
    val ratingsRDD = ratingsFile.map(line => parseRating1(line))
    println(ratingsRDD.count())
    //
    ratingsRDD.take(5).foreach(println) // always check the RDD
    //
    val numRatings = ratingsRDD.count()
    val numUsers = ratingsRDD.map(r => r._1).distinct().count()
    val numMovies = ratingsRDD.map(r => r._2).distinct().count()
    println("Got %d ratings from %d users on %d movies.".
      format(numRatings, numUsers, numMovies))
    //
    // Split the dataset into training, validation & test sets
    // We could split randomly, but here we use the last digit of the time stamp
    //
    val trainSet = ratingsRDD.filter(x => (x._4 % 10) < 6).map(x => parseRating(x))
    val validationSet = ratingsRDD.filter(x => (x._4 % 10) >= 6 & (x._4 % 10) < 8).map(x => parseRating(x))
    val testSet = ratingsRDD.filter(x => (x._4 % 10) >= 8).map(x => parseRating(x))
    println("Training: " + "%d".format(trainSet.count()) +
      ", validation: " + "%d".format(validationSet.count()) + ", test: " +
      "%d".format(testSet.count()) + ".")
    //
    // Now train the model using the training set
    val rank = 10
    val numIterations = 20
    val mdlALS = ALS.train(trainSet, rank, numIterations)
    //
    // prepare the validation set for prediction
    //
    val userMovie = validationSet.map {
      case Rating(user, movie, rate) => (user, movie)
    }
    //
    // Predict and convert to a Key-Value PairRDD
    val predictions = mdlALS.predict(userMovie).map {
      case Rating(user, movie, rate) => ((user, movie), rate)
    }
    //
    println(predictions.count())
    predictions.take(5).foreach(println)
    //
    // Now convert the validation set to a PairRDD
    //
    val validationPairRDD = validationSet.map(r => ((r.user, r.product), r.rating))
    println(validationPairRDD.count())
    validationPairRDD.take(5).foreach(println)
    println(validationPairRDD.getClass())
    println(predictions.getClass())
    //
    // Now join the validation set with the predictions
    // Then we can figure out how good our recommendations are
    // Tip:
    //   Need to import org.apache.spark.SparkContext._
    //   Then MappedRDD is converted implicitly to PairRDD
    //
    val ratingsAndPreds = validationPairRDD.join(predictions)
    println(ratingsAndPreds.count())
    ratingsAndPreds.take(3).foreach(println)
    //
    val mse = ratingsAndPreds.map(r => {
      math.pow((r._2._1 - r._2._2), 2)
    }).reduce(_ + _) / ratingsAndPreds.count()
    val rmse = math.sqrt(mse)
    println("*** Model Performance Metrics ***")
    println("MSE = %2.5f".format(mse))
    println("RMSE = %2.5f".format(rmse))
    println("** Done **")
  }
}
[/code]

Fast Data Processing with Spark - Second Edition

