2728 0

第62课:Spark SQL下的Parquet使用最佳实践和代码实战 [推广有奖]

  • 1关注
  • 8粉丝

硕士生

34%

还不是VIP/贵宾

-

威望
0
论坛币
305 个
通用积分
0
学术水平
5 点
热心指数
14 点
信用等级
2 点
经验
23032 点
帖子
73
精华
0
在线时间
135 小时
注册时间
2016-2-27
最后登录
2016-9-11

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币

一、Spark SQL下的Parquet使用最佳实践

1)过去整个业界对大数据的分析的技术栈的Pipeline一般分为以下两种方式:

a)Data Source -> HDFS -> MR/Hive/Spark(相当于ETL)-> HDFS Parquet -> Spark SQL/Impala -> ResultService(可以放在DB中,也有可能被通过JDBC/ODBC来作为数据服务使用);

b)Data Source -> Real timeupdate data to HBase/DB -> Export to Parquet -> Spark SQL/Impala -> ResultService(可以放在DB中,也有可能被通过JDBC/ODBC来作为数据服务使用);

上述的第二种方式完全可以通过Kafka+Spark Streaming+Spark SQL(内部也强烈建议采用Parquet的方式来存储数据)的方式取代

2)期待的方式:DataSource -> Kafka -> Spark Streaming -> Parquet -> Spark SQL(ML、GraphX等)-> Parquet -> 其它各种Data Mining等。


二、Parquet的精要介绍

1)Parquet是列式存储格式的一种文件类型,列式存储有以下的核心优势:

a)可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。

b)压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如RunLength Encoding和Delta Encoding)进一步节约存储空间。

c)只读取需要的列,支持向量运算,能够获取更好的扫描性能。


三、代码实战

Java版本:

  1. package com.dt.spark.SparkApps.sql;

  2. import java.util.List;

  3. import org.apache.spark.SparkConf;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import org.apache.spark.api.java.function.Function;
  7. import org.apache.spark.sql.DataFrame;
  8. import org.apache.spark.sql.Row;
  9. import org.apache.spark.sql.SQLContext;

  10. public class SparkSQLParquetOps {
  11.     public static void main(String[] args) {
  12.         SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLParquetOps");
  13.         JavaSparkContext sc = new JavaSparkContext(conf);
  14.         SQLContext sqlContext = new SQLContext(sc);

  15.         DataFrame usersDF = sqlContext.read().parquet("E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\users.parquet");

  16.         /**
  17.          * 注册成为临时表以供后续的SQL查询操作
  18.          */
  19.         usersDF.registerTempTable("users");

  20.         /**
  21.          * 进行数据的多维度分析
  22.          */
  23.         DataFrame result = sqlContext.sql("select * from users");
  24.         JavaRDD<String> resultRDD = result.javaRDD().map(new Function<Row, String>() {

  25.             @Override
  26.             public String call(Row row) throws Exception {
  27.                 return "The name is : " + row.getAs("name");
  28.             }
  29.         });

  30.         /**
  31.          * 第六步:对结果进行处理,包括由DataFrame转换成为RDD<Row>,以及结构持久化
  32.          */
  33.         List<String> listRow = resultRDD.collect();
  34.         for(String row : listRow){
  35.             System.out.println(row);
  36.         }

  37.     }
  38. }
复制代码

Schema Merging

Java版本:

  1. package com.dt.spark.SparkApps.sql;

  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import org.apache.spark.api.java.function.Function;
  7. import org.apache.spark.api.java.function.PairFunction;
  8. import org.apache.spark.sql.DataFrame;
  9. import org.apache.spark.sql.Row;
  10. import org.apache.spark.sql.RowFactory;
  11. import org.apache.spark.sql.SQLContext;
  12. import org.apache.spark.sql.types.DataTypes;
  13. import org.apache.spark.sql.types.StructField;
  14. import org.apache.spark.sql.types.StructType;
  15. import scala.Tuple2;

  16. import java.util.ArrayList;
  17. import java.util.Arrays;
  18. import java.util.List;

  19. public class SchemaOps {
  20.     public static void main(String[] args) {
  21.         SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByProgramatically");
  22.         JavaSparkContext sc = new JavaSparkContext(conf);
  23.         SQLContext sqlContext = new SQLContext(sc);
  24.         
  25.         // Create a simple DataFrame, stored into a partition directory
  26.         JavaRDD<Integer> lines = sc.parallelize(Arrays.asList(1,2,3,4,5));

  27.         PairFunction<Integer,Integer,Integer> df2 = new PairFunction<Integer,Integer,Integer>() {

  28.             @Override
  29.             public Tuple2 call(Integer x) throws Exception {
  30.                 return new Tuple2(x,x * 2);
  31.             }
  32.         };
  33.         JavaPairRDD<Integer,Integer> pairs = lines.mapToPair(df2);

  34.         /**
  35.          * 第一步:在RDD的基础上创建类型为Row的RDD
  36.          */
  37.         JavaRDD<Row> personsRDD = pairs.map(new Function<Tuple2<Integer, Integer>, Row>() {
  38.             @Override
  39.             public Row call(Tuple2<Integer, Integer> integerIntegerTuple2) throws Exception {
  40.                 return RowFactory.create(integerIntegerTuple2._1,integerIntegerTuple2._2);
  41.             }
  42.         });

  43.         /**
  44.          * 第二步:动态构造DataFrame的元数据,一般而言,有多少列,以及每列的具体类型可能来自于JSON文件
  45.          * 也可能来自于数据库。
  46.          * 指定类型
  47.          */
  48.         List<StructField> structFields = new ArrayList<StructField>();
  49.         structFields.add(DataTypes.createStructField("single",DataTypes.IntegerType,true));
  50.         structFields.add(DataTypes.createStructField("double",DataTypes.IntegerType,true));

  51.         /**
  52.          * 构建StructType用于最后DataFrame元数据的描述
  53.          */
  54.         StructType structType = DataTypes.createStructType(structFields);
  55.         /**
  56.          * 第三步:基于以后的MetaData以及RDD<Row>来构建DataFrame
  57.          */
  58.         DataFrame personsDF = sqlContext.createDataFrame(personsRDD,structType);

  59.         personsDF.write().parquet("data/test_table/key=1");

  60.         // Create a simple DataFrame, stored into a partition directory
  61.         JavaRDD<Integer> lines1 = sc.parallelize(Arrays.asList(6,7,8,9,10));

  62.         PairFunction<Integer,Integer,Integer> df3 = new PairFunction<Integer,Integer,Integer>() {

  63.             @Override
  64.             public Tuple2 call(Integer x) throws Exception {
  65.                 return new Tuple2(x,x * 2);
  66.             }
  67.         };
  68.         JavaPairRDD<Integer,Integer> pairs1 = lines.mapToPair(df2);

  69.         /**
  70.          * 第一步:在RDD的基础上创建类型为Row的RDD
  71.          */
  72.         JavaRDD<Row> personsRDD1 = pairs1.map(new Function<Tuple2<Integer, Integer>, Row>() {
  73.             @Override
  74.             public Row call(Tuple2<Integer, Integer> integerIntegerTuple2) throws Exception {
  75.                 return RowFactory.create(integerIntegerTuple2._1,integerIntegerTuple2._2);
  76.             }
  77.         });

  78.         /**
  79.          * 第二步:动态构造DataFrame的元数据,一般而言,有多少列,以及每列的具体类型可能来自于JSON文件
  80.          * 也可能来自于数据库。
  81.          * 指定类型
  82.          */
  83.         List<StructField> structFields1 = new ArrayList<StructField>();
  84.         structFields.add(DataTypes.createStructField("single",DataTypes.IntegerType,true));
  85.         structFields.add(DataTypes.createStructField("triple",DataTypes.IntegerType,true));

  86.         /**
  87.          * 构建StructType用于最后DataFrame元数据的描述
  88.          */
  89.         StructType structType1 = DataTypes.createStructType(structFields);
  90.         /**
  91.          * 第三步:基于以后的MetaData以及RDD<Row>来构建DataFrame
  92.          */
  93.         DataFrame personsDF1 = sqlContext.createDataFrame(personsRDD1,structType1);

  94.         personsDF1.write().parquet("data/test_table/key=2");

  95.         DataFrame df4 = sqlContext.read().option("mergeSchema","true").parquet("data/test_table");
  96.         df4.printSchema();

  97.     }
  98. }
复制代码

输出结果如下:

  1. root
  2. |--single: integer (nullable = true)
  3. |--double: integer (nullable = true)
  4. |--single2: integer (nullable = true)
  5. |--triple: integer (nullable = true)
  6. |--key: integer (nullable = true)
复制代码

Scala版本:

  1. // sqlContext from the previous example is used in this example.
  2. // This is used to implicitly convert an RDD to a DataFrame.
  3. import sqlContext.implicits._

  4. // Create a simple DataFrame, stored into a partition directory
  5. val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
  6. df1.write.parquet("data/test_table/key=1")

  7. // Create another DataFrame in a new partition directory,
  8. // adding a new column and dropping an existing column
  9. val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
  10. df2.write.parquet("data/test_table/key=2")

  11. // Read the partitioned table
  12. val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
  13. df3.printSchema()

  14. // The final schema consists of all 3 columns in the Parquet files together
  15. // with the partitioning column appeared in the partition directory paths.
  16. // root
  17. // |-- single: int (nullable = true)
  18. // |-- double: int (nullable = true)
  19. // |-- triple: int (nullable = true)
  20. // |-- key : int (nullable = true)
复制代码


注:本学习笔记来自DT大数据梦工厂


二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:Spark 最佳实践 SPAR Park SPA Spark scala DT_Spark 大数据

已有 1 人评分论坛币 收起 理由
daazx + 10 精彩帖子

总评分: 论坛币 + 10   查看全部评分

本帖被以下文库推荐

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群

京ICP备16021002-2号 京B2-20170662号 京公网安备 11010802022788号 论坛法律顾问:王进律师 知识产权保护声明   免责及隐私声明

GMT+8, 2024-11-10 09:07