spark-user mailing list archives

From Jiewen Shao <fifistorm...@gmail.com>
Subject Re: Spark Streaming withWatermark
Date Tue, 06 Feb 2018 22:18:43 GMT
OK, thanks for the confirmation.

So, based on my code, I have messages with the following timestamps (converted
to a more readable format), arriving in this order:

2018-02-06 12:00:00
2018-02-06 12:00:01
2018-02-06 12:00:02
2018-02-06 12:00:03
2018-02-06 11:59:00  <-- this message should not be counted, right? However,
in my test it is still counted.
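The timeline above can be checked with a simplified model in plain Java. It has no Spark dependency, the class and method names are hypothetical, and timestamps are treated as UTC.

After 12:00:03 the watermark is 12:00:03 - 20 s = 11:59:43. The 11:59:00 message falls in the window [11:59:00, 11:59:05), which ends before 11:59:43, so by the rule Vishnu describes it is eligible to be dropped.

However, the Structured Streaming guide describes the watermark as being set at the beginning of each trigger from the data seen so far. If all five messages land in one micro-batch, the late one may still be counted. The sketch assumes that timing and compares both cases. It is one plausible explanation, not a confirmed diagnosis.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Simplified simulation, not Spark's implementation. Timestamps are treated as UTC.
// Assumption: the watermark applied while a batch is processed is the one
// computed after the previous batch finished.
class LateMessageDemo {
    static final Duration DELAY = Duration.ofSeconds(20); // withWatermark(..., "20 seconds")
    static final long WINDOW_MS = 5_000;                  // window(..., "5 seconds")

    // Returns how many messages are counted rather than dropped as too late.
    static int countAccepted(List<List<Instant>> batches) {
        Instant watermark = Instant.EPOCH; // no watermark yet
        Instant maxEventTime = Instant.EPOCH;
        int accepted = 0;
        for (List<Instant> batch : batches) {
            for (Instant t : batch) {
                long start = Math.floorDiv(t.toEpochMilli(), WINDOW_MS) * WINDOW_MS;
                Instant windowEnd = Instant.ofEpochMilli(start + WINDOW_MS);
                if (windowEnd.isAfter(watermark)) {
                    accepted++; // window not yet closed by the watermark
                }
                if (t.isAfter(maxEventTime)) {
                    maxEventTime = t;
                }
            }
            // Advance the watermark only between batches, and never backwards.
            Instant candidate = maxEventTime.minus(DELAY);
            if (candidate.isAfter(watermark)) {
                watermark = candidate;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<Instant> msgs = List.of(
                Instant.parse("2018-02-06T12:00:00Z"),
                Instant.parse("2018-02-06T12:00:01Z"),
                Instant.parse("2018-02-06T12:00:02Z"),
                Instant.parse("2018-02-06T12:00:03Z"),
                Instant.parse("2018-02-06T11:59:00Z"));
        // All five messages in a single micro-batch: the late one is still counted.
        System.out.println(countAccepted(List.of(msgs)));                                   // 5
        // Late message in a later micro-batch: watermark is 11:59:43, its window ended 11:59:05.
        System.out.println(countAccepted(List.of(msgs.subList(0, 4), msgs.subList(4, 5)))); // 4
    }
}
```

Two other points from the Structured Streaming guide may be worth checking:

* Watermarking only drops state in Append or Update output mode. Complete mode keeps all aggregates.
* Data later than the threshold is not guaranteed to be dropped. Only data within the threshold is guaranteed to be kept.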



On Tue, Feb 6, 2018 at 2:05 PM, Vishnu Viswanath <
vishnu.viswanath25@gmail.com> wrote:

> Yes, that is correct.
>
> On Tue, Feb 6, 2018 at 4:56 PM, Jiewen Shao <fifistorm123@gmail.com>
> wrote:
>
>> Vishnu, thanks for the reply.
>> So "event time" and "window end time" have nothing to do with the current
>> system timestamp; the watermark moves with the highest value of the
>> "timestamp" field seen in the input and never moves down. Is that a correct
>> understanding?
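The understanding described above, which Vishnu confirms, can be modeled as a tracker that looks only at event times and never at the wall clock. This is a minimal plain-Java sketch, not Spark's code. `WatermarkTracker` and its methods are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model (not Spark's implementation): the watermark is derived only
// from the largest event time seen so far minus the allowed delay.
// The system clock is never consulted.
class WatermarkTracker {
    private final Duration delay;
    private Instant maxEventTime = null; // no events seen yet

    WatermarkTracker(Duration delay) {
        this.delay = delay;
    }

    // Record an event; an older event never lowers maxEventTime.
    void observe(Instant eventTime) {
        if (maxEventTime == null || eventTime.isAfter(maxEventTime)) {
            maxEventTime = eventTime;
        }
    }

    // Returns null until at least one event has been observed.
    Instant watermark() {
        return maxEventTime == null ? null : maxEventTime.minus(delay);
    }

    public static void main(String[] args) {
        WatermarkTracker t = new WatermarkTracker(Duration.ofSeconds(20));
        t.observe(Instant.parse("2018-02-06T12:00:03Z"));
        t.observe(Instant.parse("2018-02-06T11:59:00Z")); // older event: watermark unchanged
        System.out.println(t.watermark()); // 2018-02-06T11:59:43Z
    }
}
```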
>>
>>
>> On Tue, Feb 6, 2018 at 11:47 AM, Vishnu Viswanath <
>> vishnu.viswanath25@gmail.com> wrote:
>>
>>> Hi
>>>
>>> The 20 seconds determines when the window state should be cleared. For
>>> a late message to be dropped, it has to arrive after you have received a
>>> message with event time >= window end time + 20 seconds.
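The rule in this reply can be written as a one-line check. A window is closed once the largest event time seen reaches the window end plus the delay, which is equivalent to (max event time - delay) >= window end.

This is a plain-Java sketch of that stated rule only. `LateDataRule.windowClosed` is a hypothetical name, and it does not reproduce Spark's internal eviction code.

```java
import java.time.Duration;
import java.time.Instant;

// Models the rule described above (not Spark's code): a window is closed,
// and later records for it may be dropped, once
// maxEventTimeSeen >= windowEnd + delay, i.e. watermark >= windowEnd.
class LateDataRule {
    static boolean windowClosed(Instant windowEnd, Instant maxEventTimeSeen, Duration delay) {
        Instant watermark = maxEventTimeSeen.minus(delay);
        return !watermark.isBefore(windowEnd); // watermark >= windowEnd
    }

    public static void main(String[] args) {
        Duration delay = Duration.ofSeconds(20);
        Instant windowEnd = Instant.parse("2018-02-06T11:59:05Z");
        // windowEnd + 19 s: window still open
        System.out.println(windowClosed(windowEnd, Instant.parse("2018-02-06T11:59:24Z"), delay)); // false
        // windowEnd + 20 s: window closed
        System.out.println(windowClosed(windowEnd, Instant.parse("2018-02-06T11:59:25Z"), delay)); // true
    }
}
```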
>>>
>>> I wrote a post on this recently:
>>> http://vishnuviswanath.com/spark_structured_streaming.html#watermark
>>>
>>> Thanks,
>>> Vishnu
>>>
>>> On Tue, Feb 6, 2018 at 12:11 PM Jiewen Shao <fifistorm123@gmail.com>
>>> wrote:
>>>
>>>> Sample code:
>>>>
>>>> Let's say Xyz is a POJO with a field called timestamp.
>>>>
>>>> Regarding the call withWatermark("timestamp", "20 seconds"):
>>>>
>>>> I expect messages whose timestamp is 20 seconds old or older to be
>>>> dropped, but what are the 20 seconds compared against? In my tests nothing
>>>> was dropped, no matter how old the timestamp was. What did I miss?
>>>>
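>>>> // assumed: mapper is a Jackson ObjectMapper; Xyz has a "timestamp" field
>>>> ObjectMapper mapper = new ObjectMapper();
>>>>
>>>> Dataset<Xyz> xyz = lines
>>>>         .as(Encoders.STRING())
>>>>         .map((MapFunction<String, Xyz>) value -> mapper.readValue(value, Xyz.class),
>>>>              Encoders.bean(Xyz.class));
>>>>
>>>> // Columns referenced by name (functions.col) so they resolve against the
>>>> // watermarked Dataset rather than the original xyz
>>>> Dataset<Row> aggregated = xyz.withWatermark("timestamp", "20 seconds")
>>>>         .groupBy(
>>>>             functions.window(functions.col("timestamp"), "5 seconds"), // tumbling window of size 5 seconds (timestamp)
>>>>             functions.col("x"))
>>>>         .count();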
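`functions.window(..., "5 seconds")` with no slide duration gives a tumbling window, so each event falls into exactly one [start, end) interval. This is a plain-Java sketch of that mapping. `TumblingWindow.assign` is a hypothetical helper.

It assumes windows are aligned to multiples of the window size counted from the epoch, which is my understanding of Spark's default when no start offset is given.

```java
import java.time.Duration;
import java.time.Instant;

// Maps an event time to its tumbling window [start, end).
// Assumption: windows are aligned to multiples of the size since the epoch.
class TumblingWindow {
    static Instant[] assign(Instant eventTime, Duration size) {
        long sizeMs = size.toMillis();
        long startMs = Math.floorDiv(eventTime.toEpochMilli(), sizeMs) * sizeMs;
        return new Instant[] { Instant.ofEpochMilli(startMs), Instant.ofEpochMilli(startMs + sizeMs) };
    }

    public static void main(String[] args) {
        Instant[] w = assign(Instant.parse("2018-02-06T12:00:03Z"), Duration.ofSeconds(5));
        System.out.println(w[0] + " .. " + w[1]); // 2018-02-06T12:00:00Z .. 2018-02-06T12:00:05Z
    }
}
```

The window end, not the event time itself, is what the watermark is compared against when deciding whether a window's state can be cleared.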
>>>>
>>>> Thanks
>>>>
>>>>
>>
>
