samza-dev mailing list archives

From Yi Pan <nickpa...@gmail.com>
Subject Re: Handling defaults and windowed aggregates in stream queries
Date Fri, 06 Mar 2015 19:09:28 GMT
Yeah, I am still thinking about it. Jay pointed out that for event-time
windows, the window start time may be derivable if we just keep a single
starting value for fixed-length windows. I have yet to think about the tuple
window case and windows with dynamic length (e.g. the session window example
in MillWheel).
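
A minimal sketch of that derivation, assuming fixed-length windows aligned to a
single stored origin timestamp. The function name, parameter names and
millisecond units are illustrative assumptions, not Samza or Calcite APIs:

```python
def window_start(event_time_ms, origin_ms, window_len_ms):
    """Return the start of the fixed-length window containing event_time_ms.

    Only `origin_ms` (the single starting value) and the window length are
    kept as state; every window boundary is derived from them.
    """
    if window_len_ms <= 0:
        raise ValueError("window_len_ms must be positive")
    # Floor division also handles events that fall before the origin.
    index = (event_time_ms - origin_ms) // window_len_ms
    return origin_ms + index * window_len_ms


HOUR_MS = 60 * 60 * 1000
# An event at 10:30 (ms since midnight) maps to the window starting at 10:00.
start = window_start(10 * HOUR_MS + 30 * 60 * 1000, 0, HOUR_MS)
```

Because the start is computed from the event time alone, a late message maps to
the same window it would have joined had it arrived on time. This does not
cover tuple windows or session windows, whose boundaries depend on the data.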

On Fri, Mar 6, 2015 at 7:24 AM, Milinda Pathirage <mpathira@umail.iu.edu>
wrote:

> I think my previous comment about maintaining start and end offsets as the
> window state will not work when there are delays. We may need to keep
> multiple such offsets. But this may not be a clean solution.
>
> On Thu, Mar 5, 2015 at 2:42 PM, Milinda Pathirage <mpathira@umail.iu.edu>
> wrote:
>
> > Hi Yi,
> >
> > Please find my comments inline.
> >
> > On Thu, Mar 5, 2015 at 1:18 PM, Yi Pan <nickpan47@gmail.com> wrote:
> >
> >> Hi, Milinda,
> >>
> >> We have recently had some discussions on the MillWheel model:
> >> http://www.infoq.com/presentations/millwheel.
> >
> >
> > Yes. Above is a very interesting talk. I asked the above question
> > regarding the language, just after watching the talk. I was under the
> > impression that we need to specify these details (handling delays)
> > explicitly in the query.
> >
> >
> >> It is a very interesting talk and has one striking point that we did not
> >> think about before: handle late arrivals as a "correction" to the earlier
> >> results. Hence, if we follow that model, the late arrival problem that you
> >> described can be addressed as follows:
> >>
> >> a) Each window will have a closing policy: it would be either a wall-clock
> >> based timeout, or the arrival of messages indicating that we have received
> >> all messages in the corresponding event time window
> >>
> >
> > Given that the closing policy is not explicit in the query, how are we
> > going to handle this? Is this policy going to be specific to a query or a
> > system-wide thing? I think I was not clear about this in the previous mail.
> >
> >
> >> b) Each window also keeps all the past messages it receives in the past
> >> windows, up to a large retention size that covers all possible late
> >> arrivals
> >>
> >
> > Are we going to keep this in local storage? Is this (keeping past
> > messages) really necessary in the case of monotonic queries? Maybe you meant
> > to say we just keep metadata about offsets, so we can replay from Kafka (I
> > don't have that much experience with Kafka, but I think we can start
> > consuming from arbitrary offsets).
> >
> >
> >> c) When a window's closing policy is satisfied, the window operator always
> >> emits the current window results
> >>
> >
> > Does this mean we are waiting for the window to be closed before sending
> > new messages downstream? This may have performance implications, but it
> > will make it easy to implement the query processing. I think the current
> > operator layer can support this style without any changes.
> >
> >
> >> d) When a late-arriving message comes, the window operator will re-emit the
> >> past window results to correct the previous window results
> >>
> >>
> > It would be better if we can do incremental updates without replaying the
> > whole window. But I believe there are advantages of this approach.
> >
> >
> >> In your example, the aggregation for the counter for the window from
> >> 10:00-10:59 will have a "wrong" value when the window is closed by the
> >> arrival of a message w/ an 11:00 timestamp, but will be corrected later by
> >> the late arrival of another message in the time window from 10:00-10:59.
> >> I.e. if we keep all the previous window states, late arrival messages will
> >> simply trigger a re-computation of the aggregated counter for the window
> >> 10:00-10:59 and overwrite the previous result. In this model, the final
> >> result is always correct, as long as the late arrivals are within the large
> >> retention size.
> >>
> >> I have been thinking about this model and had a discussion with Julian
> >> yesterday. It seems that the following is more reasonable to me:
> >> 1) The window operator will have a full buffered state of the stream,
> >> similar to a time-varying materialized view over the retention size
> >> 2) Window size and termination (i.e. sliding/tumbling/hopping windows) will
> >> now determine when we emit window results (i.e. new messages/updates to the
> >> current window) to the downstream operator s.t. the operators can calculate
> >> results in time
> >> 3) Late arrivals will be sent to the downstream operator and trigger a
> >> re-computation of the past result based on the full buffered state
> >>
> >> In the above model, the window operator becomes a system feature, or an
> >> implementation of "StreamScan" in Calcite's terms. And we do not need
> >> specific language support for the window semantics, with a default time
> >> window operator implementation that serves as a "StreamScan". All window
> >> definitions in the query language now only dictate the semantic meaning of
> >> aggregation and join on top of the physical window operator, which provides:
> >> a) a varying/growing materialized view; b) a driver that tells the
> >> aggregation/join to compute/re-compute results on top of the materialized
> >> view.
> >>
> >>
> >>
> > I will think more about this model and may have more questions about this
> > in future :).
> >
> > Thanks
> > Milinda
> >
> >
> >> On Wed, Mar 4, 2015 at 10:28 AM, Milinda Pathirage <mpathira@umail.iu.edu>
> >> wrote:
> >>
> >> > Hi Julian,
> >> >
> >> > I went through the draft and it covers most of our requirements. But
> >> > aggregation over a window will not be as simple as mentioned in the draft.
> >> >
> >> > In the stream extension draft we have the following:
> >> >
> >> > > 'How did Calcite know that the 10:00:00 sub-totals were complete at
> >> > > 11:00:00, so that it could emit them? It knows that rowtime is increasing,
> >> > > and it knows that FLOOR(rowtime TO HOUR) is also increasing. So, once it
> >> > > has seen a row at or after 11:00:00, it will never see a row that will
> >> > > contribute to a 10:00:00 total.'
> >> >
> >> >
> >> > When there are delays, we can't do the above, because observing a row with
> >> > rowtime greater than 11:00:00 doesn't mean events from the 10:00:00 to
> >> > 10:59:59 time window will not arrive after this observation. We have
> >> > discussed this in https://issues.apache.org/jira/browse/SAMZA-552. Even if
> >> > we consider the 'system time/stream time' as mentioned in SAMZA-552, it
> >> > doesn't guarantee the absence of delays in a distributed setting. So we may
> >> > need additional hints/extensions to specify the extra information required
> >> > to handle complexities in window calculations.
> >> >
> >> > Maybe there are ways to handle this at the Samza level, not in the query
> >> > language.
> >> >
> >> > @Chris, @Yi
> >> > I got the query planner working with some dummy operators and re-writing
> >> > the query to add default window operators. But Julian's comments about
> >> > handling defaults and optimizing the query plan (moving the Delta down and
> >> > removing both Delta and Chi) got me thinking about whether enforcing CQL
> >> > semantics as we have in our current operator layer limits the flexibility
> >> > and increases the complexity of query plan to operator router generation.
> >> > Anyway, I am going to take a step back and think more about Julian's
> >> > comments. I'll put my thoughts into a design document for the query planner.
> >> >
> >> > Thanks
> >> > Milinda
> >> >
> >> >
> >> > On Tue, Mar 3, 2015 at 3:40 PM, Julian Hyde <julian@hydromatic.net> wrote:
> >> >
> >> > > Sorry to show up late to this party. I've had my head down writing a
> >> > > description of streaming SQL which I hoped would answer questions like
> >> > > this. Here is the latest draft:
> >> > >
> >> > > https://github.com/julianhyde/incubator-calcite/blob/chi/doc/STREAM.md
> >> > >
> >> > > I've been avoiding windows for now. They are not needed for simple queries
> >> > > (project, filter, windowed aggregate) and I wanted to write the
> >> > > specification of more complex queries before I introduce them.
> >> > >
> >> > > Let's look at a simple query, filter. According to CQL, to evaluate
> >> > >
> >> > >   select stream *
> >> > >   from orders
> >> > >   where productId = 10    (query 1)
> >> > >
> >> > > you need to convert orders to a relation over a particular window, apply
> >> > > the filter, then convert back to a stream. We could write
> >> > >
> >> > >   select stream *
> >> > >   from orders over (order by rowtime range between unbounded preceding
> >> > >     and current row)
> >> > >   where productId = 10    (query 2)
> >> > >
> >> > > or we could write
> >> > >
> >> > >   select stream *
> >> > >   from orders over (order by rowtime range between current row and
> >> > >     current row)
> >> > >   where productId = 10      (query 3)
> >> > >
> >> > > Very different windows, but they produce the same result, because of the
> >> > > stateless nature of Filter. So, let's suppose that the default window is
> >> > > the one I gave first, "(order by rowtime range between unbounded preceding
> >> > > and current row)", and so query 1 is just short-hand for query 2.
> >> > >
> >> > > I currently translate query 1 to
> >> > >
> >> > > Delta
> >> > >   Filter($1 = 10)
> >> > >     Scan(orders)
> >> > >
> >> > > but I should really be translating to
> >> > >
> >> > > Delta
> >> > >   Filter($1 = 10)
> >> > >     Chi(order by $0 range between unbounded preceding and current row)
> >> > >       Scan(orders)
> >> > >
> >> > > Delta is the "differentiation" operator and Chi is the "integration"
> >> > > operator. After we apply rules to push the Delta through the Filter, the
> >> > > Delta and Chi will collide and cancel each other out.
> >> > >
> >> > > Why have I not yet introduced the Chi operator? Because I have not yet
> >> > > dealt with a query where it makes any difference.
> >> > >
> >> > > Where it will make a difference is joins. But even for joins, I hold out
> >> > > hope that we can avoid explicit windows, most of the time. One could write
> >> > >
> >> > >   select stream *
> >> > >   from orders over (order by rowtime range between current row and
> >> > >     interval '1' hour following)
> >> > >   join shipments
> >> > >   on orders.orderId = shipments.orderId    (query 4)
> >> > >
> >> > > but I think most people would find the following clearer:
> >> > >
> >> > >   select stream *
> >> > >   from orders
> >> > >   join shipments
> >> > >   on orders.orderId = shipments.orderId          (query 5)
> >> > >   and shipments.rowtime between orders.rowtime and orders.rowtime +
> >> > >     interval '1' hour
> >> > >
> >> > > Under the covers there are still the implicit windows:
> >> > >
> >> > >   select stream *
> >> > >   from orders over (order by rowtime range between unbounded preceding
> >> > >     and current row)
> >> > >   join shipments over (order by rowtime range between unbounded preceding
> >> > >     and current row)
> >> > >   on orders.orderId = shipments.orderId          (query 6)
> >> > >   and shipments.rowtime between orders.rowtime and orders.rowtime +
> >> > >     interval '1' hour
> >> > >
> >> > > Query 6 is equivalent to query 5. But the system can notice the join
> >> > > condition involving the two streams' rowtimes and trim down the windows
> >> > > (one window to an hour, another window to just the current row) without
> >> > > changing semantics:
> >> > >
> >> > >   select stream *
> >> > >   from orders over (order by rowtime range between interval '1' hour
> >> > >     preceding and current row)
> >> > >   join shipments over (order by rowtime range between current row and
> >> > >     current row)
> >> > >   on orders.orderId = shipments.orderId          (query 7)
> >> > >   and shipments.rowtime between orders.rowtime and orders.rowtime +
> >> > >     interval '1' hour
> >> > >
> >> > > So, my hope is that end-users will rarely need to use an explicit window.
> >> > >
> >> > > In the algebra, we will start introducing Chi. It will evaporate for
> >> > > simple queries such as Filter. It will remain for more complex queries
> >> > > such as stream-to-stream join, because you are joining the current row of
> >> > > one stream to a time-varying relation based on the other, and Chi
> >> > > represents that "recent history of a stream" relation.
> >> > >
> >> > > Julian
> >> > >
> >> > >
> >> > > > On Mar 2, 2015, at 11:42 AM, Milinda Pathirage <mpathira@umail.iu.edu>
> >> > > > wrote:
> >> > > >
> >> > > > Hi Yi,
> >> > > >
> >> > > > As I understand it, rules and re-writes basically do the same thing
> >> > > > (changing/re-writing the operator tree). But in the case of rules this
> >> > > > happens during planning, based on the query planner configuration, and
> >> > > > re-writing is done on the planner output, after the query goes through
> >> > > > the planner. In Calcite the re-write happens inside the interpreter, and
> >> > > > in our case it will be inside the query plan to operator router
> >> > > > conversion phase.
> >> > > >
> >> > > > Thanks
> >> > > > Milinda
> >> > > >
> >> > > > On Mon, Mar 2, 2015 at 2:31 PM, Yi Pan <nickpan47@gmail.com> wrote:
> >> > > >
> >> > > >> Hi, Milinda,
> >> > > >>
> >> > > >> +1 on your default window idea. One question: what's the difference
> >> > > >> between a rule and a re-write?
> >> > > >>
> >> > > >> Thanks!
> >> > > >>
> >> > > >> On Mon, Mar 2, 2015 at 7:14 AM, Milinda Pathirage <mpathira@umail.iu.edu>
> >> > > >> wrote:
> >> > > >>
> >> > > >>> @Chris
> >> > > >>> Yes, I was referring to that mail. Actually I was wrong about the ‘Now’
> >> > > >>> window; it should be an ‘Unbounded’ window for most of the default
> >> > > >>> scenarios (Section 6.4 of
> >> > > >>> https://cs.uwaterloo.ca/~david/cs848/stream-cql.pdf), because
> >> > > >>> applying a ‘Now’ window with a size of 1 will double the number of events
> >> > > >>> generated if we consider insert/delete streams, whereas ‘Unbounded’ will
> >> > > >>> only generate insert events.
> >> > > >>>
> >> > > >>>
> >> > > >>> @Yi
> >> > > >>> 1. You are correct about Calcite. There is no stream-to-relation
> >> > > >>> conversion happening. But as I understand it, we don’t need Calcite to
> >> > > >>> support this. We can add it to our query planner as a rule or re-write.
> >> > > >>> What I am not sure about is whether to use a rule or a re-write.
> >> > > >>> 2. There is a rule in Calcite which extracts the Window out of the
> >> > > >>> Project. But I am not sure why that didn’t happen in my test. This rule
> >> > > >>> is added to the planner by default. I’ll ask about this on the Calcite
> >> > > >>> mailing list.
> >> > > >>>
> >> > > >>> I think we can figure out a way to move the window to the input stream
> >> > > >>> if Calcite can move the window out of the Project. I’ll see how we can
> >> > > >>> do this.
> >> > > >>>
> >> > > >>> Also I’ll go ahead and implement default windows. We can change it later
> >> > > >>> if Julian or someone from Calcite comes up with a better suggestion.
> >> > > >>>
> >> > > >>> Thanks
> >> > > >>> Milinda
> >> > > >>>
> >> > > >>> On Sun, Mar 1, 2015 at 8:23 PM, Yi Pan <nickpan47@gmail.com> wrote:
> >> > > >>>
> >> > > >>>> Hi, Milinda,
> >> > > >>>>
> >> > > >>>> Sorry to reply late on this. Here are some of my comments:
> >> > > >>>> 1) In Calcite's model, it seems that there is no stream-to-relation
> >> > > >>>> conversion step. In the first example, where the window specification
> >> > > >>>> is missing, I like your solution of adding the default LogicalNowWindow
> >> > > >>>> operator s.t. the physical operator matches the query plan. However,
> >> > > >>>> if the Calcite community does not agree to add the default
> >> > > >>>> LogicalNowWindow, it would be fine for us if we always insert a default
> >> > > >>>> "now" window on a stream when we generate the Samza configuration.
> >> > > >>>> 2) I am more concerned about the other cases, where the window operator
> >> > > >>>> is used in aggregation and join. In your example of windowed aggregation
> >> > > >>>> in Calcite, the window spec seems to be a decoration on the
> >> > > >>>> LogicalProject operator, instead of defining a data source for the
> >> > > >>>> LogicalProject operator. In the CQL model we followed, the window
> >> > > >>>> operator is considered a query primitive that generates a data source
> >> > > >>>> for other relation operators to consume. How exactly is the window
> >> > > >>>> operator used in the Calcite planner? Isn't it much clearer if the
> >> > > >>>> following is used?
> >> > > >>>> LogicalProject(EXPR$0=[CASE(>(COUNT($2), 0), CAST($SUM0($2)):INTEGER,
> >> > > >>>> null)])
> >> > > >>>>   LogicalWindow(ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING)
> >> > > >>>>
> >> > > >>>> On Thu, Feb 26, 2015 at 12:18 PM, Milinda Pathirage
> >> > > >>>> <mpathira@umail.iu.edu> wrote:
> >> > > >>>>
> >> > > >>>>> Hi devs,
> >> > > >>>>>
> >> > > >>>>> I asked about $subject in calcite-dev. You can find the archived
> >> > > >>>>> discussion at [1]. I think your thoughts are also valuable in this
> >> > > >>>>> discussion on the calcite list.
> >> > > >>>>>
> >> > > >>>>> I discovered the requirement for a default window operator when I
> >> > > >>>>> tried to integrate streamscan (I was using tablescan previously) into
> >> > > >>>>> the physical plan generation logic. Because of the way we have written
> >> > > >>>>> the OperatorRouter API, we always need a stream-to-relation operator at
> >> > > >>>>> the input. But Calcite generates a query plan like the following:
> >> > > >>>>>
> >> > > >>>>> LogicalDelta
> >> > > >>>>>  LogicalProject(id=[$0], product=[$1], quantity=[$2])
> >> > > >>>>>    LogicalFilter(condition=[>($2, 5)])
> >> > > >>>>>      StreamScan(table=[[KAFKA, ORDERS]], fields=[[0, 1, 2]])
> >> > > >>>>>
> >> > > >>>>> If we consider LogicalFilter as a relation operator, we need something
> >> > > >>>>> to convert the input stream to a relation before sending the tuples
> >> > > >>>>> downstream. In addition to this, there is an optimization where we
> >> > > >>>>> consider the filter operator as a tuple operator and place it between
> >> > > >>>>> StreamScan and the stream-to-relation operator as a way of reducing the
> >> > > >>>>> number of messages going downstream.
> >> > > >>>>>
> >> > > >>>>> The other scenario is windowed aggregates. Currently the window spec is
> >> > > >>>>> attached to the LogicalProject in the query plan, like the following:
> >> > > >>>>>
> >> > > >>>>> LogicalProject(EXPR$0=[CASE(>(COUNT($2) OVER (ROWS BETWEEN 2 PRECEDING
> >> > > >>>>> AND 2 FOLLOWING), 0), CAST($SUM0($2) OVER (ROWS BETWEEN 2 PRECEDING AND
> >> > > >>>>> 2 FOLLOWING)):INTEGER, null)])
> >> > > >>>>>
> >> > > >>>>> I wanted to know from them whether it is possible to move the window
> >> > > >>>>> operation to just after the stream scan, so that it is compatible with
> >> > > >>>>> our operator layer.
> >> > > >>>>> Maybe there are better or easier ways to do this, so your comments are
> >> > > >>>>> always welcome.
> >> > > >>>>>
> >> > > >>>>> Thanks
> >> > > >>>>> Milinda
> >> > > >>>>>
> >> > > >>>>>
> >> > > >>>>> [1]
> >> > > >>>>> http://mail-archives.apache.org/mod_mbox/incubator-calcite-dev/201502.mbox/browser
> >> > > >>>>>
> >> > > >>>>> --
> >> > > >>>>> Milinda Pathirage
> >> > > >>>>>
> >> > > >>>>> PhD Student | Research Assistant
> >> > > >>>>> School of Informatics and Computing | Data to
Insight Center
> >> > > >>>>> Indiana University
> >> > > >>>>>
> >> > > >>>>> twitter: milindalakmal
> >> > > >>>>> skype: milinda.pathirage
> >> > > >>>>> blog: http://milinda.pathirage.org

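For reference, the correction model quoted above (steps a-d in the Mar 5 mail)
could be sketched roughly as follows. This is only an illustration under stated
assumptions: the class and method names are hypothetical and not part of Samza
or Calcite, timestamps are plain integers, the aggregate is a simple count, and
the closing policy is "a message at or after the window end has arrived".

```python
from collections import defaultdict


class CorrectingWindowCounter:
    """Hypothetical tumbling-window counter that re-emits corrected results."""

    def __init__(self, window_len, retention):
        self.window_len = window_len
        self.retention = retention      # how far behind max_time we accept
        self.counts = defaultdict(int)  # window start -> current count
        self.closed = set()             # window starts already emitted once
        self.max_time = None            # largest event time seen so far

    def on_message(self, event_time):
        """Return a list of (window_start, count, is_correction) emissions."""
        if self.max_time is not None and event_time < self.max_time - self.retention:
            return []  # beyond the retention size: cannot be corrected
        start = event_time - event_time % self.window_len
        self.counts[start] += 1
        if self.max_time is None or event_time > self.max_time:
            self.max_time = event_time
        out, newly_closed = [], set()
        # Closing policy: a window closes once an event at or past its end is seen.
        for s in sorted(self.counts):
            if s not in self.closed and s + self.window_len <= self.max_time:
                self.closed.add(s)
                newly_closed.add(s)
                out.append((s, self.counts[s], False))
        # Late arrival for an already-emitted window: re-emit as a correction.
        if start in self.closed and start not in newly_closed:
            out.append((start, self.counts[start], True))
        return out


# Times are minutes since midnight: 600 is 10:00, 660 is 11:00.
op = CorrectingWindowCounter(window_len=60, retention=120)
op.on_message(605)
op.on_message(640)
first = op.on_message(660)  # closes the 10:00 window with a count of 2
fixed = op.on_message(650)  # late 10:50 message triggers a correction
```

The sketch keeps only per-window counts rather than the buffered messages, and
it never evicts old state; a real operator would need both to be bounded by the
retention size.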