samza-dev mailing list archives

From Jay Kreps <jay.kr...@gmail.com>
Subject Re: Streaming SQL - object models, ASTs and algebras
Date Sun, 01 Feb 2015 14:02:12 GMT
Great summary, Chris.

I agree that we should really strive not to add another mandatory
operational dependency for out-of-the-box usage.

To clarify what I was saying: I am less concerned about the partition
count, which is readily discoverable and basically just part of the "ddl"
for the topic regardless of how you create it. What actually concerns me is
the partition key. I.e.
  create stream partitioned_page_views as select * from page_views
produces different output depending on the partition key if you consider
the ordering of the stream relevant (which I do).
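As a toy illustration (plain Python, not any Samza or Kafka API; the
messages and the two-partition setup are invented), the same input stream
partitioned by two different keys yields different per-partition orderings:

```python
# Toy sketch: partitioning the same stream by different keys produces
# different per-partition orderings, so the "same" CREATE STREAM statement
# gives different output if stream order is considered part of the result.
from collections import defaultdict

page_views = [
    {"member_id": 1, "page": "home"},
    {"member_id": 2, "page": "home"},
    {"member_id": 1, "page": "about"},
]

def partition(stream, key, num_partitions=2):
    parts = defaultdict(list)
    for msg in stream:
        # Same key value -> same partition; order within a partition is
        # the arrival order of messages routed to it.
        parts[hash(msg[key]) % num_partitions].append(msg)
    return dict(parts)

by_member = partition(page_views, "member_id")  # member 1's views stay together, in order
by_page = partition(page_views, "page")         # member 1's views may land in different partitions
```

Partitioning by member_id guarantees each member's views appear in one
partition in their original order; partitioning by page does not.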

So I think the discussion item there is really about time- vs tuple-driven
production, which amounts to CQL vs StreamBase semantics. I don't think I
understand all the tradeoffs, but some of the corner cases of the
time-based production were quite odd (see the paper I linked from
SAMZA-390) when you have duplicate timestamps (and at a million messages
per second, millisecond resolution on time means roughly a thousand
messages share each timestamp).

Personally I really think supporting multiple schemas that disagree on the
fields is going to take you outside the realm of SQL and be quite difficult
to use. I think databases should publish each table to its own topic and
include a txn/scn type thing to allow downstream users who want to merge
into a total order to do so.
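A sketch of what that downstream merge might look like, assuming each
per-table topic carries a monotonically increasing scn (the topic names,
payloads, and in-memory lists here are invented stand-ins for real topics):

```python
# Hypothetical per-table change-capture topics; each message carries the
# database's scn (system change number) assigned at commit time, so each
# topic is already in scn order.
import heapq

users_topic = [
    {"scn": 1, "table": "users", "op": "insert"},
    {"scn": 4, "table": "users", "op": "update"},
]
orders_topic = [
    {"scn": 2, "table": "orders", "op": "insert"},
    {"scn": 3, "table": "orders", "op": "delete"},
]

# A downstream consumer that wants the database's original commit order
# merges the already-sorted topics on scn.
merged = list(heapq.merge(users_topic, orders_topic, key=lambda m: m["scn"]))
# scn order: 1 (users), 2 (orders), 3 (orders), 4 (users)
```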

-Jay

On Sat, Jan 31, 2015 at 12:50 PM, Chris Riccomini <criccomini@apache.org>
wrote:

> Hey all,
>
> Trying to respond in order.
>
> > If one wants finer-grained control over what the output topic would be
> like, wouldn't it make sense to use a CREATE TABLE AS <query> statement?
>
> Yes. CREATE TABLE AS could be used to define all three things that I
> mentioned (partition key, partition count, and schema). I think the
> discussion is about whether we *should* use it, and if so, for which ones.
> See Jay's comments about wanting to explicitly define partitions in the
> language. I agree with you, though. On the producer-side, between SELECT
> ... clauses and CREATE TABLE AS, we have enough to define outgoing
> schemas.
>
> The point of my comments was just to explore what we could do with 1)
> neither a schema registry 2) nor any SQL-level stream-specific clauses.
> Once we've established that spectrum, I think we can argue where along it
> we'd like to be.
>
> > How would you know that even if you did look at the records? I'm pretty
> sure that's impossible with avro.... Isn't it mandatory to have some sort
> of metadata repository for Samza SQL to work?
>
> It really depends on the serde. JSON, BSON, and MessagePack can all be
> decoded (and the schema inferred) just by reading the message. The three
> you mention (Avro, Thrift, Protobuf) might require a schema registry,
> though all three could be encoded with per-message schemas as well. I'm
> not saying that this is a good idea, but there are certainly a lot of
> ways to make SQL work (for some definition of work) without a schema
> registry.
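A minimal sketch of that per-message inference idea, using plain Python and
an invented JSON payload (real serdes and field names will differ):

```python
# For a self-describing serde like JSON, the "schema" can be recovered on
# the consumer-side just by decoding one message and inspecting it.
import json

raw = b'{"member_id": 42, "page": "home", "ts": 1422792132000}'

def infer_schema(message_bytes):
    record = json.loads(message_bytes)
    # Map each field name to the Python type name of its decoded value.
    return {field: type(value).__name__ for field, value in record.items()}

schema = infer_schema(raw)
```

With Avro/Thrift/Protobuf the bytes alone don't carry field names, which is
why those serdes push you toward a registry or per-message schemas.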
>
> > Given the solutions that already exist though, would it really be that
> much effort to leverage one of them?
>
> I might be overly pessimistic, but adding a new piece of software to the
> stack almost always carries a non-trivial amount of overhead: operational,
> mental-model, code, etc. Samza is already heavyweight as it is (ZK, YARN,
> Kafka). I agree with your statement that half the power of SQL comes from
> the DDL. What I'm trying to do is figure out what the MVP is. It would
> totally be useful to have a schema registry that defines partition keys,
> partition counts, schemas, etc. What I was suggesting is that we might be
> able to forgo it initially, and try to implement the SQL in a
> future-proof way, such that adding it later isn't a big problem.
>
> Basically, I want to limit scope, so we could have something to show for
> ourselves fast, rather than trying to do everything up front.
>
> > I think the schema repository acts as the stand in for the database
> catalog or hcat. People who don't have that will have to give their schema
> with the query. I think that can be some kind of plugin to provide the
> schema so it is automatically inferred for the avro people and manually
> provided for others.
>
> I agree. If you have a schema repo, we can use it. If you don't, you can
> either 1) define the schema in your query, or 2) fail at runtime.
> That seems OK to me. The json model that Julian and Milinda mention also
> seems like a viable solution for those without a schema registry, though
> it's a bit of a pain. Basically, to get the best experience, you need a
> schema repo.
>
> But this still leaves us with the partition key/partition count discussion.
> It sounds like there's some consensus on the partition key being defined in
> SQL (CREATE TABLE foo PARTITION BY member_id AS SELECT ... or something).
>
> Is it fair to say that the last remaining point of my original comment is
> on partition count being defined in the SQL or not? Personally, I tend to
> agree with Jay's comments, but I'm not picky about whether the partition
> count is defined directly in the query, or simply as a hint:
>
>   CREATE TABLE foo PARTITION BY member_id WITH 200 PARTITIONS AS SELECT ...
>
> Or:
>
>   -- partition_count=200
>   CREATE TABLE foo PARTITION BY member_id AS SELECT ...
>
> I'm mostly just concerned that we have no way to define this now, so it
> seems mandatory that we have *some* way of defining it (unless we expect
> users to manually run a create-topic command before executing every query).
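As a sketch of why the producer needs both pieces up front: routing a
message requires the partition key *and* the partition count together. The
md5-based hash, field names, and count of 200 below are illustrative, not
Samza's or Kafka's actual partitioner:

```python
# Routing a message to an output partition needs two inputs: which field
# to key on, and how many partitions exist. Neither is discoverable from
# the query alone for a not-yet-created output stream.
import hashlib

def route(message, partition_key, partition_count):
    # A stable hash (unlike Python's randomized str hash) so the same key
    # always lands on the same partition across processes.
    digest = hashlib.md5(str(message[partition_key]).encode()).hexdigest()
    return int(digest, 16) % partition_count

msg = {"member_id": 123, "page": "home"}
partition = route(msg, "member_id", 200)
```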
>
> I guess I have two remaining questions, then:
>
> 1. What should we do about the partition count definition? (Jay's
> discussion)
> 2. We had been building the operator layer (Yi's stuff) to support
> heterogeneous schema types in a single stream. The schema discussion we've
> been having seems to suggest that we only want to support one schema per
> stream. Since Yi's stuff is more general, it could be made to work with
> just one schema per stream, but I just wanted to call it out as a
> conscious decision. This seems as though it would prevent us from
> selecting over streams that want a strong time-order among messages of
> different types (e.g. a change log from a DB shard that had more than one
> table in it).
>
> Cheers,
> Chris
>
> On Fri, Jan 30, 2015 at 3:50 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
>
> > Chris,
> >
> > I think the schema repository acts as the stand in for the database
> > catalog or hcat. People who don't have that will have to give their
> > schema with the query. I think that can be some kind of plugin to
> > provide the schema so it is automatically inferred for the avro people
> > and manually provided for others.
> >
> > -Jay
> >
> > On Fri, Jan 30, 2015 at 12:06 PM, Chris Riccomini <criccomini@apache.org>
> > wrote:
> >
> > > Hey all,
> > >
> > > I have a few more comments on the metadata issue that I brought up. The
> > > three things that we lack right now are:
> > >
> > > 1. Partition count.
> > > 2. Partition key.
> > > 3. Stream (or message) schema.
> > >
> > > These have to be evaluated both on the ingest (first consumers) and
> > > egress (final producers) of a query.
> > >
> > > On the consumer-side, (1) can be determined simply by asking Kafka.
> > > On the producer-side, we have no way to determine how many partitions
> > > an output stream should have right now, short of having the user
> > > specify it in some way.
> > >
> > > On the producer-side, (2) must be defined in SQL, or via a hint, for
> > > final output to a stream. Intermediate streams can be keyed based off
> > > of the consumers within the session. On the consumer-side, the
> > > consumer can always re-partition an input stream based on the
> > > partition key that it needs, so it doesn't need to know a priori what
> > > the key was.
> > >
> > > Message schemas (3) can be evaluated on the consumer-side at runtime
> > > by poking at the incoming messages, which should be self-describing.
> > > Output message schemas can be derived based off of the transformations
> > > of incoming messages on the producer-side.
> > >
> > > If we're OK with these strategies, then it seems we only really need to
> > > worry about:
> > >
> > > 1. Producer-side partition count.
> > > 2. Producer-side partition key.
> > >
> > > One other annoying property of (3) is that compile-time validation
> > > of a query won't be able to check that a field for a given stream
> > > actually exists--it'll fail at runtime. This could perhaps be caught
> > > earlier by sampling some messages before deploying the job, to check
> > > that the messages have the appropriate fields, but it's still a
> > > runtime check.
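A rough sketch of that deploy-time sampling check, in plain Python with
invented sample messages and field names (a real job would pull the sample
from the input topic and derive the required fields from the query plan):

```python
# Sample a few messages before deploying and verify that every field the
# query references is actually present. This only raises the failure
# earlier; it is a heuristic, not a compile-time guarantee.
import json

sample_messages = [
    b'{"member_id": 1, "page": "home"}',
    b'{"member_id": 2, "page": "about"}',
]
required_fields = {"member_id", "page"}  # fields the query selects on

def validate_sample(messages, fields):
    for raw in messages:
        missing = fields - json.loads(raw).keys()
        if missing:
            raise ValueError("missing fields: %s" % sorted(missing))
    return True

ok = validate_sample(sample_messages, required_fields)
```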
> > >
> > > Cheers,
> > > Chris
> > >
> > > On Fri, Jan 30, 2015 at 10:43 AM, Chris Riccomini <criccomini@apache.org>
> > > wrote:
> > >
> > > > Hey all,
> > > >
> > > > Just catching up on this thread. The Calcite + Samza approach
> > > > seems pretty compelling to me. I think most of what Julian is
> > > > arguing for makes sense. My main concern is with practicalities.
> > > >
> > > > One specific case of this is the discussion about the partitioning
> > > > model. In an ideal world, I agree, developers wouldn't need to
> > > > define partitions in SQL. Practically speaking, though, Samza
> > > > (Kafka, really) currently doesn't have a metadata repository.
> > > > Without a metadata repository, we have no way of knowing 1) which
> > > > key a topic is partitioned by, and 2) what the schema of the topic
> > > > is (without looking at records). We know this *within* a query (or
> > > > session), but not *between* queries (or sessions) from disjoint
> > > > users.
> > > >
> > > > One could argue that we should spend time defining such a metadata
> > > > repository, which is a reasonable argument. But that's also a
> > > > fairly large effort. I wonder if we might be able to cut some
> > > > corners in a clean-ish, forwards-compatible way, so that developers
> > > > don't have to wait for us to fully implement (or integrate with)
> > > > something like a Hive metadata store.
> > > >
> > > > In person, one thing you mentioned, Julian, was using hints,
> > > > rather than stuff baked into the syntax. If that's stomach-able, we
> > > > could support partitioning through hints, until we have a full
> > > > blown metadata store.
> > > >
> > > > Thoughts?
> > > >
> > > > Cheers,
> > > > Chris
> > > >
> > > > On Thu, Jan 29, 2015 at 5:10 PM, Julian Hyde <julian@hydromatic.net>
> > > > wrote:
> > > >
> > > >>
> > > >> > On Jan 29, 2015, at 4:38 PM, Yi Pan <nickpan47@gmail.com> wrote:
> > > >> >
> > > >> > I am wondering if I can get an average that's per 30 min window
> > > >> > averages? I.e. the following is the input events in a stream:
> > > >> >  {10:01, ORCL, 10, 10}
> > > >> >  {10:02, MSFT, 30, 30}
> > > >> >  {10:03, ORCL, 100, 110}
> > > >> >  {10:17, MSFT, 45, 75}
> > > >> >  {10:59, ORCL, 20, 130}
> > > >> >  {11:02, ORCL, 50, 50}
> > > >> > Can I get the non-overlapping window average from 10:00-10:29, and
> > > >> > 10:30-10:59, ... ? Could you give an example how to define that
> > > >> > window operation in your model? Note that in this use case,
> > > >> > although I may have 1 trading record per minute from the stream,
> > > >> > I only generate 2 average records from 10:00-11:00.
> > > >>
> > > >> That takes a bit of messy date arithmetic, made even more messy
> > > >> by the fact that you can't regard a SQL timestamp as a
> > > >> "milliseconds" value as one would in C or Java, nor can you divide
> > > >> it. Hence the trick of subtracting a constant timestamp, which
> > > >> yields an interval value, and then doing integer division on that
> > > >> interval to find the number of 30-minute periods since the epoch.
> > > >>
> > > >> select stream rowtime, ticker, amount,
> > > >>   sum(amount) over (
> > > >>     partition by ticker,
> > > >>       (rowtime - TIMESTAMP '1970-01-01 00:00:00') MINUTE / 30
> > > >>     order by rowtime)
> > > >> from StockTrades;
> > > >>
> > > >> If I were doing this kind of calculation often I'd define a UDF, or
> > even
> > > >> that user-defined window SPI I mentioned earlier.
> > > >>
> > > >> > {quote}
> > > >> > CREATE TABLE Emp(empno INTEGER, name VARCHAR(20), department
> > > >> > VARCHAR(20))
> > > >> >  PARTITION BY HASHCODE (department);
> > > >> > {quote}
> > > >> > That's good! At least it resolves my question on: "which field
> > > >> > is the partition key". However, I still have the question on the
> > > >> > number of partitions. As I stated, since the system does not
> > > >> > currently have an "auto-scaling" feature, the number of
> > > >> > partitions for a stream has to be explicitly specified. Where do
> > > >> > you suggest putting this information w/o breaking SQL syntax?
> > > >>
> > > >> I imagine you could start the system on Tuesday with 10
> > > >> partitions per stream and restart it on Wednesday with 8 or 12?
> > > >> You wouldn't want to change the SQL, because that's in the
> > > >> application. But you could change the definition of the stream,
> > > >> either in the DDL or in some other system configuration. Then the
> > > >> partitioning function (applied by the system to route the record)
> > > >> could, say, take the value of p modulo the current number of
> > > >> partitions.
> > > >>
> > > >> Julian
> > > >>
> > > >>
> > > >
> > >
> >
>
