mahout-user mailing list archives

From Pat Ferrel <...@occamsmachete.com>
Subject Re: spark-itemsimilarity scalability / Spark parallelism issues (SimilarityAnalysis.cooccurrencesIDSs)
Date Mon, 21 Aug 2017 18:40:25 GMT
Is it possible to add it to Mahout so we can run the unit tests against it? If so we also have
a bunch of integration tests as well as my real-world data.

Again, I don’t see anything wrong with skipping zeros in any case but this method is known
to be slower for certain types of math (IIRC). So I’d bet the unit test will pass. But like
I said in the other answer we may need to figure out how to use this in the best way. The first
step is to measure its impact.

This is complicated by the GPU integration that is happening as we write, so those guys will
have to help. They have already seen benefits of GPUs drop with low density so this could
be the opposite case, bringing more benefit with less density.

As to being “post serialization” we should see your entire job’s logs, but in most cases
nothing is done on a cluster without first going through serialization at least once; in other
words, nothing can be done until the data is on the executors. There are lots of in-memory
operations, of course, so in those cases there would be no serialization. Intuition says that if
this does speed things up it will be for CCO or, more generally, for certain types of matrices,
and that itself may be a very important case.


On Aug 21, 2017, at 11:25 AM, Scruggs, Matt <matt.scruggs@bronto.com> wrote:

Good question :D

For the dataset I mentioned in my first message, the entire run is almost 10x faster (I expect
that speedup to be non-linear since it nearly eliminates a for loop...bigger gains for bigger
datasets). It's possible there are other sections of the code I can't override (e.g. before
serialization of the matrices) that could take advantage of the sparse matrix / vector APIs.

I didn't make these changes in Mahout itself. I created a custom Kryo deserializer within
my app to deserialize SparseRowMatrix instances to a custom subclass of SparseRowMatrix; this
new class overrides the AbstractMatrix implementation of assign(matrix, function). FWIW, all
of my app's existing tests pass and it produces the same results as before these changes.
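
To illustrate the idea (not the actual subclass), here is a minimal self-contained sketch with no Mahout dependency. SparseRow, addDense and addSparse are hypothetical names made up for this example; SparseRow is only a stand-in for a sparse row vector with O(log n) lookups. The shortcut assumes the combining function leaves a cell unchanged when the other operand is zero, which holds for the += used here but not for every function.

```java
import java.util.Map;
import java.util.TreeMap;

public class SparseAssignSketch {
    /** Hypothetical stand-in for a sparse row: column index -> non-zero value. */
    static final class SparseRow {
        final int size;
        final TreeMap<Integer, Double> cells = new TreeMap<>();

        SparseRow(int size) { this.size = size; }

        // O(log n) lookup; absent cells read as zero.
        double get(int col) { return cells.getOrDefault(col, 0.0); }

        // Zeros are never stored, so the row stays sparse.
        void set(int col, double v) {
            if (v == 0.0) cells.remove(col); else cells.put(col, v);
        }
    }

    /** Dense-style loop: visits every column, zero or not. Returns cells visited. */
    static long addDense(SparseRow target, SparseRow other) {
        long visited = 0;
        for (int col = 0; col < target.size; col++) {
            target.set(col, target.get(col) + other.get(col));
            visited++;
        }
        return visited;
    }

    /** Sparse-style loop: visits only the non-zero cells of the other row. */
    static long addSparse(SparseRow target, SparseRow other) {
        long visited = 0;
        for (Map.Entry<Integer, Double> e : other.cells.entrySet()) {
            int col = e.getKey();
            target.set(col, target.get(col) + e.getValue());
            visited++;
        }
        return visited;
    }

    public static void main(String[] args) {
        int size = 100_000;
        SparseRow a1 = new SparseRow(size);
        SparseRow a2 = new SparseRow(size);
        SparseRow b = new SparseRow(size);
        a1.set(3, 1.0); a1.set(10, 2.0);
        a2.set(3, 1.0); a2.set(10, 2.0);
        b.set(10, 5.0); b.set(999, 1.0);

        System.out.println("dense visited:  " + addDense(a1, b));
        System.out.println("sparse visited: " + addSparse(a2, b));
        System.out.println("same result:    " + a1.cells.equals(a2.cells));
    }
}
```

The dense loop does work proportional to the row width, while the sparse loop does work proportional to the number of non-zeros in the row being added, which is where the gap grows with bigger and sparser data.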


Matt




On 8/21/17, 1:53 PM, "Pat Ferrel" <pat@occamsmachete.com> wrote:

> Interesting indeed. What is “massive”? Does the change pass all unit tests?
> 
> 
> On Aug 17, 2017, at 1:04 PM, Scruggs, Matt <matt.scruggs@bronto.com> wrote:
> 
> Thanks for the remarks guys!
> 
> I profiled the code running locally on my machine and discovered this loop is where these
> setQuick() and getQuick() calls originate (during matrix Kryo deserialization), and as you
> can see the complexity of this 2D loop can be very high:
> 
> https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b45863df3e53/math/src/main/java/org/apache/mahout/math/AbstractMatrix.java#L240
> 
> 
> Recall that this algorithm uses SparseRowMatrix whose rows are SequentialAccessSparseVector,
> so all this looping seems unnecessary. I created a new subclass of SparseRowMatrix that overrides
> that assign(matrix, function) method, and instead of looping through all the columns of each
> row, it calls SequentialAccessSparseVector.iterateNonZero() so it only has to touch the cells
> with values. I also had to customize MahoutKryoRegistrator a bit with a new default serializer
> for this new matrix class. This yielded a massive performance boost and I verified that the
> results match exactly for several test cases and datasets. I realize this could have side-effects
> in some cases, but I'm not using any other part of Mahout, only SimilarityAnalysis.cooccurrencesIDSs().
> 
> Any thoughts / comments?
> 
> 
> Matt
> 
> 
> 
> On 8/16/17, 8:29 PM, "Ted Dunning" <ted.dunning@gmail.com> wrote:
> 
>> It is common with large numerical codes that things run faster in memory on
>> just a few cores if the communication required outweighs the parallel
>> speedup.
>> 
>> The issue is that memory bandwidth is slower than the arithmetic speed by a
>> very good amount. If you just have to move stuff into the CPU and munch on
>> it a bit it is one thing, but if you have to move the data to CPU and back
>> to memory to distribute it around possibly multiple times, you may wind up
>> with something much slower than you would have had if you were to attack
>> the problem directly.
>> 
>> 
>> 
>> On Wed, Aug 16, 2017 at 4:47 PM, Pat Ferrel <pat@occamsmachete.com> wrote:
>> 
>>> This uses the Mahout blas optimizing solver, which I just use and do not
>>> know well. Mahout virtualizes some things having to do with partitioning
>>> and I’ve never quite understood how they work. There is a .par() on one of
>>> the matrix classes that has a similar function to partition but in all
>>> cases I’ve used .par(auto) and use normal spark repartitioning based on
>>> parallelism. Mahout implements a mapBlock function, which (all things being
>>> equal) looks at a partition at a time in memory.
>>> 
>>> The reduce is not part of the code I wrote. Maybe someone else can explain
>>> what blas is doing.
>>> 
>>> BTW a hashmap lookup is O(1) on average, though caveats apply for large n. We use
>>> fastutils for many things (I thought this was one case) because they are
>>> faster than JVM implementations but feel free to dig in further. We use
>>> downsampling to maintain an overall rough O(n) calculation speed where n =
>>> # rows (users). As the model gets more dense there are greatly diminishing
>>> returns for the density so after the elements per row threshold is reached
>>> we don’t use more in the model creation math.
>>> 
>>> Still feel free to dig into what the optimizer is doing.
>>> 
>>> 
>>> On Aug 15, 2017, at 11:13 AM, Scruggs, Matt <matt.scruggs@bronto.com>
>>> wrote:
>>> 
>>> Thanks Pat, that's good to know!
>>> 
>>> This is the "reduce" step (which gets its own stage in my Spark
>>> jobs...this stage takes almost all the runtime) where most of the work is
>>> being done, and takes longer the more shuffle partitions there are
>>> (relative to # of CPUs):
>>> 
>>> https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b45863df3e53/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala#L258
>>> 
>>> 
>>> Why does the runtime of this reduce stage (that ultimately calls
>>> SequentialAccessSparseVector.getQuick() and setQuick() a lot) depend on
>>> the ratio of (spark.sql.shuffle.partitions / # Spark CPUs)? Essentially
>>> that ratio determines how many "chunks" of shuffle partition (reduce) tasks
>>> must run, and each of those chunks always takes the same amount of time, so
>>> the stage finishes in less time when that ratio is low (preferably 1).
>>> 
>>> EXAMPLES - using 32 cores and 200 shuffle partitions, this stage requires
>>> ceil(145 tasks / 32 cores) = 5 "chunks" of work (145 tasks instead of 200
>>> because of the estimateProductPartitions call in AtA). Each chunk takes ~8
>>> minutes, so (5 chunks * 8 min) = ~40 mins. For a job with 32 cores and 32
>>> shuffle partitions (same CPU resources, still 100% utilized), this stage
>>> requires only ceil(23 tasks / 32 cores) = 1 chunk of work, which takes the
>>> same 8 minutes, so the job finishes ~5x faster. You can take this to the
>>> extreme with just 1 core and 1 shuffle partition, and the stage still takes
>>> the same amount of time! I'd love to know if you can reproduce this
>>> behavior.
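
The chunk arithmetic in the examples above can be written out as a small sketch. The class and method names (ShuffleWaves, waves, stageMinutes) are hypothetical, and the model assumes what was observed in this thread: every task in the stage takes about the same fixed time (~8 minutes) no matter how many partitions there are. It is not a general model of Spark scheduling.

```java
public class ShuffleWaves {
    /** Number of "chunks" (waves) needed to run all tasks on the given cores: ceil(tasks / cores). */
    static int waves(int tasks, int cores) {
        return (tasks + cores - 1) / cores;
    }

    /** Estimated stage time, assuming every wave takes the same number of minutes. */
    static int stageMinutes(int tasks, int cores, int minutesPerWave) {
        return waves(tasks, cores) * minutesPerWave;
    }

    public static void main(String[] args) {
        // 200 shuffle partitions -> 145 tasks on 32 cores
        System.out.println(waves(145, 32) + " waves, ~" + stageMinutes(145, 32, 8) + " min");
        // 32 shuffle partitions -> 23 tasks on 32 cores
        System.out.println(waves(23, 32) + " wave, ~" + stageMinutes(23, 32, 8) + " min");
        // 1 shuffle partition on 1 core
        System.out.println(waves(1, 1) + " wave, ~" + stageMinutes(1, 1, 8) + " min");
    }
}
```

Under that assumption the stage time depends only on the number of waves, so adding partitions beyond the core count adds waves without shrinking any of them.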
>>> 
>>> This goes against most advice and experience I've had with Spark, where
>>> you want to *increase* your partitioning in many cases (or at least leave
>>> it at the default 200, not lower it dramatically) to utilize CPUs better
>>> (and shrink each individual partition's task). There seems to be no
>>> reduction in computational complexity *per task* (within this stage I'm
>>> talking about) even with high values for spark.sql.shuffle.partitions (so
>>> it seems the data isn't actually being partitioned by the shuffle process).
>>> Refer back to the timings w/various configs in my first message.
>>> 
>>> Also...is there a possibility of using a faster hash-based implementation
>>> instead of the setQuick() / getQuick() methods of
>>> SequentialAccessSparseVector? The javadoc on those methods mentions they
>>> shouldn't be used unless absolutely necessary due to their O(log n)
>>> complexity.
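
For reference, a rough sketch of the two lookup styles being compared. This is not Mahout's code: LookupSketch, getOrdered and getHashed are hypothetical names, the sorted parallel arrays are a simplified stand-in for an ordered index-to-value mapping, and a plain java.util.HashMap stands in for any hash-based alternative.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class LookupSketch {
    /** Ordered layout: binary search over sorted column indices, O(log n) per lookup. */
    static double getOrdered(int[] sortedIndices, double[] values, int col) {
        int pos = Arrays.binarySearch(sortedIndices, col);
        return pos >= 0 ? values[pos] : 0.0;   // absent cells read as zero
    }

    /** Hash layout: O(1) expected time per lookup, but no ordered iteration. */
    static double getHashed(Map<Integer, Double> cells, int col) {
        return cells.getOrDefault(col, 0.0);
    }

    public static void main(String[] args) {
        int[] indices = {2, 7, 40, 1000};
        double[] values = {1.5, 2.0, 3.0, 4.5};

        Map<Integer, Double> hashed = new HashMap<>();
        for (int i = 0; i < indices.length; i++) {
            hashed.put(indices[i], values[i]);
        }

        // Both layouts answer the same point queries.
        System.out.println(getOrdered(indices, values, 40) + " " + getHashed(hashed, 40));
        System.out.println(getOrdered(indices, values, 41) + " " + getHashed(hashed, 41));
    }
}
```

The trade-off is that the ordered layout makes in-order iteration over non-zeros cheap, while the hash layout makes random gets and sets cheap, so which one wins depends on the access pattern.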
>>> 
>>> 
>>> Thanks for your time...this is fun stuff!
>>> Matt
>>> 
>>> 
>>> 
>>> On 8/15/17, 10:15 AM, "Pat Ferrel" <pat@occamsmachete.com> wrote:
>>> 
>>>> Great, this is the best way to use the APIs. The big win with CCO, the
>>> algo you are using, is with multiple user actions. Be aware that when you go
>>> to these methods the input IndexedDatasets must be coerced to have
>>> compatible dimensionality; in this case the primary action defines the
>>> user-set used in calculating the model, not the one for making queries,
>>> which can use anonymous user history. But that is for later and outside
>>> Mahout.
>>>> 
>>>> 1) 4x max parallelism is a rule of thumb since the cores may not need
>>> 100% duty cycle; if they are already at 100% the 4x does no good. 2) You
>>> have found a long-running task but there will always be one; if it weren’t
>>> this one it would be another. Different types of tasks use resources
>>> differently. For instance the collects, which must eventually use the
>>> memory of the Driver to instantiate an in-memory data structure. There is
>>> no magic choice to make this work differently but it avoids several joins,
>>> which are much slower.
>>>> 
>>>> I’m not quite sure what your question is.
>>>> 
>>>> 
>>>> On Aug 15, 2017, at 6:21 AM, Scruggs, Matt <matt.scruggs@bronto.com>
>>> wrote:
>>>> 
>>>> Hi Pat,
>>>> 
>>>> I've taken some screenshots of my Spark UI to hopefully shed some light
>>> on the behavior I'm seeing. Do you mind if I send you a link via direct
>>> email (would rather not post it here)? It's just a shared Dropbox folder.
>>>> 
>>>> 
>>>> Thanks,
>>>> Matt
>>>> 
>>>> 
>>>> 
>>>> On 8/14/17, 11:34 PM, "Scruggs, Matt" <matt.scruggs@bronto.com> wrote:
>>>> 
>>>>> I'm running a custom Scala app (distributed in a shaded jar) directly
>>> calling SimilarityAnalysis.cooccurrencesIDSs(), not using the CLI.
>>>>> 
>>>>> The input data already gets explicitly repartitioned to spark.cores.max
>>> (defaultParallelism) in our code. I'll try increasing that by the factor of
>>> 4 that you suggest, but all our cores are already utilized so I'm not sure
>>> that will help. It gets bogged down in the post-shuffle (shuffle read /
>>> combine / reduce) phase even with all cores busy the whole time, which is
>>> why I've been playing around with various values for
>>> spark.sql.shuffle.partitions. The O(log n) operations I mentioned seem to
>>> take >95% of runtime.
>>>>> 
>>>>> Thanks,
>>>>> Matt
>>>>> ________________________________
>>>>> From: Pat Ferrel <pat@occamsmachete.com>
>>>>> Sent: Monday, August 14, 2017 11:02:42 PM
>>>>> To: user@mahout.apache.org
>>>>> Subject: Re: spark-itemsimilarity scalability / Spark parallelism
>>> issues (SimilarityAnalysis.cooccurrencesIDSs)
>>>>> 
>>>>> Are you using the CLI? If so it’s likely that there is only one
>>> partition of the data. If you use Mahout in the Spark shell or use it as
>>> a lib, do a repartition on the input data before passing it into
>>> SimilarityAnalysis.cooccurrencesIDSs. I repartition to 4*total cores to
>>> start with and set max parallelism for spark to the same. The CLI isn’t
>>> really production worthy, just for super easy experiments with CSVs.
>>>>> 
>>>>> 
>>>>> On Aug 14, 2017, at 2:31 PM, Scruggs, Matt <matt.scruggs@bronto.com>
>>> wrote:
>>>>> 
>>>>> Howdy,
>>>>> 
>>>>> I'm running SimilarityAnalysis.cooccurrencesIDSs on a fairly small
>>> dataset (about 870k [user, item] rows in the primary action IDS…no cross
>>> co-occurrence IDS) and I noticed it scales strangely. This is with Mahout
>>> 0.13.0 although the same behavior happens in 0.12.x as well (haven't tested
>>> it before that).
>>>>> 
>>>>> TLDR - regardless of the Spark parallelism (CPUs) I throw at this
>>> routine, every Spark task within the final / busy stage seems to take the
>>> same amount of time, which leads me to guess that every shuffle partition
>>> contains the same amount of data (perhaps the full dataset matrix in
>>> shape/size, albeit with different values). I'm reaching out to see if this
>>> is a known algorithmic complexity issue in this routine, or if my config is
>>> to blame (or both).
>>>>> 
>>>>> Regarding our hardware, we have identical physical machines in a Mesos
>>> cluster with 6 workers and a few masters. Each worker has ~500GB of SSD, 32
>>> cores and 128g RAM. We run lots of Spark jobs and have generally ironed out
>>> the kinks in terms of hardware and cluster config, so I don't suspect any
>>> hardware-related issues.
>>>>> 
>>>>> Here are some timings for SimilarityAnalysis.cooccurrencesIDSs on this
>>> dataset with maxNumInteractions = 500, maxInterestingItemsPerThing = 20,
>>> randomSeed = default, parOpts = default (there's lots of other Spark
>>> config, this is just what I'm varying to check for effects). In particular,
>>> notice how the ratio of (spark.sql.shuffle.partitions / spark.cores.max)
>>> affects the runtime:
>>>>> 
>>>>> * 8 executors w/8 cores each, takes about 45 minutes
>>>>> * note that spark.sql.shuffle.partitions > spark.cores.max
>>>>> spark.cores.max = 64
>>>>> spark.executor.cores = 8
>>>>> spark.sql.shuffle.partitions = 200 (default)
>>>>> 
>>>>> * 1 executor w/24 cores, takes about 65 minutes
>>>>> * note that spark.sql.shuffle.partitions >>> spark.cores.max
>>>>> spark.cores.max = 24
>>>>> spark.executor.cores = 24
>>>>> spark.sql.shuffle.partitions = 200 (default)
>>>>> 
>>>>> * 1 executor w/8 cores, takes about 8 minutes
>>>>> * note that spark.sql.shuffle.partitions = spark.cores.max
>>>>> spark.cores.max = 8
>>>>> spark.executor.cores = 8 (1 executor w/8 cores)
>>>>> spark.sql.shuffle.partitions = 8
>>>>> 
>>>>> * 1 executor w/24 cores, takes about 8 minutes (same as 8 cores!)
>>>>> * note that spark.sql.shuffle.partitions = spark.cores.max
>>>>> spark.cores.max = 24
>>>>> spark.executor.cores = 24 (1 executor w/24 cores)
>>>>> spark.sql.shuffle.partitions = 24
>>>>> 
>>>>> * 32 executors w/2 cores each, takes about 8 minutes (same as 8 cores!)
>>>>> * note that spark.sql.shuffle.partitions = spark.cores.max
>>>>> spark.cores.max = 64
>>>>> spark.executor.cores = 2
>>>>> spark.sql.shuffle.partitions = 88 (results in 64 tasks for final stage)
>>>>> 
>>>>> Adjusting the "maxNumInteractions" parameter down to 100 and 50 results
>>> in a minor improvement (5-10%). I've also played around with removing
>>> [user, item] rows from the input dataset for users with only 1
>>> interaction…I read to try that in another thread…that yielded maybe a
>>> 40-50% speed improvement, but I'd rather not toss out data (unless it truly
>>> is totally useless, of course :D ).
>>>>> 
>>>>> When I look at the thread dump within the Spark UI's Executors ->
>>> thread dump pages, it seems all the executors are very busy in the code
>>> pasted below for >95% of the run. GC throughput is very good so we're not
>>> bogged down there...it's just super busy running the code below. I am
>>> intrigued about the comments on the SequentialAccessSparseVector methods I
>>> see being called (getQuick and setQuick), which state they take O(log n)
>>> time (https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b45863df3e53/math/src/main/java/org/apache/mahout/math/SequentialAccessSparseVector.java).
>>>>> 
>>>>> 
>>>>> Thanks all for your time and feedback!
>>>>> 
>>>>> Matt Scruggs
>>>>> 
>>>>> org.apache.mahout.math.OrderedIntDoubleMapping.find(OrderedIntDoubleMapping.java:105)
>>>>> org.apache.mahout.math.OrderedIntDoubleMapping.get(OrderedIntDoubleMapping.java:110)
>>>>> org.apache.mahout.math.SequentialAccessSparseVector.getQuick(SequentialAccessSparseVector.java:157)
>>>>> org.apache.mahout.math.SparseRowMatrix.getQuick(SparseRowMatrix.java:90)
>>>>> org.apache.mahout.math.AbstractMatrix.assign(AbstractMatrix.java:240)
>>>>> org.apache.mahout.math.scalabindings.MatrixOps.$plus$eq(MatrixOps.scala:45)
>>>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>>>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
>>>>> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
>>>>> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:163)
>>>>> org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
>>>>> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
>>>>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>>>>> org.apache.spark.scheduler.Task.run(Task.scala:86)
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> java.lang.Thread.run(Thread.java:745)
>>>>> 
>>>>> ……or this code……
>>>>> 
>>>>> org.apache.mahout.math.SparseRowMatrix.setQuick(SparseRowMatrix.java:105)
>>>>> org.apache.mahout.math.AbstractMatrix.assign(AbstractMatrix.java:240)
>>>>> org.apache.mahout.math.scalabindings.MatrixOps.$plus$eq(MatrixOps.scala:45)
>>>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>>>>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
>>>>> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
>>>>> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>>>>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:163)
>>>>> org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
>>>>> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
>>>>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>>>>> org.apache.spark.scheduler.Task.run(Task.scala:86)
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> java.lang.Thread.run(Thread.java:745)
>>>>> 
>>>> 
>>> 
>>> 
> 

