spark-user mailing list archives

From Cesar Flores <ces...@gmail.com>
Subject Re: Optimal way to re-partition from a single partition
Date Tue, 09 Feb 2016 16:36:04 GMT
Well, actually I am observing a single partition no matter what my input
is. I am using Spark 1.3.1.

From what you are both saying, it appears that this sorting issue (a
DataFrame going to a single partition after applying orderBy) is fixed in
later versions of Spark? If that is the case, I guess I just need to wait
until my workplace decides to upgrade.
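
To check my understanding of the explanation quoted below, here is a rough
plain-Scala sketch (no Spark involved) of range partitioning with bounds
taken from the sorted keys. It is only a simplified model, not Spark's actual
`RangePartitioning` code: the object `RangeBoundsSketch` and its methods are
hypothetical names, and I am assuming that duplicate bounds are collapsed.

```scala
object RangeBoundsSketch {
  // Pick up to (numPartitions - 1) upper bounds at evenly spaced positions
  // in the sorted keys. Duplicate bounds are collapsed (an assumption of
  // this simplified model), so skewed keys produce fewer bounds.
  def bounds(sample: Seq[Int], numPartitions: Int): Vector[Int] = {
    val sorted = sample.sorted.toVector
    if (sorted.isEmpty || numPartitions <= 1) Vector.empty
    else {
      val step = sorted.length.toDouble / numPartitions
      (1 until numPartitions)
        .map(i => sorted(math.min((i * step).toInt, sorted.length - 1)))
        .distinct
        .toVector
    }
  }

  // A key goes to the first partition whose upper bound is >= the key;
  // keys above every bound go to the last partition.
  def partitionOf(key: Int, bs: Vector[Int]): Int = {
    val idx = bs.indexWhere(key <= _)
    if (idx >= 0) idx else bs.length
  }

  // Number of partitions that actually receive at least one row.
  def nonEmptyPartitions(keys: Seq[Int], numPartitions: Int): Int = {
    val bs = bounds(keys, numPartitions)
    keys.map(partitionOf(_, bs)).distinct.size
  }
}
```

In this model, a column with a single distinct value ends up in one
non-empty partition even when 200 partitions are requested, and a column
with three distinct values ends up in three, which would match what Hemant
saw in his test.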


Thanks a lot

On Tue, Feb 9, 2016 at 9:39 AM, Takeshi Yamamuro <linguin.m.s@gmail.com>
wrote:

> Hi,
>
> DataFrame#sort() uses `RangePartitioning` in `Exchange` instead of
> `HashPartitioning`.
> `RangePartitioning` roughly samples input data and internally computes
> partition bounds
> to split given rows into `spark.sql.shuffle.partitions` partitions.
> Therefore, when sort keys are highly skewed, I think some partitions could
> end up being empty
> (that is, the number of result partitions is lower than
> `spark.sql.shuffle.partitions`).
>
>
> On Tue, Feb 9, 2016 at 9:35 PM, Hemant Bhanawat <hemant9379@gmail.com>
> wrote:
>
>> For SQL shuffle operations like groupBy, the number of output partitions
>> is controlled by spark.sql.shuffle.partitions. But it seems orderBy does
>> not honour this.
>>
>> In my small test, the number of partitions in the DF returned by orderBy
>> was equal to the total number of distinct keys. Are you observing the
>> same? That is, do you have a single value for all rows in the column on
>> which you are running orderBy? If so, you are better off not running the
>> orderBy clause at all.
>>
>> Maybe someone from the Spark SQL team could explain how the partitioning
>> of the output DF should be handled when doing an orderBy?
>>
>> Hemant
>> www.snappydata.io
>> https://github.com/SnappyDataInc/snappydata
>>
>>
>>
>>
>> On Tue, Feb 9, 2016 at 4:00 AM, Cesar Flores <cesar7@gmail.com> wrote:
>>
>>>
>>> I have a data frame which I sort using the orderBy function. This
>>> operation causes my data frame to go to a single partition. After using
>>> those results, I would like to re-partition to a larger number of
>>> partitions. Currently I am just doing:
>>>
>>> // df is a DataFrame with a single partition and around 14 million records
>>> val rdd = df.rdd.coalesce(100, shuffle = true)
>>> val newDF = hc.createDataFrame(rdd, df.schema)
>>>
>>> This process is really slow. Is there any other way of achieving this
>>> task, or to optimize it (perhaps tweaking a spark configuration parameter)?
>>>
>>>
>>> Thanks a lot
>>> --
>>> Cesar Flores
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>



-- 
Cesar Flores
