drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From prog...@apache.org
Subject [1/2] drill git commit: DRILL-5694: Handle OOM in HashAggr by spill and retry, reserve memory, spinner
Date Fri, 22 Sep 2017 23:40:33 GMT
Repository: drill
Updated Branches:
  refs/heads/master 4c99f0cdd -> d77ab3183


http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
index 8f0f770..7dbc9a3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
@@ -43,7 +43,7 @@ import static org.junit.Assert.assertTrue;
  */
 public class TestHashAggrSpill {
 
-    private void runAndDump(ClientFixture client, String sql, long expectedRows, long spillCycle,
long spilledPartitions) throws Exception {
+    private void runAndDump(ClientFixture client, String sql, long expectedRows, long spillCycle,
long fromSpilledPartitions, long toSpilledPartitions) throws Exception {
         String plan = client.queryBuilder().sql(sql).explainJson();
 
         QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run();
@@ -63,7 +63,7 @@ public class TestHashAggrSpill {
         long opCycle = hag0.getMetric(HashAggTemplate.Metric.SPILL_CYCLE.ordinal());
         assertEquals(spillCycle, opCycle);
         long op_spilled_partitions = hag0.getMetric(HashAggTemplate.Metric.SPILLED_PARTITIONS.ordinal());
-        assertEquals(spilledPartitions, op_spilled_partitions);
+        assertTrue( op_spilled_partitions >= fromSpilledPartitions && op_spilled_partitions
<= toSpilledPartitions );
         /* assertEquals(3, ops.size());
         for ( int i = 0; i < ops.size(); i++ ) {
             ProfileParser.OperatorProfile hag = ops.get(i);
@@ -77,134 +77,107 @@ public class TestHashAggrSpill {
     }
 
     /**
-     * Test "normal" spilling: Only 2 partitions (out of 4) would require spilling
-     * ("normal spill" means spill-cycle = 1 )
+     *  A template for Hash Aggr spilling tests
      *
      * @throws Exception
      */
-    @Test
-    public void testHashAggrSpill() throws Exception {
+    private void testSpill(long maxMem, long numPartitions, long minBatches, int maxParallel,
boolean fallback ,boolean predict,
+                           String sql, long expectedRows, int cycle, int fromPart, int toPart)
throws Exception {
         LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
-            .toConsole()
-            .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
-            ;
+          .toConsole()
+          //.logger("org.apache.drill.exec.physical.impl.aggregate", Level.INFO)
+          .logger("org.apache.drill", Level.WARN)
+          ;
 
         FixtureBuilder builder = ClusterFixture.builder()
-            .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000_000)
-            .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,16)
-            .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
-            .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
-            .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
-            // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
-            .maxParallelization(2)
-            .saveProfiles()
-            //.keepLocalFiles()
-            ;
+          .sessionOption(ExecConstants.HASHAGG_MAX_MEMORY_KEY,maxMem)
+          .sessionOption(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY,numPartitions)
+          .sessionOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_KEY,minBatches)
+          .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
+          .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
+          // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
+          .sessionOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY, fallback)
+          .sessionOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_KEY,predict)
+
+          .maxParallelization(maxParallel)
+          .saveProfiles()
+          //.keepLocalFiles()
+          ;
+        String sqlStr = sql != null ? sql :  // if null then use this default query
+          "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K`
GROUP BY empid_s17, dept_i, branch_i";
+
         try (LogFixture logs = logBuilder.build();
              ClusterFixture cluster = builder.build();
              ClientFixture client = cluster.clientFixture()) {
-            String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K`
GROUP BY empid_s17, dept_i, branch_i";
-            runAndDump(client, sql, 1_200_000, 1, 1);
+            runAndDump(client, sqlStr, expectedRows, cycle, fromPart,toPart);
         }
     }
-
     /**
-     *  Test "secondary" spilling -- Some of the spilled partitions cause more spilling as
they are read back
-     *  (Hence spill-cycle = 2 )
+     * Test "normal" spilling: Only 2 (or 3) partitions (out of 4) would require spilling
+     * ("normal spill" means spill-cycle = 1 )
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testSimpleHashAggrSpill() throws Exception {
+        testSpill(68_000_000, 16, 2, 2, false, true, null,
+          1_200_000, 1,2, 3
+          );
+    }
+    /**
+     * Test with "needed memory" prediction turned off
+     * (i.e., do exercise code paths that catch OOMs from the Hash Table and recover)
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testNoPredictHashAggrSpill() throws Exception {
+        testSpill(58_000_000, 16, 2, 2, false,false /* no prediction */,
+             null,1_200_000, 1,1, 1
+        );
+    }
+    /**
+     * Test Secondary and Tertiary spill cycles - Happens when some of the spilled partitions
cause more spilling as they are read back
      *
      * @throws Exception
      */
     @Test
     public void testHashAggrSecondaryTertiarySpill() throws Exception {
-        LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
-            .toConsole()
-            .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
-            .logger("org.apache.drill.exec.cache", Level.INFO)
-            ;
 
-        FixtureBuilder builder = ClusterFixture.builder()
-            .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,58_000_000)
-            .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,16)
-            .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
-            .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
-            .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
-            .sessionOption(PlannerSettings.STREAMAGG.getOptionName(),false)
-            // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
-            .maxParallelization(1)
-            .saveProfiles()
-            //.keepLocalFiles()
-            ;
-        try (LogFixture logs = logBuilder.build();
-             ClusterFixture cluster = builder.build();
-             ClientFixture client = cluster.clientFixture()) {
-            String sql = "SELECT empid_s44, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1100K`
GROUP BY empid_s44, dept_i, branch_i";
-            runAndDump(client, sql, 1_100_000, 3, 2);
-        }
+        testSpill(58_000_000, 16, 3, 1, false,true,
+          "SELECT empid_s44, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1100K`
GROUP BY empid_s44, dept_i, branch_i",
+          1_100_000, 3,2, 2
+        );
     }
-
     /**
-     * Test when memory limit is set to very low value even to hold one partition in 2 Phase
Hash Agg scenario and
-     * fallback mechanism to use unbounded memory is disabled then Query Fails in HashAgg
with Resource Error.
+     * Test with the "fallback" option disabled: When not enough memory available to allow
spilling, then fail (Resource error) !!
+     *
      * @throws Exception
      */
     @Test
     public void testHashAggrFailWithFallbackDisabed() throws Exception {
-        LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
-            .toConsole()
-            .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
-            ;
 
-        FixtureBuilder builder = ClusterFixture.builder()
-            .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000)
-            .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,4)
-            .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
-            .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
-            .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
-            .maxParallelization(2)
-            .saveProfiles()
-            ;
-        try (LogFixture logs = logBuilder.build();
-             ClusterFixture cluster = builder.build();
-             ClientFixture client = cluster.clientFixture()) {
-            String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K`
GROUP BY empid_s17, dept_i, branch_i";
-            QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run();
-            fail();
+        try {
+            testSpill(34_000_000, 4, 5, 2, false /* no fallback */, true, null,
+              1_200_000, 0 /* no spill due to fallback to pre-1.11 */, 0, 0);
+            fail(); // in case the above test did not throw
         } catch (Exception ex) {
             assertTrue(ex instanceof UserRemoteException);
             assertTrue(((UserRemoteException)ex).getErrorType() == UserBitShared.DrillPBError.ErrorType.RESOURCE);
+            // must get here for the test to succeed ...
         }
     }
-
     /**
-     * Test when memory limit is set to very low value even to hold one partition in 2 Phase
Hash Agg scenario and
-     * fallback mechanism to use unbounded memory is enabled then query completes successfully
without spilling.
+     * Test with the "fallback" option ON: When not enough memory is available to allow spilling
(internally need enough memory to
+     * create multiple partitions), then behave like the pre-1.11 Hash Aggregate: Allocate
unlimited memory, no spill.
      *
      * @throws Exception
      */
     @Test
     public void testHashAggrSuccessWithFallbackEnabled() throws Exception {
-        LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
-            .toConsole()
-            .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
-            ;
 
-        FixtureBuilder builder = ClusterFixture.builder()
-            .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000)
-            .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,4)
-            .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
-            .sessionOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY, true)
-            .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
-            .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
-            .maxParallelization(2)
-            .saveProfiles()
-            ;
-        try (LogFixture logs = logBuilder.build();
-             ClusterFixture cluster = builder.build();
-             ClientFixture client = cluster.clientFixture()) {
-            String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K`
GROUP BY empid_s17, dept_i, branch_i";
-            runAndDump(client, sql, 1_200_000, 0, 0);
-        } catch (Exception ex) {
-            fail();
-        }
+        testSpill(34_000_000, 4, 5, 2, true /* do fallback */,true, null,
+          1_200_000, 0 /* no spill due to fallback to pre-1.11 */,0, 0
+        );
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
index d2ff805..e8db1e8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
@@ -33,7 +33,6 @@ import org.apache.drill.exec.ops.OperExecContext;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.record.BatchSchema;
@@ -91,15 +90,8 @@ public class TestSortImpl extends DrillTest {
           .setQueryId(queryId)
           .build();
     SortConfig sortConfig = new SortConfig(opContext.getConfig());
-    DrillbitEndpoint ep = DrillbitEndpoint.newBuilder()
-        .setAddress("foo.bar.com")
-        .setUserPort(1234)
-        .setControlPort(1235)
-        .setDataPort(1236)
-        .setVersion("1.11")
-        .build();
-    SpillSet spillSet = new SpillSet(opContext.getConfig(), handle,
-                                     popConfig, ep);
+
+    SpillSet spillSet = new SpillSet(opContext.getConfig(), handle, popConfig);
     PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);
     SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder);
     return new SortImpl(opContext, sortConfig, spilledRuns, outputBatch);

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index 4e17eda..e97493e 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -198,11 +198,11 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
   private static String createErrorMsg(final BufferAllocator allocator, final int rounded,
final int requested) {
     if (rounded != requested) {
       return String.format(
-          "Unable to allocate buffer of size %d (rounded from %d) due to memory limit. Current
allocation: %d",
-          rounded, requested, allocator.getAllocatedMemory());
+          "Unable to allocate buffer of size %d (rounded from %d) due to memory limit (%d).
Current allocation: %d",
+          rounded, requested, allocator.getLimit(), allocator.getAllocatedMemory());
     } else {
-      return String.format("Unable to allocate buffer of size %d due to memory limit. Current
allocation: %d",
-          rounded, allocator.getAllocatedMemory());
+      return String.format("Unable to allocate buffer of size %d due to memory limit (%d).
Current allocation: %d",
+          rounded, allocator.getLimit(), allocator.getAllocatedMemory());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/vector/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index 4418212..21959f6 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -198,7 +198,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     try {
       values.allocateNew(totalBytes, valueCount);
       bits.allocateNew(valueCount);
-    } catch(DrillRuntimeException e) {
+    } catch(RuntimeException e) {
       clear();
       throw e;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index 4527da8..e5432da 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -362,7 +362,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     try {
       data = allocator.buffer(totalBytes);
       offsetVector.allocateNew(valueCount + 1);
-    } catch (DrillRuntimeException e) {
+    } catch (RuntimeException e) {
       clear();
       throw e;
     }


Mime
View raw message