1489 0

第5课:彻底精通Scala隐式转换和并发编程及Spark源码阅读(1) [推广有奖]

  • 1关注
  • 8粉丝

硕士生

34%

还不是VIP/贵宾

-

威望
0
论坛币
305 个
通用积分
0
学术水平
5 点
热心指数
14 点
信用等级
2 点
经验
23002 点
帖子
73
精华
0
在线时间
135 小时
注册时间
2016-2-27
最后登录
2016-9-11

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币

一、隐式转换


1) 隐式转换案例

  1. object ABCDMain extends App {
  2.    class B
  3.    class C {
  4.       override def toString() = "I amC";
  5.       def printC(c: C) = println(c);
  6.    }
  7.    class D
  8.    implicit def B2C(b: B) = {
  9.       println("B2C")
  10.       new C
  11.    }

  12.    implicit def D2C(d: D) = {
  13.       println("D2C")
  14.       new C
  15.    }

  16.    new D().printC(new B)
  17.        }
复制代码

先调用 D2C转换函数将new D()转换成C类, 然后调用C类的printC方法;但发现传入的参数类型是B类,于是搜索当前范围有无合适的转换函数,发现B2C转换函数符合要求。


2) 隐式操作规则

       隐式定义是指编译器为了修正类型错误而允许插入到程序中的定义。例如,如果x + y 不能通过类型检查,那么编译器就可能把它修改为convert(x) + y , 这里的convert实质可用的隐式转换。

例如:scala> val a = 4

       a: Int = 4

       scala> val b = "3"

       b: String = 3


       scala> def sum(x: Int, y: Int) = x + y

       sum: (x: Int, y: Int)Int


       scala> sum(a, b)

       <console>:13: error: type mismatch;

       found  : String

       required: Int

              sum(a, b)

                     ^

此时类型不匹配,我们添加一个函数:

       scala> implicit def str2Int(str: String)= Integer.valueOf(str).toInt

       warning: there were 1 feature warning(s);re-run with -feature for details

       str2Int: (str: String)Int

再次执行:scala> sum(a, b)

                  res4: Int = 7

此时b 被转成了我们期待的Int 类型,对于函数sum 来说 convert 就是str2Int,此时如果还有其他的函数也有类型转换的需求就可以简化代码。隐式转换由下面的规则掌控:

       a) 标记规则:只有标记为implicit 的定义才是可用的。implicit 关键字用来标记编译器可以用于隐式转换,可以使用它标记任何变量、函数,对象、参数。

Spark源码中的隐式对象:

  1. implicit val keyConverter =keyWritableFactory.convert            
  2.         implicit val valueConverter =valueWritableFactory.convert
  3.         newSequenceFileRDDFunctions(rdd,
  4.         keyWritableFactory.writableClass(kt),valueWritableFactory.writableClass(vt))
  5.         implicit defrddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)])
  6.        (implicit kt: ClassTag[K], vt:ClassTag[V],
  7.         keyWritableFactory:WritableFactory[K],
  8.         valueWritableFactory:WritableFactory[V])
  9.         : SequenceFileRDDFunctions[K,V] = {
  10.         implicit val keyConverter =keyWritableFactory.convert
  11.         implicit val valueConverter= valueWritableFactory.convert
  12.         newSequenceFileRDDFunctions(rdd,
  13.         keyWritableFactory.writableClass(kt),valueWritableFactory.writableClass(vt))
  14.    }
复制代码

这里创建SequenceFileRDD时使用了隐式的key,value 对象来做转换。


隐式方法:     

  1. implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
  2.        (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):PairRDDFunctions[K, V] = {                   new PairRDDFunctions(rdd)
  3.    }
复制代码

RDD的伴生对象中提供的implicit方法。

Word count程序中 sc.textFile(“/usr/local/word.txt”).flatMap(_.split(“\\s+”).map((_,1)).reduceByKey(_+_).collect

就调用了这implicit 方法。


隐式参数:

  1. def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
  2.       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
  3.       hadoopFile[K, V, F](path, defaultMinPartitions)
  4.    }
复制代码

这里我们用sc.textFile读取hadoop上的文件是,hadoopFile 使用隐式参数km,vm保存了输入hadoop输入的key、value类型信息。


       b) 作用域规则:插入的隐式转换必须以单一的标示符形式处于作用域中,或者与转换的源或目标类型关联在一起,scala编译器仅仅考虑处于作用域之内的隐式转换。

       例如上面的word count程序sc.textFile 方法调用后会产生HadoopRDD 而hadoopRDD中并没有flatMap方法,而hadoopRDD的父类中的RDD的伴生对象object RDD中有flatMap方法此时编译器就会调用。

       object RDD中的:

  1. def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  2.            val cleanF = sc.clean(f)
  3.            new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  4.     }
复制代码

方法来创建MapPartitionsRDD,这样每次RDD转换时,编译器都会从object RDD 查找此RDD没有定义的方法来实现关联,这样就不需要在程序中单独引用转换了。

     c) 无歧义规则:隐式转换只有不存在其他可以插入的转换前提下才能插入,此时编译器会报错。以之前隐式操作规则 那小节的案例说明:

       我们继续定义一个strt 到Int 的转换:

       scala>   implicit def str2Int2(str: String) =Integer.valueOf(str).toInt

       warning: there were 1 feature warning(s);re-run with -feature for details

       tr2Int2: (str: String)Int

       我们再次调用 sum方法;

       sum(a, b)

       <console>:17: error: type mismatch;

       found  : String

       required: Int

       Note that implicit conversions are notapplicable because they are ambiguous:

       both method str2Int of type (str: String)Int

       and method str2Int2 of type (str:String)Int

       are possible conversion functions fromString to Int

       sum(a, b)

                     ^

     此时编译器拒绝了我们定义的隐式转换。


     d) 命名隐式转换:隐式转换可以任意命名,因为编译器是通过方法的参数信息来推断应该来调用哪个方法,而不是方法的名称。最佳实践:方法名称就能看出此方法是做了是么转换:

  1. implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
  2.      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
  3.          new PairRDDFunctions(rdd)
  4.     }
复制代码

此方法名称rddToPairRDDFunctions 表明是将RDD转换成PairRDDFunctions


3) 隐式参数

由运行时上下文实际赋值注入的参数,不需要手动赋值,自动完成。一般到隐式参数类型的伴生对象中去找隐式值。

scala> classLevel(val level : Int)

defined class Level


scala> implicitval lelvel = new Level(9)

lelvel: Level =Level@12028586


scala> deftoWorker(name: String)(implicit level: Level) {println(name + ":" +level.level)}

toWorker: (name:String)(implicit level: Level)Unit


scala>toWorker("wjl")

wjl:9


注意val 本身也被标记为implicit,如果不是的话编译器就不能使用它。Implicit关键字是作用于全体参数列表,而不是单独的参数。

scala> val a = 4

a: Int = 4


scala> implicitval b = "s"

b: String = s


scala> implicitval c = 56

c: Int = 56


scala> defsum(x: Int)(implicit y: Int, z: String) = x + y + z

sum: (x:Int)(implicit y: Int, implicit z: String)String


scala> sum(4)

res4: String = 60s

从REPL的显示我们看到z也被加上了implicit 关键字。


4) 隐式对象

   scala> abstract class Template[T] {

     |      def add(x: T, y: T): T

     |    }

   defined class Template


   scala> abstract class SubTemplate[T]extends Template[T] {

     |      def unit: T

     |    }

   defined class SubTemplate


   scala> implicit object StringAdd extendsSubTemplate[String] {

     |        override def add(x: String, y: String) = x concat y

     |        override def unit: String = ""

     |      }

   defined module StringAdd



scala> implicitobject IntAdd extends SubTemplate[Int] {

     |        override def add(x: Int, y: Int) = x + y

     |        override def unit: Int = 0

     |      }

defined moduleIntAdd


scala>  def sum[T](xs: List[T])(implicit m:SubTemplate[T]): T = {

     | if (xs.isEmpty) m.unit

     | else m.add(xs.head, sum(xs.tail))}

sum: [T](xs:List[T])(implicit m: SubTemplate[T])T


scala>println(sum(List(1, 2, 3, 4, 5)))

15


scala>println(sum(List("Scala", "Spark", "Kafka")))

ScalaSparkKafka

这里利用了scala的类型推断功能,传入不同的类型调用不同的函数,传入String 类型的List时T 被替换成String ,传入Int类型时T被替换成 Int。


5) 隐式类

scala> objectContext_Helper{  

     |    implicit class FileEnhancer(file : File){   

     |        def read = Source.fromFile(file.getPath).mkString  

     |    }

     |    implicit class Op(x:Int){

     |         def addSAP(second: Int) = x + second

     |    }

     | }

defined moduleContext_Helper


scala> importContext_Helper._

importContext_Helper._

scala>  println(1.addSAP(2))

3

这里1 没有 addSAP 方法,首先回到RichInt 里面去找,然后到运行的上下文中去找,然后到Context_Helper.中直接调用addSAP方法,这里并没有返回一个对象而是直接调用。原因是这里隐式类Op在构造的时候,传入了1 这个参数,然后调用了Op 对象的addSAP方法。


总结:implicit  1.从当前类的伴生对象中找 2. 把所有的隐式转换放在一个object中,然后导入3.从当前作用域中的隐式转换中找 4 在用到隐式转换的时候定义或者导入。


最佳实践:在伴生对象中定义隐式转换

  1. object RDD {

  2.   // The following implicit functions were in SparkContext before 1.3 and users had to
  3.   // `import SparkContext._` to enable them. Now we move them here to make the compiler find
  4.   // them automatically. However, we still keep the old functions in SparkContext for backward
  5.   // compatibility and forward to the following functions directly.

  6.   implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
  7.     (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
  8.     new PairRDDFunctions(rdd)
  9.   }

  10.   implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]): AsyncRDDActions[T] = {
  11.     new AsyncRDDActions(rdd)
  12.   }

  13.   implicit def rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)])
  14.       (implicit kt: ClassTag[K], vt: ClassTag[V],
  15.                 keyWritableFactory: WritableFactory[K],
  16.                 valueWritableFactory: WritableFactory[V])
  17.     : SequenceFileRDDFunctions[K, V] = {
  18.     implicit val keyConverter = keyWritableFactory.convert
  19.     implicit val valueConverter = valueWritableFactory.convert
  20.     new SequenceFileRDDFunctions(rdd,
  21.       keyWritableFactory.writableClass(kt), valueWritableFactory.writableClass(vt))
  22.   }

  23.   implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])
  24.     : OrderedRDDFunctions[K, V, (K, V)] = {
  25.     new OrderedRDDFunctions[K, V, (K, V)](rdd)
  26.   }

  27.   implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]): DoubleRDDFunctions = {
  28.     new DoubleRDDFunctions(rdd)
  29.   }

  30.   implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T])
  31.     : DoubleRDDFunctions = {
  32.     new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
  33.   }
  34. }
复制代码


注:本学习笔记来自DT大数据梦工厂

二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:SCALA Spark SPAR Park SPA Spark Scala DT_Spark 大数据

已有 1 人评分论坛币 收起 理由
daazx + 5 精彩帖子

总评分: 论坛币 + 5   查看全部评分

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群

京ICP备16021002-2号 京B2-20170662号 京公网安备 11010802022788号 论坛法律顾问:王进律师 知识产权保护声明   免责及隐私声明

GMT+8, 2024-5-1 09:13