flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: assign time attribute after first window group when using Flink SQL
Date Wed, 18 Apr 2018 07:44:48 GMT
This sounds like a windowed join between the raw stream and the aggregated
stream.
It might be possible to do the "lookup" in the second raw stream with
another windowed join. If not, you can fall back to the DataStream API /
ProcessFunction and implement the lookup logic as you need it.
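To illustrate the windowed-join semantics mentioned above, here is a plain-Java model. It is not Flink code and not how Flink implements joins. Each r1 event is paired with every r2 event that has the same key and whose timestamp falls within a bounded interval around the r1 timestamp. In Flink SQL, a condition like this is roughly what a time-windowed join on the two rowtime attributes expresses. The `Event` shape, the join key, and the bounds are hypothetical placeholders. The nested loop also ignores watermarks, out-of-order data, and state cleanup.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java illustration (not Flink code) of time-windowed (interval) join semantics:
 * an r1 event joins every r2 event with the same key whose timestamp lies in
 * [r1.ts - lowerMs, r1.ts + upperMs].
 */
final class IntervalJoinSketch {

    // Hypothetical event shape: join key, event-time timestamp in ms, and a value.
    record Event(String key, long ts, double value) {}

    // One joined pair (left from r1, right from r2).
    record Joined(Event left, Event right) {}

    static List<Joined> join(List<Event> r1, List<Event> r2, long lowerMs, long upperMs) {
        List<Joined> out = new ArrayList<>();
        for (Event a : r1) {
            for (Event b : r2) {
                boolean sameKey = a.key().equals(b.key());
                boolean inWindow = b.ts() >= a.ts() - lowerMs && b.ts() <= a.ts() + upperMs;
                if (sameKey && inWindow) {
                    out.add(new Joined(a, b));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> r1 = List.of(new Event("ABC", 10_000L, 1.0));
        List<Event> r2 = List.of(
                new Event("ABC", 9_000L, 2.0),   // 1 s before: inside the bound, joins
                new Event("ABC", 20_000L, 3.0),  // 10 s after: outside the bound
                new Event("XYZ", 10_000L, 4.0)); // different key
        System.out.println(join(r1, r2, 5_000L, 5_000L).size()); // prints 1
    }
}
```

A real streaming join must also drop buffered rows once the watermark passes the upper bound. The list-based loop above only shows which pairs qualify.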

Best, Fabian

2018-04-18 3:03 GMT+02:00 Ivan Wang <ivan.wang2010@gmail.com>:

> Thanks, Fabian. I tried to use "rowtime", and Flink throws the exception below:
>
> Exception in thread "main"
> org.apache.flink.table.api.ValidationException: SlidingGroupWindow('w2,
> 'end, 150.rows, 1.rows) is invalid: Event-time grouping windows on row
> intervals in a stream environment are currently not supported.
>
> Then I tried OverWindows; luckily, they serve my requirement as well.
> Now my table query looks like this:
>
> .window(Tumble.over("15.seconds").on("timeMill").as("w1"))
> .groupBy("symbol, w1").select("(w1.rowtime) as end, symbol, price.max as p_max, price.min as p_min")
> .window(Over.partitionBy("symbol").orderBy("end").preceding("149.rows").as("w2"))
> .select("symbol as symbol_, end, p_max.max over w2 as max, p_min.min over w2 as min");
>
>
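To make clear what the Tumble-then-Over query computes, here is a plain-Java model of the same two steps. It is not Flink code. Step one groups ticks per symbol into 15-second tumbling windows and keeps the max/min price, stamping each bar with the window's rowtime (window end minus 1 ms, which is what `w1.rowtime` yields). Step two computes, per symbol and in rowtime order, the max of `p_max` and the min of `p_min` over the current bar plus up to N preceding bars. `preceding("149.rows")` corresponds to N = 149, or 150 rows in total. The `Tick`/`Bar`/`Result` names are hypothetical. The sketch assumes all data fits in memory, uses epoch-aligned windows with no offset, and ignores watermarks and late data.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TumbleThenOverSketch {

    // Hypothetical raw event: symbol, event time in ms, price.
    record Tick(String symbol, long ts, double price) {}

    // One row per (symbol, tumbling window); `end` holds w1.rowtime (window end - 1 ms).
    record Bar(String symbol, long end, double pMax, double pMin) {}

    // One output row per Bar, aggregated over the OVER window.
    record Result(String symbol, long end, double max, double min) {}

    static List<Bar> tumble(List<Tick> ticks, long sizeMs) {
        // symbol -> (window start -> [max, min])
        Map<String, TreeMap<Long, double[]>> acc = new TreeMap<>();
        for (Tick t : ticks) {
            long start = t.ts() - Math.floorMod(t.ts(), sizeMs);
            double[] mm = acc.computeIfAbsent(t.symbol(), k -> new TreeMap<>())
                    .computeIfAbsent(start, k -> new double[] {Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY});
            mm[0] = Math.max(mm[0], t.price());
            mm[1] = Math.min(mm[1], t.price());
        }
        List<Bar> bars = new ArrayList<>();
        acc.forEach((symbol, windows) -> windows.forEach((start, mm) ->
                bars.add(new Bar(symbol, start + sizeMs - 1, mm[0], mm[1]))));
        return bars;
    }

    static List<Result> overRows(List<Bar> bars, int preceding) {
        // Partition by symbol, order by rowtime within each partition.
        List<Bar> sorted = new ArrayList<>(bars);
        sorted.sort(Comparator.comparing(Bar::symbol).thenComparingLong(Bar::end));
        List<Result> out = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            Bar cur = sorted.get(i);
            double max = cur.pMax();
            double min = cur.pMin();
            // Walk back over at most `preceding` earlier rows of the same symbol.
            for (int j = i - 1; j >= 0 && i - j <= preceding; j--) {
                Bar prev = sorted.get(j);
                if (!prev.symbol().equals(cur.symbol())) {
                    break;
                }
                max = Math.max(max, prev.pMax());
                min = Math.min(min, prev.pMin());
            }
            out.add(new Result(cur.symbol(), cur.end(), max, min));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Tick> ticks = List.of(
                new Tick("ABC", 1_000L, 10.0),
                new Tick("ABC", 16_000L, 12.0),
                new Tick("ABC", 31_000L, 8.0));
        // 15 s tumbling windows, then the current row plus 149 preceding rows.
        for (Result r : overRows(tumble(ticks, 15_000L), 149)) {
            System.out.println(r);
        }
    }
}
```

This model sorts its output by symbol and then rowtime for readability. That ordering is a choice of the sketch, not a statement about the order in which Flink emits rows across partitions.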
> It works, and I get what I want. However, the result is not ordered by
> the rowtime (here I use "end" as the alias). Is this the default behavior,
> and is there anything I can do to get it ordered?
>
> Below is the entire requirement:
>
> Basically, there's one raw stream (r1), which I group first by time as w1
> and then by a count-based window as w2. I'd like to compare the "price"
> field in every raw event with the same field in the closest preceding event
> in w2. If the condition is met, I'd like to use the price value and
> timestamp of that event to look up one matching event in another raw
> stream (r2).
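The "compare with the closest preceding aggregated event" step can be sketched in plain Java with a per-symbol `TreeMap` keyed by rowtime. `lowerEntry(ts)` returns the latest aggregated row strictly before the raw event's timestamp. In Flink, this index would presumably live in keyed state inside a ProcessFunction, along the lines of Fabian's suggestion. Here an in-memory map stands in for that state. The `Agg` record and the `breaksAbove` condition (raw price above the preceding rolling max) are hypothetical, because the thread does not specify the actual condition. The sketch does not handle out-of-order arrival or state cleanup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

final class ClosestPrecedingLookup {

    // Aggregated row from w2: its rowtime (ms) plus the rolling max/min.
    record Agg(long rowtime, double max, double min) {}

    // Per-symbol index of aggregated rows, ordered by rowtime.
    private final Map<String, TreeMap<Long, Agg>> bySymbol = new HashMap<>();

    void addAgg(String symbol, Agg agg) {
        bySymbol.computeIfAbsent(symbol, k -> new TreeMap<>()).put(agg.rowtime(), agg);
    }

    // Closest aggregated row strictly before ts for this symbol, if any.
    Optional<Agg> closestBefore(String symbol, long ts) {
        TreeMap<Long, Agg> rows = bySymbol.get(symbol);
        if (rows == null) {
            return Optional.empty();
        }
        Map.Entry<Long, Agg> e = rows.lowerEntry(ts);
        return e == null ? Optional.empty() : Optional.of(e.getValue());
    }

    // Hypothetical condition: the raw price breaks above the preceding rolling max.
    boolean breaksAbove(String symbol, long ts, double price) {
        return closestBefore(symbol, ts).map(a -> price > a.max()).orElse(false);
    }

    public static void main(String[] args) {
        ClosestPrecedingLookup lookup = new ClosestPrecedingLookup();
        lookup.addAgg("ABC", new Agg(14_999L, 10.0, 9.0));
        lookup.addAgg("ABC", new Agg(29_999L, 12.0, 9.0));
        System.out.println(lookup.breaksAbove("ABC", 31_000L, 12.5)); // true: 12.5 > 12.0
        System.out.println(lookup.breaksAbove("ABC", 20_000L, 11.0)); // true: 11.0 > 10.0
        System.out.println(lookup.breaksAbove("ABC", 10_000L, 99.0)); // false: no earlier row
    }
}
```

When the condition holds, the matching r2 event could then be found with a bounded time lookup like the windowed join described in Fabian's reply.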
>
> CEP sounds like a good idea. But I need to refer to events in the other
> stream (r2) in the pattern condition on the current stream (r1). Is it
> possible to do this with CEP?
>
> Thanks
> Ivan
>
>
>
> On Mon, Apr 16, 2018 at 4:01 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Sorry, I forgot to CC the user mailing list in my reply.
>>
>> 2018-04-12 17:27 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>
>>> Hi,
>>>
>>> Assuming you are using event time, the right function to generate a row
>>> time attribute from a window would be "w1.rowtime" instead of "w1.start".
>>>
>>> The reason why Flink is picky about this is that we must ensure that the
>>> result rows of the windows are aligned with the watermarks of the stream.
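The arithmetic behind this can be sketched in plain Java for an epoch-aligned tumbling window: `start` and `end` bound the window as `[start, end)`, and the rowtime is the inclusive upper bound `end - 1`. The lateness check uses a simplified model. It assumes a watermark `W` means no more rows with timestamp `<= W` are expected, and that a window's result is emitted before the watermark that fired it is forwarded downstream. Under those assumptions, a result stamped with `w1.start` can already be behind the last watermark downstream has seen, while one stamped with `w1.rowtime` cannot. The class and method names are illustrative only.

```java
final class TumbleWindowTimes {

    // Start of the tumbling window containing ts (epoch-aligned, no offset).
    static long start(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    // Exclusive end of the window.
    static long end(long ts, long sizeMs) {
        return start(ts, sizeMs) + sizeMs;
    }

    // Inclusive upper bound of the window: end minus 1 ms.
    static long rowtime(long ts, long sizeMs) {
        return end(ts, sizeMs) - 1;
    }

    // Simplified model: a row is late if its timestamp is <= the last forwarded watermark.
    static boolean isLate(long rowTimestamp, long lastForwardedWatermark) {
        return rowTimestamp <= lastForwardedWatermark;
    }

    public static void main(String[] args) {
        long ts = 16_000L;
        long size = 15_000L;
        System.out.println(start(ts, size));   // 15000
        System.out.println(end(ts, size));     // 30000
        System.out.println(rowtime(ts, size)); // 29999
        // Suppose downstream last saw watermark 29000 before this window fired.
        System.out.println(isLate(start(ts, size), 29_000L));   // true
        System.out.println(isLate(rowtime(ts, size), 29_000L)); // false
    }
}
```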
>>>
>>> Best, Fabian
>>>
>>>
>>> On Sun, 8 Apr 2018, 22:26, Ivan Wang <ivan.wang2010@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'd like to chain two window groups in my program, as below.
>>>>
>>>> Table myTable = cTable
>>>>         .window(Tumble.over("15.seconds").on("timeMill").as("w1"))
>>>>         .groupBy("symbol, w1").select("w1.start as start, w1.end as end, symbol, price.max as p_max, price.min as p_min")
>>>>         .window(Slide.over("150.rows").every("1.rows").on("start").as("w2"))
>>>>         .groupBy("symbol, w2").select("w2.start, w2.end, symbol, p_max.max, p_min.min")
>>>>         ;
>>>>
>>>> However, it throws the error: SlidingGroupWindow('w2, 'start, 150.rows,
>>>> 1.rows) is invalid: Sliding window expects a time attribute for grouping in
>>>> a stream environment.
>>>>
>>>>          at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:149)
>>>>          at org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:658)
>>>>          at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1159)
>>>>          at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1179)
>>>>          at minno.gundam.ReadPattern.main(ReadPattern.java:156)
>>>>
>>>> Is there any way to assign time attribute after the first groupBy (w1)?
>>>>
>>>> Thanks
>>>>
>>>> Ivan
>>>>
>>
>
