spark-user mailing list archives

From András Kolbert <kolbertand...@gmail.com>
Subject Re: Spark Streaming - unpersist() method's execution time gradually increase with time
Date Tue, 10 Aug 2021 09:57:17 GMT
OK, I started digging deeper. I've uncommented the unpersist() call to see the
benefit. Now the execution time is increasing at the write operation, which is
where I would have expected it to show up in the first place.

Strangely enough, I can see the following in Spark UI (40 seconds for the
write operation):


[Spark UI screenshot: the write operation shown taking ~40 seconds]

However, the job itself runs in 2 seconds!
[Spark UI screenshot: the corresponding job completing in ~2 seconds]

What about the remaining 38 seconds? How could I identify what's going on?
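
One thing I'm considering, to separate driver time from executor time (a rough
sketch only: the default UI port 4040, the requests library, and the
recent_job_times name are just assumptions about my setup): time the write call
on the driver and compare it with the job durations reported by the monitoring
REST API. If the jobs themselves only account for ~2 seconds, the rest of the
wall-clock time must be spent on the driver before or after the jobs run.

import requests

def recent_job_times(ui_base="http://localhost:4040", n=5):
    # The standard monitoring REST API exposed by the driver UI.
    app_id = requests.get(f"{ui_base}/api/v1/applications").json()[0]["id"]
    jobs = requests.get(f"{ui_base}/api/v1/applications/{app_id}/jobs").json()
    jobs = sorted(jobs, key=lambda j: j["jobId"], reverse=True)[:n]
    # submissionTime/completionTime are timestamps; comparing their spread
    # with the wall-clock time around the write shows how much is driver-side.
    return [(j["jobId"], j.get("submissionTime"), j.get("completionTime"))
            for j in jobs]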

Thanks
Andras

On Tue, 10 Aug 2021 at 09:35, András Kolbert <kolbertandras@gmail.com>
wrote:

> Hi,
>
> I have a long-running Spark Streaming job. The execution time increases
> gradually and linearly: in 60 minutes the processing time goes from 40
> seconds to 90 seconds.
>
> I managed to narrow down where the execution time increases, and it is the
> least expected point: when the *unpersist()* method is called, the execution
> time goes from milliseconds to around 40-50 seconds. Bizarre, especially
> because unpersist()'s default argument is blocking=False.
>
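> As a quick sanity check (a minimal sketch, not part of the job itself; it
> goes through the internal _jsc handle, so it is only meant as a debugging
> aid), I can count how many RDDs are still marked as persisted after each
> batch, to see whether the old checkpoints are actually being released:
>
> def cached_rdd_count(spark):
>     # JavaSparkContext.getPersistentRDDs() returns a java.util.Map of
>     # RDD id -> RDD for everything currently marked as persisted.
>     return spark.sparkContext._jsc.getPersistentRDDs().size()
>
> If that number keeps climbing batch after batch even though unpersist() is
> called, the extra time is more likely spent elsewhere (e.g. on the driver)
> than inside the unpersist call itself.
>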
> Look how the unpersist time keeps increasing with time:
>
> [screenshots: unpersist execution time per batch, increasing over time]
>
>
> PySpark version: 2.4.4
>
>
>
> The generic function that I use for writing checkpoints looks like this,
> after adding several debug timers to identify what causes the problem:
>
> # requires `from time import perf_counter` at module level
> def write_checkpoint(self, df, event_name, no_partition_save=None,
>                      no_partition_read=None, partition_by=None, cache=True):
>     t0 = perf_counter()
>     # output to hdfs; rotate from the previous output path
>     hdfs_path = self.get_next_checkpoint_path(event_name)
>     self.logger.debug(f'write_checkpoint - get_path execution time: '
>                       f'{round(perf_counter() - t0, 1)} seconds')
>
>     t0 = perf_counter()
>     if no_partition_save:
>         # coalesce instead of repartition can have unwanted behaviour
>         # https://stackoverflow.com/questions/38961251/java-lang-outofmemoryerror-unable-to-acquire-100-bytes-of-memory-got-0
>         df.repartition(no_partition_save) \
>             .write \
>             .mode("overwrite") \
>             .save(hdfs_path)
>     elif partition_by:
>         df.write \
>             .partitionBy(partition_by) \
>             .mode("overwrite") \
>             .save(hdfs_path)
>     else:
>         df.write \
>             .mode("overwrite") \
>             .save(hdfs_path)
>     self.logger.debug(f'write_checkpoint - write execution time: '
>                       f'{round(perf_counter() - t0, 1)} seconds')
>
>     t0 = perf_counter()
>     self.hdfs_paths[event_name] = hdfs_path
>     df.unpersist()
>     self.logger.debug(f'write_checkpoint - unpersist execution time: '
>                       f'{round(perf_counter() - t0, 1)} seconds')
>
>     t0 = perf_counter()
>     if no_partition_read:
>         df_new = self.spark.read.load(hdfs_path).repartition(no_partition_read)
>     else:
>         df_new = self.spark.read.load(hdfs_path)
>
>     if partition_by:
>         df_new = df_new.repartition(partition_by)
>     if cache:
>         df_new.cache()
>     self.logger.debug(f'write_checkpoint - read execution time: '
>                       f'{round(perf_counter() - t0, 1)} seconds')
>     return df_new
>
>
> In the code, the function is called like this:
> new_df = spark_connect.write_checkpoint(df=df, event_name="df_checkpoint")
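>
> To isolate whether the time is really spent inside unpersist(), I am thinking
> of a small variation on the read/unpersist part above (just a sketch reusing
> the same names, not the actual production code): cache and materialize the
> new DataFrame first, then unpersist the old one with blocking=True so that
> the cost shows up where the work actually happens:
>
>     t0 = perf_counter()
>     df_new = self.spark.read.load(hdfs_path)
>     if cache:
>         df_new.cache()
>         df_new.count()  # force materialization of the new cache
>     self.logger.debug(f'read+cache: {round(perf_counter() - t0, 1)} seconds')
>
>     t0 = perf_counter()
>     df.unpersist(blocking=True)  # block so the unpersist cost is measured here
>     self.logger.debug(f'blocking unpersist: {round(perf_counter() - t0, 1)} seconds')
>
> If the blocking unpersist still only takes milliseconds while the surrounding
> wall-clock time keeps growing, the growth is probably happening somewhere
> else, most likely on the driver.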
>
> I do not have any guess as to why this is happening. Does anyone have any
> clever thoughts?
>
>
> Thanks
> Andras
>
