spark-user mailing list archives

From Jungtaek Lim <kabh...@gmail.com>
Subject Re: groupBy and then coalesce impacts shuffle partitions in unintended way
Date Thu, 09 Aug 2018 05:15:11 GMT
> shouldn't coalesce introduce a new map phase with fewer tasks instead of
> changing the previous shuffle?

The javadoc of Dataset.coalesce [1] describes this behavior clearly: it
results in a narrow dependency, hence no new shuffle is introduced.

So it is pretty clear that you need to use "repartition". I'm not sure there
is any trick to achieve this without calling repartition.
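
For example, a minimal sketch (the parquet format, paths, and column name are
placeholders, not taken from your job):

  // keep spark.sql.shuffle.partitions=2048 for the groupBy, then add an
  // extra shuffle to bring the output down to 100 partitions
  spark.conf.set("spark.sql.shuffle.partitions", "2048")
  spark.read.parquet("/data/in")
    .groupBy("column").count()  // reduce phase runs with 2048 tasks
    .repartition(100)           // extra shuffle stage down to 100 partitions
    .write.parquet("/data/out")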

Thanks,
Jungtaek Lim (HeartSaVioR)

1.
https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2918-L2937


On Thu, Aug 9, 2018 at 5:55 AM, Koert Kuipers <koert@tresata.com> wrote:

> sorry i meant to say:
> with a checkpoint i get a map phase with lots of tasks to read the data,
> then a reduce phase with 2048 reducers, and then finally a map phase with
> 100 tasks.
>
> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers <koert@tresata.com> wrote:
>
>> the only thing that seems to stop this so far is a checkpoint.
>>
>> with a checkpoint i get a map phase with lots of tasks to read the data,
>> then a reduce phase with 2048 reducers, and then finally a map phase with 4
>> tasks.
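>>
>> roughly what i am doing, with made-up paths:
>>
>> spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
>> spark.read.parquet("/data/in")
>>   .groupBy("column").count()  // the reduce phase keeps its 2048 tasks
>>   .checkpoint()               // materializes the shuffled result, cutting the lineage
>>   .coalesce(100)              // can no longer fold into the groupBy stage
>>   .write.parquet("/data/out")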
>>
>> now i need to figure out how to do this without having to checkpoint. i
>> wish i could insert something like a dummy operation that logical steps
>> cannot jump over.
>>
>> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers <koert@tresata.com> wrote:
>>
>>> ok thanks.
>>>
>>> mhhhhh. that seems odd. shouldn't coalesce introduce a new map phase with
>>> fewer tasks instead of changing the previous shuffle?
>>>
>>> using repartition seems too expensive just to keep the number of files
>>> down. so i guess i am back to looking for another solution.
>>>
>>>
>>>
>>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov <vadim@datadoghq.com>
>>> wrote:
>>>
>>>> `coalesce` sets the number of partitions for the last stage, so you
>>>> have to use `repartition` instead, which is going to introduce an extra
>>>> shuffle stage.
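>>>>
>>>> a quick way to see the difference in the physical plan (`df` here is any
>>>> aggregated DataFrame, just for illustration):
>>>>
>>>> df.coalesce(100).explain()     // a Coalesce node, no new Exchange
>>>> df.repartition(100).explain()  // an extra Exchange, i.e. a shuffle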
>>>>
>>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers <koert@tresata.com> wrote:
>>>> >
>>>> > one small correction: lots of files lead to pressure on the spark
>>>> driver program when reading this data in spark.
>>>> >
>>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers <koert@tresata.com>
>>>> wrote:
>>>> >>
>>>> >> hi,
>>>> >>
>>>> >> i am reading data from files into a dataframe, then doing a groupBy
>>>> for a given column with a count, and finally i coalesce to a smaller number
>>>> of partitions before writing out to disk. so roughly:
>>>> >>
>>>> >>
>>>> spark.read.format(...).load(...).groupBy(column).count().coalesce(100).write.format(...).save(...)
>>>> >>
>>>> >> i have this setting: spark.sql.shuffle.partitions=2048
>>>> >>
>>>> >> i expect to see 2048 partitions in the shuffle. what i am seeing instead
>>>> is a shuffle with only 100 partitions. it's like the coalesce has taken
>>>> over the partitioning of the groupBy.
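>>>> >>
>>>> >> e.g. with made-up paths you can see the partition counts directly:
>>>> >>
>>>> >> val counted = spark.read.parquet("/data/in").groupBy("column").count()
>>>> >> counted.rdd.getNumPartitions               // 2048
>>>> >> counted.coalesce(100).rdd.getNumPartitions // 100, and the reduce stage
>>>> >>                                            // itself runs with 100 tasks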
>>>> >>
>>>> >> any idea why?
>>>> >>
>>>> >> i am doing coalesce because it is not helpful to write out 2048
>>>> files: lots of files lead to pressure down the line on executors reading
>>>> this data (i am writing to just one partition of a larger dataset), and
>>>> since i have fewer than 100 executors i expect it to be efficient. so it
>>>> sounds like a good idea, no?
>>>> >>
>>>> >> but i do need 2048 partitions in my shuffle due to the operation i am
>>>> doing in the groupBy (in my real problem i am not just doing a count...).
>>>> >>
>>>> >> thanks!
>>>> >> koert
>>>> >>
>>>> >
>>>>
>>>>
>>>> --
>>>> Sent from my iPhone
>>>>
>>>
>>>
>>
>
