samza-dev mailing list archives

From Jake Maes <jacob.m...@gmail.com>
Subject Re: Review Request 45144: SAMZA-906 Host Affinity - Minimize task reassignment when container count changes
Date Wed, 23 Mar 2016 15:56:50 GMT


> On March 23, 2016, 7:57 a.m., Jagadish Venkatraman wrote:
> > samza-core/src/main/java/org/apache/samza/container/LocalityManager.java, line 40
> > <https://reviews.apache.org/r/45144/diff/1/?file=1309649#file1309649line40>
> >
> >     Please make this final (though this is unrelated to your change). SAMZA-902
> >     tracks refactoring to make Samza classes more thread-safe.

I was going to do this, but the field is assigned on line 108, so it can't be final as is.


> On March 23, 2016, 7:57 a.m., Jagadish Venkatraman wrote:
> > samza-core/src/main/java/org/apache/samza/container/LocalityManager.java, line 142
> > <https://reviews.apache.org/r/45144/diff/1/?file=1309649#file1309649line142>
> >
> >     Why do we need a setter here? I recommend moving this setter to the constructor
> >     of LocalityManager.
> >     
> >     The following properties fall out nicely:
> >     1. We could make taskAssignmentManager final as a result.
> >     2. It becomes easy to reason about this class, as the object created is consistent
> >     right away, as opposed to waiting for the creator to call a setter() before you can
> >     call get() on the taskAssignmentManager.
> >     3. It makes my life for SAMZA-902 easier ;)

Because I wasn't sure how tightly I wanted to associate the TaskAssignmentManager and the
LocalityManager. :-)

- At one end of the spectrum, they could be completely independent, but that would complicate
a number of method signatures in the JobCoordinator.

- At the other end of the spectrum, there could be one LocalityManager that is composed of
a TaskLocalityManager and a ContainerLocalityManager, each of which handles the coordinator
stream interactions. This looks best structurally, but since the current LocalityManager is
used in SamzaContainer and the TaskAssignmentManager will not be, it's questionable whether this
structure fits the usage pattern.

- I chose the middle, where the TaskAssignmentManager is a field of the LocalityManager, which
loosely associates them.
 
The latter 2 options both simplify the JobCoordinator and, after the upcoming diff, allow
us to pass a single manager containing both the task and container mappings into the
balance() method. This will be useful for more intelligent implementations of balance(),
which might try to reassign tasks from containers that were previously running on a given
host to a new container on that same host.

Anyway, you're right, if I'm going to use approach 3, I should commit to it and make the field
final! Thanks for the suggestion.
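
As a minimal sketch of what approach 3 with a final field could look like: the classes and
method names below are simplified, hypothetical stand-ins, not the actual Samza classes or
the code in this diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the task-to-container manager (assumption, not Samza's API).
final class SketchTaskAssignmentManager {
    private final Map<String, Integer> taskToContainer = new HashMap<>();

    void writeTaskContainerMapping(String taskName, int containerId) {
        taskToContainer.put(taskName, containerId);
    }

    Map<String, Integer> readTaskAssignment() {
        // Return a read-only copy so callers cannot mutate internal state.
        return Collections.unmodifiableMap(new HashMap<>(taskToContainer));
    }
}

// Hypothetical stand-in for LocalityManager: the task manager is injected once,
// in the constructor, so the field can be final and the object is consistent
// as soon as it is created (no setter needed before the getter is usable).
final class SketchLocalityManager {
    private final SketchTaskAssignmentManager taskAssignmentManager;

    SketchLocalityManager(SketchTaskAssignmentManager taskAssignmentManager) {
        this.taskAssignmentManager =
            Objects.requireNonNull(taskAssignmentManager, "taskAssignmentManager");
    }

    SketchTaskAssignmentManager getTaskAssignmentManager() {
        return taskAssignmentManager;
    }
}
```

With this shape, passing null fails at construction time rather than at the first get().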


> On March 23, 2016, 7:57 a.m., Jagadish Venkatraman wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java, line 89
> > <https://reviews.apache.org/r/45144/diff/1/?file=1309651#file1309651line89>
> >
> >     allContainers.size() is used in 4 places. Maybe use a local variable called
> >     prevContainerCount? It also makes it easy to see that allContainers.size() actually
> >     refers to the container count prior to the balance.

Five places, actually. Thanks for noticing this. I'll do exactly that.


> On March 23, 2016, 7:57 a.m., Jagadish Venkatraman wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java, line 180
> > <https://reviews.apache.org/r/45144/diff/1/?file=1309651#file1309651line180>
> >
> >     This logic is well written :-)
> >     
> >     It will really help if you can illustrate with a small example as a comment.
> >     1. when counts increase.
> >     2. when counts decrease.
> 
> Jagadish Venkatraman wrote:
>     Here's an idea, save it for later if you think it's a total diversion. But I believe
>     it's worth investigating.
>     
>     How about making the TaskAssignmentManager immutable? That way, the TaskAssignmentManager
>     is given the list of assignments when it starts and the assignments do not change.
>     
>     The following are the advantages:
>     1. The map that currently tracks <taskNameToContainerId> can be made final.
>     2. The mapping can be initialized once during construction. The constructor could
>     take care of deleting the old mappings and refreshing the new mappings into the
>     coordinator stream.
>     3. Do we change the map after the JobCoordinator starts/stops containers? If not,
>     then it makes sense for it to be immutable.
>     4. We can fail early during construction time.
>     5. Methods that return the taskNameToContainerId can now return a cached copy as
>     opposed to re-bootstrapping from the coordinator stream each time.
> 
> Jagadish Venkatraman wrote:
>     I just realized that it's hard to implement this. I believe that the TaskAssignmentManager
>     has been modelled on the LocalityManager, which obtains the container locality. Never
>     mind.

There are some comments to that effect in the unit tests. I could copy them over, but they
sort of dominate the file.
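
As a rough illustration of the two cases (count decreases, count increases), here is a
simplified sketch. It is not the GroupByContainerCount logic from the diff: the class name
BalanceSketch, the method signature, and the keep-then-refill strategy are all assumptions
made for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: rebalance tasks across a new container count while
// leaving as many tasks as possible in the container they already occupy.
final class BalanceSketch {
    private BalanceSketch() {}

    static List<List<String>> balance(List<List<String>> previous, int newContainerCount) {
        if (newContainerCount <= 0) {
            throw new IllegalArgumentException("newContainerCount must be positive");
        }
        int prevContainerCount = previous.size();
        int totalTasks = 0;
        for (List<String> tasks : previous) {
            totalTasks += tasks.size();
        }
        int base = totalTasks / newContainerCount;
        int remainder = totalTasks % newContainerCount;

        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < newContainerCount; i++) {
            result.add(new ArrayList<>());
        }

        // Pass 1: each surviving container keeps tasks up to its new target size;
        // the excess, plus all tasks from removed containers, go into a pool.
        Deque<String> pool = new ArrayDeque<>();
        for (int i = 0; i < prevContainerCount; i++) {
            List<String> tasks = previous.get(i);
            int keep = 0;
            if (i < newContainerCount) {
                keep = Math.min(tasks.size(), targetSize(i, base, remainder));
                result.get(i).addAll(tasks.subList(0, keep));
            }
            pool.addAll(tasks.subList(keep, tasks.size()));
        }

        // Pass 2: containers below their target size are refilled from the pool.
        for (int i = 0; i < newContainerCount; i++) {
            List<String> tasks = result.get(i);
            int target = targetSize(i, base, remainder);
            while (tasks.size() < target) {
                tasks.add(pool.removeFirst());
            }
        }
        return result;
    }

    // The first `remainder` containers take one extra task so sizes differ by at most 1.
    private static int targetSize(int containerId, int base, int remainder) {
        return base + (containerId < remainder ? 1 : 0);
    }
}
```

With 8 tasks in 4 containers of 2, going to 3 containers moves only the 2 tasks from the
removed container; going from there to 5 containers moves those same 2 tasks again and
leaves the other 6 where they were.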

About immutability: one way to do it is to remove the map altogether. It really only
enables the log messages, which I could put in the grouper.


- Jake


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/45144/#review124812
-----------------------------------------------------------


On March 22, 2016, 1:20 a.m., Jake Maes wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/45144/
> -----------------------------------------------------------
> 
> (Updated March 22, 2016, 1:20 a.m.)
> 
> 
> Review request for samza, Navina Ramesh, Jagadish Venkatraman, and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Persist the task-to-container mapping in the coordinator stream and use it to minimize
> the reassignment when the container count changes.
> 
> A new BalancingTaskNameGrouper interface exposes the balance() method, which can be implemented
> pluggably.
> GroupByContainerCount has been rewritten in Java and the balance() functionality added.
> This is because the balance logic is specific to a grouper.
> 
> Detailed changes:
> import-control.xml - Update imports that weren't needed in Scala and add some for tests.
> LocalityManager.java - Add TaskAssignmentManager. This mostly just keeps the JobCoordinator
> code cleaner, but also associates the 2 managers for Host Affinity info.
> GroupByContainerCount - THE BIG ONE. Rewritten in Java and it now implements BalancingTaskNameGrouper
> GroupByContainerCountFactory - Rewritten in Java
> TestStorageRecovery - Old test depended on the order of the partitions in a container.
> Now it doesn't.
> TestGroupByContainerCount - Rewritten in Java and lots of tests added for balance()
> BalancingTaskNameGrouper - New interface for the balance method. Exposes the new functionality
> without breaking backward compatibility
> TaskAssignmentManager - Coordinator stream manager for task-to-container mapping
> TaskNameGroupBalancer - Bridges the task mapping (balance) capability with taskname groupers,
> old and new
> SetTaskContainerMapping - Coordinator stream message for the task-to-container mapping
> 
> 
> Diffs
> -----
> 
>   checkstyle/import-control.xml 53cb8b447240fea08d98ccfb12ed24bec6cbf67c
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java acf93525ea5c97df187bbe7977e2ae9fea65b32b
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGroupBalancer.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java 211b64215f26db49cd4411ff3fb41231145307d7
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 4d093b500b7f3b582446634ced3e9d0b28371616
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala cb0a3bde15174c53f8eb3c0dbbb4f59dbf2589b1
>   samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala 8bbfd639cd9ea1d758d0daa45ce41093c1cb66f6
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 06a96ad6ed786c22924017f894413bfa1ea34c06
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskNameGroupBalancer.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java e0d4aa1016d61ce328d7ff74b58f7b8f7682f386
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java 429573b480112c7491303dc410d78f37a308c4a7
>   samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 53207ad7e87fe491c6ae95ae6c590c6d5668d3dc
>   samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala 6e9c6fa579a5901000bea0601c771783d8334f0e
> 
> Diff: https://reviews.apache.org/r/45144/diff/
> 
> 
> Testing
> -------
> 
> A bunch of new unit tests have been added. 
> 
> Also tested with a test job. The task mapping (initially missing) is added the first
> time the job is run. It is then used as expected to reduce task reassignment as the container
> count was adjusted from 4->3->5 on subsequent runs.
> 
> 
> Thanks,
> 
> Jake Maes
> 
>

