flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] beyond1920 commented on a change in pull request #8035: [FLINK-11975][table-planner-blink] Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)'
Date Mon, 25 Mar 2019 07:43:51 GMT
beyond1920 commented on a change in pull request #8035: [FLINK-11975][table-planner-blink]
Support to run a sql query : 'SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)'
URL: https://github.com/apache/flink/pull/8035#discussion_r268515702
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##########
 @@ -91,6 +103,99 @@ abstract class StreamTableEnvironment(
       AccModeTraitDef.INSTANCE)
   )
 
+  /**
+    * Merges the global job parameters and the table config parameters,
+    * and sets the merged result as the new GlobalJobParameters.
+    */
+  private def mergeParameters(): Unit = {
+    if (!isConfigMerged && execEnv != null && execEnv.getConfig != null) {
+      val parameters = new Configuration()
+      if (config != null && config.getConf != null) {
+        parameters.addAll(config.getConf)
+      }
+
+      if (execEnv.getConfig.getGlobalJobParameters != null) {
+        execEnv.getConfig.getGlobalJobParameters.toMap.foreach {
+          kv => parameters.setString(kv._1, kv._2)
+        }
+      }
+      parameters.setBoolean(
+        StateUtil.STATE_BACKEND_ON_HEAP,
+        StateUtil.isHeapState(execEnv.getStateBackend))
+      execEnv.getConfig.setGlobalJobParameters(parameters)
+      isConfigMerged = true
+    }
+  }
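
  // Illustrative sketch, not part of this patch: once mergeParameters() has run, the
  // merged values are visible to runtime operators through the execution config. The
  // key "table.exec.example-option" and ParameterAwareMap are made-up example names.
  import org.apache.flink.api.common.functions.RichMapFunction

  class ParameterAwareMap extends RichMapFunction[String, String] {
    override def map(value: String): String = {
      // read the parameters that mergeParameters() registered on the ExecutionConfig
      val params = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.toMap
      value + " -> " + params.getOrDefault("table.exec.example-option", "default")
    }
  }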
+
+  /**
+    * Writes a [[Table]] to a [[TableSink]].
+    *
+    * Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the
+    * [[TableSink]] to write it.
+    *
+    * @param table The [[Table]] to write.
+    * @param sink The [[TableSink]] to write the [[Table]] to.
+    * @tparam T The expected type of the [[DataStream]] which represents the [[Table]].
+    */
+  override private[table] def writeToSink[T](
+    table: Table,
+    sink: TableSink[T],
+    sinkName: String): Unit = {
+    val sinkNode = LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName)
+    translateSink(sinkNode)
+  }
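
  // Illustrative sketch, not part of this patch: from the user-facing API, writing a
  // table to a sink ends up in writeToSink above. CsvTableSink and Table.writeToSink
  // are assumptions about the surrounding (pre-blink) API and may differ here.
  import org.apache.flink.table.sinks.CsvTableSink

  def writeExample(table: Table): Unit = {
    val csvSink = new CsvTableSink("/tmp/result", ",")
    table.writeToSink(csvSink)  // internally wraps the plan in a LogicalSink and translates it
  }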
+
+  /**
+    * Translates a [[Table]] into a [[DataStream]].
+    *
+    * The transformation involves optimizing the relational expression tree as defined by
+    * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
+    *
+    * @param table               The root node of the relational expression tree.
+    * @param updatesAsRetraction Set to true to encode updates as retraction messages.
+    * @param withChangeFlag      Set to true to emit records with change flags.
+    * @param resultType          The [[org.apache.flink.api.common.typeinfo.TypeInformation[_]]] of
+    *                            the resulting [[DataStream]].
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The [[DataStream]] that corresponds to the translated [[Table]].
+    */
+  protected def translateToDataStream[T](
+    table: Table,
+    updatesAsRetraction: Boolean,
+    withChangeFlag: Boolean,
+    resultType: TypeInformation[T]): DataStream[T] = {
+    val sink = new DataStreamTableSink[T](table, resultType, updatesAsRetraction, withChangeFlag)
+    val sinkName = createUniqueTableName()
+    val sinkNode = LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName)
+    val transformation = translateSink(sinkNode)
+    new DataStream(execEnv, transformation).asInstanceOf[DataStream[T]]
+  }
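
  // Illustrative sketch, not part of this patch: translateToDataStream backs the
  // user-facing Table-to-DataStream conversions. The Scala bridge environment and its
  // toAppendStream method used below are assumptions about the caller-side API, not
  // something introduced by this change.
  import org.apache.flink.api.scala._
  import org.apache.flink.streaming.api.scala.DataStream
  import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
  import org.apache.flink.types.Row

  def toStreamExample(tEnv: ScalaStreamTableEnv): DataStream[Row] = {
    val table = tEnv.sqlQuery("SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)")
    tEnv.toAppendStream[Row](table)  // eventually routed through translateToDataStream
  }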
+
+  private def translateSink(sink: LogicalSink): StreamTransformation[_] = {
+    mergeParameters()
+
+    val optimizedPlan = optimize(sink)
+    val optimizedNodes = translateNodeDag(Seq(optimizedPlan))
+    require(optimizedNodes.size() == 1)
+    translate(optimizedNodes.head)
+  }
+
+  /**
+    * Translates a [[StreamExecNode]] plan into a [[StreamTransformation]].
+    *
+    * @param node The plan to translate.
+    * @return The [[StreamTransformation]] of type [[BaseRow]].
+    */
+  private def translate(node: ExecNode[_, _]): StreamTransformation[_] = {
+    node match {
+      case node: StreamExecNode[_] => node.translateToPlan(this)
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
