calcite-dev mailing list archives

From Julian Hyde <>
Subject Re: Metadata providers for memory use and degree of parallelism
Date Fri, 27 Feb 2015 18:14:48 GMT
I've committed CALCITE-603 to master and I have pushed a new
1.1.0-incubating-SNAPSHOT to Apache Nexus.


On Feb 25, 2015, at 1:05 AM, Jesus Camachorodriguez <> wrote:


The providers and corresponding interfaces proposed in CALCITE-603 LGTM; I
think they cover the needs that arose as we moved forward with
cost optimization and algorithm selection in Hive's optimization phase.


On 2/24/15, 7:29 PM, "Julian Hyde" <> wrote:

I think your "phase" concept matches my "stage". I'll use "phase" too.

Agree, we should not separate the sender and receiver of an Exchange
into separate RelNodes. I didn't mean to give that impression. Maybe I
should call it "isPhaseTransition" rather than "isStageLeaf".

Hive/Tez does not have a concept of threads (as distinct from
processes). But I think the "split" concept will serve both Hive and Drill.

So maybe the question is, do you delineate these as two different
concepts or combine them into one (memory usage stages and
parallelization change phases (e.g. exchanges)).

Really good question. However, I'm going to punt. I think there is
more complexity over the horizon when we start modeling blocking
operators and phased pipelines, where phase 1 starts releasing memory as
phase 2 starts allocating it.

"isStageLeaf" allows us to model a collection of consecutive operators
that function as a single unit for purposes of memory allocation, and
that's a good start. (If you want to detect a change in distribution,
look at the Distribution metadata.)
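As a toy sketch of that "consecutive operators as a single unit" idea (entirely hypothetical names here — `Op`, `ownMemory`, `stageLeaf` are illustrative, not the Calcite API), a cumulative-memory walk could simply stop recursing when it hits a stage leaf such as an Exchange:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical operator tree; assumed names, not Calcite classes.
class Op {
  final double ownMemory;   // bytes this operator itself needs
  final boolean stageLeaf;  // true at a stage boundary, e.g. an Exchange
  final List<Op> inputs;

  Op(double ownMemory, boolean stageLeaf, Op... inputs) {
    this.ownMemory = ownMemory;
    this.stageLeaf = stageLeaf;
    this.inputs = Arrays.asList(inputs);
  }

  // Sum memory over consecutive operators, stopping below a stage leaf,
  // so each stage is accounted as a single unit.
  static double cumulativeMemoryWithinStage(Op op) {
    double total = op.ownMemory;
    for (Op input : op.inputs) {
      if (!input.stageLeaf) {
        total += cumulativeMemoryWithinStage(input);
      }
    }
    return total;
  }
}
```

With a scan feeding a hash build, then an exchange, then an aggregate, the aggregate's stage total counts only the aggregate itself; the exchange resets the accumulation.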

On Tue, Feb 24, 2015 at 10:59 AM, Jacques Nadeau <> wrote:

some thoughts

- We have generic (specific) terms we use to explain these concepts:
phase (major fragment) & slice (minor fragment or thread).
- It isn't clear to me why Parallelism needs to expose stageLeaf.  We are
obviously aware of this fact but I'm not sure why it should be in the
framework as a specialized concept.

Note that for planning we also don't separate out the sending and receiving
side of an exchange, because it is often useful to reason about both
concepts at the same time.  For example, affinity mapping.

To be clear, we mean phase (major fragment) as a unit between two exchanges
(or leaf fragments and root fragments, which are delineated by an exchange).
Note that this is different from what we mean by stages, which is a
concept that describes memory transition states.  For example, you might
have a hash join.  The join will separate the build side versus the probe
side as two separate stages.  Other blocking or partially blocking
operators may also separate stages, and memory accounting needs to
understand both stages and phases.
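The build/probe distinction can be made concrete with a toy example (assumed helper names and figures, nothing from Drill or Calcite): during the build stage the hash table grows until the entire build side is materialized, and during the probe stage that table stays resident while probe rows stream through.

```java
// Hypothetical illustration of memory stages around a hash join.
class HashJoinStages {
  // Build stage: the whole build side must fit in memory.
  static double buildStageBytes(double buildRows, double avgRowSize) {
    return buildRows * avgRowSize;
  }

  // Probe stage: the finished hash table is still held, but probe rows
  // stream through and add (roughly) nothing on top of it.
  static double probeStageBytes(double buildRows, double avgRowSize) {
    return buildRows * avgRowSize;
  }
}
```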

So maybe the question is, do you delineate these as two different concepts
or combine them into one (memory usage stages and parallelization change
phases (e.g. exchanges)).

On Tue, Feb 24, 2015 at 10:45 AM, Julian Hyde <> wrote:


That makes sense. We basically need two carts: one in front of the
horse (before we've determined parallelism), and one behind (after we
have determined parallelism).

As I said to Jacques, you could also use the "behind" cart with a
place-holder value of parallelism. But you have to be careful that you
don't use this calculation to determine parallelism.

I have just checked in a new metadata provider:

interface Size {
  Double averageRowSize();
  List<Double> averageColumnSizes();
}

Then I propose to add the following providers. (Recall that a metadata
provider is a mix-in interface to RelNode; each method is evaluated
for a particular RelNode.)

interface Parallelism {
  /** Returns true if each physical operator implementing this relational
   * expression belongs to a different process than its inputs. */
  boolean isStageLeaf();

  /** Returns the number of distinct splits of the data.
   * <p>For broadcast, where each copy is the same, returns 1. */
  int splitCount();
}

interface Memory {
  /** Returns the expected amount of memory, in bytes, required by a
   * physical operator implementing this relational expression, across
   * all splits.
   * <p>How much memory is used depends on the algorithm; for example, an
   * implementation of Aggregate that builds a hash table requires
   * approximately rowCount * averageRowSize bytes, whereas an
   * implementation that assumes that the input is sorted uses only
   * averageRowSize. */
  Double memory();

  /** Returns the cumulative amount of memory, in bytes, required by the
   * physical operator implementing this relational expression, and all
   * operators within the same stage, across all splits. */
  Double cumulativeMemoryWithinStage();

  /** Returns the expected cumulative amount of memory, in bytes, required
   * by the physical operator implementing this relational expression, and
   * all operators within the same stage, within each split.
   * <p>Basic formula:
   *   cumulativeMemoryWithinStageSplit
   *     = cumulativeMemoryWithinStage / Parallelism.splitCount */
  Double cumulativeMemoryWithinStageSplit();
}

If you have not yet determined the parallelism, use
cumulativeMemoryWithinStage; if you have determined parallelism, use
cumulativeMemoryWithinStageSplit.
What do y'all think of my terminology: split, stage, stage-leaf,
process, memory? (It's not going to be the same as every engine, but
hopefully it's at least clear.)


On Tue, Feb 24, 2015 at 10:34 AM, Julian Hyde <> wrote:

Yes, absolutely, that's the only sane way to approach this.

Calcite's metadata provider model does make it possible to use an
estimate of parallelism at one stage of planning, then use the real value
at a later stage.

In my next email I propose some new metadata providers. Hopefully they
could be fitted into Drill's planning process. Also advise whether the
terminology (processes, splits, stages, memory, size) seems intuitive
though not tied to a particular execution engine.


On Feb 24, 2015, at 10:23 AM, Jacques Nadeau <> wrote:


A little food for thought given our work on Drill...

We punted on trying to optimize all of this at once.  Very little of the
core plan exploration process is influenced directly by degree of
parallelism.  We do manage between parallelized and not, and use a number
of "hints" during planning to make some decisions where we need to.
Afterwards, we do a secondary pass where we determine parallelism and
memory consumption.  Afterwards, we will do a replan with more
conservative memory settings if our first plan turns out not to fit into
available cluster memory.  While we may lose some plans that are ideal, it
makes the whole process substantially easier to reason about.

We also did this because some of our initial experimentation, where we
included a number of these things as part of the planning process, caused
the planning time to get out of hand.

On Mon, Feb 23, 2015 at 1:26 PM, Julian Hyde <> wrote:


I am currently helping the Hive team compute the memory usage of
operators (more precisely, a set of operators that live in the same
process) using a Calcite metadata provider.

Part of this task is to create an "Average tuple size" metadata
provider based on an "Average column size" metadata provider. This should
be fairly uncontroversial. (I'll create a jira case when JIRA is back up,
and we can discuss details such as how to compute the average size of
columns computed using built-in functions, user-defined functions, or
aggregate functions.)

Also we want to create a "CumulativeMemoryUseWithinProcess" metadata
provider. That would start at 0 for a table scan or exchange operator,
but increase for each operator building a hash-table or using sort
buffers, until we reach the edge of the process.

But we realized that we also need to determine the degree of
parallelism, because we need to multiply AverageTupleSize not by
RowCount but by RowCountWithinPartition. (If the data set has 100M
rows, each 100 bytes, and is bucketed 5 ways, then each process will
need memory for 20M rows, i.e. 20M rows * 100 bytes/row = 2GB.)
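That arithmetic can be captured in a one-line helper (an assumed, illustrative name, not Calcite API):

```java
// Bytes needed per process when rowCount rows of avgRowSize bytes each
// are bucketed splitCount ways.
class RowMemory {
  static long bytesPerProcess(long rowCount, long avgRowSize, int splitCount) {
    return rowCount / splitCount * avgRowSize;
  }
}
```

For 100M rows of 100 bytes bucketed 5 ways, this gives 2,000,000,000 bytes, i.e. 2GB per process, matching the figure above.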

Now, if we are already at the stage of planning where we have
determined the degree of parallelism, then we could expose this as a
ParallelismDegree metadata provider.

But maybe we're getting the cart before the horse? Maybe we should
compute the degree of parallelism AFTER we have assigned operators to
processes. We actually want to choose the parallelism degree such that
we can fit all necessary data in memory. There might be several
operators, all building hash-tables or using sort buffers. The
break point (at least in Tez) is where the data needs to be
re-partitioned, i.e. an Exchange operator.

So maybe we should instead compute CumulativeMemoryUseWithinStage.
(I'm coining the term "stage" to mean all operators within like
processes, summing over all buckets.) Let's suppose that, in the same
data set, we have two operators, each of which needs to build a hash
table, and we have 2GB memory available to each process.
CumulativeMemoryUseWithinStage is 100M rows * 100 bytes/row * 2
hash-tables = 20GB. So, the parallelism degree should be 20GB / 2GB = 10.

10 is a better choice of parallelism degree than 5. We lose a little
(more nodes used, and more overhead combining the 10 partitions into
1) but gain a lot (we save the cost of sending the data over the
network).
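The worked example reduces to choosing the smallest degree that fits the stage into per-process memory (again an assumed helper, not Calcite API):

```java
// Smallest parallelism degree such that stage memory fits per process.
class DegreeChooser {
  static int chooseDegree(double stageMemoryBytes, double memoryPerProcessBytes) {
    return (int) Math.ceil(stageMemoryBytes / memoryPerProcessBytes);
  }
}
```

With 20GB of stage memory and 2GB per process, the degree comes out to 10, as above; ceiling rounding means a 21GB stage would bump it to 11 rather than under-provisioning.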

Thoughts on this? How do other projects determine the degree of
parallelism?