flink-issues mailing list archives

From Johannes Günther (JIRA) <j...@apache.org>
Subject [jira] [Reopened] (FLINK-2152) Provide zipWithIndex utility in flink-contrib
Date Thu, 23 Jul 2015 19:52:04 GMT

     [ https://issues.apache.org/jira/browse/FLINK-2152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Johannes Günther reopened FLINK-2152:
-------------------------------------

When using zipWithIndex in a longer-running job, the following error occurs:


java.lang.Exception: The user defined 'open()' method caused an exception: null
	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:366)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException
	at java.util.ArrayList.sort(ArrayList.java:1456)
	at java.util.Collections.sort(Collections.java:175)
	at org.apache.flink.api.java.utils.DataSetUtils$2.open(DataSetUtils.java:80)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:33)
	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:492)
	... 3 more
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:93)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: null
	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:366)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException
	at java.util.ArrayList.sort(ArrayList.java:1456)
	at java.util.Collections.sort(Collections.java:175)
	at org.apache.flink.api.java.utils.DataSetUtils$2.open(DataSetUtils.java:80)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:33)
	at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:492)
	... 3 more
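
Both traces end in `ArrayList.sort`, called via `Collections.sort` from `DataSetUtils$2.open`. Since Java 8, `ArrayList.sort` records the list's modification count before sorting, throws `ConcurrentModificationException` if that count changed by the time the sort finishes, and then increments it. One plausible reading of the trace, assuming several parallel subtasks on the same TaskManager receive the same `counts` list instance and each sorts it in place, is that one subtask's finished sort bumps the count while another subtask's sort is still running. The plain-Java sketch below (no Flink dependency; the class and method names are hypothetical) reproduces that check deterministically: the comparator triggers a second sort of the same list on its first call, standing in for the other subtask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.List;

public class SharedSortDemo {

    // Sorts a list while a simulated second "subtask" sorts the same list midway through.
    // Returns true if ConcurrentModificationException was thrown.
    static boolean sortWhileAnotherSortFinishes() {
        // Hypothetical stand-in for the shared broadcast "counts" list.
        List<Integer> shared = new ArrayList<>(Arrays.asList(3, 1, 2, 0));
        boolean[] interfered = {false};

        Comparator<Integer> comparator = (a, b) -> {
            if (!interfered[0]) {
                interfered[0] = true;
                // Simulates another subtask completing its own sort of the same list
                // while this sort is still in progress; that sort increments modCount.
                shared.sort(Comparator.naturalOrder());
            }
            return Integer.compare(a, b);
        };

        try {
            // Same call path as in the trace: Collections.sort -> ArrayList.sort.
            Collections.sort(shared, comparator);
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ConcurrentModificationException thrown: " + sortWhileAnotherSortFinishes());
    }
}
```

Besides the exception, two sorts interleaving on one backing array can also leave the list out of order, so synchronizing or copying matters even when no exception happens to surface.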


This can be fixed in the open() method by wrapping the counts variable in a concurrent list,
e.g. a CopyOnWriteArrayList from java.util.concurrent.
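
The suggested fix can be sketched in plain Java. The essential step is that each subtask sorts its own copy of `counts` instead of the shared list; `new CopyOnWriteArrayList<>(counts)` makes such a copy, and `new ArrayList<>(counts)` would do the same. This is a sketch under stated assumptions, not the actual Flink patch: `PrivateCopySort`, `entry`, `sampleCounts` and `sortedCopy` are hypothetical names, and the `(subtaskId, count)` pairs are modeled as `Map.Entry<Integer, Long>` rather than Flink tuples. It only helps if no subtask sorts the shared list in place.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

public class PrivateCopySort {

    // Hypothetical helper: one (subtaskId, count) pair.
    static Map.Entry<Integer, Long> entry(int subtaskId, long count) {
        return new AbstractMap.SimpleImmutableEntry<>(subtaskId, count);
    }

    // Hypothetical shared counts, deliberately out of subtask order. The list is
    // unmodifiable, so any accidental in-place sort would fail loudly.
    static List<Map.Entry<Integer, Long>> sampleCounts() {
        return Collections.unmodifiableList(Arrays.asList(entry(2, 5L), entry(0, 3L), entry(1, 4L)));
    }

    // Returns a sorted private copy; the shared list is only read, never modified.
    static List<Map.Entry<Integer, Long>> sortedCopy(List<Map.Entry<Integer, Long>> shared) {
        // The copy is what removes the race: each caller sorts its own list.
        List<Map.Entry<Integer, Long>> copy = new CopyOnWriteArrayList<>(shared);
        copy.sort(Map.Entry.comparingByKey());
        return copy;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Map.Entry<Integer, Long>> shared = sampleCounts();
        // Threads stand in for parallel subtasks that receive the same list.
        Thread[] subtasks = new Thread[4];
        for (int i = 0; i < subtasks.length; i++) {
            subtasks[i] = new Thread(() -> System.out.println(sortedCopy(shared)));
            subtasks[i].start();
        }
        for (Thread subtask : subtasks) {
            subtask.join();
        }
    }
}
```

Each thread prints `[0=3, 1=4, 2=5]`, and the shared list keeps its original order.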

> Provide zipWithIndex utility in flink-contrib
> ---------------------------------------------
>
>                 Key: FLINK-2152
>                 URL: https://issues.apache.org/jira/browse/FLINK-2152
>             Project: Flink
>          Issue Type: Improvement
>          Components: Java API
>            Reporter: Robert Metzger
>            Assignee: Andra Lungu
>            Priority: Trivial
>              Labels: starter
>             Fix For: 0.10
>
>
> We should provide a simple utility method for zipping the elements of a data set with a dense index.
> It's up for discussion whether we want it directly in the API or only as a utility in {{flink-contrib}}.
> I would put it in {{flink-contrib}}.
> See my answer on SO: http://stackoverflow.com/questions/30596556/zipwithindex-on-apache-flink
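
For orientation only: a common way to assign a dense index to a partitioned data set takes two passes. First count the elements in each partition, then make those counts available to every partition, so that partition p starts numbering at the sum of the counts of partitions 0 to p-1. The sort of `counts` in `open()` seen in the trace is consistent with this scheme, since the counts must be ordered by subtask before that prefix sum can be computed. The sketch below is a single-JVM illustration that models partitions as in-memory lists; it uses no Flink API, and `ZipWithIndexSketch` is a hypothetical name.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class ZipWithIndexSketch {

    // Pairs every element with a dense, zero-based index across all partitions.
    // Partition p plays the role of parallel subtask p.
    static <T> List<Map.Entry<Long, T>> zipWithIndex(List<List<T>> partitions) {
        // Pass 1: each partition reports how many elements it holds.
        long[] counts = new long[partitions.size()];
        for (int p = 0; p < partitions.size(); p++) {
            counts[p] = partitions.get(p).size();
        }

        // Pass 2: each partition derives its starting offset from the shared counts
        // (sum over lower-numbered partitions) and then numbers its own elements.
        List<Map.Entry<Long, T>> result = new ArrayList<>();
        for (int p = 0; p < partitions.size(); p++) {
            long index = 0;
            for (int q = 0; q < p; q++) {
                index += counts[q];
            }
            for (T element : partitions.get(p)) {
                result.add(new AbstractMap.SimpleImmutableEntry<>(index++, element));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b"),
                Collections.<String>emptyList(),
                Arrays.asList("c", "d", "e"));
        // Prints [0=a, 1=b, 2=c, 3=d, 4=e]
        System.out.println(zipWithIndex(partitions));
    }
}
```

An empty partition simply contributes zero to the offsets of the partitions after it, so the resulting indexes stay dense.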



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
