drill-dev mailing list archives

From Ben-Zvi <...@git.apache.org>
Subject [GitHub] drill pull request #1248: DRILL-6027: Implement Spilling for the Hash-Join
Date Wed, 02 May 2018 02:24:21 GMT
GitHub user Ben-Zvi opened a pull request:


    DRILL-6027: Implement Spilling for the Hash-Join

    This PR covers the work to enable the Hash-Join operator (*HJ*) to spill when its limited
memory becomes too small to hold the incoming data.
     @ilooner is a co-contributor of this work.
    Below is a high-level description of the main changes, to help the reviewers. More design
detail is available in the design document (https://docs.google.com/document/d/1-c_oGQY4E5d58qJYv_zc7ka834hSaB3wDQwqKcMoSAI/).
    Some of this work follows similar prior work done for the Hash-Aggregate (*HAG*) operator;
similarities to the HAG are noted to help reviewers familiar with those changes.
    h2. Partitions:
    Just like the HAG spilling, the main idea to enable spilling is to split the incoming
rows into separate *Partitions*, such that the HJ can gradually adapt to memory pressure
by picking an in-memory partition and spilling it as the need arises, thus freeing
some memory.
    Unlike the HAG, the HJ has two incoming streams - the build/inner/right and the probe/outer/left.
The HJ partitions its Build side first, and if needed, may spill some of these partitions
as data is read. Later the Probe side is read and partitioned the same way, where outer partitions
matching spilled inner partitions are spilled as well - unconditionally.
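    The partitioning scheme described above can be sketched roughly as follows. This is a toy model, not the code in this PR: the class {{PartitionSketch}}, its fields, and the use of the low hash bits to pick a partition are all assumptions made for illustration, and spill files are replaced by simple counters.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: not Drill's HashPartition. All names here are hypothetical. */
class PartitionSketch {
    final int numPartitions;                       // assumed to be a power of two
    final List<List<int[]>> buildInMemory = new ArrayList<>();
    final Set<Integer> spilled = new HashSet<>();  // build partitions already on disk
    int buildRowsSpilled = 0;                      // stands in for the build spill files
    int probeRowsSpilled = 0;                      // stands in for the probe spill files

    PartitionSketch(int numPartitions) {
        this.numPartitions = numPartitions;
        for (int i = 0; i < numPartitions; i++) {
            buildInMemory.add(new ArrayList<>());
        }
    }

    /** Assumption: the low bits of the hash value select the partition. */
    int partitionOf(int hashValue) {
        return hashValue & (numPartitions - 1);
    }

    /** Build side: rows go to memory unless their partition was already spilled. */
    void addBuildRow(int hashValue, int[] row) {
        int p = partitionOf(hashValue);
        if (spilled.contains(p)) {
            buildRowsSpilled++;
        } else {
            buildInMemory.get(p).add(row);
        }
    }

    /** Frees the memory held by one in-memory build partition. */
    void spillBuildPartition(int p) {
        buildRowsSpilled += buildInMemory.get(p).size();
        buildInMemory.get(p).clear();
        spilled.add(p);
    }

    /** Probe side: returns true if the row can be joined now, false if it was spilled. */
    boolean addProbeRow(int hashValue) {
        if (spilled.contains(partitionOf(hashValue))) {
            probeRowsSpilled++;   // matching inner partition is on disk: spill unconditionally
            return false;
        }
        return true;
    }
}
```

    Because both sides use the same partitioning function, a spilled inner partition and its spilled outer partition can later be joined against each other in isolation.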
    h6. {{HashPartition}} class:
    A new class {{HashPartition}} was created to encapsulate the work of each partition; this
class handles the pair - the build-side partition and its matching probe-side partition. Most
of its code was extracted from prior code in {{HashJoinBatch}}.
    h4. Hash Values:
    The hash values are computed once, the first time a row is processed, then saved into a special column (named "Hash_Values"),
which may be spilled along with the rest of the batch. This avoids recomputation (unlike the HAG, which recomputes). After
reading a batch from a spill file, this Hash-values vector is separated (into {{read_HV_vector}})
and used instead of computing the hash values.
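    As a rough illustration of carrying the hash values as an extra column, consider the sketch below. The names ({{HashValueColumnSketch}}, {{Batch}}, {{spillAndReadBack}}) are hypothetical, and the spill file is modeled as an in-memory copy; only the idea that the hash values travel with the rows is taken from the description above.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only; class and method names are hypothetical, not Drill's. */
class HashValueColumnSketch {
    static int hashComputations = 0;   // counts how often hash() is actually called

    static int hash(String key) {
        hashComputations++;
        return key.hashCode();
    }

    /** A batch that carries its hash values as one extra column. */
    static class Batch {
        final List<String> keys = new ArrayList<>();
        final List<Integer> hashValues = new ArrayList<>();  // the extra hash-values column

        void append(String key) {
            keys.add(key);
            hashValues.add(hash(key));   // computed once, when the row first arrives
        }
    }

    /** Stands in for a spill-file round trip: both columns are written and read back. */
    static Batch spillAndReadBack(Batch b) {
        Batch copy = new Batch();
        copy.keys.addAll(b.keys);
        copy.hashValues.addAll(b.hashValues);  // reused as read; hash() is not called again
        return copy;
    }
}
```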
    h4. Build Hash Table:
    Unlike the HAG, the hash table (and "helper") are built (for each inner partition) only
*after* that whole partition has been read into memory. (This avoids wasted work in case the partition
needs to spill.) Another improvement: as the number of entries is known at that point
(ignoring duplicates), the hash table can be sized correctly up front, avoiding the need
for costly resizing later (see {{hashTable.updateInitialCapacity()}}).
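    To illustrate sizing a table once the entry count is known, here is a minimal sketch. The rule below (smallest power of two that fits the entries at a given load factor) is an assumption chosen for the example and is not claimed to be what {{updateInitialCapacity()}} does.

```java
/** Illustrative only: a hypothetical sizing rule, not Drill's updateInitialCapacity(). */
class InitialCapacitySketch {
    /** Smallest power of two (capped at 2^30) that holds numEntries at the given load factor. */
    static int capacityFor(int numEntries, double loadFactor) {
        int needed = (int) Math.ceil(numEntries / loadFactor);
        int capacity = 1;
        // The cap keeps the shift from overflowing an int for very large inputs.
        while (capacity < needed && capacity < (1 << 30)) {
            capacity <<= 1;
        }
        return capacity;
    }
}
```

    A table created with this capacity never has to grow while the partition's rows are inserted, which is the saving the paragraph above refers to.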
    h4. Same as the HAG:
    * Using the {{SpillSet}} class.
    * Recursive spilling. (Nearly the same code - see {{innerNext()}} in {{HashJoinBatch.java}}).
Except that the HJ may have duplicate entries - so when the spill cycles have consumed more
than 20 bits of the hash value, the HJ raises an error.
    * Option controlling the number of partitions (and when that number is 1 --> spilling
is disabled).
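    The 20-bit limit on recursive spilling can be pictured with the sketch below. It assumes that each spill cycle uses the next group of hash bits to choose a partition; the class {{SpillCycleSketch}}, the exception type, and the exact bit arithmetic are hypothetical and only the 20-bit threshold comes from the description above.

```java
/** Illustrative only; the 20-bit limit is from the PR text, everything else is hypothetical. */
class SpillCycleSketch {
    static final int MAX_HASH_BITS = 20;

    /** Bits of the hash value needed to pick one of numPartitions (a power of two). */
    static int bitsPerCycle(int numPartitions) {
        return Integer.numberOfTrailingZeros(numPartitions);
    }

    /** Partition index for a row in the given spill cycle (cycle 0 is the first pass). */
    static int partitionOf(int hashValue, int cycle, int numPartitions) {
        int bits = bitsPerCycle(numPartitions);
        int consumed = (cycle + 1) * bits;
        if (consumed > MAX_HASH_BITS) {
            throw new IllegalStateException(
                "spill recursion would use " + consumed + " hash bits; too many duplicate keys?");
        }
        // Each cycle looks at the next group of bits, so a re-read partition is split differently.
        return (hashValue >>> (cycle * bits)) & (numPartitions - 1);
    }
}
```

    The limit matters for the HJ because rows with identical keys have identical hash values, so no amount of re-partitioning can separate them into smaller partitions.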
    h6. Avoid copying:
    Copying the incoming build data into the partitions' batches is a new extra step, adding
some overhead. To match performance with prior Drill, in case of a single partition (no spilling,
no memory checks) -- the incoming vectors are used as is, without copying. Future work may
extend this for the general case (involving memory checks, etc.)
    h2. Memory Calculations:
    h4. Initial memory allocation:
    The HJ was made a "buffered" operator (see {{isBufferedOperator()}}, just like the HAG
and the External Sort), hence it gets assigned an equal memory share (out of the "memory per
query per node"; see {{setupBufferedOpsMemoryAllocations()}}). The exception is when the number of partitions
is forced to be 1, in which case it "falls back" to the "old uncontrolled" behavior (similar to what
was done for the HAG).
    h4. Memory Calculator:
    The memory calculator is aware of the current and future memory needs (including
current memory usage of all the partitions, an outgoing batch, an incoming outer batch, and
the hash tables and "helpers"). The calculator is used first to find an optimal number of
partitions (starting from a number controlled by {{hashjoin_num_partitions}}, default 32, and
lowering if that number requires too much memory). The second use of the calculator is to
determine if a spill is needed, prior to allocating more memory (see {{shouldSpill()}}). This
check is performed in two places: when reading the build side and about to allocate a new
batch (see {{appendInnerRow()}}), and when hash tables (and helpers) are allocated for the
in-memory partitions (in {{executeBuildPhase()}}).
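    The two uses of the calculator can be sketched in a much simplified form. The reservation formula below (one batch per partition, plus one incoming and one outgoing batch, all of a uniform size) and the halving step are assumptions for the example; the real calculator accounts for more, including the hash tables and helpers. Only the name {{shouldSpill}} is borrowed from the text above.

```java
/** Illustrative only: a much simplified stand-in for the memory calculator. */
class MemoryCalcSketch {
    final long memoryLimit;   // bytes available to the operator
    final long batchSize;     // estimated bytes per batch, assumed uniform here

    MemoryCalcSketch(long memoryLimit, long batchSize) {
        this.memoryLimit = memoryLimit;
        this.batchSize = batchSize;
    }

    /** First use: start from the configured number and lower it until the reservation fits. */
    int choosePartitionCount(int configured) {
        int n = configured;
        // Assumed reservation: one batch per partition, one incoming probe batch, one outgoing batch.
        while (n > 1 && (n + 2) * batchSize > memoryLimit) {
            n /= 2;   // assumption: lowering is done by halving
        }
        return n;
    }

    /** Second use: asked before an allocation; would the new total exceed the limit? */
    boolean shouldSpill(long currentUsage, long nextAllocation) {
        return currentUsage + nextAllocation > memoryLimit;
    }
}
```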
    h6. Implementation:
    The {{HashJoinMemoryCalculator}} is an interface, implemented by {{HashJoinMemoryCalculatorImpl}}
for regular work. For testing, we can limit the number of batches ({{max_batches_in_memory}}),
and then another implementation takes over - {{HashJoinMechanicalMemoryCalculator}}, which
uses the number of batches as the spilling trigger. When memory checks are disabled (e.g.,
when using a single partition), a special no-op calculator is used instead - {{NoopBuildSidePartitioningImpl}}.
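    The selection among the three calculators can be pictured as a small strategy choice. The interface {{SpillDecider}} and the factory below are hypothetical stand-ins, not the classes named above, and the convention that a batch limit of 0 means "not set" is an assumption.

```java
/** Illustrative only: a hypothetical one-method stand-in for the calculator interface. */
interface SpillDecider {
    boolean shouldSpill(long memoryInUse, int batchesInMemory);
}

/** Illustrative only: picks one of three strategies, mirroring the three cases described above. */
class SpillDeciders {
    /** Regular work: decide from memory usage. */
    static SpillDecider memoryBased(long memoryLimit) {
        return (memoryInUse, batches) -> memoryInUse > memoryLimit;
    }

    /** Testing: decide from the number of batches held in memory. */
    static SpillDecider batchCountBased(int maxBatches) {
        return (memoryInUse, batches) -> batches > maxBatches;
    }

    /** Memory checks disabled: never spill. */
    static SpillDecider noop() {
        return (memoryInUse, batches) -> false;
    }

    /** Assumption: maxBatchesInMemory == 0 means the testing limit is not set. */
    static SpillDecider choose(int numPartitions, int maxBatchesInMemory, long memoryLimit) {
        if (numPartitions == 1) {
            return noop();
        }
        if (maxBatchesInMemory > 0) {
            return batchCountBased(maxBatchesInMemory);
        }
        return memoryBased(memoryLimit);
    }
}
```

    Keeping the decision behind one interface lets tests force spilling deterministically without having to craft data of a particular size.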
    The memory estimates rely on statistics derived from the actual data. Thus the HJ now,
during schema discovery, also tries to "sniff forward" and look at batches with real data
(see {{sniffNonEmptyBatch}}).
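    The "sniff forward" idea amounts to skipping empty batches until one with rows is found. A minimal sketch, with batches modeled as plain lists and a hypothetical helper name, might look like this:

```java
import java.util.Iterator;
import java.util.List;

/** Illustrative only: batches are modeled as lists; this is not Drill's sniffNonEmptyBatch. */
class SniffSketch {
    /** Returns the first batch with at least one row, or null if the input runs out first. */
    static <T> List<T> firstNonEmpty(Iterator<List<T>> batches) {
        while (batches.hasNext()) {
            List<T> batch = batches.next();
            if (!batch.isEmpty()) {
                return batch;   // real data found: statistics can be derived from it
            }
        }
        return null;
    }
}
```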
    h2. Generated Code:
    The prior HJ logic that relied on generated code (used to move data from Probe and Build incomings
into the Outgoing batch) was eliminated, replaced by various {{appendRow}} methods (see {{VectorContainer.java}})
that are based on the new {{copyEntry()}} method supported by all vector types.
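    The relationship between a row-level append and a per-vector entry copy can be sketched with toy classes. {{IntColumnSketch}} and {{ContainerSketch}} are hypothetical and handle only int columns; the method names {{copyEntry}} and {{appendRow}} echo the ones above, but their signatures here are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: a toy int column, not one of Drill's vector classes. */
class IntColumnSketch {
    private int[] values = new int[4];

    int get(int index) {
        return values[index];
    }

    void set(int index, int value) {
        if (index >= values.length) {
            values = Arrays.copyOf(values, Math.max(values.length * 2, index + 1));
        }
        values[index] = value;
    }

    /** Copies a single entry from another column into this one. */
    void copyEntry(int toIndex, IntColumnSketch from, int fromIndex) {
        set(toIndex, from.get(fromIndex));
    }
}

/** Illustrative only: a toy container of columns with a row count. */
class ContainerSketch {
    final List<IntColumnSketch> columns = new ArrayList<>();
    int rowCount = 0;

    ContainerSketch(int numColumns) {
        for (int i = 0; i < numColumns; i++) {
            columns.add(new IntColumnSketch());
        }
    }

    /** Appends row srcIndex of src as a new last row here; returns the new row's index. */
    int appendRow(ContainerSketch src, int srcIndex) {
        for (int c = 0; c < columns.size(); c++) {
            columns.get(c).copyEntry(rowCount, src.columns.get(c), srcIndex);
        }
        return rowCount++;
    }
}
```

    With a generic per-entry copy available on every column type, the row mover no longer needs code generated per schema.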
    h2. HashTable using VectorContainer:
    The HashTable was modified to use {{VectorContainer}} instead of {{RecordBatch}}. Thus
a {{getContainer()}} method was added to the {{AbstractRecordBatch}} and all its subclasses.
    h2. {{HashJoinProbe}}:
    The {{HashJoinProbe[template]}} classes were eliminated (partly because of the removal of the
generated code) - the remaining code was mostly merged into the end of {{HashJoinBatch.java}}.
    h2. Other:
    * The batches used by the partitions are internal to the HJ. Their size is controlled
by an option {{num_rows_in_batch}}, default 1024.
    * Not using the {{HyperContainer}} anymore, just a list of containers.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Ben-Zvi/drill DRILL-6027

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1248
commit ab63a01f46779296ab6cefc4b8c93aae37cc9d19
Author: Ben-Zvi <bben-zvi@...>
Date:   2018-02-14T20:36:39Z

    DRILL-6027: Initial implementation of HashJoin spill, without memory limits checks yet

commit 42fb19ba6fcc38b105601a167c32ea7ca137c407
Author: Timothy Farkas <timothyfarkas@...>
Date:   2018-02-13T00:49:29Z

     - Added memory claculator
     - Added unit tests and docs.
     - Fixed IOB caused by output vector allocation.
     - Don't double count records that were spilled in HashJoin

commit 35f6b9cca7ee23de50ef4863ee9aec7f3998985f
Author: Ben-Zvi <bben-zvi@...>
Date:   2018-04-28T04:59:25Z

      - Added fallback option for HashJoin.
      - No copy of incoming for single partition, and avoid HT resize.
      - Fix memory leak when cancelling while spill file is read
      - get correct schema when probe side is empty


