drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ar...@apache.org
Subject [13/17] drill git commit: DRILL-6180: Use System Option "output_batch_size" for External Sort
Date Sat, 03 Mar 2018 18:47:19 GMT
DRILL-6180: Use System Option "output_batch_size" for External Sort

closes #1129


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/4ee207bd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/4ee207bd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/4ee207bd

Branch: refs/heads/master
Commit: 4ee207bd6744fcd9efcd7bafa3559093ad89fb46
Parents: 6a55b2b
Author: Padma Penumarthy <ppenumar97@yahoo.com>
Authored: Thu Feb 22 16:41:47 2018 -0800
Committer: Arina Ielchiieva <arina.yelchiyeva@gmail.com>
Committed: Sat Mar 3 19:47:51 2018 +0200

----------------------------------------------------------------------
 .../impl/xsort/managed/ExternalSortBatch.java   |  2 +-
 .../physical/impl/xsort/managed/SortConfig.java | 12 ++-
 .../src/main/resources/drill-module.conf        |  9 +-
 .../managed/TestExternalSortInternals.java      | 88 ++++++++++----------
 .../impl/xsort/managed/TestSortImpl.java        |  3 +-
 5 files changed, 59 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/4ee207bd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 23e66a0..ea53a80 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -213,7 +213,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     super(popConfig, context, true);
     this.incoming = incoming;
 
-    SortConfig sortConfig = new SortConfig(context.getConfig());
+    SortConfig sortConfig = new SortConfig(context.getConfig(), context.getOptions());
     SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(), popConfig);
     oContext.setInjector(injector);
     PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(oContext);

http://git-wip-us.apache.org/repos/asf/drill/blob/4ee207bd/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
index 236c2f3..e592ccb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.options.OptionManager;
 
 public class SortConfig {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
@@ -71,8 +72,7 @@ public class SortConfig {
 
   private final int mSortBatchSize;
 
-  public SortConfig(DrillConfig config) {
-
+  public SortConfig(DrillConfig config, OptionManager options) {
     // Optional configured memory limit, typically used only for testing.
 
     maxMemory = config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY);
@@ -99,7 +99,13 @@ public class SortConfig {
     // of memory, but no smaller than the minimum size. In any event, an
     // output batch can contain no fewer than a single record.
 
-    mergeBatchSize = (int) Math.max(config.getBytes(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE), MIN_MERGE_BATCH_SIZE);
+    // get the output batch size from context.
+    // Size of the batch sent downstream from the sort operator during
+    // the merge phase. Default value is 16M.
+    // Don't change defaults unless you know what you are doing,
+    // larger sizes can result in memory fragmentation, smaller sizes
+    // in excessive operator iterator overhead.
+    mergeBatchSize = (int) Math.max(options.getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR), MIN_MERGE_BATCH_SIZE);
 
     // Limit on in-memory batches, primarily for testing.
 

http://git-wip-us.apache.org/repos/asf/drill/blob/4ee207bd/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index fc365d2..32be387 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -293,12 +293,7 @@ drill.exec: {
         // Set large enough to get long, continuous writes, but not so
         // large as to overwhelm a temp directory.
         // Supports HOCON memory suffixes.
-        file_size: 256M,
-        // Size of the batch sent downstream from the sort operator during
-        // the merge phase. Don't change this unless you know what you are doing,
-        // larger sizes can result in memory fragmentation, smaller sizes
-        // in excessive operator iterator overhead.
-        merge_batch_size = 16M
+        file_size: 256M
       }
     }
   },
@@ -424,7 +419,7 @@ drill.exec.options: {
     drill.exec.storage.implicit.fqn.column.label: "fqn",
     drill.exec.storage.implicit.suffix.column.label: "suffix",
     drill.exec.testing.controls: "{}",
-    drill.exec.memory.operator.output_batch_size : 33554432, # 32 MB
+    drill.exec.memory.operator.output_batch_size : 16777216, # 16 MB
     exec.bulk_load_table_list.bulk_size: 1000,
     exec.compile.scalar_replacement: false,
     exec.enable_bulk_load_table_list: false,

http://git-wip-us.apache.org/repos/asf/drill/blob/4ee207bd/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
index 1315a86..b0afbb2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
@@ -22,12 +22,13 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.drill.categories.OperatorTest;
-import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeAction;
 import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask;
-import org.apache.drill.test.ConfigBuilder;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.SubOperatorTest;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -42,8 +43,7 @@ public class TestExternalSortInternals extends SubOperatorTest {
    */
   @Test
   public void testConfigDefaults() {
-    DrillConfig drillConfig = DrillConfig.create();
-    SortConfig sortConfig = new SortConfig(drillConfig);
+    SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getOptionManager());
     // Zero means no artificial limit
     assertEquals(0, sortConfig.maxMemory());
     // Zero mapped to large number
@@ -67,16 +67,18 @@ public class TestExternalSortInternals extends SubOperatorTest {
   @Test
   public void testConfigOverride() {
     // Verify the various HOCON ways of setting memory
-    DrillConfig drillConfig = new ConfigBuilder()
+    OperatorFixture.Builder builder = new OperatorFixture.Builder();
+    builder.configBuilder()
         .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, "2000K")
         .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, 10)
         .put(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE, "10M")
         .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, 500_000)
-        .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, 600_000)
         .put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 50)
         .put(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE, 10)
         .build();
-    SortConfig sortConfig = new SortConfig(drillConfig);
+    FragmentContext fragmentContext = builder.build().getFragmentContext();
+    fragmentContext.getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, 600_000);
+    SortConfig sortConfig = new SortConfig(fragmentContext.getConfig(), fragmentContext.getOptions());
     assertEquals(2000 * 1024, sortConfig.maxMemory());
     assertEquals(10, sortConfig.mergeLimit());
     assertEquals(10 * ONE_MEG, sortConfig.spillFileSize());
@@ -91,15 +93,17 @@ public class TestExternalSortInternals extends SubOperatorTest {
    */
   @Test
   public void testConfigLimits() {
-    DrillConfig drillConfig = new ConfigBuilder()
+    OperatorFixture.Builder builder = new OperatorFixture.Builder();
+    builder.configBuilder()
         .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, SortConfig.MIN_MERGE_LIMIT - 1)
         .put(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE, SortConfig.MIN_SPILL_FILE_SIZE - 1)
         .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, SortConfig.MIN_SPILL_BATCH_SIZE - 1)
-        .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, SortConfig.MIN_MERGE_BATCH_SIZE - 1)
         .put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 1)
         .put(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE, 0)
         .build();
-    SortConfig sortConfig = new SortConfig(drillConfig);
+    FragmentContext fragmentContext = builder.build().getFragmentContext();
+    fragmentContext.getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, SortConfig.MIN_MERGE_BATCH_SIZE - 1);
+    SortConfig sortConfig = new SortConfig(fragmentContext.getConfig(), fragmentContext.getOptions());
     assertEquals(SortConfig.MIN_MERGE_LIMIT, sortConfig.mergeLimit());
     assertEquals(SortConfig.MIN_SPILL_FILE_SIZE, sortConfig.spillFileSize());
     assertEquals(SortConfig.MIN_SPILL_BATCH_SIZE, sortConfig.spillBatchSize());
@@ -110,8 +114,7 @@ public class TestExternalSortInternals extends SubOperatorTest {
 
   @Test
   public void testMemoryManagerBasics() {
-    DrillConfig drillConfig = DrillConfig.create();
-    SortConfig sortConfig = new SortConfig(drillConfig);
+    SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getFragmentContext().getOptions());
     long memoryLimit = 70 * ONE_MEG;
     SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
 
@@ -217,8 +220,7 @@ public class TestExternalSortInternals extends SubOperatorTest {
 
   @Test
   public void testSmallRows() {
-    DrillConfig drillConfig = DrillConfig.create();
-    SortConfig sortConfig = new SortConfig(drillConfig);
+    SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getFragmentContext().getOptions());
     long memoryLimit = 100 * ONE_MEG;
     SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
 
@@ -263,8 +265,7 @@ public class TestExternalSortInternals extends SubOperatorTest {
 
   @Test
   public void testLowMemory() {
-    DrillConfig drillConfig = DrillConfig.create();
-    SortConfig sortConfig = new SortConfig(drillConfig);
+    SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getFragmentContext().getOptions());
     int memoryLimit = 10 * ONE_MEG;
     SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
 
@@ -306,8 +307,7 @@ public class TestExternalSortInternals extends SubOperatorTest {
 
   @Test
   public void testLowerMemory() {
-    DrillConfig drillConfig = DrillConfig.create();
-    SortConfig sortConfig = new SortConfig(drillConfig);
+    SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getFragmentContext().getOptions());
     int memoryLimit = 10 * ONE_MEG;
     SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
 
@@ -352,8 +352,7 @@ public class TestExternalSortInternals extends SubOperatorTest {
 
   @Test
   public void testExtremeLowMemory() {
-    DrillConfig drillConfig = DrillConfig.create();
-    SortConfig sortConfig = new SortConfig(drillConfig);
+    SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getFragmentContext().getOptions());
     long memoryLimit = 10 * ONE_MEG;
     SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
 
@@ -391,8 +390,7 @@ public class TestExternalSortInternals extends SubOperatorTest {
 
   @Test
   public void testMemoryOverflow() {
-    DrillConfig drillConfig = DrillConfig.create();
-    SortConfig sortConfig = new SortConfig(drillConfig);
+    SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getFragmentContext().getOptions());
     long memoryLimit = 10 * ONE_MEG;
     SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
 
@@ -413,34 +411,36 @@ public class TestExternalSortInternals extends SubOperatorTest {
 
   @Test
   public void testConfigConstraints() {
-    int memConstaint = 40 * ONE_MEG;
-    int batchSizeConstaint = ONE_MEG / 2;
-    int mergeSizeConstaint = ONE_MEG;
-    DrillConfig drillConfig = new ConfigBuilder()
-        .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, memConstaint)
-        .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, batchSizeConstaint)
-        .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, mergeSizeConstaint)
+    int memConstraint = 40 * ONE_MEG;
+    int batchSizeConstraint = ONE_MEG / 2;
+    int mergeSizeConstraint = ONE_MEG;
+
+    OperatorFixture.Builder builder = new OperatorFixture.Builder();
+    builder.configBuilder()
+        .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, memConstraint)
+        .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, batchSizeConstraint)
         .build();
-    SortConfig sortConfig = new SortConfig(drillConfig);
+    FragmentContext fragmentContext = builder.build().getFragmentContext();
+    fragmentContext.getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, mergeSizeConstraint);
+    SortConfig sortConfig = new SortConfig(fragmentContext.getConfig(), fragmentContext.getOptions());
     long memoryLimit = 50 * ONE_MEG;
     SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
 
-    assertEquals(batchSizeConstaint, memManager.getPreferredSpillBatchSize());
-    assertEquals(mergeSizeConstaint, memManager.getPreferredMergeBatchSize());
-    assertEquals(memConstaint, memManager.getMemoryLimit());
+    assertEquals(batchSizeConstraint, memManager.getPreferredSpillBatchSize());
+    assertEquals(mergeSizeConstraint, memManager.getPreferredMergeBatchSize());
+    assertEquals(memConstraint, memManager.getMemoryLimit());
 
     int rowWidth = 300;
     int rowCount = 10000;
     int batchSize = rowWidth * rowCount * 2;
 
     memManager.updateEstimates(batchSize, rowWidth, rowCount);
-    verifyCalcs(sortConfig, memConstaint, memManager, batchSize, rowWidth, rowCount);
+    verifyCalcs(sortConfig, memConstraint, memManager, batchSize, rowWidth, rowCount);
   }
 
   @Test
   public void testMemoryDynamics() {
-    DrillConfig drillConfig = DrillConfig.create();
-    SortConfig sortConfig = new SortConfig(drillConfig);
+    SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getFragmentContext().getOptions());
     long memoryLimit = 50 * ONE_MEG;
     SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);
 
@@ -471,10 +471,12 @@ public class TestExternalSortInternals extends SubOperatorTest {
     // No artificial merge limit
 
     int mergeLimitConstraint = 100;
-    DrillConfig drillConfig = new ConfigBuilder()
+    OperatorFixture.Builder builder = new OperatorFixture.Builder();
+    builder.configBuilder()
         .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, mergeLimitConstraint)
         .build();
-    SortConfig sortConfig = new SortConfig(drillConfig);
+    FragmentContext fragmentContext = builder.build().getFragmentContext();
+    SortConfig sortConfig = new SortConfig(fragmentContext.getConfig(), fragmentContext.getOptions());
 
     // Allow four spill batches, 8 MB each, plus one output of 16
     // Allow for internal fragmentation
@@ -569,9 +571,7 @@ public class TestExternalSortInternals extends SubOperatorTest {
 
   @Test
   public void testMergeCalcsExtreme() {
-
-    DrillConfig drillConfig = DrillConfig.create();
-    SortConfig sortConfig = new SortConfig(drillConfig);
+    SortConfig sortConfig = new SortConfig(fixture.getFragmentContext().getConfig(), fixture.getFragmentContext().getOptions());
 
     // Force odd situation in which the spill batch is larger
     // than memory. Won't actually run, but needed to test
@@ -600,10 +600,12 @@ public class TestExternalSortInternals extends SubOperatorTest {
   public void testMergeLimit() {
     // Constrain merge width
     int mergeLimitConstraint = 5;
-    DrillConfig drillConfig = new ConfigBuilder()
+    OperatorFixture.Builder builder = new OperatorFixture.Builder();
+    builder.configBuilder()
         .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, mergeLimitConstraint)
         .build();
-    SortConfig sortConfig = new SortConfig(drillConfig);
+    FragmentContext fragmentContext = builder.build().getFragmentContext();
+    SortConfig sortConfig = new SortConfig(fragmentContext.getConfig(), fragmentContext.getOptions());
     // Plenty of memory, memory will not be a limit
     long memoryLimit = 400 * ONE_MEG;
     SortMemoryManager memManager = new SortMemoryManager(sortConfig, memoryLimit);

http://git-wip-us.apache.org/repos/asf/drill/blob/4ee207bd/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
index a985478..bcc53fa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
@@ -30,6 +30,7 @@ import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
@@ -94,7 +95,7 @@ public class TestSortImpl extends DrillTest {
           .setMinorFragmentId(3)
           .setQueryId(queryId)
           .build();
-    SortConfig sortConfig = new SortConfig(opContext.getFragmentContext().getConfig());
+    SortConfig sortConfig = new SortConfig(opContext.getFragmentContext().getConfig(), opContext.getFragmentContext().getOptions());
 
     SpillSet spillSet = new SpillSet(opContext.getFragmentContext().getConfig(), handle, popConfig);
     PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);


Mime
View raw message