calcite-dev mailing list archives

From Jacques Nadeau <jacq...@apache.org>
Subject Re: Metadata providers for memory use and degree of parallelism
Date Tue, 24 Feb 2015 18:59:20 GMT
Some thoughts:

- We have generic terms (with Drill-specific equivalents) for these concepts:
phase (major fragment) & slice (minor fragment or thread).
- It isn't clear to me why Parallelism needs to expose stageLeaf.  We are
obviously aware of this fact during planning, but I'm not sure why it should
be a specialized concept in the framework.

Note that for planning we also don't separate out the sending and receiving
sides of an exchange, because it is often useful to reason about both sides
at the same time, for example when doing affinity mapping.

To be clear, we mean phase (major fragment) as the unit between two exchanges
(or the leaf and root fragments, which are delineated by an exchange).
Note that this is different from what we mean by stage, which is a separate
concept that describes memory transition states.  For example, a hash join
separates its build side and its probe side into two separate stages.  Other
blocking or partially blocking operators may also separate stages, and memory
accounting needs to understand both stages and phases.

So maybe the question is: do you delineate these as two different concepts
(memory-usage stages, and parallelization-change phases delineated by
exchanges), or combine them into one?
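
For concreteness, here is a rough sketch of what keeping them as two separate
concepts might look like; these are illustrative names only, not Drill's or
Calcite's actual classes:

interface PlanOperator {
  /** True if this operator is an exchange, i.e. a phase (major fragment)
   * boundary where the degree of parallelism can change. */
  boolean isPhaseBoundary();

  /** True if this operator blocks, fully or partially, and therefore starts
   * a new memory-accounting stage (e.g. the build side of a hash join, or a
   * sort). */
  boolean isStageBoundary();
}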


On Tue, Feb 24, 2015 at 10:45 AM, Julian Hyde <jhyde@apache.org> wrote:

> Jesus,
>
> That makes sense. We basically need two carts: one in front of the
> horse (before we've determined parallelism), and one behind (after we
> have determined parallelism).
>
> As I said to Jacques, you could also use the "behind" cart with a
> place-holder value of parallelism. But you have to be careful that you
> don't use this calculation to determine parallelism.
>
> I have just checked into
> https://github.com/julianhyde/incubator-calcite/tree/calcite-603 a new
> metadata provider:
>
> interface Size {
>   /** Returns the expected average size of a row, in bytes. */
>   Double averageRowSize();
>
>   /** Returns the expected average size of each column, in bytes. */
>   List<Double> averageColumnSizes();
> }
>
> Then I propose to add the following providers. (Recall that a metadata
> provider is a mix-in interface to RelNode; each method is evaluated
> for a particular RelNode.)
>
> interface Parallelism {
>   /** Returns true if each physical operator implementing this relational
>    * expression belongs to a different process than its inputs. */
>   boolean isStageLeaf();
>
>   /** Returns the number of distinct splits of the data.
>    *
>    * <p>For broadcast, where each copy is the same, returns 1. */
>   int splitCount();
> }
>
> interface Memory {
>   /** Returns the expected amount of memory, in bytes, required by a
>    * physical operator implementing this relational expression, across all
>    * splits.
>    *
>    * <p>How much memory is used depends on the algorithm; for example, an
>    * implementation of Aggregate that builds a hash table requires
>    * approximately rowCount * averageRowSize bytes, whereas an
>    * implementation that assumes that the input is sorted uses only
>    * averageRowSize. */
>   Double memory();
>
>   /** Returns the cumulative amount of memory, in bytes, required by the
>    * physical operator implementing this relational expression, and all
>    * operators within the same stage, across all splits. */
>   Double cumulativeMemoryWithinStage();
>
>   /** Returns the expected cumulative amount of memory, in bytes, required
>    * by the physical operator implementing this relational expression, and
>    * all operators within the same stage, within each split.
>    *
>    * <p>Basic formula:
>    *   cumulativeMemoryWithinStageSplit
>    *     = cumulativeMemoryWithinStage / Parallelism.splitCount */
>   Double cumulativeMemoryWithinStageSplit();
> }
>
> If you have not yet determined the parallelism, use
> cumulativeMemoryWithinStage; if you have determined parallelism, use
> cumulativeMemoryWithinStageSplit.
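>
> As a minimal usage sketch (a hypothetical helper, not part of the proposed
> interfaces), the basic formula amounts to:
>
>   // Hypothetical helper; assumes the Memory and Parallelism providers above.
>   static double memoryPerSplit(Memory mem, Parallelism parallelism) {
>     // cumulativeMemoryWithinStageSplit
>     //   = cumulativeMemoryWithinStage / Parallelism.splitCount
>     return mem.cumulativeMemoryWithinStage() / parallelism.splitCount();
>   }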
>
> What do y'all think of my terminology: split, stage, stage-leaf,
> process, memory? (It's not going to be the same as every engine's, but
> hopefully it's at least clear.)
>
> Julian
>
> On Tue, Feb 24, 2015 at 10:34 AM, Julian Hyde <julian@hydromatic.net>
> wrote:
> > Yes, absolutely, that's the only sane way to approach this.
> >
> > Calcite's metadata provider model does make it possible to use an
> > estimate of parallelism at one stage of planning, then use the real
> > number at a later stage.
> >
> > In my next email I propose some new metadata providers. Hopefully they
> > could be fitted into Drill's planning process. Please also advise whether
> > the terminology (processes, splits, stages, memory, size) seems intuitive
> > even though it is not tied to a particular execution engine.
> >
> > Julian
> >
> >
> >> On Feb 24, 2015, at 10:23 AM, Jacques Nadeau <jacques@apache.org>
> >> wrote:
> >>
> >> A little food for thought given our work on Drill...
> >>
> >> We punted on trying to optimize all of this at once.  Very little of the
> >> core plan exploration process is influenced directly by the degree of
> >> parallelism.  We do distinguish between parallelized and non-parallelized
> >> plans and use a few "hints" during planning to make some decisions where
> >> we need to.  Afterwards, we do a secondary pass where we determine
> >> parallelism and memory consumption.  We will then replan with more
> >> conservative memory settings if our first plan turns out not to fit into
> >> available cluster memory.  While we may lose some plans that are ideal, it
> >> makes the whole process substantially easier to reason about.
> >>
> >> We also did this because in some of our initial experimentation, where we
> >> included a number of these things as part of the planning process, the
> >> planning time got out of hand.
> >>
> >>
> >> On Mon, Feb 23, 2015 at 1:26 PM, Julian Hyde <jhyde@apache.org> wrote:
> >>
> >>> I am currently helping the Hive team compute the memory usage of
> >>> operators (more precisely, a set of operators that live in the same
> >>> process) using a Calcite metadata provider.
> >>>
> >>> Part of this task is to create an “Average tuple size” metadata
> >>> provider based on an “Average column size” metadata provider. This is
> >>> fairly uncontroversial. (I’ll create a jira case when
> >>> https://issues.apache.org/ is back up, and we can discuss details such
> >>> as how to compute average size of columns computed using built-in
> >>> functions, user-defined functions, or aggregate functions.)
> >>>
> >>> Also we want to create a “CumulativeMemoryUseWithinProcess” metadata
> >>> provider. That would start at 0 for a table scan or exchange consumer,
> >>> but increase for each operator building a hash-table or using sort
> >>> buffers until we reach the edge of the process.
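> >>>
> >>> A rough sketch of that accumulation (hypothetical names; not actual Hive
> >>> or Calcite code):
> >>>
> >>>   // Sum memory from this operator down, stopping at the process edge.
> >>>   double cumulativeMemoryUseWithinProcess(Operator op) {
> >>>     if (op.isTableScan() || op.isExchangeConsumer()) {
> >>>       return 0;                       // start at 0 at the edge of the process
> >>>     }
> >>>     double bytes = op.ownMemoryUse(); // hash table, sort buffer, or 0
> >>>     for (Operator input : op.inputs()) {
> >>>       bytes += cumulativeMemoryUseWithinProcess(input);
> >>>     }
> >>>     return bytes;
> >>>   }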
> >>>
> >>> But we realized that we also need to determine the degree of
> >>> parallelism, because we need to multiply AverageTupleSize not by
> >>> RowCount but by RowCountWithinPartition. (If the data set has 100M
> >>> rows, each 100 bytes, and is bucketed 5 ways, then each process will
> >>> need memory for 20M rows, i.e. 20M rows * 100 bytes/row = 2GB.)
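> >>>
> >>> In code, that back-of-the-envelope calculation is (numbers taken from the
> >>> example above):
> >>>
> >>>   long rows = 100_000_000L;   // 100M rows
> >>>   int bytesPerRow = 100;
> >>>   int buckets = 5;
> >>>   long perProcessBytes = rows / buckets * bytesPerRow;  // 20M rows * 100 B = 2 GB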
> >>>
> >>> Now, if we are at the stage of planning where we have already
> >>> determined the degree of parallelism, then we could expose this as a
> >>> ParallelismDegree metadata provider.
> >>>
> >>> But maybe we’re getting the cart before the horse? Maybe we should
> >>> compute the degree of parallelism AFTER we have assigned operators to
> >>> processes. We actually want to choose the parallelism degree such that
> >>> we can fit all necessary data in memory. There might be several
> >>> operators, all building hash-tables or using sort buffers. The natural
> >>> break point (at least in Tez) is where the data needs to be
> >>> re-partitioned, i.e. an Exchange operator.
> >>>
> >>> So maybe we should instead compute “CumulativeMemoryUseWithinStage”.
> >>> (I'm coining the term “stage” to mean all operators that run within the
> >>> same process, summing memory over all buckets.) Let’s suppose that, in the above
> >>> data set, we have two operators, each of which needs to build a hash
> >>> table, and we have 2GB memory available to each process.
> >>> CumulativeMemoryUseWithinStage is 100M rows * 100 bytes/row * 2
> >>> hash-tables = 20GB. So, the parallelism degree should be 20GB / 2GB =
> >>> 10.
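> >>>
> >>> Stated as a rule (an illustrative calculation, not a proposed API):
> >>>
> >>>   // parallelism degree = stage-wide memory need / memory per process
> >>>   long stageMemoryBytes = 100_000_000L * 100 * 2;  // 100M rows * 100 B * 2 hash tables = 20 GB
> >>>   long memoryPerProcess = 2_000_000_000L;          // 2 GB available per process
> >>>   int degree = (int) Math.ceil((double) stageMemoryBytes / memoryPerProcess);  // = 10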
> >>>
> >>> 10 is a better choice of parallelism degree than 5. We lose a little
> >>> (more nodes used, and more overhead combining the 10 partitions into
> >>> 1) but gain a lot (we save the cost of sending the data over the
> >>> network).
> >>>
> >>> Thoughts on this? How do other projects determine the degree of
> >>> parallelism?
> >>>
> >>> Julian
> >>>
> >
>
