flink-issues mailing list archives

From "Ron Crocker (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-3697) keyBy() with nested POJO computes invalid field position indexes
Date Mon, 04 Apr 2016 19:11:25 GMT
Ron Crocker created FLINK-3697:

             Summary: keyBy() with nested POJO computes invalid field position indexes
                 Key: FLINK-3697
                 URL: https://issues.apache.org/jira/browse/FLINK-3697
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.0.0
         Environment: MacOS X 10.10
            Reporter: Ron Crocker
            Priority: Minor

Using named keys in keyBy() for nested POJO types results in failure. The indexes for named
key fields are used inconsistently with nested POJO types. In particular, {{PojoTypeInfo.getFlatFields()}}
returns the field's position after (apparently) flattening the structure, but that position is
then used by {{PojoTypeInfo.getTypeAt()}} as an index into the unflattened POJO type.

In the example below, {{getFlatFields()}} returns positions 0, 1, and 14. These positions appear
correct in the flattened structure of the Data class. However, in {{KeySelector<X, Tuple>
getSelectorForKeys(Keys<X> keys, TypeInformation<X> typeInfo, ExecutionConfig
executionConfig)}}, a call to {{compositeType.getTypeAt(logicalKeyPositions[i])}} for the
third key results in {{PojoTypeInfo.getTypeAt()}} declaring it out of range, as it compares the
position against the number of directly named fields of the object rather than the length of the
flattened version of that object.
Concrete Example:
Consider this graph:
{code}
DataStream<TimesliceData> dataStream = see.addSource(
      new FlinkKafkaConsumer08<>(timesliceConstants.topic, new DataDeserialzer(), kafkaConsumerProperties));

dataStream
      .flatMap(new DataMapper())
      .keyBy("aaa", "abc", "wxyz")
{code}

{{DataDeserialzer}} returns a "NativeDataFormat" object; {{DataMapper}} takes this NativeDataFormat
object and extracts individual Data objects:
{code}
public class Data {
    public int aaa;
    public int abc;
    public long wxyz;
    public int t1;
    public int t2;
    public Policy policy;
    public Stats stats;

    public Data() {}
}
{code}

A {{Policy}} object is an instance of this class:
{code}
public class AggregatableMetricStoragePolicy implements MetricStoragePolicy {
    public short a;
    public short b;
    public boolean c;
    public boolean d;

    public AggregatableMetricStoragePolicy() {}
}
{code}

A {{Stats}} object is an instance of this class:
{code}
public class Stats {
    public long count;
    public float a;
    public float b;
    public float c;
    public float d;
    public float e;

    public Stats() {}
}
{code}
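
The position mismatch can be reproduced without Flink. The following is a minimal sketch, not Flink's implementation: it assumes that POJO fields are ordered by name and that nested POJOs are expanded in place when flattening, which is consistent with the reported positions 0, 1, and 14. The class {{FlatPositionSketch}} and its helpers {{sortedFields}} and {{flatten}} are hypothetical names used only for illustration, and {{Policy}} here is a stand-in with the four fields listed above.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class FlatPositionSketch {
    // Stand-ins for the POJOs described in this report.
    public static class Policy { public short a; public short b; public boolean c; public boolean d; }
    public static class Stats { public long count; public float a; public float b; public float c; public float d; public float e; }
    public static class Data {
        public int aaa; public int abc; public long wxyz; public int t1; public int t2;
        public Policy policy; public Stats stats;
    }

    /** Public instance fields of a class, sorted by name (ASSUMED ordering). */
    public static List<Field> sortedFields(Class<?> type) {
        List<Field> fields = new ArrayList<>();
        for (Field f : type.getDeclaredFields()) {
            int m = f.getModifiers();
            if (Modifier.isPublic(m) && !Modifier.isStatic(m) && !f.isSynthetic()) {
                fields.add(f);
            }
        }
        fields.sort(Comparator.comparing(Field::getName));
        return fields;
    }

    /** Dotted names of all leaf fields in flattened order; only primitives count as leaves here. */
    public static List<String> flatten(Class<?> type, String prefix) {
        List<String> out = new ArrayList<>();
        for (Field f : sortedFields(type)) {
            if (f.getType().isPrimitive()) {
                out.add(prefix + f.getName());
            } else {
                out.addAll(flatten(f.getType(), prefix + f.getName() + "."));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> flat = flatten(Data.class, "");
        int directFields = sortedFields(Data.class).size();
        for (String key : new String[] {"aaa", "abc", "wxyz"}) {
            int pos = flat.indexOf(key);
            // A flat position is only a valid direct index if it is below the direct field count.
            String verdict = pos < directFields ? "in range" : "out of range";
            System.out.println(key + ": flat position " + pos + ", " + verdict
                    + " for " + directFields + " direct fields");
        }
    }
}
```

Under these assumptions the flattened layout has 15 entries while Data has only 7 direct fields, so the flat position 14 of {{wxyz}} cannot be used as an index into the direct fields.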
