samza-dev mailing list archives

From Julian Hyde <jul...@hydromatic.net>
Subject Re: A question regarding to the default semantic meaning of join
Date Tue, 10 Mar 2015 20:58:05 GMT
See answers inline. Proving, yet again, that there's no such thing as a short semantics email.
:)

> On Mar 9, 2015, at 12:48 PM, Yi Pan <nickpan47@gmail.com> wrote:
> 
> Hi, Julian,
> 
> Thanks for the reply. I want to make sure that I understand your
> explanation of windows in JOIN precisely.
> For the following case:
> Q.1) SELECT STREAM id, cost, value, Shipments.rowtime as shiptime FROM
> Orders OVER (ORDER BY rowtime RANGE '3' min PRECEDING)
>    JOIN Shipments OVER (ORDER BY rowtime RANGE '2' min PRECEDING)
>    ON Orders.id = Shipments.id
> 
> If we receive the following rows in Orders and Shipments:
> Orders:
> (rowtime=9:00, id=1)
> (rowtime=9:01, id=2)
> (rowtime=9:02, id=3)
> (rowtime=9:03, id=1)
> Shipments:
> (rowtime=12:00, id=2)
> (rowtime=12:00, id=1)
> (rowtime=12:02, id=3)
> (rowtime=12:03, id=1)
> 
> In Q.1, we will have the following windows:
> Orders:
> window1(start=8:58, end=9:00)={(rowtime=9:00, value=1, id=1)}
> window2(start=8:59, end=9:01)={(rowtime=9:00, value=1, id=1),
> (rowtime=9:01, value=1, id=2)}
> window3(start=9:00, end=9:02)={(rowtime=9:00, value=1, id=1),
> (rowtime=9:01, value=1, id=2), (rowtime=9:02, value=1, id=3)}
> window4(start=9:01, end=9:03)={(rowtime=9:01, value=1, id=2),
> (rowtime=9:02, value=1, id=3), (rowtime=9:03, value=1, id=1)}
> Shipments:
> window1(start=11:59, end=12:00)={(rowtime=12:00, cost=1, id=2),
> (rowtime=12:00, cost=1, id=1)}
> window2(start=12:00, end=12:01)={(rowtime=12:00, cost=1, id=2),
> (rowtime=12:00, cost=1, id=1)}
> window3(start=12:01, end=12:02)={(rowtime=12:02, cost=1, id=3)}
> window4(start=12:02, end=12:03)={(rowtime=12:02, cost=1, id=3),
> (rowtime=12:03, cost=1, id=1)}
> 
> So, here are a few questions:
> 1) Can those two streams join now? Note that the Orders and Shipments windows
> start with different timestamps. Does query Q.1 implicitly mandate that the
> two streams' rowtimes be synchronized? If not, how do we
> align the two streams' rowtime-based windows in a join?

First a couple of minor comments.

a. An interval literal looks like "INTERVAL <formatted string> <time unit> [ TO
<time unit> ]", e.g. INTERVAL '3' MINUTE. Thus a window looks like "RANGE BETWEEN INTERVAL
'2' MINUTE PRECEDING AND INTERVAL '3' MINUTE FOLLOWING".

b. Your interpretation of a 3-minute window is very slightly off, because SQL's BETWEEN includes
its end points. "ORDER BY rowtime RANGE INTERVAL '3' MINUTE PRECEDING" is shorthand for
"ORDER BY rowtime RANGE BETWEEN INTERVAL '3' MINUTE PRECEDING AND CURRENT ROW". If the current
time is 12:00, that window includes every row from 11:57:00 through 12:00:00, both end points
included. It does not include 11:56:59 or 12:00:01.
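A minimal Python sketch of the inclusive-end-point rule, assuming rowtimes are plain `datetime` values on an arbitrary fixed date. The helper names `at` and `in_preceding_range` are made up for illustration; they are not Calcite or Samza APIs.

```python
from datetime import datetime, timedelta


def at(clock: str) -> datetime:
    # Hypothetical helper: parse "HH:MM:SS" onto an arbitrary fixed date.
    return datetime.strptime(clock, "%H:%M:%S")


def in_preceding_range(row_time: datetime, now: datetime, width: timedelta) -> bool:
    # Hypothetical helper: membership in
    # "RANGE BETWEEN <width> PRECEDING AND CURRENT ROW" evaluated at `now`.
    # Both end points are inclusive, as with SQL's BETWEEN.
    return now - width <= row_time <= now


now = at("12:00:00")
three_minutes = timedelta(minutes=3)
for clock in ["11:56:59", "11:57:00", "12:00:00", "12:00:01"]:
    print(clock, in_preceding_range(at(clock), now, three_minutes))
# 11:56:59 False
# 11:57:00 True
# 12:00:00 True
# 12:00:01 False
```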

Now, about aligning timestamps. I don't think it is a good idea to read anything into the
start timestamp. It could screw up re-play, sparse streams, and other stuff. So, your query
will return no results, because the 9:00 window never overlaps the 12:00 window, whatever
time the streams are started.
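One way to check this mechanically, assuming CQL-style semantics in which a row stays in a "RANGE <w> PRECEDING" window for every tick from its own rowtime to rowtime + w: an Orders row and a Shipments row can only meet in the join if those two intervals overlap. The sketch below uses Q.1's widths (3 and 2 minutes) and the original 9:00/12:00 data; the helper names are hypothetical. It finds no such pair, and since it never looks at a start time, the answer does not depend on when the streams are started.

```python
from datetime import datetime, timedelta


def at(clock: str) -> datetime:
    # Hypothetical helper: parse "HH:MM" onto an arbitrary fixed date.
    return datetime.strptime(clock, "%H:%M")


ORDERS = [(at("09:00"), 1), (at("09:01"), 2), (at("09:02"), 3), (at("09:03"), 1)]
SHIPMENTS = [(at("12:00"), 2), (at("12:00"), 1), (at("12:02"), 3), (at("12:03"), 1)]


def live_interval(rowtime: datetime, width: timedelta):
    # A row is inside a "RANGE <width> PRECEDING" window at every tick t
    # with rowtime <= t <= rowtime + width.
    return rowtime, rowtime + width


def ever_joined(orders, shipments, order_width, ship_width):
    # Pairs with equal ids whose window lifetimes share at least one tick.
    pairs = []
    for o_time, o_id in orders:
        o_lo, o_hi = live_interval(o_time, order_width)
        for s_time, s_id in shipments:
            s_lo, s_hi = live_interval(s_time, ship_width)
            if o_id == s_id and max(o_lo, s_lo) <= min(o_hi, s_hi):
                pairs.append((o_time, s_time, o_id))
    return pairs


print(ever_joined(ORDERS, SHIPMENTS, timedelta(minutes=3), timedelta(minutes=2)))
# []
```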

As it stands, the semantics does not provide any means to "link" two streams.

There are a couple of directions we could go with the semantics. I'll call them "CQL" and
"Relational".

1. CQL. We could let each stream designate a special column that represents its "current
time". Not necessarily wall-clock time (because we want things to work if we re-play the stream
after the fact) but it defines "simultaneity" between streams. (Sometimes this stuff looks
like Special Relativity: see http://en.wikipedia.org/wiki/Relativity_of_simultaneity which
is no surprise, since we're dealing with a distributed system: http://www.podc.org/influential/2000-influential-paper.)


2. Relational. There is no notion of "current time". Any linkage between streams must be
explicit, in terms of data values in the streams, like your Q.3.

I think (2) is closer to the spirit of Codd's relational model, which prefers relationships
in the data to be represented using data. A query written in the style of (2) can be run on
tables in an RDBMS and will produce the same result as it does against streams. (An RDBMS is
guaranteed to make progress, because all the data is present already.)
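To illustrate that claim, here is a small Python sketch (hypothetical names, not a Calcite API) that evaluates Q.3's predicate twice over the synchronized 9:00 data from the second question: once as a nested-loop join over complete tables, and once incrementally, joining each arriving row against the rows already seen on the other side as rows arrive in rowtime order. It assumes no two rows are exact duplicates, so Python sets can stand in for multisets.

```python
from datetime import datetime, timedelta


def at(clock: str) -> datetime:
    # Hypothetical helper: parse "HH:MM" onto an arbitrary fixed date.
    return datetime.strptime(clock, "%H:%M")


ORDERS = [(at("09:00"), 1), (at("09:01"), 2), (at("09:02"), 3), (at("09:03"), 1)]
SHIPMENTS = [(at("09:00"), 2), (at("09:00"), 1), (at("09:02"), 3), (at("09:03"), 1)]


def q3_predicate(order, shipment) -> bool:
    # Orders.id = Shipments.id
    # AND Orders.rowtime >= Shipments.rowtime - 2 min
    # AND Orders.rowtime <= Shipments.rowtime + 3 min
    (o_time, o_id), (s_time, s_id) = order, shipment
    return (o_id == s_id
            and s_time - timedelta(minutes=2) <= o_time <= s_time + timedelta(minutes=3))


def batch_join(orders, shipments):
    # RDBMS-style: all the data is already present.
    return {(o, s) for o in orders for s in shipments if q3_predicate(o, s)}


def incremental_join(orders, shipments):
    # Stream-style: rows arrive in rowtime order; each arrival is joined
    # against rows already seen on the other side. State is never purged here.
    arrivals = sorted([(t, "O", (t, i)) for t, i in orders]
                      + [(t, "S", (t, i)) for t, i in shipments])
    seen_orders, seen_shipments, out = [], [], set()
    for _, side, row in arrivals:
        if side == "O":
            out |= {(row, s) for s in seen_shipments if q3_predicate(row, s)}
            seen_orders.append(row)
        else:
            out |= {(o, row) for o in seen_orders if q3_predicate(o, row)}
            seen_shipments.append(row)
    return out


print(batch_join(ORDERS, SHIPMENTS) == incremental_join(ORDERS, SHIPMENTS))  # True
print(len(batch_join(ORDERS, SHIPMENTS)))  # 5
```

The incremental version finds each pair when the later of its two rows arrives, which is why it matches the batch result. It keeps every row forever; a real engine would use the increasing rowtime to decide when old rows can be dropped.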

In either system, Calcite still needs to verify that a query can make progress before accepting
it. In (1) it makes progress by (in principle) ticking the current time forward and seeing
what rows pop out; in (2) it requires each input stream to have an increasing expression,
and uses that increasing expression to prove that the output stream makes progress.
I think we can and should support both (1) and (2). The user does not have to specify a special
rowtime column in a stream, and if they don't, they can only use semantics (2). If they do,
they can use either (1) or (2). Most of the time people will not write OVER, so they will
end up with (2), and be happy with it.

> 2) If we synchronize the two streams and change the event time in Shipments
> to the following:
> Shipments:
> (rowtime=9:00, id=2)
> (rowtime=9:00, id=1)
> (rowtime=9:02, id=3)
> (rowtime=9:03, id=1)
> We have the windows in Shipments as:
> window1(start=8:59, end=9:00)={(rowtime=9:00, cost=1, id=2),
> (rowtime=9:00, cost=1, id=1)}
> window2(start=9:00, end=9:01)={(rowtime=9:00, cost=1, id=2),
> (rowtime=9:00, cost=1, id=1)}
> window3(start=9:01, end=9:02)={(rowtime=9:02, cost=1, id=3)}
> window4(start=9:02, end=9:03)={(rowtime=9:02, cost=1, id=3),
> (rowtime=9:03, cost=1, id=1)}
> Now, following your explanation in my 3x3 query example, I assume that the
> semantic meaning of this windowed join is:
> Orders.window1 join Shipments.window1 +
> Orders.window2 join Shipments.window2 +
> Orders.window3 join Shipments.window3 +
> Orders.window4 join Shipments.window4
> Is this the correct interpretation of the windowed join when we define
> windows in both streams?

So, this query will produce something. If we follow the CQL semantics, here's how we figure
out what it will emit.

At 9:01:59, Orders' window has rows between 8:58:59 and 9:01:59 inclusive, namely

(rowtime=9:00, id=1)
(rowtime=9:01, id=2)

and Shipments' window, which is 2 minutes wide, has rows between 8:59:59 and 9:01:59 inclusive, namely

(rowtime=9:00, id=2)
(rowtime=9:00, id=1)

and the Orders-join-Shipments window has

(rowtime=9:00, id=1, rowtime=9:00, id=1)
(rowtime=9:01, id=2, rowtime=9:00, id=2)

Now the clock ticks forward to 9:02:00. The Orders window is now from 8:59:00 to 9:02:00 inclusive,
and a row enters the Orders window:

(rowtime=9:00, id=1)
(rowtime=9:01, id=2)
(rowtime=9:02, id=3) *

The Shipments window is now from 9:00:00 to 9:02:00 inclusive, and a row enters the Shipments
window:

(rowtime=9:00, id=2)
(rowtime=9:00, id=1)
(rowtime=9:02, id=3) *

and a row enters the Orders-join-Shipments window:

(rowtime=9:00, id=1, rowtime=9:00, id=1)
(rowtime=9:01, id=2, rowtime=9:00, id=2)
(rowtime=9:02, id=3, rowtime=9:02, id=3) *

Thus the stream query emits the new row timestamped 9:02.

To figure out what comes out of the stream, repeat this calculation of the contents of the
Orders-join-Shipments window at every possible clock tick.
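The tick-by-tick calculation can be sketched in Python. This assumes one-second ticks, the window widths from Q.1 (3 minutes for Orders, 2 for Shipments), inclusive window end points, and that the stream emits each join row at the tick when it first appears in the Orders-join-Shipments window (an insert-stream reading of "emits the new row"). All names are hypothetical.

```python
from datetime import datetime, timedelta


def at(clock: str) -> datetime:
    # Hypothetical helper: parse "HH:MM:SS" onto an arbitrary fixed date.
    return datetime.strptime(clock, "%H:%M:%S")


ORDERS = [(at("09:00:00"), 1), (at("09:01:00"), 2), (at("09:02:00"), 3), (at("09:03:00"), 1)]
SHIPMENTS = [(at("09:00:00"), 2), (at("09:00:00"), 1), (at("09:02:00"), 3), (at("09:03:00"), 1)]
ORDER_WIDTH = timedelta(minutes=3)  # Orders: RANGE INTERVAL '3' MINUTE PRECEDING
SHIP_WIDTH = timedelta(minutes=2)   # Shipments: RANGE INTERVAL '2' MINUTE PRECEDING


def window(stream, now, width):
    # Rows with now - width <= rowtime <= now, both ends inclusive.
    return [row for row in stream if now - width <= row[0] <= now]


def join_window(now):
    # Contents of the Orders-join-Shipments window at tick `now`.
    return {(o, s)
            for o in window(ORDERS, now, ORDER_WIDTH)
            for s in window(SHIPMENTS, now, SHIP_WIDTH)
            if o[1] == s[1]}


def emitted(start, end, step=timedelta(seconds=1)):
    # Re-evaluate the join window at every tick; emit rows that just entered it.
    out, previous, now = [], set(), start
    while now <= end:
        current = join_window(now)
        out.extend((now, o, s) for o, s in sorted(current - previous))
        previous, now = current, now + step
    return out


for tick, o, s in emitted(at("09:00:00"), at("09:06:00")):
    print(tick.time(), "order", o[0].time(), o[1], "shipment", s[0].time(), s[1])
# 09:00:00 order 09:00:00 1 shipment 09:00:00 1
# 09:01:00 order 09:01:00 2 shipment 09:00:00 2
# 09:02:00 order 09:02:00 3 shipment 09:02:00 3
# 09:03:00 order 09:00:00 1 shipment 09:03:00 1
# 09:03:00 order 09:03:00 1 shipment 09:03:00 1
```

At 9:03 the 9:00 order for id=1 is still inside its 3-minute window, so it pairs with the 9:03 shipment as well.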

> A follow-up question is below:
> 
> Is Q.1 equivalent to the following?
> Q.2) SELECT STREAM id, cost, value, Shipments.rowtime as shiptime FROM
> Orders OVER (ORDER BY rowtime RANGE CURRENT ROW)
>    JOIN Shipments OVER (ORDER BY rowtime RANGE BETWEEN '2' min PRECEDING
> AND '3' min FOLLOWING)
>    ON Orders.id = Shipments.id

I think you'll find that joining a 3-minute window to a 2-minute window has a different effect
than joining the current row to a 5-minute window. Put the data in RDBMS tables and try it. (You
might need to create multiple rows on each side with the same id to see the effect.) All practical
stream-to-stream joins I have seen have joined range-to-current-row, never range-to-range, because
people don't want the Cartesian product.
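A tiny Python illustration of the Cartesian-product effect at a single tick, using made-up rows that all share id=7 and Q.1's window widths. It contrasts pairing every Orders row in its window with every matching Shipments row in its window (range-to-range) against pairing only the Orders row that arrives at this tick with the Shipments range (range-to-current-row). Names and data are hypothetical.

```python
from datetime import datetime, timedelta


def at(clock: str) -> datetime:
    # Hypothetical helper: parse "HH:MM" onto an arbitrary fixed date.
    return datetime.strptime(clock, "%H:%M")


# Hypothetical data: several rows on each side share id=7.
orders = [(at("10:00"), 7), (at("10:01"), 7), (at("10:02"), 7)]
shipments = [(at("10:01"), 7), (at("10:02"), 7)]


def window(stream, now, width):
    # Rows with now - width <= rowtime <= now, both ends inclusive.
    return [row for row in stream if now - width <= row[0] <= now]


now = at("10:02")
ship_range = window(shipments, now, timedelta(minutes=2))

# Range-to-range: every Orders row in its window meets every matching shipment.
range_to_range = [(o, s) for o in window(orders, now, timedelta(minutes=3))
                  for s in ship_range if o[1] == s[1]]

# Range-to-current-row: only the Orders row arriving at this tick is joined.
current_to_range = [(o, s) for o in orders if o[0] == now
                    for s in ship_range if o[1] == s[1]]

print(len(range_to_range), len(current_to_range))  # 6 2
```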

> Q.3) SELECT STREAM id, cost, value, Shipments.rowtime as shiptime FROM
> Orders
>    JOIN Shipments
>    ON Orders.id = Shipments.id AND Orders.rowtime >= Shipments.rowtime -
> 2min AND Orders.rowtime <= Shipments.rowtime + 3min
> 
> In my interpretation, only Q.3 gives me explicit answers regarding
> the following:
> 1) What's the size of the time window in which the join should happen (in
> Q.3, 5 min)
> 2) How to align the timestamps in two different streams in a join (in Q.3,
> very explicit between the two streams)
> 3) What's the cardinality of the output from the join (in Q.3, for each row
> in Orders, the output is the set of matching rows from all rows in
> Shipments in a 5 min window)
> 

I think Q.2 and Q.3 are equivalent. Q.2 is written in the CQL style (1) and Q.3 is written
in the relational style (2). If you are saying that you find it easier to reason about Q.3,
then I agree. :)

Julian

