flink-user mailing list archives

From "Yan Zhou [FDS Science]" <yz...@coupang.com>
Subject why doesn't the over-window-aggregation sort the element(considering watermark) before processing?
Date Wed, 18 Apr 2018 06:55:02 GMT
Hi,


I use a bounded over-window aggregation in my application. However, some input elements
are sometimes "discarded" and generate no output. From reading the source code of
RowTimeBoundedRangeOver.scala, I realized that a record is discarded if it arrives out of
order; please see the quoted code block below. Could you help me understand why we don't
sort the records first? Say we are using BoundedOutOfOrdernessTimestampExtractor: we could
use the watermark to select a portion of the elements to sort. When the watermark advances,
we would process the elements that are before the watermark and extend the portion of
elements to be sorted.



Best

Yan



override def processElement(
    inputC: CRow,
    ctx: ProcessFunction[CRow, CRow]#Context,
    out: Collector[CRow]): Unit = {

  // unwrap the Row carried by the CRow
  val input = inputC.row

  // triggering timestamp for trigger calculation
  val triggeringTs = input.getField(rowTimeIdx).asInstanceOf[Long]

  val lastTriggeringTs = lastTriggeringTsState.value

  // check if the data is expired, if not, save the data and register event time timer
  if (triggeringTs > lastTriggeringTs) {
    // put in cache, and register timer to process/clean
    // ...
  } else {
    // DISCARD
  }
}
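
The buffering idea described in the message can be sketched in plain Scala, without any Flink dependency. This is only an illustration of "buffer, sort, and release up to the watermark", not how RowTimeBoundedRangeOver is implemented; the names SortingBuffer, add, and advanceWatermark are hypothetical and do not exist in Flink.

```scala
import scala.collection.mutable

// Hypothetical helper, not part of Flink: buffers (timestamp, value) pairs
// and releases them in timestamp order once the watermark has passed them.
final class SortingBuffer[T] {
  // Min-heap on timestamp: the smallest timestamp is dequeued first.
  // Elements with equal timestamps come out in no guaranteed order.
  private val pending =
    mutable.PriorityQueue.empty[(Long, T)](
      Ordering.by[(Long, T), Long](_._1).reverse)

  private var watermark = Long.MinValue

  // Returns false only when the element is at or behind the current watermark
  // (truly late). Out-of-order elements ahead of the watermark are kept.
  def add(ts: Long, value: T): Boolean =
    if (ts <= watermark) false
    else { pending.enqueue((ts, value)); true }

  // Advances the watermark (never backwards) and returns every buffered
  // element with timestamp <= the watermark, sorted by timestamp.
  def advanceWatermark(newWatermark: Long): Vector[(Long, T)] = {
    watermark = math.max(watermark, newWatermark)
    val ready = Vector.newBuilder[(Long, T)]
    while (pending.nonEmpty && pending.head._1 <= watermark)
      ready += pending.dequeue()
    ready.result()
  }
}
```

For example, adding elements with timestamps 5, 3, and 9 and then advancing the watermark to 6 releases the elements with timestamps 3 and 5, in that order; a later element with timestamp 4 is rejected as late, and the element with timestamp 9 stays buffered until the watermark reaches it.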

