Repository: drill
Updated Branches:
refs/heads/master d105950a7 -> e1649dd7d
DRILL-5737: Hash Agg uses more than the allocated memory under certain low memory conditions
Note: Provides a new option, drill.exec.hashagg.fallback.enabled (ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY), which is set to false by default.
When a 2-phase HashAgg does not have enough memory to hold 2 partitions, this flag decides whether
it falls back to the old behavior of consuming unbounded memory (true) or fails the query with a resource error (false).
close apache/drill#920
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e1649dd7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e1649dd7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e1649dd7
Branch: refs/heads/master
Commit: e1649dd7d9fb2c30632f4df6ea17c483379c9775
Parents: d105950
Author: Sorabh Hamirwasia <shamirwasia@maprtech.com>
Authored: Tue Aug 22 18:20:51 2017 -0700
Committer: Aman Sinha <asinha@maprtech.com>
Committed: Mon Sep 4 23:29:28 2017 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 2 +
.../impl/aggregate/HashAggTemplate.java | 19 +++++-
.../server/options/SystemOptionManager.java | 1 +
.../src/main/resources/drill-module.conf | 6 ++
.../physical/impl/agg/TestHashAggrSpill.java | 66 ++++++++++++++++++++
5 files changed, 91 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/e1649dd7/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 95ee00e..4aaa537 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -106,6 +106,8 @@ public interface ExecConstants {
LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY,
2, 5);
String HASHAGG_SPILL_DIRS = "drill.exec.hashagg.spill.directories";
String HASHAGG_SPILL_FILESYSTEM = "drill.exec.hashagg.spill.fs";
+ String HASHAGG_FALLBACK_ENABLED_KEY = "drill.exec.hashagg.fallback.enabled";
+ BooleanValidator HASHAGG_FALLBACK_ENABLED_VALIDATOR = new BooleanValidator(HASHAGG_FALLBACK_ENABLED_KEY);
String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size";
String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
http://git-wip-us.apache.org/repos/asf/drill/blob/e1649dd7/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 9f2c2fa..a3b1ceb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -379,11 +379,13 @@ public abstract class HashAggTemplate implements HashAggregator {
*/
private void delayedSetup() {
+ final boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
+
// Set the number of partitions from the configuration (raise to a power of two, if needed)
numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS);
if ( numPartitions == 1 ) {
canSpill = false;
- logger.warn("Spilling was disabled due to configuration setting of num_partitions to
1");
+ logger.warn("Spilling is disabled due to configuration setting of num_partitions to
1");
}
numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of
2
@@ -401,9 +403,20 @@ public abstract class HashAggTemplate implements HashAggregator {
while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 2 * 1024 * 1024)
> memAvail ) {
numPartitions /= 2;
if ( numPartitions < 2) {
- if ( is2ndPhase ) {
+ if (is2ndPhase) {
canSpill = false; // 2nd phase needs at least 2 to make progress
- logger.warn("Spilling was disabled - not enough memory available for internal
partitioning");
+
+ if (fallbackEnabled) {
+ logger.warn("Spilling is disabled - not enough memory available for internal
partitioning. Falling back"
+ + " to use unbounded memory");
+ } else {
+ throw UserException.resourceError()
+ .message(String.format("Not enough memory for internal partitioning and
fallback mechanism for "
+ + "HashAgg to use unbounded memory is disabled. Either enable fallback
config %s using Alter "
+ + "session/system command or increase memory limit for Drillbit",
+ ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY))
+ .build(logger);
+ }
}
break;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e1649dd7/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index d2dfc2a..4e362ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -118,6 +118,7 @@ public class SystemOptionManager extends BaseOptionManager implements
OptionMana
ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR,
ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR,
ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR, // for tuning
+ ExecConstants.HASHAGG_FALLBACK_ENABLED_VALIDATOR, // for enable/disable unbounded HashAgg
ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION,
ExecConstants.OUTPUT_FORMAT_VALIDATOR,
ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,
http://git-wip-us.apache.org/repos/asf/drill/blob/e1649dd7/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index f387cda..26f0722 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -370,6 +370,12 @@ drill.exec.options: {
debug.validate_vectors :false,
drill.exec.functions.cast_empty_string_to_null: false,
drill.exec.hashagg.min_batches_per_partition : 3,
+ # Controls whether HashAgg falls back to the older behavior of consuming
+ # unbounded memory. For a 2-phase aggregate, when the available memory is not
+ # enough to start at least 2 partitions, HashAgg falls back to that behavior
+ # only if this flag is set to true. It defaults to false, so such a query
+ # fails with a resource error instead.
+ drill.exec.hashagg.fallback.enabled: false,
drill.exec.storage.file.partition.column.label: "dir",
drill.exec.storage.implicit.filename.column.label: "filename",
drill.exec.storage.implicit.filepath.column.label: "filepath",
http://git-wip-us.apache.org/repos/asf/drill/blob/e1649dd7/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
index fe6fcbc..fb43f3e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.agg;
import ch.qos.logback.classic.Level;
import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -34,6 +35,7 @@ import org.junit.Test;
import java.util.List;
+import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -138,4 +140,68 @@ public class TestHashAggrSpill extends BaseTestQuery {
runAndDump(client, sql, 1_100_000, 3, 2);
}
}
+
+ /**
+ * Verifies that a 2-phase Hash Agg whose memory limit is too low to hold even one partition
+ * fails with a RESOURCE error when the fallback to unbounded memory is disabled
+ * (drill.exec.hashagg.fallback.enabled is left at its default, false).
+ * NOTE(review): the method name misspells "Disabled" as "Disabed"; consider renaming.
+ * @throws Exception
+ */
+ @Test
+ public void testHashAggrFailWithFallbackDisabed() throws Exception {
+ LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
+ .toConsole()
+ .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
+ ;
+
+ // The fallback option is deliberately not set here, so the drill-module.conf default (false) applies.
+ FixtureBuilder builder = ClusterFixture.builder()
+ .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000)
+ .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,4)
+ .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
+ .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
+ .maxParallelization(2)
+ .saveProfiles()
+ ;
+ try (LogFixture logs = logBuilder.build();
+ ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K`
GROUP BY empid_s17, dept_i, branch_i";
+ QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run();
+ // Reached only if the query unexpectedly succeeds.
+ fail();
+ } catch (Exception ex) {
+ // The failure must surface to the client as a RESOURCE-type UserRemoteException.
+ assertTrue(ex instanceof UserRemoteException);
+ assertTrue(((UserRemoteException)ex).getErrorType() == UserBitShared.DrillPBError.ErrorType.RESOURCE);
+ }
+ }
+
+ /**
+ * Verifies that a 2-phase Hash Agg whose memory limit is too low to hold even one partition
+ * still completes when the fallback to unbounded memory is enabled through the session
+ * option drill.exec.hashagg.fallback.enabled.
+ * NOTE(review): the trailing 0, 0 arguments to runAndDump presumably assert that nothing
+ * was spilled; confirm against runAndDump's signature.
+ * NOTE(review): the catch block calls fail() and discards the exception cause. Since the
+ * method already declares "throws Exception", letting the exception propagate would keep
+ * the diagnostic.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHashAggrSuccessWithFallbackEnabled() throws Exception {
+ LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
+ .toConsole()
+ .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
+ ;
+
+ FixtureBuilder builder = ClusterFixture.builder()
+ .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000)
+ .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,4)
+ .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
+ // Enable the fallback for this session so HashAgg proceeds with unbounded memory instead of failing.
+ .sessionOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY, true)
+ .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
+ .maxParallelization(2)
+ .saveProfiles()
+ ;
+ try (LogFixture logs = logBuilder.build();
+ ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K`
GROUP BY empid_s17, dept_i, branch_i";
+ runAndDump(client, sql, 1_200_000, 0, 0);
+ } catch (Exception ex) {
+ fail();
+ }
+ }
}
\ No newline at end of file
|