spark-user mailing list archives

From Tom Vacek <minnesota...@gmail.com>
Subject Re: Visitor function to RDD elements
Date Tue, 22 Oct 2013 21:26:11 GMT
Unfortunately, I think you're either going to have to fly a lot of data
around or create a lot of garbage.


On Tue, Oct 22, 2013 at 3:36 PM, Patrick Wendell <pwendell@gmail.com> wrote:

> Hey Matt,
>
> It seems like you are trying to perform an operation that just isn't
> parallelizable. In that case, it's going to be tricky without collecting
> the entire dataset on one node.
>
> Spark does not expose an iterator like the one you are suggesting, which
> lets you traverse an RDD. You could build one yourself by collecting one
> partition at a time at the driver, though this would require some
> lower-level understanding of Spark.
>
> - Patrick
>
>
>
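A rough sketch of the partition-at-a-time approach Patrick describes, in Scala
(the exact runJob signature varies between Spark versions, and visit here
stands in for whatever the visitor needs to do with each element):

import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Walk an RDD in partition-index order, running one job per partition so the
// driver only ever holds a single partition's worth of data at a time.
def visitInOrder[T: ClassTag](sc: SparkContext, rdd: RDD[T])(visit: T => Unit): Unit = {
  for (p <- 0 until rdd.partitions.length) {
    // Running a job on just partition p returns an array with one element:
    // that partition's contents, materialized on the driver.
    val Array(chunk) = sc.runJob(rdd, (iter: Iterator[T]) => iter.toArray, Seq(p))
    chunk.foreach(visit)
  }
}

Caching the RDD first is advisable, since otherwise each per-partition job may
recompute the RDD's lineage from scratch.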
> On Tue, Oct 22, 2013 at 1:02 PM, Matt Cheah <mcheah@palantir.com> wrote:
>
>>  In this context, that would let me create a visitor mapping for each
>> partition. However, I'm looking for the ability to use a single visitor
>> object that walks over all partitions.
>>
>>  I suppose I could do this if I used coalesce() to combine everything into
>> one partition, but that's too much memory for one partition. Am I
>> misinterpreting how to use it?
>>
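For reference, the coalesce() approach mentioned above would look roughly like
this (rdd and visitor are placeholder names, and the visitor's state ends up in
the single task on an executor rather than on the driver):

// Everything is funneled into one partition, so a single task can apply the
// visitor to every element -- at the cost of that one task handling the
// entire dataset.
rdd.coalesce(1).foreachPartition { iter =>
  iter.foreach(item => visitor.visit(item))
}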
>>   From: Mark Hamstra <mark@clearstorydata.com>
>> Reply-To: "user@spark.incubator.apache.org" <
>> user@spark.incubator.apache.org>
>> Date: Tuesday, October 22, 2013 12:51 PM
>> To: user <user@spark.incubator.apache.org>
>> Subject: Re: Visitor function to RDD elements
>>
>>   mapPartitions
>> mapPartitionsWithIndex
>>
>>  With care, you can use these and maintain the iteration order within
>> partitions.  Beware, though, that any reduce functions need to be
>> associative and commutative.
>>
>>
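A rough sketch of the mapPartitionsWithIndex route Mark suggests: summarize
each partition on the executors, keep the partition index, and combine the
per-partition results in index order on the driver. This only helps if the
visit logic can be split that way; summarize, Summary, and combine below are
hypothetical stand-ins:

// One (index, summary) pair per partition; within a partition, summarize sees
// the elements through the iterator in their existing order.
val perPartition: Array[(Int, Summary)] =
  rdd.mapPartitionsWithIndex { (idx, iter) =>
    Iterator((idx, summarize(iter)))
  }.collect()

// Reassemble in partition order on the driver and fold the summaries
// left-to-right, so the combine step need not be commutative.
val result = perPartition.sortBy(_._1).map(_._2).reduceLeft(combine)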
>> On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah <mcheah@palantir.com> wrote:
>>
>>>  Hi everyone,
>>>
>>>  I have a driver holding a reference to an RDD. The driver would like
>>> to "visit" each item in the RDD in order, say with a visitor object whose
>>> visit(item) method is invoked to update the visitor's internal state. The
>>> visiting is not commutative (e.g., visiting item A then item B produces a
>>> different internal state from visiting item B then item A). Items in the
>>> RDD are also not necessarily distinct.
>>>
>>>  I've looked into accumulators, which don't work because they require
>>> the operation to be commutative. Calling collect() will not work because
>>> the RDD is too large; in general, bringing the whole RDD into one partition
>>> won't work, for the same reason.
>>>
>>>  Is it possible to iterate over the items in an RDD in order without
>>> bringing the entire dataset into a single JVM at a time, and/or to obtain
>>> chunks of the RDD in order on the driver? We've tried using the internal
>>> iterator() method, but in some cases we get an exception (running locally
>>> with 3 threads); I've included the stack trace below.
>>>
>>>  Thanks,
>>>
>>>  -Matt Cheah
>>>
>>>  org.apache.spark.SparkException: Error communicating with MapOutputTracker
>>>   at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>>>   at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>>>   at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
>>>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>   at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>   at org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
>>>   at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
>>>   at com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
>>>   at com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
>>>   at com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
>>>   at com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>>>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>>>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>>>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>>>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>>>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>>>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>>>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>>>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>>>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>>>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>>   at com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
>>>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>>   at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>>>   at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>>>   at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>>>   at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
>>>   at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
>>>   at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
>>>   at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000] milliseconds
>>>   at org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
>>>   at org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
>>>   at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
>>>   at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>>>   ... 46 more
>>>
>>>
>>
>
