knox-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject [knox] branch master updated: KNOX-1997: Adding changes to buffer messages from backend in onMessag… (#143)
Date Wed, 13 Nov 2019 14:25:41 GMT
This is an automated email from the ASF dual-hosted git repository.

more pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git


The following commit(s) were added to refs/heads/master by this push:
     new 96587a4  KNOX-1997: Adding changes to buffer messages from backend in onMessag… (#143)
96587a4 is described below

commit 96587a4ed89ce6691198a0df8a96a9c81ddb875f
Author: Rajat Goel <rajat.goel@guavus.com>
AuthorDate: Wed Nov 13 19:55:32 2019 +0530

    KNOX-1997: Adding changes to buffer messages from backend in onMessag… (#143)
    
    * KNOX-1997: Adding changes to buffer messages from backend in onMessageText() if frontend session related data structures have not been setup i.e. remote is null. Message buffer will be flushed when remote is set by other thread executing onWebSocketConnect() API. To synchronise reading/flushing buffer, added a lock
---
 .../gateway/config/impl/GatewayConfigImpl.java     |  7 ++
 .../websockets/GatewayWebsocketHandler.java        |  3 +-
 .../gateway/websockets/ProxyWebSocketAdapter.java  | 72 ++++++++++++++--
 .../knox/gateway/websockets/BadBackendTest.java    |  5 +-
 .../gateway/websockets/ConnectionDroppedTest.java  |  5 +-
 .../gateway/websockets/MessageFailureTest.java     |  5 +-
 .../WebsocketEchoHTTPServiceRoleTest.java          |  1 +
 .../knox/gateway/websockets/WebsocketEchoTest.java |  3 +-
 .../gateway/websockets/WebsocketEchoTestBase.java  | 18 ++--
 .../WebsocketMultipleConnectionTest.java           |  7 +-
 ...va => WebsocketServerInitiatedMessageTest.java} | 98 ++++++++++++++--------
 .../apache/knox/gateway/config/GatewayConfig.java  |  7 ++
 .../org/apache/knox/gateway/GatewayTestConfig.java |  6 ++
 13 files changed, 181 insertions(+), 56 deletions(-)

diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java b/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
index 71dd591..342628b 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/config/impl/GatewayConfigImpl.java
@@ -141,6 +141,7 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
   public static final String WEBSOCKET_INPUT_BUFFER_SIZE = GATEWAY_CONFIG_FILE_PREFIX + ".websocket.input.buffer.size";
   public static final String WEBSOCKET_ASYNC_WRITE_TIMEOUT = GATEWAY_CONFIG_FILE_PREFIX + ".websocket.async.write.timeout";
   public static final String WEBSOCKET_IDLE_TIMEOUT = GATEWAY_CONFIG_FILE_PREFIX + ".websocket.idle.timeout";
+  public static final String WEBSOCKET_MAX_WAIT_BUFFER_COUNT = GATEWAY_CONFIG_FILE_PREFIX + ".websocket.max.wait.buffer.count";
 
   /**
    * Properties for for gateway port mapping feature
@@ -190,6 +191,7 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
   public static final int DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE = 4096;
   public static final int DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT = 60000;
   public static final int DEFAULT_WEBSOCKET_IDLE_TIMEOUT = 300000;
+  public static final int DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT = 100;
 
   public static final boolean DEFAULT_GATEWAY_PORT_MAPPING_ENABLED = true;
   public static final boolean DEFAULT_REMOTE_ALIAS_SERVICE_ENABLED = true;
@@ -851,6 +853,11 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
   }
 
   @Override
+  public int getWebsocketMaxWaitBufferCount() {
+    return getInt( WEBSOCKET_MAX_WAIT_BUFFER_COUNT, DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT);
+  }
+
+  @Override
   public Map<String, Integer> getGatewayPortMappings() {
 
     final Map<String, Integer> result = new ConcurrentHashMap<>();
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/GatewayWebsocketHandler.java b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/GatewayWebsocketHandler.java
index 0f19052..8fff20b 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/GatewayWebsocketHandler.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/GatewayWebsocketHandler.java
@@ -118,7 +118,8 @@ public class GatewayWebsocketHandler extends WebSocketHandler
       LOG.debugLog("Generated backend URL for websocket connection: " + backendURL);
 
       /* Upgrade happens here */
-      return new ProxyWebSocketAdapter(URI.create(backendURL), pool, getClientEndpointConfig(req));
+      return new ProxyWebSocketAdapter
+              (URI.create(backendURL), pool, getClientEndpointConfig(req), config);
     } catch (final Exception e) {
       LOG.failedCreatingWebSocket(e);
       throw e;
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java
index 4c345cb..6364a01 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/websockets/ProxyWebSocketAdapter.java
@@ -19,7 +19,11 @@ package org.apache.knox.gateway.websockets;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.List;
+import java.util.ArrayList;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.websocket.ClientEndpointConfig;
 import javax.websocket.CloseReason;
@@ -28,6 +32,7 @@ import javax.websocket.DeploymentException;
 import javax.websocket.WebSocketContainer;
 
 import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+import org.apache.knox.gateway.config.GatewayConfig;
 import org.eclipse.jetty.io.RuntimeIOException;
 import org.eclipse.jetty.util.component.LifeCycle;
 import org.eclipse.jetty.websocket.api.BatchMode;
@@ -57,21 +62,30 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
 
   private ExecutorService pool;
 
+  /* Message buffer for holding data frames temporarily in memory till connection is setup.
+   Keeping the max size of the buffer as 100 messages for now. */
+  private List<String> messageBuffer = new ArrayList<String>();
+  private Lock remoteLock = new ReentrantLock();
+
+  private final GatewayConfig config;
+
   /**
    * Used to transmit headers from browser to backend server.
    * @since 0.14
    */
   private ClientEndpointConfig clientConfig;
 
-  public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool) {
-    this(backend, pool, null);
+  public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool, GatewayConfig config) {
+    this(backend, pool, null, config);
   }
 
-  public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool, final ClientEndpointConfig clientConfig) {
+  public ProxyWebSocketAdapter(final URI backend, final ExecutorService pool, final ClientEndpointConfig clientConfig,
+                               GatewayConfig config) {
     super();
     this.backend = backend;
     this.pool = pool;
     this.clientConfig = clientConfig;
+    this.config = config;
   }
 
   @Override
@@ -104,9 +118,33 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
       throw new RuntimeIOException(e);
     }
 
+    remoteLock.lock();
     super.onWebSocketConnect(frontEndSession);
     this.frontendSession = frontEndSession;
 
+    final RemoteEndpoint remote = frontEndSession.getRemote();
+    try {
+      if (!messageBuffer.isEmpty()) {
+        LOG.debugLog("Found old buffered messages");
+        for (String obj:messageBuffer) {
+          LOG.debugLog("Sending old buffered message [From Backend <---]: " + obj);
+          remote.sendString(obj);
+        }
+        messageBuffer.clear();
+        if (remote.getBatchMode() == BatchMode.ON) {
+          remote.flush();
+        }
+      } else {
+        LOG.debugLog("Message buffer is empty");
+      }
+    } catch (IOException e) {
+      LOG.connectionFailed(e);
+      throw new RuntimeIOException(e);
+    }
+    finally
+    {
+      remoteLock.unlock();
+    }
   }
 
   @Override
@@ -198,12 +236,29 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
 
       @Override
       public void onMessageText(String message, Object session) {
+        LOG.logMessage("[From Backend <---]" + message);
+        remoteLock.lock();
         final RemoteEndpoint remote = getRemote();
+        try {
+          if (remote == null) {
+            LOG.debugLog("Remote endpoint is null");
+            if (messageBuffer.size() >= config.getWebsocketMaxWaitBufferCount()) {
+              throw new RuntimeIOException("Remote is null and message buffer is full. Cannot buffer anymore ");
+            }
+            LOG.debugLog("Buffering message: " + message);
+            messageBuffer.add(message);
+            return;
+          }
 
-        LOG.logMessage("[From Backend <---]" + message);
+          /* Proxy message to frontend */
+          LOG.debugLog("Found old buffered messages");
+          for (String obj:messageBuffer) {
+            LOG.debugLog("Sending old buffered message [From Backend <---]: " + obj);
+            remote.sendString(obj);
+          }
+          messageBuffer.clear();
 
-        /* Proxy message to frontend */
-        try {
+          LOG.debugLog("Sending current message [From Backend <---]: " + message);
           remote.sendString(message);
           if (remote.getBatchMode() == BatchMode.ON) {
             remote.flush();
@@ -212,7 +267,10 @@ public class ProxyWebSocketAdapter extends WebSocketAdapter {
           LOG.connectionFailed(e);
           throw new RuntimeIOException(e);
         }
-
+        finally
+        {
+          remoteLock.unlock();
+        }
       }
 
       @Override
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/BadBackendTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/BadBackendTest.java
index 6637813..194f172 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/BadBackendTest.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/BadBackendTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.knox.gateway.websockets;
 
+import org.apache.knox.gateway.config.GatewayConfig;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.handler.ContextHandler;
@@ -25,6 +26,7 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.easymock.EasyMock;
 
 import javax.websocket.CloseReason;
 import javax.websocket.ContainerProvider;
@@ -77,13 +79,14 @@ public class BadBackendTest {
   }
 
   private static void startProxy() throws Exception {
+    GatewayConfig gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
     proxy = new Server();
     proxyConnector = new ServerConnector(proxy);
     proxy.addConnector(proxyConnector);
 
     /* start Knox with WebsocketAdapter to test */
     final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
-        new ProxyWebSocketAdapter(new URI(BAD_BACKEND), Executors.newFixedThreadPool(10)));
+        new ProxyWebSocketAdapter(new URI(BAD_BACKEND), Executors.newFixedThreadPool(10), gatewayConfig));
 
     ContextHandler context = new ContextHandler();
     context.setContextPath("/");
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ConnectionDroppedTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ConnectionDroppedTest.java
index 80e3509..b2cf243 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ConnectionDroppedTest.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/ConnectionDroppedTest.java
@@ -17,12 +17,14 @@
  */
 package org.apache.knox.gateway.websockets;
 
+import org.apache.knox.gateway.config.GatewayConfig;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.handler.ContextHandler;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.easymock.EasyMock;
 
 import javax.websocket.ContainerProvider;
 import javax.websocket.WebSocketContainer;
@@ -112,13 +114,14 @@ public class ConnectionDroppedTest {
   }
 
   private static void startProxy() throws Exception {
+    GatewayConfig gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
     proxy = new Server();
     proxyConnector = new ServerConnector(proxy);
     proxy.addConnector(proxyConnector);
 
     /* start Knox with WebsocketAdapter to test */
     final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
-        new ProxyWebSocketAdapter(serverUri, Executors.newFixedThreadPool(10)));
+        new ProxyWebSocketAdapter(serverUri, Executors.newFixedThreadPool(10), gatewayConfig));
 
     ContextHandler context = new ContextHandler();
     context.setContextPath("/");
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/MessageFailureTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/MessageFailureTest.java
index 64b7f9a..e855f82 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/MessageFailureTest.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/MessageFailureTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.knox.gateway.websockets;
 
+import org.apache.knox.gateway.config.GatewayConfig;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
@@ -26,6 +27,7 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.easymock.EasyMock;
 
 import javax.websocket.CloseReason;
 import javax.websocket.ContainerProvider;
@@ -152,13 +154,14 @@ public class MessageFailureTest {
   }
 
   private static void startProxy() throws Exception {
+    GatewayConfig gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
     proxy = new Server();
     proxyConnector = new ServerConnector(proxy);
     proxy.addConnector(proxyConnector);
 
     /* start Knox with WebsocketAdapter to test */
     final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
-        new ProxyWebSocketAdapter(serverUri, Executors.newFixedThreadPool(10)));
+        new ProxyWebSocketAdapter(serverUri, Executors.newFixedThreadPool(10), gatewayConfig));
 
     ContextHandler context = new ContextHandler();
     context.setContextPath("/");
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoHTTPServiceRoleTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoHTTPServiceRoleTest.java
index e66015a..717a454 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoHTTPServiceRoleTest.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoHTTPServiceRoleTest.java
@@ -59,6 +59,7 @@ public class WebsocketEchoHTTPServiceRoleTest extends WebsocketEchoTestBase {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    handler = null;
     WebsocketEchoTestBase.setUpBeforeClass();
     WebsocketEchoTestBase.startServers("http");
   }
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java
index 7a08b9e..1d3dbb8 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java
@@ -21,10 +21,10 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.net.URI;
 import javax.websocket.ContainerProvider;
 import javax.websocket.Session;
 import javax.websocket.WebSocketContainer;
-import java.net.URI;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -58,6 +58,7 @@ public class WebsocketEchoTest extends WebsocketEchoTestBase {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    handler = null;
     WebsocketEchoTestBase.setUpBeforeClass();
     WebsocketEchoTestBase.startServers("ws");
   }
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTestBase.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTestBase.java
index a62d330..86a580a 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTestBase.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTestBase.java
@@ -36,6 +36,7 @@ import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.handler.ContextHandler;
 import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.websocket.server.WebSocketHandler;
 
 import java.io.File;
 import java.io.IOException;
@@ -60,9 +61,7 @@ import static org.apache.knox.gateway.config.GatewayConfig.DEFAULT_IDENTITY_KEYS
 import static org.apache.knox.gateway.config.GatewayConfig.DEFAULT_IDENTITY_KEY_PASSPHRASE_ALIAS;
 
 /**
- * Base class for tests that attempt to proxy websocket connections through Knox
- * gateway. It setups a websocket socket connection that simply echoes data back.
- *
+ * Base class for websocoket echo tests.
  */
 public class WebsocketEchoTestBase {
   private static final String TEST_KEY_ALIAS = "test-identity";
@@ -70,7 +69,7 @@ public class WebsocketEchoTestBase {
   /**
    * Simulate backend websocket
    */
-  private static Server backendServer;
+  public static Server backendServer;
   /**
    * URI for backend websocket server
    */
@@ -93,6 +92,8 @@ public class WebsocketEchoTestBase {
    */
   public static URI serverUri;
 
+  public static WebSocketHandler handler;
+
   private static File topoDir;
   private static Path dataDir;
   private static Path securityDir;
@@ -142,7 +143,11 @@ public class WebsocketEchoTestBase {
     ServerConnector connector = new ServerConnector(backendServer);
     backendServer.addConnector(connector);
 
-    final WebsocketEchoHandler handler = new WebsocketEchoHandler();
+    synchronized (WebsocketEchoTestBase.class) {
+      if (handler == null) {
+        handler = new WebsocketEchoHandler();
+      }
+    }
 
     ContextHandler context = new ContextHandler();
     context.setContextPath("/");
@@ -272,6 +277,9 @@ public class WebsocketEchoTestBase {
     EasyMock.expect(gatewayConfig.getWebsocketIdleTimeout())
         .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_IDLE_TIMEOUT).anyTimes();
 
+    EasyMock.expect(gatewayConfig.getWebsocketMaxWaitBufferCount())
+        .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT).anyTimes();
+
     EasyMock.expect(gatewayConfig.getRemoteRegistryConfigurationNames())
             .andReturn(Collections.emptyList())
             .anyTimes();
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketMultipleConnectionTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketMultipleConnectionTest.java
index c69d5b9..a2a4ae7 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketMultipleConnectionTest.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketMultipleConnectionTest.java
@@ -113,7 +113,7 @@ public class WebsocketMultipleConnectionTest {
   /**
    * Maximum number of open connections to test.
    */
-  private static int MAX_CONNECTIONS = 100;
+  private static int MAX_CONNECTIONS = 99;
 
   public WebsocketMultipleConnectionTest() {
     super();
@@ -172,7 +172,7 @@ public class WebsocketMultipleConnectionTest {
       }
     }
 
-    latch.await(5 * MAX_CONNECTIONS, TimeUnit.MILLISECONDS);
+    latch.await(50 * MAX_CONNECTIONS, TimeUnit.MILLISECONDS);
 
     /* 90 KB per connection */
     /*
@@ -314,6 +314,9 @@ public class WebsocketMultipleConnectionTest {
     EasyMock.expect(gatewayConfig.getWebsocketIdleTimeout())
         .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_IDLE_TIMEOUT).anyTimes();
 
+    EasyMock.expect(gatewayConfig.getWebsocketMaxWaitBufferCount())
+        .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT).anyTimes();
+
     EasyMock.expect(gatewayConfig.getRemoteRegistryConfigurationNames())
             .andReturn(Collections.emptyList())
             .anyTimes();
diff --git a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketServerInitiatedMessageTest.java
similarity index 51%
copy from gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java
copy to gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketServerInitiatedMessageTest.java
index 7a08b9e..26722df 100644
--- a/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketEchoTest.java
+++ b/gateway-server/src/test/java/org/apache/knox/gateway/websockets/WebsocketServerInitiatedMessageTest.java
@@ -17,13 +17,23 @@
  */
 package org.apache.knox.gateway.websockets;
 
+import org.eclipse.jetty.io.RuntimeIOException;
+import org.eclipse.jetty.websocket.api.BatchMode;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.server.WebSocketHandler;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import javax.websocket.ContainerProvider;
-import javax.websocket.Session;
 import javax.websocket.WebSocketContainer;
+import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.TimeUnit;
 
@@ -50,14 +60,15 @@ import static org.hamcrest.MatcherAssert.assertThat;
  *
  * @since 0.10
  */
-public class WebsocketEchoTest extends WebsocketEchoTestBase {
+public class WebsocketServerInitiatedMessageTest extends WebsocketEchoTestBase {
 
-  public WebsocketEchoTest() {
+  public WebsocketServerInitiatedMessageTest() {
     super();
   }
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    handler = new WebsocketServerInitiatedEchoHandler();
     WebsocketEchoTestBase.setUpBeforeClass();
     WebsocketEchoTestBase.startServers("ws");
   }
@@ -68,51 +79,64 @@ public class WebsocketEchoTest extends WebsocketEchoTestBase {
   }
 
   /*
-   * Test direct connection to websocket server without gateway
+   * Test websocket server initiated echo
    */
   @Test
-  public void testDirectEcho() throws Exception {
-
+  public void testGatewayServerInitiatedEcho() throws Exception {
     WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+
     WebsocketClient client = new WebsocketClient();
+    container.connectToServer(client,
+            new URI(serverUri.toString() + "gateway/websocket/123foo456bar/channels"));
 
-    Session session = container.connectToServer(client, backendServerUri);
+    //session.getBasicRemote().sendText("Echo");
+    client.messageQueue.awaitMessages(1, 5000, TimeUnit.MILLISECONDS);
 
-    session.getBasicRemote().sendText("Echo");
-    client.messageQueue.awaitMessages(1, 1000, TimeUnit.MILLISECONDS);
+    assertThat(client.messageQueue.get(0), is("echo"));
   }
 
-  /*
-   * Test websocket proxying through gateway.
+  /**
+   * A Mock websocket handler
+   *
    */
-  @Test
-  public void testGatewayEcho() throws Exception {
-    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
-
-    WebsocketClient client = new WebsocketClient();
-    Session session = container.connectToServer(client,
-        new URI(serverUri.toString() + "gateway/websocket/ws"));
-
-    session.getBasicRemote().sendText("Echo");
-    client.messageQueue.awaitMessages(1, 1000, TimeUnit.MILLISECONDS);
-
-    assertThat(client.messageQueue.get(0), is("Echo"));
+  private static class WebsocketServerInitiatedEchoHandler extends WebSocketHandler implements WebSocketCreator {
+    private final ServerInitiatingMessageSocket socket = new ServerInitiatingMessageSocket();
+
+    @Override
+    public void configure(WebSocketServletFactory factory) {
+      factory.getPolicy().setMaxTextMessageSize(2 * 1024 * 1024);
+      factory.setCreator(this);
+    }
+
+    @Override
+    public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
+      return socket;
+    }
   }
 
-  /*
-   * Test websocket rewrite rules proxying through gateway.
+  /**
+   * A simple socket initiating message on connect
    */
-  @Test
-  public void testGatewayRewriteEcho() throws Exception {
-    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
-
-    WebsocketClient client = new WebsocketClient();
-    Session session = container.connectToServer(client,
-            new URI(serverUri.toString() + "gateway/websocket/123foo456bar/channels"));
-
-    session.getBasicRemote().sendText("Echo");
-    client.messageQueue.awaitMessages(1, 1000, TimeUnit.MILLISECONDS);
-
-    assertThat(client.messageQueue.get(0), is("Echo"));
+  private static class ServerInitiatingMessageSocket extends WebSocketAdapter {
+
+    @Override
+    public void onWebSocketError(Throwable cause) {
+      throw new RuntimeException(cause);
+    }
+
+    @Override
+    public void onWebSocketConnect(Session sess) {
+      super.onWebSocketConnect(sess);
+
+      try {
+        RemoteEndpoint remote = getRemote();
+        remote.sendString("echo", null);
+        if (remote.getBatchMode() == BatchMode.ON) {
+          remote.flush();
+        }
+      } catch (IOException x) {
+        throw new RuntimeIOException(x);
+      }
+    }
   }
 }
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java b/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
index 790e6b2..3e6b32a 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/config/GatewayConfig.java
@@ -379,6 +379,13 @@ public interface GatewayConfig {
    */
   int getWebsocketIdleTimeout();
 
+  /**
+   * Max count of messages that can be temporarily buffered in memory before a connection is properly setup.
+   * @since 0.10
+   * @return buffer size
+   */
+  int getWebsocketMaxWaitBufferCount();
+
   boolean isMetricsEnabled();
 
   boolean isJmxMetricsReportingEnabled();
diff --git a/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java b/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
index 2f3d588..4a2c754 100644
--- a/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
+++ b/gateway-test-release-utils/src/main/java/org/apache/knox/gateway/GatewayTestConfig.java
@@ -47,6 +47,7 @@ public class GatewayTestConfig extends Configuration implements GatewayConfig {
   public static final int DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE = 4096;
   public static final int DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT = 60000;
   public static final int DEFAULT_WEBSOCKET_IDLE_TIMEOUT = 300000;
+  public static final int DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT = 100;
 
   private Path gatewayHomePath = Paths.get("gateway-home");
   private String hadoopConfDir = "hadoop";
@@ -545,6 +546,11 @@ public class GatewayTestConfig extends Configuration implements GatewayConfig {
   }
 
   @Override
+  public int getWebsocketMaxWaitBufferCount() {
+    return DEFAULT_WEBSOCKET_MAX_WAIT_BUFFER_COUNT;
+  }
+
+  @Override
   public boolean isMetricsEnabled() {
     return false;
   }


Mime
View raw message