flink-issues mailing list archives

From Péter Szabó (JIRA) <j...@apache.org>
Subject [jira] [Commented] (FLINK-1986) Group by fails on iterative data streams
Date Tue, 12 May 2015 09:37:59 GMT

    [ https://issues.apache.org/jira/browse/FLINK-1986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14539589#comment-14539589 ]

Péter Szabó commented on FLINK-1986:
------------------------------------

The problem is that the StreamIterationHead is not created, because only IterativeDataStream.transform(...)
can create it. groupBy() on an IterativeDataStream does not call transform(), hence the
exception. All DataStream methods that are supported for iterations and do not call transform()
should be overridden in IterativeDataStream so that they add the iteration head.
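
To illustrate the proposed fix, here is a minimal, self-contained sketch. It does not use the
Flink API: Graph, Stream, IterativeStream and FixedIterativeStream are hypothetical stand-ins
for StreamGraph, DataStream and IterativeDataStream, and the toy graph throws an
IllegalStateException where the real code hits the NullPointerException. It only shows the
override pattern, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class IterationHeadSketch {

    /** Toy stand-in for the stream graph: it only records which nodes were added. */
    static class Graph {
        final List<String> nodes = new ArrayList<>();

        boolean hasIterationHead() {
            return nodes.contains("iteration-head");
        }

        void addIterationTail() {
            // The real code fails with a NullPointerException here; the toy fails explicitly.
            if (!hasIterationHead()) {
                throw new IllegalStateException("iteration head was never created");
            }
            nodes.add("iteration-tail");
        }
    }

    /** Toy stand-in for a plain data stream. */
    static class Stream {
        final Graph graph;

        Stream(Graph graph) {
            this.graph = graph;
        }

        Stream transform(String operatorName) {
            graph.nodes.add(operatorName);
            return new Stream(graph);
        }

        // Only changes partitioning, so it never goes through transform().
        Stream groupBy(int field) {
            return new Stream(graph);
        }
    }

    /** Toy iterative stream as described above: only transform() creates the head. */
    static class IterativeStream extends Stream {
        IterativeStream(Graph graph) {
            super(graph);
        }

        void addHeadIfMissing() {
            if (!graph.hasIterationHead()) {
                graph.nodes.add("iteration-head");
            }
        }

        @Override
        Stream transform(String operatorName) {
            addHeadIfMissing();
            return super.transform(operatorName);
        }

        void closeWith(Stream feedback) {
            graph.addIterationTail();
        }
    }

    /** Proposed fix: methods that bypass transform() are overridden to add the head. */
    static class FixedIterativeStream extends IterativeStream {
        FixedIterativeStream(Graph graph) {
            super(graph);
        }

        @Override
        Stream groupBy(int field) {
            addHeadIfMissing();
            return super.groupBy(field);
        }
    }

    /** Runs iterate -> groupBy -> map -> closeWith and reports whether closing succeeded. */
    static boolean closes(IterativeStream iteration) {
        try {
            Stream step = iteration.groupBy(1).transform("map");
            iteration.closeWith(step);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("without override: " + closes(new IterativeStream(new Graph())));
        System.out.println("with override:    " + closes(new FixedIterativeStream(new Graph())));
    }
}
```

In this sketch the pipeline closes only when groupBy() is overridden. Any other method that
skips transform() would need the same treatment.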

> Group by fails on iterative data streams
> ----------------------------------------
>
>                 Key: FLINK-1986
>                 URL: https://issues.apache.org/jira/browse/FLINK-1986
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Daniel Bali
>              Labels: iteration, streaming
>
> Hello!
> When I try to run a `groupBy` on an IterativeDataStream I get a NullPointerException.
> Here is the code that reproduces the issue:
> {code}
> public Test() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>     DataStream<Tuple2<Long, Long>> edges = env
>             .generateSequence(0, 7)
>             .map(new MapFunction<Long, Tuple2<Long, Long>>() {
>                 @Override
>                 public Tuple2<Long, Long> map(Long v) throws Exception {
>                     return new Tuple2<>(v, (v + 1));
>                 }
>             });
>     IterativeDataStream<Tuple2<Long, Long>> iteration = edges.iterate();
>     SplitDataStream<Tuple2<Long, Long>> step = iteration.groupBy(1)
>             .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
>                 @Override
>                 public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
>                     return tuple;
>                 }
>             })
>             .split(new OutputSelector<Tuple2<Long, Long>>() {
>                 @Override
>                 public Iterable<String> select(Tuple2<Long, Long> tuple) {
>                     List<String> output = new ArrayList<>();
>                     output.add("iterate");
>                     return output;
>                 }
>             });
>     iteration.closeWith(step.select("iterate"));
>     env.execute("Sandbox");
> }
> {code}
> Moving the groupBy before the iteration solves the issue. For example, this works:
> {code}
> ... iteration = edges.groupBy(1).iterate();
> iteration.map(...)
> {code}
> Here is the stack trace:
> {code}
> Exception in thread "main" java.lang.NullPointerException
> 	at org.apache.flink.streaming.api.graph.StreamGraph.addIterationTail(StreamGraph.java:207)
> 	at org.apache.flink.streaming.api.datastream.IterativeDataStream.closeWith(IterativeDataStream.java:72)
> 	at org.apache.flink.graph.streaming.example.Test.<init>(Test.java:73)
> 	at org.apache.flink.graph.streaming.example.Test.main(Test.java:79)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:601)
> 	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
