samza-dev mailing list archives

From "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>
Subject Re: Review Request 47835: SAMZA-914: Initial draft for Java programming APIs on operators supporting DAGs
Date Fri, 02 Sep 2016 08:22:07 GMT


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java, line 147
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487016#file1487016line147>
> >
> >     I think it would be useful to call out why SinkFunction is the only one that
> >     needs a MessageCollector and TaskCoordinator.
> >
> >     I believe the reason for TaskCoordinator is because Sink is terminal, so that
> >     is the only place you need to commit(). I'm not sure about the MessageCollector
> >     though. It seems like it should be consistent with StreamOperator. Either both
> >     should have collectors or neither... unless I'm missing something.

The sink needs the MessageCollector because it is terminal and must send the messages to a Kafka
topic. StreamOperators generate in-memory outputs only, so they have no need for a MessageCollector.
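
To illustrate the distinction, here is a minimal sketch. Collector, Coordinator, and
SinkFunction below are hypothetical stand-ins defined only for this example; they are not the
real Samza MessageCollector, TaskCoordinator, or the interface in the patch. The intermediate
operator only transforms values in memory, while the terminal sink is the one place that emits
and commits.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class SinkVersusOperatorSketch {
    // Hypothetical stand-ins for Samza's MessageCollector and TaskCoordinator.
    interface Collector { void send(String stream, Object message); }
    interface Coordinator { void commit(); }

    // Terminal step: receives the collector (to emit) and the coordinator (to commit).
    @FunctionalInterface
    interface SinkFunction<M> {
        void apply(M message, Collector collector, Coordinator coordinator);
    }

    // Test doubles that record what the sink did.
    static final class RecordingCollector implements Collector {
        final List<String> sent = new ArrayList<>();
        @Override public void send(String stream, Object message) { sent.add(stream + ":" + message); }
    }

    static final class CountingCoordinator implements Coordinator {
        int commits = 0;
        @Override public void commit() { commits++; }
    }

    static void runPipeline(List<String> input, Collector collector, Coordinator coordinator) {
        // Intermediate operator: a plain in-memory transformation, no collector needed.
        Function<String, String> toUpper = s -> s.toUpperCase();
        // Terminal sink: the only step that talks to the output system.
        SinkFunction<String> sink = (msg, c, k) -> { c.send("output-topic", msg); k.commit(); };
        for (String m : input) {
            sink.apply(toUpper.apply(m), collector, coordinator);
        }
    }

    public static void main(String[] args) {
        RecordingCollector collector = new RecordingCollector();
        CountingCoordinator coordinator = new CountingCoordinator();
        runPipeline(Arrays.asList("a", "b"), collector, coordinator);
        System.out.println(collector.sent);      // prints [output-topic:A, output-topic:B]
        System.out.println(coordinator.commits); // prints 2
    }
}
```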


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java, line 152
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487016#file1487016line152>
> >
> >     It's odd that the Java Function class is used for the 1 parameter case but we
> >     define a completely independent interface for the multi-parameter case.
> >
> >     Maybe there should be a new @FunctionalInterface for this case.
> >     http://stackoverflow.com/questions/27872387/can-a-java-lambda-have-more-than-1-parameter

Thanks for the suggestion. Using @FunctionalInterface does make the SinkFunction more generic.
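
As a rough sketch of the idea: java.util.function covers one and two parameters (Function,
BiFunction), and a custom @FunctionalInterface covers three or more. TriConsumer below is a
hypothetical name chosen for this example, not the interface from the patch.

```java
import java.util.function.BiFunction;

class MultiParamLambdaSketch {
    // Hypothetical three-argument functional interface; the JDK's built-in
    // function types stop at two arguments (BiFunction, BiConsumer).
    @FunctionalInterface
    interface TriConsumer<A, B, C> {
        void accept(A first, B second, C third);
    }

    // Two parameters: the JDK's BiFunction is enough.
    static int sumLengths(String a, String b) {
        BiFunction<String, String, Integer> f = (x, y) -> x.length() + y.length();
        return f.apply(a, b);
    }

    // Three parameters: a lambda targets the custom functional interface.
    static String joinThree(String a, String b, String c) {
        StringBuilder out = new StringBuilder();
        TriConsumer<String, String, String> joiner =
            (x, y, z) -> out.append(x).append('-').append(y).append('-').append(z);
        joiner.accept(a, b, c);
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(sumLengths("ab", "cde"));    // prints 5
        System.out.println(joinThree("a", "b", "c"));   // prints a-b-c
    }
}
```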


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java, line 385
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487016#file1487016line385>
> >
> >     Should this have generics so users can have specific types at the very beginning
> >     or is that not possible until at least one user-defined op has processed and
> >     declared the types?

The issue is that IncomingMessageEnvelope is not typed. The framework therefore cannot determine
the key and value types of an incoming message before it creates the SystemMessageStream object
and passes it to the user. The Data and Schema classes are intended to provide an abstract data
representation and access layer so that serde is hidden from the user; see SAMZA-848, SAMZA-842,
and SAMZA-429 for more detailed discussion. Ideally, the SystemMessageStream would come with
Message<K extends Data, M extends Data>, and via Data we could access the fields in K and M
without needing to know the serde. With the current implementation, however, that approach also
has drawbacks: a) instead of having an indirect dependency on avro via the serde implementation
class, we would depend on avro directly, which brings in a dependency management issue with the
serde implementation; b) for each serde, we would need an implementation of Data to wrap on top
of it. I have started to think that this may be better incorporated into the serde class
interface APIs, instead of the current intermediate approach. For now, I am not going to use this
class; I will always assume that the incoming message is Message<Object, Object> and require the
user to convert it to the known deserialized class (as we do in user code today).
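
A minimal sketch of what that conversion looks like in user code. The Message interface,
SimpleMessage, and PageView below are hypothetical stand-ins for illustration only; the actual
Message API in the patch may declare different methods.

```java
class UntypedMessageSketch {
    // Hypothetical stand-in for the Message<K, M> type discussed above.
    interface Message<K, M> {
        K getKey();
        M getMessage();
    }

    static final class SimpleMessage<K, M> implements Message<K, M> {
        private final K key;
        private final M message;
        SimpleMessage(K key, M message) { this.key = key; this.message = message; }
        @Override public K getKey() { return key; }
        @Override public M getMessage() { return message; }
    }

    // Hypothetical user-defined, already-deserialized payload class.
    static final class PageView {
        final String url;
        PageView(String url) { this.url = url; }
    }

    // The framework hands over Message<Object, Object>; the user casts to the
    // types that the configured serde is known to produce. A wrong assumption
    // surfaces as a ClassCastException at runtime, not at compile time.
    static Message<String, PageView> toTyped(Message<Object, Object> raw) {
        return new SimpleMessage<>((String) raw.getKey(), (PageView) raw.getMessage());
    }

    public static void main(String[] args) {
        Message<Object, Object> raw = new SimpleMessage<>("user-1", new PageView("/home"));
        Message<String, PageView> typed = toTyped(raw);
        System.out.println(typed.getKey() + " viewed " + typed.getMessage().url); // prints user-1 viewed /home
    }
}
```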


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/Scan.java, line 28
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487017#file1487017line28>
> >
> >     Naming:
> >     Maybe it's just me and maybe I'm not seeing the big picture of use cases, but
> >     I still don't find this class name intuitive. Scan is a verb that makes some sense
> >     if the input is a table but I've never heard someone describe a "scan" of a stream.
> >     Further, to me, scan is a "read" operation and it would be unintuitive for it to
> >     have side effects like extracting keys and timestamps.
> >
> >     Also, it seems like this will only be used at the entry points of the DAG. If
> >     so, it should be the antithesis of "sink" so I'd call it "source"
> >     Source.createWithExtractors(te, ke)
> >     Source.createWithKeyExtractor(ke)
> >     Source.createWithTimestampExtractor(te)
> >     
> >     Alternatively, if this could be used in the middle of the DAG, it could be 
> >     MessageStream.createWithExtractors(te, ke)
> >     MessageStream.createWithKeyExtractor(ke)
> >     MessageStream.createWithTimestampExtractor(te)
> >     
> >     Neither of the above are verbs so here are some other options:
> >     Preprocess
> >     Normalize
> >     Decorate
> >     Annotate
> >     Extract
> >     Enrich (or any of its synonyms)
> >     
> >     I know they may be less concise, but they all feel clearer to me. Normalize
> >     and Enrich are my current favorites.

Chris had a similar comment, and I have removed the Scan class completely from the API. Thanks
for pointing it out as well!


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java, lines 51-63
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487053#file1487053line51>
> >
> >     I know this isn't part of the current patch but this smells funky. All the
> >     @Overrides are Unsupported..., and then there are a bunch of static equivalents.
> >
> >     Seems like the @Overrides could delegate to the static methods or they should
> >     be removed, but as it stands, it seems messy.

Yeah, as I mentioned in the replies above, I would rather open another JIRA to address this
Data/AvroData and Schema implementation issue.


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java, line 39
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487055#file1487055line39>
> >
> >     Naming:
> >     Class is in the operators package but has Sql in the name.

Same as my response above.


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java, line 38
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487056#file1487056line38>
> >
> >     Naming:
> >     Class is in the operators package but has Sql in the name.

Good catch. That was missed when converting samza-sql-core to the samza-operator module. I will
open another JIRA to fix it.


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java, line 28
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487057#file1487057line28>
> >
> >     Naming:
> >     Class is in the operators package but has Sql in the name.

Same here.


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java, line 28
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487058#file1487058line28>
> >
> >     Naming:
> >     Class is in the operators package but has Sql in the name.

Same here.


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java, line 33
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487061#file1487061line33>
> >
> >     Since this extends Message, I expected to see some @Override annotations, unless
> >     Message is an empty abstract class.

Message is actually an interface.
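
As a side note on the Java rules involved: since Java 6 the compiler accepts @Override on
methods that implement an interface method, so the annotation is optional there rather than
disallowed. A minimal sketch, using a hypothetical Message interface defined only for this
example (the real one in the patch may declare different methods):

```java
class OverrideOnInterfaceSketch {
    // Hypothetical stand-in for the Message interface.
    interface Message<K, M> {
        K getKey();
        M getMessage();
    }

    static final class StringMessage implements Message<String, String> {
        private final String key;
        private final String body;

        StringMessage(String key, String body) {
            this.key = key;
            this.body = body;
        }

        // @Override is legal on interface implementations (Java 6 and later);
        // the compiler then flags the method if the interface signature changes.
        @Override
        public String getKey() { return key; }

        @Override
        public String getMessage() { return body; }
    }

    public static void main(String[] args) {
        Message<String, String> m = new StringMessage("k1", "hello");
        System.out.println(m.getKey() + " -> " + m.getMessage()); // prints k1 -> hello
    }
}
```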


> On Aug. 31, 2016, 2:18 a.m., Jake Maes wrote:
> > samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java, line 40
> > <https://reviews.apache.org/r/47835/diff/13/?file=1487078#file1487078line40>
> >
> >     Naming:
> >     Since this has been moved out of sql-core, the class should probably also be
> >     renamed to just "AvroSerdeTest"

Sure. Tracking separately.


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47835/#review147209
-----------------------------------------------------------


On Aug. 26, 2016, 8:43 p.m., Yi Pan (Data Infrastructure) wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47835/
> -----------------------------------------------------------
> 
> (Updated Aug. 26, 2016, 8:43 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake Maes, Navina
> Ramesh, Jagadish Venkatraman, and Xinyu Liu.
> 
> 
> Bugs: SAMZA-914
>     https://issues.apache.org/jira/browse/SAMZA-914
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-914: initial draft of operator programming API. Design doc attached to SAMZA-914:
> https://issues.apache.org/jira/secure/attachment/12821524/SAMZA-914_%20operator%20Java%20programming%20API%20-%20Google%20Docs.pdf
> 
> 
> Diffs
> -----
> 
>   build.gradle 16facbbf4dff378c561461786ff186bd9e0000ed
>   gradle/dependency-versions.gradle 52e25aa53a1edc85d478b48898621b26508ad4bb
>   samza-api/src/test/java/org/apache/samza/config/TestConfig.java 5d066c5867e9df9e94e60bde825dedf10703b399
>   samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/Scan.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/data/window/WindowOutput.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/join/Join.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/window/SessionWindow.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/window/Trigger.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/window/Window.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorBaseImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/Pipeline.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/TestScanner.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/data/TestDataStream.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/AssembleCallGraphTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java PRE-CREATION
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java PRE-CREATION
>   samza-sql-core/README.md PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java PRE-CREATION
>   settings.gradle 4c1aa107a11d413777e69bc4e48847b811aff7d2
> 
> Diff: https://reviews.apache.org/r/47835/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>

