tez-dev mailing list archives

From Bikas Saha <bi...@hortonworks.com>
Subject dynamic range partitioning
Date Tue, 20 Aug 2013 22:18:53 GMT
(Bringing an offline discussion to the email list. More context in the
thread below)

We do plan to increase the types of events that get sent to tasks.
Currently we only send a TaskCompletionEvent to the consumer task, which
informs it about the completion of its input.

Sending more events would need plumbing changes in the TaskUmbilical and
probably some refactoring of the event hierarchy.

Before we do that, we need to think through, at least to some degree, when
and what events make sense, so that we can make a properly designed
improvement to the AM code.



Hmm, interesting. I see that there is currently no need to tell mappers
about the runtime parallelism changes, because you just inform the reducers
of which ranges to fetch. Would you be opposed to having events sent to map
tasks?


On Tue, Aug 20, 2013 at 11:25 AM, Bikas Saha wrote:

No events go to the Map task before it completes. The only events that go
to the tasks are completion events that tell them about completion of their
inputs. So currently there is no way to tell the mapper, while it is
running, to do something else.


*From:* Achal Soni
*Sent:* Tuesday, August 20, 2013 11:18 AM

*To:* Bikas Saha
*Subject:* Re: Shuffle

Yes, this sounds like a good solution, and it is similar to what I am
thinking of as well. In Pig, whenever range partitioning is involved, a
sampling job is performed that reads through all the inputs. There is
special logic when it comes to sampling; for example, for a skewed join it
does special sampling for certain skewed keys. That is where Tez would come
in, by basically generating histograms and then having this user code
figure out the ranges according to whatever metrics it wants. We could
perhaps provide a default implementation that just determines ranges based
on the amount of data the keys are generating. (Pig may do something by
extrapolating the amount of work certain keys will cause in the next MR
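As a rough sketch of the histogram-driven range selection described above
(illustrative only; none of these names are Tez or Pig API), a default
implementation could pick split keys from a merged per-key histogram so
that each reducer's range covers roughly equal data:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only -- not actual Tez/Pig code. Given a merged
// histogram of (sorted key, data volume) pairs, choose split keys so that
// each reducer's range covers roughly the same amount of data.
public class RangeFromHistogram {

    // keys must be sorted; counts[i] is the data volume observed for keys[i]
    static List<String> splitKeys(String[] keys, long[] counts, int numReducers) {
        long total = 0;
        for (long c : counts) {
            total += c;
        }
        long target = (total + numReducers - 1) / numReducers; // ceil(total / numReducers)
        List<String> splits = new ArrayList<>();
        long acc = 0;
        for (int i = 0; i < keys.length && splits.size() < numReducers - 1; i++) {
            acc += counts[i];
            if (acc >= target) {
                splits.add(keys[i]); // a reducer's range ends at (and includes) this key
                acc = 0;
            }
        }
        return splits; // up to numReducers - 1 split points
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c", "d", "e", "f"};
        long[] counts = {10, 10, 10, 10, 10, 10};
        // 3 reducers over 60 units of data -> split after "b" and after "d"
        System.out.println(splitKeys(keys, counts, 3)); // prints [b, d]
    }
}
```

A Pig-specific implementation could plug in its skewed-key logic here
instead of the equal-volume heuristic.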

I believe that in MR, the reducer goes to each map task via HTTP and
requests its partition. The mapper has a single, consolidated spill file
with all of its output data, and an index on this spill file giving the
starting offset and length of each partition (one per reducer in
traditional MR, but in your runtime parallelism scheme, possibly more than
one partition), and it serves the data accordingly. I am thinking of
building upon your code for the runtime parallelism to basically send the
histogram from each mapper via event updates and then consolidate them,
similar to "determineAndApplyParallelism". The mappers would then receive
event updates with the new ranges, so that when each mapper creates the
index for its partitions, it can update the index appropriately. Then when
the reducers come to the mapper in the copy phase, we don't need to touch
the ShuffleHandler code which handles the HTTP connection, since the
partition indices would already be updated.
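To make the index idea concrete, here is a minimal, hypothetical sketch
(names are illustrative, not the actual MR spill index format): one
(startOffset, length) record per partition over a single consolidated
spill file, which is what the shuffle side consults to answer a fetch.

```java
// Illustrative sketch only -- not the actual MR/Tez spill index format.
// One (startOffset, length) record per partition over a single consolidated
// spill file; the shuffle server consults this to answer a reducer's fetch.
public class SpillIndex {
    private final long[] offsets; // start offset of each partition in the spill file
    private final long[] lengths; // byte length of each partition

    SpillIndex(long[] partitionSizes) {
        int n = partitionSizes.length;
        offsets = new long[n];
        lengths = new long[n];
        long pos = 0;
        for (int i = 0; i < n; i++) {
            offsets[i] = pos;
            lengths[i] = partitionSizes[i];
            pos += partitionSizes[i];
        }
    }

    // Byte range [offset, offset + length) to serve for partition p.
    long[] rangeFor(int p) {
        return new long[]{offsets[p], lengths[p]};
    }
}
```

Under this scheme, applying new ranges after the AM recomputes parallelism
amounts to rewriting these records before the copy phase starts, leaving
the HTTP path in the ShuffleHandler untouched.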

I am myself just learning the code base right now, so if you find any
inconsistencies/errors do let me know!

- Achal

On Tue, Aug 20, 2013 at 10:17 AM, Bikas Saha wrote:

There is a well-known solution (or at least I know of it from an earlier
life) to dynamic range partitioning with different numbers of partitions.
Basically, the “partitioner” just leaves a sorted output and a sample. Using
the sample, we can determine the right number of reducers and their ranges.
The only problem is how to serve the right section of the sorted file to
the relevant reducer. E.g. one only wants to give reducer R the portion of
the sorted file from values F-K. The file server could seek the file to F
and then serve up to K. In order to seek quickly to F, it needs to build an
index when writing the file. The index can get very large if it is too
fine-grained. So we keep an index proportional to the expected number of
reducers. Then when the reducers ask for their data, the file server gives
them a section of the file that “contains” the data of interest, which may
be a little more than needed. The reducer can filter on its side. Or the
server can filter on its side and send the exact range. Whichever suits the

I believe what you are suggesting below is around these lines.
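A hedged sketch of that coarse-index scheme (all names invented for
illustration): keep one (firstKey, byteOffset) entry per index block, with
the block count proportional to the expected number of reducers, and answer
a request for keys [from, to] with a file section that contains the range,
leaving any small excess to be filtered by either side.

```java
import java.util.List;

// Illustrative sketch only. The sorted output is indexed sparsely: one
// (firstKey, byteOffset) entry per index block. A lookup returns a file
// section that contains the requested key range, possibly with a little
// extra data at each end for the reducer (or server) to filter out.
public class CoarseIndex {
    static final class Entry {
        final String firstKey;
        final long offset;
        Entry(String firstKey, long offset) {
            this.firstKey = firstKey;
            this.offset = offset;
        }
    }

    private final List<Entry> entries; // sorted by firstKey
    private final long fileLength;

    CoarseIndex(List<Entry> entries, long fileLength) {
        this.entries = entries;
        this.fileLength = fileLength;
    }

    // Byte span [start, end) of the sorted file covering all keys in [from, to].
    long[] covering(String from, String to) {
        long start = 0;
        long end = fileLength;
        for (Entry e : entries) {
            if (e.firstKey.compareTo(from) <= 0) {
                start = e.offset; // last block that could contain `from`
            }
            if (e.firstKey.compareTo(to) > 0) {
                end = e.offset;   // first block entirely past `to`
                break;
            }
        }
        return new long[]{start, end};
    }
}
```

For example, with index entries "a"@0, "f"@100, "m"@200, "t"@300 over a
400-byte file, a request for keys [g, l] would be served the span
[100, 200): a little more than needed, but guaranteed to contain the range.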


