Repository: drill
Updated Branches:
refs/heads/master 4c99f0cdd -> d77ab3183
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
index 8f0f770..7dbc9a3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
@@ -43,7 +43,7 @@ import static org.junit.Assert.assertTrue;
*/
public class TestHashAggrSpill {
- private void runAndDump(ClientFixture client, String sql, long expectedRows, long spillCycle,
long spilledPartitions) throws Exception {
+ private void runAndDump(ClientFixture client, String sql, long expectedRows, long spillCycle,
long fromSpilledPartitions, long toSpilledPartitions) throws Exception {
String plan = client.queryBuilder().sql(sql).explainJson();
QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run();
@@ -63,7 +63,7 @@ public class TestHashAggrSpill {
long opCycle = hag0.getMetric(HashAggTemplate.Metric.SPILL_CYCLE.ordinal());
assertEquals(spillCycle, opCycle);
long op_spilled_partitions = hag0.getMetric(HashAggTemplate.Metric.SPILLED_PARTITIONS.ordinal());
- assertEquals(spilledPartitions, op_spilled_partitions);
+ assertTrue( op_spilled_partitions >= fromSpilledPartitions && op_spilled_partitions
<= toSpilledPartitions );
/* assertEquals(3, ops.size());
for ( int i = 0; i < ops.size(); i++ ) {
ProfileParser.OperatorProfile hag = ops.get(i);
@@ -77,134 +77,107 @@ public class TestHashAggrSpill {
}
/**
- * Test "normal" spilling: Only 2 partitions (out of 4) would require spilling
- * ("normal spill" means spill-cycle = 1 )
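+ * A template for Hash Aggr spilling tests: runs {@code sql} (or a default GROUP BY query when {@code sql} is null) with the given memory, partition, fallback and prediction settings, then checks the spill cycle and the range of spilled partitions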
*
* @throws Exception
*/
- @Test
- public void testHashAggrSpill() throws Exception {
+ private void testSpill(long maxMem, long numPartitions, long minBatches, int maxParallel,
boolean fallback ,boolean predict,
+ String sql, long expectedRows, int cycle, int fromPart, int toPart)
throws Exception {
LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
- .toConsole()
- .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
- ;
+ .toConsole()
+ //.logger("org.apache.drill.exec.physical.impl.aggregate", Level.INFO)
+ .logger("org.apache.drill", Level.WARN)
+ ;
FixtureBuilder builder = ClusterFixture.builder()
- .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000_000)
- .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,16)
- .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
- .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
- .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
- // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
- .maxParallelization(2)
- .saveProfiles()
- //.keepLocalFiles()
- ;
+ .sessionOption(ExecConstants.HASHAGG_MAX_MEMORY_KEY,maxMem)
+ .sessionOption(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY,numPartitions)
+ .sessionOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_KEY,minBatches)
+ .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
+ .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
+ // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
+ .sessionOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY, fallback)
+ .sessionOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_KEY,predict)
+
+ .maxParallelization(maxParallel)
+ .saveProfiles()
+ //.keepLocalFiles()
+ ;
+ String sqlStr = sql != null ? sql : // if null then use this default query
+ "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K`
GROUP BY empid_s17, dept_i, branch_i";
+
try (LogFixture logs = logBuilder.build();
ClusterFixture cluster = builder.build();
ClientFixture client = cluster.clientFixture()) {
- String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K`
GROUP BY empid_s17, dept_i, branch_i";
- runAndDump(client, sql, 1_200_000, 1, 1);
+ runAndDump(client, sqlStr, expectedRows, cycle, fromPart,toPart);
}
}
-
/**
- * Test "secondary" spilling -- Some of the spilled partitions cause more spilling as
they are read back
- * (Hence spill-cycle = 2 )
+ * Test "normal" spilling: only 2 (or 3) partitions (out of 16) would require spilling
+ * ("normal spill" means spill-cycle = 1).
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testSimpleHashAggrSpill() throws Exception {
+ testSpill(68_000_000, 16, 2, 2, false, true, null,
+ 1_200_000, 1,2, 3
+ );
+ }
+ /**
+ * Test with "needed memory" prediction turned off
+ * (i.e., exercise the code paths that catch OOMs from the Hash Table and recover).
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testNoPredictHashAggrSpill() throws Exception {
+ testSpill(58_000_000, 16, 2, 2, false,false /* no prediction */,
+ null,1_200_000, 1,1, 1
+ );
+ }
+ /**
+ * Test secondary and tertiary spill cycles, which happen when some of the spilled partitions cause more spilling as they are read back.
*
* @throws Exception
*/
@Test
public void testHashAggrSecondaryTertiarySpill() throws Exception {
- LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
- .toConsole()
- .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
- .logger("org.apache.drill.exec.cache", Level.INFO)
- ;
- FixtureBuilder builder = ClusterFixture.builder()
- .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,58_000_000)
- .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,16)
- .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
- .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
- .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
- .sessionOption(PlannerSettings.STREAMAGG.getOptionName(),false)
- // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
- .maxParallelization(1)
- .saveProfiles()
- //.keepLocalFiles()
- ;
- try (LogFixture logs = logBuilder.build();
- ClusterFixture cluster = builder.build();
- ClientFixture client = cluster.clientFixture()) {
- String sql = "SELECT empid_s44, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1100K`
GROUP BY empid_s44, dept_i, branch_i";
- runAndDump(client, sql, 1_100_000, 3, 2);
- }
+ testSpill(58_000_000, 16, 3, 1, false,true,
+ "SELECT empid_s44, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1100K`
GROUP BY empid_s44, dept_i, branch_i",
+ 1_100_000, 3,2, 2
+ );
}
-
/**
- * Test when memory limit is set to very low value even to hold one partition in 2 Phase
Hash Agg scenario and
- * fallback mechanism to use unbounded memory is disabled then Query Fails in HashAgg
with Resource Error.
+ * Test with the "fallback" option disabled: when not enough memory is available to allow spilling, the query must fail with a RESOURCE error.
+ *
* @throws Exception
*/
@Test
public void testHashAggrFailWithFallbackDisabed() throws Exception {
- LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
- .toConsole()
- .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
- ;
- FixtureBuilder builder = ClusterFixture.builder()
- .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000)
- .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,4)
- .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
- .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
- .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
- .maxParallelization(2)
- .saveProfiles()
- ;
- try (LogFixture logs = logBuilder.build();
- ClusterFixture cluster = builder.build();
- ClientFixture client = cluster.clientFixture()) {
- String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K`
GROUP BY empid_s17, dept_i, branch_i";
- QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run();
- fail();
+ try {
+ testSpill(34_000_000, 4, 5, 2, false /* no fallback */, true, null,
+ 1_200_000, 0 /* unused: the query is expected to fail */, 0, 0);
+ fail(); // in case the above test did not throw
} catch (Exception ex) {
assertTrue(ex instanceof UserRemoteException);
assertTrue(((UserRemoteException)ex).getErrorType() == UserBitShared.DrillPBError.ErrorType.RESOURCE);
+ // must get here for the test to succeed ...
}
}
-
/**
- * Test when memory limit is set to very low value even to hold one partition in 2 Phase
Hash Agg scenario and
- * fallback mechanism to use unbounded memory is enabled then query completes successfully
without spilling.
+ * Test with the "fallback" option ON: when not enough memory is available to allow spilling (internally, enough memory is needed to
+ * create multiple partitions), behave like the pre-1.11 Hash Aggregate: allocate unlimited memory and do not spill.
*
* @throws Exception
*/
@Test
public void testHashAggrSuccessWithFallbackEnabled() throws Exception {
- LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
- .toConsole()
- .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
- ;
- FixtureBuilder builder = ClusterFixture.builder()
- .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000)
- .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,4)
- .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
- .sessionOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY, true)
- .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
- .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
- .maxParallelization(2)
- .saveProfiles()
- ;
- try (LogFixture logs = logBuilder.build();
- ClusterFixture cluster = builder.build();
- ClientFixture client = cluster.clientFixture()) {
- String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K`
GROUP BY empid_s17, dept_i, branch_i";
- runAndDump(client, sql, 1_200_000, 0, 0);
- } catch (Exception ex) {
- fail();
- }
+ testSpill(34_000_000, 4, 5, 2, true /* do fallback */,true, null,
+ 1_200_000, 0 /* no spill due to fallback to pre-1.11 */,0, 0
+ );
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
index d2ff805..e8db1e8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
@@ -33,7 +33,6 @@ import org.apache.drill.exec.ops.OperExecContext;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.record.BatchSchema;
@@ -91,15 +90,8 @@ public class TestSortImpl extends DrillTest {
.setQueryId(queryId)
.build();
SortConfig sortConfig = new SortConfig(opContext.getConfig());
- DrillbitEndpoint ep = DrillbitEndpoint.newBuilder()
- .setAddress("foo.bar.com")
- .setUserPort(1234)
- .setControlPort(1235)
- .setDataPort(1236)
- .setVersion("1.11")
- .build();
- SpillSet spillSet = new SpillSet(opContext.getConfig(), handle,
- popConfig, ep);
+
+ SpillSet spillSet = new SpillSet(opContext.getConfig(), handle, popConfig);
PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);
SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder);
return new SortImpl(opContext, sortConfig, spilledRuns, outputBatch);
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index 4e17eda..e97493e 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -198,11 +198,11 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
private static String createErrorMsg(final BufferAllocator allocator, final int rounded,
final int requested) {
if (rounded != requested) {
return String.format(
- "Unable to allocate buffer of size %d (rounded from %d) due to memory limit. Current
allocation: %d",
- rounded, requested, allocator.getAllocatedMemory());
+ "Unable to allocate buffer of size %d (rounded from %d) due to memory limit (%d).
Current allocation: %d",
+ rounded, requested, allocator.getLimit(), allocator.getAllocatedMemory());
} else {
- return String.format("Unable to allocate buffer of size %d due to memory limit. Current
allocation: %d",
- rounded, allocator.getAllocatedMemory());
+ return String.format("Unable to allocate buffer of size %d due to memory limit (%d).
Current allocation: %d",
+ rounded, allocator.getLimit(), allocator.getAllocatedMemory());
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/vector/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index 4418212..21959f6 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -198,7 +198,7 @@ public final class ${className} extends BaseDataValueVector implements
<#if type
try {
values.allocateNew(totalBytes, valueCount);
bits.allocateNew(valueCount);
- } catch(DrillRuntimeException e) {
+ } catch(RuntimeException e) {
clear();
throw e;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index 4527da8..e5432da 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -362,7 +362,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements
V
try {
data = allocator.buffer(totalBytes);
offsetVector.allocateNew(valueCount + 1);
- } catch (DrillRuntimeException e) {
+ } catch (RuntimeException e) {
clear();
throw e;
}
|