Lesson 59: Hands-on RDD and DataFrame in the IDE with Java and Scala
1. The Java approach:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class RDD2DataFrameByReflection {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD<String> lines = sc.textFile("D:\\idea\\spark_dt\\src\\main\\java\\com\\base\\spark\\sql\\transfer\\person.txt");
        // Each input line is "id,name,age", e.g. "1,spark,1".
        JavaRDD<Person_> persons = lines.map((Function<String, Person_>) line -> {
            String[] splited = line.split(",");
            Person_ p = new Person_();
            p.setId(Integer.valueOf(splited[0]));
            p.setName(splited[1]);
            p.setAge(Integer.valueOf(splited[2]));
            return p;
        });
        // Under the hood, reflection extracts all the fields of Person_;
        // combined with the RDD itself, this yields a DataFrame.
        DataFrame df = sqlContext.createDataFrame(persons, Person_.class);
        df.show();
        df.registerTempTable("persons");
        DataFrame bigDatas = sqlContext.sql("select * from persons where age >= 6");
        JavaRDD<Row> bigDataRDD = bigDatas.toJavaRDD();
        JavaRDD<Person_> personRDD = bigDataRDD.map((Function<Row, Person_>) row -> {
            Person_ p = new Person_();
            p.setId(row.getInt(0));
            p.setName(row.getString(1));
            p.setAge(row.getInt(2));
            return p;
        });
        personRDD.collect().forEach(System.out::println);
    }
}

class Person_ {
    private Integer id;
    private String name;
    private Integer age;

    Person_() {}

    public Person_(Integer id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public int getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }

    @Override
    public String toString() {
        return "Person{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}';
    }
}
Running it produces:
16/03/25 05:59:14 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalAccessException: Class org.apache.spark.sql.SQLContext$anonfun$org$apache$spark$sql$SQLContext$beansToRows$1$anonfun$apply$1 can not access a member of class com.base.spark.sql.transfer.Person_ with modifiers "public"
at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
Fix: the Person_ class needs the public modifier, which in Java means moving Person_ into its own source file.
Running it again gives:
16/03/25 06:04:46 INFO DAGScheduler: Job 0 finished: show at RDD2DataFrameByReflection.java:32, took 0.178583 s
+---+---+------+
|age| id|  name|
+---+---+------+
|  1|  1| spark|
|  2|  2|hadoop|
|  3|  3| flink|
|  4|  4| spark|
|  5|  5|hadoop|
|  6|  6| spark|
|  7|  7| spark|
|  8|  8| spark|
|  9|  9| scala|
| 10| 10|  java|
| 11| 11| spark|
+---+---+------+
16/03/25 06:04:47 INFO SparkContext: Starting job: collect at RDD2DataFrameByReflection.java:43
16/03/25 06:04:47 INFO DAGScheduler: Got job 1 (collect at RDD2DataFrameByReflection.java:43) with 1 output partitions
16/03/25 06:04:47 INFO DAGScheduler: Final stage: ResultStage 1 (collect at RDD2DataFrameByReflection.java:43)
16/03/25 06:04:47 INFO DAGScheduler: Parents of final stage: List()
16/03/25 06:04:47 INFO DAGScheduler: Missing parents: List()
16/03/25 06:04:47 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[8] at map at RDD2DataFrameByReflection.java:36), which has no missing parents
16/03/25 06:04:47 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 9.5 KB, free 135.6 KB)
16/03/25 06:04:47 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.9 KB, free 140.5 KB)
16/03/25 06:04:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:56952 (size: 4.9 KB, free: 1771.1 MB)
16/03/25 06:04:47 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
16/03/25 06:04:47 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[8] at map at RDD2DataFrameByReflection.java:36)
16/03/25 06:04:47 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/03/25 06:04:47 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,PROCESS_LOCAL, 2177 bytes)
16/03/25 06:04:47 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
16/03/25 06:04:47 INFO HadoopRDD: Input split: file:/D:/idea/spark_dt/src/main/java/com/base/spark/sql/transfer/person.txt:0+131
16/03/25 06:04:47 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:56952 in memory (size: 3.1 KB, free: 1771.1 MB)
16/03/25 06:04:47 INFO ContextCleaner: Cleaned accumulator 1
16/03/25 06:04:47 INFO GeneratePredicate: Code generated in 240.433533 ms
16/03/25 06:04:47 INFO GenerateUnsafeProjection: Code generated in 13.091797 ms
16/03/25 06:04:47 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
Cause: the fields in the returned Row are not arranged in the order in which Person_ declares them.
Field order in the Row: as the show() output above illustrates, the schema order is age, id, name (alphabetical by field name), not our declared order of id, name, age. So row.getString(1) actually lands on the id column, hence the ClassCastException.
Fix:
/**
 * Returns the value of a given fieldName.
 * For primitive types if value is null it returns 'zero value' specific for primitive
 * ie. 0 for Int - use isNullAt to ensure that value is not null
 *
 * @throws UnsupportedOperationException when schema is not defined.
 * @throws IllegalArgumentException when fieldName do not exist.
 * @throws ClassCastException when data type does not match.
 */
def getAs[T](fieldName: String): T = getAs[T](fieldIndex(fieldName))
As this shows, Row also lets us fetch a field's value by its field name.
Modified code:
JavaRDD<Person_> personRDD = bigDataRDD.map((Function<Row, Person_>) row -> {
    Person_ p = new Person_();
    p.setId(row.getAs("id"));
    p.setName(row.getAs("name"));
    p.setAge(row.getAs("age"));
    return p;
});
Result:
java.io.NotSerializableException: com.base.spark.sql.transfer.Person_
Serialization stack:
- object not serializable (class: com.base.spark.sql.transfer.Person_, value: Person{id=6, name='spark', age=6})
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 6)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
This tells us that Person_ is not serializable.
Fix: have Person_ implement the Serializable interface (public class Person_ implements Serializable).
Run it once more:
16/03/25 06:18:00 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/03/25 06:18:00 INFO DAGScheduler: ResultStage 1 (collect at RDD2DataFrameByReflection.java:43) finished in 0.347 s
16/03/25 06:18:00 INFO DAGScheduler: Job 1 finished: collect at RDD2DataFrameByReflection.java:43, took 0.356079 s
Person{id=6, name='spark', age=6}
Person{id=7, name='spark', age=7}
Person{id=8, name='spark', age=8}
Person{id=9, name='scala', age=9}
Person{id=10, name='java', age=10}
Person{id=11, name='spark', age=11}
This time the job succeeds.
Summary:
Things to watch when converting between JavaRDD and DataFrame:
1) The class used for reflection must be a public class.
2) The class must implement the Serializable interface.
3) When converting a DataFrame back to an RDD, fetch values by fieldName rather than by positional index.
2. The Scala approach
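The original Scala listing did not survive in this post, so here is a minimal sketch of what it plausibly looked like, assuming Spark 1.x, the same person.txt input, and an illustrative case class Person; the object name and file path are assumptions, not from the original:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Case class at top level, so the TypeTag needed by toDF is available.
case class Person(id: Int, name: String, age: Int)

object RDD2DataFrameByReflectionScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflectionScala")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Brings rddToDataFrameHolder (quoted below) into scope;
    // without this import, persons.toDF would not compile.
    import sqlContext.implicits._

    val lines = sc.textFile("person.txt") // assumed path
    val persons = lines.map { line =>
      val splited = line.split(",")       // each line: "id,name,age"
      Person(splited(0).toInt, splited(1), splited(2).toInt)
    }
    val df = persons.toDF()               // persons is a plain RDD[Person]
    df.registerTempTable("persons")
    sqlContext.sql("select * from persons where age >= 6")
      .rdd
      .map(row => Person(row.getAs[Int]("id"), row.getAs[String]("name"), row.getAs[Int]("age")))
      .collect()
      .foreach(println)
  }
}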
Note: the RDD returned by lines.map has no .toDF method of its own; an implicit conversion does the work here:
/**
 * Creates a DataFrame from an RDD of Product (e.g. case classes, tuples).
 * @since 1.3.0
 */
implicit def rddToDataFrameHolder[A <: Product : TypeTag](rdd: RDD[A]): DataFrameHolder = {
  DataFrameHolder(_sqlContext.createDataFrame(rdd))
}
The implicit conversion turns the RDD into a DataFrameHolder; calling toDF on that holder then yields the actual DataFrame:
case class DataFrameHolder private[sql](private val df: DataFrame) {
  // This is declared with parentheses to prevent the Scala compiler from treating
  // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
  def toDF(): DataFrame = df
  def toDF(colNames: String*): DataFrame = df.toDF(colNames : _*)
}
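Putting the two excerpts together, a call like persons.toDF() in the sketch above is expanded by the compiler roughly as follows; this desugaring is written out explicitly as a sketch, not literal Spark source:

// Continuing the sketch above (types from org.apache.spark.sql._):
// what persons.toDF() means once the implicit conversion is applied.
val holder: DataFrameHolder = rddToDataFrameHolder(persons) // wraps createDataFrame(persons)
val df2: DataFrame = holder.toDF()                          // just unwraps the DataFrame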
Note: these study notes come from DT大数据梦工厂 (WeChat official account: DT_Spark; permanent YY live channel 68917580, nightly at 8 p.m.).