spark-user mailing list archives

From Koert Kuipers <ko...@tresata.com>
Subject Re: groupBy and then coalesce impacts shuffle partitions in unintended way
Date Wed, 08 Aug 2018 20:55:01 GMT
sorry i meant to say:
with a checkpoint i get a map phase with lots of tasks to read the data,
then a reduce phase with 2048 reducers, and then finally a map phase with
100 tasks.

On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers <koert@tresata.com> wrote:

> the only thing that seems to stop this so far is a checkpoint.
>
> with a checkpoint i get a map phase with lots of tasks to read the data,
> then a reduce phase with 2048 reducers, and then finally a map phase with 4
> tasks.
>
> now i need to figure out how to do this without having to checkpoint. i
> wish i could insert something like a dummy operation that the logical plan
> optimizer cannot jump over.
>
> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers <koert@tresata.com> wrote:
>
>> ok thanks.
>>
>> mhhhhh. that seems odd. shouldn't coalesce introduce a new map phase with
>> fewer tasks instead of changing the previous shuffle?
>>
>> using repartition seems too expensive just to keep the number of files
>> down. so i guess i am back to looking for another solution.
>>
>>
>>
>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov <vadim@datadoghq.com>
>> wrote:
>>
>>> `coalesce` sets the number of partitions for the last stage, so you
>>> have to use `repartition` instead which is going to introduce an extra
>>> shuffle stage
>>>
>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers <koert@tresata.com> wrote:
>>> >
>>> > one small correction: lots of files leads to pressure on the spark
>>> driver program when reading this data in spark.
>>> >
>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers <koert@tresata.com>
>>> wrote:
>>> >>
>>> >> hi,
>>> >>
>>> >> i am reading data from files into a dataframe, then doing a groupBy
>>> for a given column with a count, and finally i coalesce to a smaller number
>>> of partitions before writing out to disk. so roughly:
>>> >>
>>> >> spark.read.format(...).load(...).groupBy(column).count().coalesce(100).write.format(...).save(...)
>>> >>
>>> >> i have this setting: spark.sql.shuffle.partitions=2048
>>> >>
>>> >> i expect to see 2048 partitions in shuffle. what i am seeing instead
>>> is a shuffle with only 100 partitions. it's like the coalesce has taken
>>> over the partitioning of the groupBy.
>>> >>
>>> >> any idea why?
>>> >>
>>> >> i am doing coalesce because it is not helpful to write out 2048
>>> files: lots of files lead to pressure down the line on executors reading
>>> this data (i am writing to just one partition of a larger dataset), and
>>> since i have fewer than 100 executors i expect 100 files to be efficient.
>>> so it sounds like a good idea, no?
>>> >>
>>> >> but i do need 2048 partitions in my shuffle due to the operation i am
>>> doing in the groupBy (in my real problem i am not just doing a count...).
>>> >>
>>> >> thanks!
>>> >> koert
>>> >>
>>> >
>>>
>>>
>>> --
>>> Sent from my iPhone
>>>
>>
>>
>
