spark-dev mailing list archives

From Karthikeyan Ravi <qwertykarthi1...@gmail.com>
Subject [Spark Streaming] [DISCUSS] Clear metadata method and Generate Batches using same Event Loop
Date Mon, 08 Feb 2021 20:19:24 GMT
Hello,

We have observed batch generation being delayed in Spark Streaming, which
produces one very large batch followed by a few zero-record batches. I read
the code and added logs to confirm the root cause; details are below.

1. GenerateBatch (fired by a recurring timer) and ClearMetadata (posted after
a batch completes) are both added to the same event loop:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L62
2. In our case we run with concurrentJobs > 1, and ClearMetadata takes a long
time because it tries to clear every batch older than (current batch -
rememberDuration). That is a problem in itself: for teams using
concurrentJobs > 1, should we clear all previous batches? Some of them might
still be executing because of dependency delays or other issues.
3. Because of #2, the GenerateBatch event is delayed: the event loop is backed
by a BlockingQueue and has to finish ClearMetadata first. When the delayed
GenerateBatch finally runs, it pulls all the records that have accumulated
since the last pull, and the GenerateBatch events queued behind it pull zero
records.
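
To make the sequence above concrete, here is a toy model in Python. It is not
Spark code: the function name, the event tuples and all the numbers are made up
for illustration, and it assumes that a generate event pulls everything that
arrived since the previous pull. It only models a single-consumer FIFO queue
in which one slow handler holds up the events queued behind it.

```python
def simulate(events, records_per_ms=1):
    """Run events on a toy single-consumer FIFO loop.

    events: list of (post_time_ms, kind, cost_ms), ordered by post time.
    Returns the number of records pulled by each "generate" event.
    """
    clock = 0        # simulated time at which the loop is free again
    last_pull = 0    # simulated time of the previous pull
    batch_sizes = []
    for post_time, kind, cost_ms in events:
        # A handler cannot start before its event is posted,
        # nor before the previous handler has finished.
        clock = max(clock, post_time)
        if kind == "generate":
            batch_sizes.append((clock - last_pull) * records_per_ms)
            last_pull = clock
        clock += cost_ms
    return batch_sizes


# Timer ticks every 1000 ms; one clear event is posted at 500 ms.
ticks = [(t, "generate", 0) for t in range(1000, 6000, 1000)]
slow_clear = [(500, "clear", 3000)] + ticks   # clear takes 3000 ms
fast_clear = [(500, "clear", 0)] + ticks      # clear is instantaneous

print(simulate(slow_clear))  # [3500, 0, 0, 500, 1000]
print(simulate(fast_clear))  # [1000, 1000, 1000, 1000, 1000]
```

With the slow clear, the first generate event runs at 3500 ms and takes
everything, and the two ticks that were queued behind it find nothing left to
pull, which matches the pattern we see.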


Proposals:
1. Could we use separate event loops for GenerateBatch and ClearMetadata? Is
this a feasible option?
2. Should we clear the metadata only for the batch that has just completed,
rather than assuming that every batch before (batch time - rememberDuration)
has completed?
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L452
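
As a rough illustration of proposal 1, the toy Python model below compares
when each event would start on one shared queue versus two independent queues.
Again, this is not Spark code; the names and timings are hypothetical, and it
ignores the fact that any state touched by both handlers would need
synchronization if they ran on separate loops.

```python
def start_times(events):
    """events: (post_ms, name, cost_ms) in post order.

    Returns {name: start_ms} for a single-consumer FIFO loop.
    """
    clock = 0
    started = {}
    for post_ms, name, cost_ms in events:
        clock = max(clock, post_ms)   # wait for the post and for the loop
        started[name] = clock
        clock += cost_ms
    return started


clear = [(500, "clear", 3000)]
generate = [(1000, "gen-1", 0), (2000, "gen-2", 0), (3000, "gen-3", 0)]

# One loop: everything is interleaved in post order.
shared = start_times(sorted(clear + generate))
# Two loops: each kind of event has its own queue and its own clock.
separate = {**start_times(clear), **start_times(generate)}

print(shared)    # gen-1, gen-2 and gen-3 all start at 3500
print(separate)  # gen-1, gen-2 and gen-3 start at 1000, 2000 and 3000
```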


That is, change the code from

  generatedRDDs.filter(_._1 <= (time - rememberDuration))

to

  generatedRDDs.filter(_._1 == (time))
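
For reference, here is what the two predicates select on a small made-up
example, written in Python rather than Scala so it can be run standalone. The
dictionary stands in for generatedRDDs, and the batch times, the 1000 ms
interval and the 2000 ms rememberDuration are assumptions chosen only for
illustration.

```python
def select_up_to_cutoff(generated, time, remember_duration):
    """Mirror of: filter(_._1 <= (time - rememberDuration))."""
    return sorted(t for t in generated if t <= time - remember_duration)


def select_exact(generated, time):
    """Mirror of: filter(_._1 == (time))."""
    return sorted(t for t in generated if t == time)


# Six batches, one every 1000 ms; ClearMetadata arrives for time = 6000.
generated = {t: f"rdd-{t}" for t in range(1000, 7000, 1000)}

print(select_up_to_cutoff(generated, 6000, 2000))  # [1000, 2000, 3000, 4000]
print(select_exact(generated, 6000))               # [6000]
```

With these numbers, the current predicate selects every batch at or before
time - rememberDuration, while the proposed one selects only the batch whose
key equals time.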

I would like to pick up these fixes if someone can validate the problems and
proposals.

-- 
Regards,
Karthikeyan R.
