- 阅读权限
- 255
- 威望
- 0 级
- 论坛币
- 49957 个
- 通用积分
- 79.5487
- 学术水平
- 253 点
- 热心指数
- 300 点
- 信用等级
- 208 点
- 经验
- 41518 点
- 帖子
- 3256
- 精华
- 14
- 在线时间
- 766 小时
- 注册时间
- 2006-5-4
- 最后登录
- 2022-11-6
|
- Extractor Objects
- 参见 Scala 官方文档 Extractor Objects（https://docs.scala-lang.org/tour/extractor-objects.html）
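- 在下面的 Spark SQL 代码中，遇到模式 `case ExtractEquiJoinKeys(...)` 时，会自动调用 `ExtractEquiJoinKeys` 对象的 `unapply` 方法。`unapply` 返回 `Some(元组)` 表示匹配成功，元组各分量依次绑定到模式中的变量；返回 `None` 则继续尝试下一个 case。提取器并不限于单例 `object`：任何稳定标识符（例如一个 `val`，如正则表达式 `val r = "(\\d+)".r` 配合 `case r(d) =>`），只要其类型定义了 `unapply`/`unapplySeq`，都可以用在模式中。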
- case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
- val buildSide =
- if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) {
- joins.BuildRight
- } else {
- joins.BuildLeft
- }
- val hashJoin = joins.ShuffledHashJoin(
- leftKeys, rightKeys, buildSide, planLater(left), planLater(right))
- condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
- ...
- object ExtractEquiJoinKeys extends Logging with PredicateHelper {
/**
 * Result of a successful `ExtractEquiJoinKeys.unapply` match:
 * (joinType, leftKeys, rightKeys, condition, leftChild, rightChild).
 *
 * `leftKeys(i)` and `rightKeys(i)` are the two sides of the i-th equi-join predicate,
 * oriented so that `leftKeys(i)` belongs to `leftChild`. `condition` holds the remaining
 * (non-equi-join) conjuncts AND-ed together, or `None` if there are none.
 */
type ReturnType =
  (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
/**
 * Matches a `Join` whose condition contains at least one usable equi-join predicate.
 *
 * The join condition is split with `splitConjunctivePredicates` (presumably into its
 * AND-ed conjuncts). A conjunct of the form `EqualTo(l, r)` whose operands can each be
 * evaluated against a different child becomes a (leftKey, rightKey) pair, swapped if
 * necessary so the key over `left` always comes first. All other conjuncts are AND-ed
 * back together as the residual condition.
 *
 * @param plan the logical plan to inspect
 * @return `Some((joinType, leftKeys, rightKeys, residualCondition, left, right))` when
 *         `plan` is a `Join` with at least one equi-join key; `None` otherwise, including
 *         joins with no condition or with no usable `EqualTo` conjunct
 */
def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
  case Join(left, right, joinType, condition) =>
    logDebug(s"Considering join on: $condition")
    // Find equi-join predicates that can be evaluated before the join, and thus can be used
    // as join keys. Each conjunct is classified exactly once, so the "is this a join key"
    // test and the key orientation can never drift apart.
    val classified = condition.map(splitConjunctivePredicates).getOrElse(Nil)
      .map(predicate => (predicate, orientedJoinKey(predicate, left, right)))
    val joinKeys = classified.collect { case (_, Some(keyPair)) => keyPair }
    val otherPredicates = classified.collect { case (predicate, None) => predicate }
    if (joinKeys.nonEmpty) {
      val leftKeys = joinKeys.map(_._1)
      val rightKeys = joinKeys.map(_._2)
      logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
      Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    } else {
      None
    }
  case _ => None
}

/**
 * Returns `Some((leftSide, rightSide))` if `predicate` is an `EqualTo` whose operands can
 * each be evaluated against a different join child (per `canEvaluate`), with the operands
 * swapped when needed so the expression over `left` comes first; `None` otherwise.
 */
private def orientedJoinKey(
    predicate: Expression,
    left: LogicalPlan,
    right: LogicalPlan): Option[(Expression, Expression)] = predicate match {
  case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r))
  case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l))
  case _ => None
}
复制代码
|
|