Yes, that is correct.

Vishnu, thanks for the reply.
So "event time" and "window end time" have nothing to do with the current system timestamp; the watermark follows the highest value of the "timestamp" field seen in the input and never moves backward. Is that the correct understanding?

The 20 seconds determines when the window state is cleared. For a late message to be dropped, it must arrive after you have received a message with event time >= window end time + 20 seconds.
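To make the rule above concrete, here is a minimal plain-Java sketch of the arithmetic. It is not Spark code: the class WatermarkSketch and its methods are hypothetical names made up for illustration, and it assumes the watermark is simply (max event time seen so far) minus the 20 second delay, ignoring micro-batch boundaries and assuming non-negative event times in milliseconds.

```java
public class WatermarkSketch {
    // Assumed to mirror withWatermark("timestamp", "20 seconds")
    static final long DELAY_MS = 20_000;
    // Assumed to mirror window(col("timestamp"), "5 seconds")
    static final long WINDOW_MS = 5_000;

    // End of the tumbling window that contains eventTimeMs (non-negative input assumed).
    static long windowEnd(long eventTimeMs) {
        return (eventTimeMs / WINDOW_MS + 1) * WINDOW_MS;
    }

    // Simplified watermark: highest event time seen so far minus the allowed delay.
    static long watermark(long maxEventTimeSeenMs) {
        return maxEventTimeSeenMs - DELAY_MS;
    }

    // A message is dropped once the watermark has reached the end of its window,
    // i.e. once max event time >= window end + 20 seconds.
    static boolean isDropped(long eventTimeMs, long maxEventTimeSeenMs) {
        return windowEnd(eventTimeMs) <= watermark(maxEventTimeSeenMs);
    }

    public static void main(String[] args) {
        long late = 12_000; // falls in window [10s, 15s)
        // Max event time 30s: watermark is 10s, window end is 15s, so the message is kept.
        System.out.println(isDropped(late, 30_000));
        // Max event time 35s: watermark is 15s, which reaches the window end, so it is dropped.
        System.out.println(isDropped(late, 35_000));
    }
}
```

In this model the wall clock never appears: an old message is only dropped after some other message has pushed the maximum event time far enough ahead.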


sample code:

Let's say Xyz is a POJO with a field called timestamp.
Regarding the code withWatermark("timestamp", "20 seconds"):
I expected messages with a timestamp 20 seconds or older to be dropped. What is the 20 seconds compared to? In my test, nothing was dropped no matter how old the timestamp was. What did I miss?

Dataset<Xyz> xyz = lines
.map((MapFunction<String, Xyz>) value -> mapper.readValue(value, Xyz.class), Encoders.bean(Xyz.class));

Dataset<Row> aggregated = xyz.withWatermark("timestamp", "20 seconds")
.groupBy(functions.window(xyz.col("timestamp"), "5 seconds"), xyz.col("x") //tumbling window of size 5 seconds (timestamp)