samza-dev mailing list archives

From "Milinda Pathirage" <mili...@apache.org>
Subject Re: Review Request 34500: SAMZA-552 Operator API change: builder and simplified operator classes
Date Wed, 05 Aug 2015 17:34:30 GMT

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review94271
-----------------------------------------------------------


I went through the old discussions and also through Calcite's RelBuilder (https://github.com/milinda/incubator-calcite/blob/master/core/src/main/java/org/apache/calcite/tools/RelBuilder.java)
to look at our TopologyBuilder from a SQL query plan perspective. Below are my thoughts.

* I agree with Guozhang that we should first focus on simple use cases, and I think we should
not try to support building complex DAGs that contain multiple complex queries through this
builder API.
* IMHO, TopologyBuilder is closer to query execution than to the query itself. If we need people
to compose SQL queries through a Java API, it's better to have an API similar to jOOQ (http://www.jooq.org)
for streaming SQL.
* AFAIK, **split**, mentioned in one of Yi's comments, doesn't occur in SQL query plans because
SQL operators always have exactly one output (@Yi please correct me if I am wrong).
* IMHO, supporting something similar to views through the builder API may be useful. To
facilitate this, we could allow the result of a builder (maybe not through the *build* method,
but through a method like *buildView*) to be referenced as an input to other queries.
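To make the view idea concrete, here is a minimal sketch, assuming that *buildView* registers the finished sub-query under a name and that a later *scan* of that name resolves to the registered sub-query instead of a physical stream. The class `ViewBuilderSketch`, the `filter`/`join` methods, and the stream names are hypothetical illustrations, not the Samza API; plans are kept as plain strings for brevity.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: reusing a builder result as a named view. Not the Samza API. */
public class ViewBuilderSketch {

    // Registered views: view name -> plan text of the sub-query it stands for.
    private final Map<String, String> views = new HashMap<>();
    // Sub-trees under construction, represented as plan text for brevity.
    private final Deque<String> stack = new ArrayDeque<>();

    /** Pushes a registered view's plan if the name is a view, else a plain stream scan. */
    public ViewBuilderSketch scan(String name) {
        String view = views.get(name);
        stack.push(view != null ? view : "scan(" + name + ")");
        return this;
    }

    /** Unary operator: wraps the sub-tree on top of the stack. */
    public ViewBuilderSketch filter(String predicate) {
        stack.push("filter[" + predicate + "](" + stack.pop() + ")");
        return this;
    }

    /** Binary operator: pops the last two sub-trees and pushes their join. */
    public ViewBuilderSketch join(String condition) {
        String right = stack.pop();
        String left = stack.pop();
        stack.push("join[" + condition + "](" + left + ", " + right + ")");
        return this;
    }

    /** Like build(), but registers the result under a name instead of returning it. */
    public void buildView(String name) {
        views.put(name, finish());
    }

    /** Returns the finished plan. */
    public String build() {
        return finish();
    }

    // Exactly one sub-tree must remain when a query is finished.
    private String finish() {
        if (stack.size() != 1) {
            throw new IllegalStateException("expected exactly one sub-tree, found " + stack.size());
        }
        return stack.pop();
    }

    public static void main(String[] args) {
        ViewBuilderSketch b = new ViewBuilderSketch();
        b.scan("orders").filter("amount > 100").buildView("bigOrders");
        String plan = b.scan("bigOrders").scan("customers").join("id").build();
        System.out.println(plan);
    }
}
```

Running `main` prints `join[id](filter[amount > 100](scan(orders)), scan(customers))`. This sketch simply inlines the view's plan wherever it is scanned; a real implementation might instead share one physical sub-plan between the queries that reference the view.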

So I'm proposing a builder similar to the following, based on Calcite's RelBuilder API:

```java
TopologyBuilder builder = TopologyBuilder.create(..);

OperatorRouter router = builder.scan("stream1")        // first input
                          .window(10, 2)
                          .aggregate(builder.groupKey(...), builder.aggregateCall(...), ...)
                          .scan("stream2")             // starts a second input sub-query
                          .window(10, 2)
                          .aggregate(builder.groupKey(...), builder.aggregateCall(...), ...)
                          .join(JoinType.INNER, builder.condition(...))  // joins the last two sub-trees
                          .scan("stream2")             // starts a third input sub-query
                          .project(..)
                          .window(10, 2)
                          .join(joinType, condition)   // joins the previous join result with this sub-query
                          .partition(partitionKey, number)
                          .modify(Operation.INSERT, ..);  // inserts the result into a stream or table
```

* In the API above, *beginStream* is renamed to *scan* to bring the API closer to the physical
plan.
* A *scan* in the middle of the chain marks the start of a new input or input sub-query
* *join* takes last two sub-trees (sub-queries) as inputs
* *modify* is used to insert/update tuples to streams or tables
* The builder will provide utility methods to create conditions, function calls, aggregates and
`GROUP BY` clauses.
* The above assumes that there are no multi-output operators.
* Reusable sub-queries are not present in the above example. I'll think about it and introduce
a mechanism to reuse sub-queries (possibly by introducing the view concept).
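The stack semantics described in the bullets above (each *scan* starts a new sub-tree, unary operators wrap the latest sub-tree, *join* consumes the last two) can be sketched as follows. This is a minimal illustration modeled on the RelBuilder idea, assuming plans are plain labeled trees; `StackTopologyBuilder`, `Node`, and the string labels are hypothetical names, not the Samza or Calcite API, and only `window`, `partition` and `join` are shown.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of a stack-based topology builder; not the Samza API. */
public class StackTopologyBuilder {

    /** A logical plan node: an operator label plus its input sub-trees. */
    public static final class Node {
        final String op;
        final List<Node> inputs;

        Node(String op, List<Node> inputs) {
            this.op = op;
            this.inputs = inputs;
        }

        @Override
        public String toString() {
            if (inputs.isEmpty()) {
                return op;
            }
            StringBuilder sb = new StringBuilder(op).append('(');
            for (int i = 0; i < inputs.size(); i++) {
                if (i > 0) sb.append(", ");
                sb.append(inputs.get(i));
            }
            return sb.append(')').toString();
        }
    }

    // Sub-trees built so far; the top of the stack is the most recent one.
    private final Deque<Node> stack = new ArrayDeque<>();

    /** Starts a new input: pushes a leaf node. */
    public StackTopologyBuilder scan(String stream) {
        stack.push(new Node("scan:" + stream, Collections.<Node>emptyList()));
        return this;
    }

    public StackTopologyBuilder window(int size, int step) {
        return unary("window[" + size + "," + step + "]");
    }

    public StackTopologyBuilder partition(String key, int number) {
        return unary("partition[" + key + "," + number + "]");
    }

    /** Pops the last two sub-trees (right first) and pushes their join. */
    public StackTopologyBuilder join(String joinType) {
        if (stack.size() < 2) {
            throw new IllegalStateException("join needs two sub-trees on the stack");
        }
        Node right = stack.pop();
        Node left = stack.pop();
        stack.push(new Node("join:" + joinType, Arrays.asList(left, right)));
        return this;
    }

    /** Returns the finished plan; exactly one sub-tree must remain. */
    public Node build() {
        if (stack.size() != 1) {
            throw new IllegalStateException("expected 1 sub-tree, found " + stack.size());
        }
        return stack.pop();
    }

    // Unary operators replace the top sub-tree with a node wrapping it.
    private StackTopologyBuilder unary(String op) {
        if (stack.isEmpty()) {
            throw new IllegalStateException(op + " needs an input on the stack");
        }
        stack.push(new Node(op, Collections.singletonList(stack.pop())));
        return this;
    }

    public static void main(String[] args) {
        Node plan = new StackTopologyBuilder()
                .scan("stream1").window(10, 2)
                .scan("stream2").window(10, 2)
                .join("INNER")
                .partition("key", 4)
                .build();
        System.out.println(plan);
    }
}
```

Running `main` prints `partition[key,4](join:INNER(window[10,2](scan:stream1), window[10,2](scan:stream2)))`. Because every operator here has exactly one output, a single stack is enough to track the open sub-trees; supporting multi-output operators such as **split** would need a different mechanism.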

Please feel free to comment on this.

- Milinda Pathirage


On May 20, 2015, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34500/
> -----------------------------------------------------------
> 
> (Updated May 20, 2015, 11:13 p.m.)
> 
> 
> Review request for samza, Yan Fang, Chris Riccomini, Guozhang Wang, Milinda Pathirage, Navina Ramesh, and Naveen Somasundaram.
> 
> 
> Bugs: SAMZA-552
>     https://issues.apache.org/jira/browse/SAMZA-552
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-552: added operator builder API
> - The current operator builder only supports single-output DAG topologies so far
> 
> 
> Diffs
> -----
> 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java PRE-CREATION
> 
> Diff: https://reviews.apache.org/r/34500/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build passed
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>

