calcite-dev mailing list archives

From Julian Hyde <jh...@apache.org>
Subject Re: Expressing windowed stream-stream joins in SQL
Date Tue, 28 Feb 2017 04:04:34 GMT
Your query

  SELECT STREAM a.foo, b.bar
  FROM a JOIN b ON a.id = b.id
  GROUP BY TUMBLE(a.rowtime, INTERVAL '1' MINUTE)

won't work because there is no constraint on b.rowtime. The system would literally have to
wait forever before it could move on from a, or output a total. Let’s try this:

  SELECT STREAM a.foo, b.bar
  FROM a JOIN b
    ON a.id = b.id
    AND FLOOR(a.rowtime TO MINUTE) = FLOOR(b.rowtime TO MINUTE)
  GROUP BY TUMBLE(a.rowtime, INTERVAL '1' MINUTE)

This query is valid, but may or may not be what you want. In order for records with the same
id to be matched, they will have to occur within the same minute. If a occurs at 10:43:59
and b occurs at 10:44:02, they will not match. Also, if there are several records in a or
b with the same id value, you will get a Cartesian product effect.
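
To make both effects concrete, here is a minimal Python sketch that simulates the same-minute join condition over in-memory rows. It is only an illustration of the semantics, not how Calcite evaluates the query; the row layout (id, rowtime, value) and the names same_minute_join and at are assumptions made for the example.

```python
from datetime import datetime

def at(h, m, s):
    """Hypothetical helper: a timestamp on the day of this thread."""
    return datetime(2017, 2, 28, h, m, s)

def floor_to_minute(ts):
    """Mimic FLOOR(rowtime TO MINUTE)."""
    return ts.replace(second=0, microsecond=0)

def same_minute_join(a_rows, b_rows):
    """Pair rows with equal id whose rowtimes fall in the same calendar minute.

    Rows are (id, rowtime, value) tuples.
    """
    return [
        (a_val, b_val)
        for a_id, a_ts, a_val in a_rows
        for b_id, b_ts, b_val in b_rows
        if a_id == b_id and floor_to_minute(a_ts) == floor_to_minute(b_ts)
    ]

# Three seconds apart, but in different minute buckets: no match.
missed = same_minute_join(
    [(1, at(10, 43, 59), "foo1")],
    [(1, at(10, 44, 2), "bar1")],
)

# Two a rows and three b rows share an id and a minute: 2 x 3 = 6 pairs.
a_rows = [(7, at(10, 45, s), f"foo{s}") for s in (1, 2)]
b_rows = [(7, at(10, 45, s), f"bar{s}") for s in (10, 20, 30)]
product = same_minute_join(a_rows, b_rows)
```

Here missed is empty even though the two records are only three seconds apart, and product holds every combination of the matching a and b rows.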

A common scenario is that b needs to occur after a, within a rolling one-minute window. Thus,
if a occurs at 10:43:59 then b can occur between 10:43:59 and 10:44:59. I would write that
query as follows:

  SELECT STREAM a.foo, b.bar
  FROM a JOIN b
    ON a.id = b.id
    AND b.rowtime BETWEEN a.rowtime AND a.rowtime + INTERVAL '1' MINUTE
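
As a rough illustration of that rolling-window condition, the following Python sketch applies it to a handful of in-memory rows. It assumes rows are (id, rowtime, value) tuples; rolling_join and at are hypothetical names, and the nested loop is for clarity only, not a streaming implementation. BETWEEN is inclusive at both ends, so a b record exactly one minute after a still matches.

```python
from datetime import datetime, timedelta

def at(h, m, s):
    """Hypothetical helper: a timestamp on the day of this thread."""
    return datetime(2017, 2, 28, h, m, s)

def rolling_join(a_rows, b_rows, window=timedelta(minutes=1)):
    """Pair rows with equal id where a.rowtime <= b.rowtime <= a.rowtime + window."""
    return [
        (a_val, b_val)
        for a_id, a_ts, a_val in a_rows
        for b_id, b_ts, b_val in b_rows
        if a_id == b_id and a_ts <= b_ts <= a_ts + window
    ]

a_rows = [(1, at(10, 43, 59), "foo")]
b_rows = [
    (1, at(10, 43, 58), "early"),  # before a: rejected
    (1, at(10, 44, 2), "in"),      # 3 seconds after a: matches
    (1, at(10, 44, 59), "edge"),   # exactly 1 minute after a: matches
    (1, at(10, 45, 0), "late"),    # just past the window: rejected
]
matches = rolling_join(a_rows, b_rows)
```

The pair at 10:43:59 and 10:44:02, which a same-minute comparison would miss, now matches.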

You can then summarize the joined records, emitting a total each minute:

  SELECT STREAM a.foo, b.bar
  FROM a JOIN b
    ON a.id = b.id
    AND b.rowtime BETWEEN a.rowtime AND a.rowtime + INTERVAL '1' MINUTE
  GROUP BY TUMBLE(a.rowtime, INTERVAL '1' MINUTE)
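
The query does not spell out which aggregate the "total" is, so the Python sketch below assumes it is a count of joined rows per one-minute bucket. The function tumble_counts, its key parameter, and the sample timestamps are all illustrative assumptions.

```python
from collections import Counter
from datetime import datetime

def at(h, m, s):
    """Hypothetical helper: a timestamp on the day of this thread."""
    return datetime(2017, 2, 28, h, m, s)

def tumble_counts(joined, key):
    """Count joined rows per one-minute bucket of the timestamp chosen by `key`."""
    return Counter(key(row).replace(second=0, microsecond=0) for row in joined)

# Joined rows as (a_rowtime, b_rowtime); each b is within one minute after its a.
joined = [
    (at(10, 43, 30), at(10, 43, 45)),
    (at(10, 43, 59), at(10, 44, 2)),
    (at(10, 44, 10), at(10, 44, 50)),
]

# Bucket by a.rowtime, as TUMBLE(a.rowtime, INTERVAL '1' MINUTE) would.
counts_by_a = tumble_counts(joined, key=lambda row: row[0])
```

With these rows, the 10:43 bucket receives two joined rows and the 10:44 bucket receives one.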

The following query

  SELECT STREAM a.foo, b.bar
  FROM a JOIN b
    ON a.id = b.id
    AND b.rowtime BETWEEN a.rowtime AND a.rowtime + INTERVAL '1' MINUTE
  GROUP BY TUMBLE(b.rowtime, INTERVAL '1' MINUTE)

differs only in the last line (“b.rowtime” versus “a.rowtime”), but will emit different
totals because we are bucketing by different timestamp fields. In order to implement this
query efficiently, Calcite will have to deduce that a.rowtime and b.rowtime are sufficiently
close that it can re-sort the join (ordered by a.rowtime) into the input that the aggregate
needs (ordered by b.rowtime). A partial sort over a rolling one-minute window (think of a priority
queue) would suffice.
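
The priority-queue idea can be sketched in Python as follows. This is an illustration under stated assumptions, not Calcite's implementation: the input is a sequence of (a_rowtime, b_rowtime) pairs already ordered by a_rowtime, every pair satisfies a_rowtime <= b_rowtime <= a_rowtime + 1 minute, and resort_by_b is a hypothetical name. Because every later row has a b_rowtime no earlier than the current a_rowtime, any buffered row whose b_rowtime is at or before the current a_rowtime can be released safely.

```python
import heapq
from datetime import datetime

def at(h, m, s):
    """Hypothetical helper: a timestamp on the day of this thread."""
    return datetime(2017, 2, 28, h, m, s)

def resort_by_b(joined_by_a):
    """Yield (a_ts, b_ts) rows in b_ts order from input ordered by a_ts.

    Assumes a_ts <= b_ts <= a_ts + 1 minute for every row, so the buffer
    only ever holds rows whose b_ts lie within about one minute of each other.
    """
    heap = []  # min-heap of (b_ts, arrival sequence, a_ts)
    for seq, (a_ts, b_ts) in enumerate(joined_by_a):
        heapq.heappush(heap, (b_ts, seq, a_ts))
        # Later rows have b_ts >= their a_ts >= this a_ts, so anything
        # at or before this a_ts cannot be overtaken and may be emitted.
        while heap and heap[0][0] <= a_ts:
            out_b, _, out_a = heapq.heappop(heap)
            yield out_a, out_b
    # End of input: flush whatever is still buffered, in b_ts order.
    while heap:
        out_b, _, out_a = heapq.heappop(heap)
        yield out_a, out_b

rows = [  # ordered by a_ts, but not by b_ts
    (at(10, 43, 10), at(10, 44, 5)),
    (at(10, 43, 20), at(10, 43, 25)),
    (at(10, 43, 59), at(10, 44, 2)),
    (at(10, 44, 30), at(10, 44, 31)),
]
resorted = list(resort_by_b(rows))
```

On a real, unbounded stream the final flush never runs; rows are released only as a_rowtime advances, which is why the bound relating the two timestamps matters.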

Julian


> On Feb 24, 2017, at 2:32 PM, Haohui Mai <wheat9@apache.org> wrote:
> 
> Hi,
> 
> I wonder what would be the best way to express joining two windowed streams
> in SQL?
> 
> An even simpler use case will be joining two streams within the same window
> (see
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html).
> I'm looking at something like
> 
> SELECT a.foo, b.bar FROM a JOIN a.id = b.id GROUP BY TUMBLE(a. rowtime,
> INTERVAL '1' MINUTE)
> 
> Is it the right syntax? Or at an even higher level, does this query have
> proper semantics?
> 
> Your comments are appreciated.
> 
> Regards,
> Haohui

