flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: why doesn't the over-window-aggregation sort the element(considering watermark) before processing?
Date Wed, 18 Apr 2018 08:04:43 GMT
The over window operates on an unbounded stream of data, so it is not
possible to sort the complete stream.
Instead, we can sort ranges of the stream. Flink uses watermarks to define
these ranges.
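
To make this concrete, here is a minimal Scala sketch, assuming records arrive as (timestamp, value) pairs. WatermarkSortBuffer and its methods are hypothetical names used for illustration, not Flink API. Records are buffered until a watermark passes their timestamps, and then that range is emitted in timestamp order.

```scala
import scala.collection.mutable

// Hypothetical buffer illustrating watermark-bounded sorting; not a Flink class.
final class WatermarkSortBuffer[T] {
  // Pending records grouped by timestamp; a TreeMap keeps keys in ascending order.
  private val pending = mutable.TreeMap.empty[Long, mutable.ArrayBuffer[T]]

  // Buffer a record until a watermark covers its timestamp.
  def add(timestamp: Long, record: T): Unit =
    pending.getOrElseUpdate(timestamp, mutable.ArrayBuffer.empty[T]) += record

  // Emit, in timestamp order, every buffered record with timestamp <= watermark.
  // Records sharing a timestamp keep their arrival order.
  def onWatermark(watermark: Long): List[(Long, T)] = {
    // Keys iterate in ascending order, so takeWhile selects the ready range.
    val readyKeys = pending.keysIterator.takeWhile(_ <= watermark).toList
    readyKeys.flatMap { ts =>
      val records = pending.remove(ts).getOrElse(mutable.ArrayBuffer.empty[T])
      records.map(r => (ts, r))
    }
  }

  // Number of records still waiting for a watermark.
  def size: Int = pending.valuesIterator.map(_.size).sum
}
```

Keeping the pending records in a sorted map means a watermark only has to cut off the prefix of keys it covers; nothing needs to be re-sorted on each flush.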

The operator processes, in timestamp order, all records that are not late,
i.e., records whose timestamps are larger than the last watermark.
In principle, there are different ways to handle records that violate this
condition. The current implementation of the operator simply drops these
records.
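
The drop rule can be sketched the same way. In this hypothetical Scala example (the Event, Record and Watermark types and the LateRecords object are illustrative, not Flink classes), a record is accepted only if its timestamp is larger than the last watermark seen, and dropped otherwise, following the description above.

```scala
// Hypothetical input events: either a data record or a watermark.
sealed trait Event
final case class Record(timestamp: Long, value: String) extends Event
final case class Watermark(timestamp: Long) extends Event

object LateRecords {
  // Splits records into (accepted, dropped). A record is dropped when its
  // timestamp is not larger than the most recent watermark.
  def partition(events: Seq[Event]): (List[Record], List[Record]) = {
    var lastWatermark = Long.MinValue
    val accepted = List.newBuilder[Record]
    val dropped = List.newBuilder[Record]
    events.foreach {
      case Watermark(ts) => lastWatermark = math.max(lastWatermark, ts)
      case r @ Record(ts, _) =>
        if (ts > lastWatermark) accepted += r else dropped += r
    }
    (accepted.result(), dropped.result())
  }
}
```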

Currently, the only way to prevent records from being dropped is to use
more conservative watermarks, i.e., watermarks that trail the largest
observed timestamp by a larger margin. Note that this increases the
processing latency.
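
The trade-off can be illustrated with a small simulation, assuming a watermark that trails the largest timestamp seen so far by a fixed bound. This is the idea behind BoundedOutOfOrdernessTimestampExtractor, although that extractor emits watermarks periodically rather than after every record as assumed here. The object name and the sample timestamps are hypothetical.

```scala
object ConservativeWatermarks {
  // Counts records that would be dropped when, after each record, the
  // watermark is (max timestamp seen so far - bound) and a record is late
  // if its timestamp is not larger than the current watermark.
  def droppedCount(timestamps: Seq[Long], bound: Long): Int = {
    var maxSeen = Long.MinValue
    var watermark = Long.MinValue
    var dropped = 0
    for (ts <- timestamps) {
      if (ts <= watermark) dropped += 1
      maxSeen = math.max(maxSeen, ts)
      watermark = maxSeen - bound
    }
    dropped
  }
}
```

With the timestamps 10, 12, 11, 15, 9, 16, a bound of 0 drops two records, a bound of 2 drops one, and a bound of 7 drops none. In exchange, every watermark, and therefore every result, is held back by the size of the bound.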

Best, Fabian

2018-04-18 8:55 GMT+02:00 Yan Zhou [FDS Science] <yzhou@coupang.com>:

> Hi,
>
> I use bounded over-window aggregation in my application. However,
> sometimes some input elements are "discarded" and do not generate any
> output. By reading the source code of RowTimeBoundedRangeOver.scala, I
> realized that a record is actually discarded if it is out of order. Please
> see the quoted code block below. Could you help me understand why we don't
> sort the records first? Say we are using
> BoundedOutOfOrdernessTimestampExtractor. We could use the watermark to
> select a portion of the elements to sort. When the watermark advances, we
> would process the elements that are before the watermark and extend the
> portion of elements to sort.
>
> Best
>
> Yan
>
> override def processElement(
>     inputC: CRow,
>     ctx: ProcessFunction[CRow, CRow]#Context,
>     out: Collector[CRow]): Unit = {
>
>   // unwrap the Row carried by the CRow
>   val input = inputC.row
>
>   // triggering timestamp for trigger calculation
>   val triggeringTs = input.getField(rowTimeIdx).asInstanceOf[Long]
>
>   val lastTriggeringTs = lastTriggeringTsState.value
>
>   // check if the data is expired; if not, save the data and register an
>   // event-time timer
>   if (triggeringTs > lastTriggeringTs) {
>     // put in cache, and register timer to process/clean
>     // ...
>   } else {
>     // DISCARD
>   }
> }
