spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Tasks are skewed to one executor
Date Sun, 11 Apr 2021 10:30:21 GMT
OK, good luck with that groupby.

You also have around 3.7GB of memory available, so you may benefit from a
BroadcastHashJoin.

This write-up is worth checking in case it helps:
<https://dzone.com/articles/improving-the-performance-of-your-spark-job-on-ske>
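
A minimal sketch of the idea (big_df, small_df, and the join key "account"
are illustrative names, not from your job):

from pyspark.sql.functions import broadcast

# Hint Spark to ship the small side to every executor, so the large side
# is joined in place without a shuffle.
joined = big_df.join(broadcast(small_df), on="account", how="left")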

HTH

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 11 Apr 2021 at 11:10, András Kolbert <kolbertandras@gmail.com>
wrote:

> Sure, I will check if that helps!
>
> Incoming raw data for each batch is around 100K records. After processing,
> only a few thousand records are modified by the union/aggregation into the
> big dataframe. The big dataframe is around 43M records overall, and, as I
> said, only a few thousand records are changed or created per batch.
> Currently, I write the complete dataframe to HDFS after each batch
> iteration, even though only a few thousand records actually changed.
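>
> A rough sketch of one way to avoid rewriting all 43M records: Spark's
> dynamic partition overwrite mode (Spark 2.3+) replaces only the partitions
> present in the written data. Here changed_ids is a hypothetical list of the
> partition_id values actually touched in this batch:
>
> from pyspark.sql import functions as F
>
> # Replace only the partitions present in the written data; the rest of
> # the existing output under hdfs_path is left as is.
> spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
>
> (new_user_item_agg
>     .filter(F.col("partition_id").isin(changed_ids))
>     .write
>     .partitionBy("partition_id")
>     .mode("overwrite")
>     .save(hdfs_path))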
>
> On Sun, 11 Apr 2021 at 11:56, Mich Talebzadeh <mich.talebzadeh@gmail.com>
> wrote:
>
>> OK, that is what I guessed was happening.
>>
>> You have a few options to try in your SIT environment.
>>
>> First, try adding that partition_id to the groupby composite key
>> (account, product, partition_id), which effectively adds a salt.
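>>
>> A generic two-stage salting sketch (here with a random salt; your
>> partition_id column could play the same role), assuming a hypothetical
>> F.sum on a column "amount":
>>
>> from pyspark.sql import functions as F
>>
>> num_salts = 5 * executor_count
>> # Stage 1: pre-aggregate on the salted key, so a hot (account, product)
>> # group is spread over up to num_salts partial groups.
>> partial = (df
>>     .withColumn("salt", (F.rand() * num_salts).cast("int"))
>>     .groupBy("account", "product", "salt")
>>     .agg(F.sum("amount").alias("amount")))
>>
>> # Stage 2: merge the partial aggregates back to one row per
>> # (account, product).
>> final = partial.groupBy("account", "product") \
>>     .agg(F.sum("amount").alias("amount"))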
>>
>> How many rows are being delivered in your streaming data per batch
>> interval?
>>
>> HTH
>>
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sun, 11 Apr 2021 at 10:17, András Kolbert <kolbertandras@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> The groupby is by account and product now. There is definitely some
>>> skewness (as one would expect, most users interact with a small number of
>>> items).
>>>
>>> What happens at the moment (a consolidated sketch follows the list):
>>>
>>>    1. Union
>>>    2. Groupby (account, product)
>>>    3. Calculate partition id:
>>>       new_user_item_agg = new_user_item_agg.withColumn("partition_id",
>>>           F.abs(F.hash(col("account_name"))) % (5 * executor_count))
>>>    4. Repartition:
>>>       new_user_item_agg = new_user_item_agg.repartition("partition_id")
>>>    5. Write to HDFS:
>>>       df.write \
>>>           .partitionBy("partition_id") \
>>>           .mode("overwrite") \
>>>           .save(hdfs_path)
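>>>
>>> A consolidated sketch of the steps above (big_df and batch_df are
>>> illustrative names; F.sum on a hypothetical "amount" column stands in
>>> for the real aggregates; grouping on account_name to match the hashed
>>> column):
>>>
>>> from pyspark.sql import functions as F
>>>
>>> # Steps 1-2: union the new batch with the big dataframe and aggregate.
>>> new_user_item_agg = (big_df.unionByName(batch_df)
>>>     .groupBy("account_name", "product")
>>>     .agg(F.sum("amount").alias("amount")))
>>>
>>> # Steps 3-4: derive a stable partition id and repartition on it.
>>> new_user_item_agg = (new_user_item_agg
>>>     .withColumn("partition_id",
>>>                 F.abs(F.hash(F.col("account_name"))) % (5 * executor_count))
>>>     .repartition("partition_id"))
>>>
>>> # Step 5: write out, one HDFS directory per partition_id value.
>>> new_user_item_agg.write \
>>>     .partitionBy("partition_id") \
>>>     .mode("overwrite") \
>>>     .save(hdfs_path)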
>>>
>>> Maybe I could add the partition_id to the groupby already; would that
>>> give me some performance increase?
>>>
>>> The biggest issue is this:
>>> [image: image.png]
>>>
>>> Whenever you see spikes, that is because executors died during
>>> processing.
>>>
>>> [image: image.png]
>>>
>>> This is the status after:
>>> [image: image.png]
>>>
>>>
>>> The ideal situation would be to optimise the flow and not let the
>>> executors die constantly.
>>>
>>
