一、作业特别说明
上一次课是静态的方式转化,这一次是动态的转换方式。
列的个数,以及每一列的具体的信息只有在运行的时候才会知道:
生产环境时候,常用的方式:动态转化,
第一:生产环境的时候,一开始不容易确定每一条记录的不同的列的元数据信息,以及这条数据到底有多少列。
第二:生产环境业务容易发生变化,数据类型容易发生变化,这样就能够应对变化的业务。可以应对来自数据库或是其他的文件,这样就不需要改动自己的代码了。
作业:使用scala方式
数据:
1,Michael,29
2,Andy,30
3,Justin,19
- import org.apache.spark.sql.types.{StructType, StructField, StringType, DataTypes}
- import org.apache.spark.sql.{Row, RowFactory, SQLContext}
- import org.apache.spark.{SparkContext, SparkConf}
- import org.apache.hadoop.mapred.Master
- import org.apache.spark.deploy.master.Master
- /**
- * Created by hadoop on 2016/3/16.
- * 动态列
- */
- object APPP extends App{
- val schemaString = "id name age"
- val conf = new SparkConf().setMaster("local[8]").setAppName("SparkDataFrame")
- val sc = new SparkContext(conf)
- val sqlContext = new SQLContext(sc)
- val lines = sc.textFile("D:\\idea\\spark_dt\\src\\main\\java\\com\\base\\spark\\sql\\transfer\\person.txt")
- val personsRDD = lines.map(_.split(",")).map(p => Row(p(0), p(1).trim, p(2)))
- val schema =
- StructType(
- schemaString.split("\\s+").map(fieldName => StructField(fieldName, StringType, true)))
- val personsDF = sqlContext.createDataFrame(personsRDD, schema)
- personsDF.registerTempTable("persons")
- personsDF.show
- sqlContext.sql("select * from persons where age > 30").collect().foreach(println)
- sqlContext.sql("select count(*),age from persons group by age").collect().foreach(println)
- }
DataFrames
DataFrame是一个带有列名的分布式数据集合。等同于一张关系型数据库中的表或者R/Python中的data frame,不过在底层做了很多优化;我们可以使用结构化数据文件、Hive tables,外部数据库或者RDDS来构造DataFrames。
将RDD转换成DataFrames有两种方法:
利用反射来推断包含特定类型对象的RDD的schema。这种方法会简化代码并且在你已经知道schema的时候非常适用。
使用编程接口,构造一个schema并将其应用在已知的RDD上。
1)利用反射推断Schema
Spark SQL能够将含Row对象的RDD转换成DataFrame,并推断数据类型。通过将一个键值对(key/value)列表作为kwargs传给Row类来构造Rows。key定义了表的列名,类型通过看第一列数据来推断。(所以这里RDD的第一列数据不能有缺失)未来版本中将会通过看更多数据来推断数据类型,像现在对JSON文件的处理一样
- # sc is an existing SparkContext.from pyspark.sql import SQLContext, Row
- sqlContext = SQLContext(sc)
- # Load a text file and convert each line to a Row.
- lines = sc.textFile("examples/src/main/resources/people.txt")
- parts = lines.map(lambda l: l.split(","))
- people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
- # Infer the schema, and register the DataFrame as a table.
- schemaPeople = sqlContext.createDataFrame(people)
- schemaPeople.registerTempTable("people")
- # SQL can be run over DataFrames that have been registered as a table.
- teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
- # The results of SQL queries are RDDs and support all the normal RDD operations.
- teenNames = teenagers.map(lambda p: "Name: " + p.name)for teenName in teenNames.collect():
- print teenName
通过编程指定Schema需要3步:
1、从原来的RDD创建一个元祖或列表的RDD。
2、用StructType 创建一个和步骤一中创建的RDD中元祖或列表的结构相匹配的Schema。
3、通过SQLContext提供的createDataFrame方法将schema 应用到RDD上。
- # Import SQLContext and data types
- from pyspark.sql import SQLContext
- from pyspark.sql.types import *
- # sc is an existing SparkContext.
- sqlContext = SQLContext(sc)
- # Load a text file and convert each line to a tuple.
- lines = sc.textFile("examples/src/main/resources/people.txt")
- parts = lines.map(lambda l: l.split(","))
- people = parts.map(lambda p: (p[0], p[1].strip()))
- # The schema is encoded in a string.
- schemaString = "name age"
- fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
- schema = StructType(fields)
- # Apply the schema to the RDD.
- schemaPeople = sqlContext.createDataFrame(people, schema)
- # Register the DataFrame as a table.
- schemaPeople.registerTempTable("people")
- # SQL can be run over DataFrames that have been registered as a table.
- results = sqlContext.sql("SELECT name FROM people")
- # The results of SQL queries are RDDs and support all the normal RDD operations.
- names = results.map(lambda p: "Name: " + p.name)
- for name in names.collect():
- print name
3)使用Java实战RDD与DataFrame转换
动态构造有时候有些麻烦:spark开发了一个API就是DataSet,DataSet可以基于RDD,RDD里面有类型。他可以基于这种类型。
sparkSQL+DataFrame+DataSet:三者都相当重要,在2.0的时候编码会使用大量使用DataSet。DataSet上可以直接查询。Spark的核心RDD+DataFrame+DataSet:最终会形成三足鼎立。RDD实际是服务SparkSQL的。DataSet是想要用所有的子框架都用DataSet进行计算。DataSet的底层是无私计划。这就让天然的性能优势体现出来。官方建议使用hiveContext,在功能上比SQLContext的更好更高级的功能。
- public class RDD2DataFrameByProgrammatically {
- public static void main(String[] args) {
- SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByProgrammatically");
- JavaSparkContext sc = new JavaSparkContext(conf);
- SQLContext sqlContext = new SQLContext(sc);
- JavaRDD<String> lines = sc.textFile("E://test.txt");
- /** * 第一步:在RDD的基础上创建类型为Row的RDD */
- JavaRDD<Row> personsRDD = lines.map(new Function<String, Row>() {
- @Override
- public Row call(String line) throws Exception {
- String[] splited = line.split(",");
- return RowFactory.create(Integer.valueOf(splited[0]), splited[1],Integer.valueOf(splited[2]));
- }
- });
- /*** 第二步:动态构造DataFrame的元数据,一般而言,有多少列以及每列的具体类型可能来自于JSON文件,也可能来自于DB */
- List<StructField> structFields = new ArrayList<StructField>();
- structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
- structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
- structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
- //构建StructType,用于最后DataFrame元数据的描述
- StructType structType =DataTypes.createStructType(structFields);
-
- /*** 第三步:基于以后的MetaData以及RDD<Row>来构造DataFrame*/
- DataFrame personsDF = sqlContext.createDataFrame(personsRDD, structType);
- /** 第四步:注册成为临时表以供后续的SQL查询操作*/
- personsDF.registerTempTable("persons");
- /** 第五步,进行数据的多维度分析*/
- DataFrame result = sqlContext.sql("select * from persons where age >20");
- /**第六步:对结果进行处理,包括由DataFrame转换成为RDD<Row>,以及结构持久化*/
- List<Row> listRow = result.javaRDD().collect();
- for(Row row : listRow){
- System.out.println(row);
- }
- }
- }
4)使用Scala实战RDD与DataFrame转换
- import org.apache.spark.{SparkContext, SparkConf}
- import org.apache.spark.sql.SQLContext
- class RDD2DataFrameByProgrammaticallyScala {
- def main(args:Array[String]): Unit = {
- val conf = new SparkConf()
- conf.setAppName("RDD2DataFrameByProgrammaticallyScala") //设置应用程序的名称,在程序运行的监控界面可以看到名称
- conf.setMaster("local")
- val sc = new SparkContext(conf)
- val sqlContext = new SQLContext(sc)
- val people = sc.textFile("C://Users//DS01//Desktop//persons.txt")
- val schemaString = "name age"
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.types.{StructType,StructField,StringType};
- val schema = StructType(schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,true)))
- val rowRDD = people.map(_.split(",")).map(p=>Row(p(0),p(1).trim))
- val peopleDataFrame = sqlContext.createDataFrame(rowRDD,schema)
- peopleDataFrame.registerTempTable("people")
- val results = sqlContext.sql("select name from people")
- results.map(t=>"Name: "+t(0)).collect().foreach(println)
- }
- }
注:本学习笔记来自DT大数据梦工厂


雷达卡



京公网安备 11010802022788号







