spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Christopher Nguyen <...@adatao.com>
Subject Re: Visitor function to RDD elements
Date Tue, 22 Oct 2013 21:23:07 GMT
For better precision,

s/Or to be able to handle very large data sets ("big memory")/Or to be able
to hold very large data sets in one place ("big memory")/g

--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Tue, Oct 22, 2013 at 2:16 PM, Christopher Nguyen <ctn@adatao.com> wrote:

> Matt, it would be useful to back up one level to your problem statement.
> If it is strictly restricted as described, then you have a sequential
> problem that's not parallelizable. What is the primary design goal here? To
> complete the operation in the shortest time possible ("big compute")? Or to
> be able to handle very large data sets ("big memory")?  Or to ensure that
> the operation completes in a fault-tolerant manner ("reliability")?
>
> There are two paths from here:
>
>    1. Finding parallelizable opportunities: there may be ways to squint
>    at the problem in just the right way that provides a way to parallelize it:
>       - Maybe you can come up with some algebra or approximations that
>       allows for associativity, so that different partitions of the data can be
>       operated on in parallel.
>       - Perhaps the data is a time series where weekly or monthly chunks
>       can be summarized in parallel and the sequential logic can be brought up
>       several hierarchical levels.
>       - Perhaps the statefulness of the visitor has a finite memory of
>       past visits that you can take advantage of.
>       2. Finding alternatives: it's important to realize that Spark's
>    strength is in "big compute" and not in "big memory". It's only 1 of the 13
>    dwarfs of parallel computing patterns, the map-reduce, shared-nothing model
>    (cf. D. Patterson et al., "A View From Berkeley ...", under "Monte Carlo").
>    It's a very successful model, but one that sometimes requires a refactoring
>    of the algorithm/data to make it applicable. So if #1 above isn't at all
>    possible, you might look into a "big memory" approach, such as Tachyon, or
>    memcached, or even just reading a big file sequentially and applying your
>    visitor to each data row, depending critically on what bottleneck you are
>    engineering against.
>
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
>
>
> On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah <mcheah@palantir.com> wrote:
>
>>  Hi everyone,
>>
>>  I have a driver holding a reference to an RDD. The driver would like to
>> "visit" each item in the RDD in order, say with a visitor object that
>> invokes visit(item) to modify that visitor's internal state. The visiting
>> is not commutative (e.g. Visiting item A then B makes a different internal
>> state from visiting item B then item A). Items in the RDD also are not
>> necessarily distinct.
>>
>>  I've looked into accumulators which don't work because they require the
>> operation to be commutative. Collect() will not work because the RDD is too
>> large; in general, bringing the whole RDD into one partition won't work
>> since the RDD is too large.
>>
>>  Is it possible to iterate over the items in an RDD in order without
>> bringing the entire dataset into a single JVM at a time, and/or obtain
>> chunks of the RDD in order on the driver? We've tried using the internal
>> iterator() method. In some cases, we get a stack trace (running locally
>> with 3 threads). I've included the stack trace below.
>>
>>  Thanks,
>>
>>  -Matt Cheah
>>
>>  org.apache.spark.SparkException: Error communicating with
>> MapOutputTracker
>> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>> at
>> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>> at
>> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>> at
>> org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
>> at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
>> at
>> com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
>> at
>> com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
>> at
>> com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
>> at
>> com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
>> at
>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at
>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
>> at
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
>> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
>> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
>> at
>> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>> at
>> com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
>> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
>> at
>> org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
>> at
>> org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
>> at
>> org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [10000] milliseconds
>> at
>> org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
>> at
>> org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
>> at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
>> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>> ... 46 more
>>
>>
>

Mime
View raw message