flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] godfreyhe commented on a change in pull request #8499: [FLINK-12575] [table-planner-blink] Introduce planner rules to remove redundant shuffle and collation
Date Fri, 24 May 2019 11:41:55 GMT
godfreyhe commented on a change in pull request #8499: [FLINK-12575] [table-planner-blink]
Introduce planner rules to remove redundant shuffle and collation
URL: https://github.com/apache/flink/pull/8499#discussion_r287321109
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
 ##########
 @@ -242,6 +245,102 @@ class BatchExecOverAggregate(
     windowGroupInfo
   }
 
 +  // Tries to satisfy the required traits (distribution + collation), either because this
 +  // OverAggregate already provides them or by pushing the requirement down into its input.
 +  // Returns a copy of this node with the satisfiable traits, or null when there is nothing
 +  // to satisfy or the requirement cannot be met.
 +  override def satisfyTraits(requiredTraitSet: RelTraitSet): RelNode = {
 +    val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
 +    val requiredCollation = requiredTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
 +    // Nothing to satisfy: no specific distribution and no collation is required.
 +    if (requiredDistribution.getType == ANY && requiredCollation.getFieldCollations.isEmpty)
{
 +      return null
 +    }
 +
 +    val selfProvidedTraitSet = inferProvidedTraitSet()
 +    if (selfProvidedTraitSet.satisfies(requiredTraitSet)) {
 +      // Current node can satisfy the requiredTraitSet, return the current node with ProvidedTraitSet
 +      return copy(selfProvidedTraitSet, Seq(getInput))
 +    }
 +
 +    val inputFieldCnt = getInput.getRowType.getFieldCount
 +    // Whether the required distribution can be satisfied via this node's grouping keys.
 +    val canSatisfy = if (requiredDistribution.getType == ANY) {
 +      true
 +    } else {
 +      if (!grouping.isEmpty) {
 +        if (requiredDistribution.requireStrict) {
 +          // Strict requirement: keys must equal the grouping keys exactly (same order).
 +          requiredDistribution.getKeys == ImmutableIntList.of(grouping: _*)
 +        } else {
 +          val isAllFieldsFromInput = requiredDistribution.getKeys.forall(_ < inputFieldCnt)
 +          if (isAllFieldsFromInput) {
 +            val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
 +            if (tableConfig.getConf.getBoolean(
 +              PlannerConfigOptions.SQL_OPTIMIZER_SHUFFLE_PARTIAL_KEY_ENABLED)) {
 +              // Partial-key shuffle enabled: the required keys need only be a subset
 +              // of the grouping keys.
 +              ImmutableIntList.of(grouping: _*).containsAll(requiredDistribution.getKeys)
 +            } else {
 +              requiredDistribution.getKeys == ImmutableIntList.of(grouping: _*)
 +            }
 +          } else {
 +            // If the required distribution keys do not all come from the input directly,
 +            // the required distribution and collations cannot be satisfied.
 +            false
 +          }
 +        }
 +      } else {
 +        // No grouping keys: only a singleton distribution can be satisfied.
 +        requiredDistribution.getType == SINGLETON
 +      }
 +    }
 +    // If the OverAggregate can provide a distribution but its traits cannot satisfy the
 +    // required distribution, do not push down the distribution and collation requirement
 +    // (because a later shuffle would destroy the previously provided collation).
 +    if (!canSatisfy) {
 +      return null
 +    }
 +
 +    var inputRequiredTraits = getInput.getTraitSet
 +    var providedTraits = selfProvidedTraitSet
 +    val providedCollation = selfProvidedTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
 +    if (!requiredDistribution.isTop) {
 +      // Push the concrete distribution requirement into the input and also advertise it
 +      // on this node.
 +      inputRequiredTraits = inputRequiredTraits.replace(requiredDistribution)
 +      providedTraits = providedTraits.replace(requiredDistribution)
 +    }
 +
 +    if (providedCollation.satisfies(requiredCollation)) {
 +      // The providedCollation can satisfy the requirement,
 +      // so don't push down the sort into its input.
 +    } else if (providedCollation.getFieldCollations.isEmpty &&
 +      requiredCollation.getFieldCollations.nonEmpty) {
 +      // If OverAgg cannot provide a collation itself, try to push the collation
 +      // requirement into its input, provided all collation fields come from the input node.
 +      val canPushDownCollation = requiredCollation.getFieldCollations
 +        .forall(_.getFieldIndex < inputFieldCnt)
 +      if (canPushDownCollation) {
 +        inputRequiredTraits = inputRequiredTraits.replace(requiredCollation)
 +        providedTraits = providedTraits.replace(requiredCollation)
 +      }
 +    } else {
 +      // Don't push down the sort into its input, because the pushed-down collation
 +      // would destroy the input's provided collation.
 +    }
 +    val newInput = RelOptRule.convert(getInput, inputRequiredTraits)
 +    copy(providedTraits, Seq(newInput))
 +  }
+
+  private def inferProvidedTraitSet(): RelTraitSet = {
+    var selfProvidedTraitSet = getTraitSet
+    // provided distribution
+    val providedDistribution = if (grouping.nonEmpty) {
+      FlinkRelDistribution.hash(grouping.map(Integer.valueOf).toList, requireStrict = false)
 
 Review comment:
   If the grouping of an over instance is a, b, c, that means it could provide a hash[a, b,
c] distribution. However, its real distribution may be hash[a, b], because its input may
provide hash[a, b]. Currently, `requireStrict` = true is always used for the required distribution.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message