DRILL-5457: Spill implementation for Hash Aggregate closes #822 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c16e5f80 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c16e5f80 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c16e5f80 Branch: refs/heads/master Commit: c16e5f8072f3e5d18157767143f9ccc7669c4380 Parents: be43a9e Author: Boaz Ben-Zvi Authored: Mon Jun 19 19:04:30 2017 -0700 Committer: Paul Rogers Committed: Tue Jun 20 17:01:01 2017 -0700 ---------------------------------------------------------------------- .../src/resources/drill-override-example.conf | 22 + .../org/apache/drill/exec/ExecConstants.java | 22 + .../cache/VectorAccessibleSerializable.java | 56 + .../drill/exec/physical/base/AbstractBase.java | 28 +- .../exec/physical/base/PhysicalOperator.java | 15 + .../exec/physical/config/ExternalSort.java | 17 +- .../exec/physical/config/HashAggregate.java | 25 +- .../physical/impl/aggregate/HashAggBatch.java | 46 +- .../impl/aggregate/HashAggTemplate.java | 1113 +++++++++++++++--- .../physical/impl/aggregate/HashAggregator.java | 19 +- .../impl/aggregate/SpilledRecordbatch.java | 175 +++ .../physical/impl/common/ChainedHashTable.java | 10 +- .../exec/physical/impl/common/HashTable.java | 26 +- .../physical/impl/common/HashTableStats.java | 7 + .../physical/impl/common/HashTableTemplate.java | 255 ++-- .../exec/physical/impl/join/HashJoinBatch.java | 5 +- .../physical/impl/spill/RecordBatchSizer.java | 78 +- .../exec/physical/impl/spill/SpillSet.java | 59 +- .../impl/xsort/managed/ExternalSortBatch.java | 6 +- .../exec/planner/physical/AggPrelBase.java | 2 +- .../exec/planner/physical/AggPruleBase.java | 3 + .../exec/planner/physical/HashAggPrel.java | 2 +- .../exec/planner/physical/PlannerSettings.java | 5 + .../apache/drill/exec/record/RecordBatch.java | 2 +- .../server/options/SystemOptionManager.java | 4 + .../exec/util/MemoryAllocationUtilities.java | 21 +- .../apache/drill/exec/work/foreman/Foreman.java | 2 +- .../drill/exec/work/user/PlanSplitter.java | 2 +- .../src/main/resources/drill-module.conf | 35 +- .../java/org/apache/drill/TestBugFixes.java | 5 +- .../drill/TestTpchDistributedConcurrent.java | 2 +- .../physical/impl/agg/TestHashAggrSpill.java | 141 +++ .../physical/unit/BasicPhysicalOpUnitTest.java | 3 +- exec/jdbc/pom.xml | 1 + .../java/org/apache/drill/exec/rpc/RpcBus.java | 2 +- .../templates/VariableLengthVectors.java | 1 + 36 files changed, 1800 insertions(+), 417 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/distribution/src/resources/drill-override-example.conf ---------------------------------------------------------------------- diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf index b9d09a8..8010f85 100644 --- a/distribution/src/resources/drill-override-example.conf +++ b/distribution/src/resources/drill-override-example.conf @@ -142,6 +142,13 @@ drill.exec: { } }, cache.hazel.subnets: ["*.*.*.*"], + spill: { + # These options are common to all spilling operators. 
+    # They can be overridden, per operator (but this is just for
+    # backward compatibility, and may be deprecated in the future)
+    directories : [ "/tmp/drill/spill" ],
+    fs : "file:///"
+  }
  sort: {
    purge.threshold : 100,
    external: {
@@ -150,11 +157,26 @@ drill.exec: {
      batch.size : 4000,
      group.size : 100,
      threshold : 200,
+      # The 2 options below override the common ones;
+      # they should be deprecated in the future
      directories : [ "/tmp/drill/spill" ],
      fs : "file:///"
    }
  }
},
+  hashagg: {
+    # The partitions divide the work inside the hashagg, to ease
+    # handling spilling. This initial figure is tuned down when
+    # memory is limited.
+    # Setting this option to 1 disables spilling!
+    num_partitions: 32,
+    spill: {
+      # The 2 options below override the common ones;
+      # they should be deprecated in the future
+      directories : [ "/tmp/drill/spill" ],
+      fs : "file:///"
+    }
+  },
  memory: {
    top.max: 1000000000000,
    operator: {

http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 18f69d5..537377d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -64,6 +64,12 @@ public interface ExecConstants {
  String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
  String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";

+  // Spill boot-time Options common to all spilling operators
+  // (Each individual operator may override the common options)
+
+  String SPILL_FILESYSTEM = "drill.exec.spill.fs";
+  String SPILL_DIRS = "drill.exec.spill.directories";
+
  // External Sort Boot configuration

  String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size";
@@ -86,6 +92,22 @@
  BooleanValidator EXTERNAL_SORT_DISABLE_MANAGED_OPTION = new BooleanValidator("exec.sort.disable_managed", false);

+  // Hash Aggregate Options
+
+  String HASHAGG_NUM_PARTITIONS = "drill.exec.hashagg.num_partitions";
+  String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
+  LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128, 32); // 1 means - no spilling
+  String HASHAGG_MAX_MEMORY = "drill.exec.hashagg.mem_limit";
+  String HASHAGG_MAX_MEMORY_KEY = "exec.hashagg.mem_limit";
+  LongValidator HASHAGG_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHAGG_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE, 0);
+  // min batches is used for tuning (each partition needs this many batches when planning the number of partitions,
+  // or reserves this number when calculating whether the remaining available memory is too small and requires a spill.)
+ // Low value may OOM (e.g., when incoming rows become wider), higher values use fewer partitions but are safer + String HASHAGG_MIN_BATCHES_PER_PARTITION = "drill.exec.hashagg.min_batches_per_partition"; + String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = "drill.exec.hashagg.min_batches_per_partition"; + LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 2, 5, 3); + String HASHAGG_SPILL_DIRS = "drill.exec.hashagg.spill.directories"; + String HASHAGG_SPILL_FILESYSTEM = "drill.exec.hashagg.spill.fs"; String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size"; String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size"; http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java index 9d0182f..d569ae5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java @@ -35,6 +35,8 @@ import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.vector.NullableBigIntVector; +import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; import com.codahale.metrics.MetricRegistry; @@ -138,6 +140,60 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable { va = container; } + // Like above, only preserve the original container and list of value-vectors + public void readFromStreamWithContainer(VectorContainer myContainer, InputStream input) throws IOException { + final VectorContainer container = new VectorContainer(); + final UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input); + recordCount = batchDef.getRecordCount(); + if (batchDef.hasCarriesTwoByteSelectionVector() && batchDef.getCarriesTwoByteSelectionVector()) { + + if (sv2 == null) { + sv2 = new SelectionVector2(allocator); + } + sv2.allocateNew(recordCount * SelectionVector2.RECORD_SIZE); + sv2.getBuffer().setBytes(0, input, recordCount * SelectionVector2.RECORD_SIZE); + svMode = BatchSchema.SelectionVectorMode.TWO_BYTE; + } + final List vectorList = Lists.newArrayList(); + final List fieldList = batchDef.getFieldList(); + for (SerializedField metaData : fieldList) { + final int dataLength = metaData.getBufferLength(); + final MaterializedField field = MaterializedField.create(metaData); + final DrillBuf buf = allocator.buffer(dataLength); + final ValueVector vector; + try { + buf.writeBytes(input, dataLength); + vector = TypeHelper.getNewVector(field, allocator); + vector.load(metaData, buf); + } finally { + buf.release(); + } + vectorList.add(vector); + } + container.addCollection(vectorList); + container.setRecordCount(recordCount); + myContainer.transferIn(container); // transfer the vectors + myContainer.buildSchema(svMode); + myContainer.setRecordCount(recordCount); + /* + // for debugging -- show values from the first row + Object tmp0 = 
(myContainer).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector(); + Object tmp1 = (myContainer).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector(); + Object tmp2 = (myContainer).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector(); + if (tmp0 != null && tmp1 != null && tmp2 != null) { + NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0); + NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1); + NullableBigIntVector vv2 = ((NullableBigIntVector) tmp2); + + try { + logger.info("HASH AGG: Got a row = {} , {} , {}", vv0.getAccessor().get(0), vv1.getAccessor().get(0), vv2.getAccessor().get(0)); + } catch (Exception e) { logger.info("HASH AGG: Got an exception = {}",e); } + } + else { logger.info("HASH AGG: got nulls !!!"); } + */ + va = myContainer; + } + public void writeToStreamAndRetain(OutputStream output) throws IOException { retain = true; writeToStream(output); http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java index a547e26..6f42250 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.physical.base; +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.graph.GraphVisitor; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -102,17 +104,31 @@ public abstract class AbstractBase implements PhysicalOperator{ this.cost = cost; } - // Not available. Presumably because Drill does not currently use - // this value, though it does appear in some test physical plans. 
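
Stepping back to the serialization change above: readFromStreamWithContainer() is the read half of the hash aggregate's new spill round-trip. A rough sketch of how the two halves pair up, using only APIs that appear in this patch (stream setup, sizing, and error handling elided; the free variables are placeholders, so this is illustrative rather than drop-in code):

    // Write side (see spillAPartition() in HashAggTemplate, later in this diff):
    WritableBatch batch = WritableBatch.getBatchNoHVWrap(recordCount, outContainer, false);
    VectorAccessibleSerializable writer = new VectorAccessibleSerializable(batch, allocator);
    writer.writeToStream(outputStream);      // batch definition first, then the vector buffers

    // Read side (see SpilledRecordbatch, which replays a spill file as a new "incoming"):
    VectorAccessibleSerializable reader = new VectorAccessibleSerializable(allocator);
    reader.readFromStreamWithContainer(myContainer, inputStream);  // refills myContainer in place

Unlike the pre-existing readFromStream(), the new variant transfers the vectors into the caller's container, so the same container (and its value-vector references) can be reused across spill files.
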
-// public void setMaxAllocation(long alloc) { -// maxAllocation = alloc; -// } - @Override public long getMaxAllocation() { return maxAllocation; } + /** + * Any operator that supports spilling should override this method + * @param maxAllocation The max memory allocation to be set + */ + @Override + public void setMaxAllocation(long maxAllocation) { + this.maxAllocation = maxAllocation; + /*throw new DrillRuntimeException("Unsupported method: setMaxAllocation()");*/ + } + + /** + * Any operator that supports spilling should override this method (and return true) + * @return false + */ + @Override @JsonIgnore + public boolean isBufferedOperator() { return false; } + + // @Override + // public void setBufferedOperator(boolean bo) {} + @Override public String getUserName() { return userName; http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java index b1954ca..980f32c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java @@ -83,6 +83,21 @@ public interface PhysicalOperator extends GraphValue { */ public long getMaxAllocation(); + /** + * + * @param maxAllocation The max memory allocation to be set + */ + public void setMaxAllocation(long maxAllocation); + + /** + * + * @return True iff this operator manages its memory (including disk spilling) + */ + @JsonIgnore + public boolean isBufferedOperator(); + + // public void setBufferedOperator(boolean bo); + @JsonProperty("@id") public int getOperatorId(); http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java index 17848d0..cb9679d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java @@ -49,12 +49,19 @@ public class ExternalSort extends Sort { return CoreOperatorType.EXTERNAL_SORT_VALUE; } - // Set here, rather than the base class, because this is the only - // operator, at present, that makes use of the maximum allocation. - // Remove this, in favor of the base class version, when Drill - // sets the memory allocation for all operators. 
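
For orientation, the contract being introduced across AbstractBase, PhysicalOperator, ExternalSort and HashAggregate: the base class now accepts the allocation and reports itself as non-buffered, and each spilling operator overrides isBufferedOperator() to return true. A minimal sketch of a hypothetical operator opting in (the class name is invented, and AbstractBase's abstract members are elided):

    // Hypothetical spilling operator following this patch's contract
    public class MySpillingOp extends AbstractBase {
      @Override
      public void setMaxAllocation(long maxAllocation) {
        this.maxAllocation = maxAllocation;   // accept the budget assigned at planning time
      }

      @Override @JsonIgnore
      public boolean isBufferedOperator() { return true; }  // include me in memory planning
    }

Presumably this is what lets MemoryAllocationUtilities (also touched by this commit) divide a fragment's memory budget among exactly the operators that manage their own memory.
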
- + /** + * + * @param maxAllocation The max memory allocation to be set + */ + @Override public void setMaxAllocation(long maxAllocation) { this.maxAllocation = maxAllocation; } + + /** + * The External Sort operator supports spilling + * @return true + */ + @Override + public boolean isBufferedOperator() { return true; } } http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java index 4dafbe8..0614dc4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java @@ -21,6 +21,7 @@ import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.exec.physical.base.AbstractSingle; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.planner.physical.AggPrelBase; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import com.fasterxml.jackson.annotation.JsonCreator; @@ -34,6 +35,7 @@ public class HashAggregate extends AbstractSingle { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregate.class); + private final AggPrelBase.OperatorPhase aggPhase; private final List groupByExprs; private final List aggrExprs; @@ -41,15 +43,19 @@ public class HashAggregate extends AbstractSingle { @JsonCreator public HashAggregate(@JsonProperty("child") PhysicalOperator child, + @JsonProperty("phase") AggPrelBase.OperatorPhase aggPhase, @JsonProperty("keys") List groupByExprs, @JsonProperty("exprs") List aggrExprs, @JsonProperty("cardinality") float cardinality) { super(child); + this.aggPhase = aggPhase; this.groupByExprs = groupByExprs; this.aggrExprs = aggrExprs; this.cardinality = cardinality; } + public AggPrelBase.OperatorPhase getAggPhase() { return aggPhase; } + public List getGroupByExprs() { return groupByExprs; } @@ -69,7 +75,9 @@ public class HashAggregate extends AbstractSingle { @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { - return new HashAggregate(child, groupByExprs, aggrExprs, cardinality); + HashAggregate newHAG = new HashAggregate(child, aggPhase, groupByExprs, aggrExprs, cardinality); + newHAG.setMaxAllocation(getMaxAllocation()); + return newHAG; } @Override @@ -77,5 +85,18 @@ public class HashAggregate extends AbstractSingle { return CoreOperatorType.HASH_AGGREGATE_VALUE; } - + /** + * + * @param maxAllocation The max memory allocation to be set + */ + @Override + public void setMaxAllocation(long maxAllocation) { + this.maxAllocation = maxAllocation; + } + /** + * The Hash Aggregate operator supports spilling + * @return true + */ + @Override + public boolean isBufferedOperator() { return true; } } http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index 
dc913b1..97e0599 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.io.IOException; import java.util.List; +import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ErrorCollector; @@ -55,7 +56,6 @@ import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; -import com.google.common.collect.Lists; import com.sun.codemodel.JExpr; import com.sun.codemodel.JVar; @@ -63,12 +63,13 @@ public class HashAggBatch extends AbstractRecordBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatch.class); private HashAggregator aggregator; - private final RecordBatch incoming; + private RecordBatch incoming; private LogicalExpression[] aggrExprs; private TypedFieldId[] groupByOutFieldIds; private TypedFieldId[] aggrOutFieldIds; // field ids for the outgoing batch private final List comparators; private BatchSchema incomingSchema; + private boolean wasKilled; private final GeneratorMapping UPDATE_AGGR_INSIDE = GeneratorMapping.create("setupInterior" /* setup method */, "updateAggrValuesInternal" /* eval method */, @@ -87,6 +88,7 @@ public class HashAggBatch extends AbstractRecordBatch { public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) throws ExecutionSetupException { super(popConfig, context); this.incoming = incoming; + wasKilled = false; final int numGrpByExprs = popConfig.getGroupByExprs().size(); comparators = Lists.newArrayListWithExpectedSize(numGrpByExprs); @@ -136,15 +138,36 @@ public class HashAggBatch extends AbstractRecordBatch { return IterOutcome.NONE; } - if (aggregator.buildComplete() && !aggregator.allFlushed()) { - // aggregation is complete and not all records have been output yet - return aggregator.outputCurrentBatch(); + // if aggregation is complete and not all records have been output yet + if (aggregator.buildComplete() || + // or: 1st phase need to return (not fully grouped) partial output due to memory pressure + aggregator.earlyOutput()) { + // then output the next batch downstream + HashAggregator.AggIterOutcome aggOut = aggregator.outputCurrentBatch(); + // if Batch returned, or end of data - then return the appropriate iter outcome + if ( aggOut == HashAggregator.AggIterOutcome.AGG_NONE ) { return IterOutcome.NONE; } + if ( aggOut == HashAggregator.AggIterOutcome.AGG_OK ) { return IterOutcome.OK; } + // if RESTART - continue below with doWork() - read some spilled partition, just like reading incoming + incoming = aggregator.getNewIncoming(); // Restart - incoming was just changed + } + + if (wasKilled) { // if kill() was called before, then finish up + aggregator.cleanup(); + incoming.kill(false); + return IterOutcome.NONE; } - logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount()); + // Read and aggregate records + // ( may need to run again if the spilled partition that was read + // generated new partitions that were all spilled ) + AggOutcome out; + do { + // + // Read incoming batches and process their records + // + out = aggregator.doWork(); + } while (out == 
AggOutcome.CALL_WORK_AGAIN); - AggOutcome out = aggregator.doWork(); - logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount()); switch (out) { case CLEANUP_AND_RETURN: container.zeroVectors(); @@ -153,6 +176,7 @@ public class HashAggBatch extends AbstractRecordBatch { // fall through case RETURN_OUTCOME: return aggregator.getOutcome(); + case UPDATE_AGGREGATOR: context.fail(UserException.unsupportedError() .message(SchemaChangeException.schemaChanged( @@ -175,7 +199,6 @@ public class HashAggBatch extends AbstractRecordBatch { * @return true if the aggregator was setup successfully. false if there was a failure. */ private boolean createAggregator() { - logger.debug("Creating new aggregator."); try { stats.startSetup(); this.aggregator = createAggregatorInternal(); @@ -198,7 +221,7 @@ public class HashAggBatch extends AbstractRecordBatch { ClassGenerator cgInner = cg.getInnerGenerator("BatchHolder"); top.plainJavaCapable(true); // Uncomment out this line to debug the generated code. -// top.saveCodeForDebugging(true); + // top.saveCodeForDebugging(true); container.clear(); @@ -266,7 +289,7 @@ public class HashAggBatch extends AbstractRecordBatch { HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */, comparators); agg.setup(popConfig, htConfig, context, this.stats, - oContext.getAllocator(), incoming, this, + oContext, incoming, this, aggrExprs, cgInner.getWorkspaceTypes(), groupByOutFieldIds, @@ -314,6 +337,7 @@ public class HashAggBatch extends AbstractRecordBatch { @Override protected void killIncoming(boolean sendUpstream) { + wasKilled = true; incoming.kill(sendUpstream); } } http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 1615200..38f0222 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -18,82 +18,155 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.io.IOException; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.TimeUnit; import javax.inject.Named; +import com.google.common.base.Stopwatch; + +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.LogicalExpression; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.cache.VectorAccessibleSerializable; import org.apache.drill.exec.compile.sig.RuntimeOverridden; import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; + +import org.apache.drill.exec.memory.BaseAllocator; import org.apache.drill.exec.memory.BufferAllocator; import 
org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.physical.base.AbstractBase; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.impl.common.ChainedHashTable; import org.apache.drill.exec.physical.impl.common.HashTable; import org.apache.drill.exec.physical.impl.common.HashTableConfig; import org.apache.drill.exec.physical.impl.common.HashTableStats; import org.apache.drill.exec.physical.impl.common.IndexPointer; + +import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; + +import org.apache.drill.exec.physical.impl.spill.SpillSet; +import org.apache.drill.exec.planner.physical.AggPrelBase; + +import org.apache.drill.exec.proto.UserBitShared; + import org.apache.drill.exec.record.MaterializedField; + import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatch.IterOutcome; -import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.BatchSchema; + import org.apache.drill.exec.record.VectorContainer; + +import org.apache.drill.exec.record.TypedFieldId; + +import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; + import org.apache.drill.exec.vector.AllocationHelper; + import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.ObjectVector; import org.apache.drill.exec.vector.ValueVector; + import org.apache.drill.exec.vector.VariableWidthVector; +import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_SIZE; + public abstract class HashAggTemplate implements HashAggregator { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class); + protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class); -// private static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024; -// private static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000; - private static final int VARIABLE_WIDTH_VALUE_SIZE = 50; + private static final int VARIABLE_MAX_WIDTH_VALUE_SIZE = 50; + private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8; private static final boolean EXTRA_DEBUG_1 = false; private static final boolean EXTRA_DEBUG_2 = false; -// private static final String TOO_BIG_ERROR = -// "Couldn't add value to an empty batch. This likely means that a single value is too long for a varlen field."; -// private boolean newSchema = false; + private static final boolean EXTRA_DEBUG_SPILL = false; + + // Fields needed for partitioning (the groups into partitions) + private int numPartitions = 0; // must be 2 to the power of bitsInMask (set in setup()) + private int partitionMask; // numPartitions - 1 + private int bitsInMask; // number of bits in the MASK + private int nextPartitionToReturn = 0; // which partition to return the next batch from + // The following members are used for logging, metrics, etc. + private int rowsInPartition = 0; // counts #rows in each partition + private int rowsNotSpilled = 0; + private int rowsSpilled = 0; + private int rowsSpilledReturned = 0; + private int rowsReturnedEarly = 0; + + private boolean isTwoPhase = false; // 1 phase or 2 phase aggr? 
+  private boolean is2ndPhase = false;
+  private boolean canSpill = true; // make it false in case it cannot spill
+  private ChainedHashTable baseHashTable;
+  private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory
+  private int earlyPartition = 0; // which partition to return early
+
+  private long memoryLimit; // max memory to be used by this operator
+  private long estMaxBatchSize = 0; // used for adjusting #partitions
+  private long estRowWidth = 0;
+  private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
+  private long minBatchesPerPartition; // for tuning - num partitions and spill decision
+  private long plannedBatches = 0; // account for planned, but not yet allocated batches
+
  private int underlyingIndex = 0;
  private int currentIndex = 0;
  private IterOutcome outcome;
-//  private int outputCount = 0;
  private int numGroupedRecords = 0;
-  private int outBatchIndex = 0;
+  private int currentBatchRecordCount = 0; // Performance: Avoid repeated calls to getRecordCount()
+
  private int lastBatchOutputCount = 0;
  private RecordBatch incoming;
-//  private BatchSchema schema;
+  private BatchSchema schema;
  private HashAggBatch outgoing;
  private VectorContainer outContainer;
-//  private FragmentContext context;
+
+  private FragmentContext context;
+  private OperatorContext oContext;
  private BufferAllocator allocator;
-//  private HashAggregate hashAggrConfig;
-  private HashTable htable;
-  private ArrayList batchHolders;
+  private HashTable htables[];
+  private ArrayList batchHolders[];
+  private int outBatchIndex[];
+
+  // For handling spilling
+  private SpillSet spillSet;
+  SpilledRecordbatch newIncoming; // when reading a spilled file - work like an "incoming"
+  private OutputStream outputStream[]; // an output stream for each spilled partition
+  private int spilledBatchesCount[]; // count number of batches spilled, in each partition
+  private String spillFiles[];
+  private int cycleNum = 0; // primary, secondary, tertiary, etc.
+  private int originalPartition = -1; // the partition a secondary reads from
+
+  private static class SpilledPartition { public int spilledBatches; public String spillFile; int cycleNum; int origPartn; int prevOrigPartn; }
+
+  private ArrayList spilledPartitionsList;
+  private int operatorId; // for the spill file name
+
  private IndexPointer htIdxHolder; // holder for the Hashtable's internal index returned by put()
  private IndexPointer outStartIdxHolder;
  private IndexPointer outNumRecordsHolder;
  private int numGroupByOutFields = 0; // Note: this should be <= number of group-by fields
-
-  ErrorCollector collector = new ErrorCollectorImpl();
+  private TypedFieldId[] groupByOutFieldIds;

  private MaterializedField[] materializedValueFields;
  private boolean allFlushed = false;
  private boolean buildComplete = false;
+  private boolean handlingSpills = false; // True once starting to process spill files

  private OperatorStats stats = null;
  private HashTableStats htStats = new HashTableStats();
@@ -103,7 +176,15 @@
    NUM_BUCKETS,
    NUM_ENTRIES,
    NUM_RESIZING,
-    RESIZING_TIME;
+    RESIZING_TIME,
+    NUM_PARTITIONS,
+    SPILLED_PARTITIONS, // number of partitions spilled to disk
+    SPILL_MB,           // Number of MB of data spilled to disk. This amount is first written,
+                        // then later re-read. So, disk I/O is twice this amount.
+ // For first phase aggr -- this is an estimate of the amount of data + // returned early (analogous to a spill in the 2nd phase). + SPILL_CYCLE // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY + ; // duplicate for hash ag @@ -121,7 +202,6 @@ public abstract class HashAggTemplate implements HashAggregator { private int batchOutputCount = 0; private int capacity = Integer.MAX_VALUE; - private boolean allocatedNextBatch = false; @SuppressWarnings("resource") public BatchHolder() { @@ -145,8 +225,8 @@ public abstract class HashAggTemplate implements HashAggregator { if (vector instanceof FixedWidthVector) { ((FixedWidthVector) vector).allocateNew(HashTable.BATCH_SIZE); } else if (vector instanceof VariableWidthVector) { - ((VariableWidthVector) vector).allocateNew(HashTable.VARIABLE_WIDTH_VECTOR_SIZE * HashTable.BATCH_SIZE, - HashTable.BATCH_SIZE); + // This case is never used .... a varchar falls under ObjectVector which is allocated on the heap ! + ((VariableWidthVector) vector).allocateNew(maxColumnWidth, HashTable.BATCH_SIZE); } else if (vector instanceof ObjectVector) { ((ObjectVector) vector).allocateNew(HashTable.BATCH_SIZE); } else { @@ -166,20 +246,23 @@ public abstract class HashAggTemplate implements HashAggregator { } private boolean updateAggrValues(int incomingRowIdx, int idxWithinBatch) { - updateAggrValuesInternal(incomingRowIdx, idxWithinBatch); + try { updateAggrValuesInternal(incomingRowIdx, idxWithinBatch); } + catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc); } maxOccupiedIdx = Math.max(maxOccupiedIdx, idxWithinBatch); return true; } private void setup() { - setupInterior(incoming, outgoing, aggrValuesContainer); + try { setupInterior(incoming, outgoing, aggrValuesContainer); } + catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);} } private void outputValues(IndexPointer outStartIdxHolder, IndexPointer outNumRecordsHolder) { outStartIdxHolder.value = batchOutputCount; outNumRecordsHolder.value = 0; for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) { - outputRecordValues(i, batchOutputCount); + try { outputRecordValues(i, batchOutputCount); } + catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);} if (EXTRA_DEBUG_2) { logger.debug("Outputting values to output index: {}", batchOutputCount); } @@ -204,24 +287,23 @@ public abstract class HashAggTemplate implements HashAggregator { @RuntimeOverridden public void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing, - @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) { + @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) throws SchemaChangeException { } @RuntimeOverridden - public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) { + public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException{ } @RuntimeOverridden - public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) { + public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException{ } } - @Override public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, - OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing, - LogicalExpression[] valueExprs, List valueFieldIds, 
      TypedFieldId[] groupByOutFieldIds,
-      VectorContainer outContainer) throws SchemaChangeException, ClassTransformationException, IOException {
+      OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
+      LogicalExpression[] valueExprs, List valueFieldIds, TypedFieldId[] groupByOutFieldIds,
+      VectorContainer outContainer) throws SchemaChangeException, IOException {

    if (valueExprs == null || valueFieldIds == null) {
      throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables.");
@@ -230,15 +312,34 @@
      throw new IllegalArgumentException("Wrong number of workspace variables.");
    }

-//  this.context = context;
+    this.context = context;
    this.stats = stats;
-    this.allocator = allocator;
+    this.allocator = oContext.getAllocator();
+    this.oContext = oContext;
    this.incoming = incoming;
-//  this.schema = incoming.getSchema();
    this.outgoing = outgoing;
    this.outContainer = outContainer;
+    this.operatorId = hashAggrConfig.getOperatorId();
+
+    is2ndPhase = hashAggrConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2;
+    isTwoPhase = hashAggrConfig.getAggPhase() != AggPrelBase.OperatorPhase.PHASE_1of1;
+    canSpill = isTwoPhase; // single phase can not spill
+
+    // Typically for testing - force a spill after a partition has more than so many batches
+    minBatchesPerPartition = context.getConfig().getLong(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION);
+
+    // Set the memory limit
+    memoryLimit = allocator.getLimit();
+    // Optional configured memory limit, typically used only for testing.
+    long configLimit = context.getConfig().getLong(ExecConstants.HASHAGG_MAX_MEMORY);
+    if (configLimit > 0) {
+      logger.warn("Memory limit was changed to {}",configLimit);
+      memoryLimit = Math.min(memoryLimit, configLimit);
+      allocator.setLimit(memoryLimit); // enforce at the allocator
+    }

-//  this.hashAggrConfig = hashAggrConfig;
+    // All the settings that require the number of partitions were moved into delayedSetup(),
+    // which would be called later, after the actual data first arrives

    // currently, hash aggregation is only applicable if there are group-by expressions.
    // For non-grouped (a.k.a Plain) aggregations that don't involve DISTINCT, there is no
@@ -266,112 +367,278 @@
      }
    }

-    ChainedHashTable ht =
+    spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE);
+    baseHashTable =
        new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
-    this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
-
+    this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
    numGroupByOutFields = groupByOutFieldIds.length;
-    batchHolders = new ArrayList();
-    // First BatchHolder is created when the first put request is received.

    doSetup(incoming);
  }
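
Before the method itself (next), it may help to see the partition-count arithmetic in isolation. A self-contained paraphrase of the decision in delayedSetup(), keeping its ~8 MB per-partition overhead but ignoring the 2nd-phase edge cases; the worked numbers are invented for illustration:

    // Paraphrase of delayedSetup()'s sizing loop -- not the actual method
    static int choosePartitionCount(long memAvail, long estMaxBatchSize,
                                    long minBatchesPerPartition, int configured) {
      int numPartitions = Integer.highestOneBit(Math.max(1, configured));
      if (numPartitions < configured) { numPartitions <<= 1; }   // round up to a power of 2
      // halve while the planned batches plus overhead do not fit in available memory
      while (numPartitions > 1 &&
             numPartitions * (estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail) {
        numPartitions /= 2;
      }
      return numPartitions;
    }

For example, with 32 partitions requested, estMaxBatchSize = 4 MB, minBatchesPerPartition = 3 and memAvail = 200 MB, each partition needs roughly 20 MB, so the loop settles on 8 partitions; partitionMask is then 7 and bitsInMask is 3, and a row presumably lands in partition (hashValue & partitionMask).
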
+ */ + private void delayedSetup() { + + // Set the number of partitions from the configuration (raise to a power of two, if needed) + numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS); + if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled due to configuration setting of num_partitions to 1"); + } + numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2 + + if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch + else { + // Estimate the max batch size; should use actual data (e.g. lengths of varchars) + updateEstMaxBatchSize(incoming); + } + long memAvail = memoryLimit - allocator.getAllocatedMemory(); + if ( !canSpill ) { // single phase, or spill disabled by configuation + numPartitions = 1; // single phase should use only a single partition (to save memory) + } else { // two phase + // Adjust down the number of partitions if needed - when the memory available can not hold as + // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)) + while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) { + numPartitions /= 2; + if ( numPartitions < 2) { + if ( is2ndPhase ) { + canSpill = false; // 2nd phase needs at least 2 to make progress + logger.warn("Spilling was disabled - not enough memory available for internal partitioning"); + } + break; + } + } + } + logger.debug("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single", + numPartitions, canSpill ? "Can" : "Cannot"); + + // The following initial safety check should be revisited once we can lower the number of rows in a batch + // In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table) + if ( numPartitions == 1 ) { + // if too little memory - behave like the old code -- no memory limit for hash aggregate + allocator.setLimit(AbstractBase.MAX_ALLOCATION); // 10_000_000_000L + } + // Based on the number of partitions: Set the mask and bit count + partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F + bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5 + + // Create arrays (one entry per partition) + htables = new HashTable[numPartitions] ; + batchHolders = (ArrayList[]) new ArrayList[numPartitions] ; + outBatchIndex = new int[numPartitions] ; + outputStream = new OutputStream[numPartitions]; + spilledBatchesCount = new int[numPartitions]; + spillFiles = new String[numPartitions]; + spilledPartitionsList = new ArrayList(); + + plannedBatches = numPartitions; // each partition should allocate its first batch + + // initialize every (per partition) entry in the arrays + for (int i = 0; i < numPartitions; i++ ) { + try { + this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions); + this.htables[i].setMaxVarcharSize(maxColumnWidth); + } catch (ClassTransformationException e) { + throw UserException.unsupportedError(e) + .message("Code generation error - likely an error in the code.") + .build(logger); + } catch (IOException e) { + throw UserException.resourceError(e) + .message("IO Error while creating a hash table.") + .build(logger); + } catch (SchemaChangeException sce) { + throw new IllegalStateException("Unexpected Schema Change while creating a hash table",sce); + } + this.batchHolders[i] = new ArrayList(); // First BatchHolder is created when the first put request is received. 
+ } + } + /** + * get new incoming: (when reading spilled files like an "incoming") + * @return The (newly replaced) incoming + */ + @Override + public RecordBatch getNewIncoming() { return newIncoming; } + + private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeException, IOException { + baseHashTable.updateIncoming(newIncoming); // after a spill - a new incoming + this.incoming = newIncoming; + currentBatchRecordCount = newIncoming.getRecordCount(); // first batch in this spill file + nextPartitionToReturn = 0; + for (int i = 0; i < numPartitions; i++ ) { + htables[i].reinit(newIncoming); + if ( batchHolders[i] != null) { + for (BatchHolder bh : batchHolders[i]) { + bh.clear(); + } + batchHolders[i].clear(); + batchHolders[i] = new ArrayList(); + } + outBatchIndex[i] = 0; + outputStream[i] = null; + spilledBatchesCount[i] = 0; + spillFiles[i] = null; + } + } + + /** + * Update the estimated max batch size to be used in the Hash Aggr Op. + * using the record batch size to get the row width. + * @param incoming + */ + private void updateEstMaxBatchSize(RecordBatch incoming) { + if ( estMaxBatchSize > 0 ) { return; } // no handling of a schema (or varchar) change + RecordBatchSizer sizer = new RecordBatchSizer(incoming); + logger.trace("Incoming sizer: {}",sizer); + // An empty batch only has the schema, can not tell actual length of varchars + // else use the actual varchars length, each capped at 50 (to match the space allocation) + estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50(); + estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE; + + // Get approx max (varchar) column width to get better memory allocation + maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE); + maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE); + + logger.trace("{} phase. Estimated row width: {} batch size: {} memory limit: {} max column width: {}", + isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth); + + if ( estMaxBatchSize > memoryLimit ) { + logger.warn("HashAggregate: Estimated max batch size {} is larger than the memory limit {}",estMaxBatchSize,memoryLimit); + } + } + + /** + * Read and process (i.e., insert into the hash table and aggregate) records from the current batch. + * Once complete, get the incoming NEXT batch and process it as well, etc. + * For 1st phase, may return when an early output needs to be performed. + * + * @return Agg outcome status + */ @Override public AggOutcome doWork() { - try { - // Note: Keeping the outer and inner try blocks here to maintain some similarity with - // StreamingAggregate which does somethings conditionally in the outer try block. - // In the future HashAggregate may also need to perform some actions conditionally - // in the outer try block. - - outside: - while (true) { - // loop through existing records, aggregating the values as necessary. - if (EXTRA_DEBUG_1) { - logger.debug("Starting outer loop of doWork()..."); + + while (true) { + + // This would be called only once - first time actual data arrives on incoming + if ( schema == null && incoming.getRecordCount() > 0 ) { + this.schema = incoming.getSchema(); + currentBatchRecordCount = incoming.getRecordCount(); // initialize for first non empty batch + // Calculate the number of partitions based on actual incoming data + delayedSetup(); + } + + // + // loop through existing records in this batch, aggregating the values as necessary. 
+ // + if (EXTRA_DEBUG_1) { + logger.debug("Starting outer loop of doWork()..."); + } + for (; underlyingIndex < currentBatchRecordCount; incIndex()) { + if (EXTRA_DEBUG_2) { + logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); } - for (; underlyingIndex < incoming.getRecordCount(); incIndex()) { - if (EXTRA_DEBUG_2) { - logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); - } - checkGroupAndAggrValues(currentIndex); + checkGroupAndAggrValues(currentIndex); + // If adding a group discovered a memory pressure during 1st phase, then start + // outputing some partition downstream in order to free memory. + if ( earlyOutput ) { + outputCurrentBatch(); + incIndex(); // next time continue with the next incoming row + return AggOutcome.RETURN_OUTCOME; } + } + + if (EXTRA_DEBUG_1) { + logger.debug("Processed {} records", underlyingIndex); + } - if (EXTRA_DEBUG_1) { - logger.debug("Processed {} records", underlyingIndex); + // Cleanup the previous batch since we are done processing it. + for (VectorWrapper v : incoming) { + v.getValueVector().clear(); + } + // + // Get the NEXT input batch, initially from the upstream, later (if there was a spill) + // from one of the spill files (The spill case is handled differently here to avoid + // collecting stats on the spilled records) + // + if ( handlingSpills ) { + outcome = context.shouldContinue() ? incoming.next() : IterOutcome.STOP; + } else { + long beforeAlloc = allocator.getAllocatedMemory(); + + // Get the next RecordBatch from the incoming (i.e. upstream operator) + outcome = outgoing.next(0, incoming); + + // If incoming batch is bigger than our estimate - adjust the estimate to match + long afterAlloc = allocator.getAllocatedMemory(); + long incomingBatchSize = afterAlloc - beforeAlloc; + if ( estMaxBatchSize < incomingBatchSize) { + logger.trace("Found a bigger incoming batch: {} , prior estimate was: {}", incomingBatchSize, estMaxBatchSize); + estMaxBatchSize = incomingBatchSize; } + } - try { + if (EXTRA_DEBUG_1) { + logger.debug("Received IterOutcome of {}", outcome); + } - while (true) { - // Cleanup the previous batch since we are done processing it. - for (VectorWrapper v : incoming) { - v.getValueVector().clear(); - } - IterOutcome out = outgoing.next(0, incoming); - if (EXTRA_DEBUG_1) { - logger.debug("Received IterOutcome of {}", out); - } - switch (out) { - case OUT_OF_MEMORY: - case NOT_YET: - this.outcome = out; - return AggOutcome.RETURN_OUTCOME; - - case OK_NEW_SCHEMA: - if (EXTRA_DEBUG_1) { - logger.debug("Received new schema. 
Batch has {} records.", incoming.getRecordCount()); - } -// newSchema = true; - this.cleanup(); - // TODO: new schema case needs to be handled appropriately - return AggOutcome.UPDATE_AGGREGATOR; - - case OK: - resetIndex(); - if (incoming.getRecordCount() == 0) { - continue; - } else { - checkGroupAndAggrValues(currentIndex); - incIndex(); - - if (EXTRA_DEBUG_1) { - logger.debug("Continuing outside loop"); - } - continue outside; - } - - case NONE: - // outcome = out; - - buildComplete = true; - - updateStats(htable); - - // output the first batch; remaining batches will be output - // in response to each next() call by a downstream operator - - outputCurrentBatch(); - - // return setOkAndReturn(); - return AggOutcome.RETURN_OUTCOME; - - case STOP: - default: - outcome = out; - return AggOutcome.CLEANUP_AND_RETURN; - } + // Handle various results from getting the next batch + switch (outcome) { + case OUT_OF_MEMORY: + case NOT_YET: + return AggOutcome.RETURN_OUTCOME; + + case OK_NEW_SCHEMA: + if (EXTRA_DEBUG_1) { + logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount()); } + this.cleanup(); + // TODO: new schema case needs to be handled appropriately + return AggOutcome.UPDATE_AGGREGATOR; - } finally { - // placeholder... - } + case OK: + currentBatchRecordCount = incoming.getRecordCount(); // size of next batch + + resetIndex(); // initialize index (a new batch needs to be processed) + + if (EXTRA_DEBUG_1) { + logger.debug("Continue to start processing the next batch"); + } + break; + + case NONE: + resetIndex(); // initialize index (in case spill files need to be processed) + + buildComplete = true; + + updateStats(htables); + + // output the first batch; remaining batches will be output + // in response to each next() call by a downstream operator + AggIterOutcome aggOutcome = outputCurrentBatch(); + + if ( aggOutcome == AggIterOutcome.AGG_RESTART ) { + // Output of first batch returned a RESTART (all new partitions were spilled) + return AggOutcome.CALL_WORK_AGAIN; // need to read/process the next partition + } + + if ( aggOutcome != AggIterOutcome.AGG_NONE ) { outcome = IterOutcome.OK; } + + return AggOutcome.RETURN_OUTCOME; + + case STOP: + default: + return AggOutcome.CLEANUP_AND_RETURN; } - } finally { } } + /** + * Allocate space for the returned aggregate columns + * (Note DRILL-5588: Maybe can eliminate this allocation (and copy)) + * @param records + */ private void allocateOutgoing(int records) { // Skip the keys and only allocate for outputting the workspace values // (keys will be output through splitAndTransfer) @@ -382,14 +649,8 @@ public abstract class HashAggTemplate implements HashAggregator { while (outgoingIter.hasNext()) { @SuppressWarnings("resource") ValueVector vv = outgoingIter.next().getValueVector(); -// MajorType type = vv.getField().getType(); - /* - * In build schema we use the allocation model that specifies exact record count - * so we need to stick with that allocation model until DRILL-2211 is resolved. Using - * 50 as the average bytes per value as is used in HashTable. 
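
One note before the allocation change below: the fixed 50-byte estimate survives only as a cap. updateEstMaxBatchSize() (shown earlier) derives maxColumnWidth from the widest varchar the sizer actually observed, clamped to this class's constants. Restated as a fragment (observedMaxVarcharWidth stands in for sizer.maxSize()):

    // The clamp applied in updateEstMaxBatchSize(), restated:
    int maxColumnWidth = Math.max(observedMaxVarcharWidth, VARIABLE_MIN_WIDTH_VALUE_SIZE); // floor: 8
    maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);              // cap: 50
    // e.g. a widest observed value of 3 bytes  -> allocate  8 bytes per value
    //      a widest observed value of 113 bytes -> allocate 50 bytes per value

So narrow columns no longer pay for the old one-size-fits-all 50-byte guess, while very wide columns are still bounded.
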
- */ - AllocationHelper.allocatePrecomputedChildCount(vv, records, VARIABLE_WIDTH_VALUE_SIZE, 0); + AllocationHelper.allocatePrecomputedChildCount(vv, records, maxColumnWidth, 0); } } @@ -400,45 +661,82 @@ public abstract class HashAggTemplate implements HashAggregator { @Override public int getOutputCount() { - // return outputCount; return lastBatchOutputCount; } @Override public void cleanup() { - if (htable != null) { - htable.clear(); - htable = null; + if ( schema == null ) { return; } // not set up; nothing to clean + if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) { + stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled + (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0)); + } + // clean (and deallocate) each partition + for ( int i = 0; i < numPartitions; i++) { + if (htables[i] != null) { + htables[i].clear(); + htables[i] = null; + } + if ( batchHolders[i] != null) { + for (BatchHolder bh : batchHolders[i]) { + bh.clear(); + } + batchHolders[i].clear(); + batchHolders[i] = null; + } + + // delete any (still active) output spill file + if ( outputStream[i] != null && spillFiles[i] != null) { + try { + outputStream[i].close(); + outputStream[i] = null; + spillSet.delete(spillFiles[i]); + spillFiles[i] = null; + } catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]); + } + } } + // delete any spill file left in unread spilled partitions + while ( ! spilledPartitionsList.isEmpty() ) { + SpilledPartition sp = spilledPartitionsList.remove(0); + try { + spillSet.delete(sp.spillFile); + } catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile); + } + } + // Delete the currently handled (if any) spilled file + if ( newIncoming != null ) { newIncoming.close(); } + spillSet.close(); // delete the spill directory(ies) htIdxHolder = null; materializedValueFields = null; outStartIdxHolder = null; outNumRecordsHolder = null; + } - if (batchHolders != null) { - for (BatchHolder bh : batchHolders) { + // First free the memory used by the given (spilled) partition (i.e., hash table plus batches) + // then reallocate them in pristine state to allow the partition to continue receiving rows + private void reinitPartition(int part) /* throws SchemaChangeException /*, IOException */ { + assert htables[part] != null; + htables[part].reset(); + if ( batchHolders[part] != null) { + for (BatchHolder bh : batchHolders[part]) { bh.clear(); } - batchHolders.clear(); - batchHolders = null; + batchHolders[part].clear(); } + batchHolders[part] = new ArrayList(); // First BatchHolder is created when the first put request is received. 
  }

-//  private final AggOutcome setOkAndReturn() {
-//    this.outcome = IterOutcome.OK;
-//    for (VectorWrapper v : outgoing) {
-//      v.getValueVector().getMutator().setValueCount(outputCount);
-//    }
-//    return AggOutcome.RETURN_OUTCOME;
-//  }
-
  private final void incIndex() {
    underlyingIndex++;
-    if (underlyingIndex >= incoming.getRecordCount()) {
+    if (underlyingIndex >= currentBatchRecordCount) {
      currentIndex = Integer.MAX_VALUE;
      return;
    }
-    currentIndex = getVectorIndex(underlyingIndex);
+    try { currentIndex = getVectorIndex(underlyingIndex); }
+    catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);}
  }

  private final void resetIndex() {
@@ -446,71 +744,337 @@
    incIndex();
  }

-  private void addBatchHolder() {
+  private boolean isSpilled(int part) {
+    return outputStream[part] != null;
+  }
+  /**
+   * Which partition to choose for flushing out (i.e. spill or return)?
+   * - The current partition (to which a new batch holder is added) has a priority,
+   *   because its last batch holder is full.
+   * - Also the largest prior spilled partition has some priority, as it is already spilled;
+   *   but spilling too few rows (e.g. a single batch) gets us nothing.
+   * - So the largest non-spilled partition has some priority, to get more memory freed.
+   * Need to weigh the above three options.
+   *
+   * @param currPart - The partition that hit the memory limit (gets a priority)
+   * @return The partition (number) chosen to be spilled
+   */
+  private int chooseAPartitionToFlush(int currPart) {
+    if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition
+    int currPartSize = batchHolders[currPart].size();
+    if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1
+    // first find the largest spilled partition
+    int maxSizeSpilled = -1;
+    int indexMaxSpilled = -1;
+    for (int isp = 0; isp < numPartitions; isp++ ) {
+      if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
+        maxSizeSpilled = batchHolders[isp].size();
+        indexMaxSpilled = isp;
+      }
+    }
+    // Give the current (if already spilled) some priority
+    if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
+      maxSizeSpilled = currPartSize ;
+      indexMaxSpilled = currPart;
+    }
+    // now find the largest non-spilled partition
+    int maxSize = -1;
+    int indexMax = -1;
+    // Use the largest spilled (if found) as a base line, with a factor of 4
+    if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) {
+      indexMax = indexMaxSpilled;
+      maxSize = 4 * maxSizeSpilled ;
+    }
+    for ( int insp = 0; insp < numPartitions; insp++) {
+      if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
+        indexMax = insp;
+        maxSize = batchHolders[insp].size();
+      }
+    }
+    // again - priority to the current partition
+    if ( ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
+      return currPart;
+    }
+    if ( maxSize <= 1 ) { // Can not make progress by spilling a single batch!
+ return -1; // try skipping this spill + } + return indexMax; + } + + /** + * Iterate through the batches of the given partition, writing them to a spill file. + * + * @param part The partition (number) to spill + */ + private void spillAPartition(int part) { + + ArrayList currPartition = batchHolders[part]; + rowsInPartition = 0; + if ( EXTRA_DEBUG_SPILL ) { + logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, cycleNum, currPartition.size()); + } + + if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill + + // If this is the first spill for this partition, create an output stream + if ( ! isSpilled(part) ) { + + spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null); + + try { + outputStream[part] = spillSet.openForOutput(spillFiles[part]); + } catch (IOException ioe) { + throw UserException.resourceError(ioe) + .message("Hash Aggregation failed to open spill file: " + spillFiles[part]) + .build(logger); + } + } + + for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++ ) { + + // get the number of records in the batch holder that are pending output + int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput(); + + rowsInPartition += numPendingOutput; // for logging + rowsSpilled += numPendingOutput; + + allocateOutgoing(numPendingOutput); + + currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder); + int numOutputRecords = outNumRecordsHolder.value; + + this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value); + + // set the value count for outgoing batch value vectors + for (VectorWrapper v : outgoing) { + v.getValueVector().getMutator().setValueCount(numOutputRecords); + } + + outContainer.setRecordCount(numPendingOutput); + WritableBatch batch = WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false); + VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator); + Stopwatch watch = Stopwatch.createStarted(); + try { + outputBatch.writeToStream(outputStream[part]); + } catch (IOException ioe) { + throw UserException.dataWriteError(ioe) + .message("Hash Aggregation failed to write to output stream: " + outputStream[part].toString()) + .build(logger); + } + outContainer.zeroVectors(); + logger.trace("HASH AGG: Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), numPendingOutput); + } + + spilledBatchesCount[part] += currPartition.size(); // update count of spilled batches + + logger.trace("HASH AGG: Spilled {} rows from {} batches of partition {}", rowsInPartition, currPartition.size(), part); + } + + private void addBatchHolder(int part) { + BatchHolder bh = newBatchHolder(); - batchHolders.add(bh); + batchHolders[part].add(bh); if (EXTRA_DEBUG_1) { - logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size()); + logger.debug("HashAggregate: Added 
new batch; num batches = {}.", batchHolders[part].size()); } bh.setup(); } - // Overridden in the generated class when created as plain Java code. - + // These methods are overridden in the generated class when created as plain Java code. protected BatchHolder newBatchHolder() { return new BatchHolder(); } + /** + * Output the next batch from partition "nextPartitionToReturn" + * + * @return iteration outcome (e.g., OK, NONE ...) + */ @Override - public IterOutcome outputCurrentBatch() { - if (outBatchIndex >= batchHolders.size()) { - this.outcome = IterOutcome.NONE; - return outcome; + public AggIterOutcome outputCurrentBatch() { + + // when the incoming was an empty batch, just finish up + if ( schema == null ) { + logger.trace("Incoming was empty; output is an empty batch."); + this.outcome = IterOutcome.NONE; // no records were read + allFlushed = true; + return AggIterOutcome.AGG_NONE; } - // get the number of records in the batch holder that are pending output - int numPendingOutput = batchHolders.get(outBatchIndex).getNumPendingOutput(); + // Initialization (covers the case of early output) + ArrayList currPartition = batchHolders[earlyPartition]; + int currOutBatchIndex = outBatchIndex[earlyPartition]; + int partitionToReturn = earlyPartition; + + if ( ! earlyOutput ) { + // Update the next partition to return (if needed) + // skip fully returned (or spilled) partitions + while (nextPartitionToReturn < numPartitions) { + // + // If this partition was spilled - spill the rest of it and skip it + // + if ( isSpilled(nextPartitionToReturn) ) { + spillAPartition(nextPartitionToReturn); // spill the rest + SpilledPartition sp = new SpilledPartition(); + sp.spillFile = spillFiles[nextPartitionToReturn]; + sp.spilledBatches = spilledBatchesCount[nextPartitionToReturn]; + sp.cycleNum = cycleNum; // remember the current cycle + sp.origPartn = nextPartitionToReturn; // for debugging / filename + sp.prevOrigPartn = originalPartition; // for debugging / filename + spilledPartitionsList.add(sp); + + reinitPartition(nextPartitionToReturn); // free the memory + long posn = spillSet.getPosition(outputStream[nextPartitionToReturn]); + spillSet.tallyWriteBytes(posn); // for the IO stats + try { + outputStream[nextPartitionToReturn].close(); + } catch (IOException ioe) { + throw UserException.resourceError(ioe) + .message("IO Error while closing output stream") + .build(logger); + } + outputStream[nextPartitionToReturn] = null; + } + else { + currPartition = batchHolders[nextPartitionToReturn]; + currOutBatchIndex = outBatchIndex[nextPartitionToReturn]; + // If the current batch (partition X index) is not empty - proceed to return it + if (currOutBatchIndex < currPartition.size() && 0 != currPartition.get(currOutBatchIndex).getNumPendingOutput()) { + break; + } + } + nextPartitionToReturn++; // else check the next partition + } + + // If past the last partition - either done, or need to restart and read spilled partitions + if (nextPartitionToReturn >= numPartitions) { + // The following "if" is probably never taken, due to a similar check at the end of this method + if ( spilledPartitionsList.isEmpty() ) { // and no spilled partitions + allFlushed = true; + this.outcome = IterOutcome.NONE; + if ( is2ndPhase ) { + stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled + (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0)); + } + return AggIterOutcome.AGG_NONE; // then return NONE + } + // Else - there are still spilled partitions to process - pick one and handle it just like a new incoming (see the spill-cycle sketch after the diff) + 
buildComplete = false; // go back and call doWork() again + handlingSpills = true; // beginning to work on the spill files + // pick a spilled partition; set a new incoming ... + SpilledPartition sp = spilledPartitionsList.remove(0); + // Create a new "incoming" out of the spilled partition's spill file + newIncoming = new SpilledRecordbatch(sp.spillFile, sp.spilledBatches, context, schema, oContext, spillSet); + originalPartition = sp.origPartn; // used for the filename + logger.trace("Reading back spilled original partition {} as an incoming", originalPartition); + // Initialize with the new incoming, and a new set of partitions + try { initializeSetup(newIncoming); } catch (Exception e) { throw new RuntimeException(e); } + // update the cycle num if needed + // The current cycle num should always be one larger than in the spilled partition + if ( cycleNum == sp.cycleNum ) { + cycleNum = 1 + sp.cycleNum; + stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats + // report the first spill, or memory-stressful situations + if ( cycleNum == 1 ) { logger.info("Started reading spilled records "); } + if ( cycleNum == 2 ) { logger.info("SECONDARY SPILLING "); } + if ( cycleNum == 3 ) { logger.warn("TERTIARY SPILLING "); } + if ( cycleNum == 4 ) { logger.warn("QUATERNARY SPILLING "); } + if ( cycleNum == 5 ) { logger.warn("QUINARY SPILLING "); } + } + if ( EXTRA_DEBUG_SPILL ) { + logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {} batches). More {} spilled partitions left.", + sp.origPartn, sp.prevOrigPartn, sp.cycleNum, sp.spilledBatches, spilledPartitionsList.size()); + } + return AggIterOutcome.AGG_RESTART; + } + + partitionToReturn = nextPartitionToReturn ; - if (numPendingOutput == 0) { - this.outcome = IterOutcome.NONE; - return outcome; } + // get the number of records in the batch holder that are pending output + int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput(); + + // The following accounting is for logging, metrics, etc. + rowsInPartition += numPendingOutput ; + if ( ! 
handlingSpills ) { rowsNotSpilled += numPendingOutput; } + else { rowsSpilledReturned += numPendingOutput; } + if ( earlyOutput ) { rowsReturnedEarly += numPendingOutput; } + allocateOutgoing(numPendingOutput); - batchHolders.get(outBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder); + currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder); int numOutputRecords = outNumRecordsHolder.value; if (EXTRA_DEBUG_1) { logger.debug("After output values: outStartIdx = {}, outNumRecords = {}", outStartIdxHolder.value, outNumRecordsHolder.value); } - this.htable.outputKeys(outBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value); + + this.htables[partitionToReturn].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value); // set the value count for outgoing batch value vectors for (VectorWrapper v : outgoing) { v.getValueVector().getMutator().setValueCount(numOutputRecords); } -// outputCount += numOutputRecords; - this.outcome = IterOutcome.OK; - logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, numOutputRecords); + if ( EXTRA_DEBUG_SPILL && is2ndPhase ) { + logger.debug("So far returned {} + SpilledReturned {} total {} (spilled {})", rowsNotSpilled, rowsSpilledReturned, + rowsNotSpilled + rowsSpilledReturned, + rowsSpilled); + } lastBatchOutputCount = numOutputRecords; - outBatchIndex++; - if (outBatchIndex == batchHolders.size()) { - allFlushed = true; + outBatchIndex[partitionToReturn]++; + // if we just flushed the last batch in the partition + if (outBatchIndex[partitionToReturn] == currPartition.size()) { + + if ( EXTRA_DEBUG_SPILL ) { + logger.debug("HashAggregate: {} Flushed partition {} with {} batches total {} rows", + earlyOutput ? "(Early)" : "", + partitionToReturn, outBatchIndex[partitionToReturn], rowsInPartition); + } + rowsInPartition = 0; // reset the count for the next partition + + // deallocate memory used by this partition, and re-initialize + reinitPartition(partitionToReturn); - logger.debug("HashAggregate: All batches flushed."); + if ( earlyOutput ) { - // cleanup my internal state since there is nothing more to return - this.cleanup(); + if ( EXTRA_DEBUG_SPILL ) { + logger.debug("HASH AGG: Finished (early) re-init partition {}, mem allocated: {}", earlyPartition, allocator.getAllocatedMemory()); + } + outBatchIndex[earlyPartition] = 0; // reset, for next time + earlyOutput = false ; // done with early output + } + else if ( (partitionToReturn + 1 == numPartitions) && spilledPartitionsList.isEmpty() ) { // last partition? + + allFlushed = true; // the next next() call will return NONE + + logger.trace("HashAggregate: All batches flushed."); + + // clean up my internal state since there is nothing more to return + this.cleanup(); + } } - return this.outcome; + return AggIterOutcome.AGG_OK; } @Override @@ -522,11 +1086,33 @@ public abstract class HashAggTemplate implements HashAggregator { public boolean buildComplete() { return buildComplete; } + @Override + public boolean earlyOutput() { return earlyOutput; } public int numGroupedRecords() { return numGroupedRecords; } + /** + * Generate a detailed error message in case of "Out Of Memory" + * @return error message + */ + private String getOOMErrorMsg() { + String errmsg; + if ( !isTwoPhase ) { + errmsg = "Single Phase Hash Aggregate operator cannot spill." ; + } else if ( ! 
canSpill ) { // 2nd phase, with only 1 partition + errmsg = "Too little memory available to the operator to facilitate spilling."; + } else { // a bug ? + errmsg = "OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + numPartitions + + ". Estimated batch size: " + estMaxBatchSize + ". Planned batches: " + plannedBatches; + if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + rowsSpilled; } + } + errmsg += " Memory limit: " + allocator.getLimit() + "; so far allocated: " + allocator.getAllocatedMemory() + ". "; + + return errmsg; + } + // Check if a group is present in the hash table; if not, insert it in the hash table. // The htIdxHolder contains the index of the group in the hash table container; this same // index is also used for the aggregation values maintained by the hash aggregate. @@ -535,6 +1121,8 @@ public abstract class HashAggTemplate implements HashAggregator { throw new IllegalArgumentException("Invalid incoming row index."); } + assert ! earlyOutput; + /** for debugging Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector(); BigIntVector vv0 = null; @@ -546,44 +1134,189 @@ public abstract class HashAggTemplate implements HashAggregator { holder.value = vv0.getAccessor().get(incomingRowIdx) ; } */ + // The hash code is computed once, then its lower bits are used to determine the + // partition to use, and the higher bits determine the location in the hash table + // (see the slicing sketch after the diff). + int hashCode; + try { + htables[0].updateBatches(); + hashCode = htables[0].getHashCode(incomingRowIdx); + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException("Unexpected schema change", e); + } - htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */); + // right shift hash code for secondary (or tertiary...) 
spilling + for (int i = 0; i < cycleNum; i++) { hashCode >>>= bitsInMask; } - int currentIdx = htIdxHolder.value; + int currentPartition = hashCode & partitionMask ; + hashCode >>>= bitsInMask; + HashTable.PutStatus putStatus = null; + long allocatedBefore = allocator.getAllocatedMemory(); - // get the batch index and index within the batch - if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) { - addBatchHolder(); + // Insert the key columns into the hash table + try { + putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode); + } catch (OutOfMemoryException exc) { + throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when we cannot spill + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException("Unexpected schema change", e); } - BatchHolder bh = batchHolders.get((currentIdx >>> 16) & HashTable.BATCH_MASK); - int idxWithinBatch = currentIdx & HashTable.BATCH_MASK; + int currentIdx = htIdxHolder.value; - // Check if we have almost filled up the workspace vectors and add a batch if necessary - if ((idxWithinBatch == (bh.capacity - 1)) && (bh.allocatedNextBatch == false)) { - htable.addNewKeyBatch(); - addBatchHolder(); - bh.allocatedNextBatch = true; + long addedMem = allocator.getAllocatedMemory() - allocatedBefore; + if ( addedMem > 0 ) { + logger.trace("MEMORY CHECK HT: allocated {} added {} partition {}", allocatedBefore, addedMem, currentPartition); } + // Check if put() added a new batch (for the keys) inside the hash table, hence a matching batch + // (for the aggregate columns) needs to be created + if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) { + try { + long allocatedBeforeAggCol = allocator.getAllocatedMemory(); + + addBatchHolder(currentPartition); + + if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned batch + long totalAddedMem = allocator.getAllocatedMemory() - allocatedBefore; + logger.trace("MEMORY CHECK AGG: added {} total (with HT) added {}", allocator.getAllocatedMemory() - allocatedBeforeAggCol, totalAddedMem); + // resize the batch estimate if needed (e.g., varchars may take more memory than estimated) + if ( totalAddedMem > estMaxBatchSize ) { + logger.trace("Adjusting batch size estimate from {} to {}", estMaxBatchSize, totalAddedMem); + estMaxBatchSize = totalAddedMem; + } + } catch (OutOfMemoryException exc) { + throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when we cannot spill + } + } + BatchHolder bh = batchHolders[currentPartition].get((currentIdx >>> 16) & HashTable.BATCH_MASK); + int idxWithinBatch = currentIdx & HashTable.BATCH_MASK; if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) { numGroupedRecords++; } + + // =================================================================================== + // If the last batch just became full - that is the time to check the memory limits! 
+ // If exceeded, then we need to spill (if 2nd phase) or output early (1st phase) + // (Skip this if we cannot spill; in such a case an OOM may be encountered later) + // =================================================================================== + if ( putStatus == HashTable.PutStatus.KEY_ADDED_LAST && canSpill ) { + + plannedBatches++; // planning to allocate one more batch + + // calculate the (max) new memory needed now + long hashTableDoublingSizeNeeded = 0; // in case the hash table(s) would resize + for ( HashTable ht : htables ) { + hashTableDoublingSizeNeeded += ht.extraMemoryNeededForResize(); + } + + // Plan ahead for at least MIN batches, to account for size changes and some overhead + // (see the memory-check sketch after the diff) + long maxMemoryNeeded = minBatchesPerPartition * plannedBatches * + ( estMaxBatchSize + MAX_BATCH_SIZE * ( 4 + 4 /* links + hash-values */) ) + + hashTableDoublingSizeNeeded; + + // log a detailed debug message explaining why a spill may be needed + logger.trace("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to add to partition {} with {} batches. " + + "Memory needed {}, Est batch size {}, mem limit {}", + allocator.getAllocatedMemory(), isTwoPhase ? (is2ndPhase ? "2ND" : "1ST") : "Single", currentPartition, + batchHolders[currentPartition].size(), maxMemoryNeeded, estMaxBatchSize, memoryLimit); + // + // Spill if the allocated memory plus the memory needed exceeds the memory limit. + // + if ( allocator.getAllocatedMemory() + maxMemoryNeeded > memoryLimit ) { + + // Pick a "victim" partition to spill or return + int victimPartition = chooseAPartitionToFlush(currentPartition); + + // In case no partition has more than one batch -- try to "push the limits"; maybe next + // time the spill could work. + if ( victimPartition < 0 ) { return; } + + if ( is2ndPhase ) { + long before = allocator.getAllocatedMemory(); + + spillAPartition(victimPartition); + logger.trace("RAN OUT OF MEMORY: Spilled partition {}", victimPartition); + + // Re-initialize (free memory, then recreate) the partition just spilled/returned + reinitPartition(victimPartition); + + // in some "edge" cases (e.g., testing), spilling one partition may not be enough + if ( allocator.getAllocatedMemory() + maxMemoryNeeded > memoryLimit ) { + int victimPartition2 = chooseAPartitionToFlush(victimPartition); + if ( victimPartition2 < 0 ) { return; } + long after = allocator.getAllocatedMemory(); + spillAPartition(victimPartition2); + reinitPartition(victimPartition2); + logger.warn("A second spill was needed: allocated before {}, after first spill {}, after second {}, memory needed {}", + before, after, allocator.getAllocatedMemory(), maxMemoryNeeded); + logger.trace("Second partition spilled: {}", victimPartition2); + } + } + else { + // 1st phase needs to return a partition early in order to free some memory + earlyOutput = true; + earlyPartition = victimPartition; + + if ( EXTRA_DEBUG_SPILL ) { + logger.debug("picked partition {} for early output", victimPartition); + } + } + } + } } - private void updateStats(HashTable htable) { - htable.getStats(htStats); + /** + * Updates the stats after all the input has been read. + * Note: For spilled partitions, their hash-table stats from before the spill are lost. + * Also, SPILLED_PARTITIONS only counts the partitions spilled in the primary cycle, not SECONDARY etc. 
+ * @param htables The hash tables of all the partitions + */ + private void updateStats(HashTable[] htables) { + if ( cycleNum > 0 ) { return; } // these stats are collected only before processing spilled files + long numSpilled = 0; + HashTableStats newStats = new HashTableStats(); + // sum the stats from all the partitions + for (int ind = 0; ind < numPartitions; ind++) { + htables[ind].getStats(newStats); + htStats.addStats(newStats); + if (isSpilled(ind)) { + numSpilled++; + } + } this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets); this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries); this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing); this.stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime); + this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions); + if ( is2ndPhase ) { + this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled); + } + if ( rowsReturnedEarly > 0 ) { + stats.setLongStat(Metric.SPILL_MB, // update stats - est. total MB returned early + (int) Math.round( rowsReturnedEarly * estRowWidth / 1024.0D / 1024.0)); + } } // Code-generated methods (implemented in HashAggBatch) - public abstract void doSetup(@Named("incoming") RecordBatch incoming); + public abstract void doSetup(@Named("incoming") RecordBatch incoming) throws SchemaChangeException; - public abstract int getVectorIndex(@Named("recordIndex") int recordIndex); + public abstract int getVectorIndex(@Named("recordIndex") int recordIndex) throws SchemaChangeException; - public abstract boolean resetValues(); + public abstract boolean resetValues() throws SchemaChangeException; }
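The hash-code slicing referenced above (in checkGroupAndAggrValues()) can be read in isolation. Below is a minimal, hypothetical sketch of the same bit arithmetic -- the class and method names are invented for illustration, and it assumes the number of partitions is a power of two (the masking arithmetic in the patch implies the same). The low bits of the one-time hash code select the partition; one extra shift leaves the higher bits for that partition's hash table; and on a re-spill (cycleNum > 0) the bits consumed by earlier cycles are skipped first, so the rows of a single spilled partition redistribute across all partitions when read back:

    // Hypothetical sketch of the hash-code slicing; not part of the commit.
    class HashSlicingSketch {
      final int partitionMask;  // numPartitions - 1, e.g. 31 when numPartitions == 32
      final int bitsInMask;     // number of bits in the mask, e.g. 5

      HashSlicingSketch(int numPartitions) {
        this.partitionMask = numPartitions - 1;
        this.bitsInMask = Integer.bitCount(partitionMask);
      }

      // Which partition a row lands in, for the given spill cycle.
      int choosePartition(int hashCode, int cycleNum) {
        for (int i = 0; i < cycleNum; i++) { hashCode >>>= bitsInMask; }  // skip bits used by prior cycles
        return hashCode & partitionMask;
      }

      // The remaining higher bits are handed to that partition's hash table.
      int hashForTable(int hashCode, int cycleNum) {
        for (int i = 0; i <= cycleNum; i++) { hashCode >>>= bitsInMask; }
        return hashCode;
      }
    }

With the default of 32 partitions, cycle 0 consumes hash bits 0-4, a secondary spill consumes bits 5-9, and so on, so a 32-bit hash code supports only a handful of such cycles.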
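The victim-selection heuristic of chooseAPartitionToFlush() also stands alone: weigh the current partition (whose last batch just filled), the largest already-spilled partition, and the largest non-spilled partition, with the spilled baseline handicapped by a factor of 4. A sketch of the same logic over plain arrays, with hypothetical names (sizes[i] is the batch count of partition i):

    // Hypothetical standalone version of the heuristic; not part of the commit.
    class VictimChoiceSketch {
      static int chooseVictim(int currPart, int[] sizes, boolean[] spilled, boolean is2ndPhase) {
        if (!is2ndPhase) { return currPart; }           // 1st phase: always the current partition
        int currPartSize = sizes[currPart];
        if (currPartSize == 1) { currPartSize = -1; }   // a single batch is not worth picking

        // the largest already-spilled partition
        int maxSizeSpilled = -1, indexMaxSpilled = -1;
        for (int i = 0; i < sizes.length; i++) {
          if (spilled[i] && sizes[i] > maxSizeSpilled) { maxSizeSpilled = sizes[i]; indexMaxSpilled = i; }
        }
        // the current partition gets priority among the spilled ones
        if (spilled[currPart] && currPartSize + 1 >= maxSizeSpilled) {
          maxSizeSpilled = currPartSize; indexMaxSpilled = currPart;
        }
        // the largest non-spilled partition, with the spilled candidate as a 4x baseline
        int maxSize = -1, indexMax = -1;
        if (indexMaxSpilled > -1 && maxSizeSpilled > 1) { indexMax = indexMaxSpilled; maxSize = 4 * maxSizeSpilled; }
        for (int i = 0; i < sizes.length; i++) {
          if (!spilled[i] && sizes[i] > maxSize) { indexMax = i; maxSize = sizes[i]; }
        }
        // again - priority to the current partition
        if (!spilled[currPart] && currPartSize + 1 >= maxSize) { return currPart; }
        return (maxSize <= 1) ? -1 : indexMax;          // -1: spilling cannot make progress, skip
      }
    }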
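The spill trigger itself reduces to a single inequality over the estimates maintained above. A small sketch, under the patch's stated assumption of 8 extra bytes per row (4 for the hash table's links plus 4 for the stored hash values); maxBatchRows stands in for the MAX_BATCH_SIZE constant:

    // Hypothetical sketch of the memory check; not part of the commit.
    class MemoryCheckSketch {
      static boolean needToSpill(long allocated, long memoryLimit,
                                 long minBatchesPerPartition, long plannedBatches,
                                 long estMaxBatchSize, long maxBatchRows,
                                 long hashTableDoublingSizeNeeded) {
        long maxMemoryNeeded = minBatchesPerPartition * plannedBatches
            * (estMaxBatchSize + maxBatchRows * (4 + 4))  // batch + links + hash values
            + hashTableDoublingSizeNeeded;                // in case a hash table must double
        return allocated + maxMemoryNeeded > memoryLimit;
      }
    }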
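Finally, the spill-cycle control flow around AGG_RESTART: when output reaches a partition that has already spilled, the rest of it is spilled and the file is queued; once all in-memory partitions are flushed, each queued file is replayed as a brand-new "incoming" one cycle deeper. A minimal queue-based sketch with hypothetical names (the real code also rebuilds the partitions via initializeSetup() and wraps the file in a SpilledRecordbatch):

    // Hypothetical sketch of the restart-on-spill-file cycle; not part of the commit.
    import java.util.ArrayDeque;
    import java.util.Deque;

    class SpillCycleSketch {
      static class SpilledPartition { String spillFile; int spilledBatches; int cycleNum; }

      final Deque<SpilledPartition> spilledPartitionsList = new ArrayDeque<>();
      int cycleNum = 0;

      // A fully spilled partition is queued for a later pass instead of being returned now.
      void deferSpilledPartition(String file, int batches) {
        SpilledPartition sp = new SpilledPartition();
        sp.spillFile = file; sp.spilledBatches = batches; sp.cycleNum = cycleNum;
        spilledPartitionsList.add(sp);
      }

      // Called when all in-memory partitions are flushed; true means a spill file
      // was picked up and processing must restart on it (the AGG_RESTART case).
      boolean restartOnNextSpillFile() {
        if (spilledPartitionsList.isEmpty()) { return false; }
        SpilledPartition sp = spilledPartitionsList.remove();
        if (cycleNum == sp.cycleNum) { cycleNum = sp.cycleNum + 1; }  // one level deeper
        // ... open sp.spillFile as the new incoming and re-initialize the partitions ...
        return true;
      }
    }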