storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Rodrigo Valladares <rodrigovallada...@gmail.com>
Subject Re: What is LoadAwareShuffleGrouping?
Date Tue, 26 Apr 2016 18:26:12 GMT
The balancing is done based on the left capacity of the queue. A read
somewhere on the code (I don;t remember) that the load mapping gives us a
number between 0 and 1 with 0 meaning no tuples in the queue and 1 its a
full capacity.

So int val = (int)(101 - (load.get(targets[i]) * 100));  gives us the left
capacity normalized between 1 and 101 (I think its summing 1 to avoid the
case where all executors are fully loaded in which case you would end up
with a zero division).

loads array store the left capacity and total store the sum of the left
capacity.

If you generate random number between 0 and total in a uniform distribution
the chance for this number to fall between 0 and l1, with l1 being the load
in executor 1, would be l1/total. The chance of it falling between l1 and
l1 + l2 would be l2/total, etc.

That is what the for loop is doing.

2016-04-26 12:26 GMT-05:00 Yamini Joshi <yamini.1691@gmail.com>:

> This is the core function:
>
> https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java
> public List<Integer> chooseTasks(int taskId, List<Object> values,
> LoadMapping load) {
>    if ((lastUpdate + 1000) < System.currentTimeMillis()) {
>      int local_total = 0;
>      for (int i = 0; i < targets.length; i++) {
>        int val = (int)(101 - (load.get(targets[i]) * 100));
>        loads[i] = val;
>        local_total += val;
>      }
>      total = local_total;
>      lastUpdate = System.currentTimeMillis();
>      }
>      int selected = random.nextInt(total);
>      int sum = 0;
>      for (int i = 0; i < targets.length; i++) {
>        sum += loads[i];
>        if (selected < sum) {
>          return rets[i];
>        }
>     }
>     return rets[rets.length-1];
> }
>
> My question: If we have to select the target task with min/max load, why
> are using a random value from the total load and comparing all target tasks
> to it?
>
> Best regards,
> Yamini Joshi
>
> On Tue, Apr 26, 2016 at 11:04 AM, Rodrigo Valladares <
> rodrigovalladares@gmail.com> wrote:
>
>> I was doing some research on this topic. I checked the code a while back
>> and from what I understood  this grouping balances the tuples according to
>> the current queue of each executor.
>>
>> For example Bolt A have 4 executors 2 with a queue at 50% capacity and 2
>> at 40% capacity. The distribution would be proportional to "left capacity
>> on the queue" (1 - capacity) so for executor 1 and 2 it would be:
>> 0.5/(0.5*2 +0.6*2) .
>>
>> I think it was built taking heterogeneous clusters in consideration. In
>> cases where some machines are faster than others shuffle grouping perform
>> very poorly since it does not take into consideration that executors have
>> different latencies. You might expect that slower machine will built a
>> queue faster on its executors so this balancing would address this
>> limitation. I did not develop or tested this so I can't assure you it will
>> work like this, but I think this the idea behind it.
>>
>> 2016-04-26 10:46 GMT-05:00 Tom Brown <tombrown52@gmail.com>:
>>
>>> Hi Yamini,
>>>
>>> Your question piqued my interest, so I decided to see what I could
>>> figure out. After reading this JIRA
>>> https://issues.apache.org/jira/browse/STORM-162, I am still confused.
>>> Hopefully someone else on the list will be able to help us.
>>>
>>> Is LoadAwareShuffleGrouping a way of changing a basic shuffle operation
>>> (e.g. randomized round robin) to include downstream load as an input? Or is
>>> it an extension of a fields grouping that could allow the same tuple to be
>>> sent to different downstream tasks (A or B) depending on which has the
>>> higher load?
>>>
>>> --Tom
>>>
>>> On Tue, Apr 26, 2016 at 8:53 AM, Yamini Joshi <yamini.1691@gmail.com>
>>> wrote:
>>>
>>>> Hello everyone!
>>>>
>>>> I am new to Storm and I was looking into different stream grouping when
>>>> I came across LoadAwareShuffleGrouping. Can someone tell me what it is
>>>> exactly? Has it been included in the latest storm build? All my google
>>>> searches on this point to JIRA tickets.
>>>>
>>>> Any help is appreciated.
>>>>
>>>> Thank you.
>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> Rodrigo Valladares Cotta
>> Master's Student, Computer Science
>> University of Nebraska-Lincoln
>>
>
>


-- 
Rodrigo Valladares Cotta
Master's Student, Computer Science
University of Nebraska-Lincoln

Mime
View raw message