flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources
Date Thu, 25 Jan 2018 13:54:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339239#comment-16339239
] 

ASF GitHub Bot commented on FLINK-8240:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5240#discussion_r163847018
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/JSON.scala
---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.descriptors
    +
    +import java.util
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.TableSchema
    +
    +import scala.collection.mutable
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Encoding descriptor for JSON.
    +  */
    +class JSON extends EncodingDescriptor("json") {
    +
    +  private val encodingSchema: mutable.LinkedHashMap[String, String] =
    +      mutable.LinkedHashMap[String, String]()
    +  private var fieldMapping: Option[util.Map[String, String]] = None
    +  private var failOnMissingField: Option[Boolean] = None
    +
    +  /**
    +    * Sets the JSON schema with field names and the types for the JSON-encoded input.
    +    * The JSON schema must not contain nested fields.
    +    *
    +    * This method overwrites existing fields added with [[field()]].
    +    *
    +    * @param schema the table schema
    +    */
    +  def schema(schema: TableSchema): JSON = {
    +    this.encodingSchema.clear()
    +    NormalizedProperties.normalizeTableSchema(schema).foreach {
    +      case (n, t) => field(n, t)
    +    }
    +    this
    +  }
    +
    +  /**
    +    * Adds a JSON field with the field name and the type information for the JSON-encoding.
    +    * This method can be called multiple times. The call order of this method defines
    +    * also the order of the fields in the JSON-encoding.
    +    *
    +    * @param fieldName the field name
    +    * @param fieldType the type information of the field
    +    */
    +  def field(fieldName: String, fieldType: TypeInformation[_]): JSON = {
    +    field(fieldName, NormalizedProperties.normalizeTypeInfo(fieldType))
    +    this
    +  }
    +
    +  /**
    +    * Adds a JSON field with the field name and the type string for the JSON-encoding.
    +    * This method can be called multiple times. The call order of this method defines
    +    * also the order of the fields in the JSON-encoding.
    +    *
    +    * @param fieldName the field name
    +    * @param fieldType the type string of the field
    +    */
    +  def field(fieldName: String, fieldType: String): JSON = {
    +    if (encodingSchema.contains(fieldName)) {
    +      throw new IllegalArgumentException(s"Duplicate field name $fieldName.")
    +    }
    +    encodingSchema += (fieldName -> fieldType)
    +    this
    +  }
    +
    +  /**
    +    * Sets a mapping from schema fields to fields of the JSON schema.
    +    *
    +    * A field mapping is required if the fields of produced tables should be named different
than
    +    * the fields of the JSON records.
    +    * The key of the provided Map refers to the field of the table schema,
    +    * the value to the field in the JSON schema.
    +    *
    +    * @param tableToJsonMapping A mapping from table schema fields to JSON schema fields.
    +    * @return The builder.
    +    */
    +  def tableToJsonMapping(tableToJsonMapping: util.Map[String, String]): JSON = {
    +    this.fieldMapping = Some(tableToJsonMapping)
    +    this
    +  }
    +
    +  /**
    +    * Sets flag whether to fail if a field is missing or not.
    +    *
    +    * @param failOnMissingField If set to true, the operation fails if there is a missing
field.
    +    *                           If set to false, a missing field is set to null.
    +    * @return The builder.
    +    */
    +  def failOnMissingField(failOnMissingField: Boolean): JSON = {
    +    this.failOnMissingField = Some(failOnMissingField)
    +    this
    +  }
    +
    +  /**
    +    * Internal method for encoding properties conversion.
    +    */
    +  override protected def addEncodingProperties(properties: NormalizedProperties): Unit
= {
    +    properties.putIndexedFixedProperties(
    --- End diff --
    
    After an offline discussion, this approach does not seem to work. The best solution for
this is to use the JSON schema standard. We need to implement the parsing logic in a later
step.


> Create unified interfaces to configure and instatiate TableSources
> ------------------------------------------------------------------
>
>                 Key: FLINK-8240
>                 URL: https://issues.apache.org/jira/browse/FLINK-8240
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API &amp; SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>
> At the moment every table source has different ways for configuration and instantiation.
Some table source are tailored to a specific encoding (e.g., {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}})
or only support one encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement
a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining common properties,
and instantiation. The {{TableSourceConverters}} provide a similar functionality but use an
external catalog. We might generialize this interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message