flink-issues mailing list archives

From "Mikhail Lipkovich (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7567) DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
Date Tue, 05 Sep 2017 11:49:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16153521#comment-16153521 ]

Mikhail Lipkovich commented on FLINK-7567:
------------------------------------------

Hi Peter,
this error is a consequence of the change made in https://issues.apache.org/jira/browse/FLINK-2398
Since that change, the input stream and the feedback stream of an iteration must have the
same parallelism. In your particular example, you can work around this by setting the
parallelism of the feedback stream explicitly:

{code:java}
it => {
  // setParallelism(1) on the feedback stream matches the parallelism of the source (1)
  (it.filter(_ > 0).map(_ - 1).setParallelism(1), it.filter(_ > 0).map(_ => 'x'))
}
{code}

We should probably at least document that the keepPartitioning parameter of
DataStream.iterate is ignored.
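
For reference, here is a minimal sketch of the workaround applied to the complete snippet from the issue. It assumes Flink 1.3.x with the Scala DataStream API on the classpath. The object name and the 5000 ms value for maxWaitTimeMillis are illustrative assumptions, not part of the original report. The timeout is there only because an iteration never terminates by default.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical object name, used only for this illustration.
object IterateOnElementsWorkaround {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // fromElements creates a non-parallel source, so this stream has parallelism 1.
    val source: DataStream[Int] = env.fromElements(1, 2, 3)

    val result = source.iterate(
      (it: DataStream[Int]) => {
        // Feedback stream: pin its parallelism to 1 so that it matches the source.
        // Without this, map() runs with the default parallelism and closing the
        // iteration throws the UnsupportedOperationException quoted in the issue.
        val feedback = it.filter(_ > 0).map(_ - 1).setParallelism(1)
        // Output stream: its parallelism is not subject to the feedback check.
        val output = it.filter(_ > 0).map(_ => 'x')
        (feedback, output)
      },
      // Assumed value: let the iteration head stop after 5 s without feedback data.
      maxWaitTimeMillis = 5000
    )

    result.print()
    env.execute()
  }
}
```

An alternative under the same assumptions would be to call env.setParallelism(1) before building the job, which makes every operator match the source at the cost of running the whole pipeline non-parallel.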


> DataStream#iterate() on env.fromElements() / env.fromCollection() does not work
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-7567
>                 URL: https://issues.apache.org/jira/browse/FLINK-7567
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.3.2
>         Environment: OS X 10.12.6, Oracle JDK 1.8.0_144, Flink 1.3.2
>            Reporter: Peter Ertl
>
> When I try to execute this simple snippet of code
> {code}
>   @Test
>   def iterateOnElements(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     // do something silly just to get iteration going ...
>     val result = env.fromElements(1, 2, 3).iterate(it => {
>       (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x'))
>     })
>     result.print()
>     env.execute()
>   }
> {code}
> I get the following exception:
> {code}
> java.lang.UnsupportedOperationException: Parallelism of the feedback stream must match the parallelism of the original stream. Parallelism of original stream: 1; parallelism of feedback stream: 8
> 	at org.apache.flink.streaming.api.transformations.FeedbackTransformation.addFeedbackEdge(FeedbackTransformation.java:87)
> 	at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:77)
> 	at org.apache.flink.streaming.api.scala.DataStream.iterate(DataStream.scala:519)
> 	at atp.analytics.CassandraReadTest.iterate2(CassandraReadTest.scala:134)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> 	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> 	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
> 	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> 	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
> Since this is just the simplest iterating stream setup I could imagine, this error makes
> no sense to me :-P



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
