spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Koert Kuipers <>
Subject Re: AQE effectiveness
Date Tue, 29 Sep 2020 21:35:20 GMT
i have been doing tests with iterative algorithms that do caching/uncaching
at each iteration and i see improvements when i turn on AQE for cache.

now i am wondering... with an iterative algo using AQE it is true that the
output of every iteration can have a slightly different number of
partitions (which i observed), and this introduces extra shuffles in joints
between these outputs. so far this downside seems outweighed by the upsides
of AQE, so its not a big deal. but it would be rather simple to remove many
unnecessary shuffles and still retain all the benefits of AQE if we just
only slightly restricted AQEs choices for the number of partitions. for
example we could force it to use a power of 2? this way you get the
benefits of AQE but you dont have a frustrating situation where one
iteration has 67 partitions and the next has 68 partitions and then a join
introduces a shuffle.

On Fri, Aug 21, 2020 at 12:10 PM Maryann Xue <>

> It would break CachedTableSuite."A cached table preserves the partitioning
> and ordering of its cached SparkPlan" if AQE was turned on.
> Anyway, the chance of this outputPartitioning being useful is rather low
> and should not justify turning off AQE for SQL cache.
> On Thu, Aug 20, 2020 at 10:54 PM Koert Kuipers <> wrote:
>> in our inhouse spark version i changed this without trouble and it didnt
>> even break any tests
>> just some minor changes in CacheManager it seems
>> On Thu, Aug 20, 2020 at 1:12 PM Maryann Xue <>
>> wrote:
>>> No. The worst case of enabling AQE in cached data is not losing the
>>> opportunity of using/reusing the cache, but rather just an extra shuffle if
>>> the outputPartitioning happens to match without AQE and not match after
>>> AQE. The chance of this happening is rather low.
>>> On Thu, Aug 20, 2020 at 12:09 PM Koert Kuipers <>
>>> wrote:
>>>> i see. it makes sense to maximize re-use of cached data. i didn't
>>>> realize we have two potentially conflicting goals here.
>>>> On Thu, Aug 20, 2020 at 12:41 PM Maryann Xue <
>>>>> wrote:
>>>>> AQE has been turned off deliberately so that the `outputPartitioning`
>>>>> of the cached relation won't be changed by AQE partition coalescing or
>>>>> join optimization and the outputPartitioning can potentially be used
>>>>> relations built on top of the cache.
>>>>> On a second thought, we should probably add a config there and enable
>>>>> AQE by default.
>>>>> Thanks,
>>>>> Maryann
>>>>> On Thu, Aug 20, 2020 at 11:12 AM Koert Kuipers <>
>>>>> wrote:
>>>>>> we tend to have spark.sql.shuffle.partitions set very high by default
>>>>>> simply because some jobs need it to be high and it's easier to then
>>>>>> set the default high instead of having people tune it manually per
job. the
>>>>>> main downside is lots of part files which leads to pressure on the
>>>>>> and dynamic allocation becomes troublesome if every aggregation requires
>>>>>> thousands of tasks... even the simplest aggregation on tiny small
data will
>>>>>> demand all resources on the cluster.
>>>>>> because of these issues AQE appeals a lot to me: by automatically
>>>>>> scaling the reducer partitions we avoid these issues. so we have
AQE turned
>>>>>> on by default. every once in a while i scan through our spark AMs
and logs
>>>>>> to see how it's doing. i mostly look for stages that have a number
of tasks
>>>>>> equal to spark.sql.shuffle.partitions, a sign to me that AQE isn't
>>>>>> effective. unfortunately this seems to be the majority. i suspect
it has to
>>>>>> do with caching/persisting which we use frequently. a simple reproduction
>>>>>> is below.
>>>>>> any idea why caching/persisting would interfere with AQE?
>>>>>> best, koert
>>>>>> $ hadoop fs -text fruits.csv
>>>>>> fruit,color,quantity
>>>>>> apple,red,5
>>>>>> grape,blue,50
>>>>>> pear,green,3
>>>>>> # works well using AQE, uses 1 to 3 tasks per job
>>>>>> $ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf
>>>>>> spark.sql.adaptive.enabled=true
>>>>>> scala> val data ="csv").option("header",
>>>>>> true).load("fruits.csv").persist()
>>>>>> scala> data.groupBy("fruit").count().write.format("csv").save("out)
>>>>>> # does not work well using AQR, uses 200 tasks (e.g.
>>>>>> spark.sql.shuffle.partitions) for certain jobs. the only difference
is when
>>>>>> persist is called.
>>>>>> $ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf
>>>>>> spark.sql.adaptive.enabled=true
>>>>>> scala> val data ="csv").option("header",
>>>>>> true).load("fruits.csv").groupBy("fruit").count().persist()
>>>>>> scala> data.write.format("csv").save("out)

View raw message