drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From meh...@apache.org
Subject [4/5] drill git commit: DRILL-2903: General improvements to tests in TestDrillbitResilience + Added RepeatTestRule to tests that are flaky + Added Controls.Builder to create controls string in tests + Added @Ignore to failing tests (filed JIRAs)
Date Thu, 25 Jun 2015 05:30:38 GMT
DRILL-2903: General improvements to tests in TestDrillbitResilience
+ Added RepeatTestRule to tests that are flaky
+ Added Controls.Builder to create controls string in tests
+ Added @Ignore to failing tests (filed JIRAs)

Other fixes:
+ Added @Override to ScanBatch#close to avoid potential bugs
+ Added docs link in ProtobufLengthDecoder
+ Fixed logging issue in CountDownLatchInjectionImpl


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/00aa01fb
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/00aa01fb
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/00aa01fb

Branch: refs/heads/master
Commit: 00aa01fb90f3210d1e3027d7f759fb1085b814bd
Parents: 20ec0cd
Author: Sudheesh Katkam <skatkam@maprtech.com>
Authored: Fri May 29 16:50:23 2015 -0700
Committer: Mehant Baid <mehantr@gmail.com>
Committed: Wed Jun 24 16:24:46 2015 -0700

----------------------------------------------------------------------
 .../drill/common/util/RepeatTestRule.java       |  72 ++++
 .../org/apache/drill/common/util/TestTools.java |  10 +-
 .../java/org/apache/drill/test/DrillTest.java   |   5 +-
 .../drill/exec/physical/impl/ScanBatch.java     |   1 +
 .../drill/exec/rpc/ProtobufLengthDecoder.java   |   3 +-
 .../drill/exec/rpc/control/ControlTunnel.java   |   3 +-
 .../testing/CountDownLatchInjectionImpl.java    |   2 +-
 .../drill/exec/testing/ExecutionControls.java   |   9 +-
 .../exec/work/batch/ControlMessageHandler.java  |   8 +-
 .../apache/drill/exec/work/foreman/Foreman.java |   1 -
 .../drill/exec/work/foreman/QueryManager.java   |   2 +-
 .../exec/work/fragment/FragmentExecutor.java    |   1 +
 .../exec/server/TestDrillbitResilience.java     | 398 +++++++++----------
 .../org/apache/drill/exec/testing/Controls.java | 214 ++++++++++
 .../exec/testing/ControlsInjectionUtil.java     | 100 ++++-
 .../testing/TestCountDownLatchInjection.java    |  10 +-
 .../exec/testing/TestExceptionInjection.java    |  48 +--
 .../drill/exec/testing/TestPauseInjection.java  |  26 +-
 18 files changed, 621 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/common/src/main/java/org/apache/drill/common/util/RepeatTestRule.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/util/RepeatTestRule.java b/common/src/main/java/org/apache/drill/common/util/RepeatTestRule.java
new file mode 100644
index 0000000..bc2eabf
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/util/RepeatTestRule.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.util;
+
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * A {@link TestRule} to repeat a test for a specified number of times. If "count" <= 0, the test is not run.
+ * For example,
+ * <pre>
+ * {@code
+ * @Test
+ * @Repeat(count = 20) // repeats the test 20 times
+ * public void unitTest() {
+ *   // code
+ * }
+ * }
+ * </pre>
+ */
+public class RepeatTestRule implements TestRule {
+
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target({ElementType.METHOD})
+  public @interface Repeat {
+    int count();
+  }
+
+  private static class RepeatStatement extends Statement {
+    private final Statement statement;
+    private final int count;
+
+    private RepeatStatement(final Statement statement, final int count) {
+      this.statement = statement;
+      this.count = count;
+    }
+
+    @Override
+    public void evaluate() throws Throwable {
+      for (int i = 0; i < count; ++i) {
+        statement.evaluate();
+      }
+    }
+  }
+
+  @Override
+  public Statement apply(final Statement base, final Description description) {
+    final Repeat repeat = description.getAnnotation(Repeat.class);
+    return repeat != null ? new RepeatStatement(base, repeat.count()) : base;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/common/src/main/java/org/apache/drill/common/util/TestTools.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/util/TestTools.java b/common/src/main/java/org/apache/drill/common/util/TestTools.java
index 5be8d40..1ad235b 100644
--- a/common/src/main/java/org/apache/drill/common/util/TestTools.java
+++ b/common/src/main/java/org/apache/drill/common/util/TestTools.java
@@ -24,7 +24,7 @@ import org.junit.rules.TestRule;
 import org.junit.rules.Timeout;
 
 public class TestTools {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTools.class);
+  // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTools.class);
 
   static final boolean IS_DEBUG = java.lang.management.ManagementFactory.getRuntimeMXBean().getInputArguments()
       .toString().indexOf("-agentlib:jdwp") > 0;
@@ -38,9 +38,15 @@ public class TestTools {
     return IS_DEBUG ? new TestName() : new Timeout(timeout);
   }
 
+  /**
+   * If not enforced, the repeat rule applies only if the test is run in non-debug mode.
+   */
+  public static TestRule getRepeatRule(final boolean enforce) {
+    return enforce || !IS_DEBUG ? new RepeatTestRule() : new TestName();
+  }
+
   public static String getWorkingPath() {
     return WORKING_PATH;
   }
 
-
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/common/src/test/java/org/apache/drill/test/DrillTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/drill/test/DrillTest.java b/common/src/test/java/org/apache/drill/test/DrillTest.java
index bbe014f..95ba936 100644
--- a/common/src/test/java/org/apache/drill/test/DrillTest.java
+++ b/common/src/test/java/org/apache/drill/test/DrillTest.java
@@ -23,6 +23,7 @@ import java.lang.management.MemoryMXBean;
 import java.util.List;
 
 import org.apache.drill.common.util.DrillStringUtils;
+import org.apache.drill.common.util.RepeatTestRule;
 import org.apache.drill.common.util.TestTools;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -38,7 +39,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 public class DrillTest {
-  static final Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTest.class);
+//  private static final Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTest.class);
 
   protected static final ObjectMapper objectMapper;
   static {
@@ -57,6 +58,8 @@ public class DrillTest {
   @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(50000);
   @Rule public final TestLogReporter logOutcome = LOG_OUTCOME;
 
+  @Rule public final TestRule REPEAT_RULE = TestTools.getRepeatRule(false);
+
   @Rule public TestName TEST_NAME = new TestName();
 
   @Before

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index a0560a5..6bf1280 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -356,6 +356,7 @@ public class ScanBatch implements CloseableRecordBatch {
     return WritableBatch.get(this);
   }
 
+  @Override
   public void close() {
     container.clear();
     for (ValueVector v : partitionVectors) {

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
index 4f075d3..4e03f11 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
@@ -29,7 +29,8 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import com.google.protobuf.CodedInputStream;
 
 /**
- * Modified version of ProtobufVarint32FrameDecoder that avoids bytebuf copy.
+ * Modified version of {@link io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder} that avoids bytebuf copy.
+ * See the documentation there.
  */
 public class ProtobufLengthDecoder extends ByteToMessageDecoder {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtobufLengthDecoder.class);

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index 16b9b63..b90912d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.FutureBitCommand;
 import org.apache.drill.exec.rpc.ListeningCommand;
@@ -57,7 +58,7 @@ public class ControlTunnel {
     manager.runCommand(b);
   }
 
-  public void resumeFragment(final RpcOutcomeListener<Ack> outcomeListener, final FragmentHandle handle) {
+  public void unpauseFragment(final RpcOutcomeListener<Ack> outcomeListener, final FragmentHandle handle) {
     final SignalFragment b = new SignalFragment(outcomeListener, handle, RpcType.REQ_UNPAUSE_FRAGMENT);
     manager.runCommand(b);
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
index 561d816..7584bad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjectionImpl.java
@@ -65,7 +65,7 @@ public class CountDownLatchInjectionImpl extends Injection implements CountDownL
     try {
       latch.await();
     } catch (final InterruptedException e) {
-      logger.warn("Interrupted while awaiting in %s at %s.", siteClass.getSimpleName(), desc);
+      logger.warn("Interrupted while awaiting in {} at {}.", siteClass.getSimpleName(), desc);
       throw e;
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
index 836fa50..2c0afe4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.testing;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -102,7 +103,7 @@ public final class ExecutionControls {
       }
       final String jsonString = v.string_val;
       try {
-        controlsOptionMapper.readValue(jsonString, Controls.class);
+        validateControlsString(jsonString);
       } catch (final IOException e) {
         throw new ExpressionParsingException("Invalid control options string (" + jsonString + ").", e);
       }
@@ -112,10 +113,14 @@ public final class ExecutionControls {
   /**
    * POJO used to parse JSON-specified controls.
    */
-  public static class Controls {
+  private static class Controls {
     public Collection<? extends Injection> injections;
   }
 
+  public static void validateControlsString(final String jsonString) throws IOException {
+    controlsOptionMapper.readValue(jsonString, Controls.class);
+  }
+
   /**
    * The default value for controls.
    */

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 8ee7d38..9f302a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -182,7 +182,7 @@ public class ControlMessageHandler {
     //     request; it is possible that before the fragment state was updated in the QueryManager, this handler
     //     received a cancel signal.
     // (2) Unknown fragment.
-    logger.warn("Dropping request to cancel fragment. {} does not exist.", QueryIdHelper.getFragmentId(handle));
+    logger.warn("Dropping request to cancel fragment. {} does not exist.", QueryIdHelper.getQueryIdentifier(handle));
     return Acks.OK;
   }
 
@@ -202,7 +202,7 @@ public class ControlMessageHandler {
     }
 
     // fragment completed or does not exist
-    logger.warn("Dropping request to resume fragment. {} does not exist.", QueryIdHelper.getFragmentId(handle));
+    logger.warn("Dropping request to resume fragment. {} does not exist.", QueryIdHelper.getQueryIdentifier(handle));
     return Acks.OK;
   }
 
@@ -221,8 +221,8 @@ public class ControlMessageHandler {
       } else {
         logger.warn(
             "Dropping request for early fragment termination for path {} -> {} as path to executor unavailable.",
-            QueryIdHelper.getFragmentId(finishedReceiver.getSender()),
-            QueryIdHelper.getFragmentId(finishedReceiver.getReceiver()));
+            QueryIdHelper.getQueryIdentifier(finishedReceiver.getSender()),
+            QueryIdHelper.getQueryIdentifier(finishedReceiver.getReceiver()));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index b1e5df5..716fb66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -295,7 +295,6 @@ public class Foreman implements Runnable {
       if(resume) {
         resume();
       }
-      injector.injectPause(queryContext.getExecutionControls(), "foreman-ready", logger);
 
       // restore the thread's original name
       currentThread.setName(originalName);

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index a7dfe85..9318233 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -236,7 +236,7 @@ public class QueryManager {
     for(final FragmentData data : fragmentDataSet) {
       final DrillbitEndpoint endpoint = data.getEndpoint();
       final FragmentHandle handle = data.getHandle();
-      controller.getTunnel(endpoint).resumeFragment(new SignalListener(endpoint, handle,
+      controller.getTunnel(endpoint).unpauseFragment(new SignalListener(endpoint, handle,
         SignalListener.Signal.UNPAUSE), handle);
     }
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index a6bd692..3409587 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -241,6 +241,7 @@ public class FragmentExecutor implements Runnable {
       updateState(FragmentState.RUNNING);
 
       acceptExternalEvents.countDown();
+      injector.injectPause(fragmentContext.getExecutionControls(), "fragment-running", logger);
 
       final DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
       logger.debug("Starting fragment {}:{} on {}:{}",

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 696aed8..ce09f68 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -39,6 +39,7 @@ import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.util.RepeatTestRule.Repeat;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ZookeeperHelper;
 import org.apache.drill.exec.client.DrillClient;
@@ -73,6 +74,7 @@ import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.store.pojo.PojoRecordReader;
 import org.apache.drill.exec.testing.ControlsInjectionUtil;
+import org.apache.drill.exec.testing.Controls;
 import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.foreman.Foreman;
 import org.apache.drill.exec.work.foreman.ForemanException;
@@ -90,10 +92,8 @@ import com.google.common.base.Preconditions;
 
 /**
  * Test how resilient drillbits are to throwing exceptions during various phases of query
- * execution by injecting exceptions at various points and to cancellations in various phases.
- * The test cases are mentioned in DRILL-2383.
+ * execution by injecting exceptions at various points, and to cancellations in various phases.
  */
-@Ignore
 public class TestDrillbitResilience extends DrillTest {
   private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TestDrillbitResilience.class);
 
@@ -103,6 +103,11 @@ public class TestDrillbitResilience extends DrillTest {
   private static DrillClient drillClient;
 
   /**
+   * The number of times that each repeated test should be run.
+   */
+  private static final int NUM_RUNS = 3;
+
+  /**
    * Note: Counting sys.memory executes a fragment on every drillbit. This is a better check in comparison to
    * counting sys.drillbits.
    */
@@ -160,11 +165,26 @@ public class TestDrillbitResilience extends DrillTest {
   private final static String DRILLBIT_BETA = "beta";
   private final static String DRILLBIT_GAMMA = "gamma";
 
+  /**
+   * Get the endpoint for the drillbit, if it is running
+   * @param name name of the drillbit
+   * @return endpoint of the drillbit
+   */
+  private static DrillbitEndpoint getEndpoint(final String name) {
+    @SuppressWarnings("resource")
+    final Drillbit drillbit = drillbits.get(name);
+    if (drillbit == null) {
+      throw new IllegalStateException("No Drillbit named \"" + name + "\" found.");
+    }
+    return drillbit.getContext().getEndpoint();
+  }
+
   @BeforeClass
   public static void startSomeDrillbits() throws Exception {
     // turn off the HTTP server to avoid port conflicts between the drill bits
     System.setProperty(ExecConstants.HTTP_ENABLE, "false");
 
+    // turn on error for failure in cancelled fragments
     zkHelper = new ZookeeperHelper(true);
     zkHelper.startZookeeper(1);
 
@@ -284,58 +304,10 @@ public class TestDrillbitResilience extends DrillTest {
   }
 
   /**
-   * Create a single exception injection.
-   *
-   * @param siteClass      the injection site class
-   * @param desc           the injection site description
-   * @param exceptionClass the class of the exception to throw
-   * @return the created controls JSON as string
-   */
-  private static String createSingleException(final Class<?> siteClass, final String desc,
-                                              final Class<? extends Throwable> exceptionClass) {
-    final String siteClassName = siteClass.getName();
-    final String exceptionClassName = exceptionClass.getName();
-    return "{\"injections\":[{"
-      + "\"type\":\"exception\","
-      + "\"siteClass\":\"" + siteClassName + "\","
-      + "\"desc\":\"" + desc + "\","
-      + "\"nSkip\":0,"
-      + "\"nFire\":1,"
-      + "\"exceptionClass\":\"" + exceptionClassName + "\""
-      + "}]}";
-  }
-
-  /**
-   * Create a single exception injection.
-   *
-   * @param siteClass      the injection site class
-   * @param desc           the injection site description
-   * @param exceptionClass the class of the exception to throw
-   * @param bitName        the drillbit name which should be injected into
-   * @return the created controls JSON as string
+   * Sets a session option.
    */
-  private static String createSingleExceptionOnBit(final Class<?> siteClass, final String desc,
-                                                   final Class<? extends Throwable> exceptionClass,
-                                                   final String bitName) {
-    final String siteClassName = siteClass.getName();
-    final String exceptionClassName = exceptionClass.getName();
-    @SuppressWarnings("resource")
-    final Drillbit drillbit = drillbits.get(bitName);
-    if (drillbit == null) {
-      throw new IllegalStateException("No Drillbit named \"" + bitName + "\" found");
-    }
-
-    final DrillbitEndpoint endpoint = drillbit.getContext().getEndpoint();
-    return "{\"injections\":[{"
-      + "\"address\":\"" + endpoint.getAddress() + "\","
-      + "\"port\":\"" + endpoint.getUserPort() + "\","
-      + "\"type\":\"exception\","
-      + "\"siteClass\":\"" + siteClassName + "\","
-      + "\"desc\":\"" + desc + "\","
-      + "\"nSkip\":0,"
-      + "\"nFire\":1,"
-      + "\"exceptionClass\":\"" + exceptionClassName + "\""
-      + "}]}";
+  private static void setSessionOption(final String option, final String value) {
+    ControlsInjectionUtil.setSessionOption(drillClient, option, value);
   }
 
   /**
@@ -345,8 +317,8 @@ public class TestDrillbitResilience extends DrillTest {
    * @param exceptionClass the expected exception class
    * @param desc           the expected exception site description
    */
-  private static void assertExceptionInjected(final Throwable throwable,
-                                              final Class<? extends Throwable> exceptionClass, final String desc) {
+  private static void assertExceptionMessage(final Throwable throwable, final Class<? extends Throwable> exceptionClass,
+                                             final String desc) {
     assertTrue("Throwable was not of UserException type.", throwable instanceof UserException);
     final ExceptionWrapper cause = ((UserException) throwable).getOrCreatePBError(false).getException();
     assertEquals("Exception class names should match.", exceptionClass.getName(), cause.getExceptionClass());
@@ -354,16 +326,17 @@ public class TestDrillbitResilience extends DrillTest {
   }
 
   @Test
-  public void settingNoopInjectionsAndQuery() {
+  public void settingNoOpInjectionsAndQuery() {
     final long before = countAllocatedMemory();
 
-    final String controls = createSingleExceptionOnBit(getClass(), "noop", RuntimeException.class, DRILLBIT_BETA);
+    final String controls = Controls.newBuilder()
+      .addExceptionOnBit(getClass(), "noop", RuntimeException.class, getEndpoint(DRILLBIT_BETA))
+      .build();
     setControls(controls);
-    try {
-      QueryTestUtil.test(drillClient, TEST_QUERY);
-    } catch (final Exception e) {
-      fail(e.getMessage());
-    }
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
+    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+    final Pair<QueryState, Exception> pair = listener.waitForCompletion();
+    assertStateCompleted(pair, QueryState.COMPLETED);
 
     final long after = countAllocatedMemory();
     assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
@@ -376,17 +349,14 @@ public class TestDrillbitResilience extends DrillTest {
    * @param desc site description
    */
   private static void testForeman(final String desc) {
-    final String controls = createSingleException(Foreman.class, desc, ForemanException.class);
-    setControls(controls);
-    try {
-      QueryTestUtil.test(drillClient, TEST_QUERY);
-      fail();
-    } catch (final Exception e) {
-      assertExceptionInjected(e, ForemanException.class, desc);
-    }
+    final String controls = Controls.newBuilder()
+      .addException(Foreman.class, desc, ForemanException.class)
+      .build();
+    assertFailsWithException(controls, ForemanException.class, desc);
   }
 
   @Test
+  @Repeat(count = NUM_RUNS)
   public void foreman_runTryBeginning() {
     final long before = countAllocatedMemory();
 
@@ -397,6 +367,8 @@ public class TestDrillbitResilience extends DrillTest {
   }
 
   @Test
+  @Ignore // TODO(DRILL-3163, DRILL-3167)
+  //@Repeat(count = NUM_RUNS)
   public void foreman_runTryEnd() {
     final long before = countAllocatedMemory();
 
@@ -534,10 +506,11 @@ public class TestDrillbitResilience extends DrillTest {
   }
 
   /**
-   * Given the result of {@link WaitUntilCompleteListener#waitForCompletion}, this method fails if the state is not
-   * as expected or if an exception is thrown.
+   * Given the result of {@link WaitUntilCompleteListener#waitForCompletion}, this method fails if the completed state
+   * is not as expected, or if an exception is thrown. The completed state could be COMPLETED or CANCELED. This state
+   * is set when {@link WaitUntilCompleteListener#queryCompleted} is called.
    */
-  private static void assertCompleteState(final Pair<QueryState, Exception> result, final QueryState expectedState) {
+  private static void assertStateCompleted(final Pair<QueryState, Exception> result, final QueryState expectedState) {
     final QueryState actualState = result.getFirst();
     final Exception exception = result.getSecond();
     if (actualState != expectedState || exception != null) {
@@ -547,58 +520,28 @@ public class TestDrillbitResilience extends DrillTest {
   }
 
   /**
-   * Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
-   */
-  private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) {
-    assertCancelledWithoutException(controls, listener, TEST_QUERY);
-  }
-
-  private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener, final String query) {
-    assertCancelled(controls, query, listener);
-  }
-
-  /**
    * Given a set of controls, this method ensures that the given query completes with a CANCELED state.
    */
-  private static void assertCancelled(final String controls, final String testQuery,
-      final WaitUntilCompleteListener listener) {
+  private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener,
+                                                      final String query) {
     setControls(controls);
 
-    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, testQuery, listener);
+    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, query, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
-    assertCompleteState(result, QueryState.CANCELED);
-  }
-
-  private static void setSessionOption(final String option, final String value) {
-    try {
-      final List<QueryDataBatch> results = drillClient.runQuery(QueryType.SQL,
-          String.format("alter session set `%s` = %s", option, value));
-      for (final QueryDataBatch data : results) {
-        data.release();
-      }
-    } catch(RpcException e) {
-      fail(String.format("Failed to set session option `%s` = %s, Error: %s", option, value, e.toString()));
-    }
+    assertStateCompleted(result, QueryState.CANCELED);
   }
 
-  private static String createPauseInjection(final Class siteClass, final String siteDesc, final int nSkip) {
-    return "{\"injections\" : [{"
-      + "\"type\" : \"pause\"," +
-      "\"siteClass\" : \"" + siteClass.getName() + "\","
-      + "\"desc\" : \"" + siteDesc + "\","
-      + "\"nSkip\" : " + nSkip
-      + "}]}";
-  }
-
-  private static String createPauseInjection(final Class siteClass, final String siteDesc) {
-    return createPauseInjection(siteClass, siteDesc, 0);
+  /**
+   * Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
+   */
+  private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) {
+    assertCancelledWithoutException(controls, listener, TEST_QUERY);
   }
 
-  @Test // To test pause and resume. Test hangs if resume did not happen.
+  @Test // To test pause and resume. Test hangs and times out if resume did not happen.
   public void passThrough() {
     final long before = countAllocatedMemory();
 
-
     final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
       @Override
       public void queryIdArrived(final QueryId queryId) {
@@ -609,20 +552,26 @@ public class TestDrillbitResilience extends DrillTest {
       }
     };
 
-    final String controls = createPauseInjection(PojoRecordReader.class, "read-next");
+    final String controls = Controls.newBuilder()
+      .addPause(PojoRecordReader.class, "read-next")
+      .build();
     setControls(controls);
 
     QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
-    assertCompleteState(result, QueryState.COMPLETED);
+    assertStateCompleted(result, QueryState.COMPLETED);
 
     final long after = countAllocatedMemory();
     assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
-  @Test // Cancellation TC 1: cancel before any result set is returned
-  @Ignore // DRILL-3052
-  public void cancelBeforeAnyResultsArrive() {
+  // DRILL-3052: Since root fragment is waiting on data and leaf fragments are cancelled before they send any
+  // data to root, root will never run. This test will time out if the root did not send the final state to Foreman.
+  // DRILL-2383: Cancellation TC 1: cancel before any result set is returned.
+  @Test
+  @Ignore // TODO(DRILL-3192)
+  //@Repeat(count = NUM_RUNS)
+  public void cancelWhenQueryIdArrives() {
     final long before = countAllocatedMemory();
 
     final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
@@ -634,14 +583,17 @@ public class TestDrillbitResilience extends DrillTest {
       }
     };
 
-    final String controls = createPauseInjection(Foreman.class, "foreman-ready");
+    final String controls = Controls.newBuilder()
+      .addPause(FragmentExecutor.class, "fragment-running")
+      .build();
     assertCancelledWithoutException(controls, listener);
 
     final long after = countAllocatedMemory();
     assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
-  @Test // Cancellation TC 2: cancel in the middle of fetching result set
+  @Test // DRILL-2383: Cancellation TC 2: cancel in the middle of fetching result set
+  @Repeat(count = NUM_RUNS)
   public void cancelInMiddleOfFetchingResults() {
     final long before = countAllocatedMemory();
 
@@ -660,7 +612,9 @@ public class TestDrillbitResilience extends DrillTest {
     };
 
     // skip once i.e. wait for one batch, so that #dataArrived above triggers #cancelAndResume
-    final String controls = createPauseInjection(ScreenCreator.class, "sending-data", 1);
+    final String controls = Controls.newBuilder()
+      .addPause(ScreenCreator.class, "sending-data", 1)
+      .build();
     assertCancelledWithoutException(controls, listener);
 
     final long after = countAllocatedMemory();
@@ -668,7 +622,8 @@ public class TestDrillbitResilience extends DrillTest {
   }
 
 
-  @Test // Cancellation TC 3: cancel after all result set are produced but not all are fetched
+  @Test // DRILL-2383: Cancellation TC 3: cancel after all result set are produced but not all are fetched
+  @Repeat(count = NUM_RUNS)
   public void cancelAfterAllResultsProduced() {
     final long before = countAllocatedMemory();
 
@@ -685,15 +640,17 @@ public class TestDrillbitResilience extends DrillTest {
       }
     };
 
-    final String controls = createPauseInjection(ScreenCreator.class, "send-complete");
+    final String controls = Controls.newBuilder()
+      .addPause(ScreenCreator.class, "send-complete")
+      .build();
     assertCancelledWithoutException(controls, listener);
 
     final long after = countAllocatedMemory();
     assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
-  @Test // Cancellation TC 4: cancel after everything is completed and fetched
-  @Ignore
+  @Test // DRILL-2383: Cancellation TC 4: cancel after everything is completed and fetched
+  @Repeat(count = NUM_RUNS)
   public void cancelAfterEverythingIsCompleted() {
     final long before = countAllocatedMemory();
 
@@ -710,27 +667,28 @@ public class TestDrillbitResilience extends DrillTest {
       }
     };
 
-    final String controls = createPauseInjection(Foreman.class, "foreman-cleanup");
+    final String controls = Controls.newBuilder()
+      .addPause(Foreman.class, "foreman-cleanup")
+      .build();
     assertCancelledWithoutException(controls, listener);
 
     final long after = countAllocatedMemory();
     assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
-  @Test // Completion TC 1: success
+  @Test // DRILL-2383: Completion TC 1: success
   public void successfullyCompletes() {
     final long before = countAllocatedMemory();
 
     final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
     QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
-    assertCompleteState(result, QueryState.COMPLETED);
+    assertStateCompleted(result, QueryState.COMPLETED);
 
     final long after = countAllocatedMemory();
     assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
-
   /**
    * Given a set of controls, this method ensures TEST_QUERY fails with the given class and desc.
    */
@@ -742,47 +700,54 @@ public class TestDrillbitResilience extends DrillTest {
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
     final QueryState state = result.getFirst();
     assertTrue(String.format("Query state should be FAILED (and not %s).", state), state == QueryState.FAILED);
-    assertExceptionInjected(result.getSecond(), exceptionClass, exceptionDesc);
+    assertExceptionMessage(result.getSecond(), exceptionClass, exceptionDesc);
   }
 
   private static void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
-      final String exceptionDesc) {
+                                               final String exceptionDesc) {
     assertFailsWithException(controls, exceptionClass, exceptionDesc, TEST_QUERY);
   }
 
-  @Test // Completion TC 2: failed query - before query is executed - while sql parsing
+  @Test // DRILL-2383: Completion TC 2: failed query - before query is executed - while sql parsing
   public void failsWhenParsing() {
     final long before = countAllocatedMemory();
 
     final String exceptionDesc = "sql-parsing";
     final Class<? extends Throwable> exceptionClass = ForemanSetupException.class;
-    final String controls = createSingleException(DrillSqlWorker.class, exceptionDesc, exceptionClass);
+    final String controls = Controls.newBuilder()
+    .addException(DrillSqlWorker.class, exceptionDesc, exceptionClass)
+      .build();
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
 
     final long after = countAllocatedMemory();
     assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
-  @Test // Completion TC 3: failed query - before query is executed - while sending fragments to other drillbits
+  @Test // DRILL-2383: Completion TC 3: failed query - before query is executed - while sending fragments to other
+  // drillbits
   public void failsWhenSendingFragments() {
     final long before = countAllocatedMemory();
 
     final String exceptionDesc = "send-fragments";
     final Class<? extends Throwable> exceptionClass = ForemanException.class;
-    final String controls = createSingleException(Foreman.class, exceptionDesc, exceptionClass);
+    final String controls = Controls.newBuilder()
+    .addException(Foreman.class, exceptionDesc, exceptionClass)
+      .build();
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
 
     final long after = countAllocatedMemory();
     assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
-  @Test // Completion TC 4: failed query - during query execution
+  @Test // DRILL-2383: Completion TC 4: failed query - during query execution
   public void failsDuringExecution() {
     final long before = countAllocatedMemory();
 
     final String exceptionDesc = "fragment-execution";
     final Class<? extends Throwable> exceptionClass = IOException.class;
-    final String controls = createSingleException(FragmentExecutor.class, exceptionDesc, exceptionClass);
+    final String controls = Controls.newBuilder()
+      .addException(FragmentExecutor.class, exceptionDesc, exceptionClass)
+      .build();
     assertFailsWithException(controls, exceptionClass, exceptionDesc);
 
     final long after = countAllocatedMemory();
@@ -794,11 +759,14 @@ public class TestDrillbitResilience extends DrillTest {
    * Specifically tests cancelling fragment which has {@link MergingRecordBatch} blocked waiting for data.
    */
   @Test
-  public void testInterruptingBlockedMergingRecordBatch() {
+  @Repeat(count = NUM_RUNS)
+  public void interruptingBlockedMergingRecordBatch() {
     final long before = countAllocatedMemory();
 
-    final String control = createPauseInjection(MergingRecordBatch.class, "waiting-for-data", 1);
-    testInterruptingBlockedFragmentsWaitingForData(control);
+    final String control = Controls.newBuilder()
+      .addPause(MergingRecordBatch.class, "waiting-for-data", 1)
+      .build();
+    interruptingBlockedFragmentsWaitingForData(control);
 
     final long after = countAllocatedMemory();
     assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
@@ -809,23 +777,26 @@ public class TestDrillbitResilience extends DrillTest {
    * Specifically tests cancelling fragment which has {@link UnorderedReceiverBatch} blocked waiting for data.
    */
   @Test
-  public void testInterruptingBlockedUnorderedReceiverBatch() {
+  @Repeat(count = NUM_RUNS)
+  public void interruptingBlockedUnorderedReceiverBatch() {
     final long before = countAllocatedMemory();
 
-    final String control = createPauseInjection(UnorderedReceiverBatch.class, "waiting-for-data", 1);
-    testInterruptingBlockedFragmentsWaitingForData(control);
+    final String control = Controls.newBuilder()
+      .addPause(UnorderedReceiverBatch.class, "waiting-for-data", 1)
+      .build();
+    interruptingBlockedFragmentsWaitingForData(control);
 
     final long after = countAllocatedMemory();
     assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
-  private static void testInterruptingBlockedFragmentsWaitingForData(final String control) {
+  private static void interruptingBlockedFragmentsWaitingForData(final String control) {
     try {
       setSessionOption(SLICE_TARGET, "1");
       setSessionOption(HASHAGG.getOptionName(), "false");
 
       final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
-      assertCancelled(control, query, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+      assertCancelledWithoutException(control, new ListenerThatCancelsQueryAfterFirstBatchOfData(), query);
     } finally {
       setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
       setSessionOption(HASHAGG.getOptionName(), HASHAGG.getDefault().bool_val.toString());
@@ -838,7 +809,8 @@ public class TestDrillbitResilience extends DrillTest {
    * the partitioner threads.
    */
   @Test
-  public void testInterruptingPartitionerThreadFragment() {
+  @Repeat(count = NUM_RUNS)
+  public void interruptingPartitionerThreadFragment() {
     try {
       setSessionOption(SLICE_TARGET, "1");
       setSessionOption(HASHAGG.getOptionName(), "true");
@@ -846,22 +818,13 @@ public class TestDrillbitResilience extends DrillTest {
 
       final long before = countAllocatedMemory();
 
-      final String controls = "{\"injections\" : ["
-        + "{"
-        + "\"type\" : \"latch\","
-        + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
-        + "\"desc\" : \"partitioner-sender-latch\""
-        + "},"
-        + "{"
-        + "\"type\" : \"pause\","
-        + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
-        + "\"desc\" : \"wait-for-fragment-interrupt\","
-        + "\"nSkip\" : 1"
-        + "}" +
-        "]}";
+      final String controls = Controls.newBuilder()
+      .addLatch(PartitionerDecorator.class, "partitioner-sender-latch")
+      .addPause(PartitionerDecorator.class, "wait-for-fragment-interrupt", 1)
+      .build();
 
       final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
-      assertCancelled(controls, query, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+      assertCancelledWithoutException(controls, new ListenerThatCancelsQueryAfterFirstBatchOfData(), query);
 
       final long after = countAllocatedMemory();
       assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
@@ -874,85 +837,104 @@ public class TestDrillbitResilience extends DrillTest {
   }
 
   @Test
-  public void testInterruptingWhileFragmentIsBlockedInAcquiringSendingTicket() throws Exception {
-
+  @Ignore // TODO(DRILL-3193)
+  //@Repeat(count = NUM_RUNS)
+  public void interruptingWhileFragmentIsBlockedInAcquiringSendingTicket() {
     final long before = countAllocatedMemory();
 
-    final String control =
-      createPauseInjection(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1);
-    assertCancelled(control, TEST_QUERY, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+    final String control = Controls.newBuilder()
+      .addPause(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1)
+      .build();
+    assertCancelledWithoutException(control, new ListenerThatCancelsQueryAfterFirstBatchOfData());
 
     final long after = countAllocatedMemory();
     assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
   @Test
+  @Repeat(count = NUM_RUNS)
   public void memoryLeaksWhenCancelled() {
     setSessionOption(SLICE_TARGET, "10");
 
     final long before = countAllocatedMemory();
 
-    final String controls = createPauseInjection(ScreenCreator.class, "sending-data", 1);
-    String query = null;
     try {
-      query = BaseTestQuery.getFile("queries/tpch/09.sql");
-    } catch (final IOException e) {
-      fail("Failed to get query file: " + e);
-    }
+      final String controls = Controls.newBuilder()
+        .addPause(ScreenCreator.class, "sending-data", 1)
+        .build();
+      String query = null;
+      try {
+        query = BaseTestQuery.getFile("queries/tpch/09.sql");
+        query = query.substring(0, query.length() - 1); // drop the ";"
+      } catch (final IOException e) {
+        fail("Failed to get query file: " + e);
+      }
 
-    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
-      private boolean cancelRequested = false;
+      final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
+        private volatile boolean cancelRequested = false;
 
-      @Override
-      public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
-        if (!cancelRequested) {
-          check(queryId != null, "Query id should not be null, since we have waited long enough.");
-          cancelAndResume();
-          cancelRequested = true;
+        @Override
+        public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+          if (!cancelRequested) {
+            check(queryId != null, "Query id should not be null, since we have waited long enough.");
+            cancelAndResume();
+            cancelRequested = true;
+          }
+          result.release();
         }
-        result.release();
-      }
-    };
-
-    assertCancelledWithoutException(controls, listener, query.substring(0, query.length() - 1));
+      };
 
-    final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+      assertCancelledWithoutException(controls, listener, query);
 
-    setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+      final long after = countAllocatedMemory();
+      assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+    } finally {
+      setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+    }
   }
 
   @Test
+  @Ignore // TODO(DRILL-3194)
+  //@Repeat(count = NUM_RUNS)
   public void memoryLeaksWhenFailed() {
     setSessionOption(SLICE_TARGET, "10");
 
     final long before = countAllocatedMemory();
 
-    final String exceptionDesc = "fragment-execution";
-    final Class<? extends Throwable> exceptionClass = IOException.class;
-    final String controls = createSingleException(FragmentExecutor.class, exceptionDesc, exceptionClass);
-    String query = null;
     try {
-      query = BaseTestQuery.getFile("queries/tpch/09.sql");
-    } catch (final IOException e) {
-      fail("Failed to get query file: " + e);
-    }
+      final String exceptionDesc = "fragment-execution";
+      final Class<? extends Throwable> exceptionClass = IOException.class;
+      final String controls = Controls.newBuilder()
+        .addException(FragmentExecutor.class, exceptionDesc, exceptionClass)
+        .build();
+
+      String query = null;
+      try {
+        query = BaseTestQuery.getFile("queries/tpch/09.sql");
+        query = query.substring(0, query.length() - 1); // drop the ";"
+      } catch (final IOException e) {
+        fail("Failed to get query file: " + e);
+      }
 
-    assertFailsWithException(controls, exceptionClass, exceptionDesc, query.substring(0, query.length() - 1));
+      assertFailsWithException(controls, exceptionClass, exceptionDesc, query);
 
-    final long after = countAllocatedMemory();
-    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+      final long after = countAllocatedMemory();
+      assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
 
-    setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+    } finally {
+      setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+    }
   }
 
   @Test // DRILL-3065
-  public void testInterruptingAfterMSorterSorting() {
+  public void failsAfterMSorterSorting() {
     final String query = "select n_name from cp.`tpch/nation.parquet` order by n_name";
     Class<? extends Exception> typeOfException = RuntimeException.class;
 
     final long before = countAllocatedMemory();
-    final String controls = createSingleException(ExternalSortBatch.class, ExternalSortBatch.INTERRUPTION_AFTER_SORT, typeOfException);
+    final String controls = Controls.newBuilder()
+      .addException(ExternalSortBatch.class, ExternalSortBatch.INTERRUPTION_AFTER_SORT, typeOfException)
+      .build();
     assertFailsWithException(controls, typeOfException, ExternalSortBatch.INTERRUPTION_AFTER_SORT, query);
 
     final long after = countAllocatedMemory();
@@ -960,28 +942,30 @@ public class TestDrillbitResilience extends DrillTest {
   }
 
   @Test // DRILL-3085
-  public void testInterruptingAfterMSorterSetup() {
+  public void failsAfterMSorterSetup() {
     final String query = "select n_name from cp.`tpch/nation.parquet` order by n_name";
     Class<? extends Exception> typeOfException = RuntimeException.class;
 
     final long before = countAllocatedMemory();
-    final String controls = createSingleException(ExternalSortBatch.class, ExternalSortBatch.INTERRUPTION_AFTER_SETUP, typeOfException);
+    final String controls = Controls.newBuilder()
+    .addException(ExternalSortBatch.class, ExternalSortBatch.INTERRUPTION_AFTER_SETUP, typeOfException)
+      .build();
     assertFailsWithException(controls, typeOfException, ExternalSortBatch.INTERRUPTION_AFTER_SETUP, query);
 
     final long after = countAllocatedMemory();
     assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
   }
 
-  private long countAllocatedMemory() {
+  private static long countAllocatedMemory() {
     // wait to make sure all fragments finished cleaning up
     try {
       Thread.sleep(2000);
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       // just ignore
     }
 
     long allocated = 0;
-    for (String name : drillbits.keySet()) {
+    for (final String name : drillbits.keySet()) {
       allocated += drillbits.get(name).getContext().getAllocator().getAllocatedMemory();
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java
new file mode 100644
index 0000000..ef0e4a8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/Controls.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.List;
+
+public class Controls {
+
+  private static final String EMPTY_CONTROLS = "{\"injections\" : []}";
+
+  /**
+   * Returns a builder that can be used to add injections.
+   *
+   * @return a builder instance
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Private constructor, to disallow building controls without the builder.
+   */
+  private Controls() {
+  }
+
+  /**
+   * A builder to create a controls string, a JSON that holds a list of injections that are to be injected in code for
+   * testing purposes. This string is passed through the
+   * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
+   * <p>
+   * The builder class can be reused; it is safe to call build() multiple times to build multiple controls strings in
+   * series. Each new controls string contains all the injections added to the builder before it.
+   */
+  public static class Builder {
+
+    private final List<String> injections = Lists.newArrayList();
+
+    public Builder() {
+    }
+
+    /**
+     * Adds an exception injection to the controls builder with the given parameters.
+     *
+     * @param siteClass      class where the exception should be thrown
+     * @param desc           descriptor for the exception site in the site class
+     * @param exceptionClass class of the exception to throw
+     * @param nSkip          number of times to skip before firing
+     * @param nFire          number of times to fire the exception
+     * @return this builder
+     */
+    public Builder addException(final Class<?> siteClass, final String desc,
+                                final Class<? extends Throwable> exceptionClass, final int nSkip,
+                                final int nFire) {
+      injections.add(ControlsInjectionUtil.createException(siteClass, desc, nSkip, nFire, exceptionClass));
+      return this;
+    }
+
+    /**
+     * Adds an exception injection to the controls builder with the given parameters. The injection is not skipped, and
+     * the exception is thrown when execution reaches the site.
+     *
+     * @param siteClass      class where the exception should be thrown
+     * @param desc           descriptor for the exception site in the site class
+     * @param exceptionClass class of the exception to throw
+     * @return this builder
+     */
+    public Builder addException(final Class<?> siteClass, final String desc,
+                                final Class<? extends Throwable> exceptionClass) {
+      return addException(siteClass, desc, exceptionClass, 0, 1);
+    }
+
+    /**
+     * Adds an exception injection (for the specified drillbit) to the controls builder with the given parameters.
+     *
+     * @param siteClass      class where the exception should be thrown
+     * @param desc           descriptor for the exception site in the site class
+     * @param exceptionClass class of the exception to throw
+     * @param endpoint       the endpoint of the drillbit on which to inject
+     * @param nSkip          number of times to skip before firing
+     * @param nFire          number of times to fire the exception
+     * @return this builder
+     */
+    public Builder addExceptionOnBit(final Class<?> siteClass, final String desc,
+                                     final Class<? extends Throwable> exceptionClass,
+                                     final DrillbitEndpoint endpoint, final int nSkip,
+                                     final int nFire) {
+      injections.add(ControlsInjectionUtil.createExceptionOnBit(siteClass, desc, nSkip, nFire, exceptionClass,
+        endpoint));
+      return this;
+    }
+
+    /**
+     * Adds an exception injection (for the specified drillbit) to the controls builder with the given parameters. The
+     * injection is not skipped, and the exception is thrown when execution reaches the site on the specified drillbit.
+     *
+     * @param siteClass      class where the exception should be thrown
+     * @param desc           descriptor for the exception site in the site class
+     * @param exceptionClass class of the exception to throw
+     * @param endpoint       endpoint of the drillbit on which to inject
+     * @return this builder
+     */
+    public Builder addExceptionOnBit(final Class<?> siteClass, final String desc,
+                                     final Class<? extends Throwable> exceptionClass,
+                                     final DrillbitEndpoint endpoint) {
+      return addExceptionOnBit(siteClass, desc, exceptionClass, endpoint, 0, 1);
+    }
+
+    /**
+     * Adds a pause injection to the controls builder with the given parameters.
+     *
+     * @param siteClass class where the pause should happen
+     * @param desc      descriptor for the pause site in the site class
+     * @param nSkip     number of times to skip before firing
+     * @return this builder
+     */
+    public Builder addPause(final Class siteClass, final String desc, final int nSkip) {
+      injections.add(ControlsInjectionUtil.createPause(siteClass, desc, nSkip));
+      return this;
+    }
+
+    /**
+     * Adds a pause injection to the controls builder with the given parameters. The pause is not skipped i.e. the pause
+     * happens when execution reaches the site.
+     *
+     * @param siteClass class where the pause should happen
+     * @param desc      descriptor for the pause site in the site class
+     * @return this builder
+     */
+    public Builder addPause(final Class siteClass, final String desc) {
+      return addPause(siteClass, desc, 0);
+    }
+
+    /**
+     * Adds a pause injection (for the specified drillbit) to the controls builder with the given parameters.
+     *
+     * @param siteClass class where the pause should happen
+     * @param desc      descriptor for the pause site in the site class
+     * @param nSkip     number of times to skip before firing
+     * @return this builder
+     */
+    public Builder addPauseOnBit(final Class siteClass, final String desc,
+                                 final DrillbitEndpoint endpoint, final int nSkip) {
+      injections.add(ControlsInjectionUtil.createPauseOnBit(siteClass, desc, nSkip, endpoint));
+      return this;
+    }
+
+    /**
+     * Adds a pause injection (for the specified drillbit) to the controls builder with the given parameters. The pause
+     * is not skipped i.e. the pause happens when execution reaches the site.
+     *
+     * @param siteClass class where the pause should happen
+     * @param desc      descriptor for the pause site in the site class
+     * @return this builder
+     */
+    public Builder addPauseOnBit(final Class siteClass, final String desc,
+                                 final DrillbitEndpoint endpoint) {
+      return addPauseOnBit(siteClass, desc, endpoint, 0);
+    }
+
+    /**
+     * Adds a count down latch to the controls builder with the given parameters.
+     *
+     * @param siteClass class where the latch should be injected
+     * @param desc      descriptor for the latch in the site class
+     * @return this builder
+     */
+    public Builder addLatch(final Class siteClass, final String desc) {
+      injections.add(ControlsInjectionUtil.createLatch(siteClass, desc));
+      return this;
+    }
+
+    /**
+     * Builds the controls string.
+     *
+     * @return a validated controls string with the added injections
+     * @throws java.lang.AssertionError if controls cannot be validated using
+     *                                  {@link org.apache.drill.exec.testing.ExecutionControls#controlsOptionMapper}
+     */
+    public String build() {
+      if (injections.size() == 0) {
+        return EMPTY_CONTROLS;
+      }
+
+      final StringBuilder builder = new StringBuilder("{ \"injections\" : [");
+      for (final String injection : injections) {
+        builder.append(injection)
+          .append(",");
+      }
+      builder.setLength(builder.length() - 1); // remove the extra ","
+      builder.append("]}");
+      final String controls = builder.toString();
+      ControlsInjectionUtil.validateControlsString(controls);
+      return controls;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
index 346c6dd..81f2015 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ControlsInjectionUtil.java
@@ -17,8 +17,8 @@
  */
 package org.apache.drill.exec.testing;
 
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.QueryDataBatch;
@@ -26,10 +26,10 @@ import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.rpc.user.UserSession.QueryCountIncrementer;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionValue;
-import org.apache.drill.exec.testing.ExecutionControls.Controls;
 
 import java.util.List;
 
+import static org.apache.drill.exec.ExecConstants.DRILLBIT_CONTROL_INJECTIONS;
 import static org.junit.Assert.fail;
 
 /**
@@ -49,44 +49,118 @@ public class ControlsInjectionUtil {
     }
   };
 
-  public static void setControls(final DrillClient drillClient, final String controls) {
-    validateControlsString(controls);
+  public static void setSessionOption(final DrillClient drillClient, final String option, final String value) {
     try {
       final List<QueryDataBatch> results = drillClient.runQuery(
-        UserBitShared.QueryType.SQL, String.format("alter session set `%s` = '%s'",
-          ExecConstants.DRILLBIT_CONTROL_INJECTIONS, controls));
+        UserBitShared.QueryType.SQL, String.format("ALTER session SET `%s` = %s",
+          option, value));
       for (final QueryDataBatch data : results) {
         data.release();
       }
-    } catch (RpcException e) {
-      fail("Could not set controls options: " + e.toString());
+    } catch (final RpcException e) {
+      fail("Could not set option: " + e.toString());
     }
   }
 
+  public static void setControls(final DrillClient drillClient, final String controls) {
+    validateControlsString(controls);
+    setSessionOption(drillClient, DRILLBIT_CONTROL_INJECTIONS, "'" + controls + "'");
+  }
+
   public static void setControls(final UserSession session, final String controls) {
     validateControlsString(controls);
     final OptionValue opValue = OptionValue.createString(OptionValue.OptionType.SESSION,
-      ExecConstants.DRILLBIT_CONTROL_INJECTIONS, controls);
+      DRILLBIT_CONTROL_INJECTIONS, controls);
 
     final OptionManager options = session.getOptions();
     try {
       options.getAdmin().validate(opValue);
       options.setOption(opValue);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       fail("Could not set controls options: " + e.getMessage());
     }
     incrementer.increment(session); // to simulate that a query completed
   }
 
-  private static void validateControlsString(final String controls) {
+  public static void validateControlsString(final String controls) {
     try {
-      ExecutionControls.controlsOptionMapper.readValue(controls, Controls.class);
-    } catch (Exception e) {
+      ExecutionControls.validateControlsString(controls);
+    } catch (final Exception e) {
       fail("Could not validate controls JSON: " + e.getMessage());
     }
   }
 
   /**
+   * Create a single exception injection. Note this format is not directly accepted by the injection mechanism. Use
+   * the {@link Controls} to build exceptions.
+   */
+  public static String createException(final Class<?> siteClass, final String desc, final int nSkip,
+                                       final int nFire, final Class<? extends Throwable> exceptionClass) {
+    final String siteClassName = siteClass.getName();
+    final String exceptionClassName = exceptionClass.getName();
+    return "{ \"type\":\"exception\","
+      + "\"siteClass\":\"" + siteClassName + "\","
+      + "\"desc\":\"" + desc + "\","
+      + "\"nSkip\":" + nSkip + ","
+      + "\"nFire\":" + nFire + ","
+      + "\"exceptionClass\":\"" + exceptionClassName + "\"}";
+  }
+
+  /**
+   * Create a single exception injection on a specific bit. Note this format is not directly accepted by the injection
+   * mechanism. Use the {@link Controls} to build exceptions.
+   */
+  public static String createExceptionOnBit(final Class<?> siteClass, final String desc, final int nSkip,
+                                            final int nFire, final Class<? extends Throwable> exceptionClass,
+                                            final DrillbitEndpoint endpoint) {
+    final String siteClassName = siteClass.getName();
+    final String exceptionClassName = exceptionClass.getName();
+    return "{ \"type\":\"exception\","
+      + "\"siteClass\":\"" + siteClassName + "\","
+      + "\"desc\":\"" + desc + "\","
+      + "\"nSkip\":" + nSkip + ","
+      + "\"nFire\":" + nFire + ","
+      + "\"exceptionClass\":\"" + exceptionClassName + "\","
+      + "\"address\":\"" + endpoint.getAddress() + "\","
+      + "\"port\":\"" + endpoint.getUserPort() + "\"}";
+  }
+
+  /**
+   * Create a pause injection. Note this format is not directly accepted by the injection mechanism. Use the
+   * {@link Controls} builder to build a controls string containing pauses.
+   */
+  public static String createPause(final Class siteClass, final String desc, final int nSkip) {
+    return "{ \"type\" : \"pause\"," +
+      "\"siteClass\" : \"" + siteClass.getName() + "\","
+      + "\"desc\" : \"" + desc + "\","
+      + "\"nSkip\" : " + nSkip + "}";
+  }
+
+  /**
+   * Create a pause injection on a specific bit. Note this format is not directly accepted by the injection
+   * mechanism. Use the {@link Controls} builder to build a controls string containing pauses.
+   */
+  public static String createPauseOnBit(final Class siteClass, final String desc, final int nSkip,
+                                        final DrillbitEndpoint endpoint) {
+    return "{ \"type\" : \"pause\"," +
+      "\"siteClass\" : \"" + siteClass.getName() + "\","
+      + "\"desc\" : \"" + desc + "\","
+      + "\"nSkip\" : " + nSkip + ","
+      + "\"address\":\"" + endpoint.getAddress() + "\","
+      + "\"port\":\"" + endpoint.getUserPort() + "\"}";
+  }
+
+  /**
+   * Create a latch injection. Note this format is not directly accepted by the injection mechanism. Use the
+   * {@link Controls} builder to build a controls string containing latches.
+   */
+  public static String createLatch(final Class siteClass, final String desc) {
+    return "{ \"type\":\"latch\"," +
+      "\"siteClass\":\"" + siteClass.getName() + "\","
+      + "\"desc\":\"" + desc + "\"}";
+  }
+
+  /**
    * Clears all the controls.
    */
   public static void clearControls(final DrillClient client) {

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
index c0d9fd4..c911f79 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestCountDownLatchInjection.java
@@ -126,13 +126,11 @@ public class TestCountDownLatchInjection extends BaseTestQuery {
     final ExtendedLatch trigger = new ExtendedLatch(1);
     final Pointer<Long> countingDownTime = new Pointer<>();
 
-    final String jsonString = "{\"injections\":[{"
-      + "\"type\":\"latch\"," +
-      "\"siteClass\":\"org.apache.drill.exec.testing.TestCountDownLatchInjection$DummyClass\","
-      + "\"desc\":\"" + DummyClass.LATCH_NAME + "\""
-      + "}]}";
+    final String controls = Controls.newBuilder()
+      .addLatch(DummyClass.class, DummyClass.LATCH_NAME)
+      .build();
 
-    ControlsInjectionUtil.setControls(session, jsonString);
+    ControlsInjectionUtil.setControls(session, controls);
 
     final QueryContext queryContext = new QueryContext(session, bits[0].getContext());
 

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
index 305f0b7..40620c2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java
@@ -22,7 +22,6 @@ import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ZookeeperHelper;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserProtos.UserProperties;
 import org.apache.drill.exec.rpc.user.UserSession;
@@ -148,37 +147,13 @@ public class TestExceptionInjection extends BaseTestQuery {
     }
   }
 
-  private static String createException(final String desc, final int nSkip, final int nFire,
-                                        final String exceptionClass) {
-    return "{\"injections\":[{"
-      + "\"type\":\"exception\","
-      + "\"siteClass\":\"org.apache.drill.exec.testing.TestExceptionInjection$DummyClass\","
-      + "\"desc\":\"" + desc + "\","
-      + "\"nSkip\": " + nSkip + ","
-      + "\"nFire\": " + nFire + ","
-      + "\"exceptionClass\":\"" + exceptionClass + "\""
-      + "}]}";
-  }
-
-  private static String createExceptionOnBit(final DrillbitEndpoint endpoint, final String desc, final int nSkip,
-                                             final int nFire, final String exceptionClass) {
-    return "{\"injections\":[{"
-      + "\"address\":\"" + endpoint.getAddress() + "\","
-      + "\"port\":\"" + endpoint.getUserPort() + "\","
-      + "\"type\":\"exception\","
-      + "\"siteClass\":\"org.apache.drill.exec.testing.TestExceptionInjection$DummyClass\","
-      + "\"desc\":\"" + desc + "\","
-      + "\"nSkip\": " + nSkip + ","
-      + "\"nFire\": " + nFire + ","
-      + "\"exceptionClass\":\"" + exceptionClass + "\""
-      + "}]}";
-  }
-
   @SuppressWarnings("static-method")
   @Test
   public void checkedInjection() {
     // set the injection via the parsing POJOs
-    final String controls = createException(DummyClass.THROWS_IOEXCEPTION, 0, 1, IOException.class.getName());
+    final String controls = Controls.newBuilder()
+      .addException(DummyClass.class, DummyClass.THROWS_IOEXCEPTION, IOException.class, 0, 1)
+      .build();
     ControlsInjectionUtil.setControls(session, controls);
 
     final QueryContext context = new QueryContext(session, bits[0].getContext());
@@ -204,8 +179,10 @@ public class TestExceptionInjection extends BaseTestQuery {
     final String passthroughDesc = "<<injected from descPassthrough>>";
     final int nSkip = 7;
     final int nFire = 3;
-    final String exceptionClass = RuntimeException.class.getName();
-    final String controls = createException(passthroughDesc, nSkip, nFire, exceptionClass);
+    final Class<? extends Throwable> exceptionClass = RuntimeException.class;
+    final String controls = Controls.newBuilder()
+      .addException(DummyClass.class, passthroughDesc, exceptionClass, nSkip, nFire)
+      .build();
     ControlsInjectionUtil.setControls(session, controls);
 
     final QueryContext context = new QueryContext(session, bits[0].getContext());
@@ -219,7 +196,7 @@ public class TestExceptionInjection extends BaseTestQuery {
 
     // these should throw
     for (int i = 0; i < nFire; ++i) {
-      assertPassthroughThrows(dummyClass, exceptionClass, passthroughDesc);
+      assertPassthroughThrows(dummyClass, exceptionClass.getName(), passthroughDesc);
     }
 
     // this shouldn't throw
@@ -260,10 +237,11 @@ public class TestExceptionInjection extends BaseTestQuery {
     final String passthroughDesc = "<<injected from descPassthrough>>";
     final int nSkip = 7;
     final int nFire = 3;
-    final String exceptionClass = RuntimeException.class.getName();
+    final Class<? extends Throwable> exceptionClass = RuntimeException.class;
     // only drillbit1's (address, port)
-    final String controls = createExceptionOnBit(drillbitContext1.getEndpoint(), passthroughDesc, nSkip, nFire,
-      exceptionClass);
+    final String controls = Controls.newBuilder()
+    .addExceptionOnBit(DummyClass.class, passthroughDesc, exceptionClass, drillbitContext1.getEndpoint(), nSkip, nFire)
+      .build();
 
     ControlsInjectionUtil.setControls(session, controls);
 
@@ -278,7 +256,7 @@ public class TestExceptionInjection extends BaseTestQuery {
 
       // these should throw
       for (int i = 0; i < nFire; ++i) {
-        assertPassthroughThrows(class1, exceptionClass, passthroughDesc);
+        assertPassthroughThrows(class1, exceptionClass.getName(), passthroughDesc);
       }
 
       // this shouldn't throw

http://git-wip-us.apache.org/repos/asf/drill/blob/00aa01fb/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
index 2af6d95..f07f676 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestPauseInjection.java
@@ -120,14 +120,11 @@ public class TestPauseInjection extends BaseTestQuery {
     final ExtendedLatch trigger = new ExtendedLatch(1);
     final Pointer<Exception> ex = new Pointer<>();
 
-    final String jsonString = "{\"injections\":[{"
-      + "\"type\":\"pause\"," +
-      "\"siteClass\":\"org.apache.drill.exec.testing.TestPauseInjection$DummyClass\","
-      + "\"desc\":\"" + DummyClass.PAUSES + "\","
-      + "\"nSkip\":0"
-      + "}]}";
+    final String controls = Controls.newBuilder()
+      .addPause(DummyClass.class, DummyClass.PAUSES)
+      .build();
 
-    ControlsInjectionUtil.setControls(session, jsonString);
+    ControlsInjectionUtil.setControls(session, controls);
 
     final QueryContext queryContext = new QueryContext(session, bits[0].getContext());
 
@@ -174,16 +171,11 @@ public class TestPauseInjection extends BaseTestQuery {
       .build();
 
     final DrillbitEndpoint drillbitEndpoint1 = drillbitContext1.getEndpoint();
-    final String jsonString = "{\"injections\":[{"
-      + "\"type\" : \"pause\"," +
-      "\"siteClass\" : \"org.apache.drill.exec.testing.TestPauseInjection$DummyClass\","
-      + "\"desc\" : \"" + DummyClass.PAUSES + "\","
-      + "\"nSkip\" : 0, "
-      + "\"address\" : \"" + drillbitEndpoint1.getAddress() + "\","
-      + "\"port\" : " + drillbitEndpoint1.getUserPort()
-      + "}]}";
-
-    ControlsInjectionUtil.setControls(session, jsonString);
+    final String controls = Controls.newBuilder()
+      .addPauseOnBit(DummyClass.class, DummyClass.PAUSES, drillbitEndpoint1)
+      .build();
+
+    ControlsInjectionUtil.setControls(session, controls);
 
     {
       final long expectedDuration = 1000L;


Mime
View raw message