
Lesson 59: Hands-On with RDD and DataFrame in the IDE Using Java and Scala

Posted by 无量天尊Spark on 2016-6-13 17:07:26


I. The Java way:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class RDD2DataFrameByReflection {
    public static void main(String args[]){
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> lines = sc.textFile("D:\\idea\\spark_dt\\src\\main\\java\\com\\base\\spark\\sql\\transfer\\person.txt");

        // Each line of person.txt is "id,name,age".
        JavaRDD<Person_> persons = lines.map((Function<String, Person_>) line -> {
            String[] splited = line.split(",");
            Person_ p = new Person_();
            p.setId(Integer.valueOf(splited[0]));
            p.setName(splited[1]);
            p.setAge(Integer.valueOf(splited[2]));
            return p;
        });
        // Under the hood, reflection extracts all of Person_'s fields; combined with the RDD itself, this yields a DataFrame.
        DataFrame df = sqlContext.createDataFrame(persons, Person_.class);
        df.show();
        df.registerTempTable("persons");
        DataFrame bigDatas = sqlContext.sql("select * from persons where age >= 6");
        JavaRDD<Row> bigDataRDD = bigDatas.toJavaRDD();
        JavaRDD<Person_> personRDD = bigDataRDD.map((Function<Row, Person_>) row -> {
            Person_ p = new Person_();
            p.setId(row.getInt(0));
            p.setName(row.getString(1));
            p.setAge(row.getInt(2));
            return p;
        });
        personRDD.collect().forEach(System.out::println);
    }
}

class Person_{
    public int getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Person{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }

    public Person_(Integer id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    Person_(){}

    private Integer id;
    private String name;
    private Integer age;
}

Running it produces:

16/03/25 05:59:14 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalAccessException: Class org.apache.spark.sql.SQLContext$anonfun$org$apache$spark$sql$SQLContext$beansToRows$1$anonfun$apply$1 can not access a member of class com.base.spark.sql.transfer.Person_ with modifiers "public"
        at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)

Fix: Person_ needs the public modifier. Since a public top-level class in Java must live in a file named after it, move Person_ into its own Person_.java file.

Running again produces:

16/03/25 06:04:46 INFO DAGScheduler: Job 0 finished: show at RDD2DataFrameByReflection.java:32, took 0.178583 s
+---+---+------+
|age| id|  name|
+---+---+------+
|  1|  1| spark|
|  2|  2|hadoop|
|  3|  3| flink|
|  4|  4| spark|
|  5|  5|hadoop|
|  6|  6| spark|
|  7|  7| spark|
|  8|  8| spark|
|  9|  9| scala|
| 10| 10|  java|
| 11| 11| spark|
+---+---+------+

16/03/25 06:04:47 INFO SparkContext: Starting job: collect at RDD2DataFrameByReflection.java:43
16/03/25 06:04:47 INFO DAGScheduler: Got job 1 (collect at RDD2DataFrameByReflection.java:43) with 1 output partitions
16/03/25 06:04:47 INFO DAGScheduler: Final stage: ResultStage 1 (collect at RDD2DataFrameByReflection.java:43)
16/03/25 06:04:47 INFO DAGScheduler: Parents of final stage: List()
16/03/25 06:04:47 INFO DAGScheduler: Missing parents: List()
16/03/25 06:04:47 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[8] at map at RDD2DataFrameByReflection.java:36), which has no missing parents
16/03/25 06:04:47 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 9.5 KB, free 135.6 KB)
16/03/25 06:04:47 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.9 KB, free 140.5 KB)
16/03/25 06:04:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:56952 (size: 4.9 KB, free: 1771.1 MB)
16/03/25 06:04:47 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
16/03/25 06:04:47 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[8] at map at RDD2DataFrameByReflection.java:36)
16/03/25 06:04:47 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/03/25 06:04:47 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,PROCESS_LOCAL, 2177 bytes)
16/03/25 06:04:47 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
16/03/25 06:04:47 INFO HadoopRDD: Input split: file:/D:/idea/spark_dt/src/main/java/com/base/spark/sql/transfer/person.txt:0+131
16/03/25 06:04:47 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:56952 in memory (size: 3.1 KB, free: 1771.1 MB)
16/03/25 06:04:47 INFO ContextCleaner: Cleaned accumulator 1
16/03/25 06:04:47 INFO GeneratePredicate: Code generated in 240.433533 ms
16/03/25 06:04:47 INFO GenerateUnsafeProjection: Code generated in 13.091797 ms
16/03/25 06:04:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String

Cause: the fields in the returned Row are not laid out in the order the fields are declared in Person_.

Field order inside the Row:

[Screenshot: debugger view showing the field order of the Row's schema]

As the show() output above also confirms, the schema's column order is age, id, name rather than the declared order id, name, age: when Spark infers a schema from a JavaBean, the bean's properties come back sorted alphabetically.
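A quick way to see this is to print the inferred schema directly. A minimal sketch (shown in Scala; the Java call is identical apart from the trailing semicolon, and the output shape is what Spark 1.x typically prints):

// Print the inferred schema; note the alphabetical column order.
df.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- id: integer (nullable = true)
//  |-- name: string (nullable = true)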

Fix:

/**
 * Returns the value of a given fieldName.
 * For primitive types if value is null it returns 'zero value' specific for primitive
 * ie. 0 for Int - use isNullAt to ensure that value is not null
 *
 * @throws UnsupportedOperationException when schema is not defined.
 * @throws IllegalArgumentException when fieldName do not exist.
 * @throws ClassCastException when data type does not match.
 */
def getAs[T](fieldName: String): T = getAs[T](fieldIndex(fieldName))

As you can see, Row provides a way to look up a field's value by its name instead of by position.

Modified code:

JavaRDD<Person_> personRDD = bigDataRDD.map((Function<Row, Person_>) row -> {
    Person_ p = new Person_();
    p.setId(row.getAs("id"));
    p.setName(row.getAs("name"));
    p.setAge(row.getAs("age"));
    return p;
});

Result:

java.io.NotSerializableException: com.base.spark.sql.transfer.Person_
Serialization stack:
        - object not serializable (class: com.base.spark.sql.transfer.Person_, value: Person{id=6, name='spark', age=6})
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 6)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

This tells us that Person_ is not serializable.

Fix:

Have Person_ implement the Serializable interface, i.e. declare it as public class Person_ implements Serializable.

Run again:

16/03/25 06:18:00 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/03/25 06:18:00 INFO DAGScheduler: ResultStage 1 (collect at RDD2DataFrameByReflection.java:43) finished in 0.347 s
16/03/25 06:18:00 INFO DAGScheduler: Job 1 finished: collect at RDD2DataFrameByReflection.java:43, took 0.356079 s
Person{id=6, name='spark', age=6}
Person{id=7, name='spark', age=7}
Person{id=8, name='spark', age=8}
Person{id=9, name='scala', age=9}
Person{id=10, name='java', age=10}
Person{id=11, name='spark', age=11}

This time the job succeeds.

Summary:

Things to watch when converting between JavaRDD and DataFrame:

1) The class used for reflection must be a public class.

2) The class must implement the Serializable interface.

3) When converting the DataFrame back to an RDD, fetch each value by its fieldName rather than by position.


II. The Scala way

[Screenshot: the Scala version of the RDD-to-DataFrame example]
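Since the screenshot does not reproduce well here, below is a minimal sketch of what the Scala version described in these notes typically looks like. The object name, file path, and variable names are assumptions, not the original code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// A case class extends Product, so Spark can derive the schema from its fields.
case class Person(id: Int, name: String, age: Int)

object RDD2DataFrameScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameScala")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Brings rddToDataFrameHolder (and friends) into scope, enabling .toDF on RDDs.
    import sqlContext.implicits._

    val lines = sc.textFile("person.txt") // path assumed
    val persons = lines.map { line =>
      val splited = line.split(",")
      Person(splited(0).toInt, splited(1), splited(2).toInt)
    }
    // persons is an RDD[Person]; .toDF is supplied by the implicit conversion shown below.
    val df = persons.toDF()
    df.registerTempTable("persons")
    sqlContext.sql("select * from persons where age >= 6").show()
  }
}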

Note: the RDD returned by lines.map does not itself have a .toDF method; an implicit conversion is at work here:

/**
 * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples).
 * @since 1.3.0
 */
implicit def rddToDataFrameHolder[A <: Product : TypeTag](rdd: RDD[A]): DataFrameHolder = {
  DataFrameHolder(_sqlContext.createDataFrame(rdd))
}

The implicit conversion thus turns the RDD into a DataFrameHolder, and calling toDF on the holder finally yields the DataFrame.
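In other words, writing persons.toDF() (names from the sketch above) expands to roughly the following:

// What the compiler inserts for you, spelled out explicitly:
val df: DataFrame = rddToDataFrameHolder(persons).toDF()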

case class DataFrameHolder private[sql](private val df: DataFrame) {

  // This is declared with parentheses to prevent the Scala compiler from treating
  // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
  def toDF(): DataFrame = df

  def toDF(colNames: String*): DataFrame = df.toDF(colNames : _*)
}
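The second overload lets you rename columns as you convert. A hypothetical call, again using the persons RDD from the sketch above:

// Override the case-class field names with custom column names.
val df2 = persons.toDF("person_id", "person_name", "person_age")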



Note: these study notes come from DT大数据梦工厂 (DT Big Data Dream Factory). WeChat public account: DT_Spark. Nightly live stream at 8pm on permanent YY channel 68917580.


