Lesson 59: Hands-On RDD and DataFrame in the IDE with Java and Scala

Posted by: 无量天尊Spark

1. The Java approach

  package com.base.spark.sql.transfer;

  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.api.java.function.Function;
  import org.apache.spark.sql.DataFrame;
  import org.apache.spark.sql.Row;
  import org.apache.spark.sql.SQLContext;

  public class RDD2DataFrameByReflection {
      public static void main(String[] args) {
          SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection");
          JavaSparkContext sc = new JavaSparkContext(conf);
          SQLContext sqlContext = new SQLContext(sc);

          JavaRDD<String> lines = sc.textFile("D:\\idea\\spark_dt\\src\\main\\java\\com\\base\\spark\\sql\\transfer\\person.txt");

          // Parse each "id,name,age" line of person.txt into a Person_ bean.
          JavaRDD<Person_> persons = lines.map((Function<String, Person_>) line -> {
              String[] splited = line.split(",");
              Person_ p = new Person_();
              p.setId(Integer.valueOf(splited[0]));
              p.setName(splited[1]);
              p.setAge(Integer.valueOf(splited[2]));
              return p;
          });

          // Under the hood, Spark uses reflection to discover all of Person_'s fields;
          // combined with the RDD itself, that yields a DataFrame.
          DataFrame df = sqlContext.createDataFrame(persons, Person_.class);
          df.show();

          df.registerTempTable("persons");
          DataFrame bigDatas = sqlContext.sql("select * from persons where age >= 6");

          JavaRDD<Row> bigDataRDD = bigDatas.toJavaRDD();
          JavaRDD<Person_> personRDD = bigDataRDD.map((Function<Row, Person_>) row -> {
              Person_ p = new Person_();
              p.setId(row.getInt(0));
              p.setName(row.getString(1));
              p.setAge(row.getInt(2));
              return p;
          });

          personRDD.collect().forEach(System.out::println);
      }
  }

  class Person_ {
      private Integer id;
      private String name;
      private Integer age;

      Person_() {}

      public Person_(Integer id, String name, Integer age) {
          this.id = id;
          this.name = name;
          this.age = age;
      }

      public int getId() {
          return id;
      }

      public void setId(Integer id) {
          this.id = id;
      }

      public String getName() {
          return name;
      }

      public void setName(String name) {
          this.name = name;
      }

      public int getAge() {
          return age;
      }

      public void setAge(Integer age) {
          this.age = age;
      }

      @Override
      public String toString() {
          return "Person{" +
                  "id=" + id +
                  ", name='" + name + '\'' +
                  ", age=" + age +
                  '}';
      }
  }

Running it gives:

  16/03/25 05:59:14 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalAccessException: Class org.apache.spark.sql.SQLContext$anonfun$org$apache$spark$sql$SQLContext$beansToRows$1$anonfun$apply$1 can not access a member of class com.base.spark.sql.transfer.Person_ with modifiers "public"
      at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)

Fix: Person_ needs the public modifier; because a public Java class must live in a source file of the same name, move Person_ into its own Person_.java file.

Running again gives:

  16/03/25 06:04:46 INFO DAGScheduler: Job 0 finished: show at RDD2DataFrameByReflection.java:32, took 0.178583 s
  +---+---+------+
  |age| id|  name|
  +---+---+------+
  |  1|  1| spark|
  |  2|  2|hadoop|
  |  3|  3| flink|
  |  4|  4| spark|
  |  5|  5|hadoop|
  |  6|  6| spark|
  |  7|  7| spark|
  |  8|  8| spark|
  |  9|  9| scala|
  | 10| 10|  java|
  | 11| 11| spark|
  +---+---+------+

  16/03/25 06:04:47 INFO SparkContext: Starting job: collect at RDD2DataFrameByReflection.java:43
  16/03/25 06:04:47 INFO DAGScheduler: Got job 1 (collect at RDD2DataFrameByReflection.java:43) with 1 output partitions
  16/03/25 06:04:47 INFO DAGScheduler: Final stage: ResultStage 1 (collect at RDD2DataFrameByReflection.java:43)
  16/03/25 06:04:47 INFO DAGScheduler: Parents of final stage: List()
  16/03/25 06:04:47 INFO DAGScheduler: Missing parents: List()
  16/03/25 06:04:47 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[8] at map at RDD2DataFrameByReflection.java:36), which has no missing parents
  16/03/25 06:04:47 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 9.5 KB, free 135.6 KB)
  16/03/25 06:04:47 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.9 KB, free 140.5 KB)
  16/03/25 06:04:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:56952 (size: 4.9 KB, free: 1771.1 MB)
  16/03/25 06:04:47 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
  16/03/25 06:04:47 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[8] at map at RDD2DataFrameByReflection.java:36)
  16/03/25 06:04:47 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
  16/03/25 06:04:47 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,PROCESS_LOCAL, 2177 bytes)
  16/03/25 06:04:47 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
  16/03/25 06:04:47 INFO HadoopRDD: Input split: file:/D:/idea/spark_dt/src/main/java/com/base/spark/sql/transfer/person.txt:0+131
  16/03/25 06:04:47 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:56952 in memory (size: 3.1 KB, free: 1771.1 MB)
  16/03/25 06:04:47 INFO ContextCleaner: Cleaned accumulator 1
  16/03/25 06:04:47 INFO GeneratePredicate: Code generated in 240.433533 ms
  16/03/25 06:04:47 INFO GenerateUnsafeProjection: Code generated in 13.091797 ms
  16/03/25 06:04:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
  java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String

Cause of the error: the fields in the returned Row are not arranged in the order in which Person_ declares them.

Field order in the Row: the show() output above makes this visible. The schema's column order is alphabetical (age, id, name), not the order in which we defined the fields, so positional accessors such as row.getString(1) end up reading the wrong column.

The fix: use Row.getAs, which the Spark source documents as follows:

  /**
   * Returns the value of a given fieldName.
   * For primitive types if value is null it returns 'zero value' specific for primitive
   * ie. 0 for Int - use isNullAt to ensure that value is not null
   *
   * @throws UnsupportedOperationException when schema is not defined.
   * @throws IllegalArgumentException when fieldName do not exist.
   * @throws ClassCastException when data type does not match.
   */
  def getAs[T](fieldName: String): T = getAs[T](fieldIndex(fieldName))

As this shows, Row lets you fetch a field's value by its field name.

The updated code:

  JavaRDD<Person_> personRDD = bigDataRDD.map((Function<Row, Person_>) row -> {
      Person_ p = new Person_();
      p.setId(row.getAs("id"));
      p.setName(row.getAs("name"));
      p.setAge(row.getAs("age"));
      return p;
  });

Running again:

  java.io.NotSerializableException: com.base.spark.sql.transfer.Person_
  Serialization stack:
      - object not serializable (class: com.base.spark.sql.transfer.Person_, value: Person{id=6, name='spark', age=6})
      - element of array (index: 0)
      - array (class [Ljava.lang.Object;, size 6)
      at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
      at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
      at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

This tells us that Person_ is not serializable: collect() has to ship the resulting objects from the executor back to the driver in serialized form, so the class must be serializable.

Fix:

Have Person_ implement the Serializable interface, i.e. declare it as public class Person_ implements Serializable.

Run it once more:

  16/03/25 06:18:00 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
  16/03/25 06:18:00 INFO DAGScheduler: ResultStage 1 (collect at RDD2DataFrameByReflection.java:43) finished in 0.347 s
  16/03/25 06:18:00 INFO DAGScheduler: Job 1 finished: collect at RDD2DataFrameByReflection.java:43, took 0.356079 s
  Person{id=6, name='spark', age=6}
  Person{id=7, name='spark', age=7}
  Person{id=8, name='spark', age=8}
  Person{id=9, name='scala', age=9}
  Person{id=10, name='java', age=10}
  Person{id=11, name='spark', age=11}

This time the job succeeds.

Summary:

Things to watch when converting between JavaRDD and DataFrame:

1) The class processed by reflection must be a public class.

2) The class must implement the Serializable interface.

3) When converting a DataFrame back to an RDD, fetch each value by its fieldName rather than by position.


2. The Scala approach

Note: the RDD returned by lines.map does not itself have a .toDF method; an implicit conversion is doing the work here (a full Scala sketch follows the excerpts below):

  /**
   * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples).
   * @since 1.3.0
   */
  implicit def rddToDataFrameHolder[A <: Product : TypeTag](rdd: RDD[A]): DataFrameHolder = {
    DataFrameHolder(_sqlContext.createDataFrame(rdd))
  }

The implicit conversion wraps the RDD in a DataFrameHolder, and calling toDF on that holder finally turns the RDD into a DataFrame:

  case class DataFrameHolder private[sql](private val df: DataFrame) {

    // This is declared with parentheses to prevent the Scala compiler from treating
    // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
    def toDF(): DataFrame = df

    def toDF(colNames: String*): DataFrame = df.toDF(colNames : _*)
  }
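The original notes never show the Scala program itself, so here is a minimal sketch of what the lines.map(...).toDF() flow presumably looks like, assuming the same person.txt input as the Java version; the names Person and RDD2DataFrameByReflectionScala and the file path are illustrative, not from the original:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  // A case class is a Product, so rddToDataFrameHolder applies to the mapped RDD;
  // case classes are also serializable by default, avoiding the Java pitfalls above.
  case class Person(id: Int, name: String, age: Int)

  object RDD2DataFrameByReflectionScala {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflectionScala")
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)

      // Brings rddToDataFrameHolder (and therefore .toDF) into scope.
      import sqlContext.implicits._

      val lines = sc.textFile("D:\\idea\\spark_dt\\src\\main\\java\\com\\base\\spark\\sql\\transfer\\person.txt")

      // Parse each "id,name,age" line, then convert via the implicit DataFrameHolder.
      val df = lines.map { line =>
        val splited = line.split(",")
        Person(splited(0).toInt, splited(1), splited(2).toInt)
      }.toDF()

      df.show()
      df.registerTempTable("persons")
      val bigDatas = sqlContext.sql("select * from persons where age >= 6")

      // Fetch values by field name, exactly as in the Java fix above.
      val bigPersons = bigDatas.rdd.map { row =>
        Person(row.getAs[Int]("id"), row.getAs[String]("name"), row.getAs[Int]("age"))
      }

      bigPersons.collect().foreach(println)
    }
  }

One design point worth noting: unlike the JavaBean path, the case-class schema keeps the constructor's field order (id, name, age), so even positional access would have worked here; fetching by name is still the safer habit.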



Note: these study notes come from DT大数据梦工厂 (WeChat public account: DT_Spark; nightly YY live channel at 8 pm: 68917580).

Original forum thread: https://bbs.pinggu.org/thread-4650680-1-1.html
