spark-user mailing list archives

From Davies Liu <dav...@databricks.com>
Subject Re: Problem with take vs. takeSample in PySpark
Date Mon, 10 Aug 2015 17:59:11 GMT
I tested this on master (the 1.5 release) with spark.driver.maxResultSize
set to 10m, and it worked as expected:

>>> len(sc.range(10).map(lambda i: '*' * (1<<23) ).take(1))
1
>>> len(sc.range(10).map(lambda i: '*' * (1<<24) ).take(1))
15/08/10 10:45:55 ERROR TaskSetManager: Total size of serialized
results of 1 tasks (16.1 MB) is bigger than spark.driver.maxResultSize
(10.0 MB)
>>> len(sc.range(10).map(lambda i: '*' * (1<<23) ).take(2))
15/08/10 10:46:04 ERROR TaskSetManager: Total size of serialized
results of 1 tasks (16.1 MB) is bigger than spark.driver.maxResultSize
(10.0 MB)
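
As a rough sanity check on the sizes above, the payload arithmetic can be sketched in plain Python. This is only an illustration: the helper name `fits` is hypothetical, serialization overhead is ignored, and the third case assumes both items came back from the same task (the log reports "1 tasks").

```python
# Rough size check for the three take() calls above (sizes in bytes).
# Assumption: serialization overhead is small next to the payload.
MIB = 1 << 20
LIMIT = 10 * MIB  # spark.driver.maxResultSize = 10m


def fits(item_size, items_returned, limit=LIMIT):
    """Return True if the raw payload alone stays within the limit."""
    return item_size * items_returned <= limit


print(fits(1 << 23, 1))  # one 8 MiB string
print(fits(1 << 24, 1))  # one 16 MiB string
print(fits(1 << 23, 2))  # two 8 MiB strings from the same task
```

Only the first call stays under the 10 MiB limit, which matches the one call that succeeded.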

Could you reproduce this in 1.2?

We haven't changed take() much since 1.2. (I'm unable to build the 1.2
branch right now because of dependency changes.)
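
For reference, the traceback quoted below shows take() calling runJob over a range of partitions (partsScanned up to partsScanned + numPartsToTry). A simplified pure-Python model of that kind of loop is sketched here. It is not the Spark source: the function name `simulate_take`, the growth factor of 4, and the rule "try one partition first, then grow" are assumptions made for illustration.

```python
def simulate_take(partition_sizes, num, growth=4):
    """Model take(num) over partitions holding the given item counts.

    Returns a list of (partitions_scanned, items_shipped) tuples, one
    per job. Hypothetical model for illustration, not Spark code.
    """
    total = len(partition_sizes)
    items = 0
    scanned = 0
    jobs = []
    while items < num and scanned < total:
        # Assumed policy: the first job tries a single partition and
        # each later job tries `growth` times the partitions scanned.
        to_try = 1 if scanned == 0 else scanned * growth
        end = min(scanned + to_try, total)
        left = num - items
        # Every task in the batch independently returns up to `left`
        # items, so the driver may receive far more than it needs.
        shipped = sum(min(size, left) for size in partition_sizes[scanned:end])
        jobs.append((end - scanned, shipped))
        items += shipped
        scanned = end
    return jobs


# Five empty partitions followed by twenty one-item partitions.
print(simulate_take([0] * 5 + [1] * 20, num=1))
# When the first partition already holds an item, one task suffices.
print(simulate_take([1] * 10, num=1))
```

Under these assumptions, empty leading partitions make a single later job pull results from many partitions at once, even for take(1). That would be one way for the total serialized result size to exceed spark.driver.maxResultSize.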

On Mon, Aug 10, 2015 at 9:49 AM, David Montague <davwmont@gmail.com> wrote:
> Hi all,
>
> I am getting some strange behavior with the RDD take function in PySpark
> while doing some interactive coding in an IPython notebook.  I am running
> PySpark on Spark 1.2.0 in yarn-client mode if that is relevant.
>
> I am using sc.wholeTextFiles and pandas to load a collection of .csv files
> into an RDD of pandas dataframes. I create an RDD called train_rdd for which
> each row of the RDD contains a label and pandas dataframe pair:
>
> import pandas as pd
> from StringIO import StringIO
>
> rdd = sc.wholeTextFiles(data_path, 1000)
> train_rdd = rdd.map(lambda x: (x[0], pd.read_csv(StringIO(x[1]))))
>
> In order to test out the next steps I want to take, I am trying to use take
> to select one of the dataframes and apply the desired modifications before
> writing out the Spark code to apply it to all of the dataframes in parallel.
>
> However, when I try to use take like this:
>
> label, df = train_rdd.take(1)[0]
>
> I get a spark.driver.maxResultSize error (stack trace included at the end of
> this message). Now, each of these dataframes is only about 100MB in size, so
> should easily fit on the driver and not go over the maxResultSize limit of
> 1024MB.
>
> If I instead use takeSample, though, there is no problem:
>
> label, df = train_rdd.takeSample(False, 1, seed=50)[0]
>
> (Here, I have set the seed so that the element selected is the same one
> that the take function is trying to load (i.e., the first one), just to
> rule out the possibility that the specific dataframe take is getting is
> too large.)
>
> Does calling take result in a collect operation being performed before
> outputting the first item? That would explain to me why this error is
> occurring, but that seems like poor behavior for the take function. Clearly
> takeSample is behaving the way I want it to, but it would be nice if I could
> get this behavior with the take function, or at least without needing to
> choose an element randomly. I was able to get the behavior I wanted above by
> just changing the seed until I got the dataframe I wanted, but I don't think
> that is a good approach in general.
>
> Any insight is appreciated.
>
> Best,
> David Montague
>
>
>
>
> ---------------------------------------------------------------------------
> Py4JJavaError                             Traceback (most recent call last)
> <ipython-input-38-7eca647cba46> in <module>()
>       1 label_s, df_s = train_rdd.takeSample(False, 1, seed=50)[0]
> ----> 2 label, df = train_rdd.take(1)[0]
>
> /var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041//python/pyspark/rdd.py in
> take(self, num)
>    1109
>    1110             p = range(partsScanned, min(partsScanned +
> numPartsToTry, totalParts))
> -> 1111             res = self.context.runJob(self, takeUpToNumLeft, p,
> True)
>    1112
>    1113             items += res
>
> /var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041//python/pyspark/context.py
> in runJob(self, rdd, partitionFunc, partitions, allowLocal)
>     816         # SparkContext#runJob.
>     817         mappedRDD = rdd.mapPartitions(partitionFunc)
> --> 818         it = self._jvm.PythonRDD.runJob(self._jsc.sc(),
> mappedRDD._jrdd, javaPartitions, allowLocal)
>     819         return list(mappedRDD._collect_iterator_through_file(it))
>     820
>
> /var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
> in __call__(self, *args)
>     536         answer = self.gateway_client.send_command(command)
>     537         return_value = get_return_value(answer, self.gateway_client,
> --> 538                 self.target_id, self.name)
>     539
>     540         for temp_arg in temp_args:
>
> /var/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
> in get_return_value(answer, gateway_client, target_id, name)
>     298                 raise Py4JJavaError(
>     299                     'An error occurred while calling {0}{1}{2}.\n'.
> --> 300                     format(target_id, '.', name), value)
>     301             else:
>     302                 raise Py4JError(
>
> Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.runJob.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Total
> size of serialized results of 177 tasks (1038.0 MB) is bigger than
> spark.driver.maxResultSize (1024.0 MB)
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
