From Steven Phillips <sphill...@maprtech.com>
Subject Re: Memory planning in Drill
Date Thu, 09 Oct 2014 18:26:42 GMT
In response to Jinfeng's comments: we are already working on the staging
problem, so major fragments whose output cannot yet be consumed by
downstream fragments will not run or consume memory until the downstream
fragments are ready to consume it. The design details have not been
finalized, but the basic idea is that senders will send an initial batch
to propagate schema information, and will then not continue until they
receive an ack indicating that the downstream is ready to consume.
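
A rough sketch of the sender-side handshake (illustrative only; the
class and method names here are invented, not the actual Drill RPC API):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch -- not the actual Drill sender or RPC code.
// Idea: send one schema-only batch, then block until the receiver
// acks that the downstream fragment is ready to consume data.
public class StagedSender {
  interface Tunnel { void send(Object batch); }   // hypothetical transport
  interface Batches {                             // hypothetical batch source
    Object schemaOnlyBatch();
    boolean hasNext();
    Object next();
  }

  private final Tunnel tunnel;
  private final BlockingQueue<Boolean> acks = new LinkedBlockingQueue<>();

  StagedSender(Tunnel tunnel) { this.tunnel = tunnel; }

  void run(Batches batches) throws InterruptedException {
    tunnel.send(batches.schemaOnlyBatch()); // propagate schema, no data yet
    if (!acks.take()) return;               // park until downstream is ready
    while (batches.hasNext()) {
      tunnel.send(batches.next());          // normal data flow from here on
    }
  }

  // Called by the RPC layer when the downstream receiver signals readiness.
  void onAck(boolean ready) { acks.add(ready); }
}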

On Thu, Oct 9, 2014 at 10:53 AM, Julian Hyde <julianhyde@gmail.com> wrote:

> The standard thinking is to declare “barriers” between operators. When the
> operators below the barrier complete, they start releasing their memory.
> The operators above the barrier don’t need to allocate any memory until the
> barrier has been reached.
>
> It is an old idea, so there should be plenty in the literature about it. I
> found
> http://www.transaction.de/fileadmin/downloads/pdf/Whitepapers/Whitepaper-Parallel-Query-Processing-in-Database-Systems-Multicore-Architectures-Transbase-english.pdf
> and a quick read suggests that it is talking about the right concepts.
> It is about multicore, but for resource allocation purposes distributed
> query execution works in much the same way.
>
> We had a “barrier” operator in LucidDB that fulfilled this function.
> https://github.com/LucidDB/luciddb/blob/master/fennel/exec/BarrierExecStream.h
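>
> In rough Java terms, the idea is something like this (a minimal
> sketch, not the Fennel BarrierExecStream API):
>
> import java.util.concurrent.CountDownLatch;
>
> // Illustrative sketch of a barrier between two operator groups.
> class Barrier {
>   private final CountDownLatch done;
>
>   Barrier(int producersBelow) {
>     done = new CountDownLatch(producersBelow);
>   }
>
>   // Each operator below the barrier calls this when it completes
>   // and has released its memory.
>   void producerFinished() { done.countDown(); }
>
>   // Operators above the barrier call this before allocating memory.
>   void awaitBelowComplete() throws InterruptedException {
>     done.await();
>   }
> }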
>
> Julian
>
>
> On Oct 9, 2014, at 9:53 AM, Parth Chandra <pchandra@maprtech.com> wrote:
>
> > Thanks for the reference to the paper, Tim. Definitely worth a read.
> > And yes, I think this topic is important enough that we need a
> > thorough discussion, so the more input we can get, the better.
> >
> >
> > Parth
> >
> > On Thu, Oct 9, 2014 at 12:37 AM, Timothy Chen <tnachen@gmail.com> wrote:
> >
> >> Hi Parth,
> >>
> >> Thanks for providing an update; it's really great to see more
> >> design discussions on the list!
> >>
> >> The pipeline chains definitely make a lot of sense, and I still
> >> remember offline discussions around this in the past.
> >>
> >> Global memory efficiency seems like a scheduling problem: delaying a
> >> chain only helps if there are other chains in flight, and the
> >> decision could be made at the chain level or at the query level.
> >>
> >> I don't have much to add yet, but I'd love to see how we can start
> >> simple on this.
> >>
> >> One relevant paper that I just started reading is from the recent
> >> OSDI '14; I'll chime in more once I've grasped it.
> >>
> >>
> >> https://www.usenix.org/conference/osdi14/technical-sessions/presentation/boutin
> >>
> >> Tim
> >>
> >>
> >> On Wed, Oct 8, 2014 at 11:03 PM, Parth Chandra <pchandra@maprtech.com>
> >> wrote:
> >>> Hi everyone,
> >>>
> >>> Aman, Jinfeng and I had an initial offline discussion about memory
> >>> planning in Drill. I’m summarizing the discussion below and hoping
> >>> to initiate a discussion around some of these ideas.
> >>>
> >>>
> >>> Order of Execution
> >>> -------------------------
> >>> Assertion: For memory management, Order of Execution is a
> >>> fundamental issue.
> >>>
> >>> One of the problems with memory usage in the execution engine is
> >>> that all operators start up simultaneously and start allocating
> >>> memory even though the downstream operators may not be ready to
> >>> consume their output.
> >>>
> >>> For example, in the plan below:
> >>>
> >>>      Agg
> >>>       |
> >>>      HJ2
> >>>     /    \
> >>>    HJ1    Scan3
> >>>   /   \
> >>> Scan1   Scan2
> >>>
> >>>
> >>> the scan operators all begin reading data simultaneously. In this
> >>> case, the Hash Joins are blocking operations and the output of
> >>> Scan3 cannot be consumed until HJ1 is ready to emit its results.
> >>> If, say, Scan2 is on the build side of the hash table for HJ1, then
> >>> HJ1 will not emit any records until Scan2 completes its operation.
> >>> If Scan3 starts immediately, it is consuming memory that could be
> >>> utilized by Scan2. Instead, if we delay the start of Scan3 until
> >>> after HJ1 is ready to emit records, we can utilize memory more
> >>> efficiently.
> >>>
> >>> To address this, we can think of the query plan in terms of pipeline
> >>> chains, where a pipeline chain is a chain of operators terminated by a
> >>> blocking operator.
> >>>
> >>> In the example, there would be three pipeline chains:
> >>> PC1 : Scan1-HJ1-HJ2-Agg
> >>> PC2 : Scan2
> >>> PC3 : Scan3
> >>>
> >>> Now, we can see that we can start PC1 and PC2, but PC3 can be
> >>> delayed until PC2 is completed and PC1 has reached HJ2.
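> >>>
> >>> As a minimal illustration of the bookkeeping this implies (names
> >>> here are invented for the sketch, not actual Drill code):
> >>>
> >>> import java.util.*;
> >>>
> >>> // A pipeline chain may start only once every event it depends on
> >>> // has happened (e.g. PC3 waits for "PC2 done" and "PC1 at HJ2").
> >>> class ChainScheduler {
> >>>   private final Map<String, Set<String>> deps = new HashMap<>();
> >>>   private final Set<String> happened = new HashSet<>();
> >>>
> >>>   void addChain(String chain, String... prerequisites) {
> >>>     deps.put(chain, new HashSet<>(Arrays.asList(prerequisites)));
> >>>   }
> >>>
> >>>   // Called when a chain completes or reaches its blocking operator.
> >>>   void markHappened(String event) { happened.add(event); }
> >>>
> >>>   boolean canStart(String chain) {
> >>>     return happened.containsAll(
> >>>         deps.getOrDefault(chain, Collections.emptySet()));
> >>>   }
> >>> }
> >>>
> >>> In the example, addChain("PC3", "PC2 done", "PC1 at HJ2") captures
> >>> the dependency; PC1 and PC2 have no prerequisites and can start
> >>> immediately.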
> >>>
> >>> One thing we need to consider is that multiple major fragments can
> >>> be part of a single pipeline chain. All these major fragments can
> >>> begin execution if the pipeline chain is ready to begin execution.
> >>>
> >>> We need to think this one through; there are probably many details
> >>> to be hashed out. One thing is certain, though: the planner has to
> >>> provide more information to the execution engine about the ordering
> >>> of the pipeline chains. In other words, implementing this needs
> >>> work on both the planner and the execution engine. We also need to
> >>> work out the details of how the idea of a pipeline chain will be
> >>> reconciled with the idea of major/minor fragments, which are
> >>> currently the units of execution.
> >>>
> >>> Fragment memory limit
> >>> -------------------------
> >>>   We have implemented a simple method to limit the use of memory by
> >>> a single fragment in 0.6 (it is disabled by default). This prevents
> >>> a single fragment from hogging too much memory while other
> >>> fragments may be starved. However, the current implementation has
> >>> some drawbacks:
> >>>  i) The fragment memory allocators have no idea of how much memory
> >>> is really needed by the fragment. The fragment limit is therefore
> >>> determined by dividing the available memory *equally* among the
> >>> fragments. This is not a fair method; a better choice would be to
> >>> allocate fragment limits based on the relative needs of the
> >>> executing fragments.
> >>>  ii) The idea of limiting memory use by a fragment is a little too
> >>> narrow, since the purpose is to allow many queries, not fragments,
> >>> to run together. The current mechanism favours queries that have
> >>> many fragments over queries with fewer fragments that may have
> >>> equivalent memory needs.
> >>>
> >>>   To address this, we need to assign memory limits per query
> >>> instead of per fragment. We already have some estimates at the
> >>> query level for the amount of memory that the query may need. We
> >>> should probably change the memory limit implementation to use this
> >>> estimate and assign limits proportionately. The limit should be
> >>> calculated per query (and assigned to all fragments of the same
> >>> query). It might be even better if we could estimate the memory
> >>> requirement per fragment and use that as the limit.
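> >>>
> >>>   For illustration, proportional assignment might look roughly
> >>> like this (invented names; not the actual allocator code):
> >>>
> >>> import java.util.HashMap;
> >>> import java.util.Map;
> >>>
> >>> // Split available memory across queries in proportion to the
> >>> // planner's estimates, instead of equally per fragment.
> >>> class QueryMemoryLimits {
> >>>   // estimates: queryId -> estimated memory need in bytes.
> >>>   // Assumes at least one query with a positive estimate.
> >>>   static Map<String, Long> assign(Map<String, Long> estimates,
> >>>                                   long availableBytes) {
> >>>     double total = 0;
> >>>     for (long e : estimates.values()) total += e;
> >>>     Map<String, Long> limits = new HashMap<>();
> >>>     for (Map.Entry<String, Long> e : estimates.entrySet()) {
> >>>       // A query's limit is its share of the total estimated
> >>>       // need; the same limit would then be applied to all
> >>>       // fragments of that query.
> >>>       limits.put(e.getKey(),
> >>>           (long) (availableBytes * (e.getValue() / total)));
> >>>     }
> >>>     return limits;
> >>>   }
> >>> }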
> >>>
> >>>    Again, some work needs to be done to figure out what data is
> >>> available that the allocators can use, and what data can be
> >>> calculated/estimated at the planning stage to allow the allocators
> >>> to distribute memory fairly.
> >>>
> >>>
> >>>    All critiques and criticisms are welcome; we’re hoping to get a
> >>> good discussion going around this.
> >>>
> >>>
> >>> Parth
> >>
>
>


-- 
 Steven Phillips
 Software Engineer

 mapr.com
