spark-user mailing list archives

From Amit Sela <>
Subject Re: partitionBy causing OOM
Date Tue, 26 Sep 2017 14:53:39 GMT
Thanks for all the answers!
It looks like increasing the heap a little and setting
spark.sql.shuffle.partitions to a much lower number (I used the recommended
input_size_mb/128 formula) did the trick.
As for partitionBy, unless I use repartition("dt") before the writer, it
actually writes more than one output file per "dt" partition so I guess the
same "dt" value is spread across multiple partitions, right?
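For reference, a minimal PySpark sketch of the combination described above. The input/output paths and the exploded column name are placeholders, and the shuffle-partition count assumes roughly 4 GB of input under the input_size_mb/128 heuristic from this thread:

```python
from pyspark.sql import SparkSession, functions as F

spark = (SparkSession.builder
         .appName("daily-etl")
         # input_size_mb / 128 heuristic: ~4096 MB input -> ~32 partitions
         .config("spark.sql.shuffle.partitions", "32")
         .getOrCreate())

df = spark.read.json("s3://bucket/input/")          # placeholder path
flat = df.withColumn("event", F.explode("events"))  # hypothetical array column

# repartition("dt") co-locates each "dt" value in a single partition,
# so partitionBy writes one file per dt= directory instead of several.
(flat.repartition("dt")
     .write
     .partitionBy("dt")
     .json("s3://bucket/output/"))  # placeholder path
```

Without the repartition("dt"), each shuffle partition that happens to contain a given date writes its own file into that date's directory, which matches the multiple-files-per-partition behavior observed above.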

On Mon, Sep 25, 2017 at 11:07 PM ayan guha <> wrote:

> Another possible option would be to create a partitioned table in Hive and
> use dynamic partitioning while inserting. This would not require Spark to
> do an explicit partitionBy.
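A rough sketch of the Hive dynamic-partitioning route suggested here, assuming Hive support is enabled in the session; the table and column names are hypothetical:

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .enableHiveSupport()
         .getOrCreate())

# Allow fully dynamic partition values on insert.
spark.sql("SET hive.exec.dynamic.partition=true")
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

# Hypothetical target table, partitioned by dt.
spark.sql("""
    CREATE TABLE IF NOT EXISTS events_by_day (payload STRING)
    PARTITIONED BY (dt STRING)
""")

# Hive routes each row into its dt=... directory; the partition
# column must come last in the SELECT list.
spark.sql("""
    INSERT INTO TABLE events_by_day PARTITION (dt)
    SELECT payload, dt FROM events_staging
""")
```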
> On Tue, 26 Sep 2017 at 12:39 pm, Ankur Srivastava <
>> wrote:
>> Hi Amit,
>> Spark keeps the partition that it is working on in memory (and does not
>> spill it to disk even when it is running out of memory). Also, since you
>> get the OOM when using partitionBy (and not when you just use flatMap),
>> there is probably one date (or a few) for which the partition is bigger
>> than the heap. You can do a count by date to check whether your data is
>> skewed.
>> The way out would be to increase the heap size, or to add columns to
>> partitionBy (like date + hour) to distribute the data better.
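Both checks suggested here can be sketched in a few lines of PySpark; the path and the event_ts timestamp column are assumptions:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.json("s3://bucket/flattened/")  # placeholder path

# Rows per date: a dt whose count dwarfs the rest is the likely OOM culprit.
df.groupBy("dt").count().orderBy(F.desc("count")).show(20)

# Finer-grained partitioning spreads a hot date across smaller partitions
# (assumes an event_ts timestamp column exists in the data).
(df.withColumn("hour", F.hour("event_ts"))
   .write
   .partitionBy("dt", "hour")
   .json("s3://bucket/output_hourly/"))  # placeholder path
```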
>> Hope this helps!
>> Thanks
>> Ankur
>> On Mon, Sep 25, 2017 at 7:30 PM, 孫澤恩 <> wrote:
>>> Hi, Amit,
>>> Maybe you can change the configuration spark.sql.shuffle.partitions.
>>> The default is 200; changing this property changes the number of tasks
>>> when you are using the DataFrame API.
>>> On 26 Sep 2017, at 1:25 AM, Amit Sela <> wrote:
>>> I'm trying to run a simple pyspark application that reads from file
>>> (json), flattens it (explode) and writes back to file (json) partitioned by
>>> date using DataFrameWriter.partitionBy(*cols).
>>> I keep getting OOMEs like:
>>> java.lang.OutOfMemoryError: Java heap space
>>> at
>>> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(
>>> at
>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(
>>> at
>>> org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(
>>> .......
>>> Explode could make the underlying RDD grow a lot, and maybe in an
>>> unbalanced way sometimes; on top of that, partitioning by date (in daily
>>> ETLs, for instance) would probably cause data skew (right?). But why am I
>>> getting OOMs? Isn't Spark supposed to spill to disk if the underlying RDD
>>> is too big to fit in memory?
>>> If I'm not using "partitionBy" with the writer (still exploding)
>>> everything works fine.
>>> This happens both in EMR and in local (mac) pyspark/spark shell (tried
>>> both in python and scala).
>>> Thanks!
>> --
> Best Regards,
> Ayan Guha
