flink-issues mailing list archives

From "Mikhail Lipkovich (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
Date Tue, 05 Sep 2017 13:20:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16153629#comment-16153629 ]

Mikhail Lipkovich edited comment on FLINK-7567 at 9/5/17 1:19 PM:
------------------------------------------------------------------

Since DataStream.iterate() already takes a 'keepPartitioning' flag, it seems to me that we
could align the parallelism of the input and feedback streams in this method when the flag
is set and alignment is possible, and throw an exception with a clearer message when it is
not.
I'm a newbie here, so maybe I'm missing something important. What do you think, [~aljoscha]?
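
To make the proposal concrete, here is a rough sketch of the decision logic in plain Scala. It is not Flink code: the object, the method `resolveFeedbackParallelism`, and the `canAlign` parameter are hypothetical names, and what "possible to align" means in practice is left as an assumption for the caller to decide.

```scala
// Hypothetical sketch of the proposed check; none of these names exist in Flink.
object FeedbackParallelismSketch {

  /** Returns the parallelism the feedback stream should use,
    * or throws with a message that says what to change.
    *
    * `canAlign` is an assumed input: whether the feedback stream's
    * parallelism may be changed to match the input stream.
    */
  def resolveFeedbackParallelism(
      inputParallelism: Int,
      feedbackParallelism: Int,
      keepPartitioning: Boolean,
      canAlign: Boolean): Int = {
    if (inputParallelism == feedbackParallelism) {
      // Nothing to do, the streams already match.
      feedbackParallelism
    } else if (keepPartitioning && canAlign) {
      // Align the feedback stream with the input stream.
      inputParallelism
    } else {
      throw new UnsupportedOperationException(
        s"Cannot close the iteration: the input stream has parallelism $inputParallelism " +
          s"but the feedback stream has parallelism $feedbackParallelism. " +
          "Set the same parallelism on both streams before closing the iteration.")
    }
  }
}
```

With the values from the reported exception (input 1, feedback 8), `resolveFeedbackParallelism(1, 8, keepPartitioning = true, canAlign = true)` would pick 1, while the same call with `keepPartitioning = false` would throw with the longer message.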


was (Author: mlipkovich):
Since DataStream.iterate() already takes a 'keepPartitioning' flag, it seems to me that we
could align the parallelism of the input and feedback streams in this method when that is
possible, and throw an exception with a clearer message when it is not.
I'm a newbie here, so maybe I'm missing something important. What do you think, [~aljoscha]?

> DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-7567
>                 URL: https://issues.apache.org/jira/browse/FLINK-7567
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.3.2
>         Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2
>            Reporter: Peter Ertl
>
> When I try to execute this simple snippet of code
> {code}
>   @Test
>   def iterateOnElements(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     // do something silly just to get iteration going ...
>     val result = env.fromElements(1, 2, 3).iterate(it => {
>       (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
>     })
>     result.print()
>     env.execute()
>   }
> {code}
> I get the following exception:
> {code}
> java.lang.UnsupportedOperationException: Parallelism of the feedback stream must match the parallelism of the original stream. Parallelism of original stream: 1; parallelism of feedback stream: 8
> 	at org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87)
> 	at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77)
> 	at org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519)
> 	at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> 	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> 	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
> 	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> 	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
> Since this is just the simplest iterating stream setup I could imagine, this error makes no sense to me :-P



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
