楼主: Lisrelchen
1707 8

【GitHub】Spark with Scala & Alogorithm [推广有奖]

  • 0关注
  • 62粉丝

VIP

院士

67%

还不是VIP/贵宾

-

TA的文库  其他...

Bayesian NewOccidental

Spatial Data Analysis

东西方数据挖掘

威望
0
论坛币
49957 个
通用积分
79.5487
学术水平
253 点
热心指数
300 点
信用等级
208 点
经验
41518 点
帖子
3256
精华
14
在线时间
766 小时
注册时间
2006-5-4
最后登录
2022-11-6

相似文件 换一批

+2 论坛币
k人 参与回答

经管之家送您一份

应届毕业生专属福利!

求职就业群
赵安豆老师微信:zhaoandou666

经管之家联合CDA

送您一个全额奖学金名额~ !

感谢您参与论坛问题回答

经管之家送您两个论坛币!

+2 论坛币
  1. Cake Pattern

  2. Worker.scala
  3. master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))


  4. ActorRefProvider.scala
  5. trait ActorRefFactory {
  6. def actorSelection(path: String): ActorSelection = path match {
  7.   case RelativeActorPath(elems) ⇒
  8.     if (elems.isEmpty) ActorSelection(provider.deadLetters, "")
  9.     else if (elems.head.isEmpty) ActorSelection(provider.rootGuardian, elems.tail)
  10.     else ActorSelection(lookupRoot, elems)
  11.   case ActorPathExtractor(address, elems) ⇒
  12.     ActorSelection(provider.rootGuardianAt(address), elems)
  13.   case _ ⇒
  14.     ActorSelection(provider.deadLetters, "")
  15. }
复制代码

本帖隐藏的内容

spark-with-scala.pdf (1.01 MB)


二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

关键词:address

本帖被以下文库推荐

沙发
Lisrelchen 发表于 2017-2-21 10:05:32 |只看作者 |坛友微信交流群
  1. Extractor Objects

  2. 参见 官方文档 Extractor Objects

  3. 下面Spark SQL代码,当遇到  case ExtractEquiJoinKeys 时自动使用ExtractEquiJoinKeys对象(?only object) unapply 方法
  4.       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
  5.         val buildSide =
  6.           if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
  7.             joins.BuildRight
  8.           } else {
  9.             joins.BuildLeft
  10.           }
  11.         val hashJoin = joins.ShuffledHashJoin(
  12.           leftKeys, rightKeys, buildSide, planLater(left), planLater(right))
  13.         condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
  14. ...  

  15. object ExtractEquiJoinKeys extends Logging with PredicateHelper {
  16.   /** (joinType, rightKeys, leftKeys, condition, leftChild, rightChild) */
  17.   type ReturnType =
  18.     (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)

  19.   def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
  20.     case join @ Join(left, right, joinType, condition) =>
  21.       logDebug(s"Considering join on: $condition")
  22.       // Find equi-join predicates that can be evaluated before the join, and thus can be used
  23.       // as join keys.
  24.       val (joinPredicates, otherPredicates) =
  25.         condition.map(splitConjunctivePredicates).getOrElse(Nil).partition {
  26.           case EqualTo(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
  27.             (canEvaluate(l, right) && canEvaluate(r, left)) => true
  28.           case _ => false
  29.         }

  30.       val joinKeys = joinPredicates.map {
  31.         case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
  32.         case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
  33.       }
  34.       val leftKeys = joinKeys.map(_._1)
  35.       val rightKeys = joinKeys.map(_._2)

  36.       if (joinKeys.nonEmpty) {
  37.         logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
  38.         Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
  39.       } else {
  40.         None
  41.       }
  42.     case _ => None
  43.   }
复制代码

使用道具

藤椅
Lisrelchen 发表于 2017-2-21 10:06:50 |只看作者 |坛友微信交流群
  1. Try Some None

  2. HadoopRDD.scala
  3.   override def getPreferredLocations(split: Partition): Seq[String] = {
  4.     val locs: Option[Seq[String]] = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
  5.       case Some(c) =>
  6.         try {
  7.           val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
  8.           val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
  9.           Some(HadoopRDD.convertSplitLocationInfo(infos))
  10.         } catch {
  11.           case e: Exception =>
  12.             logDebug("Failed to use InputSplitWithLocations.", e)
  13.             None
  14.         }
  15.       case None => None
  16.     }
  17.     locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
  18.   }


  19. ??: 如果不写HadoopRDD.SPLIT... 则找不到SPLIT_INFO_REFLECTIONS

  20. locs有可能是Some或者None, 则getOrElse,传进去一个函数,即去掉本地节点
  21. private[spark] val SPLIT_INFO_REFLECTIONS: Option[SplitInfoReflections] = try {
  22.     Some(new SplitInfoReflections)
  23.   } catch {
  24.     case e: Exception =>
  25.       logDebug("SplitLocationInfo and other new Hadoop classes are " +
  26.           "unavailable. Using the older Hadoop location info code.", e)
  27.       None
  28.   }

  29.   private[spark] class SplitInfoReflections {
  30.     val inputSplitWithLocationInfo =
  31.       Class.forName("org.apache.hadoop.mapred.InputSplitWithLocationInfo")
  32.     val getLocationInfo = inputSplitWithLocationInfo.getMethod("getLocationInfo")
  33.     val newInputSplit = Class.forName("org.apache.hadoop.mapreduce.InputSplit")
  34.     val newGetLocationInfo = newInputSplit.getMethod("getLocationInfo")
  35.     val splitLocationInfo = Class.forName("org.apache.hadoop.mapred.SplitLocationInfo")
  36.     val isInMemory = splitLocationInfo.getMethod("isInMemory")
  37.     val getLocation = splitLocationInfo.getMethod("getLocation")
  38.   }
复制代码

使用道具

板凳
Lisrelchen 发表于 2017-2-21 10:08:31 |只看作者 |坛友微信交流群
  1. Iterator

  2. DAGScheduler.scala
  3. def submitJob[T, U](
  4.       rdd: RDD[T],
  5.       func: (TaskContext, Iterator[T]) => U,
  6.       partitions: Seq[Int],
  7.       callSite: CallSite,
  8.       allowLocal: Boolean,
  9.       resultHandler: (Int, U) => Unit,
  10.       properties: Properties = null): JobWaiter[U] =
  11.   {

  12.     partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
  13.       throw new IllegalArgumentException(
  14.         "Attempting to access a non-existent partition: " + p + ". " +
  15.           "Total number of partitions: " + maxPartitions)
  16.     }
复制代码

使用道具

报纸
Lisrelchen 发表于 2017-2-21 10:09:57 |只看作者 |坛友微信交流群
  1. dropWhile
  2. //RDDOperationScope.withScope
  3. val callerMethodName = Thread.currentThread.getStackTrace()
  4.       .dropWhile(_.getMethodName != ourMethodName)
  5.       .find(_.getMethodName != ourMethodName)
  6.       .map(_.getMethodName)
  7.       .getOrElse {
  8.         // Log a warning just in case, but this should almost certainly never happen
  9.         logWarning("No valid method name for this RDD operation scope!")
  10.         "N/A"
  11.       }


  12. dropWhile(p) 表示如果集合元素符合条件则略过 如

  13. +
  14. val it = Iterator("a", "number", "of", "words")
  15. //定义了小于2的忽略
  16. it dropWhile (_.length < 2)
  17. it.next()


  18. 当执行 it.next 到达"a"时忽略,直接到达"number"
复制代码

使用道具

地板
MouJack007 发表于 2017-2-21 14:32:11 |只看作者 |坛友微信交流群
谢谢楼主分享!

使用道具

7
MouJack007 发表于 2017-2-21 14:32:32 |只看作者 |坛友微信交流群

使用道具

8
kkkm_db 发表于 2017-2-21 16:09:49 |只看作者 |坛友微信交流群
谢谢分享

使用道具

9
钱学森64 发表于 2017-2-21 19:11:44 |只看作者 |坛友微信交流群
谢谢分享

使用道具

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加JingGuanBbs
拉您进交流群

京ICP备16021002-2号 京B2-20170662号 京公网安备 11010802022788号 论坛法律顾问:王进律师 知识产权保护声明   免责及隐私声明

GMT+8, 2024-4-26 14:03