Thread starter: ReneeBK

[GitHub] Taming Big Data with Apache Spark and Python



About the Book

Frank Kane’s Taming Big Data with Apache Spark and Python is your companion to learning Apache Spark in a hands-on manner. Frank will start you off by teaching you how to set up Spark on a single system or on a cluster, and you’ll soon move on to analyzing large data sets using Spark RDD, and developing and running effective Spark jobs quickly using Python.

Apache Spark has emerged as the next big thing in the Big Data domain – quickly rising from an ascending technology to an established superstar in just a matter of years. Spark allows you to quickly extract actionable insights from large amounts of data, on a real-time basis, making it an essential tool in many modern businesses.

Frank has packed this book with over 15 interactive, fun-filled examples relevant to the real world, and he will empower you to understand the Spark ecosystem and implement production-grade real-time Spark projects with ease.

Instructions and Navigation

All of the code is organized into folders. Each folder starts with a number followed by the application name. For example, Chapter02.

The code will look like the following:

from pyspark import SparkConf, SparkContext
import collections

conf = SparkConf().setMaster("local").setAppName("RatingsHistogram")
sc = SparkContext(conf = conf)

lines = sc.textFile("file:///SparkCourse/ml-100k/u.data")
ratings = lines.map(lambda x: x.split()[2])
result = ratings.countByValue()

sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
    print("%s %i" % (key, value))

For this book you’ll need a Python development environment (Python 3.5 or newer), the Enthought Canopy installer, a Java Development Kit, and of course Spark itself (Spark 2.0 or later).

We'll show you how to install this software in the first chapter of the book.

This book is based on the Windows operating system, so installation steps are given for Windows. If you are on macOS or Linux, follow this URL: http://media.sundog-soft.com/spark-python-install.pdf, which contains written instructions for getting everything set up on macOS and Linux.
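Once everything is installed, a quick way to confirm that Spark and Python are wired up is to run a tiny script through spark-submit. This is only a minimal sanity-check sketch (the file name verify_install.py is hypothetical and not part of the book's code); it assumes spark-submit is on your PATH after installation:

# verify_install.py -- a hypothetical smoke test; run with:  spark-submit verify_install.py
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("VerifyInstall")
sc = SparkContext(conf = conf)

# Print the Spark version and count a small in-memory RDD.
print("Running Spark version " + sc.version)
print(sc.parallelize(list(range(100))).count())

sc.stop()

If this prints the Spark version and the number 100, the environment is ready for the examples that follow.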




Keywords: Apache Spark, Big Data, Python, Taming, GitHub


2
ReneeBK posted on 2017-7-7 03:55:27
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("FriendsByAge")
sc = SparkContext(conf = conf)

# Parse each CSV line into an (age, numFriends) pair.
def parseLine(line):
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)

lines = sc.textFile("file:///SparkCourse/fakefriends.csv")
rdd = lines.map(parseLine)

# Sum friend counts and occurrences per age, then divide to get the average.
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])
results = averagesByAge.collect()
for result in results:
    print(result)

3
ReneeBK posted on 2017-7-7 03:56:20
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MaxTemperatures")
sc = SparkContext(conf = conf)

# Parse each line into (stationID, entryType, temperature),
# converting tenths of a degree Celsius to Fahrenheit.
def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

lines = sc.textFile("file:///SparkCourse/1800.csv")
parsedLines = lines.map(parseLine)

# Keep only TMAX entries and reduce to the maximum temperature per station.
maxTemps = parsedLines.filter(lambda x: "TMAX" in x[1])
stationTemps = maxTemps.map(lambda x: (x[0], x[2]))
maxTemps = stationTemps.reduceByKey(lambda x, y: max(x, y))
results = maxTemps.collect()

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))

4
ReneeBK posted on 2017-7-7 03:57:02
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MinTemperatures")
sc = SparkContext(conf = conf)

# Parse each line into (stationID, entryType, temperature),
# converting tenths of a degree Celsius to Fahrenheit.
def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

lines = sc.textFile("file:///SparkCourse/1800.csv")
parsedLines = lines.map(parseLine)

# Keep only TMIN entries and reduce to the minimum temperature per station.
minTemps = parsedLines.filter(lambda x: "TMIN" in x[1])
stationTemps = minTemps.map(lambda x: (x[0], x[2]))
minTemps = stationTemps.reduceByKey(lambda x, y: min(x, y))
results = minTemps.collect()

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))

5
ReneeBK posted on 2017-7-7 03:59:36
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("PopularHero")
sc = SparkContext(conf = conf)

# Each line of the graph file is a hero ID followed by the IDs it co-appears with.
def countCoOccurences(line):
    elements = line.split()
    return (int(elements[0]), len(elements) - 1)

# The names file maps hero IDs to quoted character names.
def parseNames(line):
    fields = line.split('\"')
    return (int(fields[0]), fields[1])

names = sc.textFile("file:///SparkCourse/marvel-names.txt")
namesRdd = names.map(parseNames)

lines = sc.textFile("file:///SparkCourse/marvel-graph.txt")

pairings = lines.map(countCoOccurences)
totalFriendsByCharacter = pairings.reduceByKey(lambda x, y: x + y)
flipped = totalFriendsByCharacter.map(lambda x: (x[1], x[0]))

mostPopular = flipped.max()

mostPopularName = namesRdd.lookup(mostPopular[1])[0]

print(mostPopularName + " is the most popular superhero, with " +
    str(mostPopular[0]) + " co-appearances.")

6
ReneeBK posted on 2017-7-7 04:00:41
import sys
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS, Rating

# Build a dictionary of movie ID -> movie name from the MovieLens 1M data set.
def loadMovieNames():
    movieNames = {}
    with open("ml-1m/movies.dat", encoding='ascii', errors='ignore') as f:
        for line in f:
            fields = line.split('::')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

conf = SparkConf().setMaster("local[*]").setAppName("MovieRecommendationsALS")
sc = SparkContext(conf = conf)
sc.setCheckpointDir('checkpoint')

print("\nLoading movie names...")
nameDict = loadMovieNames()

data = sc.textFile("file:///E:/SparkCourse/ml-1m/ratings.dat")

ratings = data.map(lambda l: l.split("::")).map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2]))).cache()

# Build the recommendation model using Alternating Least Squares
print("\nTraining recommendation model...")
rank = 10
numIterations = 20
model = ALS.train(ratings, rank, numIterations)

# The user ID to recommend for is passed on the command line.
userID = int(sys.argv[1])

print("\nRatings for user ID " + str(userID) + ":")
userRatings = ratings.filter(lambda l: l[0] == userID)
for rating in userRatings.collect():
    print(nameDict[int(rating[1])] + ": " + str(rating[2]))

print("\nTop 10 recommendations:")
recommendations = model.recommendProducts(userID, 10)
for recommendation in recommendations:
    print(nameDict[int(recommendation[1])] +
        " score " + str(recommendation[2]))

7
ReneeBK posted on 2017-7-7 04:02:43
import sys
from pyspark import SparkConf, SparkContext
from math import sqrt

#To run on EMR successfully + output results for Star Wars:
#aws s3 cp s3://sundog-spark/MovieSimilarities1M.py ./
#aws s3 cp s3://sundog-spark/ml-1m/movies.dat ./
#spark-submit --executor-memory 1g MovieSimilarities1M.py 260

def loadMovieNames():
    movieNames = {}
    with open("movies.dat", encoding='ascii', errors='ignore') as f:
        for line in f:
            fields = line.split("::")
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# (userID, ((movie1, rating1), (movie2, rating2))) => ((movie1, movie2), (rating1, rating2))
def makePairs(userRatings):
    ratings = userRatings[1]
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return ((movie1, movie2), (rating1, rating2))

# Keep only one ordering of each movie pair produced by the self-join.
def filterDuplicates(userRatings):
    ratings = userRatings[1]
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return movie1 < movie2

def computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if (denominator):
        score = (numerator / (float(denominator)))

    return (score, numPairs)


conf = SparkConf()
sc = SparkContext(conf = conf)

print("\nLoading movie names...")
nameDict = loadMovieNames()

data = sc.textFile("s3n://sundog-spark/ml-1m/ratings.dat")

# Map ratings to key / value pairs: user ID => movie ID, rating
ratings = data.map(lambda l: l.split("::")).map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))

# Emit every movie rated together by the same user.
# Self-join to find every combination.
ratingsPartitioned = ratings.partitionBy(100)
joinedRatings = ratingsPartitioned.join(ratingsPartitioned)

# At this point our RDD consists of userID => ((movieID, rating), (movieID, rating))

# Filter out duplicate pairs
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)

# Now key by (movie1, movie2) pairs.
moviePairs = uniqueJoinedRatings.map(makePairs).partitionBy(100)

# We now have (movie1, movie2) => (rating1, rating2)
# Now collect all ratings for each movie pair and compute similarity
moviePairRatings = moviePairs.groupByKey()

# We now have (movie1, movie2) => (rating1, rating2), (rating1, rating2) ...
# Can now compute similarities.
moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).persist()

# Save the results if desired
moviePairSimilarities = moviePairSimilarities.sortByKey()
moviePairSimilarities.saveAsTextFile("movie-sims")

# Extract similarities for the movie we care about that are "good".
if (len(sys.argv) > 1):

    scoreThreshold = 0.97
    coOccurenceThreshold = 1000

    movieID = int(sys.argv[1])

    # Filter for movies with this sim that are "good" as defined by
    # our quality thresholds above
    filteredResults = moviePairSimilarities.filter(lambda pairSim:
        (pairSim[0][0] == movieID or pairSim[0][1] == movieID)
        and pairSim[1][0] > scoreThreshold and pairSim[1][1] > coOccurenceThreshold)

    # Sort by quality score.
    results = filteredResults.map(lambda pairSim: (pairSim[1], pairSim[0])).sortByKey(ascending = False).take(10)

    print("Top 10 similar movies for " + nameDict[movieID])
    for result in results:
        (sim, pair) = result
        # Display the similarity result that isn't the movie we're looking at
        similarMovieID = pair[0]
        if (similarMovieID == movieID):
            similarMovieID = pair[1]
        print(nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1]))

8
ReneeBK posted on 2017-7-7 04:04:16
from __future__ import print_function

from pyspark.ml.regression import LinearRegression

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors

if __name__ == "__main__":

    # Create a SparkSession (Note, the config section is only for Windows!)
    spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/temp").appName("LinearRegression").getOrCreate()

    # Load up our data and convert it to the format MLLib expects.
    inputLines = spark.sparkContext.textFile("regression.txt")
    data = inputLines.map(lambda x: x.split(",")).map(lambda x: (float(x[0]), Vectors.dense(float(x[1]))))

    # Convert this RDD to a DataFrame
    colNames = ["label", "features"]
    df = data.toDF(colNames)

    # Note, there are lots of cases where you can avoid going from an RDD to a DataFrame.
    # Perhaps you're importing data from a real database. Or you are using structured streaming
    # to get your data.

    # Let's split our data into training data and testing data
    trainTest = df.randomSplit([0.5, 0.5])
    trainingDF = trainTest[0]
    testDF = trainTest[1]

    # Now create our linear regression model
    lir = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

    # Train the model using our training data
    model = lir.fit(trainingDF)

    # Now see if we can predict values in our test data.
    # Generate predictions using our linear regression model for all features in our
    # test dataframe:
    fullPredictions = model.transform(testDF).cache()

    # Extract the predictions and the "known" correct labels.
    predictions = fullPredictions.select("prediction").rdd.map(lambda x: x[0])
    labels = fullPredictions.select("label").rdd.map(lambda x: x[0])

    # Zip them together
    predictionAndLabel = predictions.zip(labels).collect()

    # Print out the predicted and actual values for each point
    for prediction in predictionAndLabel:
        print(prediction)

    # Stop the session
    spark.stop()

9
ReneeBK posted on 2017-7-7 04:05:40
from pyspark.sql import SparkSession
from pyspark.sql import Row

import collections

# Create a SparkSession (Note, the config section is only for Windows!)
spark = SparkSession.builder.config("spark.sql.warehouse.dir", "file:///C:/temp").appName("SparkSQL").getOrCreate()

def mapper(line):
    fields = line.split(',')
    return Row(ID=int(fields[0]), name=str(fields[1].encode("utf-8")), age=int(fields[2]), numFriends=int(fields[3]))

lines = spark.sparkContext.textFile("fakefriends.csv")
people = lines.map(mapper)

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people).cache()
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are DataFrames and support all the normal DataFrame operations.
for teen in teenagers.collect():
    print(teen)

# We can also use functions instead of SQL queries:
schemaPeople.groupBy("age").count().orderBy("age").show()

spark.stop()

10
ReneeBK posted on 2017-7-7 04:07:29
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext(conf = conf)

input = sc.textFile("file:///sparkcourse/book.txt")
words = input.flatMap(lambda x: x.split())
wordCounts = words.countByValue()

for word, count in wordCounts.items():
    cleanWord = word.encode('ascii', 'ignore')
    if (cleanWord):
        print(cleanWord.decode() + " " + str(count))
