spark-user mailing list archives

From András Kolbert <kolbertand...@gmail.com>
Subject Re: Spark Streaming - unpersist() method's execution time gradually increase with time
Date Mon, 16 Aug 2021 08:41:56 GMT
Hi All,

I managed to find out what's going on - however, I still can't explain it.

This is what I had been doing, and what was causing issues in the long run:

df \
        .write \
        .mode("overwrite") \
        .parquet(hdfs_path)

Here I kept rotating the HDFS_PATH, from */location/event_name/1* to
*/location/event_name/2*, and then back from */location/event_name/2* to
*/location/event_name/1*.

When I changed my process to be the following:
unique_hdfs_path =  */location/event_name/ + UNIX_TIMESTAMP*

df \
        .write \
        .mode("overwrite") \
        .parquet(unique_hdfs_path)
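
For reference, a minimal sketch of how such a unique path could be built (the
base path and the int(time.time()) call are just illustrative assumptions,
not my exact code):

import time

def get_unique_checkpoint_path(event_name):
    # append the current UNIX timestamp so every micro-batch writes to a
    # fresh directory instead of overwriting a rotated one
    return f"/location/{event_name}/{int(time.time())}"

unique_hdfs_path = get_unique_checkpoint_path("event_name")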


The issues seem to be resolved. However, I am still looking for an
explanation why... :)

Could anyone maybe shed some light on this?

Thanks
Andras



On Thu, 12 Aug 2021 at 15:55, András Kolbert <kolbertandras@gmail.com>
wrote:

> I also posted this on Stackoverflow; maybe the description there is more
> precise:
>
>
> https://stackoverflow.com/questions/68758772/specific-spark-write-operation-gradually-increase-with-time-in-streaming-applica
>
>
>
> On Tue, 10 Aug 2021 at 12:00, András Kolbert <kolbertandras@gmail.com>
> wrote:
>
>> [image: image.png]
>> Just one more screenshot from the logging - it took 39.7 seconds for the
>> write this time (in line with the Spark UI).
>>
>> On Tue, 10 Aug 2021 at 11:57, András Kolbert <kolbertandras@gmail.com>
>> wrote:
>>
>>> Ok, I started digging deeper: I've uncommented the unpersist() method to
>>> see the benefit.
>>> Now the execution time is increasing at the write operation, which is where
>>> I'd have expected it to happen in the first place.
>>>
>>> Strangely enough, I can see the following in Spark UI (40 seconds for
>>> the write operation):
>>>
>>>
>>> [image: image.png]
>>>
>>> However, the job itself runs in 2 seconds!
>>> [image: image.png]
>>>
>>> What about the remaining 38 seconds? How could I identify what's going
>>> on?
>>>
>>> Thanks
>>> Andras
>>>
>>> On Tue, 10 Aug 2021 at 09:35, András Kolbert <kolbertandras@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a long-running Spark Streaming job. The execution time increases
>>>> gradually and linearly: within 60 minutes the processing time goes from
>>>> 40 seconds to 90 seconds.
>>>>
>>>> I managed to narrow down where the increase in execution time occurs, and
>>>> it's the least expected point: when the *unpersist()* method is called, its
>>>> execution time goes from milliseconds to around 40-50 seconds.
>>>> Bizarre, especially because unpersist()'s default argument is
>>>> blocking=False.
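>>>>
>>>> For reference, a minimal sketch of the two variants (df here stands for the
>>>> cached DataFrame in question):
>>>>
>>>> df.unpersist()               # default: blocking=False, returns immediately
>>>> df.unpersist(blocking=True)  # waits until the cached blocks are removed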
>>>>
>>>> Look how the unpersist time keeps increasing with time:
>>>>
>>>> [image: image.png]
>>>>
>>>>
>>>> [image: image.png]
>>>>
>>>> [image: image.png]
>>>>
>>>> [image: image.png]
>>>>
>>>>
>>>>
>>>> PySpark version: 2.4.4
>>>>
>>>>
>>>>
>>>> The generic function that I use for writing checkpoints looks like this,
>>>> after adding several debug points to identify what causes the
>>>> problem:
>>>>
>>>> from time import perf_counter
>>>>
>>>> def write_checkpoint(self, df, event_name, no_partition_save=None,
>>>>                      no_partition_read=None, partition_by=None, cache=True):
>>>>     t0 = perf_counter()
>>>>     # output to hdfs; rotate from the previous output
>>>>     hdfs_path = self.get_next_checkpoint_path(event_name)
>>>>     self.logger.debug(f'write_checkpoint - get_path execution time: {round((perf_counter()-t0),1)} seconds')
>>>>     t0 = perf_counter()
>>>>
>>>>     if no_partition_save:
>>>>         # coalesce instead of repartition can have unwanted behaviour
>>>>         # https://stackoverflow.com/questions/38961251/java-lang-outofmemoryerror-unable-to-acquire-100-bytes-of-memory-got-0
>>>>         df.repartition(no_partition_save) \
>>>>             .write \
>>>>             .mode("overwrite") \
>>>>             .save(hdfs_path)
>>>>     elif partition_by:
>>>>         df.write \
>>>>             .partitionBy(partition_by) \
>>>>             .mode("overwrite") \
>>>>             .save(hdfs_path)
>>>>     else:
>>>>         df \
>>>>             .write \
>>>>             .mode("overwrite") \
>>>>             .save(hdfs_path)
>>>>     self.logger.debug(f'write_checkpoint - write execution time: {round((perf_counter()-t0),1)} seconds')
>>>>     t0 = perf_counter()
>>>>     self.hdfs_paths[event_name] = hdfs_path
>>>>
>>>>     # release the previously cached DataFrame (non-blocking by default)
>>>>     df.unpersist()
>>>>     self.logger.debug(f'write_checkpoint - unpersist execution time: {round((perf_counter()-t0),1)} seconds')
>>>>
>>>>     t0 = perf_counter()
>>>>     # read the checkpoint back so downstream work runs on the written files
>>>>     if no_partition_read:
>>>>         df_new = self.spark.read.load(hdfs_path).repartition(no_partition_read)
>>>>     else:
>>>>         df_new = self.spark.read.load(hdfs_path)
>>>>
>>>>     if partition_by:
>>>>         df_new = df_new.repartition(partition_by)
>>>>     if cache:
>>>>         df_new.cache()
>>>>     self.logger.debug(f'write_checkpoint - read execution time: {round((perf_counter()-t0),1)} seconds')
>>>>     return df_new
>>>>
>>>>
>>>> In the code, the function is called like this:
>>>>
>>>> new_df = spark_connect.write_checkpoint(df=df, event_name="df_checkpoint")
>>>>
>>>> I do not have any guesses as to why this is happening. Does anyone have any
>>>> clever thoughts?
>>>>
>>>>
>>>> Thanks
>>>> Andras
>>>>
