drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [2/2] git commit: DRILL-1111: 1. Create new batch holder for hash table if an insertion fails due to space constraints in existing batch holder. 2. Use allocateNew() for hash aggr and hash table allocations. 3. Create new output batch if outputting values fails the first time due to space constraints. 4. Use splitAndTransfer to transfer the keys from hash table's container to the output batch.
Date Mon, 07 Jul 2014 22:50:12 GMT
DRILL-1111: 1. Create new batch holder for hash table if an insertion fails due to space constraints
in existing batch holder. 2. Use allocateNew() for hash aggr and hash table allocations. 3.
Create new output batch if outputting values fails the first time due to space constraints. 4.
Use splitAndTransfer to transfer the keys from hash table's container to the output batch.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/dcc94a39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/dcc94a39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/dcc94a39

Branch: refs/heads/master
Commit: dcc94a3980498761f81d8e87cc04097ebcfdb3f7
Parents: a59debb
Author: Jacques Nadeau <jacques@apache.org>
Authored: Wed Jun 25 09:00:16 2014 -0700
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Mon Jul 7 14:50:05 2014 -0700

----------------------------------------------------------------------
 .../physical/impl/aggregate/HashAggBatch.java   |   9 +-
 .../impl/aggregate/HashAggTemplate.java         | 152 ++++++++++---------
 .../physical/impl/aggregate/HashAggregator.java |   3 +-
 .../physical/impl/common/ChainedHashTable.java  |   2 +-
 .../exec/physical/impl/common/HashTable.java    |   5 +-
 .../physical/impl/common/HashTableTemplate.java |  57 +++++--
 .../exec/physical/impl/join/HashJoinBatch.java  |   2 +-
 .../planner/sql/handlers/DefaultSqlHandler.java |   2 +-
 8 files changed, 139 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dcc94a39/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 3609c02..b30a357 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -193,8 +193,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate>
{
     ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder");
 
     container.clear();
-    List<VectorAllocator> keyAllocators = Lists.newArrayList();
-    List<VectorAllocator> valueAllocators = Lists.newArrayList();
 
     int numGroupByExprs = (popConfig.getGroupByExprs() != null) ? popConfig.getGroupByExprs().length
: 0;
     int numAggrExprs = (popConfig.getAggrExprs() != null) ? popConfig.getAggrExprs().length
: 0;
@@ -213,7 +211,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate>
{
 
       final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
       ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
-      keyAllocators.add(VectorAllocator.getAllocator(vv, 200));
 
       // add this group-by vector to the output container
       groupByOutFieldIds[i] = container.add(vv);
@@ -229,7 +226,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate>
{
 
       final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
       ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
-      valueAllocators.add(VectorAllocator.getAllocator(vv, 200));
       aggrOutFieldIds[i] = container.add(vv);
 
       aggrExprs[i] = new ValueVectorWriteExpression(aggrOutFieldIds[i], expr, true);
@@ -251,9 +247,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate>
{
               oContext.getAllocator(), incoming, this,
               aggrExprs,
               cgInner.getWorkspaceTypes(),
-              groupByOutFieldIds,
-              keyAllocators.toArray(new VectorAllocator[keyAllocators.size()]),
-              valueAllocators.toArray(new VectorAllocator[valueAllocators.size()]));
+              groupByOutFieldIds, 
+              this.container); 
 
     return agg;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dcc94a39/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index eaf2811..0a44f3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import javax.inject.Named;
@@ -28,10 +29,8 @@ import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.holders.IntHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -43,7 +42,6 @@ import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
-import org.apache.drill.exec.physical.impl.common.HashTableTemplate.BatchHolder;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
@@ -54,8 +52,6 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.expr.holders.BigIntHolder;
 
 import com.google.common.collect.Lists;
 
@@ -80,8 +76,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   private RecordBatch incoming;
   private BatchSchema schema;
   private HashAggBatch outgoing;
-  private VectorAllocator[] keyAllocators;
-  private VectorAllocator[] valueAllocators;
+  private VectorContainer outContainer;
   private FragmentContext context;
   private BufferAllocator allocator;
 
@@ -89,6 +84,9 @@ public abstract class HashAggTemplate implements HashAggregator {
   private HashTable htable;
   private ArrayList<BatchHolder> batchHolders;
   private IntHolder htIdxHolder; // holder for the Hashtable's internal index returned by
put()
+  private IntHolder outStartIdxHolder;
+  private IntHolder outNumRecordsHolder;
+  private int numGroupByOutFields = 0; // Note: this should be <= number of group-by fields
 
   List<VectorAllocator> wsAllocators = Lists.newArrayList();  // allocators for the
workspace vectors
   ErrorCollector collector = new ErrorCollectorImpl();
@@ -132,7 +130,7 @@ public abstract class HashAggTemplate implements HashAggregator {
         MaterializedField outputField = materializedValueFields[i];
         // Create a type-specific ValueVector for this value
         vector = TypeHelper.getNewVector(outputField, allocator) ;
-        VectorAllocator.getAllocator(vector, 50 /* avg. width */).alloc(HashTable.BATCH_SIZE)
;
+        vector.allocateNew();
 
         aggrValuesContainer.add(vector) ;
       }
@@ -149,16 +147,27 @@ public abstract class HashAggTemplate implements HashAggregator {
       setupInterior(incoming, outgoing, aggrValuesContainer);
     }
 
-    private boolean outputValues() {
-      for (int i = 0; i <= maxOccupiedIdx; i++) {
+    private boolean outputValues(IntHolder outStartIdxHolder, IntHolder outNumRecordsHolder)
{
+      outStartIdxHolder.value = batchOutputCount;
+      outNumRecordsHolder.value = 0;
+      boolean status = true;
+      for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
         if (outputRecordValues(i, batchOutputCount) ) {
           if (EXTRA_DEBUG_2) logger.debug("Outputting values to output index: {}", batchOutputCount)
;
           batchOutputCount++;
+          outNumRecordsHolder.value++;
         } else {
-          return false;
+          status = false;
+          break;
         }
       }
-      return true;
+      // It's not a failure if only some records were output (at least 1) .. since out-of-memory
+      // conditions may prevent all records from being output; the caller has the responsibility
to
+      // allocate more memory and continue outputting more records
+      if (!status && outNumRecordsHolder.value > 0) {
+        status = true;
+      }
+      return status;
     }
 
     private void clear() {
@@ -169,10 +178,10 @@ public abstract class HashAggTemplate implements HashAggregator {
       return maxOccupiedIdx + 1;
     }
 
-    private int getOutputCount() {
-      return batchOutputCount;
+    private int getNumPendingOutput() {
+      return getNumGroups() - batchOutputCount;
     }
-
+    
     // Code-generated methods (implemented in HashAggBatch)
 
     @RuntimeOverridden
@@ -193,8 +202,8 @@ public abstract class HashAggTemplate implements HashAggregator {
                     BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing,
                     LogicalExpression[] valueExprs,
                     List<TypedFieldId> valueFieldIds,
-                    TypedFieldId[] groupByOutFieldIds,
-                    VectorAllocator[] keyAllocators, VectorAllocator[] valueAllocators)
+                    TypedFieldId[] groupByOutFieldIds, 
+                    VectorContainer outContainer) 
     throws SchemaChangeException, ClassTransformationException, IOException {
 
     if (valueExprs == null || valueFieldIds == null) {
@@ -209,9 +218,8 @@ public abstract class HashAggTemplate implements HashAggregator {
     this.allocator = allocator;
     this.incoming = incoming;
     this.schema = incoming.getSchema();
-    this.keyAllocators = keyAllocators;
-    this.valueAllocators = valueAllocators;
     this.outgoing = outgoing;
+    this.outContainer = outContainer;
 
     this.hashAggrConfig = hashAggrConfig;
 
@@ -226,6 +234,9 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
 
     this.htIdxHolder = new IntHolder();
+    this.outStartIdxHolder = new IntHolder();
+    this.outNumRecordsHolder = new IntHolder();
+    
     materializedValueFields = new MaterializedField[valueFieldIds.size()];
 
     if (valueFieldIds.size() > 0) {
@@ -239,6 +250,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     ChainedHashTable ht = new ChainedHashTable(htConfig, context, allocator, incoming, null
/* no incoming probe */, outgoing) ;
     this.htable = ht.createAndSetupHashTable(groupByOutFieldIds) ;
 
+    numGroupByOutFields = groupByOutFieldIds.length;
     batchHolders = new ArrayList<BatchHolder>();
     addBatchHolder();
 
@@ -302,7 +314,7 @@ public abstract class HashAggTemplate implements HashAggregator {
               // outcome = out;
 
               buildComplete = true;
-
+              
               updateStats(htable);
 
               // output the first batch; remaining batches will be output
@@ -333,18 +345,17 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
   }
 
-  private void allocateOutgoing(int numOutputRecords) {
-
-    for (VectorAllocator a : keyAllocators) {
-      if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a,
numOutputRecords);
-      a.alloc(numOutputRecords);
+  private void allocateOutgoing() {
+    // Skip the keys and only allocate for outputting the workspace values
+    // (keys will be output through splitAndTransfer)
+    Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
+    for (int i=0; i < numGroupByOutFields; i++) {
+      outgoingIter.next();
     }
-
-    for (VectorAllocator a : valueAllocators) {
-      if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a,
numOutputRecords);
-      a.alloc(numOutputRecords);
+    while (outgoingIter.hasNext()) {
+      ValueVector vv = outgoingIter.next().getValueVector();
+      vv.allocateNew();
     }
-
   }
 
   @Override
@@ -366,6 +377,8 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
     htIdxHolder = null;
     materializedValueFields = null;
+    outStartIdxHolder = null;
+    outNumRecordsHolder = null;
 
     if (batchHolders != null) {
       for (BatchHolder bh : batchHolders) {
@@ -376,12 +389,6 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
   }
 
-  private AggOutcome tooBigFailure(){
-    context.fail(new Exception(TOO_BIG_ERROR));
-    this.outcome = IterOutcome.STOP;
-    return AggOutcome.CLEANUP_AND_RETURN;
-  }
-
   private final AggOutcome setOkAndReturn(){
     if(first){
       this.outcome = IterOutcome.OK_NEW_SCHEMA;
@@ -416,54 +423,44 @@ public abstract class HashAggTemplate implements HashAggregator {
 
     bh.setup();
   }
-
-  // output the keys and values for a particular batch holder
-  private boolean outputKeysAndValues(int batchIdx) {
-
-    allocateOutgoing(batchIdx);
-
-    if (! this.htable.outputKeys(batchIdx)) {
-      return false;
-    }
-    if (! batchHolders.get(batchIdx).outputValues()) {
-      return false;
-    }
-
-    outBatchIndex = batchIdx+1;
-
-    if (outBatchIndex == batchHolders.size()) {
-      allFlushed = true;
-    }
-
-    return true;
-  }
-
+  
   public IterOutcome outputCurrentBatch() {
     if (outBatchIndex >= batchHolders.size()) {
       this.outcome = IterOutcome.NONE;
       return outcome;
     }
 
-    // get the number of groups in the batch holder corresponding to this batch index
-    int batchOutputRecords = batchHolders.get(outBatchIndex).getNumGroups();
+    // get the number of records in the batch holder that are pending output
+    int numPendingOutput = batchHolders.get(outBatchIndex).getNumPendingOutput();
     
-    if (!first && batchOutputRecords == 0) {
+    if (!first && numPendingOutput == 0) {
       this.outcome = IterOutcome.NONE;
       return outcome;
     }
 
-    allocateOutgoing(batchOutputRecords);
+    allocateOutgoing();
 
-    boolean outputKeysStatus = this.htable.outputKeys(outBatchIndex) ;
-    boolean outputValuesStatus = batchHolders.get(outBatchIndex).outputValues();
+    boolean outputKeysStatus = true;
+    boolean outputValuesStatus = true;
+    
+    outputValuesStatus = batchHolders.get(outBatchIndex).outputValues(outStartIdxHolder,
outNumRecordsHolder);
+    int numOutputRecords = outNumRecordsHolder.value;
+    
+    if (EXTRA_DEBUG_1) {
+      logger.debug("After output values: outStartIdx = {}, outNumRecords = {}", outStartIdxHolder.value,
outNumRecordsHolder.value);
+    }
+    if (outputValuesStatus) {
+      outputKeysStatus = this.htable.outputKeys(outBatchIndex, this.outContainer, outStartIdxHolder.value,
outNumRecordsHolder.value) ;
+    }
+    
     if (outputKeysStatus && outputValuesStatus) {
 
       // set the value count for outgoing batch value vectors
       for(VectorWrapper<?> v : outgoing) {
-        v.getValueVector().getMutator().setValueCount(batchOutputRecords);
+        v.getValueVector().getMutator().setValueCount(numOutputRecords);
       }
 
-      outputCount += batchOutputRecords;
+      outputCount += numOutputRecords;
 
       if(first){
         this.outcome = IterOutcome.OK_NEW_SCHEMA;
@@ -471,9 +468,9 @@ public abstract class HashAggTemplate implements HashAggregator {
         this.outcome = IterOutcome.OK;
       }
 
-      logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex,
batchOutputRecords);
+      logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex,
numOutputRecords);
 
-      lastBatchOutputCount = batchOutputRecords;
+      lastBatchOutputCount = numOutputRecords;
       outBatchIndex++;
       if (outBatchIndex == batchHolders.size()) {
         allFlushed = true;
@@ -484,8 +481,20 @@ public abstract class HashAggTemplate implements HashAggregator {
         this.cleanup();
       }
     } else {
-      if (!outputKeysStatus) context.fail(new Exception("Failed to output keys for current
batch !"));
-      if (!outputValuesStatus) context.fail(new Exception("Failed to output values for current
batch !"));
+      if (!outputKeysStatus) {
+        logger.debug("Failed to output keys for current batch index: {} ", outBatchIndex);

+        for(VectorWrapper<?> v : outContainer) {
+          logger.debug("At the time of failure, size of valuevector in outContainer = {}.",
v.getValueVector().getValueCapacity());
+        }        
+        context.fail(new Exception("Failed to output keys for current batch !"));
+      }
+      if (!outputValuesStatus) {
+        logger.debug("Failed to output values for current batch index: {} ", outBatchIndex);
+        for(VectorWrapper<?> v : outContainer) {
+          logger.debug("At the time of failure, size of valuevector in outContainer = {}.",
v.getValueVector().getValueCapacity());
+        }
+        context.fail(new Exception("Failed to output values for current batch !"));
+      }
       this.outcome = IterOutcome.STOP;
     }
 
@@ -524,7 +533,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     }
     */
 
-    HashTable.PutStatus putStatus = htable.put(incomingRowIdx, htIdxHolder) ;
+    HashTable.PutStatus putStatus = htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count
*/) ;
 
     if (putStatus != HashTable.PutStatus.PUT_FAILED) {
       int currentIdx = htIdxHolder.value;
@@ -561,6 +570,7 @@ public abstract class HashAggTemplate implements HashAggregator {
 
     }
 
+    logger.debug("HashAggr Put failed ! incomingRowIdx = {}, hash table size = {}.", incomingRowIdx,
htable.size());
     return false;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dcc94a39/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index b94f299..421bd53 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 public interface HashAggregator {
@@ -48,7 +49,7 @@ public interface HashAggregator {
                              HashAggBatch outgoing, LogicalExpression[] valueExprs, 
                              List<TypedFieldId> valueFieldIds,
                              TypedFieldId[] keyFieldIds,
-                             VectorAllocator[] keyAllocators, VectorAllocator[] valueAllocators)
+                             VectorContainer outContainer) 
     throws SchemaChangeException, IOException, ClassTransformationException;
 
   public abstract IterOutcome getOutcome();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dcc94a39/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index e1179d0..7522488 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -141,7 +141,7 @@ public class ChainedHashTable {
       final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
       // create a type-specific ValueVector for this key
       ValueVector vv = TypeHelper.getNewVector(outputField, allocator);
-      VectorAllocator.getAllocator(vv, 50 /* avg width */).alloc(HashTable.BATCH_SIZE);
+      vv.allocateNew();
       htKeyFieldIds[i] = htContainerOrig.add(vv);
       
       i++;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dcc94a39/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 9f5d4f8..375836a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.common;
 
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.expr.holders.BitHolder;
 import org.apache.drill.exec.expr.holders.IntHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -48,7 +49,7 @@ public interface HashTable {
                     RecordBatch incomingBuild, RecordBatch incomingProbe, 
                     RecordBatch outgoing, VectorContainer htContainerOrig);
 
-  public PutStatus put(int incomingRowIdx, IntHolder htIdxHolder);
+  public PutStatus put(int incomingRowIdx, IntHolder htIdxHolder, int retryCount);
   
   public int containsKey(int incomingRowIdx, boolean isProbe);
 
@@ -60,7 +61,7 @@ public interface HashTable {
 
   public void clear();
 
-  public boolean outputKeys(int batchIdx);
+  public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex,
int numRecords);
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dcc94a39/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 0849e6f..45b9852 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.common;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 
 import javax.inject.Named;
 
@@ -27,6 +28,7 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
@@ -36,7 +38,6 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 public abstract class HashTableTemplate implements HashTable {
 
@@ -124,7 +125,7 @@ public abstract class HashTableTemplate implements HashTable {
         htContainer = new VectorContainer();
         for (VectorWrapper<?> w : htContainerOrig) {
           ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator);
-          VectorAllocator.getAllocator(vv, 50 /* avg width */).alloc(HashTable.BATCH_SIZE);
+          vv.allocateNew();
           htContainer.add(vv);
         }
       }
@@ -275,14 +276,28 @@ public abstract class HashTableTemplate implements HashTable {
       hashValues = newHashValues;
     }
 
-    private boolean outputKeys() {
+    private boolean outputKeys(VectorContainer outContainer, int outStartIndex, int numRecords)
{
 
       /** for debugging
         BigIntVector vv0 = getValueVector(0);
         BigIntHolder holder = new BigIntHolder();
       */
-
-      for (int i = 0; i <= maxOccupiedIdx; i++) {
+      
+      // set the value count for htContainer's value vectors before the transfer ..
+      setValueCount();
+      
+      Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
+      
+      for (VectorWrapper<?> sourceWrapper : htContainer) {
+        ValueVector sourceVV = sourceWrapper.getValueVector();
+        ValueVector targetVV = outgoingIter.next().getValueVector();
+        TransferPair tp = sourceVV.makeTransferPair(targetVV);
+        tp.splitAndTransfer(outStartIndex, numRecords);
+      }
+      
+/*
+      logger.debug("Attempting to output keys for batch index: {} from index {} to maxOccupiedIndex
{}.", this.batchIndex, 0, maxOccupiedIdx);
+      for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
         if (outputRecordKeys(i, batchOutputCount) ) {
           if (EXTRA_DEBUG) logger.debug("Outputting keys to output index: {}", batchOutputCount)
;
 
@@ -297,9 +312,17 @@ public abstract class HashTableTemplate implements HashTable {
           return false;
         }
       }
+ */     
       return true;
     }
 
+    private void setValueCount() {
+      for (VectorWrapper<?> vw : htContainer) {
+        ValueVector vv = vw.getValueVector();
+        vv.getMutator().setValueCount(maxOccupiedIdx + 1); 
+      }
+    }
+    
     private void dump(int idx) {
       while (true) {
         int idxWithinBatch = idx & BATCH_MASK;
@@ -443,8 +466,23 @@ public abstract class HashTableTemplate implements HashTable {
 
         return rounded;
   }
+  
+  public PutStatus put(int incomingRowIdx, IntHolder htIdxHolder, int retryCount) {
+    HashTable.PutStatus putStatus = put(incomingRowIdx, htIdxHolder) ;
+    int count = retryCount;
+    int numBatchHolders;
+    while (putStatus == PutStatus.PUT_FAILED && count > 0) {
+      logger.debug("Put into hash table failed .. Retrying with new batch holder...");
+      numBatchHolders = batchHolders.size();
+      this.addBatchHolder();
+      freeIndex = numBatchHolders * BATCH_SIZE;
+      putStatus = put(incomingRowIdx, htIdxHolder);
+      count--;
+    }
+    return putStatus;
+  }
 
-  public PutStatus put(int incomingRowIdx, IntHolder htIdxHolder) {
+  private PutStatus put(int incomingRowIdx, IntHolder htIdxHolder) {
 
     int hash = getHashBuild(incomingRowIdx);
     hash = Math.abs(hash);
@@ -467,7 +505,8 @@ public abstract class HashTableTemplate implements HashTable {
 
       if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch))
{
         // update the start index array
-        startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()), currentIdx);
+        boolean status = startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()),
currentIdx);
+        assert status : "Unable to set start indices in the hash table.";
         htIdxHolder.value = currentIdx;
         return PutStatus.KEY_ADDED;
       }
@@ -649,9 +688,9 @@ public abstract class HashTableTemplate implements HashTable {
     numResizing++;
   }
 
-  public boolean outputKeys(int batchIdx) {
+  public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex,
int numRecords) {
     assert batchIdx < batchHolders.size();
-    if (! batchHolders.get(batchIdx).outputKeys()) {
+    if (! batchHolders.get(batchIdx).outputKeys(outContainer, outStartIndex, numRecords))
{
       return false;
     }
     return true;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dcc94a39/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index e24f250..5fc3125 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -333,7 +333,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP>
{
                     // For every record in the build batch , hash the key columns
                     for (int i = 0; i < currentRecordCount; i++) {
 
-                        HashTable.PutStatus status = hashTable.put(i, htIndex);
+                        HashTable.PutStatus status = hashTable.put(i, htIndex, 1 /* retry
count */);
 
                         if (status != HashTable.PutStatus.PUT_FAILED) {
                             /* Use the global index returned by the hash table, to store

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dcc94a39/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 14db66c..2fcdef3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -201,7 +201,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
      * Finally, Make sure that the no rels are repeats.
      * This could happen in the case of querying the same table twice as Optiq may canonicalize
these.
      */
-    phyRelNode = ExcessiveExchangeIdentifier.removeExcessiveEchanges(phyRelNode, targetSliceSize);
+    phyRelNode = RelUniqifier.uniqifyGraph(phyRelNode);
 
     return phyRelNode;
   }


Mime
View raw message