spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Babak Alipour <babak.alip...@gmail.com>
Subject Re: DataFrame Sort gives Cannot allocate a page with more than 17179869176 bytes
Date Wed, 05 Oct 2016 17:00:38 GMT
The issue seems to lie in the RangePartitioner trying to create equal
ranges. [1]

[1] https://spark.apache.org/docs/2.0.0/api/java/org/apache/
spark/RangePartitioner.html

 The *Double* values I'm trying to sort are mostly in the range [0,1] (~70%
of the data which roughly equates 1 billion records), other numbers in the
dataset are as high as 2000. With the RangePartitioner trying to create
equal ranges, some tasks are becoming almost empty while others are
extremely large, due to the heavily skewed distribution.

This is either a bug in Apache Spark or a major limitation of the
framework. Has anyone else encountered this?

*Babak Alipour ,*
*University of Florida*

On Sun, Oct 2, 2016 at 1:38 PM, Babak Alipour <babak.alipour@gmail.com>
wrote:

> Thanks Vadim for sharing your experience, but I have tried multi-JVM setup
> (2 workers), various sizes for spark.executor.memory (8g, 16g, 20g, 32g,
> 64g) and spark.executor.core (2-4), same error all along.
>
> As for the files, these are all .snappy.parquet files, resulting from
> inserting some data from other tables. None of them actually exceeds 25MiB
> (I don't know why this number) Setting the DataFrame to persist using
> StorageLevel.MEMORY_ONLY shows size in memory at ~10g.  I still cannot
> understand why it is trying to create such a big page when sorting. The
> entire column (this df has only 1 column) is not that big, neither are the
> original files. Any ideas?
>
>
> >Babak
>
>
>
> *Babak Alipour ,*
> *University of Florida*
>
> On Sun, Oct 2, 2016 at 1:45 AM, Vadim Semenov <vadim.semenov@datadoghq.com
> > wrote:
>
>> oh, and try to run even smaller executors, i.e. with
>> `spark.executor.memory` <= 16GiB. I wonder what result you're going to get.
>>
>> On Sun, Oct 2, 2016 at 1:24 AM, Vadim Semenov <
>> vadim.semenov@datadoghq.com> wrote:
>>
>>> > Do you mean running a multi-JVM 'cluster' on the single machine?
>>> Yes, that's what I suggested.
>>>
>>> You can get some information here:
>>> http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apach
>>> e-spark-jobs-part-2/
>>>
>>> > How would that affect performance/memory-consumption? If a multi-JVM
>>> setup can handle such a large input, then why can't a single-JVM break down
>>> the job into smaller tasks?
>>> I don't have an answer to these questions, it requires understanding of
>>> Spark, JVM, and your setup internal.
>>>
>>> I ran into the same issue only once when I tried to read a gzipped file
>>> which size was >16GiB. That's the only time I had to meet this
>>> https://github.com/apache/spark/blob/5d84c7fd83502aeb55
>>> 1d46a740502db4862508fe/core/src/main/java/org/apache/spark/
>>> memory/TaskMemoryManager.java#L238-L243
>>> In the end I had to recompress my file into bzip2 that is splittable to
>>> be able to read it with spark.
>>>
>>>
>>> I'd look into size of your files and if they're huge I'd try to connect
>>> the error you got to the size of the files (but it's strange to me as a
>>> block size of a Parquet file is 128MiB). I don't have any other
>>> suggestions, I'm sorry.
>>>
>>>
>>> On Sat, Oct 1, 2016 at 11:35 PM, Babak Alipour <babak.alipour@gmail.com>
>>> wrote:
>>>
>>>> Do you mean running a multi-JVM 'cluster' on the single machine? How
>>>> would that affect performance/memory-consumption? If a multi-JVM setup
>>>> can handle such a large input, then why can't a single-JVM break down the
>>>> job into smaller tasks?
>>>>
>>>> I also found that SPARK-9411 mentions making the page_size configurable
>>>> but it's hard-limited to ((1L << 31) - 1) * 8L [1]
>>>>
>>>> [1] https://github.com/apache/spark/blob/master/core/src/main/ja
>>>> va/org/apache/spark/memory/TaskMemoryManager.java
>>>>
>>>> ​Spark-9452 also talks about larger page sizes but I don't know how
>>>> that affects my use case.​ [2]
>>>>
>>>> [2] https://github.com/apache/spark/pull/7891
>>>>
>>>>
>>>> ​The reason provided here is that the on-heap allocator's maximum page
>>>> size is limited by the maximum amount of data that can be stored in a
>>>> long[]​.
>>>> Is it possible to force this specific operation to go off-heap so that
>>>> it can possibly use a bigger page size?
>>>>
>>>>
>>>>
>>>> ​>Babak​
>>>>
>>>>
>>>> *Babak Alipour ,*
>>>> *University of Florida*
>>>>
>>>> On Fri, Sep 30, 2016 at 3:03 PM, Vadim Semenov <
>>>> vadim.semenov@datadoghq.com> wrote:
>>>>
>>>>> Run more smaller executors: change `spark.executor.memory` to 32g and
>>>>> `spark.executor.cores` to 2-4, for example.
>>>>>
>>>>> Changing driver's memory won't help because it doesn't participate in
>>>>> execution.
>>>>>
>>>>> On Fri, Sep 30, 2016 at 2:58 PM, Babak Alipour <
>>>>> babak.alipour@gmail.com> wrote:
>>>>>
>>>>>> Thank you for your replies.
>>>>>>
>>>>>> @Mich, using LIMIT 100 in the query prevents the exception but given
>>>>>> the fact that there's enough memory, I don't think this should happen
even
>>>>>> without LIMIT.
>>>>>>
>>>>>> @Vadim, here's the full stack trace:
>>>>>>
>>>>>> Caused by: java.lang.IllegalArgumentException: Cannot allocate a
>>>>>> page with more than 17179869176 bytes
>>>>>>         at org.apache.spark.memory.TaskMe
>>>>>> moryManager.allocatePage(TaskMemoryManager.java:241)
>>>>>>         at org.apache.spark.memory.Memory
>>>>>> Consumer.allocatePage(MemoryConsumer.java:121)
>>>>>>         at org.apache.spark.util.collecti
>>>>>> on.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessar
>>>>>> y(UnsafeExternalSorter.java:374)
>>>>>>         at org.apache.spark.util.collecti
>>>>>> on.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExter
>>>>>> nalSorter.java:396)
>>>>>>         at org.apache.spark.sql.execution
>>>>>> .UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
>>>>>>         at org.apache.spark.sql.catalyst.
>>>>>> expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>>>>>> Source)
>>>>>>         at org.apache.spark.sql.catalyst.
>>>>>> expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
>>>>>> Source)
>>>>>>         at org.apache.spark.sql.catalyst.
>>>>>> expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>>>>>> Source)
>>>>>>         at org.apache.spark.sql.execution
>>>>>> .BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>>>>>         at org.apache.spark.sql.execution
>>>>>> .WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStage
>>>>>> CodegenExec.scala:370)
>>>>>>         at scala.collection.Iterator$$ano
>>>>>> n$11.hasNext(Iterator.scala:408)
>>>>>>         at org.apache.spark.shuffle.sort.
>>>>>> BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWri
>>>>>> ter.java:125)
>>>>>>         at org.apache.spark.scheduler.Shu
>>>>>> ffleMapTask.runTask(ShuffleMapTask.scala:79)
>>>>>>         at org.apache.spark.scheduler.Shu
>>>>>> ffleMapTask.runTask(ShuffleMapTask.scala:47)
>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>>>>>>         at org.apache.spark.executor.Exec
>>>>>> utor$TaskRunner.run(Executor.scala:274)
>>>>>>         at java.util.concurrent.ThreadPoo
>>>>>> lExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>         at java.util.concurrent.ThreadPoo
>>>>>> lExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> I'm running spark in local mode so there is only one executor, the
>>>>>> driver and spark.driver.memory is set to 64g. Changing the driver's
memory
>>>>>> doesn't help.
>>>>>>
>>>>>> *Babak Alipour ,*
>>>>>> *University of Florida*
>>>>>>
>>>>>> On Fri, Sep 30, 2016 at 2:05 PM, Vadim Semenov <
>>>>>> vadim.semenov@datadoghq.com> wrote:
>>>>>>
>>>>>>> Can you post the whole exception stack trace?
>>>>>>> What are your executor memory settings?
>>>>>>>
>>>>>>> Right now I assume that it happens in UnsafeExternalRowSorter
->
>>>>>>> UnsafeExternalSorter:insertRecord
>>>>>>>
>>>>>>> Running more executors with lower `spark.executor.memory` should
>>>>>>> help.
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Sep 30, 2016 at 12:57 PM, Babak Alipour <
>>>>>>> babak.alipour@gmail.com> wrote:
>>>>>>>
>>>>>>>> Greetings everyone,
>>>>>>>>
>>>>>>>> I'm trying to read a single field of a Hive table stored
as Parquet
>>>>>>>> in Spark (~140GB for the entire table, this single field
should be just a
>>>>>>>> few GB) and look at the sorted output using the following:
>>>>>>>>
>>>>>>>> sql("SELECT " + field + " FROM MY_TABLE ORDER BY " + field
+ " DESC")
>>>>>>>>
>>>>>>>> ​But this simple line of code gives:
>>>>>>>>
>>>>>>>> Caused by: java.lang.IllegalArgumentException: Cannot allocate
a
>>>>>>>> page with more than 17179869176 bytes
>>>>>>>>
>>>>>>>> Same error for:
>>>>>>>>
>>>>>>>> sql("SELECT " + field + " FROM MY_TABLE).sort(field)
>>>>>>>>
>>>>>>>> and:
>>>>>>>>
>>>>>>>> sql("SELECT " + field + " FROM MY_TABLE).orderBy(field)
>>>>>>>>
>>>>>>>>
>>>>>>>> I'm running this on a machine with more than 200GB of RAM,
running
>>>>>>>> in local mode with spark.driver.memory set to 64g.
>>>>>>>>
>>>>>>>> I do not know why it cannot allocate a big enough page, and
why is
>>>>>>>> it trying to allocate such a big page in the first place?
>>>>>>>>
>>>>>>>> I hope someone with more knowledge of Spark can shed some
light on
>>>>>>>> this. Thank you!
>>>>>>>>
>>>>>>>>
>>>>>>>> *​Best regards,​*
>>>>>>>> *Babak Alipour ,*
>>>>>>>> *University of Florida*
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message