spark-user mailing list archives

From Koert Kuipers <ko...@tresata.com>
Subject Re: groupBy and then coalesce impacts shuffle partitions in unintended way
Date Thu, 09 Aug 2018 05:38:40 GMT
a new map task after a shuffle is also a narrow dependency, isn't it? it's
narrow because data doesn't need to move, e.g. every partition depends on
a single partition, preferably on the same machine.

modifying a previous shuffle to avoid a shuffle strikes me as odd, and can
potentially make a mess of performance, especially when no shuffle is
needed. just a new map task.
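
for reference, a minimal sketch of the three variants discussed in this thread (coalesce, repartition, and checkpoint followed by coalesce), meant only for comparing the physical plans and the stage/task counts in the spark ui. everything not in the original one-liner is an assumption: the local master, the synthetic `spark.range` input standing in for `spark.read.format(...).load(...)`, the `key` column, the object name, the checkpoint directory, and turning adaptive execution off so that newer spark versions do not resize the shuffle on their own.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical driver object, only for inspecting plans locally.
object CoalesceAfterGroupBy {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")                               // assumption: local test run
      .appName("coalesce-after-groupBy")
      .config("spark.sql.shuffle.partitions", "2048")   // setting from the thread
      .config("spark.sql.adaptive.enabled", "false")    // assumption: keep AQE out of the picture
      .getOrCreate()

    // Synthetic stand-in for spark.read.format(...).load(...)
    val input = spark.range(0L, 1000000L).withColumn("key", col("id") % 1000)
    val counts = input.groupBy("key").count()

    // 1. coalesce: a narrow dependency, so it is planned into the same
    //    stage as the post-shuffle aggregation (no new Exchange in the plan).
    counts.coalesce(100).explain()

    // 2. repartition: adds a second Exchange, i.e. an extra shuffle.
    counts.repartition(100).explain()

    // 3. checkpoint, then coalesce: the eager checkpoint materializes the
    //    aggregation first; the coalesce then applies to the checkpointed data.
    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints") // hypothetical path
    val checkpointed = counts.checkpoint()
    checkpointed.coalesce(100).explain()

    spark.stop()
  }
}
```

the plans only show where the exchanges sit; the number of tasks per stage is easiest to confirm in the spark ui while each variant runs an action such as a write.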


On Thu, Aug 9, 2018 at 1:15 AM, Jungtaek Lim <kabhwan@gmail.com> wrote:

> > shouldn't coalesce introduce a new map-phase with fewer tasks instead of
> changing the previous shuffle?
>
> The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
> results in narrow dependency, hence no shuffle.
>
> So it is pretty clear that you need to use "repartition". Not sure there's
> any available trick to achieve it without calling repartition.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 1. https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2918-L2937
>
>
> On Thu, Aug 9, 2018 at 5:55 AM, Koert Kuipers <koert@tresata.com> wrote:
>
>> sorry i meant to say:
>> with a checkpoint i get a map phase with lots of tasks to read the data,
>> then a reduce phase with 2048 reducers, and then finally a map phase with
>> 100 tasks.
>>
>> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers <koert@tresata.com> wrote:
>>
>>> the only thing that seems to stop this so far is a checkpoint.
>>>
>>> with a checkpoint i get a map phase with lots of tasks to read the data,
>>> then a reduce phase with 2048 reducers, and then finally a map phase with 4
>>> tasks.
>>>
>>> now i need to figure out how to do this without having to checkpoint. i
>>> wish i could insert something like a dummy operation that logical steps
>>> cannot jump over.
>>>
>>> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers <koert@tresata.com> wrote:
>>>
>>>> ok thanks.
>>>>
>>>> mhhhhh. that seems odd. shouldn't coalesce introduce a new map-phase
>>>> with fewer tasks instead of changing the previous shuffle?
>>>>
>>>> using repartition seems too expensive just to keep the number of files
>>>> down. so i guess i am back to looking for another solution.
>>>>
>>>>
>>>>
>>>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov <vadim@datadoghq.com>
>>>> wrote:
>>>>
>>>>> `coalesce` sets the number of partitions for the last stage, so you
>>>>> have to use `repartition` instead which is going to introduce an extra
>>>>> shuffle stage
>>>>>
>>>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers <koert@tresata.com>
>>>>> wrote:
>>>>> >
>>>>> > one small correction: lots of files leads to pressure on the spark
>>>>> > driver program when reading this data in spark.
>>>>> >
>>>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers <koert@tresata.com> wrote:
>>>>> >>
>>>>> >> hi,
>>>>> >>
>>>>> >> i am reading data from files into a dataframe, then doing a groupBy
>>>>> >> for a given column with a count, and finally i coalesce to a smaller
>>>>> >> number of partitions before writing out to disk. so roughly:
>>>>> >>
>>>>> >> spark.read.format(...).load(...).groupBy(column).count().coalesce(100).write.format(...).save(...)
>>>>> >>
>>>>> >> i have this setting: spark.sql.shuffle.partitions=2048
>>>>> >>
>>>>> >> i expect to see 2048 partitions in shuffle. what i am seeing instead
>>>>> >> is a shuffle with only 100 partitions. it's like the coalesce has taken
>>>>> >> over the partitioning of the groupBy.
>>>>> >>
>>>>> >> any idea why?
>>>>> >>
>>>>> >> i am doing coalesce because it is not helpful to write out 2048 files,
>>>>> >> lots of files leads to pressure down the line on executors reading this
>>>>> >> data (i am writing to just one partition of a larger dataset), and since
>>>>> >> i have less than 100 executors i expect it to be efficient. so sounds
>>>>> >> like a good idea, no?
>>>>> >>
>>>>> >> but i do need 2048 partitions in my shuffle due to the operation i am
>>>>> >> doing in the groupBy (in my real problem i am not just doing a count...).
>>>>> >>
>>>>> >> thanks!
>>>>> >> koert
>>>>> >>
>>>>> >
>>>>>
>>>>>
>>>>> --
>>>>> Sent from my iPhone
>>>>>
>>>>
>>>>
>>>
>>
