spark-user mailing list archives

From Holden Karau <hol...@pigscanfly.ca>
Subject Re: Spark reduce serialization question
Date Mon, 07 Mar 2016 04:51:12 GMT
You might want to try treeAggregate
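For illustration, a minimal sketch of what that change could look like. The names `Model`, `zeroModel`, `learnPartition`, and `combine` are assumptions, since the original code isn't shown in the thread:

```scala
// Sketch: replacing a flat reduce() with treeAggregate(), so models are
// partially combined on the executors before anything reaches the driver.
val models = data.mapPartitions(part => Iterator(learnPartition(part)))

// reduce() ships every partition's model to the driver at once:
// val finalModel = models.reduce(combine)

// treeAggregate() merges models in a tree of the given depth, so the
// driver only receives the already-combined results:
val finalModel = models.treeAggregate(zeroModel)(combine, combine, depth = 2)
```

With four ~330 MB models and depth 2, the driver should receive far less than the 4 * 330 MB it gets from a flat reduce, at the cost of an extra aggregation level on the executors.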

On Sunday, March 6, 2016, Takeshi Yamamuro <linguin.m.s@gmail.com> wrote:

> Hi,
>
> I'm not exactly sure what your code looks like, but this seems to be the
> correct behaviour.
> If the size of data that a driver fetches exceeds the limit, the driver
> throws this exception.
> (See
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L68
> )
> So, in your case, your driver tries to fetch 1345.5 MB of data (the 4 models)
> from the executors, and fails.
> Thanks,
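For reference, raising that limit is a one-line configuration change; a minimal sketch, assuming a standard SparkConf setup (note this only raises the ceiling, the driver still has to hold all partition results at once when using a flat reduce()):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Raise spark.driver.maxResultSize above the 1345.5 MB the job needs
// (the default is 1g; "0" means unlimited).
val conf = new SparkConf()
  .setAppName("kmeans")
  .set("spark.driver.maxResultSize", "4g")
val sc = new SparkContext(conf)
```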
>
> On Sat, Mar 5, 2016 at 6:11 AM, James Jia <jamesjia@berkeley.edu> wrote:
>
>> I'm running a distributed KMeans algorithm with 4 executors.
>>
>> I have an RDD[Data]. I use mapPartitions to run a learner on each data partition,
>> and then call reduce with my custom model reduce function to combine the resulting
>> models and start a new iteration.
>>
>> The model size is ~330 MB. I would expect the required memory for the serialized
>> results at the driver to be at most 2 * 330 MB in order to reduce two models at a
>> time, but it looks like Spark serializes all of the models to the driver before
>> reducing.
>>
>> The error message says that the total size of the serialized results is 1345.5 MB,
>> which is approximately 4 * 330 MB. I know I can raise the driver's max result size,
>> but I just want to confirm that this is expected behavior.
>>
>> Thanks!
>>
>> James
>>
>> Stage 0:==============>                                            (1 + 3) / 4]
>> 16/02/19 05:59:28 ERROR TaskSetManager: Total size of serialized results of 4 tasks
>> (1345.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Total size of
>> serialized results of 4 tasks (1345.5 MB) is bigger than spark.driver.maxResultSize
>> (1024.0 MB)
>>
>>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
>>
>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
>>
>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
>>
>>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>
>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>
>>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
>>
>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>>
>>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>>
>>   at scala.Option.foreach(Option.scala:257)
>>
>>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>>
>>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
>>
>>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>>
>>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
>>
>>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>>
>>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
>>
>>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
>>
>>   at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1007)
>>
>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>
>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>
>>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
>>
>>   at org.apache.spark.rdd.RDD.reduce(RDD.scala:989)
>>
>>   at BIDMach.RunOnSpark$.runOnSpark(RunOnSpark.scala:50)
>>
>>   ... 50 elided
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
