spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Matt Cheah <mch...@palantir.com>
Subject Visitor function to RDD elements
Date Tue, 22 Oct 2013 19:28:43 GMT
Hi everyone,

I have a driver holding a reference to an RDD. The driver would like to "visit" each item
in the RDD in order, say with a visitor object that invokes visit(item) to modify that visitor's
internal state. The visiting is not commutative (e.g. Visiting item A then B makes a different
internal state from visiting item B then item A). Items in the RDD also are not necessarily
distinct.

I've looked into accumulators which don't work because they require the operation to be commutative.
Collect() will not work because the RDD is too large; in general, bringing the whole RDD into
one partition won't work since the RDD is too large.

Is it possible to iterate over the items in an RDD in order without bringing the entire dataset
into a single JVM at a time, and/or obtain chunks of the RDD in order on the driver? We've
tried using the internal iterator() method. In some cases, we get a stack trace (running locally
with 3 threads). I've included the stack trace below.

Thanks,

-Matt Cheah

org.apache.spark.SparkException: Error communicating with MapOutputTracker
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
at com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
at com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
at com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
at com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000] milliseconds
at org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
at org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
... 46 more


Mime
View raw message