activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-2323 NettyTransport should also send requests with void promises
Date Wed, 08 May 2019 19:49:50 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f040cf  ARTEMIS-2323 NettyTransport should also send requests with void promises
     new ee674ba  This closes #2647
5f040cf is described below

commit 5f040cf38bf42f3fc43ae91b37a03e71fea26b61
Author: Francesco Nigro <nigro.fra@gmail.com>
AuthorDate: Fri Apr 26 13:46:26 2019 +0200

    ARTEMIS-2323 NettyTransport should also send requests with void promises
---
 .../transport/amqp/client/AmqpConnection.java      | 28 +--------------------
 .../transport/netty/NettyTcpTransport.java         | 29 +++++++++++++++++++---
 .../activemq/transport/netty/NettyTransport.java   |  2 ++
 .../activemq/transport/netty/NettyWSTransport.java | 15 +++++------
 4 files changed, 34 insertions(+), 40 deletions(-)

diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 9181cea..6a2f42c 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -57,7 +57,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.util.ReferenceCountUtil;
 
 public class AmqpConnection extends AmqpAbstractResource<Connection> implements NettyTransportListener
{
@@ -295,31 +294,6 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
       return session;
    }
 
-   //----- Access to low level IO for specific test cases -------------------//
-
-   public void sendRawBytes(final byte[] rawData) throws Exception {
-      checkClosed();
-
-      final ClientFuture request = new ClientFuture();
-
-      serializer.execute(new Runnable() {
-
-         @Override
-         public void run() {
-            checkClosed();
-            try {
-               transport.send(Unpooled.wrappedBuffer(rawData));
-            } catch (IOException e) {
-               fireClientException(e);
-            } finally {
-               request.onSuccess();
-            }
-         }
-      });
-
-      request.sync();
-   }
-
    //----- Configuration accessors ------------------------------------------//
 
    /**
@@ -550,7 +524,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
             if (toWrite != null && toWrite.hasRemaining()) {
                ByteBuf outbound = transport.allocateSendBuffer(toWrite.remaining());
                outbound.writeBytes(toWrite);
-               transport.send(outbound);
+               transport.sendVoidPromise(outbound);
                protonTransport.outputConsumed();
             } else {
                done = true;
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java
index 9eab670..b3701ad 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java
@@ -22,7 +22,11 @@ import java.security.Principal;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import static java.util.function.Function.identity;
 
+import io.netty.channel.ChannelPromise;
+import io.netty.util.ReferenceCounted;
 import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -222,17 +226,34 @@ public class NettyTcpTransport implements NettyTransport {
       return channel.alloc().ioBuffer(size, size);
    }
 
-   @Override
-   public ChannelFuture send(ByteBuf output) throws IOException {
-      checkConnected();
+   protected final ChannelFuture writeAndFlush(ByteBuf output,
+                                               ChannelPromise promise,
+                                               Function<? super ByteBuf, ? extends ReferenceCounted> bufferTransformer) throws IOException {
+      try {
+         checkConnected();
+      } catch (IOException ioEx) {
+         output.release();
+         throw ioEx;
+      }
       int length = output.readableBytes();
       if (length == 0) {
+         output.release();
          return null;
       }
 
       LOG.trace("Attempted write of: {} bytes", length);
 
-      return channel.writeAndFlush(output);
+      return channel.writeAndFlush(bufferTransformer.apply(output), promise);
+   }
+
+   @Override
+   public ChannelFuture send(ByteBuf output) throws IOException {
+      return writeAndFlush(output, channel.newPromise(), identity());
+   }
+
+   @Override
+   public void sendVoidPromise(ByteBuf output) throws IOException {
+      writeAndFlush(output, channel.voidPromise(), identity());
    }
 
    @Override
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransport.java
index b06be3e..3727d67 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransport.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTransport.java
@@ -38,6 +38,8 @@ public interface NettyTransport {
 
    ByteBuf allocateSendBuffer(int size) throws IOException;
 
+   void sendVoidPromise(ByteBuf output) throws IOException;
+
    ChannelFuture send(ByteBuf output) throws IOException;
 
    NettyTransportListener getTransportListener();
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyWSTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyWSTransport.java
index 08f4816..15efe54 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyWSTransport.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyWSTransport.java
@@ -80,16 +80,13 @@ public class NettyWSTransport extends NettyTcpTransport {
    }
 
    @Override
-   public ChannelFuture send(ByteBuf output) throws IOException {
-      checkConnected();
-      int length = output.readableBytes();
-      if (length == 0) {
-         return null;
-      }
-
-      LOG.trace("Attempted write of: {} bytes", length);
+   public void sendVoidPromise(ByteBuf output) throws IOException {
+      writeAndFlush(output, channel.voidPromise(), BinaryWebSocketFrame::new);
+   }
 
-      return channel.writeAndFlush(new BinaryWebSocketFrame(output));
+   @Override
+   public ChannelFuture send(ByteBuf output) throws IOException {
+      return writeAndFlush(output, channel.newPromise(), BinaryWebSocketFrame::new);
    }
 
    @Override


Mime
View raw message