spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Spark AQE Post-Shuffle partitions coalesce don't work as expected, and even make data skew in some partitions. Need help to debug issue.
Date Tue, 06 Jul 2021 08:13:30 GMT
OK Nick, let us have a look at this.

Your raw record size varies with the JSON payload: one row may be x bytes and
another x*y bytes. It sounds like you run this batch through cron or Airflow
and carry on from where checkpointLocation points to the last processed
records.

You end up with executors handling partitions of variable sizes. You salt the
data (just to remind me, how do you salt it?) and coalesce, meaning you reuse
the existing partitions, but that does not work well. Have you tried
repartition? (I am aware of the shuffle price to pay.)

To understand this, we need one experiment without AQE; otherwise it will be
difficult to come to a meaningful conclusion.
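A toy sketch of the coalesce-versus-repartition trade-off, in plain Scala without Spark. The object and method names are hypothetical, and this is not how Spark implements either operation (Spark's coalesce groups partitions by locality, not strictly by adjacency); it only shows why merging existing partitions keeps the imbalance while a round-robin reshuffle evens out row counts.

```scala
// Toy model only (plain Scala, no Spark): each inner Vector holds the byte
// sizes of the rows in one partition. Names are hypothetical.
object CoalesceVsRepartition {
  type Partitions = Vector[Vector[Long]]

  // "coalesce(n)": merge groups of existing partitions (here simply adjacent
  // ones) without moving rows between them, so at most n partitions come out
  // and any existing size imbalance is carried over. Assumes n >= 1.
  def coalesce(parts: Partitions, n: Int): Partitions = {
    val groupSize = math.ceil(parts.size.toDouble / n).toInt
    parts.grouped(groupSize).map(_.flatten).toVector
  }

  // "repartition(n)": deal every row out round-robin, which evens out row
  // counts at the price of moving all the data (a full shuffle).
  def repartition(parts: Partitions, n: Int): Partitions = {
    val rows = parts.flatten
    Vector.tabulate(n)(i => rows.zipWithIndex.collect { case (r, j) if j % n == i => r })
  }

  // Total bytes per partition.
  def bytes(parts: Partitions): Vector[Long] = parts.map(_.sum)
}
```

With one heavy partition (two 900-byte rows) and three light ones (two 100-byte rows each), coalescing to 2 gives 2000 and 400 bytes, while the round-robin repartition gives 1200 and 1200 bytes with 4 rows each.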

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 5 Jul 2021 at 11:07, Nick Grigoriev <grigor60@gmail.com> wrote:

> Hi Mich,
>
> Thanks for the quick response.
>
> 1. No, I use a batch query with fixed start and end offsets.
> 2. Yes, my messages in Kafka (JSON format) can differ greatly in size,
> from 1 KB to 9 KB. Even after I transform the JSON into a flat Spark SQL
> row, rows can still differ in size.
> 3. I have two processing steps: the `from_json` function to create a nested
> StructType (Spark SQL Row), then some Spark SQL functions (row-level only,
> no aggregation or join) to transform the nested row into a flat SQL Row.
> 4. I wrote it in my mail:
> 4.1 I use Spark configuration to enable AQE:
>
> ```
>     "spark.sql.adaptive.enabled" -> "true",
>     "spark.sql.adaptive.coalescePartitions.enabled" -> "true",
>     "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "256mb",
>     "spark.sql.adaptive.coalescePartitions.minPartitionNum" -> "1",
>     "spark.sql.adaptive.coalescePartitions.initialPartitionNum" -> "20000"
> ```
>
> 4.2 For debugging and to add custom logs, I created a custom physical plan
> optimisation.
>       This optimisation is a 99% copy of the Spark built-in rule, but with
> additional logging, and it is forced to apply to any range-repartition node.
> Code snippet:
> https://gist.github.com/GrigorievNick/2f77b26719e46c544e3f20aa48862719
>
>> So I started to debug the issue, but AQE doesn't log enough about the
>> input and output of `ShufflePartitionsUtil.coalescePartitions`.
>> That's why I rewrote my query to use repartitionByRange.sortWithinPartitions
>> and [forked the physical plan optimization with additional logging][1].
>
> 5. This means that when I don't enable AQE, my partitions have a much
> better distribution than with it.
>     Actually, in my specific case my range repartition gives a good
> distribution of records (rows), but record sizes are very uneven.
>    That means the number of records per partition is nearly the same, but
> the size in bytes differs by up to 10 times, because some records can be
> very large.
>    And when I enable AQE, some partitions become even bigger, up to 200
> times bigger. So coalescing somehow creates very big partitions.
>    Even the number of records (rows) becomes uneven.
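To make point 5 concrete, a minimal plain-Scala sketch (hypothetical helper, no Spark) that splits rows, already in sort order, into ranges of equal row count, as a range partitioner aims to do, and sums the bytes in each range. With the 1 KB and 9 KB message sizes from point 2, equal row counts still give a 9x byte difference.

```scala
// Hypothetical helper, plain Scala: rowSizes holds the byte size of each row
// in sort order. Rows are split into ranges with the same row count, and the
// bytes that end up in each range are reported.
object EqualRowsUnequalBytes {
  def bytesPerRange(rowSizes: Vector[Long], numRanges: Int): Vector[Long] = {
    val rowsPerRange = math.ceil(rowSizes.size.toDouble / numRanges).toInt
    rowSizes.grouped(rowsPerRange).map(_.sum).toVector
  }
}
```

For three 1024-byte rows followed by three 9216-byte rows split into 2 ranges, the result is 3072 and 27648 bytes: the same row count per range, nine times the bytes.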
>
> There are a lot of details because the issue is very strange, and it's
> pretty hard to understand the real issue.
> I assume the problem is in MapOutputStatistics, which is used as the input
> of the coalesce functions.
> But I am not sure, so I split my whole debugging path into steps and put it
> in the mail.
>
> Another issue is that I don't have a public dataset where I can reproduce
> this issue right now.
> And I can't publish my current dataset outside the company because of
> privacy.
>
> On 4 Jul 2021, at 16:44, Mich Talebzadeh <mich.talebzadeh@gmail.com>
> wrote:
>
> Hi Nick,
>
> I looked at both this thread and your SO question.
>
> Trying to understand:
>
>
>    1. You are reading through Kafka via Spark structured streaming.
>    2. Your messages from Kafka are not uniform, meaning you may get
>    variable record sizes in each window.
>    3. How are you processing these messages?
>    4. Have you run this without AQE? AQE is turned off by default.
>    5. You stated "... This state of things is much worse than without
>    AQE". Why is that?
>
>
> It sounds like a lot is packed into your thread, so I am trying to
> understand the base issue and the fundamental problem.
>
>
> HTH
>
>
> On Sun, 4 Jul 2021 at 14:13, Nick Grigoriev <grigor60@gmail.com> wrote:
>
>> I asked this question on Stack Overflow, but it looks too complex for a
>> Q&A resource.
>>
>> https://stackoverflow.com/questions/68236323/spark-aqe-post-shuffle-partitions-coalesce-dont-work-as-expected-and-even-make
>> So I want to ask for help here.
>>
>> I use a global sort on my Spark DF, and when I enable AQE and post-shuffle
>> coalescing, my partitions after the sort operation become even more
>> unevenly distributed than before.
>> ```
>>     "spark.sql.adaptive.enabled" -> "true",
>>     "spark.sql.adaptive.coalescePartitions.enabled" -> "true",
>>     "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "256mb",
>>     "spark.sql.adaptive.coalescePartitions.minPartitionNum" -> "1",
>>     "spark.sql.adaptive.coalescePartitions.initialPartitionNum" -> "20000"
>> ```
>> My query, at a high level, looks like this:
>> ```spark
>> .readFromKafka
>> .deserializeJsonToRow
>> .cache
>> .sort(const_part, column which can cause skew, some salt columns)
>> .writeToS3
>> ```
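A minimal plain-Scala model of range partitioning on this salted sort key. `Row` and its fields are hypothetical stand-ins for the real columns, and the bound selection is a simplification of Spark's `RangePartitioner`, which also derives its bounds from a sample of the data. It shows why the salt matters: rows sharing one hot skew value can still be split across several ranges.

```scala
// Plain-Scala model, no Spark. Row and its fields are hypothetical stand-ins
// for the real sort columns (const_part, skew column, Kafka partition, offset).
object SaltedRangeBounds {
  final case class Row(constPart: Int, skewKey: String, kafkaPartition: Int, offset: Long)

  // Lexicographic order on the sort columns: the Kafka partition and offset
  // act as salt, so rows that share a hot skewKey still get distinct keys.
  implicit val rowOrdering: Ordering[Row] =
    Ordering.by((r: Row) => (r.constPart, r.skewKey, r.kafkaPartition, r.offset))

  // Choose numRanges - 1 upper bounds from a sorted sample so each range gets
  // about the same number of sampled rows (assumes sample.size >= numRanges).
  def rangeBounds(sample: Seq[Row], numRanges: Int): Vector[Row] = {
    val sorted = sample.sorted.toVector
    (1 until numRanges).map(i => sorted(i * sorted.size / numRanges - 1)).toVector
  }

  // A row goes to the first range whose upper bound is >= the row;
  // rows above every bound go to the last range.
  def rangeOf(bounds: Vector[Row], row: Row): Int = {
    val i = bounds.indexWhere(b => rowOrdering.lteq(row, b))
    if (i == -1) bounds.size else i
  }
}
```

With 100 rows that all share `skewKey = "hot"` but come from 4 Kafka partitions, 4 ranges receive 25 rows each; if every row had the same full key (no salt), all of them would land in the first range.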
>>  1. Column which can cause skew -> yes, my data is not well distributed;
>> that's why I use salts.
>>  2. I read data from Kafka, so I use the Kafka partition + offset columns
>> as salt.
>>  3. Why doesn't sort, which uses repartitionByRange under the hood, help me,
>> and why do I want to enable AQE? -> Right now I see that my Kafka messages
>> can differ too much in size. My partitions after the range repartition
>> have nearly the same number of records but are still very uneven in bytes.
>>  4. Why do I think AQE should help me? -> I want to create many small
>> ranges which, even with my data skew, will not be more than ~50 MB, so
>> post-shuffle coalescing will be able to combine them up to the target size
>> (256 MB). In my case, spikes of up to 320 MB are OK.
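Point 4 relies on post-shuffle coalescing packing small neighbouring partitions up to a target size. A simplified plain-Scala model of that greedy, order-preserving packing, as an approximation of the idea behind `ShufflePartitionsUtil.coalescePartitions` and not Spark's actual code; the object name is hypothetical.

```scala
// Simplified model, plain Scala: walk the shuffle partitions in order and
// merge neighbours until adding the next one would exceed targetBytes.
// Returns (startIndex, endIndexExclusive) for each merged group.
object GreedyCoalesce {
  def coalesce(sizes: Vector[Long], targetBytes: Long): Vector[(Int, Int)] = {
    val groups = Vector.newBuilder[(Int, Int)]
    var start = 0
    var current = 0L
    for (i <- sizes.indices) {
      if (i > start && current + sizes(i) > targetBytes) {
        // Close the current group and start a new one at partition i.
        groups += ((start, i))
        start = i
        current = sizes(i)
      } else {
        current += sizes(i)
      }
    }
    if (sizes.nonEmpty) groups += ((start, sizes.size))
    groups.result()
  }
}
```

Ten 50 MiB partitions with a 256 MiB target become two groups of five (250 MiB each). In this model a group exceeds the target only when a single input partition already does, which is the behaviour point 4 counts on.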
>>
>> My first assumption was that even a small range has too big a spike.
>> But I checked and confirmed that repartitioning by range gives me a good
>> distribution in records but a bad one in size. I have nearly 200 partitions
>> with nearly the same number of records and size differences of up to 9x,
>> from ~100 MB to ~900 MB.
>> But with AQE and repartitioning into 18000 small ranges, the smallest
>> partition was 18 MiB and the biggest 1.8 GiB.
>> This state of things is much worse than without AQE.
>> It's important to highlight that I use metrics from the Spark UI -> Details
>> for Stage tab to identify partition sizes in bytes, and I have my own logs
>> for records.
>>
>>
>> So I started to debug the issue, but AQE doesn't log enough about the
>> input and output of
>> `ShufflePartitionsUtil.coalescePartitions`.
>> That's why I rewrote my query to use repartitionByRange.sortWithinPartitions
>> and [forked the physical plan optimization with additional logging][1].
>> My logs show that my initial idea was right:
>>  * Input partitions after the map and shuffle-write stage were split small
>> enough.
>>  * The coalesce algorithm combines them into a good number of partitions
>> that are well distributed in bytes.
>> ```
>> Input shuffleId:2 partitions:17999
>> Max partition size :27362117
>> Min partition size :8758435
>> ```
>> And
>> ```
>> Number of shuffle stages to coalesce 1
>> Reduce number of partitions from 17999 to 188
>> Output partition  maxsize :312832323
>> Output partition min size :103832323
>> ```
>> The min size is so different because of the size of the last partition;
>> that is expected. TRACE-level logs show that 99% of the partitions are
>> near 290 MiB.
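For comparison, the logged byte counts converted to MiB in plain Scala (the object name is hypothetical). Spark reads a size setting such as `256mb` as 256 MiB, i.e. 268435456 bytes. Inputs range from about 8.35 to 26.09 MiB and coalesced outputs from about 99.02 to 298.34 MiB, so the largest logged output is above the 256 MiB advisory size.

```scala
// Plain Scala: convert the byte counts from the coalesce logs in this message
// into MiB and compare them with the advisory partition size.
object LogSizes {
  val BytesPerMiB: Double = 1024.0 * 1024.0

  def toMiB(bytes: Long): Double = bytes / BytesPerMiB

  // "256mb" in a Spark size setting means 256 MiB.
  val advisoryBytes: Long = 256L * 1024 * 1024

  // Values copied from the logs above.
  val logged: Map[String, Long] = Map(
    "input max" -> 27362117L,
    "input min" -> 8758435L,
    "output max" -> 312832323L,
    "output min" -> 103832323L
  )
}
```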
>>
>>  * But why does the Spark UI show such different results? ->
>>  * Could the Spark UI be wrong? ->
>>  * Maybe, but apart from size, task duration is also too long, which
>> makes me think the Spark UI is OK.
>>  * So my assumption is that there is an issue with `MapOutputStatistics`
>> in my stage. But is it always broken, or only in my case? ->
>>  * Only in my case? -> I made a few checks to confirm it.
>>  * * I read the same dataset from S3 (Parquet files with a 120 MB block
>> size) -> AQE works as expected. Post-shuffle coalescing returns 188
>> partitions, well distributed by size. It's important to note that the data
>> on S3 is not well distributed, but during reading Spark splits it into 259
>> partitions of about 120 MB, mostly because of the 120 MB Parquet block size.
>>  * * I read the same dataset from Kafka but exclude the skewed column from
>> the partition function -> AQE works as expected. Post-shuffle coalescing
>> returns 203 partitions, well distributed by size.
>>  * * I tried disabling the cache -> this made no difference. I use the
>> cache only to avoid reading from Kafka twice, because range repartitioning
>> uses sampling.
>>  * * I tried disabling AQE and writing 18000 partitions to S3 -> the result
>> was as expected and the same as what my log of the coalescing input shows:
>> 17999 files, the smallest about 8 MiB and the biggest 56 MiB.
>>
>>  * All these checks make me think that `MapOutputStatistics` is wrong
>> only in my case. Maybe the issue is related to the Kafka source, or to my
>> Kafka input data being very unevenly distributed.
>>
>> Questions:
>>  * Does anyone have an idea what I am doing wrong, and why AQE creates
>> partition skew?
>>  * What can I do with the input data to make post-shuffle coalescing work
>> in my case?
>>  * If you think that I am right, please leave a comment about it.
>>
>> P.S.
>> I also want to mention that my input Kafka DataFrame has 2160 partitions,
>> which are not evenly distributed -> some partitions can be 2x bigger than
>> others. I read from a Kafka topic with 720 partitions, with the
>> `minPartitions` option set to 720 * 3.
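A rough plain-Scala model of how a `minPartitions` value of 720 * 3 = 2160 can turn 720 topic partitions into 2160 read partitions by splitting each offset range into smaller pieces. It assumes every topic partition has the same number of offsets and is not Spark's actual offset-range calculator; the names are hypothetical. Equal offset counts mean equal message counts, not equal bytes, when message sizes vary.

```scala
// Rough model, plain Scala: split each Kafka offset range into `parts`
// contiguous pieces with (nearly) equal offset counts. Names are hypothetical.
object KafkaSplitModel {
  final case class OffsetRange(partition: Int, from: Long, until: Long) {
    def size: Long = until - from
  }

  def split(r: OffsetRange, parts: Int): Vector[OffsetRange] =
    (0 until parts).map { i =>
      val from = r.from + r.size * i / parts
      val until = r.from + r.size * (i + 1) / parts
      OffsetRange(r.partition, from, until)
    }.toVector
}
```

Splitting 720 ranges of 3000 offsets each into 3 pieces yields 2160 ranges of 1000 offsets; an uneven range such as offsets 0 to 10 splits into pieces of 3, 3 and 4 offsets.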
>>
>>
>>   [1]: https://gist.github.com/GrigorievNick/2f77b26719e46c544e3f20aa488
>>
>>
>
