calcite-dev mailing list archives

From: Fabian Hueske <>
Subject: Re: Handling of system attributes in a row
Date: Thu, 16 Feb 2017 15:25:03 GMT
Hi Julian,

thanks a lot for your thoughts and advice!

I'd like to give a bit more background on why we were thinking about system
attributes and why we did not want to allow changing the time attribute.

In Flink, the event-time attribute is handled as metadata of each record.
Users do not have access to the timestamp other than assigning it once to
a record.
This is done with a so-called TimestampExtractor, which extracts an event
timestamp from a record and assigns it as metadata. The TimestampExtractor
also injects watermarks into the stream which are aligned with the
assigned timestamps.
The assignment of timestamps usually happens in the stream source or
immediately behind the source.
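Roughly, the idea can be sketched in plain Java (a hedged illustration only, not the actual Flink TimestampExtractor API; the class and field names here are made up):

```java
// Sketch of the idea behind a TimestampExtractor: pull an event
// timestamp out of each record once, and emit watermarks that trail the
// highest timestamp seen so far by a fixed out-of-orderness bound.
import java.util.ArrayList;
import java.util.List;

public class TimestampExtractorSketch {
    static class Record {
        final long eventTime;   // millis extracted from the payload
        Record(long eventTime) { this.eventTime = eventTime; }
    }

    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    TimestampExtractorSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Assign the event timestamp (here it is simply a field of the record)
    // and track the maximum timestamp seen so far.
    long extractTimestamp(Record r) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, r.eventTime);
        return r.eventTime;
    }

    // Watermark aligned with the assigned timestamps: a promise that no
    // record with a smaller timestamp is expected after this point.
    long currentWatermark() {
        return maxSeenTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        TimestampExtractorSketch ex = new TimestampExtractorSketch(1000);
        List<Long> watermarks = new ArrayList<>();
        for (long t : new long[] {1000, 3000, 2500}) {
            ex.extractTimestamp(new Record(t));
            watermarks.add(ex.currentWatermark());
        }
        System.out.println(watermarks);  // [0, 2000, 2000]
    }
}
```

Note that the out-of-order record (2500) does not move the watermark backwards: watermarks only advance, which is exactly why they are hard to regenerate after the fact.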

There are three main reasons why we do not allow access to the timestamp:
1. Changing the event timestamp (or using a different attribute as time
attribute) would also require adapting the watermarks. Since it is not easy
to reason about the order of records in a distributed streaming
application, this is super easy to get wrong. The result will be either
lots of late data (watermarks too far ahead) or lots of state to handle
(watermarks too far behind).
2. Some operators "create" new records, such as windowed aggregations. These
new records need timestamps which are aligned with the watermarks of the
stream. Flink's operators take care of this.
3. We consider timestamps and watermarks as properties of a stream (hence
also the common practice to assign them during or close to the stream
source).

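The trade-off in reason 1 can be made concrete with a toy sketch (plain Java, not Flink code; all names and the fixed window size are hypothetical):

```java
// Toy illustration of the two failure modes of a mis-set watermark.
public class WatermarkTradeoff {
    // A record is treated as "late" if its timestamp is already below
    // the current watermark when it arrives.
    static boolean isLate(long recordTs, long watermark) {
        return recordTs < watermark;
    }

    // If the watermark lags far behind the newest event, every window
    // ending after the watermark can still receive data and must keep
    // its state buffered.
    static long openWindows(long newestEventTs, long watermark, long windowSizeMs) {
        if (watermark >= newestEventTs) return 0;
        return (newestEventTs - watermark + windowSizeMs - 1) / windowSizeMs;
    }

    public static void main(String[] args) {
        // Watermark too far ahead: a slightly delayed record is already late.
        System.out.println(isLate(4_900, 5_000));               // true
        // Watermark too far behind: 50 one-second windows stay open (state).
        System.out.println(openWindows(60_000, 10_000, 1_000)); // 50
    }
}
```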
I think I understand your concerns about system(-managed) columns and the
system becoming non-relational.
Please correct me if I'm wrong, but the main point is that if the system
does not give users control over the rowtime, it has to automatically assign
timestamps to records which originate, for example, from a windowed
aggregation or a join.
In this case, the semantics of a query would no longer depend on the data
itself but on how the system assigns timestamps.
The only way to make the query well defined is to give users control over
the time attribute which is used to define windowed aggregates or joins.
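To make the ambiguity concrete, consider a toy sketch (hypothetical Java, not from Flink or Calcite): a join result carries two candidate timestamps, and which one the system silently picks decides which hourly window the record falls into.

```java
// The timestamp of a join result is ambiguous unless the user picks it.
// Depending on whether the system chooses the left input's timestamp,
// the right input's, or the max of both, the same joined record lands
// in different hourly buckets -- so the query result would depend on a
// system policy rather than on the data.
public class JoinTimestampChoice {
    // Equivalent of FLOOR(ts TO HOUR), expressed as an hour index.
    static long hourBucket(long tsMillis) {
        return tsMillis / 3_600_000L;
    }

    public static void main(String[] args) {
        long leftTs  = 3_599_000L;   // just before the hour boundary
        long rightTs = 3_601_000L;   // just after it
        System.out.println(hourBucket(leftTs));                    // 0
        System.out.println(hourBucket(rightTs));                   // 1
        System.out.println(hourBucket(Math.max(leftTs, rightTs))); // 1
    }
}
```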

This is a very valid point that I didn't consider before.
On the other hand, changing a timestamp column also means we have to
generate new watermarks, which is a super tricky business as well.

I have to think a little bit about that.

Thank you,
Fabian

2017-02-15 20:18 GMT+01:00 Julian Hyde <>:

> The “system columns” feature was working when the Calcite code base was
> used in a previous project several years ago, though it may have atrophied
> since.
> In that project, we actually had ROWTIME as a system column. It was
> partially successful, but raised some problems. Consider a streaming join.
> The source relations have ROWTIME columns, but the join would create an
> additional ROWTIME column.
> (1) Should the original ROWTIME columns just disappear? It wasn’t clear.
> (2) What if you wanted to assign your own ROWTIME from an expression? Is
> it sufficient to just write ‘expression AS ROWTIME’ in the SELECT clause?
> This is a more profound operation than simply renaming a column. Being a
> system column implies a certain data type, a not-null constraint, and maybe
> ordering. Would you also have to apply an ORDER BY?
> (3) Having system columns in streams conflicts with the goal of using the
> same SQL for streaming and historic (regular SQL) queries.
> (4) We found ourselves relying on these columns under the covers in
> certain operators. The system had become non-relational.
> So, after that experience I concluded that ROWTIME should be just a
> regular column. Just a column that we know a lot about: it is a timestamp
> (usually; although we allow other data types); it is conventionally called
> ROWTIME but it doesn’t have to be; it is not null; it is usually a sort
> column (but it may not be - the stream might be k-sorted, e.g. within 10
> rows or 10 minutes of being sorted, or sorted within a particular
> warehouseId; or “sortable”).
> The “sortable” concept is really powerful. Consider the Orders stream and
> the Orders table. The stream is infinite, so the only sortable columns are
> those that are already sorted or are within N rows or T seconds of being
> sorted, or have some kind of guarantee in terms of watermarks. The table is
> finite, so everything is sortable. If we want to do an operation such as
> “GROUP BY FLOOR(ROWTIME TO HOUR)” it is sufficient that ROWTIME is
> sortable. You do not need ROWTIME to be a system column.
> My advice is, rather than requiring “blessed” system columns, you have a
> convention for the names of event-time and processing-time columns, and
> make your operators consume and produce on those columns explicitly.
> Julian
> > On Feb 15, 2017, at 12:56 AM, Timo Walther <> wrote:
> >
> > Hi everyone,
> >
> > we (from Flink) are currently discussing how we can express
> time-semantics (event-time or processing-time) in a SQL query. The optimal
> solution would be to have two system attributes that are part of every
> table schema/every row data type. We could then access it like `SELECT *
> FROM MyTable ORDER BY rowtime`. However, it should not be part of the
> result in an expansion (`*`) and the user should not modify those
> attributes (no aliasing, read-only). I had a look into SqlValidator and
> there are several lines that contain things like `includeSystemVars` or
> `isSystemField` but nothing concrete. Am I right that this feature is not
> entirely implemented yet? Which parts would you touch/override to implement
> this feature?
> >
> > Thanks in advance.
> >
> > Regards,
> > Timo
> >
> >
> >
