drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [8/9] git commit: DRILL-1113: Add configuration in DrillClient to encode complex/repeated types as JSON string
Date Wed, 09 Jul 2014 22:00:51 GMT
DRILL-1113: Add configuration in DrillClient to encode complex/repeated types as JSON string


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2dcf8cb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2dcf8cb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2dcf8cb8

Branch: refs/heads/master
Commit: 2dcf8cb8ab53379f077979ecf3fe38b34994bf7c
Parents: 9abffa1
Author: Aditya Kishore <aditya@maprtech.com>
Authored: Mon Jul 7 15:03:12 2014 -0700
Committer: Aditya Kishore <aditya@maprtech.com>
Committed: Wed Jul 9 11:55:46 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/hbase/BaseHBaseTest.java   | 27 ------------------
 .../drill/hbase/TestHBaseCFAsJSONString.java    |  2 +-
 .../org/apache/drill/exec/ExecConstants.java    |  2 +-
 .../apache/drill/exec/client/DrillClient.java   |  8 ++++--
 .../apache/drill/exec/rpc/user/UserClient.java  | 17 ++----------
 .../src/main/resources/drill-module.conf        |  4 +++
 .../java/org/apache/drill/BaseTestQuery.java    | 29 ++++++++++++++++++++
 7 files changed, 42 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index e6a5474..36cf15b 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -52,8 +52,6 @@ public class BaseHBaseTest extends BaseTestQuery {
 
   @Rule public TestName TEST_NAME = new TestName();
 
-  private int[] columnWidths = new int[] { 8 };
-
   @Before
   public void printID() throws Exception {
     System.out.printf("Running %s#%s\n", getClass().getName(), TEST_NAME.getMethodName());
@@ -80,14 +78,6 @@ public class BaseHBaseTest extends BaseTestQuery {
     HBaseTestsSuite.tearDownCluster();
   }
 
-  protected void setColumnWidth(int columnWidth) {
-    this.columnWidths = new int[] { columnWidth };
-  }
-
-  protected void setColumnWidths(int[] columnWidths) {
-    this.columnWidths = columnWidths;
-  }
-
   protected String getPlanText(String planFile, String tableName) throws IOException {
     return Files.toString(FileUtils.getResourceAsFile(planFile), Charsets.UTF_8)
         .replaceFirst("\"hbase\\.zookeeper\\.property\\.clientPort\".*:.*\\d+", "\"hbase.zookeeper.property.clientPort\" : " + HBaseTestsSuite.getZookeeperPort())
@@ -111,23 +101,6 @@ public class BaseHBaseTest extends BaseTestQuery {
     printResultAndVerifyRowCount(results, expectedRowCount);
   }
 
-  protected int printResult(List<QueryResultBatch> results) throws SchemaChangeException {
-    int rowCount = 0;
-    RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
-    for(QueryResultBatch result : results){
-      rowCount += result.getHeader().getRowCount();
-      loader.load(result.getHeader().getDef(), result.getData());
-      if (loader.getRecordCount() <= 0) {
-        break;
-      }
-      VectorUtil.showVectorAccessibleContent(loader, columnWidths);
-      loader.clear();
-      result.release();
-    }
-    System.out.println("Total record count: " + rowCount);
-    return rowCount;
-  }
-
   private void printResultAndVerifyRowCount(List<QueryResultBatch> results, int expectedRowCount) throws SchemaChangeException {
     int rowCount = printResult(results);
     if (expectedRowCount != -1) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
index 9cc0356..4b4b648 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
@@ -39,7 +39,7 @@ public class TestHBaseCFAsJSONString extends BaseHBaseTest {
   }
 
   @AfterClass
-  public static void closeClient() throws IOException {
+  public static void closeMyClient() throws IOException {
     if(client != null) client.close();
     client = parent_client;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 7681dd5..c2f459e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -75,7 +75,7 @@ public interface ExecConstants {
   public static final String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write";
   public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak";
 
-
+  public static final String CLIENT_SUPPORT_COMPLEX_TYPES = "drill.client.supports-complex-types";
 
   public static final String OUTPUT_FORMAT_OPTION = "store.format";
   public static final OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet");

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 3a9d015..6690bf5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -71,7 +71,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
   private final BufferAllocator allocator;
   private int reconnectTimes;
   private int reconnectDelay;
-  private boolean supportComplexTypes = true;
+  private boolean supportComplexTypes;
   private final boolean ownsZkConnection;
   private final boolean ownsAllocator;
 
@@ -99,6 +99,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
     this.clusterCoordinator = coordinator;
     this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES);
     this.reconnectDelay = config.getInt(ExecConstants.BIT_RETRY_DELAY);
+    this.supportComplexTypes = config.getBoolean(ExecConstants.CLIENT_SUPPORT_COMPLEX_TYPES);
   }
 
   public DrillConfig getConfig(){
@@ -161,7 +162,8 @@ public class DrillClient implements Closeable, ConnectionThrottle{
     // just use the first endpoint for now
     DrillbitEndpoint endpoint = endpoints.iterator().next();
 
-    this.client = new UserClient(allocator, TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-"));
+    this.client = new UserClient(supportComplexTypes, allocator,
+                                 TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.CLIENT_RPC_THREADS), "Client-"));
     logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
     connect(endpoint);
     connected = true;
@@ -193,7 +195,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
   private void connect(DrillbitEndpoint endpoint) throws RpcException {
     FutureHandler f = new FutureHandler();
     try {
-      client.setSupportComplexTypes(supportComplexTypes).connect(f, endpoint, props);
+      client.connect(f, endpoint, props);
       f.checkedGet();
     } catch (InterruptedException e) {
       throw new RpcException(e);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index ad885f6..d49a9fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -46,8 +46,9 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
 
   private boolean supportComplexTypes = true;
 
-  public UserClient(BufferAllocator alloc, EventLoopGroup eventLoopGroup) {
+  public UserClient(boolean supportComplexTypes, BufferAllocator alloc, EventLoopGroup eventLoopGroup) {
     super(UserRpcConfig.MAPPING, alloc, eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER);
+    this.supportComplexTypes = supportComplexTypes;
   }
 
   public void submitQuery(UserResultsListener resultsListener, RunQuery query) {
@@ -112,18 +113,4 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
     return new UserProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE);
   }
 
-  /**
-   * Sets whether the application is willing to accept complex types (Map, Arrays) in the returned result set.
-   * Default is {@code true}. If set to {@code false}, the complex types are returned as JSON encoded VARCHAR type.
-   *
-   * @throws IllegalStateException if called after a connection has been established.
-   */
-  public UserClient setSupportComplexTypes(boolean supportComplexTypes) {
-    if (isActive()) {
-      throw new IllegalStateException("Attempted to modify connection property after connection has been established.");
-    }
-    this.supportComplexTypes = supportComplexTypes;
-    return this;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 8a69ec1..81a2bc2 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -19,6 +19,10 @@
 
 drill.logical.function.packages += "org.apache.drill.exec.expr.fn.impl"
 
+drill.client: {
+  supports-complex-types: true
+}
+
 drill.exec: {
   cluster-id: "drillbits1"
   rpc: {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 7cfe51a..07a2075 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -32,10 +32,12 @@ import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.client.PrintingResultsListener;
 import org.apache.drill.exec.client.QuerySubmitter;
 import org.apache.drill.exec.client.QuerySubmitter.Format;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
@@ -55,6 +57,8 @@ import com.google.common.io.Resources;
 public class BaseTestQuery extends ExecTest{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
 
+  private int[] columnWidths = new int[] { 8 };
+
   private static final String ENABLE_FULL_CACHE = "drill.exec.test.use-full-cache";
 
   @SuppressWarnings("serial")
@@ -227,4 +231,29 @@ public class BaseTestQuery extends ExecTest{
     }
   }
 
+  protected void setColumnWidth(int columnWidth) {
+    this.columnWidths = new int[] { columnWidth };
+  }
+
+  protected void setColumnWidths(int[] columnWidths) {
+    this.columnWidths = columnWidths;
+  }
+
+  protected int printResult(List<QueryResultBatch> results) throws SchemaChangeException {
+    int rowCount = 0;
+    RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+    for(QueryResultBatch result : results){
+      rowCount += result.getHeader().getRowCount();
+      loader.load(result.getHeader().getDef(), result.getData());
+      if (loader.getRecordCount() <= 0) {
+        break;
+      }
+      VectorUtil.showVectorAccessibleContent(loader, columnWidths);
+      loader.clear();
+      result.release();
+    }
+    System.out.println("Total record count: " + rowCount);
+    return rowCount;
+  }
+
 }


Mime
View raw message