Hi All,

Managed to find out what's going on - however, I still can't explain it:

What I had been doing, and what was causing issues in the long run:

df \
        .write \
        .mode("overwrite") \
        .parquet(hdfs_path)

Here I kept rotating the hdfs_path between two fixed locations: from /location/event_name/1 to /location/event_name/2, and then back from /location/event_name/2 to /location/event_name/1.
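Roughly, the old rotation looked like this (a simplified sketch of what get_next_checkpoint_path did, not the exact implementation):

def get_next_checkpoint_path(self, event_name):
        # flip between .../1 and .../2 on every call (sketch only)
        previous = self.hdfs_paths.get(event_name, f'/location/{event_name}/2')
        suffix = '2' if previous.endswith('/1') else '1'
        return f'/location/{event_name}/{suffix}'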

When I changed my process to be the following:
unique_hdfs_path = "/location/event_name/" + str(int(time.time()))  # unique path per write (assumes import time)

df \
        .write \
        .mode("overwrite") \
        .parquet(unique_hdfs_path)


The issues seem to be resolved. However, I am still looking for an explanation why... :)

Could anyone maybe shed some light on this?

Thanks
Andras



On Thu, 12 Aug 2021 at 15:55, András Kolbert <kolbertandras@gmail.com> wrote:
I also posted this on Stack Overflow; maybe the description there is more precise:




On Tue, 10 Aug 2021 at 12:00, András Kolbert <kolbertandras@gmail.com> wrote:
[screenshot: log output]
Just one more screenshot from the logging - it took 39.7 seconds for the write this time (in line with the Spark UI).

On Tue, 10 Aug 2021 at 11:57, András Kolbert <kolbertandras@gmail.com> wrote:
OK, I started digging deeper. I uncommented the unpersist() call to see the benefit.
Now the increase in execution time shows up at the write operation, which is where I would have expected it in the first place.

Strangely enough, I can see the following in Spark UI (40 seconds for the write operation):


[screenshot: Spark UI showing ~40 seconds for the write]

However, the job itself runs in 2 seconds! 
[screenshot: Spark UI showing the job completing in 2 seconds]

What about the remaining 38 seconds? How could I identify what's going on?

Thanks
Andras

On Tue, 10 Aug 2021 at 09:35, András Kolbert <kolbertandras@gmail.com> wrote:
Hi,

I have a long-running Spark streaming job. The execution time increases gradually and linearly: within 60 minutes, the processing goes from 40 seconds to 90 seconds.

I managed to narrow down where the execution time increases, and it's the least expected point: the unpersist() call, whose duration goes from milliseconds to around 40-50 seconds. Bizarre, especially because unpersist()'s default argument is blocking=False.
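For reference, both variants exist on the DataFrame API (the asynchronous one is the default):

df.unpersist()               # blocking=False: returns immediately, blocks are dropped in the background
df.unpersist(blocking=True)  # waits until the cached blocks are actually removed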

Look at how the unpersist time keeps increasing over time:

[screenshots: log output showing the unpersist execution time increasing across successive batches]



PySpark version: 2.4.4



The generic function that I use for writing checkpoints looks like this (after adding several debug timers to identify what causes the problem):

def write_checkpoint(self, df, event_name, no_partition_save=None, no_partition_read=None, partition_by=None, cache=True):
        # perf_counter comes from: from time import perf_counter
        t0 = perf_counter()
        # output to hdfs
        hdfs_path = self.get_next_checkpoint_path(event_name)  # rotate from the previous output
        t1 = perf_counter()
        self.logger.debug(f'write_checkpoint - get_path execution time: {round(t1 - t0, 1)} seconds')
        t0 = perf_counter()
       
        if no_partition_save:
            # coalesce instead of repartition can have unwanted behaviour
            # https://stackoverflow.com/questions/38961251/java-lang-outofmemoryerror-unable-to-acquire-100-bytes-of-memory-got-0
            df.repartition(no_partition_save) \
                .write \
                .mode("overwrite") \
                .save(hdfs_path)
        elif partition_by:
            df.write \
                .partitionBy(partition_by) \
                .mode("overwrite") \
                .save(hdfs_path)
        else:
            df \
                .write \
                .mode("overwrite") \
                .save(hdfs_path)
        t1 = perf_counter()
        self.logger.debug(f'write_checkpoint - write execution time: {round(t1 - t0, 1)} seconds')
        t0 = perf_counter()
        self.hdfs_paths[event_name] = hdfs_path

        df.unpersist()  # non-blocking by default (blocking=False)
        t1 = perf_counter()
        self.logger.debug(f'write_checkpoint - unpersist execution time: {round(t1 - t0, 1)} seconds')
       
        t0 = perf_counter()
        if no_partition_read:
            df_new = self.spark.read.load(hdfs_path).repartition(no_partition_read)
        else:
            df_new = self.spark.read.load(hdfs_path)

        if partition_by:
            df_new = df_new.repartition(partition_by)
        if cache:
            df_new.cache()
        t1 = perf_counter()
        self.logger.debug(f'write_checkpoint - read execution time: {round(t1 - t0, 1)} seconds')
        return df_new


In the code, the function is called like this:
new_df = spark_connect.write_checkpoint(df=df, event_name="df_checkpoint")
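
Conceptually, per batch the returned DataFrame just replaces the old reference (a simplified, hypothetical sketch of the surrounding batch code, not the actual job):

def process_batch(spark_connect, df):
        # the reloaded-and-cached DataFrame replaces the previous reference,
        # so later transformations work on the freshly written parquet data
        df = spark_connect.write_checkpoint(df=df, event_name="df_checkpoint")
        return df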

I do not have any guesses as to why this is happening. Does anyone have any clever thoughts?


Thanks
Andras