spark-user mailing list archives

From Koert Kuipers <ko...@tresata.com>
Subject Re: groupBy and then coalesce impacts shuffle partitions in unintended way
Date Thu, 09 Aug 2018 06:07:24 GMT
well an interesting side effect of this is that i can now control the
number of partitions for every shuffle in a dataframe job, as opposed to
having a single setting for number of partitions across all shuffles.

basically i can set spark.sql.shuffle.partitions to some huge number, and
then for every groupByKey (or any other shuffle operation) follow it up
with a coalesce to set the number of partitions. it's like i have
numPartitions back from those good old RDD shuffle methods :)
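
a minimal sketch of that trick, assuming a local SparkSession and synthetic data from spark.range. the object name, the helper countBy, and the column names "key" and "rows" are made up for illustration and are not from the thread. the resulting task counts depend on the spark version and on whether adaptive query execution is enabled, so the sketch only prints them:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object PerShufflePartitions {
  // hypothetical helper: group by `key`, count, then coalesce so that the
  // result of this particular shuffle has at most `n` partitions
  def countBy(df: DataFrame, key: String, n: Int): DataFrame =
    df.groupBy(key).count().coalesce(n)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("per-shuffle-partitions")
      .master("local[*]")
      // deliberately large global setting; each shuffle is narrowed by its own coalesce
      .config("spark.sql.shuffle.partitions", "2048")
      .getOrCreate()

    // synthetic input (assumption): 100000 rows spread over 1000 keys
    val input = spark.range(0, 100000).withColumn("key", col("id") % 1000)

    // first shuffle coalesced to 100, second shuffle coalesced to 10
    val first = countBy(input, "key", 100).withColumnRenamed("count", "rows")
    val second = countBy(first, "rows", 10)

    println(s"first: ${first.rdd.getNumPartitions}, second: ${second.rdd.getNumPartitions}")
    spark.stop()
  }
}
```

the coalesce after each groupBy plays the role of the numPartitions argument on the RDD shuffle methods: the number is chosen per shuffle instead of once per job.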


On Thu, Aug 9, 2018 at 1:38 AM, Koert Kuipers <koert@tresata.com> wrote:

> a new map task after a shuffle is also a narrow dependency, isn't it? it's
> narrow because data doesn't need to move, e.g. every partition depends on
> a single partition, preferably on the same machine.
>
> modifying a previous shuffle to avoid a shuffle strikes me as odd, and can
> potentially make a mess of performance, especially when no shuffle is
> needed. just a new map task.
>
>
> On Thu, Aug 9, 2018 at 1:15 AM, Jungtaek Lim <kabhwan@gmail.com> wrote:
>
>> > shouldnt coalesce introduce a new map-phase with less tasks instead of
>> changing the previous shuffle?
>>
>> The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
>> results in narrow dependency, hence no shuffle.
>>
>> So it is pretty clear that you need to use "repartition". Not sure
>> there's any available trick to achieve it without calling repartition.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> 1. https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2918-L2937
>>
>>
>> On Thu, Aug 9, 2018 at 5:55 AM, Koert Kuipers <koert@tresata.com> wrote:
>>
>>> sorry i meant to say:
>>> with a checkpoint i get a map phase with lots of tasks to read the data,
>>> then a reduce phase with 2048 reducers, and then finally a map phase with
>>> 100 tasks.
>>>
>>> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers <koert@tresata.com> wrote:
>>>
>>>> the only thing that seems to stop this so far is a checkpoint.
>>>>
>>>> with a checkpoint i get a map phase with lots of tasks to read the data,
>>>> then a reduce phase with 2048 reducers, and then finally a map phase with
>>>> 4 tasks.
>>>>
>>>> now i need to figure out how to do this without having to checkpoint. i
>>>> wish i could insert something like a dummy operation that logical steps
>>>> cannot jump over.
>>>>
>>>> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers <koert@tresata.com>
>>>> wrote:
>>>>
>>>>> ok thanks.
>>>>>
>>>>> mhhhhh. that seems odd. shouldnt coalesce introduce a new map-phase
>>>>> with less tasks instead of changing the previous shuffle?
>>>>>
>>>>> using repartition seems too expensive just to keep the number of files
>>>>> down. so i guess i am back to looking for another solution.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov <vadim@datadoghq.com>
>>>>> wrote:
>>>>>
>>>>>> `coalesce` sets the number of partitions for the last stage, so you
>>>>>> have to use `repartition` instead which is going to introduce an extra
>>>>>> shuffle stage
>>>>>>
>>>>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers <koert@tresata.com>
>>>>>> wrote:
>>>>>> >
>>>>>> > one small correction: lots of files leads to pressure on the spark
>>>>>> > driver program when reading this data in spark.
>>>>>> >
>>>>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers <koert@tresata.com>
>>>>>> wrote:
>>>>>> >>
>>>>>> >> hi,
>>>>>> >>
>>>>>> >> i am reading data from files into a dataframe, then doing a
>>>>>> >> groupBy for a given column with a count, and finally i coalesce to a
>>>>>> >> smaller number of partitions before writing out to disk. so roughly:
>>>>>> >>
>>>>>> >> spark.read.format(...).load(...).groupBy(column).count().coalesce(100).write.format(...).save(...)
>>>>>> >>
>>>>>> >> i have this setting: spark.sql.shuffle.partitions=2048
>>>>>> >>
>>>>>> >> i expect to see 2048 partitions in shuffle. what i am seeing
>>>>>> >> instead is a shuffle with only 100 partitions. it's like the coalesce has
>>>>>> >> taken over the partitioning of the groupBy.
>>>>>> >>
>>>>>> >> any idea why?
>>>>>> >>
>>>>>> >> i am doing coalesce because it is not helpful to write out 2048
>>>>>> >> files, lots of files leads to pressure down the line on executors reading
>>>>>> >> this data (i am writing to just one partition of a larger dataset), and
>>>>>> >> since i have less than 100 executors i expect it to be efficient. so sounds
>>>>>> >> like a good idea, no?
>>>>>> >>
>>>>>> >> but i do need 2048 partitions in my shuffle due to the operation i
>>>>>> >> am doing in the groupBy (in my real problem i am not just doing a count...).
>>>>>> >>
>>>>>> >> thanks!
>>>>>> >> koert
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Sent from my iPhone
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>
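
for reference, the two alternatives raised in the quoted messages (an extra shuffle via repartition, and a checkpoint before the coalesce) can be sketched as below. this is an illustration under assumptions, not code from the thread: the object and method names are hypothetical, the input is synthetic, and the checkpoint directory is a temporary local folder chosen only so the example can run.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object KeepShuffleWidth {
  // variant 1: repartition adds a second shuffle down to `n` partitions,
  // leaving the groupBy shuffle at spark.sql.shuffle.partitions
  def viaRepartition(grouped: DataFrame, n: Int): DataFrame =
    grouped.repartition(n)

  // variant 2: materialize the groupBy result with an eager checkpoint,
  // then coalesce the checkpointed data in a separate narrow step
  def viaCheckpoint(grouped: DataFrame, n: Int): DataFrame =
    grouped.checkpoint().coalesce(n)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("keep-shuffle-width")
      .master("local[*]")
      .config("spark.sql.shuffle.partitions", "2048")
      .getOrCreate()
    // assumption: a throwaway local directory; a real job would use reliable storage
    spark.sparkContext.setCheckpointDir(Files.createTempDirectory("spark-checkpoint").toString)

    // synthetic stand-in for the groupBy(column).count() in the original question
    val grouped = spark.range(0, 100000)
      .withColumn("key", col("id") % 1000)
      .groupBy("key")
      .count()

    println(s"repartition: ${viaRepartition(grouped, 100).rdd.getNumPartitions}")
    println(s"checkpoint + coalesce: ${viaCheckpoint(grouped, 100).rdd.getNumPartitions}")
    spark.stop()
  }
}
```

the trade-off is the one described in the thread: repartition pays for a second shuffle, while the checkpoint pays for writing the intermediate result out before the final, narrower map phase.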
