Hey, Julian,
Thanks! I was heads-down writing the design doc for the window
operator until now.
Thanks for your explanation. I think that we have an agreement on this now.
If we interpret Q.2 and Q.3 as equivalent, there is an implicit assumption in
Q.2 that Orders.rowtime = Shipments.rowtime, which is explicitly addressed
in Q.3.
Your example using http://en.wikipedia.org/wiki/Relativity_of_simultaneity is
interesting and makes sense. Essentially, the joiner is the "observer" in
this case and the concept of "simultaneity" should be defined by the
joiner. In Q.2, the definition of "simultaneity" is implicit in
rowtime (assuming that the two distinct worlds in the universe use the same
clock), while in Q.3 the "observer" explicitly defines the "same time"
when the two distinct worlds use different clocks.
Hence, my thought is that for a windowed join, it should be up to the joiner
to decide which window in stream A is joined to which window in
stream B. Does that sound good?
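
To make the proposal a bit more concrete, here is a minimal Python sketch of a
joiner that takes its definition of "same time" as a parameter. Everything in
it is my own illustration and not part of Calcite or any existing API: the
names range_condition and windowed_join, the clock_offset parameter, and the
encoding of rowtime as minutes since midnight are all assumptions.

```python
from typing import Callable, List, Tuple

Row = Tuple[int, int]  # (rowtime in minutes since midnight, id); illustrative encoding


def hhmm(hours: int, minutes: int) -> int:
    """Convert a wall-clock style time to minutes since midnight."""
    return hours * 60 + minutes


def range_condition(before: int, after: int, clock_offset: int = 0) -> Callable[[int, int], bool]:
    """Build the joiner's definition of 'same time'.

    clock_offset is a hypothetical parameter: it is added to Shipments.rowtime
    to translate it into the Orders clock. 0 means both streams are assumed to
    share one clock (the implicit assumption in Q.2).
    """
    def simultaneous(order_time: int, ship_time: int) -> bool:
        aligned = ship_time + clock_offset
        # Same shape as the Q.3 condition: ship - before <= order <= ship + after.
        return aligned - before <= order_time <= aligned + after
    return simultaneous


def windowed_join(orders: List[Row], shipments: List[Row],
                  simultaneous: Callable[[int, int], bool]) -> List[Tuple[Row, Row]]:
    """Join on id; the supplied predicate decides which rowtimes line up."""
    return [(o, s) for o in orders for s in shipments
            if o[1] == s[1] and simultaneous(o[0], s[0])]


# The Orders and Shipments rows from Q.1 below.
ORDERS = [(hhmm(9, 0), 1), (hhmm(9, 1), 2), (hhmm(9, 2), 3), (hhmm(9, 3), 1)]
SHIPMENTS = [(hhmm(12, 0), 2), (hhmm(12, 0), 1), (hhmm(12, 2), 3), (hhmm(12, 3), 1)]

# Shared clock: 9:00 never lines up with 12:00.
shared_clock = windowed_join(ORDERS, SHIPMENTS, range_condition(2, 3))
# The joiner declares that the Shipments clock runs three hours ahead.
shifted_clock = windowed_join(ORDERS, SHIPMENTS, range_condition(2, 3, clock_offset=-180))
```

With a shared clock the 9:00 and 12:00 rows never match, which agrees with
your answer that the query returns no results. Once the joiner supplies an
explicit alignment, the same rows do join.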
On Tue, Mar 10, 2015 at 1:58 PM, Julian Hyde <julian@hydromatic.net> wrote:
> See answers inline. Proving, yet again, that there's no such thing as a
> short semantics email. :)
>
> > On Mar 9, 2015, at 12:48 PM, Yi Pan <nickpan47@gmail.com> wrote:
> >
> > Hi, Julian,
> >
> > Thanks for the reply. I want to make sure that I understand your
> > explanation on windows in JOIN more explicitly.
> > For the following case:
> > Q.1) SELECT STREAM id, cost, value, Shipments.rowtime as shiptime FROM
> > Orders OVER (ORDER BY rowtime RANGE '3' min PRECEDING)
> > JOIN Shipments OVER (ORDER BY rowtime RANGE '2' min PRECEDING)
> > ON Orders.id = Shipments.id
> >
> > If we receive the following rows in Orders and Shipments:
> > Orders:
> > (rowtime=9:00, id=1)
> > (rowtime=9:01, id=2)
> > (rowtime=9:02, id=3)
> > (rowtime=9:03, id=1)
> > Shipments:
> > (rowtime=12:00, id=2)
> > (rowtime=12:00, id=1)
> > (rowtime=12:02, id=3)
> > (rowtime=12:03, id=1)
> >
> > In Q.1, we will have the following windows:
> > Orders:
> > window1(start=8:58, end=9:00)={(rowtime=9:00, value=1, id=1)}
> > window2(start=8:59, end=9:01)={(rowtime=9:00, value=1, id=1),
> > (rowtime=9:01, value=1, id=2)}
> > window3(start=9:00, end=9:02)={(rowtime=9:00, value=1, id=1),
> > (rowtime=9:01, value=1, id=2), (rowtime=9:02, value=1, id=3)}
> > window4(start=9:01, end=9:03)={(rowtime=9:01, value=1, id=2),
> > (rowtime=9:02, value=1, id=3), (rowtime=9:03, value=1, id=1)}
> > Shipments:
> > window1(start=11:59, end=12:00)={(rowtime=12:00, cost=1, id=2),
> > (rowtime=12:00, cost=1, id=1)}
> > window2(start=12:00, end=12:01)={(rowtime=12:00, cost=1, id=2),
> > (rowtime=12:00, cost=1, id=1)}
> > window3(start=12:01, end=12:02)={(rowtime=12:02, cost=1, id=3)}
> > window4(start=12:02, end=12:03)={(rowtime=12:02, cost=1, id=3),
> > (rowtime=12:03, cost=1, id=1)}
> >
> > So, here are a few questions:
> > 1) Can those two streams join now? Note that Orders and Shipments start
> > with different timestamps in their windowing. Does query Q.1 mandate that
> > the two streams' rowtimes be synchronized, implicitly? If not, how do we
> > align the two streams' rowtime-based windows in the join?
>
> First a couple of minor comments.
>
> a. An interval literal looks like "INTERVAL <formatted string> <time unit>
> [ TO <time unit> ]", e.g. INTERVAL '3' MINUTE. Thus a window looks like
> "RANGE BETWEEN INTERVAL '2' MINUTES PRECEDING AND INTERVAL '3' MINUTES
> FOLLOWING".
>
> b. Your interpretation of a 3 minute window is very slightly off, because
> SQL's BETWEEN includes its end points. If the current time is 12:00 and the
> window is "ORDER BY rowtime RANGE INTERVAL '3' MINUTES PRECEDING", which is
> shorthand for "ORDER BY rowtime RANGE BETWEEN INTERVAL '3' MINUTES PRECEDING
> AND CURRENT ROW", then the window includes 11:57:00 and 12:00:00. It does
> not include 11:56:59 or 12:00:01.
>
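
The inclusive end points in (b) can be checked with a small Python sketch.
The function name in_preceding_window and the date used are assumptions made
only for this illustration; the point is just that both comparisons are <=.

```python
from datetime import datetime, timedelta


def in_preceding_window(row_time: datetime, current_time: datetime,
                        preceding: timedelta) -> bool:
    """Mimic RANGE BETWEEN <preceding> PRECEDING AND CURRENT ROW.

    Both end points are inclusive, as with SQL's BETWEEN.
    """
    return current_time - preceding <= row_time <= current_time


now = datetime(2015, 3, 10, 12, 0, 0)   # arbitrary date; only the time of day matters
three_minutes = timedelta(minutes=3)

lower_edge = in_preceding_window(datetime(2015, 3, 10, 11, 57, 0), now, three_minutes)
upper_edge = in_preceding_window(now, now, three_minutes)
too_early = in_preceding_window(datetime(2015, 3, 10, 11, 56, 59), now, three_minutes)
too_late = in_preceding_window(datetime(2015, 3, 10, 12, 0, 1), now, three_minutes)
```
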
> Now, about aligning timestamps. I don't think it is a good idea to read
> anything into the start timestamp. It could screw up replay, sparse
> streams, and other stuff. So, your query will return no results, because
> the 9:00 window never overlaps the 12:00 window, whatever time the streams
> are started.
>
> As it stands, the semantics does not provide any means to "link" two
> streams.
>
> There are a couple of directions we could go with the semantics. I'll call
> them "CQL" and "Relational".
>
> 1. CQL. We could let each stream designate a special column that
> represents its "current time". Not necessarily wall-clock time (because we
> want things to work if we replay the stream after the fact) but it defines
> "simultaneity" between streams. (Sometimes this stuff looks like Special
> Relativity: see http://en.wikipedia.org/wiki/Relativity_of_simultaneity
> which is no surprise, since we're dealing with a distributed system:
> http://www.podc.org/influential/2000influentialpaper.)
>
> 2. Relational. Don't have a notion of "current time". Any linkage between
> streams must be explicit, in terms of data values in the streams, like your
> Q.3.
>
> I think (2) is closer to the spirit of Codd's relational model, which
> prefers relationships in the data to be represented using data. A query
> written in the style of (2) can be run on tables in an RDBMS and will
> produce the same result as against streams. (An RDBMS is guaranteed to make
> progress, because all the data is present already.)
>
> In either system, Calcite still needs to verify that a query can make
> progress before accepting it. In (1) it makes progress by (in principle)
> ticking the current time forward and seeing what rows pop out; in (2) it
> requires each input stream to have an increasing expression, and uses that
> increasing expression to prove that the output stream makes progress.
>
> I think we can and should support both (1) and (2). The user does not have
> to specify a special rowtime column in a stream, and if they don't, they
> can only use semantic (2). If they do, they can use either (1) or (2). Most
> of the time people will not write OVER, so they will end up with (2), and
> be happy with it.
>
> > 2) If we synchronize the two streams and change the event time in Shipments
> > to the following:
> > Shipments:
> > (rowtime=9:00, id=2)
> > (rowtime=9:00, id=1)
> > (rowtime=9:02, id=3)
> > (rowtime=9:03, id=1)
> > We have the windows in Shipments as:
> > window1(start=8:59, end=9:00)={(rowtime=9:00, cost=1, id=2),
> > (rowtime=9:00, cost=1, id=1)}
> > window2(start=9:00, end=9:01)={(rowtime=9:00, cost=1, id=2),
> > (rowtime=9:00, cost=1, id=1)}
> > window3(start=9:01, end=9:02)={(rowtime=9:02, cost=1, id=3)}
> > window4(start=9:02, end=9:03)={(rowtime=9:02, cost=1, id=3),
> > (rowtime=9:03, cost=1, id=1)}
> > Now, following your explanation in my 3x3 query example, I assume that the
> > semantic meaning of this windowed join is:
> > Orders.window1 join Shipments.window1 +
> > Orders.window2 join Shipments.window2 +
> > Orders.window3 join Shipments.window3 +
> > Orders.window4 join Shipments.window4
> > Is it the correct interpretation of the windowed join when we define
> > windows in both streams?
>
> So, this query will produce something. If we follow the CQL semantics,
> here's how we figure out what it will emit.
>
> At 9:01:59, Orders' window has rows between 8:58:59 and 9:01:59 inclusive,
> namely
>
> (rowtime=9:00, id=1)
> (rowtime=9:01, id=2)
>
> and Shipments' window has rows between 8:58:59 and 9:01:59 inclusive,
> namely
>
> (rowtime=9:00, id=2)
> (rowtime=9:00, id=1)
>
> and the Orders-join-Shipments window has
>
> (rowtime=9:00, id=1, rowtime=9:00, id=1)
> (rowtime=9:01, id=2, rowtime=9:00, id=2)
>
> Now the clock ticks forward to 9:02:00. The Orders window is now from
> 8:59:00 to 9:02:00 inclusive, and a row enters the Orders window:
>
> (rowtime=9:00, id=1)
> (rowtime=9:01, id=2)
> (rowtime=9:02, id=3) *
>
> The Shipments window is now also from 8:59:00 to 9:02:00 inclusive, and a
> row enters the Shipments window:
>
> (rowtime=9:00, id=2)
> (rowtime=9:00, id=1)
> (rowtime=9:02, id=3) *
>
> and a row enters the Orders-join-Shipments window:
>
> (rowtime=9:00, id=1, rowtime=9:00, id=1)
> (rowtime=9:01, id=2, rowtime=9:00, id=2)
> (rowtime=9:02, id=3, rowtime=9:02, id=3) *
>
> Thus the stream query emits the new row timestamped 9:02.
>
> To figure out what comes out of the stream, make this calculation, of the
> contents of the Orders-join-Shipments window, at every possible clock tick.
>
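
The clock-tick calculation above can be sketched in Python as follows. This is
only my reading of the CQL-style walk-through, under stated assumptions: rows
are (rowtime, id) pairs with rowtime in minutes since midnight, the clock ticks
once per minute rather than once per second, both windows are 3 minutes
PRECEDING as in the walk-through, and the helper names are made up for this
example.

```python
from typing import Dict, List, Set, Tuple

Row = Tuple[int, int]  # (rowtime in minutes since midnight, id)


def window(rows: List[Row], now: int, size: int) -> List[Row]:
    """Rows whose rowtime lies in [now - size, now], end points inclusive."""
    return [r for r in rows if now - size <= r[0] <= now]


def join(left: List[Row], right: List[Row]) -> Set[Tuple[Row, Row]]:
    """Equi-join on id. A set is enough here because the sample rows are distinct."""
    return {(l, r) for l in left for r in right if l[1] == r[1]}


def emitted_per_tick(orders: List[Row], shipments: List[Row],
                     ticks: List[int], size: int) -> Dict[int, List[Tuple[Row, Row]]]:
    """For each tick, report the joined rows that were not present at the previous tick."""
    previous: Set[Tuple[Row, Row]] = set()
    emitted: Dict[int, List[Tuple[Row, Row]]] = {}
    for now in ticks:
        current = join(window(orders, now, size), window(shipments, now, size))
        emitted[now] = sorted(current - previous)
        previous = current
    return emitted


# Synchronized data from question 2: 9:00 is minute 540.
ORDERS = [(540, 1), (541, 2), (542, 3), (543, 1)]
SHIPMENTS = [(540, 2), (540, 1), (542, 3), (543, 1)]

emitted = emitted_per_tick(ORDERS, SHIPMENTS, ticks=[540, 541, 542, 543], size=3)
```

At the 9:02 tick (minute 542) the only new joined row is the id=3 pair, which
matches the row marked with * above.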
> > A follow-up question is below:
> >
> > Is Q.1 equivalent to the following?
> > Q.2) SELECT STREAM id, cost, value, Shipments.rowtime as shiptime FROM
> > Orders OVER (ORDER BY rowtime RANGE CURRENT ROW)
> > JOIN Shipments OVER (ORDER BY rowtime RANGE BETWEEN '2' min PRECEDING
> > AND '3' min FOLLOWING)
> > ON Orders.id = Shipments.id
>
> I think you'll find that joining 3 minutes to 3 minutes has a different
> effect than joining current row to 5 minutes. Put the data in RDBMS tables
> and try it. (You might need to create multiple rows on each side with the
> same id to see the effect.) All practical stream-to-stream joins I have
> seen have joined range-to-current-row, never range-to-range, because people
> don't want the cartesian product.
>
> > Q.3) SELECT STREAM id, cost, value, Shipments.rowtime as shiptime FROM
> > Orders
> > JOIN Shipments
> > ON Orders.id = Shipments.id AND Orders.rowtime >= Shipments.rowtime -
> > 2min AND Orders.rowtime <= Shipments.rowtime + 3min
> >
> > In my interpretation, only Q.3 gives me explicit answers regarding
> > the following:
> > 1) What's the size of the time window in which the join should happen (in
> > Q.3, 5 min)
> > 2) How to align the timestamps in two different streams in a join (in Q.3,
> > very explicit between the two streams)
> > 3) What's the cardinality of the output from the join (in Q.3, for each row
> > in Orders, the number of output rows is the number of matching rows from
> > Shipments in a 5 min window)
> >
>
> I think Q.2 and Q.3 are equivalent. Q.2 is written in the CQL style (1)
> and Q.3 is written in the relational style (2). If you are saying that you
> find it easier to reason about Q.3, then I agree. :)
>
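
Following the suggestion to put the data in RDBMS tables and try it, here is a
rough sketch that runs the relational-style Q.3 against in-memory SQLite tables
from Python. It is an approximation under assumptions of my own: rowtime is
stored as integer minutes since midnight so that "2min" and "3min" become plain
integers, the synchronized Shipments rows from question 2 are used, the STREAM
keyword is dropped, and the cost and value columns are omitted because the
sample rows do not define them.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE Orders    (rowtime INTEGER, id INTEGER);
    CREATE TABLE Shipments (rowtime INTEGER, id INTEGER);
""")

# 9:00 is minute 540 in this illustrative encoding.
conn.executemany("INSERT INTO Orders VALUES (?, ?)",
                 [(540, 1), (541, 2), (542, 3), (543, 1)])
conn.executemany("INSERT INTO Shipments VALUES (?, ?)",
                 [(540, 2), (540, 1), (542, 3), (543, 1)])

# Q.3 with the time alignment written out as ordinary join conditions.
rows = conn.execute("""
    SELECT Orders.id, Orders.rowtime, Shipments.rowtime AS shiptime
    FROM Orders
    JOIN Shipments
      ON Orders.id = Shipments.id
     AND Orders.rowtime >= Shipments.rowtime - 2
     AND Orders.rowtime <= Shipments.rowtime + 3
    ORDER BY Orders.rowtime, Shipments.rowtime
""").fetchall()

conn.close()
```

Because the linkage between the two inputs is expressed purely in data values,
the same query text works on stored tables, which is the property attributed
to style (2) above.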
> Julian
>
>
