samza-dev mailing list archives

From Chris Pettitt <cpett...@linkedin.com>
Subject Re: Review Request 44920: Remove tight coupling of Samza with Yarn. Define APIs for resource manager integration
Date Wed, 16 Mar 2016 20:48:00 GMT

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/44920/#review123902
-----------------------------------------------------------



This is a deceptively big RB :) I got to about SamzaTaskManager, but still need to review
from there. This at least gets you some early feedback up to that point and ensures I don't
lose any comments.


samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java (line 46)
<https://reviews.apache.org/r/44920/#comment186216>

    Discussed with Jagadish offline, but summarizing here:
    
    I think we could simplify some of the interactions across classes if we make this a concrete task that takes a container allocator strategy interface (e.g. standard or host-aware). A few nice properties fall out:
    
    1. The allocators don't need to know the inner workings of AbstractContainerAllocator (ACA). In fact, the implementation of ACA can change without breaking allocators.
    2. It is easy to test just the allocator strategy in isolation because it becomes more functional in nature (e.g. no direct dependency on state in ACA).
    3. Easier to code review due to fewer cross-class interactions ;)
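A minimal sketch of the suggested split, not code from the RB: all names (`AllocationStrategy`, `StandardStrategy`, `HostAwareStrategy`, `StrategyContainerAllocator`) are hypothetical, and requests and resources are reduced to plain host strings instead of `SamzaResourceRequest` and `SamzaResource`. The concrete allocator owns all mutable state, while each strategy is a pure function of the snapshot it receives, which is what makes it testable in isolation. The run loop is omitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical strategy: picks which pending request a free resource on freeHost should satisfy. */
interface AllocationStrategy {
  /** Returns the index into pendingHosts of the request to match, or -1 to leave the resource unused. */
  int match(List<String> pendingHosts, String freeHost);
}

/** Ignores locality: the oldest pending request always wins. */
final class StandardStrategy implements AllocationStrategy {
  @Override
  public int match(List<String> pendingHosts, String freeHost) {
    return pendingHosts.isEmpty() ? -1 : 0;
  }
}

/** Only matches a request whose preferred host is the free host. */
final class HostAwareStrategy implements AllocationStrategy {
  @Override
  public int match(List<String> pendingHosts, String freeHost) {
    return pendingHosts.indexOf(freeHost);
  }
}

/** Concrete allocator: owns all mutable state and delegates the matching decision. */
final class StrategyContainerAllocator {
  private final AllocationStrategy strategy;
  private final List<String> pendingHosts = new ArrayList<>();

  StrategyContainerAllocator(AllocationStrategy strategy) {
    this.strategy = strategy;
  }

  void request(String preferredHost) {
    pendingHosts.add(preferredHost);
  }

  /** Offers a free resource; returns the preferred host of the matched request, or null. */
  String offer(String freeHost) {
    // The strategy only sees a copy, so it cannot depend on or mutate allocator state.
    int index = strategy.match(new ArrayList<>(pendingHosts), freeHost);
    return index < 0 ? null : pendingHosts.remove(index);
  }
}
```

Swapping `new StandardStrategy()` for `new HostAwareStrategy()` changes the matching policy without touching the allocator itself.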



samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java (line 66)
<https://reviews.apache.org/r/44920/#comment186191>

    It looks like this could be final.



samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java (line 70)
<https://reviews.apache.org/r/44920/#comment186192>

    final if possible, especially since this is exposed to anything in the package.



samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java (line 78)
<https://reviews.apache.org/r/44920/#comment186193>

    final :).
    
    Given that you're not doing a CAS (compare-and-set), a volatile would be sufficient for isRunning instead.
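A sketch of the volatile alternative for a stop flag, assuming `isRunning` is only ever set (never read-modify-written) across threads; `AllocatorLoop` is a hypothetical stand-in for the allocator's run loop.

```java
/** Hypothetical allocator loop whose only cross-thread state is a single stop flag. */
final class AllocatorLoop implements Runnable {
  // Written by stop() on one thread, read by run() on another. volatile makes the write
  // visible to the reader; no compare-and-set is needed because nothing does a
  // read-modify-write on the flag.
  private volatile boolean isRunning = true;

  @Override
  public void run() {
    while (isRunning) {
      // ... match pending requests with allocated resources here ...
      Thread.yield();
    }
  }

  void stop() {
    isRunning = false;
  }
}
```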



samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java (line 111)
<https://reviews.apache.org/r/44920/#comment186201>

    This probably doesn't warrant logging, at least not at the info level. Typically you would re-interrupt the thread here (Thread.currentThread().interrupt()), which would cause the next blocking call to throw InterruptedException.
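A small sketch of the re-interrupt idiom, using a hypothetical `pollOrNull` helper and an assumed `BlockingQueue` of events; the allocator's actual blocking call may differ (e.g. `Thread.sleep`), but the catch block is the same.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: waits for the next event, preserving the interrupt status. */
final class InterruptAware {
  static String pollOrNull(BlockingQueue<String> events, long timeoutMs) {
    try {
      return events.poll(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Restore the flag instead of logging it away; the caller's next blocking call
      // (or an explicit isInterrupted() check) will observe the interruption.
      Thread.currentThread().interrupt();
      return null;
    }
  }
}
```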



samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java (line 108)
<https://reviews.apache.org/r/44920/#comment186223>

    This should be volatile. The class says it is not thread safe, but I don't see an obvious way for code calling onError to ensure that the thread executing run() will see this change.



samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java (line 121)
<https://reviews.apache.org/r/44920/#comment186229>

    I would make this a final AtomicBoolean and compareAndSet(false, true) at the beginning of run(). Otherwise it could be possible to run this twice even when synchronized, e.g.:
    
    ```
    synchronized (lock)
    {
        Thread t1 = new Thread(coordinator);
        t1.start();
        Thread t2 = new Thread(coordinator);
        t2.start();
    }
    ```
    
    There is no happens-before relationship between the execution of run() in t1 and the execution of run() in t2 (both execute on threads that don't hold the lock), so the check may fail.
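A sketch of the compare-and-set guard, using a hypothetical `RunOnceCoordinator`; whether a second call should throw or silently return is a design choice, and throwing is assumed here.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical coordinator whose run() body executes at most once, whatever thread calls it. */
final class RunOnceCoordinator implements Runnable {
  private final AtomicBoolean started = new AtomicBoolean(false);
  private final AtomicInteger bodyExecutions = new AtomicInteger();

  @Override
  public void run() {
    // compareAndSet is atomic: exactly one caller moves the flag from false to true,
    // and every other caller sees it fail, with no external lock required.
    if (!started.compareAndSet(false, true)) {
      throw new IllegalStateException("Coordinator already started");
    }
    bodyExecutions.incrementAndGet();
    // ... coordinator main loop would go here ...
  }

  int bodyExecutions() {
    return bodyExecutions.get();
  }
}
```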



samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java (line 153)
<https://reviews.apache.org/r/44920/#comment186230>

    A few other fields here are trivial opportunities for being pulled up to the constructor and made final.



samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java (lines 163-164)
<https://reviews.apache.org/r/44920/#comment186220>

    It looks like this could be pulled into the constructor. Benefits:
    
    1. We can make the processManager final.
    2. We don't need to verify that processManager is not null in shutdown.
    3. We can fail early (at construction time) if the configuration is invalid.
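A minimal sketch of constructor-time initialization; `ProcessManager`, `EagerCoordinator`, the `factories` lookup function, and the `cluster-manager.factory` key are all hypothetical stand-ins for the RB's `ContainerProcessManager` and its configured factory.

```java
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical stand-in for the process manager, reduced to what shutdown needs. */
interface ProcessManager {
  void stop();
}

/** Hypothetical coordinator that builds its process manager eagerly in the constructor. */
final class EagerCoordinator {
  // Hypothetical config key, used only for this sketch.
  static final String FACTORY_KEY = "cluster-manager.factory";

  // final: assigned exactly once, so shutdown() never needs a null check.
  private final ProcessManager processManager;

  EagerCoordinator(Map<String, String> config, Function<String, ProcessManager> factories) {
    String factoryName = config.get(FACTORY_KEY);
    if (factoryName == null) {
      // Invalid configuration is reported at construction time, not later in run().
      throw new IllegalArgumentException("Missing required config: " + FACTORY_KEY);
    }
    this.processManager =
        Objects.requireNonNull(factories.apply(factoryName), "Unknown factory: " + factoryName);
  }

  void shutdown() {
    processManager.stop();
  }
}
```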



samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java (line 191)
<https://reviews.apache.org/r/44920/#comment186231>

    Yes :)



samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java (line 194)
<https://reviews.apache.org/r/44920/#comment186233>

    Jake already pointed out some (all?) of these, so I will comment just once that I agree they should be removed. We should use the logger to capture stack traces.



samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java (lines 246-263)
<https://reviews.apache.org/r/44920/#comment186234>

    How paranoid do we need to be here? Do any of these interact with external systems? If so, I would guard every step on the shutdown path so that a RuntimeException or Error in one step doesn't cause us to miss shutting down the others.
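One way to guard the shutdown path, sketched with a hypothetical `GuardedShutdown` helper and made-up step names: each step runs in its own try/catch so that a `RuntimeException` or `Error` from one step does not skip the rest. Catching `Throwable` is deliberately broad here; whether to also swallow something like `OutOfMemoryError` during shutdown is a judgment call.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: runs every shutdown step even if earlier ones throw. */
final class GuardedShutdown {
  /** Runs each step in insertion order and returns the names of the steps that failed. */
  static List<String> run(Map<String, Runnable> steps) {
    List<String> failed = new ArrayList<>();
    for (Map.Entry<String, Runnable> step : steps.entrySet()) {
      try {
        step.getValue().run();
      } catch (Throwable t) {
        // Catches Errors as well as RuntimeExceptions so one bad step cannot skip the rest.
        // A real implementation would log t (with its stack trace) via the logger here.
        failed.add(step.getKey());
      }
    }
    return failed;
  }
}
```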



samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java (line 74)
<https://reviews.apache.org/r/44920/#comment186235>

    It looks like this should be an interface.



samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java (line 38)
<https://reviews.apache.org/r/44920/#comment186239>

    I sprinkled some comments on threading below. After a few of those (which I'll keep for reference), I would suggest instead that we re-evaluate this class as a whole. We can either:
    
    1. Determine that everything can be done such that it appears atomic without a lock, in which case we use a ConcurrentLinkedQueue in allocatedContainers, ensure that SamzaResource and SamzaResourceRequest are immutable or are copied on update, and drop all synchronized statements.
    2. Determine that we actually do need locks around critical sections and thus drop the additional layer of locking in the ConcurrentHashMap (CHM) and PriorityBlockingQueue (PBQ).



samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java (line 46)
<https://reviews.apache.org/r/44920/#comment186236>

    This looks almost thread safe, assuming thread safety of all of the contained objects. If that assumption holds, you could use a ConcurrentLinkedQueue.
    
    However, if there is something stateful going on with the objects contained here or below, then it would make more sense to drop CHM as proposed.



samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java (line 75)
<https://reviews.apache.org/r/44920/#comment186237>

    I generally suggest that you don't use synchronized on methods. The reason is that nothing prevents someone holding an instance of ContainerRequestState from using it as a lock elsewhere, which could cause deadlocks or a performance degradation that is hard to track down. The alternatives are to either use a private final Object as a lock or use something like a ReentrantLock.
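A sketch of the private-lock alternative, with a hypothetical `RequestTracker` whose per-host counts stand in for the state in `ContainerRequestState`. Because `lock` is private, no outside code can acquire it, unlike the instance monitor that `synchronized` methods use. `Map.merge` and `getOrDefault` assume Java 8.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical request tracker that guards its state with a private lock, not "this". */
final class RequestTracker {
  // Nobody outside this class can synchronize on this object, so external code cannot
  // contend for (or deadlock on) the lock that protects requestsPerHost.
  private final Object lock = new Object();
  // A plain HashMap is enough: every access happens while holding lock.
  private final Map<String, Integer> requestsPerHost = new HashMap<>();

  void addRequest(String host) {
    synchronized (lock) {
      requestsPerHost.merge(host, 1, Integer::sum);
    }
  }

  int requestsFor(String host) {
    synchronized (lock) {
      return requestsPerHost.getOrDefault(host, 0);
    }
  }
}
```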



samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java (line 93)
<https://reviews.apache.org/r/44920/#comment186238>

    If we wanted to get rid of synchronization here, you could use putIfAbsent to make this an atomic operation. As is, this unnecessarily involves two lock acquisitions, which would go away if we switched to a regular HashMap.
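A lock-free sketch of the `putIfAbsent` approach, assuming the map at this line holds per-host `AtomicInteger` counters (an assumption; the RB's exact value type may differ). `HostRequestCounts` is hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical per-host request counter with no synchronized blocks. */
final class HostRequestCounts {
  private final ConcurrentMap<String, AtomicInteger> counts = new ConcurrentHashMap<>();

  /** Atomically increments and returns the count for host. */
  int increment(String host) {
    AtomicInteger counter = counts.get(host);
    if (counter == null) {
      AtomicInteger fresh = new AtomicInteger();
      // putIfAbsent returns the existing counter if another thread won the race, else null.
      counter = counts.putIfAbsent(host, fresh);
      if (counter == null) {
        counter = fresh;
      }
    }
    return counter.incrementAndGet();
  }

  int get(String host) {
    AtomicInteger counter = counts.get(host);
    return counter == null ? 0 : counter.get();
  }
}
```

On Java 8, `ConcurrentHashMap.computeIfAbsent(host, h -> new AtomicInteger())` performs the same get-or-create atomically in a single call.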



samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java (line 30)
<https://reviews.apache.org/r/44920/#comment186240>

    final and...



samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java (line 34)
<https://reviews.apache.org/r/44920/#comment186243>

    final.
    
    These fields are private and the only accessors are read-only, so making them final makes this object immutable and gives stronger guarantees about visibility across threads (https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.5).



samza-core/src/main/java/org/apache/samza/clustermanager/SamzaAppState.java (line 36)
<https://reviews.apache.org/r/44920/#comment186244>

    Agreed. This is a bit of a mix of atomic and mutable state. Either make it thread safe (e.g. lock down mutability of fields and ensure that the remaining mutable state can be updated atomically), or decide it is not intended to be thread safe and get rid of the additional locks / volatile accesses that add cost with no gain. The former is preferred :).



samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java (line 27)
<https://reviews.apache.org/r/44920/#comment186245>

    Fortunately this one is mostly a non-issue these days, but...
    
    It is a best practice to add an explicit serialVersionUID to Exceptions because they are Serializable. At one point we actually serialized out Java objects all over the place, and trivial, backwards-compatible changes would break the ability to load these objects. It should be a non-issue now, but since it is such a simple change I would suggest doing it anyway.
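What the suggested change might look like; the two constructors are assumptions rather than the RB's actual ones, and `1L` is an arbitrary value that would only need to change on a serialization-incompatible edit.

```java
/** Sketch only: constructors are assumed; the point is the explicit serialVersionUID. */
class SamzaContainerLaunchException extends Exception {
  // Pinning the UID means compatible changes (e.g. adding a method) no longer alter the
  // compiler-computed default and break deserialization of previously written instances.
  private static final long serialVersionUID = 1L;

  SamzaContainerLaunchException(String message) {
    super(message);
  }

  SamzaContainerLaunchException(String message, Throwable cause) {
    super(message, cause);
  }
}
```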



samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java (line 51)
<https://reviews.apache.org/r/44920/#comment186246>

    Should host be considered as part of equality and hashCode?
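If the answer is yes, `equals` and `hashCode` must change together. A sketch with a hypothetical `HostedResource` holding only an ID and a host (the real `SamzaResource` has more fields):

```java
import java.util.Objects;

/** Hypothetical resource whose identity includes the host it was allocated on. */
final class HostedResource {
  private final String resourceId;
  private final String host;

  HostedResource(String resourceId, String host) {
    this.resourceId = resourceId;
    this.host = host;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof HostedResource)) {
      return false;
    }
    HostedResource other = (HostedResource) o;
    return Objects.equals(resourceId, other.resourceId) && Objects.equals(host, other.host);
  }

  @Override
  public int hashCode() {
    // Equal objects must have equal hash codes, so only fields compared in equals() are used.
    return Objects.hash(resourceId, host);
  }
}
```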



samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java (line 61)
<https://reviews.apache.org/r/44920/#comment186247>

    This is more of a stylistic opinion that you can choose to ignore: I typically put static fields above instance-level fields in a class. I think this was done pretty consistently in other parts of the code I've reviewed too.



samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java (lines 42-44)
<https://reviews.apache.org/r/44920/#comment186249>

    Please make these private if possible. Otherwise, they can be changed by any other class in the package without going through the setter methods. It makes it trickier to review, at the very least :).
    
    Also, this looks like a good candidate for an immutable class, using copy on write if necessary.
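A sketch of the immutable, copy-on-write shape suggested here, using a hypothetical `ResourceStatus` whose three fields (`resourceId`, `diagnostics`, `exitCode`) are assumed rather than taken from the RB. Since every field is `private final` and set in the constructor, the JLS final-field semantics (section 17.5) guarantee that any thread obtaining a reference after construction sees the initialized values, provided `this` does not escape the constructor.

```java
/** Hypothetical immutable status: private final fields, and "setters" return a modified copy. */
final class ResourceStatus {
  private final String resourceId;
  private final String diagnostics;
  private final int exitCode;

  ResourceStatus(String resourceId, String diagnostics, int exitCode) {
    this.resourceId = resourceId;
    this.diagnostics = diagnostics;
    this.exitCode = exitCode;
  }

  String getResourceId() {
    return resourceId;
  }

  String getDiagnostics() {
    return diagnostics;
  }

  int getExitCode() {
    return exitCode;
  }

  /** Copy-on-write update: returns a new instance and never modifies this one. */
  ResourceStatus withExitCode(int newExitCode) {
    return new ResourceStatus(resourceId, diagnostics, newExitCode);
  }
}
```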



samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java (line 43)
<https://reviews.apache.org/r/44920/#comment186250>

    Not thread safe.



samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java (line 79)
<https://reviews.apache.org/r/44920/#comment186254>

    This should be volatile, especially as I believe it is being used to convey state across threads (e.g. whatever calls onContainerCompleted and the main thread in ClusterBasedJobCoordinator).


- Chris Pettitt


On March 16, 2016, 6:23 p.m., Jagadish Venkatraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/44920/
> -----------------------------------------------------------
> 
> (Updated March 16, 2016, 6:23 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Samza currently has tight coupling with Yarn. This makes it impossible to integrate with other resource managers like Mesos, or to run standalone without any resource manager. This RB is a step toward implementing SAMZA-881.
> 
> Design Doc: https://issues.apache.org/jira/secure/attachment/12790540/SamzaJobCoordinatorRe-designProposal.pdf
> 
> 1. Proposed new APIs for a resource manager to integrate with Samza (SAMZA-881).
>    - Defined the ContainerProcessManager abstraction, SamzaResource, and SamzaResourceRequest.
>    - Rewrote the SamzaAppMaster as a ClusterBasedJobCoordinator.
>    - Rewrote the Yarn-specific request logic by abstracting it into a YarnContainerManager.
> 2. Defined a ClusterManagerConfig to handle configurations independently of Yarn/Mesos.
> 3. Made Samza's cluster interaction independent of Yarn. This separates Samza-specific components into samza-core and Yarn components into samza-yarn.
> 4. Readability improvements to the existing code base.
>    - Added docs for most methods, member variables, and classes (including on thread-safety).
>    - Made internal variables final to document intent and ensure visibility across threads (trivially by adding modifiers, or by changing where they're initialized).
> 5. Refactored JobCoordinator into a JobModelReader.
> 
> TODO: Can go into the upcoming release. (P0)
> 1. Refactor the UI state variables and tests. Port some method re-orgs from SAMZA-867 into here.
> 2. Revise packaging structure.
> 4. Document new configs.
> 5. Rename run-am.sh to run-coordinator.sh. Delete all files in the non-refactored namespace. (For unit-testing, these files continue to exist.)
> 
> TODO: (P1)
> 1. Build Mesos integration for Samza. Should be simpler to integrate with the newer APIs.
>   - I started on this, and I plan to refine it and post an RB during one of the hack days.
> 2. Refactor the SamzaAppState class to provide more accessors and eliminate public variables. (This was a consequence of the already existing design, which I've tried to stay compatible with.)
> 
> TODO: I plan to track these with JIRAs so that they can be done later. (P2)
> 1. Get rid of the HTTP Server in the JobCoordinator.
> 2. Make YarnJobCoordinator implement the JobCoordinator API as described in SAMZA-881.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManagerFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaAppState.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobModelReader.scala PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/metrics/SamzaAppMasterMetrics.scala PRE-CREATION 
>   samza-shell/src/main/bash/run-am.sh 9545a96953baaff17ad14962e02bc12aadbb1101 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnAppState.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnContainerManager.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnContainerManagerFactory.java PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnContainerRunner.java PRE-CREATION 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala 6bf3046a1ae4ed8f57500acae763184084ad0e09 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/refactor/SamzaAppMasterService.scala PRE-CREATION 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/refactor/SamzaYarnAppMasterLifecycle.scala PRE-CREATION 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala 30cf34fe1fd3f74537d16e8a51b467cd50835357 
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 7f5d9f4af088589d4287c26737bae25567c157d7 
> 
> Diff: https://reviews.apache.org/r/44920/diff/
> 
> 
> Testing
> -------
> 
> 1. Tested by running test jobs in Yarn clusters of varying sizes, from 1 node to 36 nodes.
> 2. Tested for failures by killing containers and ensuring they were brought up again.
> 
> TODO:
> 1. Refactor all unit tests, and ensure updates to state variables are consistent with the current design.
> 2. More testing with test jobs on clusters.
> 
> 
> Thanks,
> 
> Jagadish Venkatraman
> 
>

