samza-dev mailing list archives

From Jake Maes <jacob.m...@gmail.com>
Subject Re: Review Request 45144: SAMZA-906 Host Affinity - Minimize task reassignment when container count changes
Date Mon, 28 Mar 2016 15:07:09 GMT


> On March 25, 2016, 10:26 p.m., Navina Ramesh wrote:
> > samza-core/src/main/java/org/apache/samza/container/LocalityManager.java, line 41
> > <https://reviews.apache.org/r/45144/diff/7/?file=1314982#file1314982line41>
> >
> >     Why should the TaskAssignmentManager be a part of the LocalityManager? It doesn't
seem to be doing much other than providing an accessor getTaskAssignmentManager to GroupByContainerCount.
An extension of AbstractCoordinatorStreamManager typically performs only one function. This
is a deviation that I think is not necessary. 
> >     
> >     The TaskAssignmentManager could be instantiated in JobCoordinator and passed
to the "balance" call. Do we really need to couple them together?
> 
> Jake Maes wrote:
>     Explained above in response to Jagadish's review:
>     "
>     1. At one end of the spectrum, they could be completely independent, but that would
complicate a number of method signatures in the JobCoordinator
>     2. At the other end of the spectrum, there could be one LocalityManager that is composed
of a TaskLocalityManager and a ContainerLocalityManager, each of which handles the coordinator
stream interactions. This looks best structurally, but since the current LocalityManager is
used in SamzaContainer and the TaskAssignmentManager will not be, it's questionable whether this
structure fits the usage pattern.
>     3. I chose the middle, where the TaskAssignmentManager is a field of the LocalityManager,
which loosely associates them.
>     
>     The latter 2 options both simplify the JobCoordinator, and after the upcoming diff,
allow us to pass one manager into the balance() method which contains both the task and container
mappings. This will be useful for more intelligent implementations of the balance() method,
which might try to reassign tasks from containers that were previously running on the same
host to a new container also on that host. 
>     "
> 
> Navina Ramesh wrote:
>     Oops.. I didn't see your previous responses. My comments were mostly questioning whether
such an explicit association is required. 
>     
>     On point 1, I don't think it is a major change in the JobCoordinator interfaces.
They are all mostly private methods and are going to be refactored as a part of Jagadish's
work. 
>     
>     Unlike TaskAssignmentManager, LocalityManager is meant to be accessible by both Samza
container and job coordinator. This is not the case with TaskAssignmentManager as it is fully
controlled (read & write) only by the Job Coordinator. So, I don't see this affecting
the SamzaContainer in any way. If anything, I feel that the SamzaContainer now has a loophole
to access the TaskAssignmentManager :)
>     
>     In my view, "balance" seems like a specific implementation of TaskNameGrouper that
could have been called BalancedGroupByContainerCount. Granted, this would inconvenience
existing users of GroupByContainerCount, who would have to change their config.
> 
> Jake Maes wrote:
>     I wasn't concerned about the JC interfaces as much as code readability/maintenance.
The LocalityManager is in the signature of many of the methods in JC. TaskAssignmentManager
would need to be in almost all of the same signatures. 
>     
>     Agreed on the usage pattern, though I don't assume the availability of the TaskAssignmentManager
in the containers is "necessarily" risky. Regardless, it would be even cleaner if the coordinator
is ONLY written centrally by the AM/leader. That's why I'll be filing a JIRA to move the LocalityManager
writes to the AM/leader. Details to follow...
>     
>     Balance is an extension of group() but should follow the same strategy. GroupByContainerCount.group()
assigns tasks to the containers in round-robin fashion. Balance does the same, but minimizes
movement from the previous assignments. A different implementation of TaskNameGrouper/BalancingTaskNameGrouper
would employ a different strategy for both.
>     
>     Also, the balance() method necessarily has to have a different signature than group()
because there's no way to pass the locality/mapping info in from the Factory (or doing so
would break some abstractions).
> 
> Navina Ramesh wrote:
>     *though I don't assume the availability of the TaskAssignmentManager in the containers
is "necessarily" risky.*
>     > It does pose a potential risk to future implementations. For example, opening
up the read & write interface of any CoordinatorStreamManager is actually a poor design
choice (historically). This needs to be changed.
>     
>     *Regardless, it would be even cleaner if the coordinator is ONLY written centrally
by the AM/leader. That's why I'll be filing a JIRA to move the LocalityManager writes to the
AM/leader.*
>     > We can discuss this offline. Short explanation: Write access was intentionally
provided to Samza containers in LocalityManager because we want to write the locality of the
running container to the coordinator stream ONLY after it successfully gets into the RunLoop.
Since the AM/JC/leader does not have any feedback indicating it is in the run loop, it should
remain in the container.
>     
>     *A different implementation of TaskNameGrouper/BalancingTaskNameGrouper would employ
a different strategy for both*
>     > I agree.
>     
>     *the balance() method necessarily has to have a different signature than group()*
>     > Sure. The workaround is to use a factory overload as well, though it is an overkill.
I am only suggesting that you can instantiate the TaskAssignmentManager in the JC and pass
it down to the grouper. That way, it is only a couple of interface changes to the private
method and there is no risk in the container.
> 
> Jake Maes wrote:
>     Can you provide examples of the historical problems caused by opening read &
write to the coordinator stream? Without them, I disagree that it's a poor design choice, especially
if one agrees that the coordinator stream should only be written centrally. 
>     
>     The original intent for writing the locality from the container was not overlooked.
That's why the proposal to move it to the AM/leader will depend on SAMZA-871.
> 
> Jake Maes wrote:
>     > Sure. The workaround is to use a factory overload as well, though it is an overkill.
I am only suggesting 
>     > that you can instantiate the TaskAssignmentManager in the JC and pass it down
to the grouper. That way, 
>     > it is only a couple of interface changes to the private method and there is
no risk in the container.
>     
>     I could do that, but it would muddle up the signatures of a bunch of JC methods with
unnecessary args, especially since balance() would need to take both the LocalityManager AND
the TaskAssignmentManager as arguments (see explanation in my original reply). Both pieces
of locality info could be needed for some implementations of balance. Since they're both locality,
I think it's reasonable to have both in the class called LocalityManager and simplify a bunch
of method/interface signatures. If there's significant enough risk of the TaskAssignmentManager
being used in the containers, I think the compromise is passing it into the LocalityManager
constructor from the JC, but not in the container. 
>     
>     In any case, we are always one code change away from someone using TaskAssignmentManager
in the containers, no matter how easy or hard it is. That's why we have code reviews! :-)
> 
> Navina Ramesh wrote:
>     *Can you provide examples of the historical problems caused by opening read &
write to the coordinator stream? Without them, I disagree that it's a poor design choice, especially
if one agrees that the coordinator stream should only be written centrally.*
>     My reference to a poor design choice was with regard to using a "manager" to access
a stream without being able to scope its level of access (read vs. write).
>     
>     If features in SAMZA-871 are available, it makes sense to move the "write" of locality
to the JC/AM. 
>     
>     *That's why we have code reviews!*
>     It gets profoundly hard to understand design choices when the developer who made
the choice is no longer active in the code reviews. :) It's better to keep the changes simple
and intuitive.

Seems like there are a lot of misconceptions and stylistic disagreements that are inhibiting
this review. Let's discuss in person.


- Jake


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/45144/#review125481
-----------------------------------------------------------


On March 25, 2016, 5:30 p.m., Jake Maes wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/45144/
> -----------------------------------------------------------
> 
> (Updated March 25, 2016, 5:30 p.m.)
> 
> 
> Review request for samza, Navina Ramesh, Jagadish Venkatraman, and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Persist the task-to-container mapping in the coordinator stream and use it to minimize
the reassignment when the container count changes.
> 
> A new BalancingTaskNameGrouper interface exposes the balance() method, which can be implemented
pluggably. 
> GroupByContainerCount has been rewritten in Java and the balance() functionality added.
This is because the balance logic is specific to a grouper.
> 
> Detailed changes:
> import-control.xml - Update imports that weren't needed in scala and add some for tests.
> LocalityManager.java - Add TaskAssignmentManager. This mostly just keeps the JobCoordinator
code cleaner, but also associates the 2 managers for Host Affinity info.
> GroupByContainerCount - THE BIG ONE. Rewritten in Java and it now implements BalancingTaskNameGrouper
> GroupByContainerCountFactory - Rewritten in Java
> TestStorageRecovery - Old test depended on the order of the partitions in a container.
Now it doesn't.
> TestGroupByContainerCount - Rewritten in Java and lots of tests added for balance()
> BalancingTaskNameGrouper - New interface for the balance method. Exposes the new functionality
without breaking backward compatibility
> TaskAssignmentManager - Coordinator stream manager for task-to-container mapping
> TaskNameGroupBalancer - Bridges the task mapping (balance) capability with taskname groupers,
old and new
> SetTaskContainerMapping - Coordinator stream message for the task-to-container mapping
> 
> 
> Diffs
> -----
> 
>   checkstyle/import-control.xml 53cb8b447240fea08d98ccfb12ed24bec6cbf67c
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java acf93525ea5c97df187bbe7977e2ae9fea65b32b
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java 211b64215f26db49cd4411ff3fb41231145307d7
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 4d093b500b7f3b582446634ced3e9d0b28371616
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala cb0a3bde15174c53f8eb3c0dbbb4f59dbf2589b1
>   samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala 8bbfd639cd9ea1d758d0daa45ce41093c1cb66f6
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 06a96ad6ed786c22924017f894413bfa1ea34c06
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java e0d4aa1016d61ce328d7ff74b58f7b8f7682f386
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java 429573b480112c7491303dc410d78f37a308c4a7
>   samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 53207ad7e87fe491c6ae95ae6c590c6d5668d3dc
>   samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala 6e9c6fa579a5901000bea0601c771783d8334f0e
> 
> Diff: https://reviews.apache.org/r/45144/diff/
> 
> 
> Testing
> -------
> 
> A bunch of new unit tests have been added. 
> 
> Also tested with a test job. The task mapping (initially missing) was added the first
time the job was run. It was then used as expected to reduce task reassignment as the container
count was adjusted from 4->3->5 on subsequent runs.
> 
> 
> Thanks,
> 
> Jake Maes
> 
>
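
For readers following the group()/balance() discussion above, here is a minimal sketch of the idea: group() assigns tasks to containers round-robin, and balance() aims at the same even distribution while keeping each task on its previous container whenever that container still exists and has room. This is not the GroupByContainerCount code under review. The class name BalanceSketch, the method signatures, and the use of a plain task-name-to-container-id map (standing in for the mapping persisted in the coordinator stream) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Illustrative only; names and signatures are hypothetical, not Samza's API. */
public class BalanceSketch {

    /** Round-robin: the i-th task (in sorted order) goes to container i % containerCount. */
    public static Map<String, Integer> group(Collection<String> taskNames, int containerCount) {
        if (containerCount <= 0) {
            throw new IllegalArgumentException("containerCount must be positive");
        }
        TreeSet<String> sorted = new TreeSet<>(taskNames);
        Map<String, Integer> assignment = new TreeMap<>();
        int i = 0;
        for (String task : sorted) {
            assignment.put(task, i++ % containerCount);
        }
        return assignment;
    }

    /** Keeps tasks where they were when possible, then places displaced tasks in free slots. */
    public static Map<String, Integer> balance(Map<String, Integer> previous, int containerCount) {
        if (containerCount <= 0) {
            throw new IllegalArgumentException("containerCount must be positive");
        }
        int taskCount = previous.size();
        // Same sizes a round-robin grouping would produce: the first
        // (taskCount % containerCount) containers hold one extra task.
        int[] capacity = new int[containerCount];
        for (int c = 0; c < containerCount; c++) {
            capacity[c] = taskCount / containerCount + (c < taskCount % containerCount ? 1 : 0);
        }

        Map<String, Integer> sortedPrevious = new TreeMap<>(previous);
        Map<String, Integer> assignment = new TreeMap<>();
        List<String> displaced = new ArrayList<>();
        for (Map.Entry<String, Integer> e : sortedPrevious.entrySet()) {
            int c = e.getValue();
            if (c >= 0 && c < containerCount && capacity[c] > 0) {
                assignment.put(e.getKey(), c);   // task stays on its old container
                capacity[c]--;
            } else {
                displaced.add(e.getKey());       // container is gone or already full
            }
        }

        // Remaining capacity always equals the number of displaced tasks.
        int next = 0;
        for (String task : displaced) {
            while (capacity[next] == 0) {
                next++;
            }
            assignment.put(task, next);
            capacity[next]--;
        }
        return assignment;
    }

    /** Number of tasks whose container id differs between two assignments. */
    public static long moved(Map<String, Integer> before, Map<String, Integer> after) {
        return before.keySet().stream()
                .filter(t -> !before.get(t).equals(after.get(t)))
                .count();
    }

    public static void main(String[] args) {
        List<String> tasks = Arrays.asList("t0", "t1", "t2", "t3", "t4", "t5", "t6", "t7");
        Map<String, Integer> four = group(tasks, 4);
        Map<String, Integer> three = balance(four, 3);
        Map<String, Integer> five = balance(three, 5);
        System.out.println("4 containers: " + four);
        System.out.println("3 containers: " + three + " (moved " + moved(four, three) + ")");
        System.out.println("5 containers: " + five + " (moved " + moved(three, five) + ")");
    }
}
```

With eight tasks and the 4->3->5 sequence from the Testing section, this sketch moves two tasks at each step. A smarter balance() could also consult the container-to-host mapping, as suggested in the thread, to prefer new containers on the same host.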

