flink-user mailing list archives

From Soumya Simanta <soumya.sima...@gmail.com>
Subject Re: Window start and end issue with TumblingProcessingTimeWindows
Date Tue, 07 Jun 2016 12:56:58 GMT
The problem is: why is the window end time in the future?

For example, if my window size is 60 seconds and my window is evaluated at 3:00 pm, why is
the window end time 3:01 pm and not 3:00 pm, even though the data being evaluated falls in
the window 2:59 - 3:00?
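
To make the expectation concrete, here is a minimal sketch of the arithmetic I have in mind. It assumes tumbling windows are aligned to multiples of the window size since the epoch; `windowFor` is a hypothetical helper written for illustration, not a Flink API, and the timestamps are taken from the output quoted below.

```scala
object WindowBounds {
  // Hypothetical helper (not part of Flink): returns the [start, end) bounds of the
  // tumbling window that contains `timestamp`, assuming epoch-aligned windows.
  def windowFor(timestamp: Long, size: Long): (Long, Long) = {
    val start = timestamp - (timestamp % size)
    (start, start + size)
  }

  def main(args: Array[String]): Unit = {
    val size = 60000L            // window size in milliseconds, as in the program below
    val firedAt = 1465234200007L // "currenttime" printed when the window function ran

    // An element that arrived one second before the firing time.
    println(windowFor(firedAt - 1000L, size)) // (1465234140000,1465234200000)

    // The window that contains the firing time itself.
    println(windowFor(firedAt, size))         // (1465234200000,1465234260000)
  }
}
```

Under that assumption, the elements I aggregated should belong to the first window, but the bounds reported to my function match the second one.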


> On Jun 7, 2016, at 3:47 PM, Chesnay Schepler <chesnay@apache.org> wrote:
> 
> Could you state a specific problem?
> 
>> On 07.06.2016 06:40, Soumya Simanta wrote:
>> I have a simple program that reads input from the command line (a socket stream) and then aggregates it by key.
>> 
>> When I run this program on my local machine, I see output that is counterintuitive to my understanding of windows in Flink.
>> 
>> The window start time is close to the time at which the window function is evaluated. However, the window end time is about 60 s (the window size) after the current time (please see the output below).
>> 
>> Can someone explain this behaviour please? 
>> import org.apache.flink.api.scala._
>> import org.apache.flink.streaming.api.TimeCharacteristic
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
>> import org.apache.flink.streaming.api.windowing.time.Time
>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>> import org.apache.flink.util.Collector
>> 
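>> // Event is used below but was not defined in the original post; the field names
>> // are inferred from usage, and the name of the third (Long) field is an assumption.
>> case class Event(key: String, value: Int, timestamp: Long)
>>
>> case class EventAgg(start: Long, end: Long, key: String, value: Int)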
>> 
>> object Processor {
>> 
>>   val window_length = 60000 // milliseconds
>> 
>>   def aggregateEvents(key: String, window: TimeWindow, in: Iterable[Event], out: Collector[EventAgg]): Unit = {
>>     var sum = 0
>>     for (e <- in) {
>>       sum = sum + e.value
>>     }
>>     val start = window.getStart
>>     val end = window.getEnd
>>     val diff = (end - start)
>>     println(s" windowId: ${window.hashCode()} currenttime: ${System.currentTimeMillis()} key:[$key] start: $start end: $end diff: $diff")
>> 
>> 
>>     out.collect(
>>       new EventAgg(
>>         start = window.getStart,
>>         end = window.getEnd,
>>         key = key,
>>         value = sum
>>       )
>>     )
>>   }
>> 
>>   def main(args: Array[String]): Unit = {
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>     //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>> 
>>     val sevents = env.socketTextStream("localhost", 9000)
>>     sevents
>>       .map(x => parseEvent(x))
>>       .keyBy(_.key)
>>       .window(TumblingProcessingTimeWindows.of(Time.milliseconds(window_length)))
>>       .apply(aggregateEvents(_, _, _, _: Collector[EventAgg]))
>>       .map("Default Assigner: " + System.currentTimeMillis().toString + " - " + _.toString)
>>       .print()
>> 
>>     env.execute("Processing time windows")
>>   }
>> 
>>   def parseEvent(s: String): Event = {
>>     if (s == null || s.trim().length == 0)
>>       Event("default", 0, 0L)
>>     else {
>>       val parts = s.split(",")
>>       Event(parts(0), parts(1).toInt, 1L)
>>     }
>>   }
>> }
>> 
>> Output
>> 
>>  windowId: -663519360 currenttime: 1465234200007 key:[a] start: 1465234200000 end: 1465234260000 diff: 60000
>>  windowId: -663519360 currenttime: 1465234200006 key:[b] start: 1465234200000 end: 1465234260000 diff: 60000
>> 3> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,a,3)
>> 7> Default Assigner: 1465234200010 - EventAgg(1465234200000,1465234260000,b,4)
>> 
>> 
> 
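
Reading the quoted output as plain numbers may make the gap easier to see. The sketch below only does arithmetic on the values printed in the first "windowId" line above; `OutputCheck` is an illustrative name, and the code makes no claim about why Flink reports these bounds.

```scala
import java.time.Instant

object OutputCheck {
  // Values copied from the first "windowId" line of the quoted output.
  val firedAt = 1465234200007L // "currenttime"
  val start   = 1465234200000L // window.getStart
  val end     = 1465234260000L // window.getEnd

  def main(args: Array[String]): Unit = {
    // Print the three instants in UTC for easier reading.
    println(Instant.ofEpochMilli(firedAt))
    println(Instant.ofEpochMilli(start))
    println(Instant.ofEpochMilli(end))

    println(firedAt - start) // 7     (ms between window start and evaluation)
    println(end - firedAt)   // 59993 (ms between evaluation and window end)
  }
}
```

So the function ran 7 ms after the reported start and almost a full window length before the reported end.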
