calcite-dev mailing list archives

From Julian Hyde <>
Subject Re: About Stream SQL
Date Sat, 06 Feb 2016 01:10:15 GMT
Yes, watermarks, absolutely. The "to do" list has "punctuation", which
is the same thing. (Actually, I prefer to call it "rowtime bound"
because it feels more like a dynamic constraint than a piece of
data, but the literature[1] calls them punctuation.)

If a stream has punctuation enabled then it may not be sorted but is
nevertheless sortable. So I'm doing what theoreticians often do --
transposing the problem into a simpler domain.

By the way, an out-of-order stream is also sortable if it is t-sorted
(i.e. every record is guaranteed to arrive within t seconds of its
timestamp) or k-sorted (i.e. every record is guaranteed to be no more
than k positions out of order). So queries on these streams can be
planned similarly to queries on streams with punctuation.
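The k-sorted case, for instance, can be handled with a bounded buffer: a min-heap of at most k+1 elements is enough to restore order. A minimal sketch (illustrative Python, not Calcite code):

```python
import heapq

def sort_k_sorted(stream, k):
    """Restore order on a k-sorted stream: no record is more than
    k positions out of order, so a min-heap of k+1 elements suffices."""
    heap = []
    for record in stream:
        heapq.heappush(heap, record)
        if len(heap) > k:
            yield heapq.heappop(heap)  # smallest element can no longer be displaced
    while heap:                        # drain the buffer at end of stream
        yield heapq.heappop(heap)
```

For the t-sorted case the buffer is bounded by time rather than by count, but the planning consequence is the same: a known, finite latency.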

And, we often want to aggregate over attributes that are not
time-based but are nevertheless monotonic. "The number of times a team
has shifted between winning-state and losing-state" is one such
monotonic attribute. The system needs to figure out for itself that it
is safe to aggregate over such an attribute; punctuation does not add
any extra information.
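The streak attribute can be derived on the fly; what makes aggregation over it safe is that the derived column never decreases (a sketch for illustration only, not Calcite code):

```python
def shift_count(outcomes):
    """Monotonic attribute: the number of times a team has shifted
    between winning-state ('W') and losing-state ('L') so far."""
    count, prev = 0, None
    for state in outcomes:
        if prev is not None and state != prev:
            count += 1
        prev = state
        yield count  # never decreases, so it is safe to group on
```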

I have in mind some metadata (cost metrics) for the planner:

1. Is this stream sorted on a given attribute (or attributes)? (RelMdCollation)
2. Is it possible to sort the stream on a given attribute? (For finite
relations, the answer is always "yes"; for streams it depends on the
existence of punctuation, or linkage between the attributes and the
sort key.)
3. What latency do we need to introduce in order to perform that sort?
4. What is the cost (in CPU, memory etc.) of performing that sort?

We already have (1), in BuiltInMetadata.Collation [2]. For (2), the
answer is always "yes" for finite relations. But we'll need to
implement (2), (3) and (4) for streams.
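A rough sketch of how questions (1)-(3) might fit together as planner metadata (the class and method names here are hypothetical, not part of the Calcite API):

```python
class StreamSortMetadata:
    """Hypothetical sortability metadata for a stream, mirroring
    questions (1)-(3) above."""

    def __init__(self, sorted_on=(), punctuated_on=(), t_bound=None):
        self.sorted_on = set(sorted_on)          # attributes the stream is sorted on
        self.punctuated_on = set(punctuated_on)  # attributes with punctuation / rowtime bounds
        self.t_bound = t_bound                   # max out-of-order delay (t-sorted), if known

    def is_sorted(self, attr):
        """(1) Is the stream sorted on this attribute?"""
        return attr in self.sorted_on

    def can_sort(self, attr):
        """(2) Is it possible to sort the stream on this attribute?"""
        return (self.is_sorted(attr)
                or attr in self.punctuated_on
                or self.t_bound is not None)

    def sort_latency(self, attr):
        """(3) Latency needed to perform that sort, in seconds."""
        if self.is_sorted(attr):
            return 0.0
        if self.t_bound is not None:
            return self.t_bound  # buffer for at most t seconds
        return None              # bounded only by punctuation arrival
```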




On Fri, Feb 5, 2016 at 3:46 PM, Fabian Hueske <> wrote:
> Hi,
> first of all, thanks for starting this discussion. As Stephan said before,
> the Flink community is working towards support for SQL on streams. IMO, it
> would be very nice if the different efforts for SQL on streams could
> converge towards a core set of semantics and syntax.
> I read the proposal on the Calcite website [1] and the mail thread about
> the TUMBLE and HOP functions [2]. IMO, the discussed windowing semantics
> are well defined and could serve as a common basis. The improved syntax for
> tumbling and hopping windows is very nice (would be good to update the
> Calcite website with this). More concise definitions for sliding and
> cascading windows would be great, too.
> One concern that I have is the requirement of a monotonic attribute. Such
> an attribute might be present in use cases where events are generated at a
> central source, such as transactions of a database system. However, events
> that originate from distributed sources such as sensors, log events of
> cluster machines, etc, do not arrive in timestamp order at a stream
> processor. In fact, the majority of use cases from Flink users have to deal
> with out-of-order events. Flink (and Google Cloud Dataflow / Apache Beam
> incubating) use the notion of event time and watermarks [3][4] to handle
> streams with out-of-order events. While watermarks can help a lot to
> improve the consistency of results, it is not possible to guarantee that no
> late events arrive at an operator. There are different strategies to deal
> with late arriving events (dropping, recomputation of aggregates, ...), but
> usually they have a negative effect on the result correctness and/or a
> downstream system needs to deal with them, e.g., updating previous results.
> Relaxing the requirement for monotonic attributes to attributes with
> watermarks (watermarks are provided by the data source) would have no
> impact on the correctness of queries over tables with an ordered attribute
> (since there would be no late arriving events). It would also not change
> the semantics of queries over streams with a monotonic attribute, but would
> also allow for streams with (slightly) out-of-order arriving events, at the
> cost that query results may become inconsistent in case of late arriving
> events.
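The trade-off being described can be made concrete with a toy event-time, tumbling-window count that finalizes a window once the watermark passes its end; anything arriving behind the watermark is reported as late. This is an illustrative sketch only; real systems such as Flink or Beam offer much richer late-event handling:

```python
def windowed_counts(stream, size):
    """Toy event-time tumbling-window count. `stream` yields
    ('event', ts) or ('watermark', ts) tuples; a window [start,
    start + size) is finalized once the watermark reaches its end."""
    counts, results, late = {}, [], []
    watermark = float('-inf')
    for kind, ts in stream:
        if kind == 'watermark':
            watermark = max(watermark, ts)
            for start in sorted(s for s in counts if s + size <= watermark):
                results.append((start, counts.pop(start)))
        elif ts <= watermark:
            late.append(ts)  # behind the watermark: its window may be finalized
        else:
            start = ts - ts % size
            counts[start] = counts.get(start, 0) + 1
    return results, late
```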
> I agree that defining use cases and queries over some sample data would be
> a good way to start.
> Best, Fabian
> [1]
> [2]
> [3]
> [4]
> 2016-02-05 11:29 GMT+01:00 Julian Hyde <>:
>> Stephan,
>> I agree that we have a long way to go to get to a standard, but I
>> think that means that we should start as soon as possible. By which I
>> mean, rather than going ahead and creating Flink-specific extensions,
>> let's have discussions about SQL extensions in a broad forum, pulling
>> in members of other projects. Streaming/CEP is not a new field, so the
>> use cases are well known.
>> It's true that Calcite's streaming SQL doesn't go far beyond standard
>> SQL. I don't want to diverge too far; and besides, one can accomplish
>> a lot with each feature added to the language. What is needed are a
>> few well-chosen, deep features; then we can add liberal syntactic
>> sugar on top of these to make common use cases more concise.
>> For the record, HOP and TUMBLE described in [1] are not OLAP sliding
>> windows; they go in the GROUP BY clause. But unlike GROUP BY, they
>> allow each row to contribute to more than one sub-total. This is novel
>> in SQL (only GROUPING SETS allows this, and in a limited form) and
>> could be the basis for user-defined windows.
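The window assignment that gives rise to this multi-membership can be sketched as follows (illustrative Python, not the Calcite implementation):

```python
def hop_window_starts(ts, size, slide):
    """Start times of every hopping window of width `size`, advancing by
    `slide`, that contains timestamp `ts`. With size > slide a single
    row belongs to several windows -- hence several sub-totals."""
    starts = []
    start = ts // slide * slide  # latest window starting at or before ts
    while start > ts - size:     # windows still covering ts
        starts.append(start)
        start -= slide
    return sorted(starts)
```

A TUMBLE window is the special case size == slide, where each row lands in exactly one window.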
>> Also, our sliding windows can be defined by row count as well as by
>> time. For example, suppose you want to calculate the length of a
>> sports team's streak of consecutive wins or losses. You can partition
>> by N, where N is the number of state changes of the win-loss variable,
>> and so each switch from win-to-lose or lose-to-win starts a new
>> window.
>> As you define your extensions to SQL, I strongly suggest that you make
>> explicit, as columns, any data values that drive behavior. These might
>> include event times, arrival times, processing times, flush
>> directives, and any required orderings. If information is not
>> explicit, we cannot reason about the query as algebra, and the
>> planner cannot consider more radical plans, such as sorting (within a
>> window) or changing how the rows are partitioned across parallel
>> processors. A litmus test is whether a database can apply the same SQL
>> to the data archived from the stream and achieve the same results.
>> I saw that Fabian blogged recently[2] about stream windows in Flink.
>> Could we start the process by trying to convert some use cases
>> (expressed in English, and with sample input and output data) into
>> SQL? Then we can iterate to make the SQL concise, understandable, and
>> well-defined.
>> Julian
>> [1]
>> [2]
>> On Thu, Feb 4, 2016 at 1:35 AM, Stephan Ewen <> wrote:
>> > Hi!
>> >
>> > True, the Flink community is looking into stream SQL, and is currently
>> > building on top of Calcite. This is all going well, but we probably need
>> > some custom syntax around windowing.
>> >
>> > For Stream SQL Windowing, what I have seen so far in Calcite (correct
>> > me if I am wrong there) is pretty much a variant of the OLAP sliding
>> > window aggregates.
>> >
>> >   - Windows in those are basically calculated by rounding timestamps
>> > down/up, thus bucketizing the events. That works for many cases, but
>> > the syntax is quite tricky.
>> >
>> >   - Flink supports various notions of time for windowing (processing
>> > time, ingestion time, event time), as well as triggers. To be able to
>> > extend the window specification with such additional parameters is
>> > pretty crucial and would probably go well with a dedicated window
>> > clause.
>> >
>> >   - Flink also has unaligned windows (sessions, timeouts, ...) which
>> > are very hard to map to grouping and window aggregations across
>> > ordered groups.
>> >
>> >
>> > Converging to a core standard around stream SQL is very desirable, I
>> > completely agree.
>> > For the basic constructs, I think this is quite feasible and Calcite has
>> > some good suggestions there.
>> >
>> > In the advanced constructs, the systems currently differ quite
>> > heavily, so converging there may be harder. Also, we are just learning
>> > what semantics people need concerning windowing/event time/etc. It may
>> > almost be a tad too early to try and define a standard there...
>> >
>> >
>> > Greetings,
>> > Stephan
>> >
>> >
>> > On Thu, Feb 4, 2016 at 9:35 AM, Julian Hyde <> wrote:
>> >
>> >> I totally agree with you. (Sorry for the delayed response; this week has
>> >> been very busy.)
>> >>
>> >> There is a tendency of vendors (and projects) to think that their
>> >> technology is unique, and superior to everyone else’s, and to want to
>> >> showcase it in their dialect of SQL. That is natural, and it’s OK,
>> >> since it makes them strive to make their technology better.
>> >>
>> >> However, they have to remember that the end users don’t want something
>> >> unique; they want something that solves their problem. They would like
>> >> something that is standards compliant so that it is easy to learn, easy
>> >> to hire developers for, and — if the worst comes to the worst — easy to
>> >> migrate to a compatible competing technology.
>> >>
>> >> I know the developers at Storm and Flink (and Samza too) and they
>> >> understand the importance of collaborating on a standard.
>> >>
>> >> I have been trying to play a dual role: supplying the parser and
>> >> planner for streaming SQL, and also facilitating the creation of a
>> >> standard language and semantics of streaming SQL. For the latter, see
>> >> the Streaming page on Calcite’s web site[1]. On that page, I intend to
>> >> illustrate all of the main patterns of streaming queries, give them
>> >> names (e.g. “Tumbling windows”), and show how those translate into
>> >> streaming SQL.
>> >>
>> >> Also, it would be useful to create a reference implementation of
>> >> streaming SQL in Calcite so that you can validate and run queries. The
>> >> performance, scalability and reliability will not be the same as if
>> >> you ran Storm, Flink or Samza, but at least you can see what the
>> >> semantics should be.
>> >>
>> >> I believe that most, if not all, of the examples that the projects
>> >> are coming up with can be translated into SQL. It will be challenging,
>> >> because we want to preserve the semantics of SQL, allow streaming SQL
>> >> to interoperate with traditional relations, and also retain the
>> >> general look and feel of SQL. (For example, I fought quite hard[2]
>> >> recently for the principle that GROUP BY defines a partition (in the
>> >> set-theory sense)[3] and therefore could not be used to represent a
>> >> tumbling window, until I remembered that GROUPING SETS already allows
>> >> each input row to appear in more than one output sub-total.)
>> >>
>> >> What can you, the users, do? Get involved in the discussion about what
>> >> you want in the language. Encourage the projects to bring their
>> >> proposed SQL features into this forum for discussion, and add to the
>> >> list of patterns and examples on the Streaming page. As in any
>> >> standards process, the users help to keep the vendors focused.
>> >>
>> >> I’ll be talking about streaming SQL, planning, and standardization at
>> >> the Samza meetup in 2 weeks[4], so if any of you are in the Bay Area,
>> >> please stop by.
>> >>
>> >> Julian
>> >>
>> >> [1]
>> >>
>> >> [2]
>> >>
>> >>
>> >> [3]
>> >>
>> >> [4]
>> >>
>> >> > On Jan 29, 2016, at 10:29 PM, Wanglan (Lan) <>
>> >> wrote:
>> >> >
>> >> > Hi to all,
>> >> >
>> >> > I am from Huawei and am focusing on data stream processing.
>> >> > Recently I noticed that in both the Storm community and the Flink
>> >> > community there are endeavors to use Calcite as a SQL parser to
>> >> > enable Storm/Flink to support SQL. They both want to supplement or
>> >> > clarify the streaming SQL of Calcite, especially the definition of
>> >> > windows.
>> >> > I am considering that if both communities work on designing stream
>> >> > SQL syntax separately, two different syntaxes would come out which
>> >> > represent the same use case.
>> >> > Therefore, I am wondering if it is possible to unify such work, i.e.
>> >> > design and complement the Calcite streaming SQL to enrich window
>> >> > definition so that both Storm and Flink can reuse Calcite (streaming
>> >> > SQL) as their SQL parser for streaming cases with little change.
>> >> > What do you think about this idea?
>> >> >
>> >>
>> >>
