DRILL-4297: Enable custom serializers and deserializers when using CustomTunnel
- Adds support for custom serializers and deserializers
- Adds pre-built serializers and deserializers for Protobuf, Jackson, Protostuff (protobuf)
and Protostuff (json)
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e57c6542
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e57c6542
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e57c6542
Branch: refs/heads/master
Commit: e57c6542c20d2db37837c7a3e72700a8412ae822
Parents: 422c5a8
Author: Jacques Nadeau <jacques@apache.org>
Authored: Thu Feb 4 18:09:54 2016 -0800
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Fri Feb 5 07:00:32 2016 -0800
----------------------------------------------------------------------
.../drill/exec/rpc/control/ControlTunnel.java | 166 +++++++++++++++---
.../drill/exec/rpc/control/Controller.java | 42 ++++-
.../drill/exec/rpc/control/ControllerImpl.java | 17 +-
.../exec/rpc/control/CustomHandlerRegistry.java | 50 +++---
.../exec/rpc/control/TestCustomTunnel.java | 167 ++++++++++++++++++-
5 files changed, 393 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/e57c6542/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index ff8be1d..ad6601c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -21,7 +21,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.drill.exec.proto.BitControl.CustomMessage;
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
@@ -38,9 +37,24 @@ import org.apache.drill.exec.rpc.FutureBitCommand;
import org.apache.drill.exec.rpc.ListeningCommand;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
-
+import org.apache.drill.exec.rpc.control.Controller.CustomSerDe;
+
+import com.dyuproject.protostuff.JsonIOUtil;
+import com.dyuproject.protostuff.LinkedBuffer;
+import com.dyuproject.protostuff.ProtobufIOUtil;
+import com.dyuproject.protostuff.ProtostuffIOUtil;
+import com.dyuproject.protostuff.Schema;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
+import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
@@ -188,11 +202,21 @@ public class ControlTunnel {
}
}
- public <SEND extends Message, RECEIVE extends Message> CustomTunnel<SEND, RECEIVE>
getCustomTunnel(
+ @SuppressWarnings("unchecked")
+ public <SEND extends MessageLite, RECEIVE extends MessageLite> CustomTunnel<SEND,
RECEIVE> getCustomTunnel(
int messageTypeId, Class<SEND> clazz, Parser<RECEIVE> parser) {
- return new CustomTunnel<SEND, RECEIVE>(messageTypeId, parser);
+ return new CustomTunnel<SEND, RECEIVE>(
+ messageTypeId,
+ ((CustomSerDe<SEND>) new ProtoSerDe<Message>(null)),
+ new ProtoSerDe<RECEIVE>(parser));
+ }
+
+ public <SEND, RECEIVE> CustomTunnel<SEND, RECEIVE> getCustomTunnel(
+ int messageTypeId, CustomSerDe<SEND> send, CustomSerDe<RECEIVE> receive)
{
+ return new CustomTunnel<SEND, RECEIVE>(messageTypeId, send, receive);
}
+
private static class CustomMessageSender extends ListeningCommand<CustomMessage, ControlConnection>
{
private CustomMessage message;
@@ -235,24 +259,24 @@ public class ControlTunnel {
*/
public class CustomFuture<RECEIVE> {
- private Parser<RECEIVE> parser;
- private DrillRpcFuture<CustomMessage> future;
+ private final CustomSerDe<RECEIVE> serde;
+ private final DrillRpcFuture<CustomMessage> future;
- public CustomFuture(Parser<RECEIVE> parser, DrillRpcFuture<CustomMessage>
future) {
+ public CustomFuture(CustomSerDe<RECEIVE> serde, DrillRpcFuture<CustomMessage>
future) {
super();
- this.parser = parser;
+ this.serde = serde;
this.future = future;
}
- public RECEIVE get() throws RpcException, InvalidProtocolBufferException {
+ public RECEIVE get() throws Exception {
CustomMessage message = future.checkedGet();
- return parser.parseFrom(message.getMessage());
+ return serde.deserializeReceived(message.getMessage().toByteArray());
}
- public RECEIVE get(long timeout, TimeUnit unit) throws RpcException, TimeoutException,
+ public RECEIVE get(long timeout, TimeUnit unit) throws Exception,
InvalidProtocolBufferException {
CustomMessage message = future.checkedGet(timeout, unit);
- return parser.parseFrom(message.getMessage());
+ return serde.deserializeReceived(message.getMessage().toByteArray());
}
public DrillBuf getBuffer() throws RpcException {
@@ -261,6 +285,7 @@ public class ControlTunnel {
}
+
/**
* A special tunnel that can be used for custom types of messages. Its lifecycle is tied
to the underlying
* ControlTunnel.
@@ -269,14 +294,16 @@ public class ControlTunnel {
* @param <RECEIVE>
* The expected response the control tunnel expects to receive.
*/
- public class CustomTunnel<SEND extends Message, RECEIVE extends Message> {
+ public class CustomTunnel<SEND, RECEIVE> {
private int messageTypeId;
- private Parser<RECEIVE> parser;
+ private CustomSerDe<SEND> send;
+ private CustomSerDe<RECEIVE> receive;
- private CustomTunnel(int messageTypeId, Parser<RECEIVE> parser) {
+ private CustomTunnel(int messageTypeId, CustomSerDe<SEND> send, CustomSerDe<RECEIVE>
receive) {
super();
this.messageTypeId = messageTypeId;
- this.parser = parser;
+ this.send = send;
+ this.receive = receive;
}
/**
@@ -289,13 +316,13 @@ public class ControlTunnel {
*/
public CustomFuture<RECEIVE> send(SEND messageToSend, ByteBuf... dataBodies) {
final CustomMessage customMessage = CustomMessage.newBuilder()
- .setMessage(messageToSend.toByteString())
+ .setMessage(ByteString.copyFrom(send.serializeToSend(messageToSend)))
.setType(messageTypeId)
.build();
final SyncCustomMessageSender b = new SyncCustomMessageSender(customMessage, dataBodies);
manager.runCommand(b);
DrillRpcFuture<CustomMessage> innerFuture = b.getFuture();
- return new CustomFuture<RECEIVE>(parser, innerFuture);
+ return new CustomFuture<RECEIVE>(receive, innerFuture);
}
/**
@@ -309,7 +336,7 @@ public class ControlTunnel {
*/
public void send(RpcOutcomeListener<RECEIVE> listener, SEND messageToSend, ByteBuf...
dataBodies) {
final CustomMessage customMessage = CustomMessage.newBuilder()
- .setMessage(messageToSend.toByteString())
+ .setMessage(ByteString.copyFrom(send.serializeToSend(messageToSend)))
.setType(messageTypeId)
.build();
manager.runCommand(new CustomMessageSender(new CustomTunnelListener(listener), customMessage,
dataBodies));
@@ -331,9 +358,9 @@ public class ControlTunnel {
@Override
public void success(CustomMessage value, ByteBuf buffer) {
try {
- RECEIVE message = parser.parseFrom(value.getMessage());
+ RECEIVE message = receive.deserializeReceived(value.getMessage().toByteArray());
innerListener.success(message, buffer);
- } catch (InvalidProtocolBufferException e) {
+ } catch (Exception e) {
innerListener.failed(new RpcException("Failure while parsing message locally.",
e));
}
@@ -345,7 +372,104 @@ public class ControlTunnel {
}
}
+
+ }
+
+
+
+
+ public static class ProtoSerDe<MSG extends MessageLite> implements CustomSerDe<MSG>
{
+ private final Parser<MSG> parser;
+
+ ProtoSerDe(Parser<MSG> parser) {
+ this.parser = parser;
+ }
+
+ @Override
+ public byte[] serializeToSend(MSG send) {
+ return send.toByteArray();
+ }
+
+ @Override
+ public MSG deserializeReceived(byte[] bytes) throws Exception {
+ return parser.parseFrom(bytes);
+ }
+
+ }
+
+ public static class JacksonSerDe<MSG> implements CustomSerDe<MSG> {
+
+ private final ObjectWriter writer;
+ private final ObjectReader reader;
+
+ public JacksonSerDe(Class<MSG> clazz) {
+ ObjectMapper mapper = new ObjectMapper();
+ writer = mapper.writerFor(clazz);
+ reader = mapper.readerFor(clazz);
+ }
+
+ public JacksonSerDe(Class<MSG> clazz, JsonSerializer<MSG> serializer, JsonDeserializer<MSG>
deserializer) {
+ ObjectMapper mapper = new ObjectMapper();
+ SimpleModule module = new SimpleModule();
+ mapper.registerModule(module);
+ module.addSerializer(clazz, serializer);
+ module.addDeserializer(clazz, deserializer);
+ writer = mapper.writerFor(clazz);
+ reader = mapper.readerFor(clazz);
+ }
+
+ @Override
+ public byte[] serializeToSend(MSG send) {
+ try {
+ return writer.writeValueAsBytes(send);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public MSG deserializeReceived(byte[] bytes) throws Exception {
+ return (MSG) reader.readValue(bytes);
+ }
+
}
+ public static class ProtostuffBinarySerDe<MSG extends com.dyuproject.protostuff.Message<MSG>>
implements
+ CustomSerDe<MSG> {
+ private Schema<MSG> schema;
+
+ @Override
+ public byte[] serializeToSend(MSG send) {
+ final LinkedBuffer buffer = LinkedBuffer.allocate(512);
+ return ProtostuffIOUtil.toByteArray(send, schema, buffer);
+ }
+ @Override
+ public MSG deserializeReceived(byte[] bytes) throws Exception {
+ MSG msg = schema.newMessage();
+ ProtobufIOUtil.mergeFrom(bytes, msg, schema);
+ return msg;
+ }
+
+ }
+
+ public static class ProtostuffJsonSerDe<MSG extends com.dyuproject.protostuff.Message<MSG>>
implements
+ CustomSerDe<MSG> {
+ private Schema<MSG> schema;
+
+ @Override
+ public byte[] serializeToSend(MSG send) {
+ final LinkedBuffer buffer = LinkedBuffer.allocate(512);
+ return JsonIOUtil.toByteArray(send, schema, false, buffer);
+ }
+
+ @Override
+ public MSG deserializeReceived(byte[] bytes) throws Exception {
+ MSG msg = schema.newMessage();
+ JsonIOUtil.mergeFrom(bytes, msg, schema, false);
+ return msg;
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e57c6542/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
index d6b288c..a5f470c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
@@ -49,7 +49,6 @@ public interface Controller extends AutoCloseable {
/**
* Register a new handler for custom message types. Should be done before any messages.
This is threadsafe as this
* method manages locking internally.
- *
* @param messageTypeId
* The type of message id to handle. This corresponds to the CustomMessage.type
field. Note that only a
* single handler for a particular type of message can be registered within a
particular Drillbit.
@@ -62,13 +61,32 @@ public interface Controller extends AutoCloseable {
CustomMessageHandler<REQUEST, RESPONSE> handler, Parser<REQUEST> parser);
/**
+ * Register a new handler for custom message types. Should be done before any messages.
This is threadsafe as this
+ * method manages locking internally.
+ * @param messageTypeId
+ * The type of message id to handle. This corresponds to the CustomMessage.type
field. Note that only a
+ * single handler for a particular type of message can be registered within a
particular Drillbit.
+ * @param handler
+ * The handler that should be used to handle this type of message.
+ * @param requestSerde
+ * CustomSerDe for incoming requests.
+ * @param responseSerde
+ * CustomSerDe for serializing responses.
+ */
+ public <REQUEST, RESPONSE> void registerCustomHandler(int messageTypeId,
+ CustomMessageHandler<REQUEST, RESPONSE> handler,
+ CustomSerDe<REQUEST> requestSerde,
+ CustomSerDe<RESPONSE> responseSerde);
+
+ /**
* Defines how the Controller should handle custom messages. Implementations need to be
threadsafe.
+ *
* @param <REQUEST>
* The type of request message.
* @param <RESPONSE>
* The type of the response message.
*/
- public interface CustomMessageHandler<REQUEST extends MessageLite, RESPONSE extends
MessageLite> {
+ public interface CustomMessageHandler<REQUEST, RESPONSE> {
/**
* Handle an incoming message.
@@ -81,15 +99,18 @@ public interface Controller extends AutoCloseable {
* throw this exception if there is an RPC failure that should be communicated
to the sender.
*/
public CustomResponse<RESPONSE> onMessage(REQUEST pBody, DrillBuf dBody) throws
UserRpcException;
+
}
+
+
/**
* A simple interface that describes the nature of the response to the custom incoming
message.
*
* @param <RESPONSE>
* The type of message that the response contains. Must be a protobuf message
type.
*/
- public interface CustomResponse<RESPONSE extends MessageLite> {
+ public interface CustomResponse<RESPONSE> {
/**
* The structured portion of the response.
@@ -103,4 +124,19 @@ public interface Controller extends AutoCloseable {
*/
public ByteBuf[] getBodies();
}
+
+ /**
+ * Interface for defining how to serialize and deserialize custom messages for consumers
who want to use something other
+ * than Protobuf messages.
+ *
+ * @param <MSG>
+ * The class of message this SerDe handles: instances are serialized to bytes
+ * when sent, and bytes that are received are deserialized back into
+ * instances of this class.
+ */
+ public interface CustomSerDe<MSG> {
+ public byte[] serializeToSend(MSG send);
+
+ public MSG deserializeReceived(byte[] bytes) throws Exception;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e57c6542/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
index ddc7778..482f117 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
import com.google.common.collect.Lists;
+import com.google.protobuf.Message;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
@@ -71,12 +72,26 @@ public class ControllerImpl implements Controller {
}
+ @SuppressWarnings("unchecked")
@Override
public <REQUEST extends MessageLite, RESPONSE extends MessageLite> void registerCustomHandler(int
messageTypeId,
CustomMessageHandler<REQUEST, RESPONSE> handler, Parser<REQUEST> parser)
{
- handlerRegistry.registerCustomHandler(messageTypeId, handler, parser);
+ handlerRegistry.registerCustomHandler(
+ messageTypeId,
+ handler,
+ new ControlTunnel.ProtoSerDe<REQUEST>(parser),
+ (CustomSerDe<RESPONSE>) new ControlTunnel.ProtoSerDe<Message>(null));
}
+ @Override
+ public <REQUEST, RESPONSE> void registerCustomHandler(int messageTypeId,
+ CustomMessageHandler<REQUEST, RESPONSE> handler,
+ CustomSerDe<REQUEST> requestSerde,
+ CustomSerDe<RESPONSE> responseSerde) {
+ handlerRegistry.registerCustomHandler(messageTypeId, handler, requestSerde, responseSerde);
+ }
+
+
public void close() throws Exception {
List<AutoCloseable> closeables = Lists.newArrayList();
closeables.add(server);
http://git-wip-us.apache.org/repos/asf/drill/blob/e57c6542/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
index 7fc09fb..7a2bd04 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/CustomHandlerRegistry.java
@@ -36,9 +36,6 @@ import org.apache.drill.exec.rpc.control.Controller.CustomResponse;
import com.carrotsearch.hppc.IntObjectHashMap;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
public class CustomHandlerRegistry {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CustomHandlerRegistry.class);
@@ -46,7 +43,7 @@ public class CustomHandlerRegistry {
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final AutoCloseableLock read = new AutoCloseableLock(readWriteLock.readLock());
private final AutoCloseableLock write = new AutoCloseableLock(readWriteLock.writeLock());
- private final IntObjectHashMap<ParsingHandler<?>> handlers = new IntObjectHashMap<>();
+ private final IntObjectHashMap<ParsingHandler<?, ?>> handlers = new IntObjectHashMap<>();
private volatile DrillbitEndpoint endpoint;
public CustomHandlerRegistry() {
@@ -56,13 +53,15 @@ public class CustomHandlerRegistry {
this.endpoint = endpoint;
}
- public <SEND extends MessageLite> void registerCustomHandler(int messageTypeId,
- CustomMessageHandler<SEND, ?> handler,
- Parser<SEND> parser) {
+ public <REQUEST, RESPONSE> void registerCustomHandler(int messageTypeId,
+ CustomMessageHandler<REQUEST, RESPONSE> handler,
+ Controller.CustomSerDe<REQUEST> requestSerde,
+ Controller.CustomSerDe<RESPONSE> responseSerde) {
Preconditions.checkNotNull(handler);
- Preconditions.checkNotNull(parser);
+ Preconditions.checkNotNull(requestSerde);
+ Preconditions.checkNotNull(responseSerde);
try (AutoCloseableLock lock = write.open()) {
- ParsingHandler<?> parsingHandler = handlers.get(messageTypeId);
+ ParsingHandler<?, ?> parsingHandler = handlers.get(messageTypeId);
if (parsingHandler != null) {
throw new IllegalStateException(String.format(
"Only one handler can be registered for a given custom message type. You tried
to register a handler for "
@@ -70,13 +69,13 @@ public class CustomHandlerRegistry {
messageTypeId));
}
- parsingHandler = new ParsingHandler<SEND>(handler, parser);
+ parsingHandler = new ParsingHandler<REQUEST, RESPONSE>(handler, requestSerde,
responseSerde);
handlers.put(messageTypeId, parsingHandler);
}
}
public Response handle(CustomMessage message, DrillBuf dBody) throws RpcException {
- final ParsingHandler<?> handler;
+ final ParsingHandler<?, ?> handler;
try (AutoCloseableLock lock = read.open()) {
handler = handlers.get(message.getType());
}
@@ -89,8 +88,12 @@ public class CustomHandlerRegistry {
message.getType())));
}
final CustomResponse<?> customResponse = handler.onMessage(message.getMessage(),
dBody);
+ @SuppressWarnings("unchecked")
final CustomMessage responseMessage = CustomMessage.newBuilder()
- .setMessage(customResponse.getMessage().toByteString())
+ .setMessage(
+ ByteString.copyFrom(((Controller.CustomSerDe<Object>) handler.getResponseSerDe())
+ .serializeToSend(customResponse
+ .getMessage())))
.setType(message.getType())
.build();
// make sure we don't pass in a null array.
@@ -99,23 +102,32 @@ public class CustomHandlerRegistry {
}
- private class ParsingHandler<SEND extends MessageLite> {
- private final CustomMessageHandler<SEND, ?> handler;
- private final Parser<SEND> parser;
+ private class ParsingHandler<REQUEST, RESPONSE> {
+ private final CustomMessageHandler<REQUEST, ?> handler;
+ private final Controller.CustomSerDe<REQUEST> requestSerde;
+ private final Controller.CustomSerDe<RESPONSE> responseSerde;
- public ParsingHandler(CustomMessageHandler<SEND, ?> handler, Parser<SEND>
parser) {
+ public ParsingHandler(
+ CustomMessageHandler<REQUEST, RESPONSE> handler,
+ Controller.CustomSerDe<REQUEST> requestSerde,
+ Controller.CustomSerDe<RESPONSE> responseSerde) {
super();
this.handler = handler;
- this.parser = parser;
+ this.requestSerde = requestSerde;
+ this.responseSerde = responseSerde;
+ }
+
+ public Controller.CustomSerDe<RESPONSE> getResponseSerDe() {
+ return responseSerde;
}
public CustomResponse<?> onMessage(ByteString pBody, DrillBuf dBody) throws UserRpcException
{
try {
- final SEND message = parser.parseFrom(pBody);
+ final REQUEST message = requestSerde.deserializeReceived(pBody.toByteArray());
return handler.onMessage(message, dBody);
- } catch (InvalidProtocolBufferException e) {
+ } catch (Exception e) {
throw new UserRpcException(endpoint, "Failure parsing message.", e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e57c6542/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java
index 2008a48..9770a7e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/control/TestCustomTunnel.java
@@ -30,7 +30,6 @@ import java.util.Random;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.UserRpcException;
import org.apache.drill.exec.rpc.control.ControlTunnel.CustomFuture;
import org.apache.drill.exec.rpc.control.ControlTunnel.CustomTunnel;
@@ -39,8 +38,6 @@ import org.apache.drill.exec.rpc.control.Controller.CustomResponse;
import org.apache.drill.exec.server.DrillbitContext;
import org.junit.Test;
-import com.google.protobuf.InvalidProtocolBufferException;
-
public class TestCustomTunnel extends BaseTestQuery {
private final QueryId expectedId = QueryId
@@ -61,7 +58,7 @@ public class TestCustomTunnel extends BaseTestQuery {
}
@Test
- public void ensureRoundTrip() throws RpcException, InvalidProtocolBufferException {
+ public void ensureRoundTrip() throws Exception {
final DrillbitContext context = getDrillbitContext();
final TestCustomMessageHandler handler = new TestCustomMessageHandler(context.getEndpoint(),
false);
@@ -74,7 +71,7 @@ public class TestCustomTunnel extends BaseTestQuery {
}
@Test
- public void ensureRoundTripBytes() throws RpcException, InvalidProtocolBufferException
{
+ public void ensureRoundTripBytes() throws Exception {
final DrillbitContext context = getDrillbitContext();
final TestCustomMessageHandler handler = new TestCustomMessageHandler(context.getEndpoint(),
true);
context.getController().registerCustomHandler(1002, handler, DrillbitEndpoint.PARSER);
@@ -90,6 +87,8 @@ public class TestCustomTunnel extends BaseTestQuery {
assertTrue(Arrays.equals(expected, actual));
}
+
+
private class TestCustomMessageHandler implements CustomMessageHandler<DrillbitEndpoint,
QueryId> {
private DrillbitEndpoint expectedValue;
private final boolean returnBytes;
@@ -135,4 +134,162 @@ public class TestCustomTunnel extends BaseTestQuery {
};
}
}
+
+ @Test
+ public void ensureRoundTripJackson() throws Exception {
+ final DrillbitContext context = getDrillbitContext();
+ final MesgA mesgA = new MesgA();
+ mesgA.fieldA = "123";
+ mesgA.fieldB = "okra";
+
+ final TestCustomMessageHandlerJackson handler = new TestCustomMessageHandlerJackson(mesgA);
+ context.getController().registerCustomHandler(1003, handler,
+ new ControlTunnel.JacksonSerDe<MesgA>(MesgA.class),
+ new ControlTunnel.JacksonSerDe<MesgB>(MesgB.class));
+ final ControlTunnel loopbackTunnel = context.getController().getTunnel(context.getEndpoint());
+ final CustomTunnel<MesgA, MesgB> tunnel = loopbackTunnel.getCustomTunnel(
+ 1003,
+ new ControlTunnel.JacksonSerDe<MesgA>(MesgA.class),
+ new ControlTunnel.JacksonSerDe<MesgB>(MesgB.class));
+ CustomFuture<MesgB> future = tunnel.send(mesgA);
+ assertEquals(expectedB, future.get());
+ }
+
+ private MesgB expectedB = new MesgB().set("hello", "bye", "friend");
+
+ public static class MesgA {
+ public String fieldA;
+ public String fieldB;
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((fieldA == null) ? 0 : fieldA.hashCode());
+ result = prime * result + ((fieldB == null) ? 0 : fieldB.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ MesgA other = (MesgA) obj;
+ if (fieldA == null) {
+ if (other.fieldA != null) {
+ return false;
+ }
+ } else if (!fieldA.equals(other.fieldA)) {
+ return false;
+ }
+ if (fieldB == null) {
+ if (other.fieldB != null) {
+ return false;
+ }
+ } else if (!fieldB.equals(other.fieldB)) {
+ return false;
+ }
+ return true;
+ }
+
+ }
+
+ public static class MesgB {
+ public String fieldA;
+ public String fieldB;
+ public String fieldC;
+
+ public MesgB set(String a, String b, String c) {
+ fieldA = a;
+ fieldB = b;
+ fieldC = c;
+ return this;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((fieldA == null) ? 0 : fieldA.hashCode());
+ result = prime * result + ((fieldB == null) ? 0 : fieldB.hashCode());
+ result = prime * result + ((fieldC == null) ? 0 : fieldC.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ MesgB other = (MesgB) obj;
+ if (fieldA == null) {
+ if (other.fieldA != null) {
+ return false;
+ }
+ } else if (!fieldA.equals(other.fieldA)) {
+ return false;
+ }
+ if (fieldB == null) {
+ if (other.fieldB != null) {
+ return false;
+ }
+ } else if (!fieldB.equals(other.fieldB)) {
+ return false;
+ }
+ if (fieldC == null) {
+ if (other.fieldC != null) {
+ return false;
+ }
+ } else if (!fieldC.equals(other.fieldC)) {
+ return false;
+ }
+ return true;
+ }
+
+ }
+
+ private class TestCustomMessageHandlerJackson implements CustomMessageHandler<MesgA,
MesgB> {
+ private MesgA expectedValue;
+
+ public TestCustomMessageHandlerJackson(MesgA expectedValue) {
+ super();
+ this.expectedValue = expectedValue;
+ }
+
+ @Override
+ public CustomResponse<MesgB> onMessage(MesgA pBody, DrillBuf dBody) throws UserRpcException
{
+
+ if (!expectedValue.equals(pBody)) {
+ throw new UserRpcException(DrillbitEndpoint.getDefaultInstance(),
+ "Invalid expected downstream value.", new IllegalStateException());
+ }
+
+ return new CustomResponse<MesgB>() {
+
+ @Override
+ public MesgB getMessage() {
+ return expectedB;
+ }
+
+ @Override
+ public ByteBuf[] getBodies() {
+ return null;
+ }
+
+ };
+ }
+ }
}
|