samza-dev mailing list archives

From Jake Maes <jma...@apache.org>
Subject Re: Review Request 47994: SAMZA-915: implementation of StreamPipeline and operator runtime impl classes
Date Tue, 11 Oct 2016 22:35:32 GMT


> On Oct. 7, 2016, 9:04 p.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java, line 61
> > <https://reviews.apache.org/r/47994/diff/3/?file=1522711#file1522711line61>
> >
> >     3 thoughts on this line:
> >     1. Why should this be static? Wouldn't this preclude you from having two tasks run the same operator DAG in the same container/process?
> >     
> >     2. And why here instead of the MessageStream or ChainedOperators classes? I would expect the topology to be an instantiated thing rather than a global map. At a minimum, since this map and ChainedOperators encode similar information (subscribers to an operator or message stream), they should be consolidated into one source of truth for structural/topology info.
> >     
> >     3. Does the order of the Operators in the list have any meaning? E.g., does it implicitly define the order of processing, is it just for consistency, or is the List used to allow duplicates?
> 
> Yi Pan (Data Infrastructure) wrote:
>     Hi, Jake, thanks for the comments. Let me try to answer it one-by-one:
>     1. The key to this map is the MessageStream object, which will be a separate instance for each input topic partition. Hence, two tasks w/ the same operator DAG will only share the SystemMessageStream and will have their own MessageStream and operator objects. I'm not sure why sharing the same topology info between two tasks would be necessary.
>     2. The reason I put this map in Operators.java is packaging and access mode. In the implementation, I tried to achieve the following two goals: a) restrict direct dependencies from any operator.api class on operator.impl, s.t. we can potentially package the API classes separately. Hence, creating the operator map directly in ChainedOperators in the impl classes was not chosen; b) don't expose any internal classes (i.e. the Operator class is not exposed to the user at all) via public API classes and methods. Hence, recording the subscribers in the MessageStream class was not chosen, since it inevitably requires a public access method in this API class to get the list of operators, which should not be exposed to or accessed by the programmer. The existence of multiple layers of topology strictly follows the three layers in the API design: the programming layer (MessageStream/Windows/...), the representation layer (Operators, etc. in operator.api.internal), and the implementation layer (OperatorImpl, ChainedOperators, etc.). In each layer, the map is the single source of truth, and classes in each layer only access the map in their own layer. A single consolidated source of truth would break the layering design and would not allow packaging the API-only classes separately. Hope this explains the motivation and thoughts behind the design choices. I am open to any better suggestion to achieve the above two goals.
>     3. So far, I don't see a strong reason for or against a List vs. a Set. Maybe it would be better to keep it as a Collection s.t. we have freedom in choosing its implementation?
>     
>     I will keep this issue open to see whether we can find any better ideas for now.
> 
> Jake Maes wrote:
>     Thanks for the explanation. It makes a little more sense now, but I still don't see the need for global maps. I'll give an example scenario to illustrate why they worry me:
>     ```
>     As you mentioned, the key to the map is the MessageStream object, which is always intended to be unique. Months/years later, someone changes MessageStream by adding equals() and hashCode() implementations based on some canonical name within the DAG, so MessageStreams can be considered equivalent even if they are distinct objects. After this change, the map starts associating the wrong operators with some message streams.
>     ```
>     
>     It feels like an easy trap to fall into. 
>     
>     However, if the topology were not a global map, we could avoid conflicts when a particular MessageStream is used in multiple definitions or instances (1 for each SSP) of an operator DAG, because each one essentially represents its own namespace for the objects it contains. This could be useful if, for example, we had a design where the DAG is first defined (essentially defining a Type) and then instantiated for each SSP. I think this design fits well with the representation/implementation layers you described.
>     
>     
>     Ultimately I think we want to get to a place where the user's code looks something like the following:
>     ```
>     OperatorExpression expression = new OperatorExpressionBuilder().headStream()
>       .filter()
>       .map()
>       .udf()
>       .build();
>     expression.run();
>     ```
>     When they call run(), it gets automatically wired into an OperatorTask and submitted via some modified JobRunner.
>     
>     To me, this topology info belongs somewhere in/underneath the OperatorExpression.
> 
> Yi Pan (Data Infrastructure) wrote:
>     Hi, Jake, I agree w/ your long-term view on the OperatorExpression idea. We just talked a bit about what the programming API w/ a standalone StreamProcessor would look like, and it seems very close to what you have illustrated here, which does not expose any partition/task concept in the programming APIs. This brings in other new requirements that have not been included in the scope of this RB yet. For example, how to describe the partitioning and grouping logic needed to process two topics, in addition to the processing logic. I would prefer to keep this RB's scope within each task context, and we have already opened another ticket for higher-level APIs that allow programming in standalone mode.
>     
>     P.S. Regarding the argument against global maps, I understand your concern now. It would be better if the subscribers of a MessageStream object are kept within it, to avoid the problem that you mentioned in the example. The tradeoff is that we may have to expose unnecessary methods to the end user (i.e. programmers). It might be OK as long as we return an unmodifiable collection in MessageStream.getSubscribers(). However, in the instantiation code in OperatorFactory.getOperator(), it seems to me that we still need a global registry for all Operator classes that have been instantiated, to avoid creating two separate implementation instances of the same operator in join/merge. For example: a and b are both SystemMessageStream objects, and a.join(b).window() would be instantiated from two input sources a and b, each with the chain: a.partialJoinOp -> windowOp, and b.partialJoinOp -> windowOp. Note that the downstream windowOp *MUST* be the same operator instance, rather than two instances each with its own state. There has to be some global registry to mark whether a certain Operator object has already been instantiated w/ a specific implementation object or not. Keeping the subscribers within each upstream operator's namespace actually works against us in this case, unlike the example you used for MessageStream. Any thoughts?
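Jake's scenario above (MessageStream later gaining value-based equals()/hashCode() on a canonical name) can be reproduced in a few lines of plain Java. This is a standalone sketch, not Samza code: `NamedStream` is a hypothetical stand-in for MessageStream, and the operator names are placeholders. It shows that a `HashMap` keyed on such objects merges the subscriber lists of two distinct streams, while a `java.util.IdentityHashMap` keeps them apart.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

final class SubscriberMapDemo {
  // Hypothetical stand-in for MessageStream (NOT the Samza class). Equality is
  // based on a canonical name, i.e. the future change from the scenario.
  static final class NamedStream {
    final String canonicalName;

    NamedStream(String canonicalName) {
      this.canonicalName = canonicalName;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof NamedStream
          && ((NamedStream) o).canonicalName.equals(canonicalName);
    }

    @Override
    public int hashCode() {
      return canonicalName.hashCode();
    }
  }

  // Registers one subscriber for each of two distinct stream objects that share
  // a canonical name, then returns the subscriber list seen by the first one.
  static List<String> subscribersOfFirst(Map<NamedStream, List<String>> topology) {
    NamedStream p0 = new NamedStream("page-views"); // e.g. the stream for partition 0
    NamedStream p1 = new NamedStream("page-views"); // a distinct object, for partition 1
    topology.computeIfAbsent(p0, k -> new ArrayList<>()).add("filterOp-p0");
    topology.computeIfAbsent(p1, k -> new ArrayList<>()).add("filterOp-p1");
    return topology.get(p0);
  }

  public static void main(String[] args) {
    // Value-based keys: p1 collides with p0, so p0 picks up p1's operator.
    System.out.println(subscribersOfFirst(new HashMap<>()));         // prints [filterOp-p0, filterOp-p1]
    // Identity-based keys: each object keeps its own subscriber list.
    System.out.println(subscribersOfFirst(new IdentityHashMap<>())); // prints [filterOp-p0]
  }
}
```

Keying by identity removes this particular trap, but it does not address the separate concern about a static, process-wide map; that is a question of scope rather than of key equality.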

>> I would prefer to keep this RB's scope within each task context

Totally fine with me. 

>> It would be better if the subscribers of a MessageStream object are kept within, to avoid the problem that you mentioned in the example. The tradeoff is that we may have to expose unnecessary methods to the end user (i.e. programmers)

Yeah, I go back and forth on this. It really depends on how you want to model the streams.
In some systems, the streams know who their subscribers are. In others, the subscribers know
what streams they subscribe to. In a system where the entire topology is governed as a collective,
the subscriber information can belong in the topology itself (OperatorExpression in my example
above).
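One way to read the third option (subscriber information owned by the topology itself) is a topology object that holds the edges and hands out read-only views. The following is a minimal sketch under that assumption; `TopologySketch` and `Node` are hypothetical names, not part of the patch or of Samza, and edges are keyed by object identity so lookups never depend on a node's equals().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the topology (not the streams, not the operators) owns
// the subscriber edges. Node stands in for a stream or operator definition.
final class TopologySketch {
  static final class Node {
    final String name;

    Node(String name) {
      this.name = name;
    }

    @Override
    public String toString() {
      return name;
    }
  }

  // Identity-keyed adjacency list: upstream node -> its subscribers, in insertion order.
  private final Map<Node, List<Node>> subscribers = new IdentityHashMap<>();

  Node add(String name) {
    Node n = new Node(name);
    subscribers.put(n, new ArrayList<>());
    return n;
  }

  void connect(Node upstream, Node downstream) {
    List<Node> subs = subscribers.get(upstream);
    if (subs == null || !subscribers.containsKey(downstream)) {
      throw new IllegalArgumentException("both nodes must belong to this topology");
    }
    subs.add(downstream);
  }

  // Read-only view for the implementation layer; callers cannot mutate the graph.
  List<Node> subscribersOf(Node upstream) {
    List<Node> subs = subscribers.get(upstream);
    return subs == null ? Collections.<Node>emptyList() : Collections.unmodifiableList(subs);
  }

  public static void main(String[] args) {
    TopologySketch t = new TopologySketch();
    Node input = t.add("input");
    Node filter = t.add("filter");
    Node map = t.add("map");
    t.connect(input, filter);
    t.connect(filter, map);
    System.out.println(t.subscribersOf(input));  // prints [filter]
    System.out.println(t.subscribersOf(filter)); // prints [map]
  }
}
```

Because only the topology can add edges, neither MessageStream nor the operators need a public getSubscribers(); the read-only view mirrors the unmodifiable-collection idea Yi raised.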

>> There has to be some global registry to mark whether a certain Operator object has already been instantiated w/ a specific implementation object or not.

I see the need for a registry, but rather than *global* scope, I'd say it should be *expression*
scope. 
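An expression-scoped registry could satisfy Yi's join/window requirement without global state: within one expression instance, both partial-join chains resolve windowOp to the same runtime object, while a second instance (for example, another task) gets its own. This is a hedged sketch; `ExpressionRegistrySketch`, `OpSpec` and `OpImpl` are hypothetical placeholders, not the OperatorFactory in this patch.

```java
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of an expression-scoped registry: each expression
// instance (e.g. one per task) maps an operator spec to exactly one runtime
// instance. OpSpec and OpImpl are placeholders, not Samza classes.
final class ExpressionRegistrySketch {
  // Shared, stateless definition of an operator (the "Type").
  static final class OpSpec {
    final String name;

    OpSpec(String name) {
      this.name = name;
    }
  }

  // Runtime instance; in a real operator this would hold e.g. window state.
  static final class OpImpl {
    final OpSpec spec;

    OpImpl(OpSpec spec) {
      this.spec = spec;
    }
  }

  // One registry per expression instance, deliberately not static.
  private final Map<OpSpec, OpImpl> instances = new IdentityHashMap<>();

  // Returns the existing impl for this spec, creating it on first request.
  OpImpl getOrCreate(OpSpec spec, Function<OpSpec, OpImpl> factory) {
    return instances.computeIfAbsent(spec, factory);
  }

  public static void main(String[] args) {
    OpSpec windowOp = new OpSpec("windowOp"); // downstream of both partial joins

    ExpressionRegistrySketch task0 = new ExpressionRegistrySketch();
    // Wiring a.partialJoinOp -> windowOp and b.partialJoinOp -> windowOp:
    OpImpl fromA = task0.getOrCreate(windowOp, OpImpl::new);
    OpImpl fromB = task0.getOrCreate(windowOp, OpImpl::new);
    System.out.println(fromA == fromB); // prints true: one window instance per expression

    ExpressionRegistrySketch task1 = new ExpressionRegistrySketch();
    System.out.println(task1.getOrCreate(windowOp, OpImpl::new) == fromA); // prints false
  }
}
```

Sharing one `OpSpec` across expression instances while keeping `OpImpl` per instance matches the define-once, instantiate-per-SSP idea from earlier in the thread.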


Regardless of the above discussions, the topology mapping currently isn't part of the API
and is therefore easier to change. So I don't mean to hold up this patch.


- Jake


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47994/#review151818
-----------------------------------------------------------


On Oct. 9, 2016, 5:48 a.m., Yi Pan (Data Infrastructure) wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47994/
> -----------------------------------------------------------
> 
> (Updated Oct. 9, 2016, 5:48 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
> 
> 
> Bugs: SAMZA-915
>     https://issues.apache.org/jira/browse/SAMZA-915
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-915: implementation of ChainedOperators and operator runtime impl classes
> 
> 
> Diffs
> -----
> 
>   samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java PRE-CREATION
> 
> Diff: https://reviews.apache.org/r/47994/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build.
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>

