samza-dev mailing list archives

From Jagadish Venkatraman <jagadish1...@gmail.com>
Subject Re: Review Request 44920: Remove tight coupling of Samza with Yarn. Define APIs for resource manager integration
Date Thu, 24 Mar 2016 21:04:38 GMT


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java,
line 78
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301262#file1301262line78>
> >
> >     final :).
> >     
> >     Given that you're not doing cas, it would be sufficient to use a volatile for
isRunning instead.

Nice find. I've fixed it to use a volatile boolean. However, I'm not modifying/fixing the
existing code in https://github.com/apache/samza/blob/master/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java
since it's going away.
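
For reference, a minimal sketch of the volatile-flag approach suggested above. The class name AllocatorLoop and its methods are hypothetical stand-ins, not the actual AbstractContainerAllocator code; the only point is that a flag written by one thread and read by another needs volatile, not an atomic, when no compare-and-set is involved.

```java
// Hypothetical stand-in for an allocator loop; not the actual Samza class.
class AllocatorLoop implements Runnable {
    // volatile makes the write in stop() visible to the thread executing run().
    private volatile boolean isRunning = true;

    @Override
    public void run() {
        while (isRunning) {
            Thread.yield();   // placeholder for one allocation pass
        }
    }

    // Called from another thread; a plain volatile write is enough since there is no CAS.
    void stop() {
        isRunning = false;
    }

    boolean isRunning() {
        return isRunning;
    }
}
```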


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java,
line 111
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301262#file1301262line111>
> >
> >     This probably doesn't warrant logging, at least not at the info level. Typically
you would reinterrupt here which would cause the next blocking call to throw InterruptedException.

Fixed :)
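
A small sketch of the re-interrupt idiom referred to above, assuming a hypothetical SleepingWorker class (not code from this patch). Catching InterruptedException clears the thread's interrupt flag, so the handler sets it again instead of logging and swallowing it.

```java
// Hypothetical worker used only to illustrate restoring the interrupt flag.
class SleepingWorker implements Runnable {
    private volatile boolean sawInterrupt = false;

    @Override
    public void run() {
        try {
            Thread.sleep(60_000);   // stands in for any blocking call
        } catch (InterruptedException e) {
            // The flag was cleared when the exception was thrown; restore it so that
            // the next blocking call (or the caller) still sees the interruption.
            Thread.currentThread().interrupt();
        }
        sawInterrupt = Thread.currentThread().isInterrupted();
    }

    boolean sawInterrupt() {
        return sawInterrupt;
    }
}
```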


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java,
line 70
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301262#file1301262line70>
> >
> >     final if possible - especially as this is exposed to anything in the package.

I'll make this final.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java,
line 108
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301263#file1301263line108>
> >
> >     This should be volatile. The class says it is not thread safe, but I don't see
an obvious way for code calling onError to ensure that the thread running run will see this
change.

Good point. I've changed this to a volatile boolean - exceptionOccured.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java,
line 121
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301263#file1301263line121>
> >
> >     I would make this a final AtomicBoolean and cas(false, true) at the beginning
of run. Otherwise it could be possible to run this twice even when synchronized, e.g.:
> >     
> >     ```
> >     synchronized (lock)
> >     {
> >         Thread t1 = new Thread(coordinator);
> >         t1.start();
> >         Thread t2 = new Thread(coordinator);
> >         t2.start();
> >     }
> >     ```
> >     
> >     There is no happens-before constraint between the execution of run in t1 and
the execution of run in t2 (they are both done on threads that don't hold the lock), so the
check may fail.

Great point. I've modified this to use a CAS as suggested.
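
A minimal sketch of the CAS guard suggested above. RunOnceCoordinator is a hypothetical name, and throwing IllegalStateException on the second call is an assumption for illustration; the real class may handle a repeated call differently.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical coordinator; shows only the "run at most once" guard.
class RunOnceCoordinator implements Runnable {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger runs = new AtomicInteger();

    @Override
    public void run() {
        // Exactly one caller can flip false -> true, regardless of which thread
        // gets here first, so no happens-before edge between callers is required.
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("run() may only be invoked once");
        }
        runs.incrementAndGet();   // placeholder for the real work
    }

    int runs() {
        return runs.get();
    }
}
```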


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java,
line 153
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301263#file1301263line153>
> >
> >     A few other fields here are trivial opportunities for being pulled up to the
constructor and made final.

Done. I've pulled fields into the class.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java,
lines 163-164
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301263#file1301263line163>
> >
> >     It looks like this could be pulled into the constructor. Benefits:
> >     
> >     1. We can make the processManager final.
> >     2. We don't need to verify that processManager is not null in shutdown.
> >     3. We can fail early (at construction time) if the configuration is invalid.

Thanks for explaining. Fixed. This code is refactored from SamzaAppMaster.scala; however, I'm not
changing the original code in SamzaAppMaster.scala, since it will go away soon.
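
The three benefits listed above can be sketched as follows. EarlyFailCoordinator, its nested ProcessManager, and the config key "manager.factory" are all hypothetical names chosen for illustration; they are not the identifiers used in the patch.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical coordinator showing construction-time initialization.
class EarlyFailCoordinator {
    // Hypothetical stand-in for the real process manager.
    static final class ProcessManager {
        private volatile boolean stopped;
        void stop() { stopped = true; }
        boolean isStopped() { return stopped; }
    }

    // (1) Assigned exactly once in the constructor, so it can be final.
    private final ProcessManager processManager;

    EarlyFailCoordinator(Map<String, String> config) {
        Objects.requireNonNull(config, "config");
        // (3) Invalid configuration is rejected here rather than later in run().
        String factory = config.get("manager.factory");   // hypothetical key
        if (factory == null || factory.isEmpty()) {
            throw new IllegalArgumentException("manager.factory must be set");
        }
        this.processManager = new ProcessManager();
    }

    void shutdown() {
        // (2) No null check: a constructed coordinator always has a manager.
        processManager.stop();
    }

    boolean isShutDown() {
        return processManager.isStopped();
    }
}
```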


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java,
line 191
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301263#file1301263line191>
> >
> >     Yes :)

Done :) Thanks for confirming


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java,
lines 246-263
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301263#file1301263line246>
> >
> >     How paranoid do we need to be here? Do any of these interact with external systems?
If so, I would guard everything on the shutdown path so that we're sure that a RuntimeException
or Error doesn't cause us to miss one of these systems.

I've guarded everything in the shutdown path so that we shut down all components.
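
One way to guard a shutdown path is sketched below. This is an illustration under assumptions, not the code in the patch: GuardedShutdown and stopAll are hypothetical names, and collecting the failures in a list is just one reasonable way to report them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: runs every shutdown step even if an earlier one throws.
class GuardedShutdown {
    static List<Throwable> stopAll(List<Runnable> steps) {
        List<Throwable> failures = new ArrayList<>();
        for (Runnable step : steps) {
            try {
                step.run();
            } catch (Throwable t) {
                // Covers both RuntimeException and Error, so one failing
                // component cannot prevent the remaining ones from stopping.
                failures.add(t);
            }
        }
        return failures;
    }
}
```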


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java,
line 74
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301266#file1301266line74>
> >
> >     It looks like this should be an interface.

I played around with 3 ideas:

1. Making ContainerProcessManager and Callback separate interfaces. Yarn, Mesos implementations
will implement them.
2. Making ContainerProcessManager an abstract class that takes a Callback in its constructor, with
Callback as its nested static interface. Yarn, Mesos implementations will extend ContainerProcessManager.
(this is the current design I went with)
3. Making ContainerProcessManager and Callback separate interfaces and creating a separate
AbstractContainerProcessManager abstract class that implements ContainerProcessManager. Yarn,
Mesos implementations will extend AbstractContainerProcessManager.

My goals were to

A. Ensure that an instance of a ContainerProcessManager implementation is always created with
a callback. I considered including a registerCallback(Callback) in the ContainerProcessManager
interface, but providing the callback during construction seemed cleaner. It makes the
lifecycle easier to reason about (for example, we could make the callback final).

B. If we add more methods to the ContainerProcessManager, it would be great if we don't break
clients. Having it as an abstract class makes it easier to provide default implementations.
(until JDK8 https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html :D ). 

Out of 2 and 3 (both satisfy the above goals), 2 seemed simpler. Another nice effect: it associates
the Callback as a static interface inside ContainerProcessManager, since a Callback is always
meant to be used with a ContainerProcessManager and, conversely, a ContainerProcessManager
is always meant to be used by providing a Callback.
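
A rough sketch of the shape of option 2, with hypothetical names (ProcessManagerSketch, FakeClusterManager, onResourceAllocated, requestResource, cancelRequest). The real ContainerProcessManager has a different and larger API; this only illustrates goals A and B.

```java
import java.util.Objects;

// Hypothetical abstract manager illustrating option 2.
abstract class ProcessManagerSketch {
    // Nested interface: a callback only makes sense together with a manager.
    interface Callback {
        void onResourceAllocated(String resourceId);
    }

    // Goal A: supplied at construction, so it can be final and is never missing.
    protected final Callback callback;

    protected ProcessManagerSketch(Callback callback) {
        this.callback = Objects.requireNonNull(callback, "callback");
    }

    abstract void requestResource(String host);

    // Goal B: a method added later can ship with a default body,
    // so existing subclasses keep compiling.
    void cancelRequest(String host) {
        // no-op by default
    }
}

// Hypothetical implementation standing in for a Yarn or Mesos manager.
class FakeClusterManager extends ProcessManagerSketch {
    FakeClusterManager(Callback callback) {
        super(callback);
    }

    @Override
    void requestResource(String host) {
        callback.onResourceAllocated(host + "-resource-1");
    }
}
```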


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java,
line 38
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301267#file1301267line38>
> >
> >     I sprinkled some comments on threading below. After a few of those (which I'll
keep for reference), I would suggest instead that we re-evaluate this class as a whole. We
can either:
> >     
> >     1. Determine that everything can be done such that it appears atomic without
a lock, in which case we use a ConcurrentLinkedQueue in allocatedContainers, ensure that SamzaResource
and SamzaResourceRequest are immutable or they are copied on update, and drop all synchronized
statements.
> >     2. Determine that we actually do need locks around critical sections and thus
drop the additional layer of locking in CHM and PBQ.

I've refactored the ContainerRequestState class to be thread-safe. I ended up going with
(2). 

Even though (1) may be more efficient, I did not want to make several changes to the existing
logic (prior to this refactor). See https://github.com/apache/samza/blob/master/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java
I'm not porting these changes back to the existing yarn/ContainerRequestState, as it'll
be throw-away work.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java,
line 46
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301267#file1301267line46>
> >
> >     This looks almost thread safe, assuming thread safety of all of the contained
objects. If that assumption holds you could use a ConcurrentLinkedQueue.
> >     
> >     However, if there is something stateful going on with the objects contained
here or below then it would make more sense to drop CHM as proposed.

I've dropped the CHM and replaced it with a plain Map. All accesses have to be guarded by a lock anyway,
so we might as well get rid of the additional level of locking.

Also, another change is to make methods return a copy instead of an original object (so that
callers can mess around with the copy).


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java,
line 75
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301267#file1301267line75>
> >
> >     I generally suggest that you don't use synchronized on methods. The reason is
that there is nothing preventing someone holding an instance of ContainerRequestState from
using it as a lock elsewhere, which could cause deadlocks or a degradation in performance
that is hard to track down. The alternatives would be to either use a private final Object
as a lock or use something like a ReentrantLock.

Great point! I've changed it to use a final object as a lock.
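
The locking changes discussed in this reply and the previous one can be sketched together as below. RequestStateSketch and its fields are hypothetical and far simpler than ContainerRequestState; the sketch only shows a private final lock object, a plain HashMap guarded by that lock, and accessors that return copies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified request-state holder.
class RequestStateSketch {
    // Private lock: code outside this class cannot synchronize on it,
    // unlike synchronized methods, which lock on the instance itself.
    private final Object lock = new Object();

    // Plain HashMap; every access below happens while holding the lock.
    private final Map<String, List<String>> allocatedByHost = new HashMap<>();

    void addResource(String host, String resourceId) {
        synchronized (lock) {
            // Check-then-act is atomic because one lock covers both steps.
            List<String> resources = allocatedByHost.get(host);
            if (resources == null) {
                resources = new ArrayList<>();
                allocatedByHost.put(host, resources);
            }
            resources.add(resourceId);
        }
    }

    // Returns a snapshot copy, so callers can modify it freely
    // without touching the internal state.
    List<String> getResourcesOnHost(String host) {
        synchronized (lock) {
            List<String> resources = allocatedByHost.get(host);
            if (resources == null) {
                return new ArrayList<String>();
            }
            return new ArrayList<String>(resources);
        }
    }
}
```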


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java,
line 93
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301267#file1301267line93>
> >
> >     If we wanted to get rid of synchronization here, you could use putIfAbsent to
make this an atomic operation. As is, this unnecessarily involves two lock acquisitions, which
would go away if we switch to a regular HashMap.

The newer change removes the CHM and protects the map with a single lock. We have to use the lock
anyway, so we might as well get rid of the CHM to remove the extra layer of locking.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java, line
30
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301269#file1301269line30>
> >
> >     final and...

Good find. Fixed!


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java, line
34
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301269#file1301269line34>
> >
> >     final.
> >     
> >     These fields are private and the only accessors are read-only, so using final
makes this object immutable and buys better guarantees about visibility across threads (https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.5)

Fixed :) I'm not porting these changes back to https://github.com/apache/samza/blob/master/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java
(as the class will exist only for a short time)
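
A minimal sketch of the immutable shape described above. FailureRecord and its two fields are hypothetical (the actual fields of ResourceFailure are not shown in this thread), and the withAnotherFailure method is an added illustration of updating by creating a new instance.

```java
// Hypothetical immutable value class; field names are assumptions.
final class FailureRecord {
    // final fields are safely visible to other threads once the constructor
    // has finished, as long as 'this' does not escape during construction.
    private final int count;
    private final long lastFailureMillis;

    FailureRecord(int count, long lastFailureMillis) {
        this.count = count;
        this.lastFailureMillis = lastFailureMillis;
    }

    int getCount() {
        return count;
    }

    long getLastFailureMillis() {
        return lastFailureMillis;
    }

    // "Updating" produces a new instance instead of mutating this one.
    FailureRecord withAnotherFailure(long nowMillis) {
        return new FailureRecord(count + 1, nowMillis);
    }
}
```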


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/SamzaAppState.java, line
36
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301270#file1301270line36>
> >
> >     Agreed. This is a bit of a mix of atomic and mutable state. Either make it thread
safe (e.g. lock down mutability of fields), ensure that remaining mutable state can be updated
atomically, or decide it is not intended to be thread safe and get rid of additional locks
/ volatile accesses that add cost with no gain. Former is preferred :).
> 
> Jagadish Venkatraman wrote:
>     Great feedback Chris! I could not agree more! :-)
>     
>     SamzaAppState class is currently a source of major problems. I did not want to
touch it (as it was not scoped in this refactoring). Upon digging further, I realize the problem
of making this thread-safe/private is slightly involved.
>     
>     1. There is a jobCoordinator object that is exposed publicly as a part of SamzaAppState.
The jobCoordinator in turn exposes a nested jobModel instance directly through its accessor.
The JobModel embeds a LocalityManager that mutates state during some public method calls.
Hence, the jobModel instance is *not* thread-safe and is shared concurrently across the UI
threads, the HTTP server threads in the queued thread pool, and the SamzaAppMaster thread. (Created
SAMZA-899 to make the JobModel immutable)
>     
>     2. There are a bunch of state data structures that are publicly exposed in SamzaAppState.
These must be made thread-safe and exposed through accessors. These public global variables could be mutated
everywhere in Samza without regard for safety/visibility or correctness. For example, there
is an integer containerCount that is public which is manipulated by both the metrics reporter
and the callback threads. (I created SAMZA-901 to track this)
>     
>     I will work on both of these ASAP.
> 
> Jagadish Venkatraman wrote:
>     Just a clarification:
>     1. The JobModel instance is shared concurrently as stated in [1]. This presents a
source of *potential* problems. (I believe there is not an actual bug in the JobModel.)
>     2. The containerCount is a public int that *could* be manipulated by both the reporter
and callback. I believe the current interaction does not have any races (since count is just
set at the startup once). But, having it as a public non-final int *could* be a source of potential
problems if it was modified elsewhere.
> 
> Chris Pettitt wrote:
>     For #2, the state of containerCount in onContainerCompleted is undefined unless start
transitively "happens-before" onContainerCompleted. It may be that that holds, but it is not
obvious to me (without spending more time working out the call graph). If it's not a problem
now it could become one with a seemingly innocent refactoring.

Agreed! I've made containerCount an atomic integer. I believe that solves the visibility
and atomicity problems for that field for now.

Long term, I believe SamzaAppState should be thread-safe (using copy-on-write), with private
final variables, and accessors. However, making this thread-safe/private is a change that
is more involved. (It will be a separate effort altogether - SAMZA-902 tracks this)
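
For illustration, a sketch of the containerCount change under assumptions: AppStateSketch and the countConcurrently helper are hypothetical and exist only to show that a final AtomicInteger gives both visibility and atomic updates when several threads touch the counter.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, much reduced stand-in for the application state class.
class AppStateSketch {
    // final reference plus AtomicInteger: safely published and atomically updated.
    final AtomicInteger containerCount = new AtomicInteger(0);

    // Demo helper: several threads bump the counter; returns the final value.
    static int countConcurrently(int threads, int incrementsPerThread) {
        AppStateSketch state = new AppStateSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    state.containerCount.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();   // join establishes happens-before for the final read
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return state.containerCount.get();
    }
}
```

With a plain public int, concurrent increments like these could be lost; with the atomic, the result is always threads * incrementsPerThread.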


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java,
line 27
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301271#file1301271line27>
> >
> >     Fortunately this one is mostly a non-issue these days, but...
> >     
> >     It is a best practice to add an explicit serialVersionUID to Exceptions because
they are Serializable. At one point we actually serialized out Java objects all over the place
and trivial, backwards compatible changes would break the ability to load these objects. It
should be a non-issue now, but since it is such a simple change I would suggest doing it anyway.

Good point. I've added a serialVersionUID to exceptions.
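
The change amounts to one extra field per exception class, roughly as in this sketch. LaunchFailedException is a hypothetical name and the value 1L is an arbitrary choice for illustration.

```java
// Hypothetical exception class; shows only the explicit serialVersionUID.
class LaunchFailedException extends Exception {
    // An explicit ID keeps the serialized form stable across compatible
    // changes to the class; without it the JVM derives one from the class shape.
    private static final long serialVersionUID = 1L;

    LaunchFailedException(String message, Throwable cause) {
        super(message, cause);
    }
}
```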


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java, line
51
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301272#file1301272line51>
> >
> >     Should host be considered as part of equality and hashCode?

A resource represents a slice of a host. My assumption was that a resource (such a slice) could
be uniquely identified by a resourceID, i.e. resources on the same host will have different IDs
anyway (for example, they may differ in memory, cpu, disk etc.). Do you think that's a reasonable
assumption?
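
To make the question concrete, here is a sketch of ID-only equality. ResourceSketch and its fields are hypothetical, and the approach is only correct if resource IDs really are unique across hosts, which is exactly the assumption being asked about.

```java
import java.util.Objects;

// Hypothetical resource class; equality is defined on the ID alone.
final class ResourceSketch {
    private final String resourceId;
    private final String host;
    private final int memoryMb;

    ResourceSketch(String resourceId, String host, int memoryMb) {
        this.resourceId = Objects.requireNonNull(resourceId, "resourceId");
        this.host = host;
        this.memoryMb = memoryMb;
    }

    String getHost() {
        return host;
    }

    int getMemoryMb() {
        return memoryMb;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof ResourceSketch)) {
            return false;
        }
        // host and memoryMb are deliberately ignored.
        return resourceId.equals(((ResourceSketch) other).resourceId);
    }

    @Override
    public int hashCode() {
        // Must use the same fields as equals.
        return resourceId.hashCode();
    }
}
```

If two hosts could ever hand out the same ID, these objects would collide in sets and maps, and host would need to be part of both methods.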


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java,
line 61
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301273#file1301273line61>
> >
> >     This is more of a stylistic opinion that you can choose to ignore: I typically
put static fields above instance level fields in a class. I think this was done pretty consistently
in other parts of the code I've reviewed too.

Thanks for the feedback. I agree that it's good to be consistent with placement of statics.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java,
lines 42-44
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301274#file1301274line42>
> >
> >     Please make these private if possible. Otherwise, they can be changed by any
other class in the package without going through the setter methods. It makes it trickier
to review, at the very least :).
> >     
> >     Also, this looks like a good candidate for an immutable class, using copy on
write if necessary.

I've made these private and final. Thanks for the feedback!


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java,
line 43
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301275#file1301275line43>
> >
> >     Not thread safe.

I've added docs for thread-safety. I've called out that it's not thread-safe.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java,
line 79
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301275#file1301275line79>
> >
> >     This should be volatile, especially as I believe it is being used to convey
state across threads (e.g. whatever calls onContainerCompleted and the main thread in ClusterBasedJobCoordinator).

Agreed, good find :) made it volatile for visibility.


- Jagadish


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/44920/#review123902
-----------------------------------------------------------


On March 16, 2016, 6:23 p.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/44920/
> -----------------------------------------------------------
> 
> (Updated March 16, 2016, 6:23 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Yi Pan (Data Infrastructure),
Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Samza currently has tight coupling with Yarn. This makes it impossible to integrate with
other resource managers like Mesos, or to run standalone without any resource manager. This
RB is a step to implementing SAMZA-881.
> 
> Design Doc: https://issues.apache.org/jira/secure/attachment/12790540/SamzaJobCoordinatorRe-designProposal.pdf
> 
> 1. Proposed new APIs for a resource manager to integrate with Samza. (SAMZA-881)
>    - Defined the ContainerProcessManager abstraction, SamzaResource, SamzaResourceRequest.
>    - Re-wrote the SamzaAppMaster into a ClusterBasedJobCoordinator.
>    - Re-wrote yarn specific request logic by abstracting it into a YarnContainerManager.
> 2. Defined a ClusterManagerConfig to handle configurations independent of Yarn/Mesos.
> 3. Made Samza's cluster interaction independent of Yarn. This separates Samza specific
components into samza-core and Yarn components into samza-yarn.
> 4. Readability improvements to the existing code base.
>    - Added docs for most methods, member variables and classes (including on thread-safety)
>    - Made internal variables final to document intent, visibility across threads. (trivially
by adding modifiers, or by changing where they're initialized.)
> 5. Refactored JobCoordinator into a JobModelReader.
> 
> TODO: Can go into the upcoming release. (P0)
> 1. Refactor the UI state variables and tests. Port some method re-orgs from SAMZA-867
into here.
> 2. Revise packaging structure.
> 4. Document new configs.
> 5. Rename run-am.sh to run-coordinator.sh. Delete all files in the non-refactored namespace.
(For unit-testing, these files continue to exist)
> 
> TODO: (P1)
> 1. Build Mesos integration for Samza. Should be simpler to integrate with the newer APIs.
>   - I started on this, and I plan to refine and post an RB in one of the hack-days.
> 2. Refactor the SamzaAppState class to provide more accessors and eliminate public variables.
(This was a consequence of the already existing design, which I've tried to be compatible with)
> 
> TODO: I plan to track these with JIRAs so that they can be done later. (P2)
> 1. Get rid of the HTTP Server in the JobCoordinator
> 2. Make YarnJobCoordinator implement the JobCoordinator API as SAMZA-881.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManagerFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaAppState.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobModelReader.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/metrics/SamzaAppMasterMetrics.scala PRE-CREATION
>   samza-shell/src/main/bash/run-am.sh 9545a96953baaff17ad14962e02bc12aadbb1101
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnAppState.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnContainerManager.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnContainerManagerFactory.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnContainerRunner.java PRE-CREATION
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala 6bf3046a1ae4ed8f57500acae763184084ad0e09
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/refactor/SamzaAppMasterService.scala PRE-CREATION
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/refactor/SamzaYarnAppMasterLifecycle.scala PRE-CREATION
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala 30cf34fe1fd3f74537d16e8a51b467cd50835357
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 7f5d9f4af088589d4287c26737bae25567c157d7
> 
> Diff: https://reviews.apache.org/r/44920/diff/
> 
> 
> Testing
> -------
> 
> 1. Tested with running test jobs in Yarn clusters of varying sizes from 1 node to 36 nodes.
> 2. Tested for failures by killing containers, and ensuring they were brought up again.
> 
> TODO:
> 1. Refactor all unit tests, and ensure updates to state variables are consistent with
the current design.
> 2. More testing with test jobs on clusters.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>

