Original poster: Lisrelchen

[GitHub] Mastering Apache Spark 2.x - Second Edition

Mastering Apache Spark 2.x - Second Edition

This is the code repository for Mastering Apache Spark 2.x - Second Edition, published by Packt. It contains all the supporting project files necessary to work through the book from start to finish.

About the Book

Apache Spark is an in-memory, cluster-based parallel processing system that provides a wide range of functionality, such as graph processing, machine learning, stream processing, and SQL. This book aims to take your knowledge of Spark to the next level by teaching you how to expand Spark's functionality and implement your data flows and machine/deep learning programs on top of the platform.

Instructions and Navigation

All of the code is organized into folders. Each folder starts with a number followed by the application name. For example, Chapter02.

The code will look like the following:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
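For orientation, a minimal sketch of how these imports are typically used to bootstrap a Spark application; the application name and master URL below are placeholders, not taken from the book:

// Hypothetical minimal driver: configure and start a SparkContext, run a trivial job.
val conf = new SparkConf().setAppName("mastering-spark-example").setMaster("local[*]")
val sc = new SparkContext(conf)
val n = sc.parallelize(1 to 100).count()
println(n)
sc.stop()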

You will need the following to work with the examples in this book:

  • A laptop or PC with at least 6 GB main memory running Windows, macOS, or Linux

  • VirtualBox 5.1.22 or above

  • Hortonworks HDP Sandbox V2.6 or above

  • Eclipse Neon or above

  • Maven

  • Eclipse Maven Plugin

  • Eclipse Scala Plugin

  • Eclipse Git Plugin


Related Products

Hidden content of this post:

https://github.com/PacktPublishing/Mastering-Apache-Spark-2x






#2  Lisrelchen  posted on 2017-8-22 08:10:39
case class Client(
    age: Long,
    countryCode: String,
    familyName: String,
    id: String,
    name: String
)

val clientDs = spark.read.json("/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/client.json").as[Client]

val clientDsBig = spark.read.json("/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/client_big.json").as[Client]

case class Account(
    balance: Long,
    id: String,
    clientId: String
)

val accountDs = spark.read.json("/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/account.json").as[Account]

val accountDsBig = spark.read.json("/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/account_big.json").as[Account]

clientDs.createOrReplaceTempView("client")
clientDsBig.createOrReplaceTempView("clientbig")

accountDs.createOrReplaceTempView("account")
accountDsBig.createOrReplaceTempView("accountbig")
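With the views registered, they can be queried through spark.sql. A small illustrative query follows; the column selection is ours and only assumes the fields defined in the case classes above:

// Illustrative only: join the small client and account views registered above.
spark.sql("""
  SELECT c.familyName, a.balance
  FROM client c
  INNER JOIN account a ON c.id = a.clientId
""").show(5)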

//clientDs.write.parquet("/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/client.parquet")

//clientDsBig.write.parquet("/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/client_big.parquet")

//accountDs.write.parquet("/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/account.parquet")

//accountDsBig.write.parquet("/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/account_big.parquet")

//clientDsBig.write.csv("/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/client_big.csv")

//accountDsBig.write.csv("/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/account_big.csv")

val clientDsParquet = spark.read.parquet("/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/client.parquet").as[Client]

val clientDsBigParquet = spark.read.parquet("/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/client_big.parquet").as[Client]

val accountDsParquet = spark.read.parquet("/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/account.parquet").as[Account]

val accountDsBigParquet = spark.read.parquet("/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/account_big.parquet").as[Account]

val clientDsBigCsv = spark.read.csv("/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/client_big.csv").as[Client]

clientDsParquet.createOrReplaceTempView("clientparquet")
clientDsBigParquet.createOrReplaceTempView("clientbigparquet")

accountDsParquet.createOrReplaceTempView("accountparquet")
accountDsBigParquet.createOrReplaceTempView("accountbigparquet")

spark.sql("select c.familyName from clientbigparquet c inner join accountbigparquet a on c.id=a.clientId").explain

== Physical Plan ==
*Project [familyName#79]
+- *BroadcastHashJoin [id#80], [clientId#106], Inner, BuildRight
   :- *Project [familyName#79, id#80]
   :  +- *Filter isnotnull(id#80)
   :     +- *BatchedScan parquet [familyName#79,id#80] Format: ParquetFormat, InputPaths: file:/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/client_b..., PushedFilters: [IsNotNull(id)], ReadSchema: struct<familyName:string,id:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *Project [clientId#106]
         +- *Filter isnotnull(clientId#106)
            +- *BatchedScan parquet [clientId#106] Format: ParquetFormat, InputPaths: file:/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/account_..., PushedFilters: [IsNotNull(clientId)], ReadSchema: struct<clientId:string>
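The BroadcastHashJoin in this plan is chosen because the projected account side is small enough to broadcast. A hedged sketch of inspecting and influencing that decision, assuming Spark 2.x defaults (10 MB is the stock threshold):

// Current broadcast threshold in bytes.
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// Disable broadcast joins and re-run explain; the planner is then expected to
// fall back to a shuffle-based SortMergeJoin instead of BroadcastHashJoin.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)
spark.sql("select c.familyName from clientbigparquet c inner join accountbigparquet a on c.id=a.clientId").explain

// Restore the default afterwards.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760L)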


spark.sql("select c.familyName from clientbig c inner join accountbig a on c.id=a.clientId").explain


spark.sql("select count(*) from clientbigparquet c inner join accountbigparquet a on c.id=a.clientId").explain

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
      +- *Project
         +- *BroadcastHashJoin [id#199], [clientId#225], Inner, BuildRight
            :- *Project [id#199]
            :  +- *Filter isnotnull(id#199)
            :     +- *BatchedScan parquet [id#199] Format: ParquetFormat, InputPaths: file:/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/client_b..., PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:string>
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
               +- *Project [clientId#225]
                  +- *Filter isnotnull(clientId#225)
                     +- *BatchedScan parquet [clientId#225] Format: ParquetFormat, InputPaths: file:/Users/romeokienzler/Documents/romeo/Dropbox/arbeit/spark/sparkbuch/mywork/chapter3/account_..., PushedFilters: [IsNotNull(clientId)], ReadSchema: struct<clientId:string>

spark.sql("select count(*) from clientbig c inner join accountbigparquet a on c.id=a.clientId").explain
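The asterisks in these physical plans mark operators that Spark 2.x fuses through whole-stage code generation. A small sketch of inspecting the generated Java code, assuming the debug helpers shipped with Spark 2.x:

// Print the whole-stage-codegen source produced for the query above.
import org.apache.spark.sql.execution.debug._

spark.sql("select count(*) from clientbigparquet c inner join accountbigparquet a on c.id=a.clientId").debugCodegen()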


---------

private[sql] case class JDBCRelation(
    parts: Array[Partition], jdbcOptions: JDBCOptions)(@transient val sparkSession: SparkSession)
  extends BaseRelation
  with PrunedFilteredScan
  with InsertableRelation {

--

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
    JDBCRDD.scanTable(
      sparkSession.sparkContext,
      schema,
      requiredColumns,
      filters,
      parts,
      jdbcOptions).asInstanceOf[RDD[Row]]
  }
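The traits above define the contract for a pruned, filtered data source. As a rough sketch (a hypothetical in-memory relation, not Spark's JDBC implementation), a minimal PrunedFilteredScan only has to report a schema and honour the requested columns in buildScan; Spark re-applies the filters it hands over, so ignoring them here stays correct, just less efficient:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types._

// Hypothetical toy relation backed by a local Seq. Only column pruning is honoured;
// pushed filters are ignored (Spark evaluates them again on top of this scan).
class ToyClientRelation(override val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  private val data = Seq(("1", "Smith", 42L), ("2", "Jones", 23L))

  override def schema: StructType = StructType(Seq(
    StructField("id", StringType),
    StructField("familyName", StringType),
    StructField("age", LongType)))

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    val index = Map("id" -> 0, "familyName" -> 1, "age" -> 2)
    sqlContext.sparkContext.parallelize(data).map { case (id, name, age) =>
      val full = Array[Any](id, name, age)
      // Return only the columns Spark asked for, in the requested order.
      Row.fromSeq(requiredColumns.map(c => full(index(c))))
    }
  }
}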

--
JDBCRDD.scala

"We'll skip the scanTable method for now, since it just parameterizes and creates a
new JDBCRDD object. The most interesting method in JDBCRDD is compute, which it inherits from the abstract RDD class. Through the compute method, Apache Spark tells this RDD to leave lazy mode and materialize itself. We'll show you two important fragments of this method after we have had a look at the method signature. TODO contract?"

  override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow] = {

"Here you can see that the return type is an Iterator, which allows a lazy underlying data source to be read lazily as well. As we will see shortly, this is the case for this particular implementation, too."

    val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
    stmt = conn.prepareStatement(sqlText,
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    stmt.setFetchSize(options.fetchSize)
    rs = stmt.executeQuery()

"Note that the SQL statement created and stored in the sqlText constant references two interesting variables: columnList and myWhereClause. Both are derived from the requiredColumns and filters arguments passed to the JDBCRelation class. Therefore, this data source can be called a smart source, because the underlying storage technology (a SQL database in this case) can be told to return only the columns and rows that are actually requested. And, as already mentioned, the data source supports lazy data access patterns being pushed down to the underlying database as well. Below you can see that the JDBC result set is wrapped into a typed InternalRow iterator, Iterator[InternalRow]. Since this matches the return type of the compute method, we are done here."

    val rowsIterator = JdbcUtils.resultSetToSparkInternalRows(rs, schema, inputMetrics)

    CompletionIterator[InternalRow, Iterator[InternalRow]](rowsIterator, close())
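To illustrate how requiredColumns and filters can be compiled into such a SQL string, here is a simplified, hypothetical helper (not Spark's actual JDBCRDD code; only a few filter types are handled and the value quoting is deliberately naive):

import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan, IsNotNull}

// Build "SELECT <columns> FROM <table> [WHERE ...]" from Spark's pushdown arguments.
def buildSelect(table: String, requiredColumns: Array[String], filters: Array[Filter]): String = {
  val columnList = if (requiredColumns.isEmpty) "1" else requiredColumns.mkString(", ")
  val predicates = filters.collect {
    case EqualTo(attr, value)     => s"$attr = '$value'"
    case GreaterThan(attr, value) => s"$attr > '$value'"
    case IsNotNull(attr)          => s"$attr IS NOT NULL"
  }
  val whereClause = if (predicates.isEmpty) "" else predicates.mkString(" WHERE ", " AND ", "")
  s"SELECT $columnList FROM $table$whereClause"
}

// buildSelect("account", Array("clientId"), Array(IsNotNull("clientId")))
// => "SELECT clientId FROM account WHERE clientId IS NOT NULL"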
------------

Catalog

"Internally, Apache Spark uses the class org.apache.spark.sql.catalyst.catalog.SessionCatalog for managing temporary views as well as persistent tables. Temporary views are stored in the SparkSession object, whereas persistent tables are stored in an external metastore. The abstract base class org.apache.spark.sql.catalyst.catalog.ExternalCatalog is extended for various metastore providers. TODO One implementation uses Apache Derby and another one uses the Apache Hive metastore, but anyone could extend this class and make Apache Spark use yet another metastore as well."
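The user-facing side of this machinery is the Catalog API on the SparkSession. A small sketch, assuming the temporary views registered earlier in this thread are still present in the session:

// List databases and tables known to the session; the temp views created above
// (client, clientbig, account, accountbig, ...) appear with isTemporary = true.
spark.catalog.listDatabases.show(false)
spark.catalog.listTables.show(false)

// Inspect the columns of one of the registered views.
spark.catalog.listColumns("clientbigparquet").show(false)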

----------

Unresolved plan

"An unresolved plan is basically the first tree created from either SQL statements or the relational API of DataFrames and Datasets. It is mainly composed of subtypes of LeafExpression, bound together by Expression objects, therefore forming a tree of TreeNode objects, since all of these are subtypes of TreeNode. Overall, this data structure is a logical plan and is therefore reflected as a LogicalPlan object. Note that LogicalPlan extends QueryPlan, and QueryPlan itself is again a TreeNode. In other words, a LogicalPlan is nothing other than a tree of TreeNode objects."

//TODO insert inheritance tree of TreeNode B05868_03_01.png B05868_03_02.png
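To see these stages on a concrete query, the queryExecution field of a Dataset exposes the parsed, analyzed, optimized, and physical plans. A quick sketch reusing one of the joins from above:

val q = spark.sql("select c.familyName from clientbigparquet c inner join accountbigparquet a on c.id=a.clientId")

// The plan as parsed from SQL, before the analyzer resolves tables and columns.
println(q.queryExecution.logical)

// After attribute and relation resolution against the catalog.
println(q.queryExecution.analyzed)

// After Catalyst's rule-based optimizations.
println(q.queryExecution.optimizedPlan)

// The physical plan that actually runs (what .explain prints).
println(q.queryExecution.executedPlan)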
#3  西门高  posted on 2017-8-22 08:13:02
Thanks for sharing.

#4  MouJack007  posted on 2017-8-22 08:56:38
Thanks for sharing, OP!

#5  MouJack007  posted on 2017-8-22 08:58:27

#6  lianqu  posted on 2017-8-22 09:20:34

#7  钱学森64  posted on 2017-8-22 11:21:33
Thanks for sharing.

#8  seoulcityyxx  posted on 2017-8-22 15:10:33
Came over to take a look.

#9  zhw199107  posted on 2018-1-16 10:32:25
Thanks for sharing.

#10  qmathews  posted on 2020-5-11 17:11:57
Thanks for sharing, OP!
