Sure, I will check if that helps!

Incoming raw data for each batch is around 100K records. After processing, only a few thousand records are modified by the union/aggregation into the big dataframe. The big dataframe is around 43M records overall, and, as I said, only a few thousand records are changed or created per batch. Currently, I write the complete dataframe to HDFS after each batch iteration, even though only a few thousand records actually changed.
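
Would something like the sketch below help, i.e. overwriting only the partitions that actually contain changed records? (Rough sketch only, assuming Spark 2.3+ and its dynamic partition overwrite mode; changed_df is a placeholder for the few thousand changed/created records of a batch, and hdfs_path is the same target path as in my code below.)

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    # Overwrite only the partition directories that receive data in this write,
    # instead of truncating the whole target path first
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    # changed_df: placeholder for the few thousand changed/created records of the batch
    changed_df.write \
        .partitionBy("partition_id") \
        .mode("overwrite") \
        .save(hdfs_path)
    # Only the partition_id directories present in changed_df get rewritten;
    # the untouched partitions of the 43M-record dataframe stay as they are.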

On Sun, 11 Apr 2021 at 11:56, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
OK, that is what I guessed was happening.

You have a few options to try in your SIT environment.

First, try adding that partition_id to the groupby composite key (account, product, partition_id), which effectively adds a salt.
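
Something along these lines (a rough sketch only; I am reusing the account_name/product columns and the executor_count variable from your code below, unioned_df is a placeholder for your dataframe after the union, and count() just stands in for your actual aggregation):

    from pyspark.sql import functions as F

    # unioned_df: placeholder for the dataframe after step 1 (union)
    salted_agg = unioned_df \
        .withColumn("partition_id",
                    F.abs(F.hash(F.col("account_name"))) % (5 * executor_count)) \
        .groupBy("account_name", "product", "partition_id") \
        .count()   # placeholder aggregation; keep your own agg() here
    # partition_id is now part of the composite key (account, product, partition_id)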

How many rows are being delivered in your streaming data per batch interval?

HTH






On Sun, 11 Apr 2021 at 10:17, András Kolbert <kolbertandras@gmail.com> wrote:
Hi,

The groupby is by account and product now. There is definitely some skewness (as one would expect, most users have interactions with only a small number of items).

What happens at the moment (a rough consolidated sketch follows the list):
  1. Union
  2. Groupby (account, product)
  3. Calculate partition_id:
     new_user_item_agg = new_user_item_agg.withColumn("partition_id", F.abs(F.hash(col("account_name"))) % (5 * executor_count))
  4. Repartition:
     new_user_item_agg = new_user_item_agg.repartition("partition_id")
  5. Write to HDFS:
     df.write \
         .partitionBy("partition_id") \
         .mode("overwrite") \
         .save(hdfs_path)
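
Putting it together, the flow is roughly this (sketch only: raw_batch_df and big_df are placeholder names, count() stands in for the real aggregation, I am using account_name as the groupby column throughout, and executor_count/hdfs_path are the existing variables):

    from pyspark.sql import functions as F
    from pyspark.sql.functions import col

    # 1. Union the incoming batch with the accumulated dataframe (placeholder names)
    unioned = big_df.unionByName(raw_batch_df)

    # 2. Groupby account + product (count() stands in for the real aggregation)
    new_user_item_agg = unioned.groupBy("account_name", "product").count()

    # 3. Calculate partition_id from the account hash
    new_user_item_agg = new_user_item_agg.withColumn(
        "partition_id",
        F.abs(F.hash(col("account_name"))) % (5 * executor_count))

    # 4. Repartition on partition_id
    new_user_item_agg = new_user_item_agg.repartition("partition_id")

    # 5. Write to HDFS, one directory per partition_id
    new_user_item_agg.write \
        .partitionBy("partition_id") \
        .mode("overwrite") \
        .save(hdfs_path)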
Maybe I could add the partition_id to the groupby already; would that give me some performance increase?

The biggest issue is this:
[inline image: image.png]

Whenever you see spikes, that is because executors died during processing.

[inline image: image.png]

This is the status after:
[inline image: image.png]


The ideal situation would be to optimise the flow and not let the executors die constantly.