samza-dev mailing list archives

From "Yi Pan (Data Infrastructure)" <yi...@linkedin.com>
Subject Re: Review Request 47994: SAMZA-915: implementation of StreamPipeline and operator runtime impl classes
Date Tue, 11 Oct 2016 04:20:11 GMT


> On Oct. 7, 2016, 9:04 p.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java,
line 61
> > <https://reviews.apache.org/r/47994/diff/3/?file=1522711#file1522711line61>
> >
> >     3 thoughts on this line:
> >     1. Why should this be static? Wouldn't this preclude you from having two tasks
run the same operator DAG in the same container/process?
> >     
> >     2. And why here instead of the MessageStream or ChainedOperators classes? I
would expect the topology to be an instantiated thing rather than a global map. At a minimum
since this map and ChainedOperators encode similar information (subscribers to an operator
or message stream) they should be consolidated to one source of truth for structural/topology
info.
> >     
> >     3. Does the order of the Operators in the list have any meaning? e.g. does it
implicitly define the order of processing, or is it just for consistency, or is the List used
to allow duplicates?
> 
> Yi Pan (Data Infrastructure) wrote:
>     Hi, Jake, thanks for the comments. Let me try to answer them one by one:
>     1. The key to this map is the MessageStream object, which will be separate instances
for each input topic partition. Hence, two tasks w/ the same operator DAG will only share
the SystemMessageStream and will have their own MessageStream and operator objects. Not sure
why sharing the same topology info between two tasks is necessary.
>     2. The reason I put this map in Operators.java is due to packaging and access mode.
In the implementation, I tried to achieve the following two goals: a) restrict the direct
dependency from any operator.api class to operator.impl s.t. we can potentially package API
classes separately. Hence, I chose not to create the operator map directly in ChainedOperators,
which is an impl class; b) don't expose any internal classes (i.e. Operator class is not exposed
to user at all) via public API classes and methods. Hence, I chose not to record the subscribers
in the MessageStream class, since that inevitably requires a public access method in this API class
to get the list of operators, which should not be exposed to or accessed by the programmer. The
existence of the multiple layers of topology strictly follows the three layers in the
API design: programming layer (MessageStream/Windows/...), representation layer (Operators,
etc. in operator.api.internal), and implementation layer (OperatorImpl, ChainedOperators,
etc.). In each layer, the map is the single source of truth. Classes in different
layers only access the map in their own layer. A single consolidated source of truth would break
the layering design and would not allow packaging the API-only classes separately. Hope this
explains the motivation and thoughts behind the design choices. I am open to any better suggestion
to achieve the above two goals.
>     3. So far, I don't see a strong reason for or against a List vs Set. Maybe it would
be better to keep it as Collection s.t. we have freedom in choosing its implementation? 
>     
>     I will keep this issue open to see whether we can find any better ideas for now.
> 
> Jake Maes wrote:
>     Thanks for the explanation. It makes a little more sense now, but I still don't see
the need for global maps. I'll give an example scenario to illustrate why they worry me: 
>     ```
>     As you mentioned, the key to the map is the MessageStream object which is always
intended to be unique. Months/years later, someone changes MessageStream by adding an equals()
and hashCode() implementation based on some canonical name within the DAG so MessageStreams
can be considered equivalent even if they are distinct objects. After this change, the map
starts associating the wrong operator with some message streams. 
>     ```
>     
>     It feels like an easy trap to fall into. 
>     
>     However, if the topology was not a global map, we could avoid conflicts when a particular
MessageStream is used in multiple definitions or instances (1 for each SSP) of an operator
DAG because each one essentially represents its own namespace for the objects it contains.
This could be useful if, for example, we had a design where the DAG is first defined (essentially
defining a Type) and then instantiated for each SSP. I think this design fits well with the
representation/implementation layers you described. 
>     
>     
>     Ultimately I think we want to get to a place where the user's code looks something
like the following:
>     ```
>     OperatorExpression expression = new OperatorExpressionBuilder().headStream()
>       .filter()
>       .map()
>       .udf()
>       .build();
>     expression.run();
>     ```
>     When they call run() it gets automatically wired into an OperatorTask and submitted
via some modified JobRunner. 
>     
>     To me, this topology info belongs somewhere in/underneath the OperatorExpression.
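
The equals()/hashCode() scenario Jake describes above can be reproduced in a few lines. The following is only a minimal sketch under stated assumptions: `NamedStream`, `canonicalName`, and the subscriber strings are hypothetical stand-ins and not classes from the patch under review. It compares a `HashMap` with an `IdentityHashMap` once name-based equality has been added to the key type.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class GlobalMapTrap {
  // Hypothetical stand-in for MessageStream after someone adds name-based equality.
  static final class NamedStream {
    final String canonicalName;

    NamedStream(String canonicalName) {
      this.canonicalName = canonicalName;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof NamedStream
          && ((NamedStream) o).canonicalName.equals(canonicalName);
    }

    @Override
    public int hashCode() {
      return Objects.hash(canonicalName);
    }
  }

  // Registers one subscriber for each of two distinct stream objects that share
  // a canonical name, then returns how many keys the map ended up with.
  static int distinctKeys(Map<NamedStream, List<String>> subscribers) {
    NamedStream partition0 = new NamedStream("pageViews");
    NamedStream partition1 = new NamedStream("pageViews");
    subscribers.computeIfAbsent(partition0, k -> new ArrayList<>()).add("filterOp-task0");
    subscribers.computeIfAbsent(partition1, k -> new ArrayList<>()).add("filterOp-task1");
    return subscribers.size();
  }

  public static void main(String[] args) {
    // equals()-based map: both streams collapse into a single entry.
    System.out.println("HashMap keys: " + distinctKeys(new HashMap<>()));
    // identity-based map: the two stream objects stay separate.
    System.out.println("IdentityHashMap keys: " + distinctKeys(new IdentityHashMap<>()));
  }
}
```

With the `HashMap`, both subscribers land under one key, which is the wrong-association problem from the scenario. An identity-keyed map avoids that particular failure, but it does not address the broader objection to a static, process-wide map.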

Hi, Jake, I agree w/ your long-term view on the OperatorExpression idea. We just talked a
bit about what the programming API w/ a standalone StreamProcessor would look like, and it
seems very close to what you have illustrated here, which does not expose any partition/task
concept in the programming APIs. This brings in other new requirements that have not been included
in the scope of this RB yet: for example, how to describe the partitioning and grouping
logic needed to process two topics, in addition to the processing logic. I would prefer to
keep this RB's scope within each task context; we have already opened another ticket for
higher-level APIs to support the programming API in standalone mode.

P.S. Regarding the argument against global maps, I understand your concern now. It would
be better if the subscribers of a MessageStream object were kept within it, to avoid the problem
that you mentioned in the example. The tradeoff is that we may have to expose unnecessary
methods to the end user (i.e. programmers). It might be OK as long as we return an unmodifiable
collection in MessageStream.getSubscribers(). However, in the instantiation code in OperatorFactory.getOperator(),
it seems to me that we still need a global registry of all Operator objects that
have been instantiated, to avoid creating two separate implementation instances of the same
operator in join/merge. An example: a and b are both SystemMessageStream objects, and a.join(b).window()
would be instantiated from two input sources a and b, each with its own chain: a.partialJoinOp
-> windowOp, and b.partialJoinOp -> windowOp. Note that the downstream windowOp *MUST*
be the same operator instance, instead of two instances each with its own state. There has
to be some global registry to mark whether
a certain Operator object has already been instantiated w/ a specific implementation object
or not. Keeping ownership of subscribers within each upstream operator's namespace actually works
against us in this case, unlike the example you used for MessageStream. Any thoughts?
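
To make the join/merge requirement above concrete, here is a minimal sketch of a registry that hands back the same implementation object whenever the same operator is reached from more than one upstream chain. All names (`OperatorSpec`, `OperatorImplSketch`, `Registry`, `getOrCreate`) are hypothetical and are not the classes in this RB. The sketch also assumes the registry is an ordinary object that could be owned per task instead of being static, which the thread has not settled.

```java
import java.util.IdentityHashMap;
import java.util.Map;

public class OperatorRegistrySketch {
  // Hypothetical representation-layer operator (stand-in for an Operator object).
  static final class OperatorSpec {
    final String name;

    OperatorSpec(String name) {
      this.name = name;
    }
  }

  // Hypothetical implementation-layer operator that carries its own state.
  static final class OperatorImplSketch {
    final OperatorSpec spec;
    int messagesSeen = 0;

    OperatorImplSketch(OperatorSpec spec) {
      this.spec = spec;
    }

    void onMessage() {
      messagesSeen++;
    }
  }

  // Registry keyed by object identity: one implementation per operator object.
  static final class Registry {
    private final Map<OperatorSpec, OperatorImplSketch> impls = new IdentityHashMap<>();

    OperatorImplSketch getOrCreate(OperatorSpec spec) {
      return impls.computeIfAbsent(spec, OperatorImplSketch::new);
    }
  }

  public static void main(String[] args) {
    OperatorSpec windowOp = new OperatorSpec("windowOp");
    Registry registry = new Registry();

    // Instantiating the chain from a: a.partialJoinOp -> windowOp
    OperatorImplSketch viaA = registry.getOrCreate(windowOp);
    // Instantiating the chain from b: b.partialJoinOp -> windowOp
    OperatorImplSketch viaB = registry.getOrCreate(windowOp);

    viaA.onMessage();
    viaB.onMessage();

    System.out.println(viaA == viaB);       // both chains share one instance
    System.out.println(viaA.messagesSeen);  // state accumulates in that single instance
  }
}
```

Because the lookup is keyed on the operator object itself, both chains resolve to one window implementation and its state is shared, which is the property the *MUST* above asks for.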


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47994/#review151818
-----------------------------------------------------------


On Oct. 9, 2016, 5:48 a.m., Yi Pan (Data Infrastructure) wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47994/
> -----------------------------------------------------------
> 
> (Updated Oct. 9, 2016, 5:48 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake Maes, Navina
Ramesh, Jagadish Venkatraman, and Xinyu Liu.
> 
> 
> Bugs: SAMZA-915
>     https://issues.apache.org/jira/browse/SAMZA-915
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-915: implementation of ChainedOperators and operator runtime impl classes
> 
> 
> Diffs
> -----
> 
>   samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java PRE-CREATION
> 
> Diff: https://reviews.apache.org/r/47994/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build.
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>

