flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2828) Add interfaces for Table API input formats
Date Mon, 02 Nov 2015 14:20:27 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14985272#comment-14985272 ]

ASF GitHub Bot commented on FLINK-2828:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1237#discussion_r43631877
  
    --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/FieldBacktracker.scala ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.expressions.analysis
    +
    +import org.apache.flink.api.table.expressions.{Naming, ResolvedFieldReference}
    +import org.apache.flink.api.table.input.{AdaptiveTableSource, TableSource}
    +import org.apache.flink.api.table.plan._
    +
    +object FieldBacktracker {
    +
    +  /**
    +   * Tracks a field back to its root and returns its original name and AdaptiveTableSource
    +   * if possible.
    +   * This only happens if the field is forwarded unmodified. Renaming operations are reverted.
    +   *
    +   * @param op start operator
    +   * @param fieldName field name at start operator
    +   * @return original field name with corresponding AdaptiveTableSource
    +   */
    +  def resolveFieldNameAndTableSource(op: PlanNode, fieldName: String):
    +      Option[(AdaptiveTableSource, String)] = {
    +    op match {
    +      case s@Select(input, selection) =>
    +        var resolvedField: Option[(AdaptiveTableSource, String)] = None
    +        // only follow unmodified fields
    +        selection.foreach(expr => {
    +          if (resolvedField.isEmpty) {
    +            expr match {
    +              case ResolvedFieldReference(name, _) if name == fieldName =>
    +                resolvedField = resolveFieldNameAndTableSource(input, fieldName)
    +              case n@Naming(ResolvedFieldReference(oldName, _), newName) if newName == fieldName =>
    +                resolvedField = resolveFieldNameAndTableSource(input, oldName)
    +              case _ => // do nothing
    +            }
    +          }
    +        })
    --- End diff --
    
    But here again you traverse the whole `selection` sequence. Is there a special reason
    for that? Why not lazily compute only the elements needed to find the first resolved
    field? Scala gives you all the tools for that, and you also get rid of the `var`:
    
    ```
    val resolvedField = selection.view.flatMap {
      case ResolvedFieldReference(name, _) if name == fieldName =>
        resolveFieldNameAndTableSource(input, fieldName)
      case Naming(ResolvedFieldReference(oldName, _), newName) if newName == fieldName =>
        resolveFieldNameAndTableSource(input, oldName)
      case _ => None
    }.headOption
    ```
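
    As a self-contained illustration of why the `view` matters here (a minimal sketch:
    plain strings stand in for expressions, and `resolve` is a hypothetical stand-in for
    `resolveFieldNameAndTableSource`): without the view, `flatMap` would invoke the
    resolution for every matching element; with it, evaluation stops at the first hit
    that `headOption` needs.

    ```
    object LazyLookupDemo extends App {
      val selection = Seq("a", "b", "b", "c")

      // Stand-in for the recursive backtracking call; the println shows
      // how often it is actually invoked.
      def resolve(name: String): Option[String] = {
        println(s"resolving $name")
        Some(s"source.$name")
      }

      val resolved = selection.view.flatMap {
        case name if name == "b" => resolve(name)
        case _                   => None
      }.headOption

      // Prints "resolving b" exactly once, then Some(source.b): the second
      // "b" is never resolved because the view is evaluated lazily.
      println(resolved)
    }
    ```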


> Add interfaces for Table API input formats
> ------------------------------------------
>
>                 Key: FLINK-2828
>                 URL: https://issues.apache.org/jira/browse/FLINK-2828
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>
> In order to support input formats for the Table API, interfaces are necessary. I propose two types of TableSources:
> - AdaptiveTableSources can adapt their output to the requirements of the plan. Although the output schema stays the same, the TableSource can react to field resolution and/or predicates internally and can return adapted DataSet/DataStream versions in the "translate" step.
> - StaticTableSources are an easy way to provide the Table API with additional input formats without much implementation effort (e.g. for fromCsvFile()).
> TableSources need to be deeply integrated into the Table API.
> The TableEnvironment requires a newly introduced AbstractExecutionEnvironment (a common superclass of all ExecutionEnvironments for DataSets and DataStreams).
> Here's what a TableSource can see from more complicated queries:
> {code}
> getTableJava(tableSource1)
>   .filter("a===5 || a===6")
>   .select("a as a4, b as b4, c as c4")
>   .filter("b4===7")
>   .join(getTableJava(tableSource2))
>   .where("a===a4 && c==='Test' && c4==='Test2'")
> // Result predicates for tableSource1:
> //  List("a===5 || a===6", "b===7", "c==='Test2'")
> // Result predicates for tableSource2:
> //  List("c==='Test'")
> // Result resolved fields for tableSource1 (true = filtering, false=selection):
> //  Set(("a", true), ("a", false), ("b", true), ("b", false), ("c", false), ("c", true))
> // Result resolved fields for tableSource2 (true = filtering, false=selection):
> //  Set(("a", true), ("c", true))
> {code}
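
A minimal sketch of what the two proposed interface flavors could look like, based only on the description above (the trait and method names are illustrative assumptions, not signatures from the actual pull request; predicates are shown as plain strings for brevity):

```
// Hypothetical sketch of the proposed TableSource interfaces.

/** Base contract: a source producing a table with a fixed output schema. */
trait TableSource {
  /** Field names and types of the produced records. */
  def outputFields: Seq[(String, Class[_])]
}

/** A source with no push-down capabilities, e.g. backing fromCsvFile(). */
trait StaticTableSource extends TableSource

/**
 * A source that is notified about resolved fields and predicates during
 * planning and may return an adapted DataSet/DataStream in the
 * "translate" step.
 */
trait AdaptiveTableSource extends TableSource {
  /** Called per resolved field; filtering = true means use in a predicate. */
  def notifyResolvedField(fieldName: String, filtering: Boolean): Unit

  /** Called with a predicate (a plain string here) that can be pushed down. */
  def notifyPredicate(predicate: String): Unit
}
```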



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
