flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-10100) Optimizer pushes partitioning past Null-Filter
Date Wed, 08 Aug 2018 09:17:00 GMT
Fabian Hueske created FLINK-10100:
-------------------------------------

             Summary: Optimizer pushes partitioning past Null-Filter
                 Key: FLINK-10100
                 URL: https://issues.apache.org/jira/browse/FLINK-10100
             Project: Flink
          Issue Type: Bug
          Components: DataSet API, Optimizer
    Affects Versions: 1.5.2, 1.4.2, 1.3.3, 1.6.0, 1.7.0
            Reporter: Fabian Hueske


The DataSet optimizer pushes certain operations like partitioning or sorting past Filter operators.
It does that because it knows that a {{FilterFunction}} cannot modify the records but only
indicate whether a record should be forwarded or not. 

However, this causes problems when the filter is meant to remove records with null keys. In that
case, the partitioning can be pushed past the filter, so the partitioner receives the null keys
that the filter was supposed to drop. This can fail with a {{NullPointerException}}.
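The effect of the reordering can be illustrated outside of Flink. The following is a minimal plain-Java sketch, not Flink's optimizer or partitioner code: the class {{NullKeyPushdownSketch}} and its methods are hypothetical names, and {{partition}} only stands in for a hash partitioner that dereferences the key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical illustration only; none of these names exist in Flink.
class NullKeyPushdownSketch {

    // Stand-in for a hash partitioner: it dereferences the key,
    // so a null key raises a NullPointerException.
    static int partition(Long key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Order as written by the user: drop null keys, then partition.
    static List<Integer> filterThenPartition(List<Long> keys, int numPartitions) {
        return keys.stream()
                .filter(Objects::nonNull)
                .map(k -> partition(k, numPartitions))
                .collect(Collectors.toList());
    }

    // Order after the push-down: every record is partitioned first,
    // and the null check only runs afterwards.
    static List<Integer> partitionThenFilter(List<Long> keys, int numPartitions) {
        List<Integer> targets = new ArrayList<>();
        for (Long k : keys) {
            int target = partition(k, numPartitions); // throws for a null key
            if (k != null) {
                targets.add(target);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        // Same key column as in the reproducer: null, 2, 2, 3, null.
        List<Long> keys = Arrays.asList(null, 2L, 2L, 3L, null);
        System.out.println(filterThenPartition(keys, 4));
        try {
            partitionThenFilter(keys, 4);
        } catch (NullPointerException e) {
            System.out.println("partitioning before the filter failed on a null key");
        }
    }
}
```

Both methods apply the same predicate, which is why the reordering looks safe to the optimizer. Only the position of the null check relative to the key access differs.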

The following code produces an affected plan.

{code}
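import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Two of the five rows have a null key in field 0.
List<Row> rowList = new ArrayList<>();
rowList.add(Row.of(null, 1L));
rowList.add(Row.of(2L, 2L));
rowList.add(Row.of(2L, 2L));
rowList.add(Row.of(3L, 3L));
rowList.add(Row.of(null, 3L));

DataSet<Row> rows = env.fromCollection(rowList, Types.ROW(Types.LONG, Types.LONG));

DataSet<Long> result = rows
  // Intended to drop null keys before the data is grouped.
  .filter(r -> r.getField(0) != null)
    .setParallelism(4)
  // The partitioning required by this groupBy is what gets pushed past the filter.
  .groupBy(0)
  .reduceGroup((Iterable<Row> vals, Collector<Long> out) -> {
      long cnt = 0L;
      for (Row v : vals) { cnt++; }
      out.collect(cnt);
    })
    .returns(Types.LONG)
    .setParallelism(4);

result.output(new DiscardingOutputFormat<>());
System.out.println(env.getExecutionPlan());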
{code}

To resolve the problem, we could remove the field-forward property of {{FilterFunction}}.
Filtering before shipping or sorting data is typically more efficient anyway, so this
might also improve the performance of certain plans.

As a *workaround* until this bug is fixed, users can implement the filter with a {{FlatMapFunction}}.
{{FlatMapFunction}} is a more generic interface, so the optimizer cannot automatically infer
how the function behaves and won't push partitionings or sorts past it.
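A minimal sketch of this workaround, assuming the key is in field 0 as in the reproducer above. The class name {{NonNullKeyFilter}} is hypothetical, and the snippet needs the Flink DataSet API on the classpath.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Hypothetical class name. Behaves like the filter on field 0, but is
// expressed as a FlatMapFunction that emits either zero or one record.
class NonNullKeyFilter implements FlatMapFunction<Row, Row> {
    @Override
    public void flatMap(Row row, Collector<Row> out) {
        // Forward the row unchanged only if its key (field 0) is non-null.
        if (row.getField(0) != null) {
            out.collect(row);
        }
    }
}
```

In the reproducer, {{.filter(r -> r.getField(0) != null)}} would then become {{.flatMap(new NonNullKeyFilter()).returns(rows.getType())}}. The explicit {{returns(...)}} call is there so that the result keeps the row type of the input, which the positional {{groupBy(0)}} relies on.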



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
