flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] 1u0 commented on a change in pull request #8523: [FLINK-12481][runtime] Invoke timer callback in task thread (via mailbox)
Date Mon, 27 May 2019 10:43:21 GMT
1u0 commented on a change in pull request #8523: [FLINK-12481][runtime] Invoke timer callback
in task thread (via mailbox)
URL: https://github.com/apache/flink/pull/8523#discussion_r287741194

 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 @@ -1358,4 +1358,19 @@ public void actionsUnavailable() throws InterruptedException {
+	private class TimerInvocationContext implements SystemProcessingTimeService.ScheduledCallbackExecutionContext {
+		@Override
+		public void invoke(ProcessingTimeCallback callback, long timestamp) throws InterruptedException {
+			mailbox.putMail(() -> {
+				synchronized (getCheckpointLock()) {
 Review comment:
   Hi @Aitozi, thank you for the review!
   Personally, I'm not sure whether this lock can be dropped at the current stage (I'm still trying
to understand the runtime and which things happen in parallel in different tasks).
   I've preserved it to keep the old semantics from when the callback was invoked in the timer
service's thread pool (just to be "safe"). My reasoning was that some implementations of
`onProcessingTime` may lack proper synchronization of their own (hence the original reason the
timer service took the lock in the first place) **and** that, at the current stage, there are still
some additional threads that may access a task's state and operate on it (in particular, methods
related to checkpoints).
   Although `SourceStreamTask` already takes this lock around the invocation in its own mailbox
loop, the base `StreamTask` doesn't (the base class may invoke some letters before
reaching the default action).
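
   To make the preserved semantics concrete, here is a minimal, self-contained sketch. It is not
the actual `StreamTask` code: `MailboxLockSketch`, `onTimerFired` and `drainMailbox` are
hypothetical names, and a plain `LinkedBlockingQueue` stands in for the mailbox. The timer thread
only enqueues a letter; the callback itself runs later on the task thread, still under the
checkpoint lock.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a task with a mailbox; NOT Flink's StreamTask. */
final class MailboxLockSketch {
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private final Object checkpointLock = new Object();

    // Observations recorded by the demo callback, for illustration only.
    public volatile Thread callbackThread;
    public volatile boolean lockHeldInCallback;

    public Object getCheckpointLock() {
        return checkpointLock;
    }

    /** Called from the timer thread: only enqueues a letter, never runs the callback. */
    public void onTimerFired(Runnable callback) {
        mailbox.add(() -> {
            // Keep the old semantics: the callback still runs under the checkpoint lock.
            synchronized (checkpointLock) {
                callback.run();
            }
        });
    }

    /** Called from the task thread: runs every letter that is currently queued. */
    public void drainMailbox() {
        Runnable letter;
        while ((letter = mailbox.poll()) != null) {
            letter.run();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MailboxLockSketch task = new MailboxLockSketch();

        // Simulated timer service thread: it fires the timer but does not run the callback.
        Thread timerThread = new Thread(() -> task.onTimerFired(() -> {
            task.callbackThread = Thread.currentThread();
            task.lockHeldInCallback = Thread.holdsLock(task.getCheckpointLock());
        }));
        timerThread.start();
        timerThread.join();

        // In this demo the main thread plays the role of the task thread.
        task.drainMailbox();

        System.out.println("ran in task thread: " + (task.callbackThread == Thread.currentThread()));
        System.out.println("lock held in callback: " + task.lockHeldInCallback);
    }
}
```

   If the lock turns out to be unnecessary, dropping the `synchronized` block in the letter would
be the only change in this sketch; the hand-over to the task thread stays the same.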
   Stefan also thinks that this lock is not needed anymore. I can try to read the code again
more thoroughly, to check whether each `onProcessingTime` implementation either
    * is guaranteed to be executed under `SourceStreamTask.defaultAction`;
    * uses locking internally, as an implementation detail;
    * doesn't need locking at all.
   Regarding timer cancellation, my thought is that it isn't guaranteed anyway: the code that
uses those timers may cancel a timer, but it never had any guarantee that the timer had not
already fired and the callback had not already been invoked.
   So supporting cancellation of `ProcessingTimeCallback`s that are already in the mailbox
would be a slight optimization (it stretches the cancellation window to also cover the time the
callback spends in the mailbox). But not having it should be no worse than before, imo.
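
   For illustration only, this is roughly what such an optimization could look like. It is a
sketch under my own assumptions, not something in this PR: `CancellableLetterSketch` is a
hypothetical wrapper, and the callback is modelled as a plain `Runnable`. The letter and the
canceller race on a single flag, so `cancel()` succeeds only while the callback has not started.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical cancellable mailbox letter; not part of Flink. */
final class CancellableLetterSketch implements Runnable {
    private final Runnable callback;
    // Set exactly once, by whichever of run() or cancel() gets there first.
    private final AtomicBoolean claimed = new AtomicBoolean(false);

    CancellableLetterSketch(Runnable callback) {
        this.callback = callback;
    }

    /** Executed by the task thread when the letter is taken from the mailbox. */
    @Override
    public void run() {
        if (claimed.compareAndSet(false, true)) {
            callback.run();
        }
    }

    /** Returns true only if the callback had not started and now never will. */
    public boolean cancel() {
        return claimed.compareAndSet(false, true);
    }

    public static void main(String[] args) {
        Queue<CancellableLetterSketch> mailbox = new ArrayDeque<>();
        int[] invocations = {0};

        CancellableLetterSketch first = new CancellableLetterSketch(() -> invocations[0]++);
        CancellableLetterSketch second = new CancellableLetterSketch(() -> invocations[0]++);
        mailbox.add(first);
        mailbox.add(second);

        // Cancelled while still queued: this is the extra window the optimization buys.
        boolean cancelledInMailbox = first.cancel();

        CancellableLetterSketch letter;
        while ((letter = mailbox.poll()) != null) {
            letter.run();
        }

        // Cancelled after the callback ran: too late, same as without the optimization.
        boolean cancelledAfterRun = second.cancel();

        System.out.println(cancelledInMailbox + " " + cancelledAfterRun + " " + invocations[0]);
    }
}
```

   Without the flag, a cancel that arrives while the letter is queued simply has no effect and
the callback still runs, which matches the pre-existing "may have already fired" behaviour.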

