flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] rkhachatryan commented on a change in pull request #10151: [FLINK-14231] Handle the processing-time timers before closing operator to properly support endInput
Date Fri, 15 Nov 2019 11:25:41 GMT
rkhachatryan commented on a change in pull request #10151: [FLINK-14231] Handle the processing-time
timers before closing operator to properly support endInput
URL: https://github.com/apache/flink/pull/10151#discussion_r346727792
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java
 ##########
 @@ -18,18 +18,42 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.concurrent.NeverCompleteFuture;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 
+@Internal
 class ProcessingTimeServiceImpl implements ProcessingTimeService {
 	private final TimerService timerService;
+
 	private final Function<ProcessingTimeCallback, ProcessingTimeCallback> processingTimeCallbackWrapper;
 
+	private final ConcurrentHashMap<ScheduledFuture<?>, Object> pendingTimers;
+
+	private final CompletableFuture<?> timersDoneFutureAfterQuiescing;
+
+	private volatile boolean isQuiesced;
+
 	ProcessingTimeServiceImpl(
 			TimerService timerService,
 			Function<ProcessingTimeCallback, ProcessingTimeCallback> processingTimeCallbackWrapper)
{
 		this.timerService = timerService;
 		this.processingTimeCallbackWrapper = processingTimeCallbackWrapper;
+
+		this.pendingTimers = new ConcurrentHashMap<>();
 
 Review comment:
   Do we need all the timers here? I think just a counter is enough, which we increment/decrement
(atomically) whenever we schedule/execute a task. Later we can check if (isQuiesced &&
counter.get() == 0) ...
   
   Though we likely need to limit the wait time, so we'll have to keep all timer times sorted to check
whether there are any timers left BEFORE the deadline (a ConcurrentSkipListSet would work).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message