flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] hequn8128 commented on a change in pull request #8057: [FLINK-12028][table] Add `addColumns`,`renameColumns`, `dropColumns` …
Date Thu, 28 Mar 2019 05:33:55 GMT
hequn8128 commented on a change in pull request #8057: [FLINK-12028][table] Add `addColumns`,`renameColumns`, `dropColumns` …
URL: https://github.com/apache/flink/pull/8057#discussion_r269861826
 
 

 ##########
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala
 ##########
 @@ -451,6 +450,117 @@ class TableImpl(
     }
     tableName
   }
+
+  override def addColumns(fields: String): Table = {
+    addColumns(ExpressionParser.parseExpressionList(fields): _*);
+  }
+
+  override def addColumns(fields: Expression*): Table = {
+    addColumns(false, fields: _*)
+  }
+
+  override def addColumns(replaceIfExist: Boolean, fields: String): Table = {
+    addColumns(replaceIfExist, ExpressionParser.parseExpressionList(fields): _*)
+  }
+
+  override def addColumns(replaceIfExist: Boolean, fields: Expression*): Table = {
+
+    val aggNames = extractAggregationsAndProperties(
+      fields.map(expressionBridge.bridge), tableEnv)._1
+
+    if(aggNames.nonEmpty){
+      throw new TableException(
+        s"The added field expression cannot be an aggregations, find [${aggNames.head}].")
+    }
+
+    val childFields = logicalPlan.output.map(a => UnresolvedFieldReference(a.name))
+
+    if (replaceIfExist) {
+
+      val finalFields = new ListBuffer[Expression]()
+      val addFields = fields.map(expressionBridge.bridge)
+      childFields.foreach(e => finalFields.append(e))
+
+      // replace field if exist.
+      addFields.foreach {
+        case e@Alias(_, name, _) =>
+          val index = finalFields.indexWhere(p => p match {
+            case u: UnresolvedFieldReference => u.name.equalsIgnoreCase(name)
+            case a: Alias => a.name.equalsIgnoreCase(name)
+            case _ => false
+          })
+          if (index >= 0) {
+            finalFields(index) = e
+          } else {
+            finalFields.append(e)
+          }
+        case e =>
+          throw new TableException(
+            s"Should add an alias to the [$e], if replaceIfExist is true.")
+      }
+      select(finalFields: _*)
+    } else {
+      select(childFields ++ fields:_*)
+    }
+  }
+
+  override def renameColumns(fields: String): Table = {
+    renameColumns(ExpressionParser.parseExpressionList(fields): _*)
+  }
+
+  override def renameColumns(fields: Expression*): Table = {
+
+    val childFields = logicalPlan.output.map(a => UnresolvedFieldReference(a.name))
+    val finalFields = childFields.map(e => e.asInstanceOf[Expression]).toArray
+
+    // Rename existing fields
+    fields.map(expressionBridge.bridge).foreach {
+      case e@Alias(child: UnresolvedFieldReference, _, _) =>
+        val index = finalFields.indexWhere(p => p match {
+          case u: UnresolvedFieldReference => u.name.equalsIgnoreCase(child.name)
+          case _ => false
+        })
+        if (index >= 0) {
+          finalFields(index) = e
+        } else {
+          throw new TableException(s"Rename field [${child.name}] does not exist in source table.")
+        }
+      case e =>
+        throw new TableException(
+          s"Unexpected field expression type [$e]. " +
+            s"Renaming must add an alias to the original field, e.g., a as a1.")
+    }
+    select(finalFields: _*)
+  }
+
+  override def dropColumns(fields: String): Table = {
+    dropColumns(ExpressionParser.parseExpressionList(fields): _*)
+  }
+
+  override def dropColumns(fields: Expression*): Table = {
+    val childFields = logicalPlan.output.map(a => UnresolvedFieldReference(a.name))
+    val dropFields = fields.map(expressionBridge.bridge)
+
+    val finalFields = childFields.toBuffer
+
+    // Remove the fields which should be delete in the final list
 
 Review comment:
   Typo in the code comment above: "should be delete" should read "should be deleted".

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message