spark-user mailing list archives

From András Kolbert <kolbertand...@gmail.com>
Subject Re: Tasks are skewed to one executor
Date Sun, 11 Apr 2021 10:09:46 GMT
Sure, I will check if that helps!

Incoming raw data for each batch is around 100K records. After processing,
only a few thousand records are modified by the union/aggregation into the
big dataframe. The big dataframe is around 43M records overall, and, as I
said, only a few thousand records change or are created per batch.
Currently, I write the complete dataframe to HDFS after each batch
iteration, even though only a few thousand records actually changed.
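
One thing I might try: with Spark's dynamic partition overwrite mode
(available since 2.3), mode("overwrite") replaces only the partition
directories that actually appear in the written dataframe, so I could write
just the changed rows instead of the full 43M. A rough sketch ('changed' is
hypothetical here and would hold only this batch's modified/new rows,
carrying the same partition_id column):

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# write only the few thousand changed rows; with dynamic overwrite,
# only the partition_id directories present in 'changed' are replaced
changed.write \
    .partitionBy("partition_id") \
    .mode("overwrite") \
    .save(hdfs_path)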

On Sun, 11 Apr 2021 at 11:56, Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> OK, that is what I guessed was happening.
>
> You have a few options to try in your SIT environment.
>
> First, try adding that partition_id to the groupby composite key (account,
> product, partition_id), which effectively adds salt.
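>
> Something along these lines (a rough sketch only; the column names come
> from your snippet below, and the sum() is a stand-in for your real
> aggregation):
>
> import pyspark.sql.functions as F
>
> # derive the partition_id first, then make it part of the groupBy key,
> # so the aggregation shuffle already follows the same layout as the
> # later repartition and write
> salted = df.withColumn(
>     "partition_id",
>     F.abs(F.hash(F.col("account_name"))) % (5 * executor_count))
> new_user_item_agg = (salted
>     .groupBy("account", "product", "partition_id")
>     .agg(F.sum("value").alias("value")))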
>
> How many rows are being delivered in your streaming data per batch interval?
>
> HTH
>
>
> On Sun, 11 Apr 2021 at 10:17, András Kolbert <kolbertandras@gmail.com>
> wrote:
>
>> Hi,
>>
>> The groupby is by account and product now. There is definitely some
>> skewness (as one would expect, most users interact with only a small
>> number of items).
>>
>> What happens at the moment:
>>
>>    1. Union
>>    2. Groupby (account, product)
>>    3. Calculate the partition id:
>>       new_user_item_agg = new_user_item_agg.withColumn(
>>           "partition_id",
>>           F.abs(F.hash(F.col("account_name"))) % (5 * executor_count))
>>    4. Repartition:
>>       new_user_item_agg = new_user_item_agg.repartition("partition_id")
>>    5. Write to HDFS:
>>       new_user_item_agg.write \
>>           .partitionBy("partition_id") \
>>           .mode("overwrite") \
>>           .save(hdfs_path)
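>>
>> As a quick sanity check on how even that layout really is, counting rows
>> per physical partition after step 4 shows where the skew sits
>> (spark_partition_id() is a built-in function; the rest is just our
>> dataframe):
>>
>> import pyspark.sql.functions as F
>>
>> # rows per physical partition; a few huge counts at the top means
>> # most of the work lands on a handful of tasks/executors
>> (new_user_item_agg
>>     .groupBy(F.spark_partition_id().alias("pid"))
>>     .count()
>>     .orderBy(F.desc("count"))
>>     .show(10))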
>>
>> Maybe I could already add the partition_id to the groupby, and that
>> would give me some performance gain?
>>
>> The biggest issue is this:
>> [image: image.png]
>>
>> Wherever you see spikes, that's where executors died during the
>> processing.
>>
>> [image: image.png]
>>
>> This is the status after:
>> [image: image.png]
>>
>>
>> The ideal outcome would be to optimise the flow so that the executors
>> stop dying constantly.
>>
>
