flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] godfreyhe commented on a change in pull request #7881: [FLINK-11795][table-planner-blink] Introduce DataStream nodes and converter rules for batch and stream
Date Mon, 04 Mar 2019 03:09:31 GMT
godfreyhe commented on a change in pull request #7881: [FLINK-11795][table-planner-blink] Introduce
DataStream nodes and converter rules for batch and stream
URL: https://github.com/apache/flink/pull/7881#discussion_r261911680
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalDataStreamTableScan.scala
 ##########
 @@ -51,46 +48,30 @@ class FlinkLogicalNativeTableScan(
   }
 }
 
-class FlinkLogicalNativeTableScanConverter
+class FlinkLogicalDataStreamTableScanConverter
   extends ConverterRule(
     classOf[LogicalTableScan],
     Convention.NONE,
     FlinkConventions.LOGICAL,
-    "FlinkLogicalNativeTableScanConverter") {
+    "FlinkLogicalDataStreamTableScanConverter") {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: TableScan = call.rel(0)
-    FlinkLogicalNativeTableScan.isLogicalNativeTableScan(scan)
+    val dataStreamTable = scan.getTable.unwrap(classOf[DataStreamTable[_]])
+    dataStreamTable != null
   }
 
   def convert(rel: RelNode): RelNode = {
     val scan = rel.asInstanceOf[TableScan]
-    FlinkLogicalNativeTableScan.create(rel.getCluster, scan.getTable)
+    val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
+    new FlinkLogicalDataStreamTableScan(
+      rel.getCluster,
+      traitSet,
+      scan.getTable
+    )
   }
 }
 
-object FlinkLogicalNativeTableScan {
-  val CONVERTER = new FlinkLogicalNativeTableScanConverter
-
-  def isLogicalNativeTableScan(scan: TableScan): Boolean = {
-    val dataStreamTable = scan.getTable.unwrap(classOf[DataStreamTable[_]])
-    dataStreamTable != null
-  }
-
-  def create(cluster: RelOptCluster, relOptTable: RelOptTable): FlinkLogicalNativeTableScan
= {
-    val table = relOptTable.unwrap(classOf[Table])
-    val traitSet = cluster.traitSetOf(Convention.NONE).replaceIfs(
-      RelCollationTraitDef.INSTANCE, new Supplier[util.List[RelCollation]]() {
-        def get: util.List[RelCollation] = {
-          if (table != null) {
-            table.getStatistic.getCollations
-          } else {
-            ImmutableList.of[RelCollation]
-          }
-        }
-      })
-    val scan = new FlinkLogicalNativeTableScan(cluster, traitSet, relOptTable)
-    val newTraitSet = scan.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
-    scan.copy(newTraitSet, scan.getInputs).asInstanceOf[FlinkLogicalNativeTableScan]
-  }
 
 Review comment:
   The implementation looks a bit weird, so I revert this and add this back when need.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message