spark-user mailing list archives

From András Kolbert <kolbertand...@gmail.com>
Subject Spark Streaming - unpersist() method's execution time gradually increase with time
Date Tue, 10 Aug 2021 07:35:49 GMT
Hi,

I have a long-running Spark Streaming job. The execution time increases
gradually and linearly: over 60 minutes the processing goes from 40 seconds
to 90 seconds.

I managed to narrow down where the increase in execution time occurs, and
it is the least expected point: when the *unpersist()* method is called, the
execution time goes from milliseconds to around 40-50 seconds. Bizarre,
especially because unpersist()'s default argument is blocking=False.
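For comparison, here is a minimal, standalone sketch (toy data, nothing to
do with my actual job) of how I would expect unpersist() to behave; with
blocking=False the call should return almost immediately:

from time import perf_counter
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# toy DataFrame, just to time the calls in isolation
df = spark.range(10000000).cache()
df.count()  # materialise the cache

t0 = perf_counter()
df.unpersist()  # blocking=False is the default in 2.4.x
print(f'non-blocking unpersist took {round(perf_counter() - t0, 3)} seconds')

df.cache()
df.count()
t0 = perf_counter()
df.unpersist(blocking=True)  # waits until the cached blocks are removed
print(f'blocking unpersist took {round(perf_counter() - t0, 3)} seconds')

In my job it is the default, non-blocking call whose duration keeps growing.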

Look how the unpersist time keeps increasing with time:

[attached screenshots: unpersist execution time per batch, increasing over time]

PySpark version: 2.4.4



The generic function that I use for writing checkpoints looks like this,
after adding several debug points to identify what causes the problem:

from time import perf_counter  # used for the debug timings below

def write_checkpoint(self, df, event_name, no_partition_save=None,
                     no_partition_read=None, partition_by=None, cache=True):
    t0 = perf_counter()
    # output to hdfs, rotating away from the previous checkpoint path
    hdfs_path = self.get_next_checkpoint_path(event_name)
    self.logger.debug(f'write_checkpoint - get_path execution time: '
                      f'{round(perf_counter() - t0, 1)} seconds')

    t0 = perf_counter()
    if no_partition_save:
        # coalesce instead of repartition can have unwanted behaviour
        # https://stackoverflow.com/questions/38961251/java-lang-outofmemoryerror-unable-to-acquire-100-bytes-of-memory-got-0
        df.repartition(no_partition_save) \
            .write \
            .mode("overwrite") \
            .save(hdfs_path)
    elif partition_by:
        df.write \
            .partitionBy(partition_by) \
            .mode("overwrite") \
            .save(hdfs_path)
    else:
        df.write \
            .mode("overwrite") \
            .save(hdfs_path)
    self.logger.debug(f'write_checkpoint - write execution time: '
                      f'{round(perf_counter() - t0, 1)} seconds')

    t0 = perf_counter()
    self.hdfs_paths[event_name] = hdfs_path

    # this is the call whose duration keeps growing batch after batch
    df.unpersist()
    self.logger.debug(f'write_checkpoint - unpersist execution time: '
                      f'{round(perf_counter() - t0, 1)} seconds')

    t0 = perf_counter()
    if no_partition_read:
        df_new = self.spark.read.load(hdfs_path).repartition(no_partition_read)
    else:
        df_new = self.spark.read.load(hdfs_path)

    if partition_by:
        df_new = df_new.repartition(partition_by)
    if cache:
        df_new.cache()
    self.logger.debug(f'write_checkpoint - read execution time: '
                      f'{round(perf_counter() - t0, 1)} seconds')
    return df_new


In the code, the function is called like this:
new_df = spark_connect.write_checkpoint(df=df, event_name="df_checkpoint")
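To give a bit more context, the call sits inside the per-batch processing
roughly like this (a simplified, illustrative sketch; process_batch and the
transformation step are placeholders, not the real job):

def process_batch(spark_connect, df):
    # placeholder for the real per-batch transformations
    df = df.cache()

    # rotate the cached result through an HDFS checkpoint; write_checkpoint
    # unpersists the old DataFrame and returns a freshly read, cached one
    df = spark_connect.write_checkpoint(df=df, event_name="df_checkpoint")
    return df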

I do not have any guesses as to why this is happening. Does anyone have any
thoughts?


Thanks
Andras
