knox-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From su...@apache.org
Subject [2/2] knox git commit: KNOX-752 Implementation of initial websocket support (Sandeep More via Sumit Gupta)
Date Mon, 31 Oct 2016 18:02:18 GMT
KNOX-752 Implementation of initial websocket support (Sandeep More via Sumit Gupta)


Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/c6caebd4
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/c6caebd4
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/c6caebd4

Branch: refs/heads/master
Commit: c6caebd4be9b6f1d3882d4e5938007fbaad55678
Parents: 9bcca25
Author: Sumit Gupta <sumit@apache.org>
Authored: Mon Oct 31 14:00:43 2016 -0400
Committer: Sumit Gupta <sumit@apache.org>
Committed: Mon Oct 31 14:00:43 2016 -0400

----------------------------------------------------------------------
 gateway-release/home/conf/gateway-site.xml      |   7 +
 gateway-server/pom.xml                          |  28 ++
 .../apache/hadoop/gateway/GatewayServer.java    |  29 +-
 .../gateway/config/impl/GatewayConfigImpl.java  |  88 ++++-
 .../websockets/GatewayWebsocketHandler.java     | 217 +++++++++++
 .../websockets/MessageEventCallback.java        |  66 ++++
 .../gateway/websockets/ProxyInboundSocket.java  |  82 ++++
 .../websockets/ProxyWebSocketAdapter.java       | 243 ++++++++++++
 .../websockets/WebsocketLogMessages.java        |  60 +++
 .../gateway/websockets/BadBackendTest.java      | 117 ++++++
 .../hadoop/gateway/websockets/BadUrlTest.java   | 307 +++++++++++++++
 .../websockets/ConnectionDroppedTest.java       | 202 ++++++++++
 .../hadoop/gateway/websockets/EchoSocket.java   |  68 ++++
 .../gateway/websockets/MessageFailureTest.java  | 206 ++++++++++
 .../gateway/websockets/WebsocketClient.java     | 131 +++++++
 .../websockets/WebsocketEchoHandler.java        |  47 +++
 .../gateway/websockets/WebsocketEchoTest.java   | 368 ++++++++++++++++++
 .../WebsocketMultipleConnectionTest.java        | 388 +++++++++++++++++++
 .../websocket/0.6.0/rewrite.xml                 |  26 ++
 .../websocket/0.6.0/service.xml                 |  33 ++
 .../hadoop/gateway/config/GatewayConfig.java    |  57 +++
 .../hadoop/gateway/GatewayTestConfig.java       |  74 ++++
 .../hadoop/gateway/GatewayTestConfig.java       |  74 ++++
 pom.xml                                         |  32 ++
 24 files changed, 2942 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-release/home/conf/gateway-site.xml
----------------------------------------------------------------------
diff --git a/gateway-release/home/conf/gateway-site.xml b/gateway-release/home/conf/gateway-site.xml
index d9ba16c..80cfacf 100644
--- a/gateway-release/home/conf/gateway-site.xml
+++ b/gateway-release/home/conf/gateway-site.xml
@@ -60,4 +60,11 @@ limitations under the License.
         <description>Boolean flag indicating whether to enable debug messages for krb5 authentication</description>
     </property>
 
+    <!-- @since 0.10 Websocket configs -->
+    <property>
+        <name>gateway.websocket.feature.enabled</name>
+        <value>false</value>
+        <description>Enable/Disable websocket feature.</description>
+    </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/pom.xml
----------------------------------------------------------------------
diff --git a/gateway-server/pom.xml b/gateway-server/pom.xml
index 7501aaf..a9c5b70 100644
--- a/gateway-server/pom.xml
+++ b/gateway-server/pom.xml
@@ -213,6 +213,34 @@
             <artifactId>apache-jstl</artifactId>
         </dependency>
 
+        <!-- Websocket support -->
+
+        <dependency>
+            <groupId>org.eclipse.jetty.websocket</groupId>
+            <artifactId>websocket-server</artifactId>
+        </dependency>
+
+        <dependency>
+           <groupId>org.eclipse.jetty.websocket</groupId>
+           <artifactId>websocket-servlet</artifactId>
+        </dependency>
+
+        <dependency>
+           <groupId>javax.websocket</groupId>
+           <artifactId>javax.websocket-api</artifactId>
+        </dependency>
+
+        <dependency>
+           <groupId>org.eclipse.jetty.websocket</groupId>
+           <artifactId>javax-websocket-server-impl</artifactId>
+        </dependency>
+
+        <dependency>
+           <groupId>org.eclipse.jetty.websocket</groupId>
+           <artifactId>javax-websocket-client-impl</artifactId>
+        </dependency>
+
+
         <!-- ********** ********** ********** ********** ********** ********** -->
         <!-- ********** Test Dependencies                           ********** -->
         <!-- ********** ********** ********** ********** ********** ********** -->

http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java
index 15baa56..abaa168 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayServer.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.gateway.trace.ErrorHandler;
 import org.apache.hadoop.gateway.trace.TraceHandler;
 import org.apache.hadoop.gateway.util.Urls;
 import org.apache.hadoop.gateway.util.XmlUtils;
+import org.apache.hadoop.gateway.websockets.GatewayWebsocketHandler;
 import org.apache.log4j.PropertyConfigurator;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Handler;
@@ -361,13 +362,27 @@ public class GatewayServer {
     DefaultTopologyHandler defaultTopoHandler = new DefaultTopologyHandler(
         config, services, contexts);
 
-    /*
-     * Chaining the gzipHandler to correlationHandler. The expected flow here is
-     * defaultTopoHandler -> logHandler -> gzipHandler -> correlationHandler ->
-     * traceHandler
-     */
-    handlers.setHandlers(
-        new Handler[] { defaultTopoHandler, logHandler, gzipHandler });
+    if (config.isWebsocketEnabled()) {      
+      GatewayWebsocketHandler websockethandler = new GatewayWebsocketHandler(
+          config, services);
+      websockethandler.setHandler(gzipHandler);
+
+      /*
+       * Chaining the gzipHandler to correlationHandler. The expected flow here
+       * is defaultTopoHandler -> logHandler -> websockethandler ->
+       * gzipHandler -> correlationHandler -> traceHandler
+       */
+      handlers.setHandlers(
+          new Handler[] { defaultTopoHandler, logHandler, websockethandler });
+    } else {
+      /*
+       * Chaining the gzipHandler to correlationHandler. The expected flow here
+       * is defaultTopoHandler -> logHandler -> gzipHandler ->
+       * correlationHandler -> traceHandler
+       */
+      handlers.setHandlers(
+          new Handler[] { defaultTopoHandler, logHandler, gzipHandler });
+    }
 
     return handlers;
   }

http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java
index 0bfe82f..d8349d8 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/config/impl/GatewayConfigImpl.java
@@ -45,7 +45,7 @@ import java.util.Map;
  *
  * 1. gateway-default.xml - All the configuration variables that the
  *    Gateway needs.  These are the defaults that ship with the app
- *    and should only be changed be the app developers.
+ *    and should only be changed by the app developers.
  *
  * 2. gateway-site.xml - The (possibly empty) configuration that the
  *    system administrator can set variables for their Hadoop cluster.
@@ -131,6 +131,16 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
   public static final String DEPLOYMENTS_BACKUP_VERSION_LIMIT =  GATEWAY_CONFIG_FILE_PREFIX + ".deployment.backup.versionLimit";
   public static final String DEPLOYMENTS_BACKUP_AGE_LIMIT =  GATEWAY_CONFIG_FILE_PREFIX + ".deployment.backup.ageLimit";
 
+  /* @since 0.10 Websocket config variables */
+  public static final String WEBSOCKET_FEATURE_ENABLED =  GATEWAY_CONFIG_FILE_PREFIX + ".websocket.feature.enabled";
+  public static final String WEBSOCKET_MAX_TEXT_MESSAGE_SIZE =  GATEWAY_CONFIG_FILE_PREFIX + ".websocket.max.text.size";
+  public static final String WEBSOCKET_MAX_BINARY_MESSAGE_SIZE =  GATEWAY_CONFIG_FILE_PREFIX + ".websocket.max.binary.size";
+  public static final String WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE =  GATEWAY_CONFIG_FILE_PREFIX + ".websocket.max.text.buffer.size";
+  public static final String WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE =  GATEWAY_CONFIG_FILE_PREFIX + ".websocket.max.binary.buffer.size";
+  public static final String WEBSOCKET_INPUT_BUFFER_SIZE =  GATEWAY_CONFIG_FILE_PREFIX + ".websocket.input.buffer.size";
+  public static final String WEBSOCKET_ASYNC_WRITE_TIMEOUT =  GATEWAY_CONFIG_FILE_PREFIX + ".websocket.async.write.timeout";
+  public static final String WEBSOCKET_IDLE_TIMEOUT =  GATEWAY_CONFIG_FILE_PREFIX + ".websocket.idle.timeout";
+
   // These config property names are not inline with the convention of using the
   // GATEWAY_CONFIG_FILE_PREFIX as is done by those above. These are left for
   // backward compatibility. 
@@ -146,6 +156,17 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
   public static final String DEFAULT_DEPLOYMENT_DIR = "deployments";
   public static final String DEFAULT_SECURITY_DIR = "security";
   public static final String DEFAULT_DATA_DIR = "data";
+
+  /* Websocket defaults, applied when the matching WEBSOCKET_* property is unset */
+  public static final boolean DEFAULT_WEBSOCKET_FEATURE_ENABLED =  false;
+  public static final int DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE =  Integer.MAX_VALUE;
+  public static final int DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_SIZE =  Integer.MAX_VALUE;
+  public static final int DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE =  32768;
+  public static final int DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE =  32768;
+  public static final int DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE =  4096;
+  public static final int DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT =  60000;
+  public static final int DEFAULT_WEBSOCKET_IDLE_TIMEOUT =  300000;
+
   private static List<String> DEFAULT_GLOBAL_RULES_SERVICES;
 
 
@@ -630,6 +651,71 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
     return DEFAULT_GLOBAL_RULES_SERVICES;
   }
 
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.gateway.config.GatewayConfig#isWebsocketEnabled()
+   */
+  @Override
+  public boolean isWebsocketEnabled() {
+    final String result = get( WEBSOCKET_FEATURE_ENABLED, Boolean.toString(DEFAULT_WEBSOCKET_FEATURE_ENABLED));
+    return Boolean.parseBoolean(result);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketMaxTextMessageSize()
+   */
+  @Override
+  public int getWebsocketMaxTextMessageSize() {
+    return getInt( WEBSOCKET_MAX_TEXT_MESSAGE_SIZE, DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketMaxBinaryMessageSize()
+   */
+  @Override
+  public int getWebsocketMaxBinaryMessageSize() {
+    return getInt( WEBSOCKET_MAX_BINARY_MESSAGE_SIZE, DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_SIZE);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketMaxTextMessageBufferSize()
+   */
+  @Override
+  public int getWebsocketMaxTextMessageBufferSize() {
+    return getInt( WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE, DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketMaxBinaryMessageBufferSize()
+   */
+  @Override
+  public int getWebsocketMaxBinaryMessageBufferSize() {
+    return getInt( WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE, DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketInputBufferSize()
+   */
+  @Override
+  public int getWebsocketInputBufferSize() {
+    return getInt( WEBSOCKET_INPUT_BUFFER_SIZE, DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketAsyncWriteTimeout()
+   */
+  @Override
+  public int getWebsocketAsyncWriteTimeout() {
+    return getInt( WEBSOCKET_ASYNC_WRITE_TIMEOUT, DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.gateway.config.GatewayConfig#getWebsocketIdleTimeout()
+   */
+  @Override
+  public int getWebsocketIdleTimeout() {
+    return getInt( WEBSOCKET_IDLE_TIMEOUT, DEFAULT_WEBSOCKET_IDLE_TIMEOUT);
+  }
+
   private static long parseNetworkTimeout( String s ) {
     PeriodFormatter f = new PeriodFormatterBuilder()
         .appendMinutes().appendSuffix("m"," min")

http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
new file mode 100644
index 0000000..fbae2f9
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/GatewayWebsocketHandler.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.hadoop.gateway.service.definition.ServiceDefinition;
+import org.apache.hadoop.gateway.services.GatewayServices;
+import org.apache.hadoop.gateway.services.registry.ServiceDefEntry;
+import org.apache.hadoop.gateway.services.registry.ServiceDefinitionRegistry;
+import org.apache.hadoop.gateway.services.registry.ServiceRegistry;
+import org.apache.hadoop.gateway.util.ServiceDefinitionsLoader;
+import org.eclipse.jetty.websocket.server.WebSocketHandler;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+
+/**
+ * Websocket handler that will handle websocket connection request. This class
+ * is responsible for creating a proxy socket for inbound and outbound
+ * connections. This is also where the http to websocket handoff happens.
+ * 
+ * @since 0.10
+ */
+public class GatewayWebsocketHandler extends WebSocketHandler
+    implements WebSocketCreator {
+
+  private static WebsocketLogMessages LOG = MessagesFactory
+      .get(WebsocketLogMessages.class);
+
+  public final static String WEBSOCKET_PROTOCOL_STRING = "ws://";
+  
+  final static String REGEX_SPLIT_CLUSTER_NAME = "^((?:[^/]*/){1}[^/]*)";
+  
+  final static String REGEX_SPLIT_CONTEXT = "^((?:[^/]*/){2}[^/]*)";
+
+  final GatewayConfig config;
+  final GatewayServices services;
+
+  /**
+   * Create an instance
+   * 
+   * @param config
+   * @param services
+   */
+  public GatewayWebsocketHandler(final GatewayConfig config,
+      final GatewayServices services) {
+    super();
+
+    this.config = config;
+    this.services = services;
+
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.eclipse.jetty.websocket.server.WebSocketHandler#configure(org.eclipse.
+   * jetty.websocket.servlet.WebSocketServletFactory)
+   */
+  @Override
+  public void configure(final WebSocketServletFactory factory) {
+    factory.setCreator(this);
+    factory.getPolicy()
+        .setMaxTextMessageSize(config.getWebsocketMaxTextMessageSize());
+    factory.getPolicy()
+        .setMaxBinaryMessageSize(config.getWebsocketMaxBinaryMessageSize());
+
+    factory.getPolicy().setMaxBinaryMessageBufferSize(
+        config.getWebsocketMaxBinaryMessageBufferSize());
+    factory.getPolicy().setMaxTextMessageBufferSize(
+        config.getWebsocketMaxTextMessageBufferSize());
+
+    factory.getPolicy()
+        .setInputBufferSize(config.getWebsocketInputBufferSize());
+
+    factory.getPolicy()
+        .setAsyncWriteTimeout(config.getWebsocketAsyncWriteTimeout());
+    factory.getPolicy().setIdleTimeout(config.getWebsocketIdleTimeout());
+
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * org.eclipse.jetty.websocket.servlet.WebSocketCreator#createWebSocket(org.
+   * eclipse.jetty.websocket.servlet.ServletUpgradeRequest,
+   * org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse)
+   */
+  @Override
+  public Object createWebSocket(ServletUpgradeRequest req,
+      ServletUpgradeResponse resp) {
+
+    try {
+      final URI requestURI = req.getRequestURI();
+      final String path = requestURI.getPath();
+
+      /* URL used to connect to websocket backend */
+      final String backendURL = getMatchedBackendURL(path);
+
+      /* Upgrade happens here */
+      return new ProxyWebSocketAdapter(URI.create(backendURL));
+    } catch (final Exception e) {
+      LOG.failedCreatingWebSocket(e);
+      throw e;
+    }
+  }
+
+  /**
+   * Resolves the backend websocket URL for the given request path. If the
+   * registered service URL already contains ws:// it is used as is, otherwise
+   * we default to ws://{host}:{port} which might or might not be right.
+   *
+   * @param path request path, matched on the part after /cluster/topology
+   * @return Websocket backend url
+   * @throws RuntimeException if no service matches or the URL is malformed
+   */
+  private synchronized String getMatchedBackendURL(final String path) {
+    final ServiceRegistry serviceRegistryService = services
+        .getService(GatewayServices.SERVICE_REGISTRY_SERVICE);
+
+    final ServiceDefinitionRegistry serviceDefinitionService = services
+        .getService(GatewayServices.SERVICE_DEFINITION_REGISTRY);
+
+    /* Filter out the /cluster/topology to get the context we want */
+    final String[] pathInfo = path.split(REGEX_SPLIT_CONTEXT);
+
+    /* without a context after /cluster/topology there is nothing to match */
+    final ServiceDefEntry entry = pathInfo.length < 2 ? null
+        : serviceDefinitionService.getMatchingService(pathInfo[1]);
+
+    if (entry == null) {
+      throw new RuntimeException(
+          String.format("Cannot find service for the given path: %s", path));
+    }
+
+    final File servicesDir = new File(config.getGatewayServicesDir());
+    final Set<ServiceDefinition> serviceDefs = ServiceDefinitionsLoader
+        .getServiceDefinitions(servicesDir);
+
+    /* URL used to connect to websocket backend */
+    String backendURL = urlFromServiceDefinition(serviceDefs,
+        serviceRegistryService, entry, path);
+
+    try {
+      /* if we do not find websocket URL we default to HTTP */
+      if (!StringUtils.contains(backendURL, WEBSOCKET_PROTOCOL_STRING)) {
+        final URL serviceUrl = new URL(backendURL);
+        final String scheme = serviceUrl.getProtocol();
+        /* compare by value; == on Strings only compares references */
+        final String protocol = ("ws".equals(scheme) || "wss".equals(scheme))
+            ? scheme : "ws";
+
+        /* Use http host:port if ws url not configured */
+        final StringBuilder backend = new StringBuilder();
+        backend.append(protocol).append("://").append(serviceUrl.getHost());
+        /* getPort() is -1 when the URL carries no explicit port */
+        if (serviceUrl.getPort() != -1) {
+          backend.append(":").append(serviceUrl.getPort());
+        }
+        backend.append("/").append(serviceUrl.getPath());
+
+        backendURL = backend.toString();
+      }
+    } catch (MalformedURLException e) {
+      LOG.badUrlError(e);
+      /* same message as before, but keep the cause for diagnosis */
+      throw new RuntimeException(e.toString(), e);
+    }
+
+    return backendURL;
+  }
+
+  private static String urlFromServiceDefinition(
+      final Set<ServiceDefinition> serviceDefs,
+      final ServiceRegistry serviceRegistry, final ServiceDefEntry entry,
+      final String path) {
+
+    final String[] contexts = path.split("/");
+
+    final String serviceURL = serviceRegistry.lookupServiceURL(contexts[2],
+        entry.getName().toUpperCase());
+
+    /*
+     * we have a match, if ws:// is present it is returned else http:// is
+     * returned
+     */
+    return serviceURL;
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/MessageEventCallback.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/MessageEventCallback.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/MessageEventCallback.java
new file mode 100644
index 0000000..66cd069
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/MessageEventCallback.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import javax.websocket.CloseReason;
+
+/**
+ * A simple callback interface used when events happen on the Websocket client socket.
+ *
+ */
+public interface MessageEventCallback {
+
+  /**
+   * A generic callback, can be left un-implemented
+   */
+  void doCallback(final String message);
+  
+  /**
+   * Callback when connection is established.
+   * @param session 
+   */
+  void onConnectionOpen(final Object session);
+  
+  /**
+   * Callback when connection is closed.
+   * @param reason
+   */
+  void onConnectionClose(final CloseReason reason);
+  
+  /**
+   * Callback when there is an error in connection.
+   * @param cause
+   */
+  void onError(final Throwable cause);
+  
+  /**
+   * Callback when a text message is received.
+   * @param message
+   * @param session
+   */
+  void onMessageText(final String message, final Object session);
+  
+  /**
+   * Callback when a binary message is received.
+   * @param message
+   * @param last
+   * @param session
+   */
+  void onMessageBinary(final byte[]  message, final boolean last, final Object session);
+  
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java
new file mode 100644
index 0000000..39744e1
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyInboundSocket.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import javax.websocket.ClientEndpoint;
+import javax.websocket.CloseReason;
+import javax.websocket.OnClose;
+import javax.websocket.OnError;
+import javax.websocket.OnMessage;
+import javax.websocket.OnOpen;
+
+/**
+ * A Websocket client with callback.
+ * @since 0.10
+ */
+@ClientEndpoint
+public class ProxyInboundSocket {
+
+  /**
+   * Callback to be called once we have events on our socket.
+   */
+  final MessageEventCallback callback;
+
+  /**
+   * Create an instance
+   */
+  public ProxyInboundSocket(final MessageEventCallback callback) {
+    super();
+    this.callback = callback;
+  }
+
+  /* Client methods */
+  @OnOpen
+  public void onClientOpen(final javax.websocket.Session backendSession) {
+
+    callback.onConnectionOpen(backendSession);
+
+  }
+
+  @OnClose
+  public void onClientClose(final CloseReason reason) {
+    callback.onConnectionClose(reason);
+  }
+
+  @OnError
+  public void onClientError(Throwable cause) {
+    cause.printStackTrace(System.err);
+    callback.onError(cause);
+  }
+
+  @OnMessage
+  public void onBackendMessage(final String message,
+      final javax.websocket.Session session) {
+
+    callback.onMessageText(message, session);
+
+  }
+
+  @OnMessage
+  public void onBackendMessageBinary(final byte[] message, final boolean last,
+      final javax.websocket.Session session) {
+
+    callback.onMessageBinary(message, last, session);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
new file mode 100644
index 0000000..c7a7649
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/ProxyWebSocketAdapter.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.IOException;
+import java.net.URI;
+
+import javax.websocket.CloseReason;
+import javax.websocket.ContainerProvider;
+import javax.websocket.DeploymentException;
+import javax.websocket.WebSocketContainer;
+
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.eclipse.jetty.io.RuntimeIOException;
+import org.eclipse.jetty.util.component.LifeCycle;
+import org.eclipse.jetty.websocket.api.BatchMode;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.StatusCode;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+
+/**
+ * Handles outbound/inbound Websocket connections and sessions.
+ *
+ * @since 0.10
+ */
+public class ProxyWebSocketAdapter extends WebSocketAdapter {
+
+  private static WebsocketLogMessages LOG = MessagesFactory
+      .get(WebsocketLogMessages.class);
+
+  /* URI for the backend */
+  private final URI backend;
+
+  /* Session between the frontend (browser) and Knox */
+  private Session frontendSession;
+
+  /* Session between the backend (outbound) and Knox */
+  private javax.websocket.Session backendSession;
+
+  private WebSocketContainer container;
+
+  /**
+   * Create an instance
+   */
+  public ProxyWebSocketAdapter(URI backend) {
+    super();
+    this.backend = backend;
+  }
+
+  @Override
+  public void onWebSocketConnect(final Session frontEndSession) {
+    /*
+     * Remember the frontend session before anything can fail: the error and
+     * close paths dereference it and would otherwise hit a null reference
+     */
+    this.frontendSession = frontEndSession;
+
+    /*
+     * Let's connect to the backend, this is where the Backend-to-frontend
+     * plumbing takes place
+     */
+    container = ContainerProvider.getWebSocketContainer();
+    final ProxyInboundSocket backendSocket = new ProxyInboundSocket(
+        getMessageCallback());
+
+    /* Attempt Connect */
+    try {
+      backendSession = container.connectToServer(backendSocket, backend);
+      LOG.onConnectionOpen(backend.toString());
+    } catch (DeploymentException e) {
+      LOG.connectionFailed(e);
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      LOG.connectionFailed(e);
+      throw new RuntimeIOException(e);
+    }
+    /* done last so the adapter is only wired up once the backend link exists */
+    super.onWebSocketConnect(frontEndSession);
+  }
+
+  @Override
+  public void onWebSocketBinary(final byte[] payload, final int offset,
+      final int length) {
+
+    if (isNotConnected()) {
+      return;
+    }
+
+    throw new UnsupportedOperationException(
+        "Websocket support for binary messages is not supported at this time.");
+  }
+
+  @Override
+  public void onWebSocketText(final String message) {
+
+    if (isNotConnected()) {
+      return;
+    }
+
+    LOG.logMessage("[From Frontend --->]" + message);
+
+    /* Proxy message to backend */
+    try {
+      backendSession.getBasicRemote().sendText(message);
+
+    } catch (IOException e) {
+      LOG.connectionFailed(e);
+    }
+
+  }
+
+  @Override
+  public void onWebSocketClose(int statusCode, String reason) {
+    super.onWebSocketClose(statusCode, reason);
+
+    closeQuietly();
+
+    LOG.onConnectionClose(backend.toString());
+
+  }
+
+  @Override
+  public void onWebSocketError(final Throwable t) {
+    passErrorToInboundConnection(t);
+  }
+
+  /**
+   * A helper function to pass errors to Inbound connection (browser/client)
+   */
+  private void passErrorToInboundConnection(final Throwable t) {
+
+    LOG.onError(t.toString());
+    if (t.toString().contains("exceeds maximum size")) {
+      frontendSession.close(StatusCode.MESSAGE_TOO_LARGE, t.getMessage());
+    }
+
+    else {
+      frontendSession.close(StatusCode.SERVER_ERROR, t.getMessage());
+      closeQuietly();
+      throw new RuntimeException(t);
+    }
+  }
+
+  private MessageEventCallback getMessageCallback() {
+
+    return new MessageEventCallback() {
+
+      @Override
+      public void doCallback(String message) {
+        /* do nothing */
+
+      }
+
+      @Override
+      public void onConnectionOpen(Object session) {
+        /* do nothing */
+
+      }
+
+      @Override
+      public void onConnectionClose(final CloseReason reason) {
+        try {
+          frontendSession.close(reason.getCloseCode().getCode(),
+              reason.getReasonPhrase());
+        } finally {
+          closeQuietly();
+        }
+
+      }
+
+      @Override
+      public void onError(Throwable cause) {
+        passErrorToInboundConnection(cause);
+      }
+
+      @Override
+      public void onMessageText(String message, Object session) {
+        final RemoteEndpoint remote = getRemote();
+
+        LOG.logMessage("[From Backend <---]" + message);
+
+        /* Proxy message to frontend */
+        try {
+          remote.sendString(message);
+          if (remote.getBatchMode() == BatchMode.ON) {
+            remote.flush();
+          }
+        } catch (IOException e) {
+          LOG.connectionFailed(e);
+          throw new RuntimeIOException(e);
+        }
+
+      }
+
+      @Override
+      public void onMessageBinary(byte[] message, boolean last,
+          Object session) {
+        throw new UnsupportedOperationException(
+            "Websocket support for binary messages is not supported at this time.");
+
+      }
+
+    };
+
+  }
+
+  private void closeQuietly() { /* best-effort teardown of both sides */
+    try {
+      if (backendSession != null) { /* null if the connect never succeeded */
+        backendSession.close();
+      }
+    } catch (IOException e) {
+      LOG.connectionFailed(e);
+    }
+    if (container instanceof LifeCycle) {
+      try {
+        ((LifeCycle) container).stop();
+      } catch (Exception e) {
+        LOG.connectionFailed(e);
+      }
+    }
+    if (frontendSession != null) { /* null until a frontend has connected */
+      frontendSession.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/WebsocketLogMessages.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/WebsocketLogMessages.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/WebsocketLogMessages.java
new file mode 100644
index 0000000..55997d5
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/websockets/WebsocketLogMessages.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import org.apache.hadoop.gateway.i18n.messages.Message;
+import org.apache.hadoop.gateway.i18n.messages.MessageLevel;
+import org.apache.hadoop.gateway.i18n.messages.Messages;
+import org.apache.hadoop.gateway.i18n.messages.StackTrace;
+
+/**
+ * Logging for Websocket
+ * 
+ * @since 0.10
+ */
+
+@Messages(logger = "org.apache.hadoop.gateway.websockets")
+public interface WebsocketLogMessages {
+
+  @Message(level = MessageLevel.ERROR,
+      text = "Error creating websocket connection: {0}")
+  void failedCreatingWebSocket(
+      @StackTrace(level = MessageLevel.ERROR) Exception e);
+
+  @Message(level = MessageLevel.ERROR,
+      text = "Unable to connect to websocket server: {0}")
+  void connectionFailed(@StackTrace(level = MessageLevel.ERROR) Exception e);
+
+  @Message(level = MessageLevel.ERROR, text = "Error: {0}")
+  void onError(final String message);
+
+  @Message(level = MessageLevel.ERROR, text = "Bad or malformed url: {0}")
+  void badUrlError(@StackTrace(level = MessageLevel.ERROR) Exception e);
+
+  @Message(level = MessageLevel.DEBUG,
+      text = "Websocket connection to backend server {0} opened")
+  void onConnectionOpen(final String backend);
+
+  @Message(level = MessageLevel.DEBUG, text = "Message: {0}")
+  void logMessage(final String message);
+
+  @Message(level = MessageLevel.DEBUG,
+      text = "Websocket connection to backend server {0} closed")
+  void onConnectionClose(final String backend);
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java
new file mode 100644
index 0000000..1e4b86f
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadBackendTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.CloseReason;
+import javax.websocket.ContainerProvider;
+import javax.websocket.WebSocketContainer;
+
+import org.apache.hadoop.gateway.websockets.BigEchoSocketHandler;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.hamcrest.CoreMatchers;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test a case where the backend is down.
+ *
+ */
+public class BadBackendTest {
+  
+  /* Proxy */
+  private static Server proxy;
+  private static ServerConnector proxyConnector;
+  private static URI proxyUri;
+  
+  private static final String BAD_BACKEND = "ws://localhost:666";
+  
+  public BadBackendTest() {
+    super();
+  }
+  
+  @BeforeClass
+  public static void startServer() throws Exception {
+    startProxy();
+
+  }
+
+  @AfterClass
+  public static  void stopServer() throws Exception {
+    proxy.stop();
+    
+  }
+  
+  /**
+   * Test that the client is closed with UNEXPECTED_CONDITION when the backend is down.
+   * @throws IOException
+   * @throws Exception
+   */
+  @Test(timeout = 8000)
+  public void testBadBackEnd() throws IOException, Exception {
+    final String message = "Echo";
+    
+    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+
+    WebsocketClient client = new WebsocketClient();
+    javax.websocket.Session session = container.connectToServer(client,
+        proxyUri);
+    session.getBasicRemote().sendText(message);
+
+    client.awaitClose(CloseReason.CloseCodes.UNEXPECTED_CONDITION.getCode(), 1000,
+        TimeUnit.MILLISECONDS);
+    
+    Assert.assertThat(client.close.getCloseCode().getCode(), CoreMatchers.is(CloseReason.CloseCodes.UNEXPECTED_CONDITION.getCode()));
+
+  }
+  
+  private static void startProxy() throws Exception {
+    proxy = new Server();
+    proxyConnector = new ServerConnector(proxy);
+    proxy.addConnector(proxyConnector);
+
+    /* start Knox with WebsocketAdapter to test */
+    final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
+        new ProxyWebSocketAdapter(new URI(BAD_BACKEND)));
+
+    ContextHandler context = new ContextHandler();
+    context.setContextPath("/");
+    context.setHandler(wsHandler);
+    proxy.setHandler(context);
+
+    // Start Server
+    proxy.start();
+
+    String host = proxyConnector.getHost();
+    if (host == null) {
+      host = "localhost";
+    }
+    int port = proxyConnector.getLocalPort();
+    proxyUri = new URI(String.format("ws://%s:%d/", host, port));
+    
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java
new file mode 100644
index 0000000..4b9f836
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/BadUrlTest.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.CloseReason;
+import javax.websocket.ContainerProvider;
+import javax.websocket.WebSocketContainer;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.config.impl.GatewayConfigImpl;
+import org.apache.hadoop.gateway.deploy.DeploymentFactory;
+import org.apache.hadoop.gateway.services.DefaultGatewayServices;
+import org.apache.hadoop.gateway.services.GatewayServices;
+import org.apache.hadoop.gateway.services.ServiceLifecycleException;
+import org.apache.hadoop.gateway.services.topology.TopologyService;
+import org.apache.hadoop.gateway.topology.TopologyEvent;
+import org.apache.hadoop.gateway.topology.TopologyListener;
+import org.apache.hadoop.test.TestUtils;
+import org.easymock.EasyMock;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.hamcrest.CoreMatchers;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.mycila.xmltool.XMLDoc;
+import com.mycila.xmltool.XMLTag;
+
+/**
+ * Test for bad URLs.
+ * <p>
+ * This test will set up a bad URL through the topology, so this test case will
+ * attempt to test the bad url case and also the plumbing around it.
+ * @since 0.10
+ */
+public class BadUrlTest {
+
+  /**
+   * Non-existent backend websocket server
+   */
+  private static String BACKEND = "http://localhost:9999";
+
+  /**
+   * Mock Gateway server
+   */
+  private static Server gatewayServer;
+
+  /**
+   * Mock gateway config
+   */
+  private static GatewayConfig gatewayConfig;
+
+  private static GatewayServices services;
+
+  /**
+   * URI for gateway server
+   */
+  private static URI serverUri;
+
+  private static File topoDir;
+
+  public BadUrlTest() {
+    super();
+  }
+
+  @BeforeClass
+  public static void startServers() throws Exception {
+
+    startGatewayServer();
+
+  }
+
+  @AfterClass
+  public static void stopServers() {
+    try {
+      gatewayServer.stop();
+    } catch (final Exception e) {
+      e.printStackTrace(System.err);
+    }
+
+    /* Cleanup the created files */
+    FileUtils.deleteQuietly(topoDir);
+
+  }
+
+  /**
+   * Test that a bad backend URL closes the client with UNEXPECTED_CONDITION.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testBadUrl() throws Exception {
+    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+
+    WebsocketClient client = new WebsocketClient();
+
+    container.connectToServer(client,
+        new URI(serverUri.toString() + "gateway/websocket/ws"));
+
+    client.awaitClose(CloseReason.CloseCodes.UNEXPECTED_CONDITION.getCode(),
+        1000, TimeUnit.MILLISECONDS);
+
+    Assert.assertThat(client.close.getCloseCode().getCode(),
+        CoreMatchers.is(CloseReason.CloseCodes.UNEXPECTED_CONDITION.getCode()));
+
+  }
+
+  /**
+   * Start Gateway Server.
+   *
+   * @throws Exception
+   */
+  private static void startGatewayServer() throws Exception {
+    gatewayServer = new Server();
+    final ServerConnector connector = new ServerConnector(gatewayServer);
+    gatewayServer.addConnector(connector);
+
+    /* workaround so we can add our handler later at runtime */
+    HandlerCollection handlers = new HandlerCollection(true);
+
+    /* add some initial handlers */
+    ContextHandler context = new ContextHandler();
+    context.setContextPath("/");
+    handlers.addHandler(context);
+
+    gatewayServer.setHandler(handlers);
+
+    // Start Server
+    gatewayServer.start();
+
+    String host = connector.getHost();
+    if (host == null) {
+      host = "localhost";
+    }
+    int port = connector.getLocalPort();
+    serverUri = new URI(String.format("ws://%s:%d/", host, port));
+
+    /* Setup websocket handler */
+    setupGatewayConfig(BACKEND);
+
+    final GatewayWebsocketHandler gatewayWebsocketHandler = new GatewayWebsocketHandler(
+        gatewayConfig, services);
+    handlers.addHandler(gatewayWebsocketHandler);
+    gatewayWebsocketHandler.start();
+  }
+
+  /**
+   * Initialize the configs and components required for this test.
+   *
+   * @param backend
+   * @throws IOException
+   */
+  private static void setupGatewayConfig(final String backend)
+      throws IOException {
+    services = new DefaultGatewayServices();
+
+    topoDir = createDir();
+    URL serviceUrl = ClassLoader.getSystemResource("websocket-services");
+
+    final File descriptor = new File(topoDir, "websocket.xml");
+    final FileOutputStream stream = new FileOutputStream(descriptor);
+    createKnoxTopology(backend).toStream(stream);
+    stream.close();
+
+    final TestTopologyListener topoListener = new TestTopologyListener();
+
+    final Map<String, String> options = new HashMap<String, String>();
+    options.put("persist-master", "false");
+    options.put("master", "password");
+
+    gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
+    EasyMock.expect(gatewayConfig.getGatewayTopologyDir())
+        .andReturn(topoDir.toString()).anyTimes();
+
+    EasyMock.expect(gatewayConfig.getGatewayServicesDir())
+        .andReturn(serviceUrl.getFile()).anyTimes();
+
+    EasyMock.expect(gatewayConfig.getEphemeralDHKeySize()).andReturn("2048")
+        .anyTimes();
+
+    EasyMock.expect(gatewayConfig.getGatewaySecurityDir())
+        .andReturn(topoDir.toString()).anyTimes();
+
+    /* Websocket configs */
+    EasyMock.expect(gatewayConfig.isWebsocketEnabled()).andReturn(true)
+        .anyTimes();
+
+    EasyMock.expect(gatewayConfig.getWebsocketMaxTextMessageSize())
+        .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE)
+        .anyTimes();
+
+    EasyMock.expect(gatewayConfig.getWebsocketMaxBinaryMessageSize())
+        .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_SIZE)
+        .anyTimes();
+
+    EasyMock.expect(gatewayConfig.getWebsocketMaxTextMessageBufferSize())
+        .andReturn(
+            GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE)
+        .anyTimes();
+
+    EasyMock.expect(gatewayConfig.getWebsocketMaxBinaryMessageBufferSize())
+        .andReturn(
+            GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE)
+        .anyTimes();
+
+    EasyMock.expect(gatewayConfig.getWebsocketInputBufferSize())
+        .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE)
+        .anyTimes();
+
+    EasyMock.expect(gatewayConfig.getWebsocketAsyncWriteTimeout())
+        .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT)
+        .anyTimes();
+
+    EasyMock.expect(gatewayConfig.getWebsocketIdleTimeout())
+        .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_IDLE_TIMEOUT).anyTimes();
+
+    EasyMock.replay(gatewayConfig);
+
+    try {
+      services.init(gatewayConfig, options);
+    } catch (ServiceLifecycleException e) {
+      e.printStackTrace();
+    }
+
+    DeploymentFactory.setGatewayServices(services);
+    final TopologyService monitor = services
+        .getService(GatewayServices.TOPOLOGY_SERVICE);
+    monitor.addTopologyChangeListener(topoListener);
+    monitor.reloadTopologies();
+
+  }
+
+  private static File createDir() throws IOException {
+    return TestUtils
+        .createTempDir(WebsocketEchoTest.class.getSimpleName() + "-");
+  }
+
+  /**
+   * Intentionally add bad URL
+   *
+   * @param backend
+   * @return
+   */
+  private static XMLTag createKnoxTopology(final String backend) {
+    XMLTag xml = XMLDoc.newDocument(true).addRoot("topology").addTag("service")
+        .addTag("role").addText("WEBSOCKET").addTag("url").addText(backend)
+        .gotoParent().gotoRoot();
+    // System.out.println( "GATEWAY=" + xml.toString() );
+    return xml;
+  }
+
+  private static class TestTopologyListener implements TopologyListener {
+
+    public ArrayList<List<TopologyEvent>> events = new ArrayList<List<TopologyEvent>>();
+
+    @Override
+    public void handleTopologyEvent(List<TopologyEvent> events) {
+      this.events.add(events);
+
+      synchronized (this) {
+        for (TopologyEvent event : events) {
+          if (!event.getType().equals(TopologyEvent.Type.DELETED)) {
+
+            /* for this test we only care about this part */
+            DeploymentFactory.createDeployment(gatewayConfig,
+                event.getTopology());
+
+          }
+        }
+
+      }
+
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java
new file mode 100644
index 0000000..6562e5c
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/ConnectionDroppedTest.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.CloseReason;
+import javax.websocket.ContainerProvider;
+import javax.websocket.WebSocketContainer;
+
+import org.apache.hadoop.gateway.websockets.BigEchoSocketHandler;
+import org.eclipse.jetty.io.RuntimeIOException;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.websocket.api.BatchMode;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.StatusCode;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.hamcrest.CoreMatchers;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test to simulate unexpected connection drop. Here we establish a connection
+ * and then try to simulate an error.
+ * 
+ * @since 0.10
+ *
+ */
+public class ConnectionDroppedTest {
+
+  private static Server backend;
+  private static ServerConnector connector;
+  private static URI serverUri;
+
+  /* Proxy */
+  private static Server proxy;
+  private static ServerConnector proxyConnector;
+  private static URI proxyUri;
+
+  public ConnectionDroppedTest() {
+    super();
+  }
+
+  @BeforeClass
+  public static void startServer() throws Exception {
+    startBackend();
+    startProxy();
+
+  }
+
+  @AfterClass
+  public static void stopServer() throws Exception {
+    /* ORDER MATTERS ! */
+    proxy.stop();
+    backend.stop();
+
+  }
+
+  /**
+   * The connection is dropped, so we should see a timeout exception when we try
+   * to poll the message from queue.
+   * 
+   * @throws IOException
+   * @throws Exception
+   */
+  @Test(expected = java.util.concurrent.TimeoutException.class)
+  public void testDroppedConnection() throws IOException, Exception {
+    final String message = "Echo";
+
+    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+
+    WebsocketClient client = new WebsocketClient();
+    javax.websocket.Session session = container.connectToServer(client,
+        proxyUri);
+
+    session.getBasicRemote().sendText(message);
+
+    client.messageQueue.awaitMessages(1, 1000, TimeUnit.MILLISECONDS);
+
+  }
+
+  private static void startBackend() throws Exception {
+    backend = new Server();
+    connector = new ServerConnector(backend);
+    backend.addConnector(connector);
+
+    /* start backend with a BadSocket that throws on text messages */
+    final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
+        new BadSocket());
+
+    ContextHandler context = new ContextHandler();
+    context.setContextPath("/");
+    context.setHandler(wsHandler);
+    backend.setHandler(context);
+
+    // Start Server
+    backend.start();
+
+    String host = connector.getHost();
+    if (host == null) {
+      host = "localhost";
+    }
+    int port = connector.getLocalPort();
+    serverUri = new URI(String.format("ws://%s:%d/", host, port));
+
+  }
+
+  private static void startProxy() throws Exception {
+    proxy = new Server();
+    proxyConnector = new ServerConnector(proxy);
+    proxy.addConnector(proxyConnector);
+
+    /* start Knox with WebsocketAdapter to test */
+    final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
+        new ProxyWebSocketAdapter(serverUri));
+
+    ContextHandler context = new ContextHandler();
+    context.setContextPath("/");
+    context.setHandler(wsHandler);
+    proxy.setHandler(context);
+
+    // Start Server
+    proxy.start();
+
+    String host = proxyConnector.getHost();
+    if (host == null) {
+      host = "localhost";
+    }
+    int port = proxyConnector.getLocalPort();
+    proxyUri = new URI(String.format("ws://%s:%d/", host, port));
+
+  }
+
+}
+
+/**
+ * 
+ * Simulate a bad socket.
+ * 
+ * @since 0.10
+ */
+class BadSocket extends WebSocketAdapter {
+
+  private Session session;
+
+  @Override
+  public void onWebSocketConnect(final Session session) {
+    this.session = session;
+  }
+
+  @Override
+  public void onWebSocketBinary(byte[] payload, int offset, int len) {
+    if (isNotConnected())
+      return;
+
+    try {
+      RemoteEndpoint remote = getRemote();
+      remote.sendBytes(BufferUtil.toBuffer(payload, offset, len), null);
+      if (remote.getBatchMode() == BatchMode.ON)
+        remote.flush();
+    } catch (IOException x) {
+      throw new RuntimeIOException(x);
+    }
+  }
+
+  @Override
+  public void onWebSocketError(Throwable cause) {
+    throw new RuntimeException(cause);
+  }
+
+  @Override
+  public void onWebSocketText(String message) {
+    if (isNotConnected())
+      return;
+    // Throw an exception on purpose
+    throw new RuntimeException("Simulating bad connection ...");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/EchoSocket.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/EchoSocket.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/EchoSocket.java
new file mode 100644
index 0000000..e46b5e2
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/EchoSocket.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.IOException;
+
+import org.eclipse.jetty.io.RuntimeIOException;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.websocket.api.BatchMode;
+import org.eclipse.jetty.websocket.api.RemoteEndpoint;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+
+/**
+ * 
+ * A simple Echo socket 
+ */
+public class EchoSocket extends WebSocketAdapter {
+
+  @Override
+  public void onWebSocketBinary(byte[] payload, int offset, int len) {
+    if (isNotConnected())
+      return;
+
+    try {
+      RemoteEndpoint remote = getRemote();
+      remote.sendBytes(BufferUtil.toBuffer(payload, offset, len), null);
+      if (remote.getBatchMode() == BatchMode.ON)
+        remote.flush();
+    } catch (IOException x) {
+      throw new RuntimeIOException(x);
+    }
+  }
+
+  @Override
+  public void onWebSocketError(Throwable cause) {
+    throw new RuntimeException(cause);
+  }
+
+  @Override
+  public void onWebSocketText(String message) {
+    if (isNotConnected())
+      return;
+
+    try {
+      RemoteEndpoint remote = getRemote();
+      remote.sendString(message, null);
+      if (remote.getBatchMode() == BatchMode.ON)
+        remote.flush();
+    } catch (IOException x) {
+      throw new RuntimeIOException(x);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java
new file mode 100644
index 0000000..f98b7e1
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/MessageFailureTest.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.CloseReason;
+import javax.websocket.ContainerProvider;
+import javax.websocket.WebSocketContainer;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.server.WebSocketHandler;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+import org.hamcrest.CoreMatchers;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Dummy Test for max message size.
+ *
+ */
+public class MessageFailureTest {
+
+  private static Server backend;
+  private static ServerConnector connector;
+  private static URI serverUri;
+
+  /* Proxy */
+  private static Server proxy;
+  private static ServerConnector proxyConnector;
+  private static URI proxyUri;
+
+  public MessageFailureTest() {
+    super();
+  }
+
+  @BeforeClass
+  public static void startServer() throws Exception {
+    startBackend();
+    startProxy();
+
+  }
+
+  @AfterClass
+  public static  void stopServer() throws Exception {
+    /* ORDER MATTERS ! */
+    proxy.stop();
+    backend.stop();
+    
+  }
+
+
+  /**
+   * Test for a message that is bigger than the configured value
+   * @throws IOException
+   * @throws Exception
+   */
+  @Test(timeout = 8000)
+  public void testMessageTooBig() throws IOException, Exception {
+    final String bigMessage = "Echooooooooooooo";
+
+    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+
+    WebsocketClient client = new WebsocketClient();
+    javax.websocket.Session session = container.connectToServer(client,
+        proxyUri);
+    session.getBasicRemote().sendText(bigMessage);
+
+    client.awaitClose(CloseReason.CloseCodes.TOO_BIG.getCode(), 1000,
+        TimeUnit.MILLISECONDS);
+    
+    Assert.assertThat(client.close.getCloseCode().getCode(), CoreMatchers.is(CloseReason.CloseCodes.TOO_BIG.getCode()));
+
+  }
+  
+
+  /**
+   * Test for a message within limit.
+   * @throws IOException
+   * @throws Exception
+   */
+  @Test(timeout = 8000)
+  public void testMessageOk() throws IOException, Exception {
+    final String message = "Echo";
+
+    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+
+    WebsocketClient client = new WebsocketClient();
+    javax.websocket.Session session = container.connectToServer(client,
+        proxyUri);
+    session.getBasicRemote().sendText(message);
+
+    client.messageQueue.awaitMessages(1, 1000, TimeUnit.MILLISECONDS);
+
+    Assert.assertThat(client.messageQueue.get(0), CoreMatchers.is("Echo"));
+
+  }
+
+  
+  private static void startBackend() throws Exception {
+    backend = new Server();
+    connector = new ServerConnector(backend);
+    backend.addConnector(connector);
+
+    /* start backend with Echo socket */
+    final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
+        new EchoSocket());
+
+    ContextHandler context = new ContextHandler();
+    context.setContextPath("/");
+    context.setHandler(wsHandler);
+    backend.setHandler(context);
+
+    // Start Server
+    backend.start();
+
+    String host = connector.getHost();
+    if (host == null) {
+      host = "localhost";
+    }
+    int port = connector.getLocalPort();
+    serverUri = new URI(String.format("ws://%s:%d/", host, port));
+
+  }
+
+  private static void startProxy() throws Exception {
+    proxy = new Server();
+    proxyConnector = new ServerConnector(proxy);
+    proxy.addConnector(proxyConnector);
+
+    /* start Knox with WebsocketAdapter to test */
+    final BigEchoSocketHandler wsHandler = new BigEchoSocketHandler(
+        new ProxyWebSocketAdapter(serverUri));
+
+    ContextHandler context = new ContextHandler();
+    context.setContextPath("/");
+    context.setHandler(wsHandler);
+    proxy.setHandler(context);
+
+    // Start Server
+    proxy.start();
+
+    String host = proxyConnector.getHost();
+    if (host == null) {
+      host = "localhost";
+    }
+    int port = proxyConnector.getLocalPort();
+    proxyUri = new URI(String.format("ws://%s:%d/", host, port));
+    
+  }
+
+}
+
+/**
+ * A Mock websocket handler that just Echos messages
+ *
+ */
+class BigEchoSocketHandler extends WebSocketHandler
+    implements WebSocketCreator {
+
+  // final EchoSocket socket = new EchoSocket();
+  final WebSocketAdapter socket;
+
+  public BigEchoSocketHandler(final WebSocketAdapter socket) {
+    this.socket = socket;
+  }
+
+  @Override
+  public void configure(WebSocketServletFactory factory) {
+    factory.getPolicy().setMaxTextMessageSize(10);
+    factory.getPolicy().setMaxBinaryMessageSize(10);
+    factory.setCreator(this);
+  }
+
+  @Override
+  public Object createWebSocket(ServletUpgradeRequest req,
+      ServletUpgradeResponse resp) {
+    return socket;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketClient.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketClient.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketClient.java
new file mode 100644
index 0000000..92045f3
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketClient.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.websocket.ClientEndpoint;
+import javax.websocket.CloseReason;
+import javax.websocket.OnClose;
+import javax.websocket.OnError;
+import javax.websocket.OnMessage;
+import javax.websocket.OnOpen;
+import javax.websocket.Session;
+
+import org.eclipse.jetty.util.BlockingArrayQueue;
+
+/**
+ * A test websocket client that records what it receives: text messages go
+ * into {@link #messageQueue}, errors into {@link #errors} and the close reason
+ * into {@link #close}.
+ * <p>
+ * The annotated callbacks are expected to run on a thread other than the test
+ * thread, so the fields they write are volatile.
+ */
+@ClientEndpoint
+public class WebsocketClient {
+
+  private volatile Session session = null;
+  public volatile CloseReason close = null;
+  public MessageQueue messageQueue = new MessageQueue();
+  public LinkedList<Throwable> errors = new LinkedList<>();
+  public CountDownLatch closeLatch = new CountDownLatch(1);
+
+  public volatile boolean onError = false;
+  public volatile String errorMessage = null;
+
+  /**
+   * Records the close reason and releases anyone waiting on
+   * {@link #closeLatch}.
+   */
+  @OnClose
+  public void onWebSocketClose(CloseReason reason) {
+    this.close = reason;
+    closeLatch.countDown();
+  }
+
+  /** Queues a received text message. */
+  @OnMessage
+  public void onMessage(String message) {
+    this.messageQueue.offer(message);
+  }
+
+  /** Remembers the session so {@link #sendText(String)} can use it. */
+  @OnOpen
+  public void onOpen(Session session) {
+    this.session = session;
+  }
+
+  /** Records the error and flags that an error has occurred. */
+  @OnError
+  public void onError(Session session, Throwable thr) {
+    errors.add(thr);
+    this.onError = true;
+    this.errorMessage = thr.toString();
+  }
+
+  /**
+   * Sends a text message over the open session; does nothing when no session
+   * has been opened yet.
+   *
+   * @param text
+   *          the message to send
+   * @throws IOException
+   *           if sending fails
+   */
+  public void sendText(String text) throws IOException {
+    final Session current = session;
+    if (current != null) {
+      current.getBasicRemote().sendText(text);
+    }
+  }
+
+  /**
+   * Waits until the close callback has fired.
+   * <p>
+   * Note that the close code itself is not verified here; callers should
+   * assert on {@link #close} after this method returns.
+   *
+   * @param expectedCloseCode
+   *          close code the caller expects (currently not checked by this
+   *          method)
+   * @param timeoutDuration
+   *          maximum time to wait
+   * @param timeoutUnit
+   *          unit of {@code timeoutDuration}
+   * @throws TimeoutException
+   *           if the socket is not closed within the timeout, or the waiting
+   *           thread is interrupted
+   */
+  public void awaitClose(int expectedCloseCode, int timeoutDuration,
+      TimeUnit timeoutUnit) throws TimeoutException {
+
+    final boolean closed;
+    try {
+      /* the latch is counted down right after the close reason is stored */
+      closed = closeLatch.await(timeoutDuration, timeoutUnit);
+    } catch (InterruptedException e) {
+      /* keep the interrupt visible to the caller instead of swallowing it */
+      Thread.currentThread().interrupt();
+      final TimeoutException interrupted = new TimeoutException(
+          "Interrupted while waiting for websocket close");
+      interrupted.initCause(e);
+      throw interrupted;
+    }
+
+    if (!closed) {
+      throw new TimeoutException("Timed out waiting for websocket close");
+    }
+
+  }
+
+  /**
+   * Queue of received text messages with a helper to wait for a given number
+   * of them.
+   */
+  public class MessageQueue extends BlockingArrayQueue<String> {
+
+    /**
+     * Polls until the queue holds at least {@code expectedMessageCount}
+     * messages.
+     *
+     * @param expectedMessageCount
+     *          number of messages to wait for
+     * @param timeoutDuration
+     *          maximum time to wait
+     * @param timeoutUnit
+     *          unit of {@code timeoutDuration}
+     * @throws TimeoutException
+     *           if the messages do not arrive within the timeout, or the
+     *           waiting thread is interrupted
+     */
+    public void awaitMessages(int expectedMessageCount, int timeoutDuration,
+        TimeUnit timeoutUnit) throws TimeoutException {
+      final long msDur = TimeUnit.MILLISECONDS.convert(timeoutDuration,
+          timeoutUnit);
+      final long expireOn = System.currentTimeMillis() + msDur;
+
+      while (this.size() < expectedMessageCount) {
+        try {
+          TimeUnit.MILLISECONDS.sleep(20);
+        } catch (InterruptedException e) {
+          /* restore the flag and stop waiting rather than spin on it */
+          Thread.currentThread().interrupt();
+          final TimeoutException interrupted = new TimeoutException(
+              "Interrupted while reading message from queue");
+          interrupted.initCause(e);
+          throw interrupted;
+        }
+        if (System.currentTimeMillis() > expireOn) {
+          throw new TimeoutException("Timed out reading message from queue");
+        }
+
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketEchoHandler.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketEchoHandler.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketEchoHandler.java
new file mode 100644
index 0000000..dc5f2f1
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketEchoHandler.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import org.eclipse.jetty.websocket.server.WebSocketHandler;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+
+/**
+ * Mock websocket handler used by the tests: every upgrade request is served
+ * by the same {@link EchoSocket} instance.
+ */
+public class WebsocketEchoHandler extends WebSocketHandler
+    implements WebSocketCreator {
+
+  /** Socket shared by all connections accepted through this handler. */
+  final EchoSocket socket = new EchoSocket();
+
+  /**
+   * Raises the maximum text message size and registers this handler as the
+   * websocket creator.
+   */
+  @Override
+  public void configure(WebSocketServletFactory factory) {
+    final int maxTextMessageSize = 2 * 1024 * 1024;
+    factory.getPolicy().setMaxTextMessageSize(maxTextMessageSize);
+    factory.setCreator(this);
+  }
+
+  /** Returns the shared socket, regardless of the request. */
+  @Override
+  public Object createWebSocket(ServletUpgradeRequest req,
+      ServletUpgradeResponse resp) {
+    return socket;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/c6caebd4/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketEchoTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketEchoTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketEchoTest.java
new file mode 100644
index 0000000..d6d60a4
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/websockets/WebsocketEchoTest.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.websockets;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.ContainerProvider;
+import javax.websocket.Session;
+import javax.websocket.WebSocketContainer;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.config.impl.GatewayConfigImpl;
+import org.apache.hadoop.gateway.deploy.DeploymentFactory;
+import org.apache.hadoop.gateway.services.DefaultGatewayServices;
+import org.apache.hadoop.gateway.services.GatewayServices;
+import org.apache.hadoop.gateway.services.ServiceLifecycleException;
+import org.apache.hadoop.gateway.services.topology.TopologyService;
+import org.apache.hadoop.gateway.topology.TopologyEvent;
+import org.apache.hadoop.gateway.topology.TopologyListener;
+import org.apache.hadoop.test.TestUtils;
+import org.easymock.EasyMock;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.handler.ContextHandler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.mycila.xmltool.XMLDoc;
+import com.mycila.xmltool.XMLTag;
+
+/**
+ * A basic test that attempts to proxy websocket connections through Knox
+ * gateway.
+ * <p>
+ * The way the test is set up is as follows:
+ * <ul>
+ * <li>A Mock Websocket server is set up which simply echoes the messages sent
+ * by the client.
+ * <li>Knox Gateway is set up with websocket handler
+ * {@link GatewayWebsocketHandler} that can proxy the requests.
+ * <li>Appropriate Topology and service definition files are set up with the
+ * address of the Websocket server.
+ * <li>A mock client is setup to connect to gateway.
+ * </ul>
+ * 
+ * The test is to confirm whether the message is sent all the way to the backend
+ * Websocket server through Knox and back.
+ * 
+ * 
+ * @since 0.10
+ */
+public class WebsocketEchoTest {
+
+  /**
+   * Simulate backend websocket
+   */
+  private static Server backendServer;
+  /**
+   * URI for backend websocket server
+   */
+  private static URI backendServerUri;
+
+  /**
+   * Mock Gateway server
+   */
+  private static Server gatewayServer;
+
+  /**
+   * Mock gateway config
+   */
+  private static GatewayConfig gatewayConfig;
+
+  private static GatewayServices services;
+
+  /**
+   * URI for gateway server
+   */
+  private static URI serverUri;
+
+  private static File topoDir;
+
+  /**
+   * Creates the test instance; the fixtures of this class are static, so
+   * there is nothing to initialise per instance.
+   */
+  public WebsocketEchoTest() {
+    /* implicit super() only */
+  }
+
+  /**
+   * Brings up the mock backend first and the gateway second; the gateway
+   * setup reads the backend URI that the first step records.
+   *
+   * @throws Exception
+   *           if either server fails to start
+   */
+  @BeforeClass
+  public static void startServers() throws Exception {
+    startWebsocketServer();
+    startGatewayServer();
+  }
+
+  /**
+   * Stops both servers and removes the temporary topology directory. Each
+   * server is stopped independently so that a failure stopping one does not
+   * leave the other running.
+   */
+  @AfterClass
+  public static void stopServers() {
+    stopQuietly(gatewayServer);
+    stopQuietly(backendServer);
+
+    /* Cleanup the created files */
+    FileUtils.deleteQuietly(topoDir);
+
+  }
+
+  /**
+   * Stops the given server, reporting but not propagating any failure.
+   *
+   * @param server
+   *          server to stop; may be null when setup failed before it was
+   *          created
+   */
+  private static void stopQuietly(final Server server) {
+    if (server == null) {
+      return;
+    }
+    try {
+      server.stop();
+    } catch (final Exception e) {
+      e.printStackTrace(System.err);
+    }
+  }
+
+  /**
+   * Test direct connection to websocket server without gateway: a text
+   * message sent straight to the backend must come back unchanged.
+   *
+   * @throws Exception
+   *           if connecting, sending or waiting for the reply fails
+   */
+  @Test
+  public void testDirectEcho() throws Exception {
+
+    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+    WebsocketClient client = new WebsocketClient();
+
+    /* close the session when done so connections do not leak across tests */
+    try (Session session = container.connectToServer(client,
+        backendServerUri)) {
+
+      session.getBasicRemote().sendText("Echo");
+      client.messageQueue.awaitMessages(1, 1000, TimeUnit.MILLISECONDS);
+
+      /* verify the payload, not just that something arrived */
+      assertThat(client.messageQueue.get(0), is("Echo"));
+    }
+
+  }
+
+  /**
+   * Test websocket proxying through gateway: a text message sent to the
+   * gateway websocket URL must come back unchanged.
+   *
+   * @throws Exception
+   *           if connecting, sending or waiting for the reply fails
+   */
+  @Test
+  public void testGatewayEcho() throws Exception {
+    WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+
+    WebsocketClient client = new WebsocketClient();
+    final URI gatewayWsUri = new URI(
+        serverUri.toString() + "gateway/websocket/ws");
+
+    /* close the session when done so connections do not leak across tests */
+    try (Session session = container.connectToServer(client, gatewayWsUri)) {
+
+      session.getBasicRemote().sendText("Echo");
+      client.messageQueue.awaitMessages(1, 1000, TimeUnit.MILLISECONDS);
+
+      assertThat(client.messageQueue.get(0), is("Echo"));
+    }
+
+  }
+
+  /**
+   * Start Mock Websocket server that acts as backend. A
+   * {@link WebsocketEchoHandler} is installed at the root context and the
+   * resulting {@code ws://host:port/ws} address is stored in
+   * {@code backendServerUri}.
+   *
+   * @throws Exception
+   *           if the server cannot be started or the URI is invalid
+   */
+  private static void startWebsocketServer() throws Exception {
+
+    backendServer = new Server();
+    final ServerConnector backendConnector = new ServerConnector(
+        backendServer);
+    backendServer.addConnector(backendConnector);
+
+    final ContextHandler rootContext = new ContextHandler();
+    rootContext.setContextPath("/");
+    rootContext.setHandler(new WebsocketEchoHandler());
+    backendServer.setHandler(rootContext);
+
+    backendServer.start();
+
+    /* fall back to localhost when the connector has no explicit host */
+    final String configuredHost = backendConnector.getHost();
+    final String host = (configuredHost == null) ? "localhost"
+        : configuredHost;
+    final int port = backendConnector.getLocalPort();
+    backendServerUri = new URI(String.format("ws://%s:%d/ws", host, port));
+
+  }
+
+  /**
+   * Start Gateway Server. The Jetty server is started with a placeholder
+   * root context, its address is stored in {@code serverUri}, and only then
+   * is the {@link GatewayWebsocketHandler} configured, added to the handler
+   * collection and started.
+   *
+   * @throws Exception
+   *           if the server or the websocket handler cannot be started
+   */
+  private static void startGatewayServer() throws Exception {
+    gatewayServer = new Server();
+    final ServerConnector gatewayConnector = new ServerConnector(
+        gatewayServer);
+    gatewayServer.addConnector(gatewayConnector);
+
+    /* this collection lets us attach the websocket handler after startup */
+    final HandlerCollection handlerChain = new HandlerCollection(true);
+
+    /* initial handler: an empty root context */
+    final ContextHandler rootContext = new ContextHandler();
+    rootContext.setContextPath("/");
+    handlerChain.addHandler(rootContext);
+
+    gatewayServer.setHandler(handlerChain);
+    gatewayServer.start();
+
+    /* fall back to localhost when the connector has no explicit host */
+    final String configuredHost = gatewayConnector.getHost();
+    final String host = (configuredHost == null) ? "localhost"
+        : configuredHost;
+    final int port = gatewayConnector.getLocalPort();
+    serverUri = new URI(String.format("ws://%s:%d/", host, port));
+
+    /* populate gatewayConfig and services before building the handler */
+    setupGatewayConfig(backendServerUri.toString());
+
+    final GatewayWebsocketHandler websocketHandler = new GatewayWebsocketHandler(
+        gatewayConfig, services);
+    handlerChain.addHandler(websocketHandler);
+    websocketHandler.start();
+  }
+
+  /**
+   * Initialize the configs and components required for this test.
+   * <p>
+   * Writes a topology descriptor for {@code backend} into a fresh temporary
+   * directory, builds a mocked {@link GatewayConfig} that points at that
+   * directory and returns the default websocket settings, initialises the
+   * gateway services and finally reloads the topologies so the registered
+   * listener is notified.
+   *
+   * @param backend
+   *          URL of the backend websocket server written into the topology
+   * @throws IOException
+   *           if the temporary directory or the topology descriptor cannot
+   *           be written
+   */
+  private static void setupGatewayConfig(final String backend)
+      throws IOException {
+    services = new DefaultGatewayServices();
+
+    topoDir = createDir();
+    URL serviceUrl = ClassLoader.getSystemResource("websocket-services");
+
+    final File descriptor = new File(topoDir, "websocket.xml");
+    /* try-with-resources closes the file even if writing the XML fails */
+    try (FileOutputStream stream = new FileOutputStream(descriptor)) {
+      createKnoxTopology(backend).toStream(stream);
+    }
+
+    final TestTopologyListener topoListener = new TestTopologyListener();
+
+    final Map<String, String> options = new HashMap<String, String>();
+    options.put("persist-master", "false");
+    options.put("master", "password");
+
+    gatewayConfig = EasyMock.createNiceMock(GatewayConfig.class);
+    EasyMock.expect(gatewayConfig.getGatewayTopologyDir())
+        .andReturn(topoDir.toString()).anyTimes();
+
+    EasyMock.expect(gatewayConfig.getGatewayServicesDir())
+        .andReturn(serviceUrl.getFile()).anyTimes();
+
+    EasyMock.expect(gatewayConfig.getEphemeralDHKeySize()).andReturn("2048")
+        .anyTimes();
+
+    /* the temporary topology directory doubles as the security directory */
+    EasyMock.expect(gatewayConfig.getGatewaySecurityDir())
+        .andReturn(topoDir.toString()).anyTimes();
+
+    /* Websocket configs */
+    EasyMock.expect(gatewayConfig.isWebsocketEnabled()).andReturn(true)
+        .anyTimes();
+
+    EasyMock.expect(gatewayConfig.getWebsocketMaxTextMessageSize())
+        .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE)
+        .anyTimes();
+
+    EasyMock.expect(gatewayConfig.getWebsocketMaxBinaryMessageSize())
+        .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_SIZE)
+        .anyTimes();
+
+    EasyMock.expect(gatewayConfig.getWebsocketMaxTextMessageBufferSize())
+        .andReturn(
+            GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE)
+        .anyTimes();
+
+    EasyMock.expect(gatewayConfig.getWebsocketMaxBinaryMessageBufferSize())
+        .andReturn(
+            GatewayConfigImpl.DEFAULT_WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE)
+        .anyTimes();
+
+    EasyMock.expect(gatewayConfig.getWebsocketInputBufferSize())
+        .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_INPUT_BUFFER_SIZE)
+        .anyTimes();
+
+    EasyMock.expect(gatewayConfig.getWebsocketAsyncWriteTimeout())
+        .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_ASYNC_WRITE_TIMEOUT)
+        .anyTimes();
+
+    EasyMock.expect(gatewayConfig.getWebsocketIdleTimeout())
+        .andReturn(GatewayConfigImpl.DEFAULT_WEBSOCKET_IDLE_TIMEOUT).anyTimes();
+
+    EasyMock.replay(gatewayConfig);
+
+    try {
+      services.init(gatewayConfig, options);
+    } catch (ServiceLifecycleException e) {
+      /* NOTE(review): failure is only reported; setup continues regardless */
+      e.printStackTrace();
+    }
+
+    DeploymentFactory.setGatewayServices(services);
+    final TopologyService monitor = services
+        .getService(GatewayServices.TOPOLOGY_SERVICE);
+    monitor.addTopologyChangeListener(topoListener);
+    monitor.reloadTopologies();
+
+  }
+
+  /**
+   * Creates a temporary directory whose name is prefixed with this test
+   * class's simple name.
+   *
+   * @return the newly created directory
+   * @throws IOException
+   *           if the directory cannot be created
+   */
+  private static File createDir() throws IOException {
+    final String prefix = WebsocketEchoTest.class.getSimpleName() + "-";
+    return TestUtils.createTempDir(prefix);
+  }
+
+  /**
+   * Builds the topology document used by this test: a {@code topology} root
+   * with a {@code service} entry carrying the role {@code WEBSOCKET} and the
+   * given backend URL.
+   *
+   * @param backend
+   *          URL of the backend websocket server
+   * @return the document, positioned at its root tag
+   */
+  private static XMLTag createKnoxTopology(final String backend) {
+    return XMLDoc.newDocument(true)
+        .addRoot("topology")
+        .addTag("service")
+        .addTag("role").addText("WEBSOCKET")
+        .addTag("url").addText(backend)
+        .gotoParent()
+        .gotoRoot();
+  }
+
+  /**
+   * Topology listener that records every batch of events it receives and
+   * creates a deployment for each topology that was not deleted.
+   */
+  private static class TestTopologyListener implements TopologyListener {
+
+    /** Every event batch received so far, in arrival order. */
+    public ArrayList<List<TopologyEvent>> events = new ArrayList<List<TopologyEvent>>();
+
+    @Override
+    public void handleTopologyEvent(List<TopologyEvent> events) {
+
+      synchronized (this) {
+        /* record under the same lock that guards the deployment loop */
+        this.events.add(events);
+
+        for (TopologyEvent event : events) {
+          if (!event.getType().equals(TopologyEvent.Type.DELETED)) {
+
+            /* for this test we only care about this part */
+            DeploymentFactory.createDeployment(gatewayConfig,
+                event.getTopology());
+
+          }
+        }
+
+      }
+
+    }
+
+  }
+}


Mime
View raw message