3450 0

第60课:使用Java和Scala在IDE中实战RDD和DataFrame动态转换 [推广有奖]

  • 1关注
  • 8粉丝

硕士生

34%

还不是VIP/贵宾

-

威望
0
论坛币
305 个
通用积分
0
学术水平
5 点
热心指数
14 点
信用等级
2 点
经验
23032 点
帖子
73
精华
0
在线时间
135 小时
注册时间
2016-2-27
最后登录
2016-9-11

楼主
无量天尊Spark 发表于 2016-6-13 17:52:41 |AI写论文

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币

一、作业特别说明

上一次课是静态的方式转化,这一次是动态的转换方式。

列的个数,以及每一列的具体的信息只有在运行的时候才会知道:

生产环境时候,常用的方式:动态转化,

第一:生产环境的时候,一开始不容易确定每一条记录的不同的列的元数据信息,以及这条数据到底有多少列。

第二:生产环境业务容易发生变化,数据类型容易发生变化,这样就能够应对变化的业务。可以应对来自数据库或是其他的文件,这样就不需要改动自己的代码了。


作业:使用scala方式

数据:

1,Michael,29

2,Andy,30

3,Justin,19

~N4V29I%Y8@%HC2S$}B7GMD.png

  1. import org.apache.spark.sql.types.{StructType, StructField, StringType, DataTypes}
  2. import org.apache.spark.sql.{Row, RowFactory, SQLContext}
  3. import org.apache.spark.{SparkContext, SparkConf}
  4. import org.apache.hadoop.mapred.Master
  5. import org.apache.spark.deploy.master.Master
  6. /**
  7.   * Created by hadoop on 2016/3/16.
  8.   * 动态列
  9.   */
  10. object APPP extends App{
  11.   val schemaString = "id name age"
  12.   val conf = new SparkConf().setMaster("local[8]").setAppName("SparkDataFrame")
  13.   val sc = new SparkContext(conf)
  14.   val sqlContext = new SQLContext(sc)
  15.   val lines = sc.textFile("D:\\idea\\spark_dt\\src\\main\\java\\com\\base\\spark\\sql\\transfer\\person.txt")
  16.   val personsRDD = lines.map(_.split(",")).map(p => Row(p(0), p(1).trim, p(2)))
  17.   val schema =
  18.     StructType(
  19.       schemaString.split("\\s+").map(fieldName => StructField(fieldName, StringType, true)))
  20.   val personsDF = sqlContext.createDataFrame(personsRDD, schema)
  21.   personsDF.registerTempTable("persons")
  22.   personsDF.show
  23.   sqlContext.sql("select * from persons where age > 30").collect().foreach(println)
  24.   sqlContext.sql("select count(*),age from persons group by age").collect().foreach(println)
  25. }
复制代码

DataFrames

DataFrame是一个带有列名的分布式数据集合。等同于一张关系型数据库中的表或者R/Python中的data frame,不过在底层做了很多优化;我们可以使用结构化数据文件、Hive tables,外部数据库或者RDDS来构造DataFrames。

将RDD转换成DataFrames有两种方法:

利用反射来推断包含特定类型对象的RDD的schema。这种方法会简化代码并且在你已经知道schema的时候非常适用。

使用编程接口,构造一个schema并将其应用在已知的RDD上。

1)利用反射推断Schema

Spark SQL能够将含Row对象的RDD转换成DataFrame,并推断数据类型。通过将一个键值对(key/value)列表作为kwargs传给Row类来构造Rows。key定义了表的列名,类型通过看第一列数据来推断。(所以这里RDD的第一列数据不能有缺失)未来版本中将会通过看更多数据来推断数据类型,像现在对JSON文件的处理一样

  1. # sc is an existing SparkContext.from pyspark.sql import SQLContext, Row
  2.                 sqlContext = SQLContext(sc)
  3.                 # Load a text file and convert each line to a Row.
  4.                 lines = sc.textFile("examples/src/main/resources/people.txt")
  5.                 parts = lines.map(lambda l: l.split(","))
  6.                 people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
  7.                 # Infer the schema, and register the DataFrame as a table.
  8.                 schemaPeople = sqlContext.createDataFrame(people)
  9.                 schemaPeople.registerTempTable("people")
  10.                 # SQL can be run over DataFrames that have been registered as a table.
  11.                 teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
  12.                 # The results of SQL queries are RDDs and support all the normal RDD operations.
  13.                 teenNames = teenagers.map(lambda p: "Name: " + p.name)for teenName in teenNames.collect():
  14.                   print teenName
复制代码
2)编程指定Schema

通过编程指定Schema需要3步:

1、从原来的RDD创建一个元祖或列表的RDD。

2、用StructType 创建一个和步骤一中创建的RDD中元祖或列表的结构相匹配的Schema。

3、通过SQLContext提供的createDataFrame方法将schema 应用到RDD上。

  1. # Import SQLContext and data types
  2. from pyspark.sql import SQLContext
  3. from pyspark.sql.types import *
  4. # sc is an existing SparkContext.
  5. sqlContext = SQLContext(sc)
  6. # Load a text file and convert each line to a tuple.
  7. lines = sc.textFile("examples/src/main/resources/people.txt")
  8. parts = lines.map(lambda l: l.split(","))
  9. people = parts.map(lambda p: (p[0], p[1].strip()))
  10. # The schema is encoded in a string.
  11. schemaString = "name age"

  12. fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
  13. schema = StructType(fields)
  14. # Apply the schema to the RDD.
  15. schemaPeople = sqlContext.createDataFrame(people, schema)
  16. # Register the DataFrame as a table.
  17. schemaPeople.registerTempTable("people")
  18. # SQL can be run over DataFrames that have been registered as a table.
  19. results = sqlContext.sql("SELECT name FROM people")
  20. # The results of SQL queries are RDDs and support all the normal RDD operations.
  21. names = results.map(lambda p: "Name: " + p.name)
  22. for name in names.collect():
  23.   print name
复制代码

3)使用Java实战RDD与DataFrame转换

动态构造有时候有些麻烦:spark开发了一个API就是DataSet,DataSet可以基于RDD,RDD里面有类型。他可以基于这种类型。

sparkSQL+DataFrame+DataSet:三者都相当重要,在2.0的时候编码会使用大量使用DataSet。DataSet上可以直接查询。Spark的核心RDD+DataFrame+DataSet:最终会形成三足鼎立。RDD实际是服务SparkSQL的。DataSet是想要用所有的子框架都用DataSet进行计算。DataSet的底层是无私计划。这就让天然的性能优势体现出来。官方建议使用hiveContext,在功能上比SQLContext的更好更高级的功能。

  1. public class RDD2DataFrameByProgrammatically {
  2.         public static void main(String[] args) {
  3.                 SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByProgrammatically");
  4.                 JavaSparkContext sc = new JavaSparkContext(conf);
  5.                 SQLContext sqlContext = new SQLContext(sc);
  6.                 JavaRDD<String> lines = sc.textFile("E://test.txt");        
  7.                 /** * 第一步:在RDD的基础上创建类型为Row的RDD */
  8.                 JavaRDD<Row> personsRDD = lines.map(new Function<String, Row>() {
  9.                         @Override
  10.                         public Row call(String line) throws Exception {
  11.                                 String[] splited = line.split(",");
  12.                                 return RowFactory.create(Integer.valueOf(splited[0]), splited[1],Integer.valueOf(splited[2]));
  13.                         }
  14.                 });
  15.                 /*** 第二步:动态构造DataFrame的元数据,一般而言,有多少列以及每列的具体类型可能来自于JSON文件,也可能来自于DB */
  16.                 List<StructField> structFields = new ArrayList<StructField>();
  17.                 structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
  18.                 structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
  19.                 structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
  20.                 //构建StructType,用于最后DataFrame元数据的描述
  21.                 StructType structType =DataTypes.createStructType(structFields);
  22.                
  23.                 /*** 第三步:基于以后的MetaData以及RDD<Row>来构造DataFrame*/
  24.                 DataFrame personsDF = sqlContext.createDataFrame(personsRDD, structType);
  25.                 /** 第四步:注册成为临时表以供后续的SQL查询操作*/
  26.                 personsDF.registerTempTable("persons");
  27.                 /** 第五步,进行数据的多维度分析*/
  28.                 DataFrame result = sqlContext.sql("select * from persons where age >20");
  29.                 /**第六步:对结果进行处理,包括由DataFrame转换成为RDD<Row>,以及结构持久化*/
  30.                 List<Row> listRow = result.javaRDD().collect();
  31.                 for(Row row : listRow){
  32.                         System.out.println(row);
  33.                 }
  34.         }
  35. }
复制代码

4)使用Scala实战RDD与DataFrame转换

  1. import org.apache.spark.{SparkContext, SparkConf}
  2. import org.apache.spark.sql.SQLContext

  3. class RDD2DataFrameByProgrammaticallyScala {
  4.   def main(args:Array[String]): Unit = {
  5.     val conf = new SparkConf()
  6.     conf.setAppName("RDD2DataFrameByProgrammaticallyScala") //设置应用程序的名称,在程序运行的监控界面可以看到名称
  7.     conf.setMaster("local")
  8.     val sc = new SparkContext(conf)
  9.     val sqlContext = new SQLContext(sc)
  10.     val people = sc.textFile("C://Users//DS01//Desktop//persons.txt")
  11.     val schemaString = "name age"
  12.     import org.apache.spark.sql.Row;
  13.     import org.apache.spark.sql.types.{StructType,StructField,StringType};
  14.     val schema = StructType(schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,true)))
  15.     val rowRDD = people.map(_.split(",")).map(p=>Row(p(0),p(1).trim))
  16.     val peopleDataFrame = sqlContext.createDataFrame(rowRDD,schema)
  17.     peopleDataFrame.registerTempTable("people")
  18.     val results = sqlContext.sql("select name from people")
  19.     results.map(t=>"Name: "+t(0)).collect().foreach(println)
  20.   }
  21. }
复制代码


注:本学习笔记来自DT大数据梦工厂      

二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:Dataframe Frame SCALA Java Fram Spark scala DT_Spark 大数据

已有 1 人评分论坛币 收起 理由
daazx + 10 精彩帖子

总评分: 论坛币 + 10   查看全部评分

本帖被以下文库推荐

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群
GMT+8, 2025-12-31 21:00