spark-user mailing list archives

From András Kolbert <kolbertand...@gmail.com>
Subject Re: Spark Streaming - unpersist() method's execution time gradually increase with time
Date Tue, 10 Aug 2021 10:00:00 GMT
[image: image.png]
Just one more screenshot from the logging - it took 39.7 seconds for the
write this time (in line with the Spark UI).

On Tue, 10 Aug 2021 at 11:57, András Kolbert <kolbertandras@gmail.com>
wrote:

> OK, I started digging deeper. I've uncommented the unpersist() method to
> see the benefit.
> Now the execution time is increasing at the write operation, which is where
> I'd have expected it to happen in the first place.
>
> Strangely enough, I can see the following in the Spark UI (40 seconds for
> the write operation):
>
>
> [image: image.png]
>
> However, the job itself runs in 2 seconds!
> [image: image.png]
>
> What about the remaining 38 seconds? How could I identify what's going on?
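>
> For reference, a minimal sketch of how one could separate driver-side wall
> time from the jobs the scheduler actually ran, using sc.statusTracker() -
> the job-group name and the bare spark / df / hdfs_path variables here are
> purely illustrative, not lifted from my actual job:
>
> from time import perf_counter
>
> sc = spark.sparkContext
> sc.setJobGroup("checkpoint_write", "timing the checkpoint write")
> t0 = perf_counter()
> df.write.mode("overwrite").save(hdfs_path)
> wall = perf_counter() - t0
>
> tracker = sc.statusTracker()
> for job_id in tracker.getJobIdsForGroup("checkpoint_write"):
>     info = tracker.getJobInfo(job_id)
>     if info:
>         print(f"job {job_id}: status={info.status}, stages={list(info.stageIds)}")
> print(f"driver-side wall time for the write: {wall:.1f}s")
> # if the jobs/stages finish quickly but `wall` stays high, the missing time
> # is being spent on the driver rather than inside the tasks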
>
> Thanks
> Andras
>
> On Tue, 10 Aug 2021 at 09:35, András Kolbert <kolbertandras@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have a long-running Spark Streaming job. The execution time increases
>> gradually and linearly: within 60 minutes the processing goes from 40
>> seconds to 90 seconds.
>>
>> I managed to narrow down where the increase in execution time shows up,
>> and it's the least expected point: when the *unpersist()* method is
>> called, the execution time goes from milliseconds to around 40-50 seconds.
>> Bizarre, especially because unpersist()'s default argument is
>> blocking=False.
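>>
>> For context, a minimal sketch of what blocking=False means - `df` here is
>> just any cached DataFrame; the non-blocking call only asks the block
>> manager to drop the cached blocks asynchronously, so it would normally be
>> expected to return almost immediately:
>>
>> from time import perf_counter
>>
>> t0 = perf_counter()
>> df.unpersist(blocking=False)  # blocking=False is the default in PySpark 2.4
>> print(f"unpersist returned in {perf_counter() - t0:.3f}s")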
>>
>> Look how the unpersist time keeps increasing with time:
>>
>> [images: screenshots showing the unpersist execution time increasing over
>> successive batches]
>>
>>
>> PySpark version: 2.4.4
>>
>>
>>
>> The generic function that I tend to use for writing checkpoints looks like
>> this, after adding several debug timers to identify what causes the
>> problem:
>>
>> from time import perf_counter
>>
>> def write_checkpoint(self, df, event_name, no_partition_save=None,
>>                      no_partition_read=None, partition_by=None, cache=True):
>>     t0 = perf_counter()
>>     # output to hdfs: rotate away from the previous output path
>>     hdfs_path = self.get_next_checkpoint_path(event_name)
>>     t1 = perf_counter()
>>     self.logger.debug(f'write_checkpoint - get_path execution time: {round(t1 - t0, 1)} seconds')
>>
>>     t0 = perf_counter()
>>     if no_partition_save:
>>         # coalesce instead of repartition can have unwanted behaviour
>>         # https://stackoverflow.com/questions/38961251/java-lang-outofmemoryerror-unable-to-acquire-100-bytes-of-memory-got-0
>>         df.repartition(no_partition_save) \
>>             .write \
>>             .mode("overwrite") \
>>             .save(hdfs_path)
>>     elif partition_by:
>>         df.write \
>>             .partitionBy(partition_by) \
>>             .mode("overwrite") \
>>             .save(hdfs_path)
>>     else:
>>         df.write \
>>             .mode("overwrite") \
>>             .save(hdfs_path)
>>     t1 = perf_counter()
>>     self.logger.debug(f'write_checkpoint - write execution time: {round(t1 - t0, 1)} seconds')
>>
>>     t0 = perf_counter()
>>     self.hdfs_paths[event_name] = hdfs_path
>>
>>     df.unpersist()
>>     t1 = perf_counter()
>>     self.logger.debug(f'write_checkpoint - unpersist execution time: {round(t1 - t0, 1)} seconds')
>>
>>     t0 = perf_counter()
>>     if no_partition_read:
>>         df_new = self.spark.read.load(hdfs_path).repartition(no_partition_read)
>>     else:
>>         df_new = self.spark.read.load(hdfs_path)
>>
>>     if partition_by:
>>         # repartition the re-read frame by the given column
>>         df_new = df_new.repartition(partition_by)
>>     if cache:
>>         df_new.cache()
>>     t1 = perf_counter()
>>     self.logger.debug(f'write_checkpoint - read execution time: {round(t1 - t0, 1)} seconds')
>>     return df_new
>>
>>
>> In the code, the function is called like this:
>> new_df = spark_connect.write_checkpoint(df=df, event_name="df_checkpoint")
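>>
>> Side note on the timers: spark.read.load() and cache() are both lazy, so the
>> "read" timer above mostly measures file listing and schema resolution rather
>> than an actual read. A minimal sketch of forcing materialisation so that the
>> timer reflects real work (the count() action here is purely illustrative):
>>
>> df_new = self.spark.read.load(hdfs_path)
>> df_new.cache()
>> rows = df_new.count()  # action: triggers the read and populates the cache
>> self.logger.debug(f'write_checkpoint - materialised {rows} rows into cache')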
>>
>> I do not have any guesses as to why this is happening. Does anyone have
>> any clever thoughts?
>>
>>
>> Thanks
>> Andras
>>
