drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ve...@apache.org
Subject [4/5] drill git commit: DRILL-1993: Fix allocation issues in HashTable and HashAgg to reduce memory waste
Date Fri, 16 Jan 2015 23:14:58 GMT
DRILL-1993: Fix allocation issues in HashTable and HashAgg to reduce memory waste


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/bd4d669d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/bd4d669d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/bd4d669d

Branch: refs/heads/master
Commit: bd4d669d1b836d6990eb3701e81c56f3d109db18
Parents: 8cdab2e
Author: vkorukanti <venki.korukanti@gmail.com>
Authored: Fri Jan 9 01:41:53 2015 -0800
Committer: vkorukanti <venki.korukanti@gmail.com>
Committed: Fri Jan 16 14:03:32 2015 -0800

----------------------------------------------------------------------
 .../impl/aggregate/HashAggTemplate.java         | 40 +++++++++++++++++---
 .../physical/impl/common/ChainedHashTable.java  |  1 -
 .../exec/physical/impl/common/HashTable.java    |  3 ++
 .../physical/impl/common/HashTableTemplate.java | 28 ++++++++++----
 .../apache/drill/exec/vector/ObjectVector.java  |  6 +++
 5 files changed, 63 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/bd4d669d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index dc2b05c..2fd5ce1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -44,7 +44,10 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.ObjectVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 import javax.inject.Named;
@@ -128,7 +131,24 @@ public abstract class HashAggTemplate implements HashAggregator {
         MaterializedField outputField = materializedValueFields[i];
         // Create a type-specific ValueVector for this value
         vector = TypeHelper.getNewVector(outputField, allocator);
-        vector.allocateNew();
+
+        // Try to allocate space to store BATCH_SIZE records. Key stored at index i in HashTable has its workspace
+        // variables (such as count, sum etc) stored at index i in HashAgg. HashTable and HashAgg both have
+        // BatchHolders. Whenever a BatchHolder in HashAgg reaches its capacity, a new BatchHolder is added to
+        // HashTable. If HashAgg can't store BATCH_SIZE records in a BatchHolder, it leaves empty slots in current
+        // BatchHolder in HashTable, causing the HashTable to be space inefficient. So it is better to allocate space
+        // to fit as close to as BATCH_SIZE records.
+        if (vector instanceof FixedWidthVector) {
+          ((FixedWidthVector) vector).allocateNew(HashTable.BATCH_SIZE);
+        } else if (vector instanceof VariableWidthVector) {
+          ((VariableWidthVector) vector).allocateNew(HashTable.VARIABLE_WIDTH_VECTOR_SIZE * HashTable.BATCH_SIZE,
+              HashTable.BATCH_SIZE);
+        } else if (vector instanceof ObjectVector) {
+          ((ObjectVector)vector).allocateNew(HashTable.BATCH_SIZE);
+        } else {
+          vector.allocateNew();
+        }
+
         capacity = Math.min(capacity, vector.getValueCapacity());
 
         aggrValuesContainer.add(vector);
@@ -149,10 +169,14 @@ public abstract class HashAggTemplate implements HashAggregator {
       outStartIdxHolder.value = batchOutputCount;
       outNumRecordsHolder.value = 0;
       boolean status = true;
-      for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
-        if (outputRecordValues(i, batchOutputCount)) {
+
+      // Output records starting from 'batchOutputCount' in current batch until there are no more records
+      // or output vectors have no space left. In destination vectors, start filling records from 0th position.
+      while(batchOutputCount <= maxOccupiedIdx) {
+        if (outputRecordValues(batchOutputCount, outNumRecordsHolder.value)) {
           if (EXTRA_DEBUG_2) {
-            logger.debug("Outputting values to output index: {}", batchOutputCount);
+            logger.debug("Outputting values from input index {} to output index: {}",
+                batchOutputCount, outNumRecordsHolder.value);
           }
           batchOutputCount++;
           outNumRecordsHolder.value++;
@@ -256,7 +280,7 @@ public abstract class HashAggTemplate implements HashAggregator {
 
     numGroupByOutFields = groupByOutFieldIds.length;
     batchHolders = new ArrayList<BatchHolder>();
-    addBatchHolder();
+    // First BatchHolder is created when the first put request is received.
 
     doSetup(incoming);
   }
@@ -494,7 +518,11 @@ public abstract class HashAggTemplate implements HashAggregator {
       logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, numOutputRecords);
 
       lastBatchOutputCount = numOutputRecords;
-      outBatchIndex++;
+      // If there are no more records to output, go to the next batch. If there are any records left refer to the
+      // same BatchHolder. Remaining records will be outputted in next outputCurrentBatch() call(s).
+      if (batchHolders.get(outBatchIndex).getNumPendingOutput() == 0) {
+        outBatchIndex++;
+      }
       if (outBatchIndex == batchHolders.size()) {
         allFlushed = true;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/bd4d669d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 322fd1f..b5cfdca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -163,7 +163,6 @@ public class ChainedHashTable {
       final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
       // create a type-specific ValueVector for this key
       ValueVector vv = TypeHelper.getNewVector(outputField, allocator);
-      vv.allocateNew();
       htKeyFieldIds[i] = htContainerOrig.add(vv);
 
       i++;

http://git-wip-us.apache.org/repos/asf/drill/blob/bd4d669d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 6966ba1..1ec74bf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -51,6 +51,9 @@ public interface HashTable {
   static final public int BATCH_SIZE = Character.MAX_VALUE + 1;
   static final public int BATCH_MASK = 0x0000FFFF;
 
+  /** Variable width vector size in bytes */
+  public static final int VARIABLE_WIDTH_VECTOR_SIZE = 50 * BATCH_SIZE;
+
   public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator,
       RecordBatch incomingBuild, RecordBatch incomingProbe,
       RecordBatch outgoing, VectorContainer htContainerOrig);

http://git-wip-us.apache.org/repos/asf/drill/blob/bd4d669d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index c80e97a..ba980d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -30,8 +30,10 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
 
 import javax.inject.Named;
 import java.util.ArrayList;
@@ -116,15 +118,24 @@ public abstract class HashTableTemplate implements HashTable {
     private BatchHolder(int idx) {
       this.batchIndex = idx;
 
-      if (idx == 0) {  // first batch holder can use the original htContainer
-        htContainer = htContainerOrig;
-      } else { // otherwise create a new one using the original's fields
-        htContainer = new VectorContainer();
-        for (VectorWrapper<?> w : htContainerOrig) {
-          ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator);
+      htContainer = new VectorContainer();
+      for (VectorWrapper<?> w : htContainerOrig) {
+        ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator);
+
+        // Capacity for "hashValues" and "links" vectors is BATCH_SIZE records. It is better to allocate space for
+        // "key" vectors to store as close to as BATCH_SIZE records. A new BatchHolder is created when either BATCH_SIZE
+        // records are inserted or "key" vectors ran out of space. Allocating too less space for "key" vectors will
+        // result in unused space in "hashValues" and "links" vectors in the BatchHolder. Also for each new
+        // BatchHolder we create a SV4 vector of BATCH_SIZE in HashJoinHelper.
+        if (vv instanceof FixedWidthVector) {
+          ((FixedWidthVector) vv).allocateNew(BATCH_SIZE);
+        } else if (vv instanceof VariableWidthVector) {
+          ((VariableWidthVector) vv).allocateNew(VARIABLE_WIDTH_VECTOR_SIZE, BATCH_SIZE);
+        } else {
           vv.allocateNew();
-          htContainer.add(vv);
         }
+
+        htContainer.add(vv);
       }
 
       links = allocMetadataVector(HashTable.BATCH_SIZE, EMPTY_SLOT);
@@ -454,7 +465,7 @@ public abstract class HashTableTemplate implements HashTable {
 
     // Create the first batch holder
     batchHolders = new ArrayList<BatchHolder>();
-    addBatchHolder();
+    // First BatchHolder is created when the first put request is received.
 
     doSetup(incomingBuild, incomingProbe);
 
@@ -753,6 +764,7 @@ public abstract class HashTableTemplate implements HashTable {
 
   public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords) {
     assert batchIdx < batchHolders.size();
+
     if (!batchHolders.get(batchIdx).outputKeys(outContainer, outStartIndex, numRecords)) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/bd4d669d/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
index b68f089..3c15db3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
@@ -97,6 +97,12 @@ public class ObjectVector extends BaseValueVector{
     addNewArray();
   }
 
+  public void allocateNew(int valueCount) throws OutOfMemoryRuntimeException {
+    while (maxCount < valueCount) {
+      addNewArray();
+    }
+  }
+
   @Override
   public boolean allocateNewSafe() {
     allocateNew();


Mime
View raw message