samza-dev mailing list archives

From Jake Maes <jacob.m...@gmail.com>
Subject Re: Review Request 45144: SAMZA-906 Host Affinity - Minimize task reassignment when container count changes
Date Fri, 25 Mar 2016 00:21:24 GMT


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> >

Thanks for catching these. Patch coming as soon as the unit tests are done.


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java, line 37
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312392#file1312392line37>
> >
> >     nit: the key in the example shows containerId = 1 and the values show containerId = 139. They should be the same, right?

The 1 in the key is the message version, so no conflict there.
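To illustrate why the two numbers don't conflict, here is a hypothetical sketch of a versioned key/value message. It assumes the key is laid out as [version, type, taskName] and the value as a map holding the container ID. The class name, the type string, and the "containerId" field name are placeholders for illustration, not Samza's exact wire format.

```java
import java.util.*;

/**
 * Hypothetical illustration of a versioned key/value message layout.
 * Assumed key: [version, type, taskName]; assumed value: {"containerId": id}.
 * Not Samza's exact wire format.
 */
public class TaskMappingMessageSketch {
    // The "1" in the key: a message-format version, not a container ID.
    static final int MESSAGE_VERSION = 1;

    static List<Object> key(String type, String taskName) {
        return Arrays.asList(MESSAGE_VERSION, type, taskName);
    }

    static Map<String, String> value(String containerId) {
        return Collections.singletonMap("containerId", containerId);
    }

    public static void main(String[] args) {
        // "example-task-container-mapping" is a placeholder type string.
        List<Object> key = key("example-task-container-mapping", "Partition 7");
        Map<String, String> value = value("139");
        System.out.println(key + " -> " + value);
    }
}
```

Read this way, the key's first element and the value's container ID are unrelated fields, so they are not expected to match.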


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java, line 245
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312388#file1312388line245>
> >
> >     It seems like the logic here is assuming:
> >     1) we always try to "fill up" each container from 0-n based on average count
> >     2) in successive runs, there can't be any task-container mapping sequence like:
> >     3 tasks -> container 0
> >     3 tasks -> container 1
> >     1 task  -> container 2
> >     3 tasks -> container 3
> >     In the above case, when a new container 4 is added, the assignment will not be balanced, since container 4 will get 3 tasks.
> >     
> >     I would suggest that we don't assume what the result of the previous balance was, and that the calculation always be based on the whole set of tasks <-> containers. A simple logic would be:
> >     - initialize the set of re-assignable tasks (i.e. from containers that are removed, and from containers that are over the expected average)
> >     - sort all containers based on the current assigned number of tasks, minus the re-assignable tasks
> >     - start assigning the re-assignable tasks from the end of the sorted container list, till each container reaches the expected average.
> >     - stop the whole algorithm once the re-assignable task set is empty.

Great attention to detail!
I didn't see that corner case.
I'll add a unit test for this and fix it.
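The rebalancing approach suggested in the review can be sketched roughly as follows. This is a minimal illustration, not the GroupByContainerCount code in the patch. It assumes task names are plain Strings, container IDs are ints from 0 to containerCount - 1, and that when the task count doesn't divide evenly, the extra tasks go to the containers that already hold the most tasks. The class and helper names are hypothetical.

```java
import java.util.*;

/**
 * Hypothetical sketch of the rebalancing approach suggested in the review.
 * Task names are Strings; container IDs are ints 0..containerCount-1 (assumptions).
 */
public class RebalanceSketch {

    /** Returns a new task -> container mapping that tries to move as few tasks as possible. */
    static Map<String, Integer> balance(Map<String, Integer> previous, int containerCount) {
        if (containerCount <= 0) throw new IllegalArgumentException("containerCount must be positive");
        int taskCount = previous.size();
        Map<Integer, List<String>> current = new TreeMap<>();
        for (int c = 0; c < containerCount; c++) current.put(c, new ArrayList<>());
        Deque<String> reassignable = new ArrayDeque<>();
        for (String task : new TreeSet<>(previous.keySet())) {
            int c = previous.get(task);
            if (c < containerCount) current.get(c).add(task);
            else reassignable.add(task); // its container was removed
        }
        // Most-loaded containers first; List.sort is stable, so ties keep ID order.
        List<Integer> byLoad = new ArrayList<>(current.keySet());
        byLoad.sort((a, b) -> current.get(b).size() - current.get(a).size());
        // Expected counts: the first (taskCount % containerCount) containers get one extra task.
        Map<Integer, Integer> expected = new HashMap<>();
        for (int i = 0; i < byLoad.size(); i++) {
            int extra = i < taskCount % containerCount ? 1 : 0;
            expected.put(byLoad.get(i), taskCount / containerCount + extra);
        }
        // Containers above their expected count give up their surplus tasks.
        for (int c : byLoad) {
            List<String> tasks = current.get(c);
            while (tasks.size() > expected.get(c)) reassignable.add(tasks.remove(tasks.size() - 1));
        }
        // Fill from the least-loaded end of the sorted list until the surplus is used up.
        for (int i = byLoad.size() - 1; i >= 0 && !reassignable.isEmpty(); i--) {
            int c = byLoad.get(i);
            while (current.get(c).size() < expected.get(c)) current.get(c).add(reassignable.poll());
        }
        if (!reassignable.isEmpty()) throw new IllegalStateException("unassigned: " + reassignable);
        Map<String, Integer> result = new TreeMap<>();
        current.forEach((c, tasks) -> tasks.forEach(t -> result.put(t, c)));
        return result;
    }

    /** Builds a mapping with counts[c] tasks on container c, named "Partition NN". */
    static Map<String, Integer> layout(int... counts) {
        Map<String, Integer> mapping = new TreeMap<>();
        int n = 0;
        for (int c = 0; c < counts.length; c++)
            for (int i = 0; i < counts[c]; i++) mapping.put(String.format("Partition %02d", n++), c);
        return mapping;
    }

    static int[] countsPerContainer(Map<String, Integer> mapping, int containerCount) {
        int[] counts = new int[containerCount];
        for (int c : mapping.values()) counts[c]++;
        return counts;
    }

    /** Number of tasks whose container differs between the two mappings. */
    static long moved(Map<String, Integer> before, Map<String, Integer> after) {
        return after.entrySet().stream().filter(e -> !e.getValue().equals(before.get(e.getKey()))).count();
    }

    public static void main(String[] args) {
        // Corner case from the review: 3, 3, 1, 3 tasks, then container 4 is added.
        Map<String, Integer> before = layout(3, 3, 1, 3);
        Map<String, Integer> after = balance(before, 5);
        // prints "[2, 2, 2, 2, 2], moved 3"
        System.out.println(Arrays.toString(countsPerContainer(after, 5)) + ", moved " + moved(before, after));
    }
}
```

On the corner case from the review, every container ends up with 2 tasks and only the 3 surplus tasks move. Because the whole task set is recomputed each time, the result doesn't depend on how the previous balance happened to fill the containers.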


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java, line 62
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312389#file1312389line62>
> >
> >     Do we have a use case for a write-only SamzaTaskAssignmentManager? We needed it for LocalityManager because SamzaContainer includes an instance of LocalityManager which is write-only. If we don't have the write-only use case, we should just remove the write-only flag.

Yeah, classic case of over-engineering. https://en.wikipedia.org/wiki/You_aren%27t_gonna_need_it
I'll remove it. :-)


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java, line 99
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312388#file1312388line99>
> >
> >     Question: why do we care that the minimum movement only applies to the case where the container number changes? Even if the container numbers are the same, wouldn't it be beneficial to just honor the previous task assignment for stateful jobs, instead of re-computing?

I'm glad you said that. That's actually what this code does. It reuses the existing map if
the container count doesn't change. It also enables custom mappings, wherein some tool writes
a mapping to the coordinator stream for the current expected container count.
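A hypothetical sketch of that decision is shown below. It assumes a persisted mapping is reusable only if it covers exactly the current task set and uses container IDs 0 to containerCount - 1; the task-set check, the class name, and the BiFunction balancer parameter are illustrative assumptions rather than the patch's actual code.

```java
import java.util.*;
import java.util.function.BiFunction;

/**
 * Hypothetical sketch of "reuse the persisted mapping if the container count is unchanged".
 * Task names are Strings and container IDs are ints (assumptions).
 */
public class MappingReuseSketch {

    /** True if the persisted mapping covers exactly these tasks and uses containers 0..containerCount-1. */
    static boolean canReuse(Map<String, Integer> previous, Set<String> taskNames, int containerCount) {
        if (previous.isEmpty() || !previous.keySet().equals(taskNames)) return false;
        Set<Integer> expectedIds = new HashSet<>();
        for (int c = 0; c < containerCount; c++) expectedIds.add(c);
        return new HashSet<>(previous.values()).equals(expectedIds);
    }

    /** Reuses the previous (possibly tool-written) mapping when possible, otherwise rebalances. */
    static Map<String, Integer> assign(Map<String, Integer> previous, Set<String> taskNames, int containerCount,
                                       BiFunction<Map<String, Integer>, Integer, Map<String, Integer>> balancer) {
        if (canReuse(previous, taskNames, containerCount)) {
            return previous; // same container count: honor the existing or custom mapping
        }
        return balancer.apply(previous, containerCount); // count changed: minimize movement
    }

    public static void main(String[] args) {
        // A custom mapping written by some tool for a 2-container job.
        Map<String, Integer> custom = new HashMap<>();
        custom.put("Partition 0", 1);
        custom.put("Partition 1", 0);
        Set<String> tasks = custom.keySet();
        System.out.println(canReuse(custom, tasks, 2)); // true: honored as-is
        System.out.println(canReuse(custom, tasks, 3)); // false: count changed, rebalance instead
    }
}
```

Note that the custom mapping in the example deliberately puts Partition 0 on container 1; the reuse check only cares that the container set matches, so a tool-written layout is kept as written.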


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java, line 227
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312388#file1312388line227>
> >
> >     What if taskNamesToAssign.isEmpty()==true here?

It can't, as long as the math in calculateTaskCountPerContainer() is right.

I'll add an assertion to make sure.


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java, line 231
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312388#file1312388line231>
> >
> >     What if at the end, taskNamesToAssign is not empty?

Same answer as above. I'll make sure with an assertion.
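Both answers rest on one invariant: the per-container counts always sum to the task count, so the queue of tasks to assign can neither run dry early nor have leftovers. The sketch below is a hypothetical stand-in; the method names echo the review discussion, but the bodies are illustrative, not the patch's code.

```java
import java.util.*;

/**
 * Hypothetical stand-in for calculateTaskCountPerContainer() plus the two checks
 * discussed in the review. Names and types are illustrative only.
 */
public class TaskCountSketch {

    /** Spreads taskCount as evenly as possible; the first (taskCount % containerCount) containers get one extra. */
    static int[] taskCountPerContainer(int taskCount, int containerCount) {
        if (containerCount <= 0) throw new IllegalArgumentException("containerCount must be positive");
        int[] counts = new int[containerCount];
        for (int c = 0; c < containerCount; c++) {
            counts[c] = taskCount / containerCount + (c < taskCount % containerCount ? 1 : 0);
        }
        return counts;
    }

    /** Hands tasks out in order according to the computed counts. */
    static List<List<String>> assign(List<String> taskNames, int containerCount) {
        int[] counts = taskCountPerContainer(taskNames.size(), containerCount);
        Deque<String> taskNamesToAssign = new ArrayDeque<>(taskNames);
        List<List<String>> containers = new ArrayList<>();
        for (int count : counts) {
            List<String> group = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                // First review question: the queue must not run dry mid-assignment.
                if (taskNamesToAssign.isEmpty()) throw new IllegalStateException("ran out of tasks");
                group.add(taskNamesToAssign.poll());
            }
            containers.add(group);
        }
        // Second review question: no task may be left over at the end.
        if (!taskNamesToAssign.isEmpty()) {
            throw new IllegalStateException(taskNamesToAssign.size() + " tasks left unassigned");
        }
        return containers;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(taskCountPerContainer(10, 4))); // [3, 3, 2, 2]
        System.out.println(assign(Arrays.asList("t0", "t1", "t2", "t3", "t4"), 2)); // [[t0, t1, t2], [t3, t4]]
    }
}
```

The sketch uses explicit checks rather than Java `assert` statements, since `assert` is skipped unless the JVM is started with `-ea`.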


- Jake


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/45144/#review125157
-----------------------------------------------------------


On March 24, 2016, 9:33 p.m., Jake Maes wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/45144/
> -----------------------------------------------------------
> 
> (Updated March 24, 2016, 9:33 p.m.)
> 
> 
> Review request for samza, Navina Ramesh, Jagadish Venkatraman, and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Persist the task-to-container mapping in the coordinator stream and use it to minimize the reassignment when the container count changes.
> 
> A new BalancingTaskNameGrouper interface exposes the balance() method, which can be implemented pluggably. 
> GroupByContainerCount has been rewritten in Java and the balance() functionality added to it. The balance logic lives in the grouper because it is specific to each grouper.
> 
> Detailed changes:
> import-control.xml - Update imports that weren't needed in Scala and add some for tests.
> LocalityManager.java - Add TaskAssignmentManager. This mostly just keeps the JobCoordinator code cleaner, but also associates the 2 managers for Host Affinity info.
> GroupByContainerCount - THE BIG ONE. Rewritten in Java; it now implements BalancingTaskNameGrouper
> GroupByContainerCountFactory - Rewritten in Java
> TestStorageRecovery - Old test depended on the order of the partitions in a container. Now it doesn't.
> TestGroupByContainerCount - Rewritten in Java and lots of tests added for balance()
> BalancingTaskNameGrouper - New interface for the balance method. Exposes the new functionality without breaking backward compatibility
> TaskAssignmentManager - Coordinator stream manager for task-to-container mapping
> TaskNameGroupBalancer - Bridges the task mapping (balance) capability with taskname groupers, old and new
> SetTaskContainerMapping - Coordinator stream message for the task-to-container mapping
> 
> 
> Diffs
> -----
> 
>   checkstyle/import-control.xml 53cb8b447240fea08d98ccfb12ed24bec6cbf67c
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java acf93525ea5c97df187bbe7977e2ae9fea65b32b
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java 211b64215f26db49cd4411ff3fb41231145307d7
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 4d093b500b7f3b582446634ced3e9d0b28371616
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala cb0a3bde15174c53f8eb3c0dbbb4f59dbf2589b1
>   samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala 8bbfd639cd9ea1d758d0daa45ce41093c1cb66f6
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 06a96ad6ed786c22924017f894413bfa1ea34c06
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java e0d4aa1016d61ce328d7ff74b58f7b8f7682f386
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java 429573b480112c7491303dc410d78f37a308c4a7
>   samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 53207ad7e87fe491c6ae95ae6c590c6d5668d3dc
>   samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala 6e9c6fa579a5901000bea0601c771783d8334f0e
> 
> Diff: https://reviews.apache.org/r/45144/diff/
> 
> 
> Testing
> -------
> 
> A bunch of new unit tests have been added. 
> 
> Also tested with a test job. The task mapping (initially missing) is added the first time the job is run. It is then used as expected to reduce task reassignment as the container count was adjusted from 4->3->5 on subsequent runs.
> 
> 
> Thanks,
> 
> Jake Maes
> 
>
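The arrangement described in the review request, an optional interface that adds balance() plus a bridge that calls it only when a grouper supports it, can be sketched with simplified stand-in types. Everything below is hypothetical: task names are Strings, a grouping is a map from container ID to task names, and the interface and class names are illustrative rather than the actual BalancingTaskNameGrouper and TaskNameGroupBalancer signatures.

```java
import java.util.*;

/**
 * Simplified, hypothetical sketch of the "optional balance() capability + bridge" pattern.
 * Not Samza's actual signatures.
 */
public class TaskNameGroupBalancerSketch {

    /** Stand-in for a plain grouper: container ID -> task names. */
    interface GrouperSketch {
        Map<Integer, List<String>> group(Set<String> taskNames);
    }

    /** Stand-in for a grouper that can also honor a previous task-to-container mapping. */
    interface BalancingGrouperSketch extends GrouperSketch {
        Map<Integer, List<String>> balance(Set<String> taskNames, Map<String, Integer> previousMapping);
    }

    /** Bridge: use balance() when available, otherwise fall back to group(). */
    static Map<Integer, List<String>> groupOrBalance(GrouperSketch grouper, Set<String> taskNames,
                                                     Map<String, Integer> previousMapping) {
        if (grouper instanceof BalancingGrouperSketch) {
            return ((BalancingGrouperSketch) grouper).balance(taskNames, previousMapping);
        }
        return grouper.group(taskNames); // existing groupers keep working unchanged
    }

    /** Toy balancing grouper: keeps each task on its previous container, else container 0. */
    static class StickyGrouper implements BalancingGrouperSketch {
        public Map<Integer, List<String>> group(Set<String> taskNames) {
            return balance(taskNames, Collections.emptyMap());
        }

        public Map<Integer, List<String>> balance(Set<String> taskNames, Map<String, Integer> previousMapping) {
            Map<Integer, List<String>> result = new TreeMap<>();
            for (String task : new TreeSet<>(taskNames)) {
                int container = previousMapping.getOrDefault(task, 0);
                result.computeIfAbsent(container, c -> new ArrayList<>()).add(task);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        Set<String> tasks = new TreeSet<>(Arrays.asList("Partition 0", "Partition 1"));
        Map<String, Integer> previous = new HashMap<>();
        previous.put("Partition 0", 1);
        previous.put("Partition 1", 0);
        GrouperSketch legacy = names -> Collections.<Integer, List<String>>singletonMap(0, new ArrayList<>(names));
        System.out.println(groupOrBalance(legacy, tasks, previous));
        System.out.println(groupOrBalance(new StickyGrouper(), tasks, previous));
    }
}
```

Keeping balance() in a sub-interface is what lets existing groupers compile and behave as before, which matches the request's stated goal of not breaking backward compatibility.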

