flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7919) Join with Solution Set fails with NPE if Solution Set has no entry
Date Fri, 10 Nov 2017 16:26:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16247736#comment-16247736 ]

Fabian Hueske commented on FLINK-7919:

Hi [~greghogan], yes that would be an option as well.

This would have the benefit of being more aligned with the existing API (outer joins have
their own API calls). The fact that this issue hasn't been reported earlier also suggests
that the solution set join is usually used as an inner join.
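
Assuming the option under discussion is to give the solution set join inner-join semantics (the comment does not spell it out), the behavior could be sketched roughly as follows. This is plain Java, not Flink code: the class name, the method name, and the `HashMap` standing in for the solution set are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InnerJoinSketch {

    /** Joins each probe key against the solution set; keys without a match are dropped. */
    public static List<String> innerJoin(List<Long> probeKeys, Map<Long, Integer> solutionSet) {
        List<String> out = new ArrayList<>();
        for (Long key : probeKeys) {
            Integer match = solutionSet.get(key); // null if the key is absent
            if (match == null) {
                continue; // inner-join semantics: no match means no output record
            }
            out.add(key + "=" + (1 + match));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Long, Integer> solutionSet = new HashMap<>();
        solutionSet.put(1L, 1);
        solutionSet.put(2L, 1);
        solutionSet.put(3L, 1);
        // Probe keys 2, 3, 4 mirror the shifted workset keys of the reproducer;
        // key 4 has no solution set entry and is silently skipped.
        System.out.println(innerJoin(Arrays.asList(2L, 3L, 4L), solutionSet));
    }
}
```

With this choice the user's join function never sees a {{null}}, so existing join functions would not need to change.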

> Join with Solution Set fails with NPE if Solution Set has no entry
> ------------------------------------------------------------------
>                 Key: FLINK-7919
>                 URL: https://issues.apache.org/jira/browse/FLINK-7919
>             Project: Flink
>          Issue Type: Bug
>          Components: DataSet API, Local Runtime
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Fabian Hueske
> A job with a delta iteration fails hard with an NPE in the solution set join if the
> solution set has no entry for the join key of the probe side.
> The following program reproduces the problem:
> {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<Tuple2<Long, Integer>> values = env.fromElements(
>   Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1));
> // values is both the initial solution set and the initial workset;
> // at most 5 iterations, solution set keyed on field 0
> DeltaIteration<Tuple2<Long, Integer>, Tuple2<Long, Integer>> di = values
>   .iterateDelta(values, 5, 0);
> DataSet<Tuple2<Long, Integer>> loop = di.getWorkset()
>   .map(new MapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
>     @Override
>     public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
>       // modify the key so that the join probes a non-existing solution set key
>       return Tuple2.of(value.f0 + 1, 1);
>     }
>   })
>   .join(di.getSolutionSet()).where(0).equalTo(0)
>   .with(new JoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
>     @Override
>     public Tuple2<Long, Integer> join(
>       Tuple2<Long, Integer> first,
>       Tuple2<Long, Integer> second) throws Exception {
>       return Tuple2.of(first.f0, first.f1 + second.f1);
>     }
>   });
> // the same data set is used as solution set delta and as next workset
> DataSet<Tuple2<Long, Integer>> result = di.closeWith(loop, loop);
> result.print();
> {code}
> It doesn't matter whether the solution set is managed or not. 
> The problem occurs because the solution set hash table prober returns {{null}} if the
> solution set does not contain a value for the probe-side key.
> The join operator does not check whether the returned value is {{null}}; it immediately
> tries to create a copy using a {{TypeSerializer}}, and this copy fails with an NPE.
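
The failure mode described above can be imitated in a few lines of plain Java. This is only a sketch and not the Flink runtime code: the `HashMap` stands in for the solution set hash table, and `copy` is a hypothetical stand-in for the serializer copy that dereferences its argument.

```java
import java.util.HashMap;
import java.util.Map;

class NullProbeSketch {

    /** Stand-in for a serializer copy: it dereferences the record, so null is fatal. */
    public static int[] copy(int[] record) {
        return record.clone(); // throws NullPointerException if record is null
    }

    /** Probes the table and copies the match without checking for null first. */
    public static int[] probeAndCopy(Map<Long, int[]> solutionSet, long key) {
        int[] match = solutionSet.get(key); // null when the key is absent
        return copy(match);
    }

    public static void main(String[] args) {
        Map<Long, int[]> solutionSet = new HashMap<>();
        solutionSet.put(1L, new int[] {1});

        // Key present: the copy succeeds.
        System.out.println(probeAndCopy(solutionSet, 1L)[0]);

        // Key absent: the unchecked copy dereferences null.
        try {
            probeAndCopy(solutionSet, 4L);
        } catch (NullPointerException e) {
            System.out.println("NPE for missing key");
        }
    }
}
```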
> I propose to check for {{null}} and call the join function with {{null}} on the solution
> set side. This gives the join OUTER JOIN semantics.
> Since the code previously failed with an NPE, no working job can depend on the old
> behavior, so it is safe to forward the {{null}} into the {{JoinFunction}}.
> However, users must be aware that the solution set value may be {{null}}, and we need to
> update the documentation (JavaDocs + website) to describe the behavior.
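
A minimal sketch of the proposed behavior, again in plain Java rather than Flink code: the null check happens before the copy, and a missing match is forwarded as {{null}} to the join function. The names `probeAndJoin` and `sumOrDefault`, and the use of `BiFunction` in place of a `JoinFunction`, are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

class NullForwardingJoinSketch {

    /** Probes the solution set and forwards the match, or null, to the join function. */
    public static <R> R probeAndJoin(Map<Long, Integer> solutionSet, long key, int probeValue,
                                     BiFunction<Integer, Integer, R> joinFunction) {
        Integer match = solutionSet.get(key);
        // Copy only when there is a match; otherwise pass null through unchanged.
        Integer solutionSide = (match == null) ? null : Integer.valueOf(match);
        return joinFunction.apply(probeValue, solutionSide);
    }

    /** A null-aware user function: treats a missing solution set entry as 0. */
    public static int sumOrDefault(Integer first, Integer second) {
        return first + (second == null ? 0 : second);
    }

    public static void main(String[] args) {
        Map<Long, Integer> solutionSet = new HashMap<>();
        solutionSet.put(2L, 1);

        // Key 2 has a solution set entry, key 4 does not.
        Integer hit = probeAndJoin(solutionSet, 2L, 1, NullForwardingJoinSketch::sumOrDefault);
        Integer miss = probeAndJoin(solutionSet, 4L, 1, NullForwardingJoinSketch::sumOrDefault);
        System.out.println(hit + " " + miss);
    }
}
```

The cost of this choice is visible in `sumOrDefault`: every user function has to handle a {{null}} second argument itself, which is why the documentation update matters.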

This message was sent by Atlassian JIRA
