flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] dianfu commented on a change in pull request #8561: [FLINK-12588][python] Add TableSchema for Python Table API.
Date Wed, 29 May 2019 11:56:42 GMT
dianfu commented on a change in pull request #8561: [FLINK-12588][python] Add TableSchema for
Python Table API.
URL: https://github.com/apache/flink/pull/8561#discussion_r288526645
 
 

 ##########
 File path: flink-python/pyflink/table/types.py
 ##########
 @@ -1273,6 +1318,190 @@ def _to_java_type(data_type):
         raise TypeError("Not supported type: %s" % data_type)
 
 
+_primitive_to_boxed_map = {'int': 'java.lang.Integer',
+                           'long': 'java.lang.Long',
+                           'byte': 'java.lang.Byte',
+                           'short': 'java.lang.Short',
+                           'char': 'java.lang.Character',
+                           'boolean': 'java.lang.Boolean',
+                           'float': 'java.lang.Float',
+                           'double': 'java.lang.Double'}
+
+
+def is_instance_of(java_data_type, java_class):
+    gateway = get_gateway()
+    if isinstance(java_class, basestring):
+        param = java_class
+    elif isinstance(java_class, JavaClass):
+        param = get_java_class(java_class)
+    elif isinstance(java_class, JavaObject):
+        if not is_instance_of(java_class, gateway.jvm.Class):
+            param = java_class.getClass()
+        else:
+            param = java_class
+    else:
+        raise TypeError(
+            "java_class must be a string, a JavaClass, or a JavaObject")
+
+    return gateway.jvm.org.apache.flink.api.python.py4j.reflection.TypeUtil.isInstanceOf(
+        param, java_data_type)
+
+
+def _to_python_type(java_data_type_input):
+    gateway = get_gateway()
+
+    if is_instance_of(java_data_type_input, gateway.jvm.TypeInformation):
+        # input is TypeInformation
+        LegacyTypeInfoDataTypeConverter = \
+            gateway.jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
+        java_data_type = LegacyTypeInfoDataTypeConverter.toDataType(java_data_type_input)
+    else:
+        # input is DataType
+        java_data_type = java_data_type_input
+
+    # Atomic Type with parameters.
+    if is_instance_of(java_data_type, gateway.jvm.AtomicDataType):
+        logical_type = java_data_type.getLogicalType()
+        conversion_clz = java_data_type.getConversionClass()
+        if is_instance_of(logical_type, gateway.jvm.CharType):
+            python_type = DataTypes.CHAR(logical_type.getLength(), logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.VarCharType):
+            python_type = DataTypes.VARCHAR(logical_type.getLength(), logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.BinaryType):
+            python_type = DataTypes.BINARY(logical_type.getLength(), logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.VarBinaryType):
+            python_type = DataTypes.VARBINARY(logical_type.getLength(), logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.DecimalType):
+            python_type = DataTypes.DECIMAL(logical_type.getPrecision(),
+                                            logical_type.getScale(),
+                                            logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.TimeType):
+            python_type = DataTypes.TIME(logical_type.getPrecision(), logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.TimestampType):
+            j_kind = logical_type.getKind()
+            kind = None
+            if j_kind == gateway.jvm.TimestampKind.REGULAR:
+                kind = TimestampKind.REGULAR
+            elif j_kind == gateway.jvm.TimestampKind.ROWTIME:
+                kind = TimestampKind.ROWTIME
+            elif j_kind == gateway.jvm.TimestampKind.PROCTIME:
+                kind = TimestampKind.PROCTIME
+            if kind is None:
+                raise Exception("not supported java timestamp kind %s" % j_kind)
+            python_type = DataTypes.TIMESTAMP(kind,
+                                              logical_type.getPrecision(),
+                                              logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.BooleanType):
+            python_type = DataTypes.BOOLEAN(logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.TinyIntType):
+            python_type = DataTypes.TINYINT(logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.SmallIntType):
+            python_type = DataTypes.SMALLINT(logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.IntType):
+            python_type = DataTypes.INT(logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.BigIntType):
+            python_type = DataTypes.BIGINT(logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.FloatType):
+            python_type = DataTypes.FLOAT(logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.DoubleType):
+            python_type = DataTypes.DOUBLE(logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.DateType):
+            python_type = DataTypes.DATE(logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.TimeType):
+            python_type = DataTypes.TIME(logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.ZonedTimestampType):
+            raise \
+                TypeError("Not supported type: %s, ZonedTimestampType is not supported currently."
+                          % java_data_type_input)
+        elif is_instance_of(logical_type, gateway.jvm.LocalZonedTimestampType):
+            raise \
+                TypeError("Not supported type: %s, LocalZonedTimestampType is not supported
"
+                          "currently." % java_data_type_input)
+        elif is_instance_of(logical_type, gateway.jvm.DayTimeIntervalType):
+            raise \
+                TypeError("Not supported type: %s, DayTimeIntervalType is not supported currently."
+                          % java_data_type_input)
+        elif is_instance_of(logical_type, gateway.jvm.YearMonthIntervalType):
+            raise \
+                TypeError("Not supported type: %s, YearMonthIntervalType is not supported
"
+                          "currently." % java_data_type_input)
+        elif is_instance_of(logical_type, gateway.jvm.LegacyTypeInformationType):
+            type_info = logical_type.getTypeInformation()
+            BasicArrayTypeInfo = gateway.jvm.org.apache.flink.api.common.typeinfo.\
+                BasicArrayTypeInfo
+            if type_info == BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO:
+                python_type = DataTypes.ARRAY(DataTypes.STRING())
+            else:
+                raise TypeError("Not supported type: %s, it is recognized as a legacy type."
+                                % type_info)
+        else:
+            raise TypeError("Not supported type: %s, it is not supported in python type system"
+                            " currently" % java_data_type_input)
+
+        if conversion_clz is not None:
+            type_class_name = conversion_clz.getName()
+            if type_class_name in _primitive_to_boxed_map:
+                type_class_name = _primitive_to_boxed_map[type_class_name]
+            python_type.bridged_to(type_class_name)
+        return python_type
+
+    # Array Type, MultiSet Type.
+    elif is_instance_of(java_data_type, gateway.jvm.CollectionDataType):
+        logical_type = java_data_type.getLogicalType()
+        element_type = java_data_type.getElementDataType()
+        conversion_clz = java_data_type.getConversionClass()
+        if is_instance_of(logical_type, gateway.jvm.ArrayType):
+            python_type = DataTypes.ARRAY(_to_python_type(element_type), logical_type.isNullable())
+        elif is_instance_of(logical_type, gateway.jvm.MultisetType):
+            python_type = DataTypes.MULTISET(_to_python_type(element_type),
+                                             logical_type.isNullable())
+        else:
+            raise TypeError("Not supported colletion data type: %s" % java_data_type_input)
+
+        if conversion_clz is not None:
+            python_type.bridged_to(conversion_clz.getName())
+        return python_type
+
+    # Map Type.
+    elif is_instance_of(java_data_type, gateway.jvm.KeyValueDataType):
+        logical_type = java_data_type.getLogicalType()
+        key_type = java_data_type.getKeyDataType()
+        value_type = java_data_type.getValueDataType()
+        conversion_clz = java_data_type.getConversionClass()
+        if is_instance_of(logical_type, gateway.jvm.MapType):
+            python_type = DataTypes.MAP(
+                _to_python_type(key_type),
+                _to_python_type(value_type),
+                logical_type.isNullable())
+        else:
+            raise TypeError("Not supported map data type: %s" % java_data_type_input)
+
+        if conversion_clz is not None:
+            python_type.bridged_to(conversion_clz.getName())
+        return python_type
+
+    # Row Type.
+    elif is_instance_of(java_data_type, gateway.jvm.FieldsDataType):
+        logical_type = java_data_type.getLogicalType()
+        field_data_types = java_data_type.getFieldDataTypes()
+        conversion_clz = java_data_type.getConversionClass()
+        if is_instance_of(logical_type, gateway.jvm.RowType):
+            fields = [DataTypes.FIELD(item,
+                                      _to_python_type(
+                                          field_data_types[item])) for item in field_data_types]
+            python_type = DataTypes.ROW(fields, logical_type.isNullable())
+        else:
+            raise TypeError("Not supported row data type: %s" % java_data_type_input)
+
+        if conversion_clz is not None:
+            python_type.bridged_to(conversion_clz.getName())
+        return python_type
+
+    # Unrecognized type.
+    else:
+        TypeError("Unsupported data type: %s" % java_data_type_input)
 
 Review comment:
  What about changing all the error messages to "Unsupported data type: %s"?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message