From Xinyu Liu <xinyuliu...@gmail.com>
Subject Re: Review Request 47835: SAMZA-914: Initial draft for Java programming APIs on operators supporting DAGs
Date Tue, 01 Nov 2016 17:48:20 GMT

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47835/#review154420
-----------------------------------------------------------


Ship it!




The RB looks great! I posted a couple of comments for further discussion, not to block
progress.


samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java (line 45)
<https://reviews.apache.org/r/47835/#comment223958>

    This may have been discussed before: why do we use SystemStreamPartition instead of SystemStream
as the source? I think a stream-level abstraction would be much more convenient
for users of this API. For example, instead of
    
    sources.forEach(source -> source.map(...).filter(...)...); // per-SSP handling
    
    the API could be:
    SystemMessageStream source = sources.get("system x", "stream y");
    source.map(...).filter(...)...
    
    This might not be a big deal if we only have one input stream, but things could get messy if
we have more. For example, if we want to join, it would be much more convenient to write:
    
    SystemMessageStream source1 = sources.get("system x", "stream 1");
    SystemMessageStream source2 = sources.get("system x", "stream 2");
    source1.join(source2, joinFunc);
    
    Otherwise the user needs to join each partition and repeat the same logic for each partition.
I can't think of a case where an SSP-level API would be useful, since most current stream
processors operate on streams, not partitions.
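
    To make the suggestion concrete, here is a minimal, self-contained sketch of a stream-level
handle that hides the per-partition loop. Every name in it (PartitionStream, StreamHandle, join)
is hypothetical and is not part of the proposed Samza API. The "join" is a toy pairing of
messages within the same partition index, assuming both streams are co-partitioned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class StreamLevelJoinSketch {

    /** Hypothetical stand-in for one partition of a stream (an "SSP"). */
    static final class PartitionStream {
        final String stream;
        final int partition;
        final List<String> messages;

        PartitionStream(String stream, int partition, String... messages) {
            this.stream = stream;
            this.partition = partition;
            this.messages = Arrays.asList(messages);
        }
    }

    /** Hypothetical stream-level handle: the user sees one stream, not N partitions. */
    static final class StreamHandle {
        final List<PartitionStream> partitions;

        StreamHandle(PartitionStream... partitions) {
            this.partitions = Arrays.asList(partitions);
        }

        /**
         * Toy join: pairs every message in partition i of this stream with every
         * message in partition i of the other stream. Assumes co-partitioned inputs.
         */
        List<String> join(StreamHandle other, BiFunction<String, String, String> joinFn) {
            if (partitions.size() != other.partitions.size()) {
                throw new IllegalArgumentException("streams must have the same partition count");
            }
            List<String> out = new ArrayList<>();
            // The per-partition loop lives here once, instead of in every user program.
            for (int i = 0; i < partitions.size(); i++) {
                for (String left : partitions.get(i).messages) {
                    for (String right : other.partitions.get(i).messages) {
                        out.add(joinFn.apply(left, right));
                    }
                }
            }
            return out;
        }
    }

    /** Builds two two-partition streams and joins them at the stream level. */
    static List<String> demo() {
        StreamHandle source1 = new StreamHandle(
                new PartitionStream("stream 1", 0, "a"),
                new PartitionStream("stream 1", 1, "b"));
        StreamHandle source2 = new StreamHandle(
                new PartitionStream("stream 2", 0, "x"),
                new PartitionStream("stream 2", 1, "y"));
        return source1.join(source2, (left, right) -> left + right);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [ax, by]
    }
}
```

    With an SSP-level API the loop inside join would have to be written by each user for each
pair of partitions; the stream-level handle keeps that in one place.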



samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java (line 33)
<https://reviews.apache.org/r/47835/#comment223973>

    The definition of early and late triggers seems to be a little different from what Google
Dataflow has today. In Tyler's blog (Streaming 102), early and late are relevant only if there is
a watermark, which indicates the completeness of the input stream with respect to event time. Early
triggers fire before the watermark to address the watermark being too slow, and late
triggers fire after the watermark to address the watermark being too fast (late arrivals).
For us, since there is no notion of a watermark so far, the semantics of early/late
triggers do not seem well defined. I cannot tell when the window is closed (at the early or the
late trigger?), or whether a late trigger can fire before an early trigger. The early and late
meanings seem loosely defined without a reference point. Happy to talk about this more.
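
    As a reference point for that discussion, the watermark-relative meaning of early and late
can be sketched as below. This is a toy illustration only: the names (Firing, classify) are
hypothetical and belong to neither Samza nor Dataflow, and the rule that the on-time firing
happens once when the watermark reaches the window end is a simplifying assumption.

```java
public class WatermarkTriggerSketch {

    /** When a window's output fires, relative to the watermark. */
    enum Firing { EARLY, ON_TIME, LATE }

    /**
     * Classifies a firing for a window ending at windowEndMs (event time).
     *
     * @param windowEndMs        end of the window, in event-time milliseconds
     * @param watermarkMs        current watermark, in event-time milliseconds
     * @param onTimeAlreadyFired whether the on-time firing has already happened
     */
    static Firing classify(long windowEndMs, long watermarkMs, boolean onTimeAlreadyFired) {
        if (watermarkMs < windowEndMs) {
            // Watermark has not reached the window end: speculative result.
            return Firing.EARLY;
        }
        // Watermark has passed the window end: first firing is on time,
        // anything after that is caused by late-arriving data.
        return onTimeAlreadyFired ? Firing.LATE : Firing.ON_TIME;
    }

    public static void main(String[] args) {
        System.out.println(classify(100L, 50L, false));  // EARLY
        System.out.println(classify(100L, 100L, false)); // ON_TIME
        System.out.println(classify(100L, 150L, true));  // LATE
    }
}
```

    Under this definition a late firing can never precede an early one for the same window,
which is the ordering guarantee that is hard to state without a watermark.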


- Xinyu Liu


On Oct. 4, 2016, 12:45 a.m., Yi Pan (Data Infrastructure) wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47835/
> -----------------------------------------------------------
> 
> (Updated Oct. 4, 2016, 12:45 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
> 
> 
> Bugs: SAMZA-914
>     https://issues.apache.org/jira/browse/SAMZA-914
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-914: initial draft of operator programming API. Design doc attached to SAMZA-914:
> https://issues.apache.org/jira/secure/attachment/12821524/SAMZA-914_%20operator%20Java%20programming%20API%20-%20Google%20Docs.pdf
> 
> 
> Diffs
> -----
> 
>   build.gradle 16facbbf4dff378c561461786ff186bd9e0000ed
>   gradle/dependency-versions.gradle 52e25aa53a1edc85d478b48898621b26508ad4bb
>   samza-api/src/test/java/org/apache/samza/config/TestConfig.java 5d066c5867e9df9e94e60bde825dedf10703b399
>   samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowFn.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/internal/WindowOutput.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/TestTriggerBuilder.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/TestWindows.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/data/TestIncomingSystemMessage.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/data/TestLongOffset.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestTrigger.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorAdaptorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java PRE-CREATION
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java PRE-CREATION
>   samza-sql-core/README.md PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java PRE-CREATION
>   settings.gradle 4c1aa107a11d413777e69bc4e48847b811aff7d2
> 
> Diff: https://reviews.apache.org/r/47835/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>

