samza-dev mailing list archives

From Yi Pan <nickpa...@gmail.com>
Subject Re: Handling defaults and windowed aggregates in stream queries
Date Mon, 02 Mar 2015 01:23:11 GMT
Hi, Milinda,

Sorry for the late reply. Here are some of my comments:
1) In Calcite's model, it seems that there is no stream-to-relation
conversion step. In the first example where the window specification is
missing, I like your solution to add the default LogicalNowWindow operator
so that the physical operators match the query plan. However, if the
Calcite community does not agree to add the default LogicalNowWindow, it
would also be fine for us to always insert a default "now" window on a
stream when we generate the Samza configuration.
2) I am more concerned about the other cases, where the window operator is
used in aggregation and join. In your example of windowed aggregation in
Calcite, the window spec seems to be a decoration on the LogicalProject
operator, instead of defining a data source for the LogicalProject
operator. In the CQL model we followed, the window operator is considered
a query primitive that generates a data source for other relational
operators to consume. How exactly is the window operator used in the
Calcite planner? Wouldn't it be much clearer if the following were used?
LogicalProject(EXPR$0=[CASE(>(COUNT($2), 0), CAST($SUM0($2)):INTEGER, null)])
  LogicalWindow(ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
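
To make point 1 concrete, here is a minimal sketch of inserting a default "now" window when we generate the physical plan. The plan classes (StreamScan, Filter, NowWindow, Project, Delta) are a hypothetical toy model, not Calcite's RelNode API or our OperatorRouter API. It also assumes the filter optimization from your mail: filters sitting directly on the scan stay below the window, so they still run per tuple before the stream-to-relation conversion.

```java
import java.util.List;

// Hypothetical toy plan model for illustration only; these are NOT
// Calcite's RelNode classes or Samza's OperatorRouter API.
public class DefaultWindowRewrite {
    interface Node { }
    record StreamScan(String table) implements Node { }
    record Filter(String condition, Node input) implements Node { }      // tuple operator
    record NowWindow(Node input) implements Node { }                     // stream-to-relation
    record Project(List<String> exprs, Node input) implements Node { }   // relation operator
    record Delta(Node input) implements Node { }                         // relation-to-stream

    /** True if n is a StreamScan, possibly under a chain of tuple-level filters. */
    static boolean isRawStream(Node n) {
        if (n instanceof StreamScan) return true;
        if (n instanceof Filter f) return isRawStream(f.input());
        return false;
    }

    /**
     * Wraps every raw stream input in a default NowWindow. Filters directly on
     * the scan stay below the window (the push-down optimization), and an
     * explicit NowWindow already in the plan is left untouched.
     */
    static Node rewrite(Node n) {
        if (isRawStream(n)) return new NowWindow(n);
        if (n instanceof Delta d) return new Delta(rewrite(d.input()));
        if (n instanceof Project p) return new Project(p.exprs(), rewrite(p.input()));
        if (n instanceof Filter f) return new Filter(f.condition(), rewrite(f.input()));
        return n; // explicit NowWindow (or any other node): unchanged
    }

    public static void main(String[] args) {
        // Same shape as the plan Calcite generated in the quoted message.
        Node plan = new Delta(new Project(List.of("id", "product", "quantity"),
                new Filter(">($2, 5)", new StreamScan("KAFKA.ORDERS"))));
        System.out.println(rewrite(plan));
    }
}
```

For the plan in your mail this yields Delta over Project over NowWindow over Filter over StreamScan, so only the projection sees a relation while the filter keeps reducing messages at the tuple level.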

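For point 2, here is a rough sketch of what treating the window as a data source could look like. A hypothetical rowsWindow operator materializes, for each row, the rows in ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING, and the projection then evaluates CASE(>(COUNT($2), 0), CAST($SUM0($2)):INTEGER, null) over each window it receives. It works on a finite, already-ordered list; a real streaming operator would have to buffer the two following rows before emitting a result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the window is its own operator producing the data the
// projection consumes, instead of an OVER clause decorating the projection.
public class RowsWindowSketch {
    /** For each row i, returns the rows in [i - preceding, i + following], clipped to the input. */
    static List<List<Integer>> rowsWindow(List<Integer> column, int preceding, int following) {
        List<List<Integer>> windows = new ArrayList<>();
        for (int i = 0; i < column.size(); i++) {
            int lo = Math.max(0, i - preceding);
            int hi = Math.min(column.size() - 1, i + following);
            windows.add(column.subList(lo, hi + 1));
        }
        return windows;
    }

    /** Evaluates CASE(>(COUNT(x), 0), CAST($SUM0(x)):INTEGER, null) over one window. */
    static Integer sumOrNull(List<Integer> window) {
        long count = 0; // COUNT(x) skips NULLs
        long sum0 = 0;  // $SUM0 is 0 (not NULL) when there is nothing to add
        for (Integer v : window) {
            if (v != null) {
                count++;
                sum0 += v;
            }
        }
        if (count == 0) return null;
        return (int) sum0;
    }

    /** The projection, applied to every window emitted by the window operator. */
    static List<Integer> project(List<List<Integer>> windows) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> w : windows) out.add(sumOrNull(w));
        return out;
    }

    public static void main(String[] args) {
        List<Integer> quantity = List.of(1, 2, 3, 4, 5);
        System.out.println(project(rowsWindow(quantity, 2, 2))); // [6, 10, 15, 14, 12]
    }
}
```

Keeping the window as a separate operator means the projection only ever sees a relation, which is the CQL-style split our operator layer expects.
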
On Thu, Feb 26, 2015 at 12:18 PM, Milinda Pathirage <mpathira@umail.iu.edu>
wrote:

> Hi devs,
>
> I asked about $subject on calcite-dev. You can find the archived discussion
> at [1]. I think your thoughts would also be valuable in that discussion on
> the Calcite list.
>
> I discovered the need for a default window operator when I tried to
> integrate StreamScan (I was using TableScan previously) into the physical
> plan generation logic. Because of the way we have written the
> OperatorRouter API, we always need a stream-to-relation operator at the
> input. But Calcite generates a query plan like the following:
>
> LogicalDelta
>   LogicalProject(id=[$0], product=[$1], quantity=[$2])
>     LogicalFilter(condition=[>($2, 5)])
>       StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])
>
> If we consider LogicalFilter a relational operator, we need something to
> convert the input stream to a relation before sending the tuples
> downstream. In addition, there is an optimization where we treat the
> filter operator as a tuple operator and place it between StreamScan and
> the stream-to-relation operator, as a way of reducing the number of
> messages going downstream.
>
> Another scenario is windowed aggregates. Currently the window spec is
> attached to the LogicalProject in the query plan, like the following:
>
> LogicalProject(EXPR$0=[CASE(>(COUNT($2) OVER (ROWS BETWEEN 2 PRECEDING AND
> 2 FOLLOWING), 0), CAST($SUM0($2) OVER (ROWS BETWEEN 2 PRECEDING AND 2
> FOLLOWING)):INTEGER, null)])
>
> I wanted to ask them whether it is possible to move the window operation
> to just after the stream scan, so that it is compatible with our operator
> layer. Maybe there are better or easier ways to do this, so your comments
> are always welcome.
>
> Thanks
> Milinda
>
>
> [1] http://mail-archives.apache.org/mod_mbox/incubator-calcite-dev/201502.mbox/browser
>
> --
> Milinda Pathirage
>
> PhD Student | Research Assistant
> School of Informatics and Computing | Data to Insight Center
> Indiana University
>
> twitter: milindalakmal
> skype: milinda.pathirage
> blog: http://milinda.pathirage.org
>
