DRILL-1113: Add configuration in DrillClient to encode complex/repeated types as JSON string
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2dcf8cb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2dcf8cb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2dcf8cb8
Branch: refs/heads/master
Commit: 2dcf8cb8ab53379f077979ecf3fe38b34994bf7c
Parents: 9abffa1
Author: Aditya Kishore <aditya@maprtech.com>
Authored: Mon Jul 7 15:03:12 2014 -0700
Committer: Aditya Kishore <aditya@maprtech.com>
Committed: Wed Jul 9 11:55:46 2014 -0700
----------------------------------------------------------------------
.../org/apache/drill/hbase/BaseHBaseTest.java | 27 ------------------
.../drill/hbase/TestHBaseCFAsJSONString.java | 2 +-
.../org/apache/drill/exec/ExecConstants.java | 2 +-
.../apache/drill/exec/client/DrillClient.java | 8 ++++--
.../apache/drill/exec/rpc/user/UserClient.java | 17 ++----------
.../src/main/resources/drill-module.conf | 4 +++
.../java/org/apache/drill/BaseTestQuery.java | 29 ++++++++++++++++++++
7 files changed, 42 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index e6a5474..36cf15b 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -52,8 +52,6 @@ public class BaseHBaseTest extends BaseTestQuery {
@Rule public TestName TEST_NAME = new TestName();
- private int[] columnWidths = new int[] { 8 };
-
@Before
public void printID() throws Exception {
System.out.printf("Running %s#%s\n", getClass().getName(), TEST_NAME.getMethodName());
@@ -80,14 +78,6 @@ public class BaseHBaseTest extends BaseTestQuery {
HBaseTestsSuite.tearDownCluster();
}
- protected void setColumnWidth(int columnWidth) {
- this.columnWidths = new int[] { columnWidth };
- }
-
- protected void setColumnWidths(int[] columnWidths) {
- this.columnWidths = columnWidths;
- }
-
protected String getPlanText(String planFile, String tableName) throws IOException {
return Files.toString(FileUtils.getResourceAsFile(planFile), Charsets.UTF_8)
.replaceFirst("\"hbase\\.zookeeper\\.property\\.clientPort\".*:.*\\d+", "\"hbase.zookeeper.property.clientPort\"
: " + HBaseTestsSuite.getZookeeperPort())
@@ -111,23 +101,6 @@ public class BaseHBaseTest extends BaseTestQuery {
printResultAndVerifyRowCount(results, expectedRowCount);
}
- protected int printResult(List<QueryResultBatch> results) throws SchemaChangeException
{
- int rowCount = 0;
- RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
- for(QueryResultBatch result : results){
- rowCount += result.getHeader().getRowCount();
- loader.load(result.getHeader().getDef(), result.getData());
- if (loader.getRecordCount() <= 0) {
- break;
- }
- VectorUtil.showVectorAccessibleContent(loader, columnWidths);
- loader.clear();
- result.release();
- }
- System.out.println("Total record count: " + rowCount);
- return rowCount;
- }
-
private void printResultAndVerifyRowCount(List<QueryResultBatch> results, int expectedRowCount)
throws SchemaChangeException {
int rowCount = printResult(results);
if (expectedRowCount != -1) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
index 9cc0356..4b4b648 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
@@ -39,7 +39,7 @@ public class TestHBaseCFAsJSONString extends BaseHBaseTest {
}
@AfterClass
- public static void closeClient() throws IOException {
+ public static void closeMyClient() throws IOException {
if(client != null) client.close();
client = parent_client;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 7681dd5..c2f459e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -75,7 +75,7 @@ public interface ExecConstants {
public static final String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write";
public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak";
-
+ public static final String CLIENT_SUPPORT_COMPLEX_TYPES = "drill.client.supports-complex-types";
public static final String OUTPUT_FORMAT_OPTION = "store.format";
public static final OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION,
"parquet");
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 3a9d015..6690bf5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -71,7 +71,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
private final BufferAllocator allocator;
private int reconnectTimes;
private int reconnectDelay;
- private boolean supportComplexTypes = true;
+ private boolean supportComplexTypes;
private final boolean ownsZkConnection;
private final boolean ownsAllocator;
@@ -99,6 +99,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
this.clusterCoordinator = coordinator;
this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES);
this.reconnectDelay = config.getInt(ExecConstants.BIT_RETRY_DELAY);
+ this.supportComplexTypes = config.getBoolean(ExecConstants.CLIENT_SUPPORT_COMPLEX_TYPES);
}
public DrillConfig getConfig(){
@@ -161,7 +162,8 @@ public class DrillClient implements Closeable, ConnectionThrottle{
// just use the first endpoint for now
DrillbitEndpoint endpoint = endpoints.iterator().next();
- this.client = new UserClient(allocator, TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.CLIENT_RPC_THREADS),
"Client-"));
+ this.client = new UserClient(supportComplexTypes, allocator,
+ TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.CLIENT_RPC_THREADS),
"Client-"));
logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
connect(endpoint);
connected = true;
@@ -193,7 +195,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
private void connect(DrillbitEndpoint endpoint) throws RpcException {
FutureHandler f = new FutureHandler();
try {
- client.setSupportComplexTypes(supportComplexTypes).connect(f, endpoint, props);
+ client.connect(f, endpoint, props);
f.checkedGet();
} catch (InterruptedException e) {
throw new RpcException(e);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index ad885f6..d49a9fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -46,8 +46,9 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
private boolean supportComplexTypes = true;
- public UserClient(BufferAllocator alloc, EventLoopGroup eventLoopGroup) {
+ public UserClient(boolean supportComplexTypes, BufferAllocator alloc, EventLoopGroup eventLoopGroup)
{
super(UserRpcConfig.MAPPING, alloc, eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class,
BitToUserHandshake.PARSER);
+ this.supportComplexTypes = supportComplexTypes;
}
public void submitQuery(UserResultsListener resultsListener, RunQuery query) {
@@ -112,18 +113,4 @@ public class UserClient extends BasicClientWithConnection<RpcType,
UserToBitHand
return new UserProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE);
}
- /**
- * Sets whether the application is willing to accept complex types (Map, Arrays) in the
returned result set.
- * Default is {@code true}. If set to {@code false}, the complex types are returned as
JSON encoded VARCHAR type.
- *
- * @throws IllegalStateException if called after a connection has been established.
- */
- public UserClient setSupportComplexTypes(boolean supportComplexTypes) {
- if (isActive()) {
- throw new IllegalStateException("Attempted to modify connection property after connection
has been established.");
- }
- this.supportComplexTypes = supportComplexTypes;
- return this;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 8a69ec1..81a2bc2 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -19,6 +19,10 @@
drill.logical.function.packages += "org.apache.drill.exec.expr.fn.impl"
+drill.client: {
+ supports-complex-types: true
+}
+
drill.exec: {
cluster-id: "drillbits1"
rpc: {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2dcf8cb8/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 7cfe51a..07a2075 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -32,10 +32,12 @@ import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.client.PrintingResultsListener;
import org.apache.drill.exec.client.QuerySubmitter;
import org.apache.drill.exec.client.QuerySubmitter.Format;
+import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
@@ -55,6 +57,8 @@ import com.google.common.io.Resources;
public class BaseTestQuery extends ExecTest{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
+ private int[] columnWidths = new int[] { 8 };
+
private static final String ENABLE_FULL_CACHE = "drill.exec.test.use-full-cache";
@SuppressWarnings("serial")
@@ -227,4 +231,29 @@ public class BaseTestQuery extends ExecTest{
}
}
+ protected void setColumnWidth(int columnWidth) {
+ this.columnWidths = new int[] { columnWidth };
+ }
+
+ protected void setColumnWidths(int[] columnWidths) {
+ this.columnWidths = columnWidths;
+ }
+
+ protected int printResult(List<QueryResultBatch> results) throws SchemaChangeException
{
+ int rowCount = 0;
+ RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+ for(QueryResultBatch result : results){
+ rowCount += result.getHeader().getRowCount();
+ loader.load(result.getHeader().getDef(), result.getData());
+ if (loader.getRecordCount() <= 0) {
+ break;
+ }
+ VectorUtil.showVectorAccessibleContent(loader, columnWidths);
+ loader.clear();
+ result.release();
+ }
+ System.out.println("Total record count: " + rowCount);
+ return rowCount;
+ }
+
}
|