Original poster: hooli

Learning Spark by Holden Karau (O'Reilly)

#31 ReneeBK, posted 2016-9-26 00:12:00
/**
 * Illustrates a simple map partition to parse CSV data in Scala
 */
package com.oreilly.learningsparkexamples.scala

import java.io.StringReader
import java.io.StringWriter

import org.apache.spark._
import scala.collection.JavaConversions._

import au.com.bytecode.opencsv.CSVReader
import au.com.bytecode.opencsv.CSVWriter

object BasicParseCsv {
  case class Person(name: String, favouriteAnimal: String)

  def main(args: Array[String]) {
    if (args.length < 3) {
      println("Usage: [sparkmaster] [inputfile] [outputfile]")
      sys.exit(1)
    }
    val master = args(0)
    val inputFile = args(1)
    val outputFile = args(2)
    val sc = new SparkContext(master, "BasicParseCsv", System.getenv("SPARK_HOME"))
    val input = sc.textFile(inputFile)
    // Parse each line as a single CSV record (breaks on records with embedded newlines)
    val result = input.map { line =>
      val reader = new CSVReader(new StringReader(line))
      reader.readNext()
    }
    val people = result.map(x => Person(x(0), x(1)))
    val pandaLovers = people.filter(person => person.favouriteAnimal == "panda")
    // Write the survivors back out as CSV, one writer per partition
    pandaLovers.map(person => List(person.name, person.favouriteAnimal).toArray).mapPartitions { people =>
      val stringWriter = new StringWriter()
      val csvWriter = new CSVWriter(stringWriter)
      csvWriter.writeAll(people.toList)
      Iterator(stringWriter.toString)
    }.saveAsTextFile(outputFile)
  }
}
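A quick sketch of what this job consumes and produces, using hypothetical file contents (each input line is one name,favouriteAnimal record; none of this data is from the book):

// favourite_animals.csv (hypothetical input)
//   holden,panda
//   sparky,otter
//
// After the "panda" filter, the output part files hold one CSV line;
// CSVWriter quotes fields by default:
//   "holden","panda"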


#32 ReneeBK, posted 2016-9-26 00:12:39
/**
 * Illustrates a simple map partition to parse JSON data in Scala.
 * Loads the data into a case class with the name and a boolean flag
 * for whether the person loves pandas.
 */
package com.oreilly.learningsparkexamples.scala

import org.apache.spark._
import play.api.libs.json._
import play.api.libs.functional.syntax._

object BasicParseJson {
  case class Person(name: String, lovesPandas: Boolean)
  implicit val personReads = Json.format[Person]

  def main(args: Array[String]) {
    if (args.length < 3) {
      println("Usage: [sparkmaster] [inputfile] [outputfile]")
      sys.exit(1)
    }
    val master = args(0)
    val inputFile = args(1)
    val outputFile = args(2)
    val sc = new SparkContext(master, "BasicParseJson", System.getenv("SPARK_HOME"))
    val input = sc.textFile(inputFile)
    val parsed = input.map(Json.parse(_))
    // We use asOpt combined with flatMap so that if a record fails to parse we
    // get back a None and the flatMap simply skips it.
    val result = parsed.flatMap(record => personReads.reads(record).asOpt)
    result.filter(_.lovesPandas).map(Json.toJson(_)).saveAsTextFile(outputFile)
  }
}
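A minimal sketch of how the asOpt-plus-flatMap error handling above behaves, assuming Play JSON on the classpath and the personReads format from the example (the input records are hypothetical):

val good = Json.parse("""{"name": "Sparky", "lovesPandas": true}""")
personReads.reads(good).asOpt  // Some(Person("Sparky", true)), kept by flatMap

val bad = Json.parse("""{"name": "Holden"}""")  // missing lovesPandas
personReads.reads(bad).asOpt   // None, so flatMap silently drops the record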


#33 ReneeBK, posted 2016-9-26 00:17:00
/**
 * Illustrates a simple map partition to parse JSON data in Scala.
 * Loads the data into a case class with the name and a boolean flag
 * for whether the person loves pandas.
 */
package com.oreilly.learningsparkexamples.scala

import org.apache.spark._
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature

case class Person(name: String, lovesPandas: Boolean) // Note: must be a top-level class

object BasicParseJsonWithJackson {

  def main(args: Array[String]) {
    if (args.length < 3) {
      println("Usage: [sparkmaster] [inputfile] [outputfile]")
      sys.exit(1)
    }
    val master = args(0)
    val inputFile = args(1)
    val outputFile = args(2)
    val sc = new SparkContext(master, "BasicParseJsonWithJackson", System.getenv("SPARK_HOME"))
    val input = sc.textFile(inputFile)

    // Parse it into a specific case class. We use mapPartitions because:
    // (a) ObjectMapper is not serializable, so we would either have to create a
    //     singleton ObjectMapper on the driver and ship every record back to the
    //     driver to go through it, or create a fresh ObjectMapper per record,
    //     which is expensive in a plain map.
    // (b) Creating one ObjectMapper per partition with mapPartitions solves both
    //     the serialization problem and the object-creation performance hit.
    val result = input.mapPartitions(records => {
      // mapper object created on each executor node
      val mapper = new ObjectMapper with ScalaObjectMapper
      mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
      mapper.registerModule(DefaultScalaModule)
      // We use flatMap to handle errors: an empty option (None) if we encounter
      // an issue, and a one-element option (Some(_)) if everything is ok.
      records.flatMap(record => {
        try {
          Some(mapper.readValue(record, classOf[Person]))
        } catch {
          case e: Exception => None
        }
      })
    }, true)
    result.filter(_.lovesPandas).mapPartitions(records => {
      val mapper = new ObjectMapper with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      records.map(mapper.writeValueAsString(_))
    }).saveAsTextFile(outputFile)
  }
}
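The one-mapper-per-partition trick here generalizes to any expensive or non-serializable helper. A minimal sketch of the shape, where buildExpensiveHelper and process are hypothetical stand-ins:

// Build the helper once per partition on the executor, then amortize it
// over every record in that partition's iterator.
val processed = rdd.mapPartitions { records =>
  val helper = buildExpensiveHelper() // hypothetical factory, runs once per partition
  records.map(helper.process)         // the single helper handles the whole iterator
}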


#34 ReneeBK, posted 2016-9-26 00:17:35
/**
 * Illustrates parsing whole files of CSV data in Scala
 */
package com.oreilly.learningsparkexamples.scala

import java.io.StringReader

import org.apache.spark._
import scala.collection.JavaConversions._
import au.com.bytecode.opencsv.CSVReader

object BasicParseWholeFileCsv {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage: [sparkmaster] [inputfile]")
      sys.exit(1)
    }
    val master = args(0)
    val inputFile = args(1)
    val sc = new SparkContext(master, "BasicParseWholeFileCsv", System.getenv("SPARK_HOME"))
    // wholeTextFiles yields (filename, contents) pairs, so quoted fields
    // that span multiple lines are parsed correctly
    val input = sc.wholeTextFiles(inputFile)
    val result = input.flatMap { case (_, txt) =>
      val reader = new CSVReader(new StringReader(txt))
      reader.readAll()
    }
    println(result.collect().map(_.toList).mkString(","))
  }
}
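To see why whole-file parsing matters, a minimal local sketch with no Spark involved: a quoted CSV field can contain a newline, which the line-at-a-time approach in #31 would split mid-record (the sample string is hypothetical):

import java.io.StringReader
import au.com.bytecode.opencsv.CSVReader

val txt = "\"holden\",\"likes pandas\nand coffee\"\n"
val rows = new CSVReader(new StringReader(txt)).readAll()
// rows holds a single two-field record; the second field spans two
// physical lines, so sc.textFile plus readNext() would have broken it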


#35 iid_garch, posted 2016-11-30 16:22:04


#36 十七里香, posted 2019-1-14 09:48:40
Thanks to the OP for sharing~~~~


#37 胡明敏, posted 2019-3-27 13:12:33
Thanks for sharing


#38 yuezzyy, posted 2019-7-5 11:54:12
Just taking a look


#39 Whatsappp, posted 2019-9-23 23:51:19 (from mobile)
Thanks for sharing

