spark-user mailing list archives

From András Kolbert <kolbertand...@gmail.com>
Subject Re: Spark Streaming - unpersist() method's execution time gradually increase with time
Date Thu, 12 Aug 2021 13:55:41 GMT
I also posted this on Stack Overflow; the description there may be more
precise:

https://stackoverflow.com/questions/68758772/specific-spark-write-operation-gradually-increase-with-time-in-streaming-applica



On Tue, 10 Aug 2021 at 12:00, András Kolbert <kolbertandras@gmail.com>
wrote:

> [image: image.png]
> Just one more screenshot from the logging - it took 39.7 seconds for the
> write this time (in line with the Spark UI).
>
> On Tue, 10 Aug 2021 at 11:57, András Kolbert <kolbertandras@gmail.com>
> wrote:
>
>> OK, I started digging deeper. I've commented out the unpersist() call to
>> see the benefit.
>> Now the execution time is increasing at the write operation, which is
>> where I would have expected it to happen in the first place.
>>
>> Strangely enough, I can see the following in the Spark UI (40 seconds for
>> the write operation):
>>
>>
>> [image: image.png]
>>
>> However, the job itself runs in 2 seconds!
>> [image: image.png]
>>
>> What about the remaining 38 seconds? How could I identify what's going on?
>>
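>> One idea for narrowing it down: tag the write with a job description, so
>> that the 40-second span can be matched to a concrete job/stage in the
>> Spark UI (rough sketch only; event_name, df and hdfs_path as in my
>> write_checkpoint function below):
>>
>> self.spark.sparkContext.setJobDescription(f"checkpoint write {event_name}")
>> df.write.mode("overwrite").save(hdfs_path)
>> self.spark.sparkContext.setJobDescription(None)
>>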
>> Thanks
>> Andras
>>
>> On Tue, 10 Aug 2021 at 09:35, András Kolbert <kolbertandras@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a long-running Spark Streaming job. The execution time is
>>> gradually, linearly increasing: within 60 minutes the processing goes
>>> from 40 seconds to 90 seconds.
>>>
>>> I managed to narrow down where the increase in execution time occurs,
>>> and it is at the least expected point: when the *unpersist()* method is
>>> called, the execution time goes from milliseconds to 40-50 seconds.
>>> Bizarre, especially because unpersist()'s default argument is
>>> blocking=False.
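>>>
>>> A minimal sketch of the call I mean (df_cached is just a placeholder
>>> name for the cached DataFrame):
>>>
>>> # With the default blocking=False, unpersist() should only mark the
>>> # cached blocks for asynchronous removal and return almost immediately.
>>> df_cached.unpersist()                 # same as unpersist(blocking=False)
>>> # df_cached.unpersist(blocking=True)  # would wait for the blocks to be freed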
>>>
>>> Look how the unpersist time keeps increasing with time:
>>>
>>> [image: image.png]
>>>
>>>
>>> [image: image.png]
>>>
>>> [image: image.png]
>>>
>>> [image: image.png]
>>>
>>>
>>>
>>> PySpark version: 2.4.4
>>>
>>>
>>>
>>> The generic function that I tend to use for writing a checkpoint looks
>>> like this (after adding several debug timing points to identify what
>>> causes the problem):
>>>
>>> def write_checkpoint(self, df, event_name, no_partition_save=None,
>>>                      no_partition_read=None, partition_by=None, cache=True):
>>>     t0 = perf_counter()
>>>     # output to hdfs - rotate from the previous output
>>>     hdfs_path = self.get_next_checkpoint_path(event_name)
>>>     self.logger.debug(f'write_checkpoint - get_path execution time: {round(perf_counter() - t0, 1)} seconds')
>>>
>>>     t0 = perf_counter()
>>>     if no_partition_save:
>>>         # coalesce instead of repartition can have unwanted behaviour:
>>>         # https://stackoverflow.com/questions/38961251/java-lang-outofmemoryerror-unable-to-acquire-100-bytes-of-memory-got-0
>>>         df.repartition(no_partition_save) \
>>>             .write \
>>>             .mode("overwrite") \
>>>             .save(hdfs_path)
>>>     elif partition_by:
>>>         df.write \
>>>             .partitionBy(partition_by) \
>>>             .mode("overwrite") \
>>>             .save(hdfs_path)
>>>     else:
>>>         df.write \
>>>             .mode("overwrite") \
>>>             .save(hdfs_path)
>>>     self.logger.debug(f'write_checkpoint - write execution time: {round(perf_counter() - t0, 1)} seconds')
>>>
>>>     t0 = perf_counter()
>>>     self.hdfs_paths[event_name] = hdfs_path
>>>     # release the cached input DataFrame (non-blocking by default)
>>>     df.unpersist()
>>>     self.logger.debug(f'write_checkpoint - unpersist execution time: {round(perf_counter() - t0, 1)} seconds')
>>>
>>>     t0 = perf_counter()
>>>     # read the freshly written files back so the returned DataFrame has a short lineage
>>>     if no_partition_read:
>>>         df_new = self.spark.read.load(hdfs_path).repartition(no_partition_read)
>>>     else:
>>>         df_new = self.spark.read.load(hdfs_path)
>>>     if partition_by:
>>>         df_new = df_new.repartition(partition_by)
>>>     if cache:
>>>         df_new.cache()
>>>     self.logger.debug(f'write_checkpoint - read execution time: {round(perf_counter() - t0, 1)} seconds')
>>>     return df_new
>>>
>>>
>>> In the code, the function is called like this:
>>>
>>> new_df = spark_connect.write_checkpoint(df=df, event_name="df_checkpoint")
>>>
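>>> For context, roughly how such a helper is driven per micro-batch
>>> (simplified sketch; process_batch and apply_transformations are just
>>> placeholder names, not my exact code):
>>>
>>> def process_batch(df):
>>>     # the incoming df carries the lineage/cache of the previous batch
>>>     df = apply_transformations(df)  # placeholder for the actual logic
>>>     # write to HDFS, unpersist the old df and read the files back, so
>>>     # the next batch starts from a short lineage
>>>     df = spark_connect.write_checkpoint(df=df, event_name="df_checkpoint")
>>>     return df
>>>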
>>> I do not have any guesses as to why this is happening. Does anyone have
>>> any clever thoughts?
>>>
>>>
>>> Thanks
>>> Andras
>>>
