samza-dev mailing list archives

From Milinda Pathirage <mpath...@umail.iu.edu>
Subject Re: Handling defaults and windowed aggregates in stream queries
Date Mon, 02 Mar 2015 15:14:10 GMT
@Chris
Yes, I was referring to that mail. Actually, I was wrong about the ‘Now’
window; it should be an ‘Unbounded’ window for most of the default scenarios
(Section 6.4 of https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf), because
applying a ‘Now’ window of size 1 will double the number of events
generated if we consider insert/delete streams, whereas an ‘Unbounded’ window
will only generate insert events.
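The doubling can be checked with a small sketch of the CQL relation-to-stream semantics, where the insert stream (Istream) and delete stream (Dstream) at each instant are the multiset differences between consecutive window contents. The function names, the one-tuple-per-instant input, and the choice to run one instant past the last tuple (so the final delete appears) are illustrative assumptions, not Samza or Calcite APIs.

```python
from collections import Counter

def now_window(stream, t):
    # [Now] window: the relation at instant t holds only tuples stamped exactly t.
    return Counter(v for ts, v in stream if ts == t)

def unbounded_window(stream, t):
    # [Range Unbounded] window: the relation at instant t holds every tuple stamped <= t.
    return Counter(v for ts, v in stream if ts <= t)

def insert_delete_events(stream, window, horizon):
    # Relation-to-stream conversion over instants 0..horizon: tuples entering the
    # relation become inserts (Istream), tuples leaving it become deletes (Dstream).
    events = []
    previous = Counter()
    for t in range(horizon + 1):
        current = window(stream, t)
        events += [("+", v) for v in (current - previous).elements()]
        events += [("-", v) for v in (previous - current).elements()]
        previous = current
    return events

orders = [(1, "o1"), (2, "o2"), (3, "o3")]  # (timestamp, tuple), one tuple per instant
now_events = insert_delete_events(orders, now_window, horizon=4)
unbounded_events = insert_delete_events(orders, unbounded_window, horizon=4)
print(len(now_events), len(unbounded_events))  # 6 3
```

With the ‘Now’ window every tuple is inserted at its own instant and deleted at the next one, while the ‘Unbounded’ window never drops a tuple, so it emits inserts only.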

@Yi
1. You are correct about Calcite. There is no stream-to-relation conversion
happening. But as I understand it, we don’t need Calcite to support this. We
can add it to our query planner as a rule or a rewrite. What I am not sure
about is whether to use a rule or a rewrite.
2. There is a rule in Calcite that extracts the Window out of the
Project, but I am not sure why that didn’t happen in my test. This rule is
added to the planner by default. I’ll ask about this on the Calcite mailing
list.

I think we can figure out a way to move the window to the input stream if
Calcite can move the window out of the Project. I’ll see how we can do this.
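A sketch of the shape of that rewrite, turning a Project that carries an OVER spec into a Project on top of a separate Window node (the form Yi suggests below). The node classes are hypothetical stand-ins, not Calcite's RelNode or rule API, and the sketch assumes all aggregates in the Project share one window spec.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple

@dataclass(frozen=True)
class StreamScan:
    table: str

@dataclass(frozen=True)
class Window:
    spec: str      # e.g. "ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING"
    input: object

@dataclass(frozen=True)
class Project:
    exprs: Tuple[str, ...]
    input: object
    over: Optional[str] = None  # window spec attached to the aggregates, if any

def extract_window(node):
    # Rewrite Project(over=spec) into Project(over=None) on top of Window(spec).
    # The Window goes directly under the Project, so it sees exactly the rows
    # the OVER clause would have seen.
    if isinstance(node, Project):
        child = extract_window(node.input)
        if node.over is not None:
            return Project(node.exprs, Window(node.over, child))
        return replace(node, input=child)
    if isinstance(node, Window):
        return replace(node, input=extract_window(node.input))
    return node  # StreamScan and other leaves are returned unchanged

spec = "ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING"
expr = "CASE(>(COUNT($2), 0), CAST($SUM0($2)):INTEGER, null)"
plan = Project((expr,), StreamScan("ORDERS"), over=spec)
rewritten = extract_window(plan)  # Project -> Window -> StreamScan
```

In this query the Window ends up right above the stream scan because nothing sits between them. Pushing a ROWS window further down, below a filter, would change which rows it counts, so a real rule would need to stop at the first operator it cannot commute with.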

Also, I’ll go ahead and implement default windows. We can change this later if
Julian or someone from Calcite comes up with a better suggestion.
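One way a default-window rewrite could look is sketched below. It inserts an ‘Unbounded’ window above every stream scan that is not already consumed by an explicit window, and it keeps filters below the window as tuple operators, which matches the optimization described in the original mail further down. The node classes and the `DEFAULT_WINDOW` constant are hypothetical and are not Samza's operator API.

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class StreamScan:
    table: str

@dataclass(frozen=True)
class Filter:
    condition: str
    input: object

@dataclass(frozen=True)
class Window:
    spec: str
    input: object

@dataclass(frozen=True)
class Project:
    exprs: tuple
    input: object

@dataclass(frozen=True)
class Delta:
    input: object

DEFAULT_WINDOW = "RANGE UNBOUNDED"  # hypothetical label for the CQL default window

def is_tuple_chain(node):
    # True for a StreamScan optionally topped by Filters (tuple-at-a-time operators).
    while isinstance(node, Filter):
        node = node.input
    return isinstance(node, StreamScan)

def add_default_windows(node):
    # Insert Window(DEFAULT_WINDOW) above each scan/filter chain that is not
    # already under an explicit Window; other nodes are rebuilt around their input.
    if isinstance(node, Window):
        return node  # explicit window: leave its input alone
    if is_tuple_chain(node):
        return Window(DEFAULT_WINDOW, node)
    return replace(node, input=add_default_windows(node.input))

plan = Delta(Project(("id", "product", "quantity"),
                     Filter(">($2, 5)", StreamScan("ORDERS"))))
rewritten = add_default_windows(plan)
# Delta -> Project -> Window(RANGE UNBOUNDED) -> Filter -> StreamScan
```

Keeping the filter under the window is safe here because filtering commutes with an unbounded (or time-based) window. It would not be safe for a ROWS window, where filtering first changes which rows fall inside the window.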

Thanks
Milinda

On Sun, Mar 1, 2015 at 8:23 PM, Yi Pan <nickpan47@gmail.com> wrote:

> Hi, Milinda,
>
> Sorry for the late reply. Here are some of my comments:
> 1) In Calcite's model, it seems that there is no stream-to-relation
> conversion step. In the first example, where the window specification is
> missing, I like your solution of adding the default LogicalNowWindow operator
> so that the physical operators match the query plan. However, if the
> Calcite community does not agree to add the default LogicalNowWindow, it
> would be fine for us to always insert a default "now" window on a stream
> when we generate the Samza configuration.
> 2) I am more concerned about the other cases, where the window operator is
> used in aggregation and join. In your example of windowed aggregation in
> Calcite, the window spec seems to be a decoration on the LogicalProject
> operator instead of defining a data source for the LogicalProject operator.
> In the CQL model we followed, the window operator is considered a query
> primitive that generates a data source for other relational operators to
> consume. How exactly is the window operator used in the Calcite planner?
> Wouldn't it be much clearer if the following were used?
> LogicalProject(EXPR$0=[CASE(>(COUNT($2), 0), CAST($SUM0($2)):INTEGER, null)])
>   LogicalWindow(ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
>
> On Thu, Feb 26, 2015 at 12:18 PM, Milinda Pathirage <mpathira@umail.iu.edu>
> wrote:
>
> > Hi devs,
> >
> > I asked about $subject on calcite-dev. You can find the archived discussion
> > at [1]. I think your thoughts would also be valuable in that discussion on
> > the Calcite list.
> >
> > I discovered the requirement for a default window operator when I tried
> > to integrate streamscan (I was using tablescan previously) into the physical
> > plan generation logic. Because of the way we have written the
> > OperatorRouter API, we always need a stream-to-relation operator at the
> > input. But Calcite generates a query plan like the following:
> >
> > LogicalDelta
> >   LogicalProject(id=[$0], product=[$1], quantity=[$2])
> >     LogicalFilter(condition=[>($2, 5)])
> >       StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])
> >
> > If we consider LogicalFilter a relational operator, we need something to
> > convert the input stream to a relation before sending the tuples downstream.
> > In addition, there is an optimization where we treat the filter operator
> > as a tuple operator and place it between StreamScan and the
> > stream-to-relation operator, to reduce the number of messages
> > going downstream.
> >
> > The other scenario is windowed aggregates. Currently the window spec is
> > attached to the LogicalProject in the query plan, as follows:
> >
> > LogicalProject(EXPR$0=[CASE(>(COUNT($2) OVER (ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING), 0), CAST($SUM0($2) OVER (ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)):INTEGER, null)])
> >
> > I wanted to know from them whether it is possible to move the window
> > operation to just after the stream scan, so that it is compatible with our
> > operator layer.
> > Maybe there are better or easier ways to do this, so your comments are
> > welcome.
> >
> > Thanks
> > Milinda
> >
> >
> > [1] http://mail-archives.apache.org/mod_mbox/incubator-calcite-dev/201502.mbox/browser
> >
> > --
> > Milinda Pathirage
> >
> > PhD Student | Research Assistant
> > School of Informatics and Computing | Data to Insight Center
> > Indiana University
> >
> > twitter: milindalakmal
> > skype: milinda.pathirage
> > blog: http://milinda.pathirage.org
> >
>



