samza-dev mailing list archives

From Yi Pan <nickpa...@gmail.com>
Subject Re: A question regarding to the default semantic meaning of join
Date Mon, 09 Mar 2015 19:48:51 GMT
Hi, Julian,

Thanks for the reply. I want to make sure that I understand your
explanation of windows in JOIN more precisely.
For the following case:
Q.1) SELECT STREAM id, cost, value, Shipments.rowtime as shiptime FROM
Orders OVER (ORDER BY rowtime RANGE '3' min PRECEDING)
    JOIN Shipments OVER (ORDER BY rowtime RANGE '2' min PRECEDING)
    ON Orders.id = Shipments.id

If we receive the following rows in Orders and Shipments:
Orders:
(rowtime=9:00, id=1)
(rowtime=9:01, id=2)
(rowtime=9:02, id=3)
(rowtime=9:03, id=1)
Shipments:
(rowtime=12:00, id=2)
(rowtime=12:00, id=1)
(rowtime=12:02, id=3)
(rowtime=12:03, id=1)

In Q.1, we will have the following windows:
Orders:
window1(start=8:58, end=9:00)={(rowtime=9:00, value=1, id=1)}
window2(start=8:59, end=9:01)={(rowtime=9:00, value=1, id=1),
(rowtime=9:01, value=1, id=2)}
window3(start=9:00, end=9:02)={(rowtime=9:00, value=1, id=1),
(rowtime=9:01, value=1, id=2), (rowtime=9:02, value=1, id=3)}
window4(start=9:01, end=9:03)={(rowtime=9:01, value=1, id=2),
(rowtime=9:02, value=1, id=3), (rowtime=9:03, value=1, id=1)}
Shipments:
window1(start=11:59, end=12:00)={(rowtime=12:00, cost=1, id=2),
(rowtime=12:00, cost=1, id=1)}
window2(start=12:00, end=12:01)={(rowtime=12:00, cost=1, id=2),
(rowtime=12:00, cost=1, id=1)}
window3(start=12:01, end=12:02)={(rowtime=12:02, cost=1, id=3)}
window4(start=12:02, end=12:03)={(rowtime=12:02, cost=1, id=3),
(rowtime=12:03, cost=1, id=1)}
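
To make the window listing above reproducible, here is a minimal Python sketch. It rests on my own assumptions: one window is emitted per minute tick, both window bounds are inclusive, and the widths (2 min for Orders, 1 min for Shipments) are read off the listed start/end values rather than taken from the RANGE clauses. The names hhmm and sliding_windows are hypothetical helpers, not part of any Samza or SQL API.

```python
def hhmm(text):
    """Convert 'H:MM' to minutes since midnight."""
    hours, minutes = text.split(":")
    return int(hours) * 60 + int(minutes)


def sliding_windows(rows, width_min):
    """Return one (start, end, members) window per minute tick.

    Assumption: each window covers [end - width_min, end], bounds inclusive.
    rows is a list of (rowtime_in_minutes, id) tuples.
    """
    first = min(t for t, _ in rows)
    last = max(t for t, _ in rows)
    windows = []
    for end in range(first, last + 1):
        start = end - width_min
        members = [r for r in rows if start <= r[0] <= end]
        windows.append((start, end, members))
    return windows


orders = [(hhmm("9:00"), 1), (hhmm("9:01"), 2),
          (hhmm("9:02"), 3), (hhmm("9:03"), 1)]
shipments = [(hhmm("12:00"), 2), (hhmm("12:00"), 1),
             (hhmm("12:02"), 3), (hhmm("12:03"), 1)]

# Widths taken from the listed windows (assumption, see above).
order_windows = sliding_windows(orders, 2)
shipment_windows = sliding_windows(shipments, 1)

for start, end, members in order_windows + shipment_windows:
    print(f"{start // 60}:{start % 60:02d} .. "
          f"{end // 60}:{end % 60:02d} -> {members}")
```

Under these assumptions the sketch yields four windows per stream with the same members as listed. A different tick or boundary convention would give different windows.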

So, here are a few questions:
1) Can those two streams join now? Note that Orders and Shipments start
with different timestamps in their windowing. Does query Q.1 implicitly
mandate that the two streams' rowtimes be synchronized? If not, how do we
align the two streams' rowtime-based windows in the join?
2) If we synchronize the two streams and change the event time in Shipments
to the following:
Shipments:
(rowtime=9:00, id=2)
(rowtime=9:00, id=1)
(rowtime=9:02, id=3)
(rowtime=9:03, id=1)
We have the windows in Shipments as:
window1(start=8:59, end=9:00)={(rowtime=9:00, cost=1, id=2),
(rowtime=9:00, cost=1, id=1)}
window2(start=9:00, end=9:01)={(rowtime=9:00, cost=1, id=2),
(rowtime=9:00, cost=1, id=1)}
window3(start=9:01, end=9:02)={(rowtime=9:02, cost=1, id=3)}
window4(start=9:02, end=9:03)={(rowtime=9:02, cost=1, id=3),
(rowtime=9:03, cost=1, id=1)}
Now, following your explanation in my 3x3 query example, I assume that the
semantic meaning of this windowed join is:
Orders.window1 join Shipments.window1 +
Orders.window2 join Shipments.window2 +
Orders.window3 join Shipments.window3 +
Orders.window4 join Shipments.window4
Is it the correct interpretation of the windowed join when we define
windows in both streams? A follow-up question is below:
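
The window-by-window interpretation above can be sketched in Python as follows. This is only an illustration under my own assumptions: windows are emitted once per minute tick with inclusive bounds, the widths (2 min for Orders, 1 min for Shipments) follow the listed windows, and the "+" is read as a bag union. Rowtimes are minutes since midnight (9:00 = 540); sliding_windows and join_on_id are hypothetical helpers.

```python
def sliding_windows(rows, width_min):
    """One (start, end, members) window per minute tick, bounds inclusive."""
    first = min(t for t, _ in rows)
    last = max(t for t, _ in rows)
    return [
        (end - width_min, end,
         [r for r in rows if end - width_min <= r[0] <= end])
        for end in range(first, last + 1)
    ]


def join_on_id(order_rows, shipment_rows):
    """Equi-join two windows on id; emit (id, order_time, ship_time)."""
    return [
        (o_id, o_time, s_time)
        for o_time, o_id in order_rows
        for s_time, s_id in shipment_rows
        if o_id == s_id
    ]


# Orders and the synchronized Shipments rows from the example.
orders = [(540, 1), (541, 2), (542, 3), (543, 1)]
shipments = [(540, 2), (540, 1), (542, 3), (543, 1)]

order_windows = sliding_windows(orders, 2)
shipment_windows = sliding_windows(shipments, 1)

# Orders.windowK join Shipments.windowK, concatenated over all K.
pairwise = []
for (_, _, o_rows), (_, _, s_rows) in zip(order_windows, shipment_windows):
    pairwise.extend(join_on_id(o_rows, s_rows))

distinct = sorted(set(pairwise))
print(len(pairwise), len(distinct))
```

With these inputs the concatenation contains the same (order, shipment) pair more than once, because a row sits in several overlapping windows. Whether the intended semantics deduplicate those pairs is exactly the kind of thing the query text does not tell me.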

Is Q.1 equivalent to the following?
Q.2) SELECT STREAM id, cost, value, Shipments.rowtime as shiptime FROM
Orders OVER (ORDER BY rowtime RANGE CURRENT ROW)
    JOIN Shipments OVER (ORDER BY rowtime RANGE BETWEEN '2' min PRECEDING
AND '3' min FOLLOWING)
    ON Orders.id = Shipments.id
Q.3) SELECT STREAM id, cost, value, Shipments.rowtime as shiptime FROM
Orders
    JOIN Shipments
    ON Orders.id = Shipments.id AND Orders.rowtime >= Shipments.rowtime -
2min AND Orders.rowtime <= Shipments.rowtime + 3min

In my interpretation, only Q.3 gives me explicit answers to the
following:
1) What's the size of the time-window in which the join should happen (in
Q.3, 5 min)
2) How to align the timestamps in two different streams in a join (in Q.3,
very explicit between the two streams)
3) What's the cardinality of the output from the join (in Q.3, for each row
in Orders, the number of output rows is the number of matching rows in
Shipments within a 5 min window)
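
As a concrete reading of Q.3, here is a minimal Python sketch of the join predicate applied to the Orders rows and the synchronized Shipments rows from the example. It is a batch simulation over finite lists, not a streaming implementation. Rowtimes are minutes since midnight (9:00 = 540), and interval_join with its before_min/after_min parameters is a hypothetical name of mine.

```python
def interval_join(orders, shipments, before_min=2, after_min=3):
    """Join on id with the Q.3 time bound.

    Keeps a pair when
        Orders.rowtime >= Shipments.rowtime - before_min
    and Orders.rowtime <= Shipments.rowtime + after_min.
    Rows are (rowtime_in_minutes, id); output is (id, order_time, ship_time).
    """
    out = []
    for o_time, o_id in orders:
        for s_time, s_id in shipments:
            same_id = o_id == s_id
            in_range = s_time - before_min <= o_time <= s_time + after_min
            if same_id and in_range:
                out.append((o_id, o_time, s_time))
    return out


orders = [(540, 1), (541, 2), (542, 3), (543, 1)]
shipments = [(540, 2), (540, 1), (542, 3), (543, 1)]

result = interval_join(orders, shipments)
for row in result:
    print(row)
```

Each Orders row is compared against every Shipments row exactly once, so the output cardinality is fully determined by the predicate, with no dependence on how windows are ticked or aligned.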

Comments?

Thanks a lot in advance!

-Yi

> And query (1) semi-infinite x semi-infinite could, if id is not unique
> in orders and shipments, produce an even bigger multiplier.
>
> For people writing practical queries I would only recommend having a
> multi-row window on one side of the join, and/or make sure that their
> id column is unique on one side of the join.
>
> To summarize: if we make assumptions about joining on rowtime, then
> your query and queries (1), (2), (3) are valid, but due to their
> different windows are not equivalent.
>
> Julian
>
> On Fri, Mar 6, 2015 at 4:28 PM, Yi Pan <nickpan47@gmail.com> wrote:
> > Hi, Julian,
> >
> > I am writing down some detailed examples of join and need your further help
> > in understanding the semantic meaning of the following example:
> >
> > SELECT id, value, cost FROM Orders OVER (ROWS 3 PRECEDING) JOIN Shipments
> > OVER (ROWS 3 PRECEDING) ON Orders.id = Shipments.id
> >
> > In this example, if id is not the row number, what would be the default
> > semantic meaning of the join? Does it mean that each sliding window in
> > Orders now needs to join with each sliding window in Shipments? If that's
> > the case, that would be equivalent to removing all the windows and just doing
> > a full stream-to-stream join, which is not possible for infinite streams.
> > Hence, it does not make sense to define a window in the query here. On the other
> > hand, if the semantic of the join is to join the "current windows" from
> > both streams, how do we align the windows from the two streams? In this
> > case, the moment at which the query starts on the two streams determines the results.
> >
> > Does the above example make sense in SQL? Or should we just declare that
> > this is not supported as joins of two streams w/o time-bound?
> >
> > Thanks!
> >
> > -Yi
>
