samza-dev mailing list archives

From Julian Hyde <jul...@hydromatic.net>
Subject Re: Handling defaults and windowed aggregates in stream queries
Date Tue, 03 Mar 2015 20:40:06 GMT
Sorry to show up late to this party. I've had my head down writing a description of streaming
SQL which I hoped would answer questions like this. Here is the latest draft: https://github.com/julianhyde/incubator-calcite/blob/chi/doc/STREAM.md

I've been avoiding windows for now. They are not needed for simple queries (project, filter,
windowed aggregate) and I wanted to write the specification of more complex queries before
I introduce them.

Let's look at a simple query, filter. According to CQL, to evaluate

  select stream *
  from orders
  where productId = 10    (query 1)

you need to convert orders to a relation over a particular window, apply the filter, then
convert back to a stream. We could write

  select stream *
  from orders over (order by rowtime range between unbounded preceding and current row)
  where productId = 10    (query 2)

or we could write

  select stream *
  from orders over (order by rowtime range between current row and current row)
  where productId = 10      (query 3)

Very different windows, but they produce the same result, because of the stateless nature
of Filter. So, let's suppose that the default window is the one I gave first, "(order by rowtime
range between unbounded preceding and current row)", and so query 1 is just short-hand for
query 2.
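
To make the "same result" claim concrete, here is a minimal sketch in Python of the CQL-style evaluation: window, then filter, then convert back to a stream. It is a toy model, not Calcite or Samza code. The function names, the integer rowtimes and the sample rows are all assumptions made up for illustration.

```python
# Toy CQL-style evaluation: stream -> relation (window) -> filter -> stream.
# Every name below is illustrative; none of this is Calcite or Samza API.

# Each row is (rowtime, orderId, productId); rowtimes are plain integers.
ORDERS = [(1, "a", 10), (2, "b", 7), (2, "c", 10), (4, "d", 10), (5, "e", 3)]


def unbounded_window(stream, t):
    """Query 2's window: every row with rowtime <= t."""
    return {row for row in stream if row[0] <= t}


def now_window(stream, t):
    """Query 3's window: only rows whose rowtime is exactly t."""
    return {row for row in stream if row[0] == t}


def run(stream, window, predicate):
    """Apply the window, then the filter, then emit rows that are new at each tick."""
    out, previous = [], set()
    for t in sorted({row[0] for row in stream}):
        relation = {row for row in window(stream, t) if predicate(row)}
        out.extend(sorted(relation - previous))  # inserts only
        previous = relation
    return out


def is_product_10(row):
    return row[2] == 10


query2 = run(ORDERS, unbounded_window, is_product_10)
query3 = run(ORDERS, now_window, is_product_10)
print(query2 == query3)
```

Both runs emit the three rows with productId 10, in rowtime order, because the filter looks at one row at a time and never at the rest of the window.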

I currently translate query 1 to

Delta
  Filter($1 = 10)
    Scan(orders)

but I should really be translating to

Delta
  Filter($1 = 10)
    Chi(order by $0 range between unbounded preceding and current row)
      Scan(orders)

Delta is the "differentiation" operator and Chi is the "integration" operator. After we apply
rules to push the Delta through the Filter, the Delta and Chi will collide and cancel each
other out.
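
A rough sketch of that cancellation, written in Python rather than as Calcite planner rules. The node classes and the two rewrite rules are hypothetical stand-ins for the operators above, and the sketch assumes the Delta/Chi pair cancels only when Chi carries the unbounded window.

```python
from dataclasses import dataclass

# Hypothetical plan nodes, named after the operators in the plans above.
UNBOUNDED = "order by $0 range between unbounded preceding and current row"


@dataclass(frozen=True)
class Scan:
    table: str


@dataclass(frozen=True)
class Chi:
    window: str
    source: object


@dataclass(frozen=True)
class Filter:
    condition: str
    source: object


@dataclass(frozen=True)
class Delta:
    source: object


def rewrite(node):
    """Apply the two illustrative rules bottom-up."""
    if isinstance(node, Scan):
        return node
    if isinstance(node, Chi):
        return Chi(node.window, rewrite(node.source))
    if isinstance(node, Filter):
        return Filter(node.condition, rewrite(node.source))
    # Otherwise the node is a Delta.
    child = rewrite(node.source)
    if isinstance(child, Filter):
        # Rule 1: Delta(Filter(x)) -> Filter(Delta(x)), since Filter is stateless.
        return Filter(child.condition, rewrite(Delta(child.source)))
    if isinstance(child, Chi) and child.window == UNBOUNDED:
        # Rule 2 (assumed): Delta(Chi(x)) -> x for the unbounded window.
        return child.source
    return Delta(child)


plan = Delta(Filter("$1 = 10", Chi(UNBOUNDED, Scan("orders"))))
optimized = rewrite(plan)
print(optimized)
```

In this sketch the rewritten plan is just the Filter over the Scan of the orders stream, with neither Delta nor Chi left in the tree.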

Why have I not yet introduced the Chi operator? Because I have not yet dealt with a query
where it makes any difference.

Where it will make a difference is joins. But even for joins, I hold out hope that we can
avoid explicit windows, most of the time. One could write

  select stream *
  from orders over (order by rowtime range between current row and interval '1' hour following)
  join shipments
  on orders.orderId = shipments.orderId    (query 4)

but I think most people would find the following clearer:

  select stream *
  from orders
  join shipments
  on orders.orderId = shipments.orderId          (query 5)
  and shipments.rowtime between orders.rowtime and orders.rowtime + interval '1' hour

Under the covers there are still the implicit windows:

  select stream *
  from orders over (order by rowtime range between unbounded preceding and current row)
  join shipments over (order by rowtime range between unbounded preceding and current row)
  on orders.orderId = shipments.orderId          (query 6)
  and shipments.rowtime between orders.rowtime and orders.rowtime + interval '1' hour

Query 6 is equivalent to query 5. But the system can notice the join condition involving the
two streams' rowtimes and trim down the windows (one window to an hour, another window to
just the current row) without changing semantics:

  select stream *
  from orders over (order by rowtime range between interval '1' hour preceding and current row)
  join shipments over (order by rowtime range between current row and current row)
  on orders.orderId = shipments.orderId          (query 7)
  and shipments.rowtime between orders.rowtime and orders.rowtime + interval '1' hour
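
To illustrate why the trimming is safe, here is a small Python sketch that compares a brute-force join over the full history (in the spirit of query 6) with a join that keeps only the last hour of orders (in the spirit of query 7). It is not how Calcite or Samza would implement it. The function names and sample rows are invented, rowtimes are integers in seconds, and events are assumed to arrive in rowtime order.

```python
from collections import deque

HOUR = 3600  # seconds; rowtimes below are plain integers in seconds


def brute_force_join(orders, shipments):
    """Unbounded windows: compare every shipment with every order."""
    return [
        (o, s)
        for s in shipments
        for o in orders
        if o[1] == s[1] and o[0] <= s[0] <= o[0] + HOUR
    ]


def trimmed_join(events):
    """Trimmed windows: keep only the last hour of orders in memory.

    `events` is a rowtime-ordered list of (kind, row) pairs, where kind
    is "order" or "shipment".
    """
    recent_orders = deque()  # the one-hour window over orders
    out = []
    for kind, row in events:
        now = row[0]
        # Expire orders that have fallen out of the one-hour window.
        while recent_orders and recent_orders[0][0] < now - HOUR:
            recent_orders.popleft()
        if kind == "order":
            recent_orders.append(row)
        else:
            # The shipments window is just the current row.
            out.extend((o, row) for o in recent_orders if o[1] == row[1])
    return out


# Rows are (rowtime, orderId).
orders = [(0, "a"), (100, "b"), (5000, "c")]
shipments = [(1800, "a"), (3700, "b"), (4000, "b"), (5000, "c")]
events = sorted(
    [("order", o) for o in orders] + [("shipment", s) for s in shipments],
    key=lambda e: (e[1][0], e[0]),  # on ties, orders sort before shipments
)

print(trimmed_join(events) == brute_force_join(orders, shipments))
```

The shipment at rowtime 4000 finds no match in either version, because its order is more than an hour old. The trimmed version simply forgot that order earlier.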

So, my hope is that end-users will rarely need to use an explicit window.

In the algebra, we will start introducing Chi. It will evaporate for simple queries such as
Filter. It will remain for more complex queries such as stream-to-stream join, because you
are joining the current row of one stream to a time-varying relation based on the other, and
Chi represents that "recent history of a stream" relation.

Julian


> On Mar 2, 2015, at 11:42 AM, Milinda Pathirage <mpathira@umail.iu.edu> wrote:
> 
> Hi Yi,
> 
> As I understand it, rules and re-writes basically do the same thing
> (changing/re-writing the operator tree). But in the case of rules this happens
> during planning, based on the query planner configuration, whereas re-writing
> is done on the planner output, after the query goes through the planner. In
> Calcite the re-write happens inside the interpreter, and in our case it
> will be inside the query-plan-to-operator-router conversion phase.
> 
> Thanks
> Milinda
> 
> On Mon, Mar 2, 2015 at 2:31 PM, Yi Pan <nickpan47@gmail.com> wrote:
> 
>> Hi, Milinda,
>> 
>> +1 on your default window idea. One question: what's the difference between
>> a rule and a re-write?
>> 
>> Thanks!
>> 
>> On Mon, Mar 2, 2015 at 7:14 AM, Milinda Pathirage <mpathira@umail.iu.edu>
>> wrote:
>> 
>>> @Chris
>>> Yes, I was referring to that mail. Actually I was wrong about the ‘Now’
>>> window, it should be an ‘Unbounded’ window for most of the default scenarios
>>> (Section 6.4 of https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf),
>>> because applying a ‘Now’ window with size of 1 will double the number of
>>> events generated if we consider insert/delete streams. But ‘Unbounded’ will
>>> only generate insert events.
>>> 
>>> @Yi
>>> 1. You are correct about Calcite. There is no stream-to-relation conversion
>>> happening. But as I understand it, we don’t need Calcite to support this. We
>>> can add it to our query planner as a rule or re-write. What I am not sure
>>> about is whether to use a rule or a re-write.
>>> 2. There is a rule in Calcite which extracts the Window out from the
>>> Project. But I am not sure why that didn’t happen in my test. This rule is
>>> added to the planner by default. I’ll ask about this on the Calcite mailing
>>> list.
>>> 
>>> I think we can figure out a way to move the window to the input stream if
>>> Calcite can move the window out from Project. I’ll see how we can do this.
>>> 
>>> Also I’ll go ahead and implement default windows. We can change it later if
>>> Julian or someone from Calcite comes up with a better suggestion.
>>> 
>>> Thanks
>>> Milinda
>>> 
>>> On Sun, Mar 1, 2015 at 8:23 PM, Yi Pan <nickpan47@gmail.com> wrote:
>>> 
>>>> Hi, Milinda,
>>>> 
>>>> Sorry to reply late on this. Here are some of my comments:
>>>> 1) In Calcite's model, it seems that there is no stream-to-relation
>>>> conversion step. In the first example where the window specification is
>>>> missing, I like your solution to add the default LogicalNowWindow operator
>>>> s.t. the physical operator matches the query plan. However, if the
>>>> Calcite community does not agree to add the default LogicalNowWindow, it
>>>> would be fine for us if we always insert a default "now" window on a stream
>>>> when we generate the Samza configuration.
>>>> 2) I am more concerned about the other cases, where the window operator is
>>>> used in aggregation and join. In your example of windowed aggregation in
>>>> Calcite, the window spec seems to be a decoration on the LogicalProject
>>>> operator, instead of defining a data source for the LogicalProject operator.
>>>> In the CQL model we followed, the window operator is considered a query
>>>> primitive that generates a data source for other relation operators to
>>>> consume. How exactly is the window operator used in the Calcite planner?
>>>> Isn't it much clearer if the following is used?
>>>> LogicalProject(EXPR$0=[CASE(>(COUNT($2), 0), CAST($SUM0($2)):INTEGER, null)])
>>>>   LogicalWindow(ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
>>>> 
>>>> On Thu, Feb 26, 2015 at 12:18 PM, Milinda Pathirage <mpathira@umail.iu.edu> wrote:
>>>> 
>>>>> Hi devs,
>>>>> 
>>>>> I asked about $subject in calcite-dev. You can find the archived discussion
>>>>> at [1]. I think your thoughts would also be valuable in that discussion on
>>>>> the calcite list.
>>>>> 
>>>>> I discovered the requirement for a default window operator when I tried to
>>>>> integrate streamscan (I was using tablescan previously) into the physical
>>>>> plan generation logic. Because of the way we have written the
>>>>> OperatorRouter API, we always need a stream-to-relation operator at the
>>>>> input. But Calcite generates a query plan like the following:
>>>>> 
>>>>> LogicalDelta
>>>>>   LogicalProject(id=[$0], product=[$1], quantity=[$2])
>>>>>     LogicalFilter(condition=[>($2, 5)])
>>>>>       StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])
>>>>> 
>>>>> If we consider LogicalFilter as a relation operator, we need something to
>>>>> convert the input stream to a relation before sending the tuples downstream.
>>>>> In addition to this, there is an optimization where we consider the filter
>>>>> operator as a tuple operator and place it between StreamScan and the
>>>>> stream-to-relation operator as a way of reducing the number of messages
>>>>> going downstream.
>>>>> 
>>>>> The other scenario is windowed aggregates. Currently the window spec is
>>>>> attached to the LogicalProject in the query plan, like the following:
>>>>> 
>>>>> LogicalProject(EXPR$0=[CASE(>(COUNT($2) OVER (ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING), 0), CAST($SUM0($2) OVER (ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)):INTEGER, null)])
>>>>> 
>>>>> I wanted to know from them whether it is possible to move the window
>>>>> operation to just after the stream scan, so that it is compatible with our
>>>>> operator layer.
>>>>> Maybe there are better or easier ways to do this. So your comments are
>>>>> always welcome.
>>>>> 
>>>>> Thanks
>>>>> Milinda
>>>>> 
>>>>> 
>>>>> [1] http://mail-archives.apache.org/mod_mbox/incubator-calcite-dev/201502.mbox/browser
>>>>> 
>>>>> --
>>>>> Milinda Pathirage
>>>>> 
>>>>> PhD Student | Research Assistant
>>>>> School of Informatics and Computing | Data to Insight Center
>>>>> Indiana University
>>>>> 
>>>>> twitter: milindalakmal
>>>>> skype: milinda.pathirage
>>>>> blog: http://milinda.pathirage.org
>>>>> 
>>>> 
>>> 
>>> 
>>> 
>>> 
>> 
> 
> 
> 

