spark-user mailing list archives

From: Christopher Nguyen <...@adatao.com>
Subject: Re: Visitor function to RDD elements
Date: Tue, 22 Oct 2013 21:16:45 GMT
Matt, it would be useful to back up one level to your problem statement. If
it is strictly restricted as described, then you have a sequential problem
that's not parallelizable. What is the primary design goal here? To
complete the operation in the shortest time possible ("big compute")? Or to
be able to handle very large data sets ("big memory")?  Or to ensure that
the operation completes in a fault-tolerant manner ("reliability")?

There are two paths from here:

   1. Finding parallelizable opportunities: there may be a way to squint at
   the problem in just the right way that exposes some parallelism (see the
   first sketch below):
      - Maybe you can come up with some algebra or approximations that
      allow for associativity, so that different partitions of the data can
      be operated on in parallel.
      - Perhaps the data is a time series where weekly or monthly chunks
      can be summarized in parallel and the sequential logic can be brought
      up several hierarchical levels.
      - Perhaps the statefulness of the visitor has only a finite memory of
      past visits that you can take advantage of.
   2. Finding alternatives: it's important to realize that Spark's strength
   is in "big compute" and not in "big memory". It covers only one of the 13
   dwarfs of parallel computing patterns, the map-reduce, shared-nothing
   model (cf. D. Patterson et al., "A View From Berkeley ...", under "Monte
   Carlo"). It's a very successful model, but one that sometimes requires a
   refactoring of the algorithm/data to make it applicable. So if #1 above
   isn't at all possible, you might look into a "big memory" approach, such
   as Tachyon, or memcached, or even just reading a big file sequentially
   and applying your visitor to each data row (see the second sketch below),
   depending critically on what bottleneck you are engineering against.
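
To make the first path a bit more concrete, here is a rough Scala sketch of
the "summarize chunks in parallel, combine in order" idea. Everything here
(Record, Summary, summarize, combine, runHierarchically) is a placeholder for
your actual row type and logic, and it assumes the rows carry a timestamp you
can chunk on:

import org.apache.spark.rdd.RDD

case class Record(timestamp: Long, value: Double)   // placeholder row type
case class Summary(count: Long, state: Double)      // placeholder per-chunk state

// Summarize one time-ordered chunk independently of every other chunk.
def summarize(chunk: Seq[Record]): Summary =
  chunk.sortBy(_.timestamp)
       .foldLeft(Summary(0L, 0.0))((s, r) => Summary(s.count + 1, s.state + r.value))

// Merge two chunk summaries in time order; the order-dependent logic now only
// has to run over a handful of summaries instead of every row.
def combine(left: Summary, right: Summary): Summary =
  Summary(left.count + right.count, left.state + right.state)

def runHierarchically(records: RDD[Record]): Summary = {
  val msPerMonth = 30L * 24 * 3600 * 1000
  records
    .groupBy(r => r.timestamp / msPerMonth)        // monthly chunks (each must fit on a worker)
    .map { case (month, rows) => (month, summarize(rows.toSeq)) } // summarized in parallel
    .collect()                                     // small: one Summary per month
    .sortBy(_._1)                                  // restore time order on the driver
    .map(_._2)
    .reduceLeft(combine)                           // sequential, but over few elements
}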
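
And a rough sketch of the fallback: keep the data in an RDD but pull it back
to the driver one partition at a time and feed a stateful visitor, instead of
collect()-ing everything at once. Visitor and visitInOrder are placeholders,
the exact runJob overload differs across Spark versions (newer releases expose
RDD.toLocalIterator, which does essentially this for you), and it assumes the
RDD is partitioned so that partition order matches the visit order:

import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

trait Visitor[T] { def visit(item: T): Unit }   // placeholder for the stateful visitor

def visitInOrder[T: ClassTag](sc: SparkContext, rdd: RDD[T], visitor: Visitor[T]): Unit = {
  for (p <- 0 until rdd.partitions.length) {
    // Run a small job that materializes only partition p on the driver,
    // so at most one partition's worth of rows is held in the driver JVM.
    val chunks: Array[Array[T]] = sc.runJob(rdd, (it: Iterator[T]) => it.toArray, Seq(p))
    // Feed the rows to the visitor strictly in partition order.
    chunks.foreach(_.foreach(visitor.visit))
  }
}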


--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Tue, Oct 22, 2013 at 12:28 PM, Matt Cheah <mcheah@palantir.com> wrote:

>  Hi everyone,
>
>  I have a driver holding a reference to an RDD. The driver would like to
> "visit" each item in the RDD in order, say with a visitor object that
> invokes visit(item) to modify that visitor's internal state. The visiting
> is not commutative (e.g. visiting item A then item B produces a different
> internal state from visiting item B then item A). Items in the RDD are
> also not necessarily distinct.
>
>  I've looked into accumulators, which don't work because they require the
> operation to be commutative. Collect() will not work because the RDD is too
> large; in general, bringing the whole RDD into one partition won't work for
> the same reason.
>
>  Is it possible to iterate over the items in an RDD in order without
> bringing the entire dataset into a single JVM at a time, and/or obtain
> chunks of the RDD in order on the driver? We've tried using the internal
> iterator() method. In some cases, we get a stack trace (running locally
> with 3 threads). I've included the stack trace below.
>
>  Thanks,
>
>  -Matt Cheah
>
>  org.apache.spark.SparkException: Error communicating with MapOutputTracker
> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
> at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
> at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:39)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
> at org.apache.spark.api.java.JavaRDDLike$class.iterator(JavaRDDLike.scala:60)
> at org.apache.spark.api.java.JavaRDD.iterator(JavaRDD.scala:25)
> at com.palantir.finance.server.service.datatable.SparkRawDataTableProvider.compute(SparkRawDataTableProvider.java:76)
> at com.palantir.finance.server.datatable.spark.SparkDataTable.visit(SparkDataTable.java:83)
> at com.palantir.finance.server.datatable.DataTableImplementationParityTests.runDataTableTest(DataTableImplementationParityTests.java:129)
> at com.palantir.finance.server.datatable.DataTableImplementationParityTests.testParityOnSort(DataTableImplementationParityTests.java:102)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
> at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at com.palantir.finance.commons.service.ServiceThreadContainerRule$1.evaluate(ServiceThreadContainerRule.java:28)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
> at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
> at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000] milliseconds
> at org.apache.spark.internal.akka.dispatch.DefaultPromise.ready(Future.scala:870)
> at org.apache.spark.internal.akka.dispatch.DefaultPromise.result(Future.scala:874)
> at org.apache.spark.internal.akka.dispatch.Await$.result(Future.scala:74)
> at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
> ... 46 more
>
>
