knox-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kmin...@apache.org
Subject [2/4] KNOX-88: Support HDFS HA
Date Fri, 15 Aug 2014 20:41:30 GMT
http://git-wip-us.apache.org/repos/asf/knox/blob/9aaeeed1/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java
new file mode 100644
index 0000000..a7ab1cb
--- /dev/null
+++ b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.hdfs.dispatch;
+
+import org.apache.hadoop.gateway.dispatch.HttpClientDispatch;
+import org.apache.hadoop.gateway.filter.AbstractGatewayFilter;
+import org.apache.hadoop.gateway.ha.provider.HaProvider;
+import org.apache.hadoop.gateway.ha.provider.HaServiceConfig;
+import org.apache.hadoop.gateway.ha.provider.HaServletContextListener;
+import org.apache.hadoop.gateway.hdfs.i18n.WebHdfsMessages;
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.BufferedHttpEntity;
+
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class WebHdfsHaHttpClientDispatch extends HttpClientDispatch {
+
+   private static final String FAILOVER_COUNTER_ATTRIBUTE = "dispatch.ha.failover.counter";
+
+   private static final String RETRY_COUNTER_ATTRIBUTE = "dispatch.ha.retry.counter";
+
+   public static final String RESOURCE_ROLE_ATTRIBUTE = "resource.role";
+
+   private static final WebHdfsMessages LOG = MessagesFactory.get(WebHdfsMessages.class);
+
+   private int maxFailoverAttempts;
+
+   private int failoverSleep;
+
+   private int maxRetryAttempts;
+
+   private int retrySleep;
+
+   private String resourceRole;
+
+   private HaProvider haProvider;
+
+   @Override
+   public void init(FilterConfig filterConfig) throws ServletException {
+      resourceRole = filterConfig.getInitParameter(RESOURCE_ROLE_ATTRIBUTE);
+      LOG.initializingForResourceRole(resourceRole);
+      haProvider = HaServletContextListener.getHaProvider(filterConfig.getServletContext());
+      HaServiceConfig serviceConfig = haProvider.getHaDescriptor().getServiceConfig(resourceRole);
+      maxFailoverAttempts = serviceConfig.getMaxFailoverAttempts();
+      failoverSleep = serviceConfig.getFailoverSleep();
+      maxRetryAttempts = serviceConfig.getMaxRetryAttempts();
+      retrySleep = serviceConfig.getRetrySleep();
+   }
+
+   @Override
+   protected void executeRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) {
+      HttpResponse inboundResponse = null;
+      try {
+         inboundResponse = executeOutboundRequest(outboundRequest);
+         writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
+      } catch (StandbyException e) {
+         LOG.errorReceivedFromStandbyNode(e);
+         failoverRequest(outboundRequest, inboundRequest, outboundResponse);
+      } catch (SafeModeException e) {
+         LOG.errorReceivedFromSafeModeNode(e);
+         retryRequest(outboundRequest, inboundRequest, outboundResponse);
+      } catch (IOException e) {
+         LOG.errorConnectingToServer(outboundRequest.getURI().toString(), e);
+         failoverRequest(outboundRequest, inboundRequest, outboundResponse);
+      }
+   }
+
+   /**
+    * Checks for specific outbound response codes/content to trigger a retry or failover
+    */
+   @Override
+   protected void writeOutboundResponse(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse) throws IOException {
+      if (inboundResponse.getStatusLine().getStatusCode() == 403) {
+         BufferedHttpEntity entity = new BufferedHttpEntity(inboundResponse.getEntity());
+         inboundResponse.setEntity(entity);
+         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+         inboundResponse.getEntity().writeTo(outputStream);
+         String body = new String(outputStream.toByteArray());
+         if (body.contains("StandbyException")) {
+            throw new StandbyException();
+         }
+         if (body.contains("SafeModeException")) {
+            throw new SafeModeException();
+         }
+      }
+      super.writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
+   }
+
+   private void failoverRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) {
+      LOG.failingOverRequest(outboundRequest.getURI().toString());
+      AtomicInteger counter = (AtomicInteger) inboundRequest.getAttribute(FAILOVER_COUNTER_ATTRIBUTE);
+      if (counter == null) {
+         counter = new AtomicInteger(1);
+      }
+      inboundRequest.setAttribute(FAILOVER_COUNTER_ATTRIBUTE, counter);
+      if (counter.incrementAndGet() <= maxFailoverAttempts) {
+         haProvider.markFailedURL(resourceRole, outboundRequest.getURI().toString());
+         //null out target url so that rewriters run again
+         inboundRequest.setAttribute(AbstractGatewayFilter.TARGET_REQUEST_URL_ATTRIBUTE_NAME, null);
+         URI uri = getDispatchUrl(inboundRequest);
+         ((HttpRequestBase) outboundRequest).setURI(uri);
+         if (failoverSleep > 0) {
+            try {
+               Thread.sleep(failoverSleep);
+            } catch (InterruptedException e) {
+               LOG.failoverSleepFailed(resourceRole, e);
+            }
+         }
+         executeRequest(outboundRequest, inboundRequest, outboundResponse);
+      } else {
+         LOG.maxFailoverAttemptsReached(maxFailoverAttempts, resourceRole);
+      }
+   }
+
+   private void retryRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) {
+      LOG.retryingRequest(outboundRequest.getURI().toString());
+      AtomicInteger counter = (AtomicInteger) inboundRequest.getAttribute(RETRY_COUNTER_ATTRIBUTE);
+      if (counter == null) {
+         counter = new AtomicInteger(1);
+      }
+      inboundRequest.setAttribute(RETRY_COUNTER_ATTRIBUTE, counter);
+      if (counter.incrementAndGet() <= maxRetryAttempts) {
+         if (retrySleep > 0) {
+            try {
+               Thread.sleep(retrySleep);
+            } catch (InterruptedException e) {
+               LOG.retrySleepFailed(resourceRole, e);
+            }
+         }
+         executeRequest(outboundRequest, inboundRequest, outboundResponse);
+      } else {
+         LOG.maxRetryAttemptsReached(maxRetryAttempts, resourceRole, outboundRequest.getURI().toString());
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/9aaeeed1/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/i18n/WebHdfsMessages.java
----------------------------------------------------------------------
diff --git a/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/i18n/WebHdfsMessages.java b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/i18n/WebHdfsMessages.java
new file mode 100644
index 0000000..b3671cb
--- /dev/null
+++ b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/i18n/WebHdfsMessages.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.hdfs.i18n;
+
+import org.apache.hadoop.gateway.i18n.messages.Message;
+import org.apache.hadoop.gateway.i18n.messages.MessageLevel;
+import org.apache.hadoop.gateway.i18n.messages.Messages;
+import org.apache.hadoop.gateway.i18n.messages.StackTrace;
+
/**
 * i18n log messages emitted by the WebHDFS HA dispatch.  The message text is
 * bound declaratively through the gateway's {@code @Messages}/{@code @Message}
 * annotations; {@code @StackTrace} params log the exception's stack trace at
 * the given level (DEBUG here) alongside the message.
 */
@Messages(logger = "org.apache.hadoop.gateway")
public interface WebHdfsMessages {

   @Message(level = MessageLevel.INFO, text = "Initializing Ha Dispatch for: {0}")
   void initializingForResourceRole(String resourceRole);

   @Message(level = MessageLevel.INFO, text = "Received an error from a node in Standby: {0}")
   void errorReceivedFromStandbyNode(@StackTrace(level = MessageLevel.DEBUG) Exception e);

   @Message(level = MessageLevel.INFO, text = "Could not connect to server: {0} {1}")
   void errorConnectingToServer(String uri, @StackTrace(level = MessageLevel.DEBUG) Exception e);

   @Message(level = MessageLevel.INFO, text = "Received an error from a node in SafeMode: {0}")
   void errorReceivedFromSafeModeNode(@StackTrace(level = MessageLevel.DEBUG) Exception e);

   @Message(level = MessageLevel.INFO, text = "Failing over request to a different server: {0}")
   void failingOverRequest(String uri);

   @Message(level = MessageLevel.INFO, text = "Retrying request to a server: {0}")
   void retryingRequest(String uri);

   // Logged when the failover budget for a service is exhausted.
   @Message(level = MessageLevel.INFO, text = "Maximum attempts {0} to failover reached for service: {1}")
   void maxFailoverAttemptsReached(int attempts, String service);

   // Logged when the retry budget for a service is exhausted.
   @Message(level = MessageLevel.INFO, text = "Maximum attempts {0} to retry reached for service: {1} at url : {2}")
   void maxRetryAttemptsReached(int attempts, String service, String url);

   @Message(level = MessageLevel.INFO, text = "Error occurred while trying to sleep for failover : {0} {1}")
   void failoverSleepFailed(String service, @StackTrace(level = MessageLevel.DEBUG) Exception e);

   @Message(level = MessageLevel.INFO, text = "Error occurred while trying to sleep for retry : {0} {1}")
   void retrySleepFailed(String service, @StackTrace(level = MessageLevel.DEBUG) Exception e);
}

http://git-wip-us.apache.org/repos/asf/knox/blob/9aaeeed1/gateway-service-webhdfs/src/main/resources/META-INF/services/org.apache.hadoop.gateway.deploy.ProviderDeploymentContributor
----------------------------------------------------------------------
diff --git a/gateway-service-webhdfs/src/main/resources/META-INF/services/org.apache.hadoop.gateway.deploy.ProviderDeploymentContributor b/gateway-service-webhdfs/src/main/resources/META-INF/services/org.apache.hadoop.gateway.deploy.ProviderDeploymentContributor
new file mode 100644
index 0000000..43a6b3f
--- /dev/null
+++ b/gateway-service-webhdfs/src/main/resources/META-INF/services/org.apache.hadoop.gateway.deploy.ProviderDeploymentContributor
@@ -0,0 +1,19 @@
+##########################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##########################################################################
+
+org.apache.hadoop.gateway.hdfs.NameNodeHaDispatchDeploymentContributor
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/knox/blob/9aaeeed1/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/NameNodeHaDispatchDeploymentContributorTest.java
----------------------------------------------------------------------
diff --git a/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/NameNodeHaDispatchDeploymentContributorTest.java b/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/NameNodeHaDispatchDeploymentContributorTest.java
new file mode 100644
index 0000000..596ac81
--- /dev/null
+++ b/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/NameNodeHaDispatchDeploymentContributorTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.hdfs;
+
+import org.apache.hadoop.gateway.deploy.ProviderDeploymentContributor;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+public class NameNodeHaDispatchDeploymentContributorTest {
+
+   @Test
+   public void testServiceLoader() throws Exception {
+      ServiceLoader loader = ServiceLoader.load( ProviderDeploymentContributor.class );
+      Iterator iterator = loader.iterator();
+      assertThat( "Service iterator empty.", iterator.hasNext() );
+      while( iterator.hasNext() ) {
+         Object object = iterator.next();
+         if( object instanceof NameNodeHaDispatchDeploymentContributor) {
+            return;
+         }
+      }
+      fail( "Failed to find " + NameNodeHaDispatchDeploymentContributor.class.getName() + " via service loader." );
+   }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/9aaeeed1/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java
----------------------------------------------------------------------
diff --git a/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java b/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java
new file mode 100644
index 0000000..a9aa5a6
--- /dev/null
+++ b/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.hdfs.dispatch;
+
+import org.apache.hadoop.gateway.ha.provider.HaDescriptor;
+import org.apache.hadoop.gateway.ha.provider.HaProvider;
+import org.apache.hadoop.gateway.ha.provider.HaServletContextListener;
+import org.apache.hadoop.gateway.ha.provider.impl.DefaultHaProvider;
+import org.apache.hadoop.gateway.ha.provider.impl.HaDescriptorFactory;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.params.BasicHttpParams;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
 * Exercises the failover path of WebHdfsHaHttpClientDispatch: a request to an
 * unreachable URL should be marked failed with the HaProvider and re-executed
 * against the next URL, honoring the configured failover sleep.
 */
public class WebHdfsHaHttpClientDispatchTest {

   @Test
   public void testConnectivityFailover() throws Exception {
      String serviceName = "WEBHDFS";
      HaDescriptor descriptor = HaDescriptorFactory.createDescriptor();
      // Config args (presumably): enabled=true, maxFailoverAttempts=1,
      // failoverSleep=1000ms, maxRetryAttempts=2, retrySleep=1000ms —
      // confirm against HaDescriptorFactory.createServiceConfig's signature.
      descriptor.addServiceConfig(HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", "2", "1000"));
      HaProvider provider = new DefaultHaProvider(descriptor);
      URI uri1 = new URI( "http://unreachable-host" );
      URI uri2 = new URI( "http://reachable-host" );
      ArrayList<String> urlList = new ArrayList<String>();
      urlList.add(uri1.toString());
      urlList.add(uri2.toString());
      provider.addHaService(serviceName, urlList);
      FilterConfig filterConfig = EasyMock.createNiceMock(FilterConfig.class);
      ServletContext servletContext = EasyMock.createNiceMock(ServletContext.class);

      // Wire the mocked filter config so dispatch.init() finds the service
      // role and the HaProvider through the servlet context attribute.
      EasyMock.expect(filterConfig.getInitParameter(WebHdfsHaHttpClientDispatch.RESOURCE_ROLE_ATTRIBUTE)).andReturn(serviceName).anyTimes();
      EasyMock.expect(filterConfig.getServletContext()).andReturn(servletContext).anyTimes();
      EasyMock.expect(servletContext.getAttribute(HaServletContextListener.PROVIDER_ATTRIBUTE_NAME)).andReturn(provider).anyTimes();

      BasicHttpParams params = new BasicHttpParams();

      HttpUriRequest outboundRequest = EasyMock.createNiceMock(HttpRequestBase.class);
      EasyMock.expect(outboundRequest.getMethod()).andReturn( "GET" ).anyTimes();
      EasyMock.expect(outboundRequest.getURI()).andReturn( uri1  ).anyTimes();
      EasyMock.expect(outboundRequest.getParams()).andReturn( params ).anyTimes();

      HttpServletRequest inboundRequest = EasyMock.createNiceMock(HttpServletRequest.class);
      EasyMock.expect(inboundRequest.getRequestURL()).andReturn( new StringBuffer(uri2.toString()) ).once();
      // Pre-seed the failover counter attribute: 0 on the first lookup (allows
      // one failover), then 1 (budget exhausted) — so the dispatch stops after
      // a single failover instead of looping.
      EasyMock.expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new AtomicInteger(0)).once();
      EasyMock.expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new AtomicInteger(1)).once();

      HttpServletResponse outboundResponse = EasyMock.createNiceMock(HttpServletResponse.class);
      // Simulate an unreachable backend: the first attempt's response write
      // throws IOException, which the dispatch treats as a connection failure.
      EasyMock.expect(outboundResponse.getOutputStream()).andAnswer( new IAnswer<ServletOutputStream>() {
         @Override
         public ServletOutputStream answer() throws Throwable {
            return new ServletOutputStream() {
               @Override
               public void write( int b ) throws IOException {
                  throw new IOException( "unreachable-host" );
               }
            };
         }
      }).once();
      EasyMock.replay(filterConfig, servletContext, outboundRequest, inboundRequest, outboundResponse);
      // Sanity check: before dispatch the provider's active URL is the
      // unreachable one.
      Assert.assertEquals(uri1.toString(), provider.getActiveURL(serviceName));
      WebHdfsHaHttpClientDispatch dispatch = new WebHdfsHaHttpClientDispatch();
      dispatch.init(filterConfig);
      long startTime = System.currentTimeMillis();
      dispatch.executeRequest(outboundRequest, inboundRequest, outboundResponse);
      long elapsedTime = System.currentTimeMillis() - startTime;
      // After the failure the provider should have failed over to uri2.
      Assert.assertEquals(uri2.toString(), provider.getActiveURL(serviceName));
      //test to make sure the sleep took place (failoverSleep is 1000ms)
      Assert.assertTrue(elapsedTime > 1000);
   }
}

http://git-wip-us.apache.org/repos/asf/knox/blob/9aaeeed1/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java
index c1b9625..c79ad6b 100644
--- a/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java
@@ -58,245 +58,253 @@ import org.apache.http.message.BasicHeader;
  *
  */
 public class HttpClientDispatch extends AbstractGatewayDispatch {
-  
-  private static final String REPLAY_BUFFER_SIZE = "replayBufferSize";
-  
-  // private static final String CT_APP_WWW_FORM_URL_ENCODED = "application/x-www-form-urlencoded";
-  // private static final String CT_APP_XML = "application/xml";
-  protected static final String Q_DELEGATION_EQ = "?delegation=";
-  protected static final String AMP_DELEGATION_EQ = "&delegation=";
-  protected static final String COOKIE = "Cookie";
-  protected static final String SET_COOKIE = "Set-Cookie";
-  protected static final String WWW_AUTHENTICATE = "WWW-Authenticate";
-  protected static final String NEGOTIATE = "Negotiate";
-
-  protected static SpiGatewayMessages LOG = MessagesFactory.get( SpiGatewayMessages.class );
-  protected static SpiGatewayResources RES = ResourcesFactory.get( SpiGatewayResources.class );
-  protected static Auditor auditor = AuditServiceFactory.getAuditService().getAuditor( AuditConstants.DEFAULT_AUDITOR_NAME,
-          AuditConstants.KNOX_SERVICE_NAME, AuditConstants.KNOX_COMPONENT_NAME );
-  private static final int DEFAULT_REPLAY_BUFFER_SIZE =  4 * 1024; // 4K
-
-  protected AppCookieManager appCookieManager;
-  
-  protected static final String REPLAY_BUFFER_SIZE_PARAM = "replayBufferSize";
-  
-  private int replayBufferSize = 0;
-
-  @Override
-  public void init( FilterConfig filterConfig ) throws ServletException {
-    this.init(filterConfig, new AppCookieManager() );
-  }
-
-  protected void init( FilterConfig filterConfig, AppCookieManager cookieManager ) throws ServletException {
-    super.init( filterConfig );
-    appCookieManager = cookieManager;
-    String replayBufferSizeString = filterConfig.getInitParameter( REPLAY_BUFFER_SIZE_PARAM );
-    if ( replayBufferSizeString != null ) {
-      setReplayBufferSize(Integer.valueOf(replayBufferSizeString));
-    }
-  }
-
-  protected void executeRequest(
-      HttpUriRequest outboundRequest,
-      HttpServletRequest inboundRequest,
-      HttpServletResponse outboundResponse )
-          throws IOException {
-    LOG.dispatchRequest( outboundRequest.getMethod(), outboundRequest.getURI() );
-    DefaultHttpClient client = new DefaultHttpClient();
-
-    HttpResponse inboundResponse = null;
-    try {
-      String query = outboundRequest.getURI().getQuery();
-      if (!"true".equals(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED))) {
-        // Hadoop cluster not Kerberos enabled
-        addCredentialsToRequest(outboundRequest);
-        inboundResponse = client.execute(outboundRequest);
-      } else if (query.contains(Q_DELEGATION_EQ) ||
-        // query string carries delegation token
-        query.contains(AMP_DELEGATION_EQ)) {
-        inboundResponse = client.execute(outboundRequest);
-      } else { 
-        // Kerberos secured, no delegation token in query string
-        inboundResponse = executeKerberosDispatch(outboundRequest, client);
+
+   private static final String REPLAY_BUFFER_SIZE = "replayBufferSize";
+
+   // private static final String CT_APP_WWW_FORM_URL_ENCODED = "application/x-www-form-urlencoded";
+   // private static final String CT_APP_XML = "application/xml";
+   protected static final String Q_DELEGATION_EQ = "?delegation=";
+   protected static final String AMP_DELEGATION_EQ = "&delegation=";
+   protected static final String COOKIE = "Cookie";
+   protected static final String SET_COOKIE = "Set-Cookie";
+   protected static final String WWW_AUTHENTICATE = "WWW-Authenticate";
+   protected static final String NEGOTIATE = "Negotiate";
+
+   protected static SpiGatewayMessages LOG = MessagesFactory.get(SpiGatewayMessages.class);
+   protected static SpiGatewayResources RES = ResourcesFactory.get(SpiGatewayResources.class);
+   protected static Auditor auditor = AuditServiceFactory.getAuditService().getAuditor(AuditConstants.DEFAULT_AUDITOR_NAME,
+         AuditConstants.KNOX_SERVICE_NAME, AuditConstants.KNOX_COMPONENT_NAME);
+   private static final int DEFAULT_REPLAY_BUFFER_SIZE = 4 * 1024; // 4K
+
+   protected AppCookieManager appCookieManager;
+
+   protected static final String REPLAY_BUFFER_SIZE_PARAM = "replayBufferSize";
+
+   private int replayBufferSize = 0;
+
+   @Override
+   public void init(FilterConfig filterConfig) throws ServletException {
+      this.init(filterConfig, new AppCookieManager());
+   }
+
+   protected void init(FilterConfig filterConfig, AppCookieManager cookieManager) throws ServletException {
+      super.init(filterConfig);
+      appCookieManager = cookieManager;
+      String replayBufferSizeString = filterConfig.getInitParameter(REPLAY_BUFFER_SIZE_PARAM);
+      if (replayBufferSizeString != null) {
+         setReplayBufferSize(Integer.valueOf(replayBufferSizeString));
       }
-    } catch (IOException e) {
-      // we do not want to expose back end host. port end points to clients, see JIRA KNOX-58
-      LOG.dispatchServiceConnectionException( outboundRequest.getURI(), e );
-      auditor.audit( Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.FAILURE );
-      throw new IOException( RES.dispatchConnectionError() );
-    } finally {
-      if (inboundResponse != null) {
-        int statusCode = inboundResponse.getStatusLine().getStatusCode();
-        if( statusCode != 201 ) {
-          LOG.dispatchResponseStatusCode( statusCode );
-        } else {
-          Header location = inboundResponse.getFirstHeader( "Location" );
-          if( location == null ) {
-            LOG.dispatchResponseStatusCode( statusCode );
-          } else {
-            LOG.dispatchResponseCreatedStatusCode( statusCode, location.getValue() );
-          }
-        }
-        auditor.audit( Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.SUCCESS, RES.responseStatus( statusCode ) );
-      } else {
-        auditor.audit( Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.UNAVAILABLE );
+   }
+
+   protected void executeRequest(
+         HttpUriRequest outboundRequest,
+         HttpServletRequest inboundRequest,
+         HttpServletResponse outboundResponse)
+         throws IOException {
+      HttpResponse inboundResponse = executeOutboundRequest(outboundRequest);
+      writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
+   }
+
+   protected HttpResponse executeOutboundRequest(HttpUriRequest outboundRequest) throws IOException {
+      LOG.dispatchRequest(outboundRequest.getMethod(), outboundRequest.getURI());
+      HttpResponse inboundResponse = null;
+      DefaultHttpClient client = new DefaultHttpClient();
+
+      try {
+         String query = outboundRequest.getURI().getQuery();
+         if (!"true".equals(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED))) {
+            // Hadoop cluster not Kerberos enabled
+            addCredentialsToRequest(outboundRequest);
+            inboundResponse = client.execute(outboundRequest);
+         } else if (query.contains(Q_DELEGATION_EQ) ||
+               // query string carries delegation token
+               query.contains(AMP_DELEGATION_EQ)) {
+            inboundResponse = client.execute(outboundRequest);
+         } else {
+            // Kerberos secured, no delegation token in query string
+            inboundResponse = executeKerberosDispatch(outboundRequest, client);
+         }
+      } catch (IOException e) {
+         // we do not want to expose back end host. port end points to clients, see JIRA KNOX-58
+         LOG.dispatchServiceConnectionException(outboundRequest.getURI(), e);
+         auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.FAILURE);
+         throw new IOException(RES.dispatchConnectionError());
+      } finally {
+         if (inboundResponse != null) {
+            int statusCode = inboundResponse.getStatusLine().getStatusCode();
+            if (statusCode != 201) {
+               LOG.dispatchResponseStatusCode(statusCode);
+            } else {
+               Header location = inboundResponse.getFirstHeader("Location");
+               if (location == null) {
+                  LOG.dispatchResponseStatusCode(statusCode);
+               } else {
+                  LOG.dispatchResponseCreatedStatusCode(statusCode, location.getValue());
+               }
+            }
+            auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.SUCCESS, RES.responseStatus(statusCode));
+         } else {
+            auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.UNAVAILABLE);
+         }
+
       }
-      
-    }
-
-    // Copy the client respond header to the server respond.
-    outboundResponse.setStatus( inboundResponse.getStatusLine().getStatusCode() );
-    Header[] headers = inboundResponse.getAllHeaders();
-    for( Header header : headers ) {
-      String name = header.getName();
-      if (name.equals(SET_COOKIE) || name.equals(WWW_AUTHENTICATE)) {
-        continue;
+      return inboundResponse;
+   }
+
+   protected void writeOutboundResponse(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse) throws IOException {
+      // Copy the client response headers to the server response.
+      outboundResponse.setStatus(inboundResponse.getStatusLine().getStatusCode());
+      Header[] headers = inboundResponse.getAllHeaders();
+      for (Header header : headers) {
+         String name = header.getName();
+         if (name.equals(SET_COOKIE) || name.equals(WWW_AUTHENTICATE)) {
+            continue;
+         }
+         String value = header.getValue();
+         outboundResponse.addHeader(name, value);
       }
-      String value = header.getValue();
-      outboundResponse.addHeader( name, value );
-    }
-
-    HttpEntity entity = inboundResponse.getEntity();
-    if( entity != null ) {
-      Header contentType = entity.getContentType();
-      if( contentType != null ) {
-        outboundResponse.setContentType( contentType.getValue() );
+
+      HttpEntity entity = inboundResponse.getEntity();
+      if (entity != null) {
+         Header contentType = entity.getContentType();
+         if (contentType != null) {
+            outboundResponse.setContentType(contentType.getValue());
+         }
+         //KM[ If this is set here it ends up setting the content length to the content returned from the server.
+         // This length might not match if the content is rewritten.
+         //      long contentLength = entity.getContentLength();
+         //      if( contentLength <= Integer.MAX_VALUE ) {
+         //        outboundResponse.setContentLength( (int)contentLength );
+         //      }
+         //]
+         writeResponse(inboundRequest, outboundResponse, entity.getContent());
+      }
+   }
+
+   /**
+    * This method provides a hook for specialized credential propagation
+    * in subclasses.
+    *
+    * @param outboundRequest
+    */
+   protected void addCredentialsToRequest(HttpUriRequest outboundRequest) {
+   }
+
+   protected HttpResponse executeKerberosDispatch(HttpUriRequest outboundRequest,
+                                                  DefaultHttpClient client) throws IOException, ClientProtocolException {
+      HttpResponse inboundResponse;
+      outboundRequest.removeHeaders(COOKIE);
+      String appCookie = appCookieManager.getCachedAppCookie();
+      if (appCookie != null) {
+         outboundRequest.addHeader(new BasicHeader(COOKIE, appCookie));
       }
-//KM[ If this is set here it ends up setting the content length to the content returned from the server.
-// This length might not match if the the content is rewritten.
-//      long contentLength = entity.getContentLength();
-//      if( contentLength <= Integer.MAX_VALUE ) {
-//        outboundResponse.setContentLength( (int)contentLength );
-//      }
-//]
-      writeResponse( inboundRequest, outboundResponse, entity.getContent() );
-    }
-  }
-
-  /**
-   * This method provides a hook for specialized credential propagation
-   * in subclasses.
-   * 
-   * @param outboundRequest
-   */
-  protected void addCredentialsToRequest(HttpUriRequest outboundRequest) {
-  }
-
-  protected HttpResponse executeKerberosDispatch(HttpUriRequest outboundRequest,
-      DefaultHttpClient client) throws IOException, ClientProtocolException {
-    HttpResponse inboundResponse;
-    outboundRequest.removeHeaders(COOKIE);
-    String appCookie = appCookieManager.getCachedAppCookie();
-    if (appCookie != null) {
-      outboundRequest.addHeader(new BasicHeader(COOKIE, appCookie));
-    }
-    inboundResponse = client.execute(outboundRequest);
-    // if inBoundResponse has status 401 and header WWW-Authenticate: Negoitate
-    // refresh hadoop.auth.cookie and attempt one more time
-    int statusCode = inboundResponse.getStatusLine().getStatusCode();
-    if (statusCode == HttpStatus.SC_UNAUTHORIZED ) {
-      Header[] wwwAuthHeaders = inboundResponse.getHeaders(WWW_AUTHENTICATE) ;
-      if (wwwAuthHeaders != null && wwwAuthHeaders.length != 0 && 
-          wwwAuthHeaders[0].getValue().trim().startsWith(NEGOTIATE)) {
-        appCookie = appCookieManager.getAppCookie(outboundRequest, true);
-        outboundRequest.removeHeaders(COOKIE);
-        outboundRequest.addHeader(new BasicHeader(COOKIE, appCookie));
-        client = new DefaultHttpClient();
-        inboundResponse = client.execute(outboundRequest);
+      inboundResponse = client.execute(outboundRequest);
+      // if inboundResponse has status 401 and header WWW-Authenticate: Negotiate,
+      // refresh hadoop.auth.cookie and attempt one more time
+      int statusCode = inboundResponse.getStatusLine().getStatusCode();
+      if (statusCode == HttpStatus.SC_UNAUTHORIZED) {
+         Header[] wwwAuthHeaders = inboundResponse.getHeaders(WWW_AUTHENTICATE);
+         if (wwwAuthHeaders != null && wwwAuthHeaders.length != 0 &&
+               wwwAuthHeaders[0].getValue().trim().startsWith(NEGOTIATE)) {
+            appCookie = appCookieManager.getAppCookie(outboundRequest, true);
+            outboundRequest.removeHeaders(COOKIE);
+            outboundRequest.addHeader(new BasicHeader(COOKIE, appCookie));
+            client = new DefaultHttpClient();
+            inboundResponse = client.execute(outboundRequest);
+         } else {
+            // no supported authentication type found
+            // we would let the original response propagate
+         }
       } else {
-        // no supported authentication type found
-        // we would let the original response propagate
+         // not a 401 Unauthorized status code
+         // we would let the original response propagate
       }
-    } else {
-      // not a 401 Unauthorized status code
-      // we would let the original response propagate
-    }
-    return inboundResponse;
-  }
-  
-  protected HttpEntity createRequestEntity(HttpServletRequest request)
-      throws IOException {
-
-    String contentType = request.getContentType();
-    int contentLength = request.getContentLength();
-    InputStream contentStream = request.getInputStream();
-
-    HttpEntity entity;
-    if( contentType == null ) {
-      entity = new InputStreamEntity( contentStream, contentLength );
-    } else {
-      entity = new InputStreamEntity(contentStream, contentLength, ContentType.parse( contentType ) );
-    }
-
- 
-    if ("true".equals(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED))) {  
-   
-      //Check if delegation token is supplied in the request
-      boolean delegationTokenPresent = false;
-      String queryString = request.getQueryString();
-      if (queryString != null) {
-        delegationTokenPresent = queryString.startsWith("delegation=") || 
-            queryString.contains("&delegation=");
-      }     
-      if (!delegationTokenPresent && getReplayBufferSize() > 0 ) {
-          entity = new CappedBufferHttpEntity( entity, getReplayBufferSize() * 1024 );
+      return inboundResponse;
+   }
+
+   protected HttpEntity createRequestEntity(HttpServletRequest request)
+         throws IOException {
+
+      String contentType = request.getContentType();
+      int contentLength = request.getContentLength();
+      InputStream contentStream = request.getInputStream();
+
+      HttpEntity entity;
+      if (contentType == null) {
+         entity = new InputStreamEntity(contentStream, contentLength);
+      } else {
+         entity = new InputStreamEntity(contentStream, contentLength, ContentType.parse(contentType));
       }
-    }
-
-    return entity;
-  }
-
-  @Override
-  public void doGet( URI url, HttpServletRequest request, HttpServletResponse response )
-      throws IOException, URISyntaxException {
-    HttpGet method = new HttpGet( url );
-    // https://issues.apache.org/jira/browse/KNOX-107 - Service URLs not rewritten for WebHDFS GET redirects
-    method.getParams().setBooleanParameter( "http.protocol.handle-redirects", false );
-    copyRequestHeaderFields( method, request );
-    executeRequest( method, request, response );
-  }
-
-  @Override
-  public void doOptions( URI url, HttpServletRequest request, HttpServletResponse response )
-      throws IOException, URISyntaxException {
-    HttpOptions method = new HttpOptions( url );
-    executeRequest( method, request, response );
-  }
-
-  @Override
-  public void doPut( URI url, HttpServletRequest request, HttpServletResponse response )
-      throws IOException, URISyntaxException {
-    HttpPut method = new HttpPut( url );
-    HttpEntity entity = createRequestEntity( request );
-    method.setEntity( entity );
-    copyRequestHeaderFields( method, request );
-    executeRequest( method, request, response );
-  }
-
-  @Override
-  public void doPost( URI url, HttpServletRequest request, HttpServletResponse response )
-      throws IOException, URISyntaxException {
-    HttpPost method = new HttpPost( url );
-    HttpEntity entity = createRequestEntity( request );
-    method.setEntity( entity );
-    copyRequestHeaderFields( method, request );
-    executeRequest( method, request, response );
-  }
-
-  @Override
-  public void doDelete( URI url, HttpServletRequest request, HttpServletResponse response )
-      throws IOException, URISyntaxException {
-    HttpDelete method = new HttpDelete( url );
-    copyRequestHeaderFields( method, request );
-    executeRequest( method, request, response );
-  }
-
-  protected int getReplayBufferSize() {
-    return replayBufferSize;
-  }
-  
-  protected void setReplayBufferSize(int size) {
-    replayBufferSize = size;
-  }
-  
+
+
+      if ("true".equals(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED))) {
+
+         //Check if delegation token is supplied in the request
+         boolean delegationTokenPresent = false;
+         String queryString = request.getQueryString();
+         if (queryString != null) {
+            delegationTokenPresent = queryString.startsWith("delegation=") ||
+                  queryString.contains("&delegation=");
+         }
+         if (!delegationTokenPresent && getReplayBufferSize() > 0) {
+            entity = new CappedBufferHttpEntity(entity, getReplayBufferSize() * 1024);
+         }
+      }
+
+      return entity;
+   }
+
+   @Override
+   public void doGet(URI url, HttpServletRequest request, HttpServletResponse response)
+         throws IOException, URISyntaxException {
+      HttpGet method = new HttpGet(url);
+      // https://issues.apache.org/jira/browse/KNOX-107 - Service URLs not rewritten for WebHDFS GET redirects
+      method.getParams().setBooleanParameter("http.protocol.handle-redirects", false);
+      copyRequestHeaderFields(method, request);
+      executeRequest(method, request, response);
+   }
+
+   @Override
+   public void doOptions(URI url, HttpServletRequest request, HttpServletResponse response)
+         throws IOException, URISyntaxException {
+      HttpOptions method = new HttpOptions(url);
+      executeRequest(method, request, response);
+   }
+
+   @Override
+   public void doPut(URI url, HttpServletRequest request, HttpServletResponse response)
+         throws IOException, URISyntaxException {
+      HttpPut method = new HttpPut(url);
+      HttpEntity entity = createRequestEntity(request);
+      method.setEntity(entity);
+      copyRequestHeaderFields(method, request);
+      executeRequest(method, request, response);
+   }
+
+   @Override
+   public void doPost(URI url, HttpServletRequest request, HttpServletResponse response)
+         throws IOException, URISyntaxException {
+      HttpPost method = new HttpPost(url);
+      HttpEntity entity = createRequestEntity(request);
+      method.setEntity(entity);
+      copyRequestHeaderFields(method, request);
+      executeRequest(method, request, response);
+   }
+
+   @Override
+   public void doDelete(URI url, HttpServletRequest request, HttpServletResponse response)
+         throws IOException, URISyntaxException {
+      HttpDelete method = new HttpDelete(url);
+      copyRequestHeaderFields(method, request);
+      executeRequest(method, request, response);
+   }
+
+   protected int getReplayBufferSize() {
+      return replayBufferSize;
+   }
+
+   protected void setReplayBufferSize(int size) {
+      replayBufferSize = size;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/knox/blob/9aaeeed1/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/PartiallyRepeatableHttpEntity.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/PartiallyRepeatableHttpEntity.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/PartiallyRepeatableHttpEntity.java
new file mode 100644
index 0000000..0bc89c6
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/PartiallyRepeatableHttpEntity.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.dispatch;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.annotation.NotThreadSafe;
+import org.apache.http.entity.HttpEntityWrapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+@NotThreadSafe
+public class PartiallyRepeatableHttpEntity extends HttpEntityWrapper {
+
+  public static final int DEFAULT_BUFFER_SIZE = 4096;
+
+  private int replayWriteIndex;
+  private int replayWriteLimit;
+  private byte[] replayBuffer;
+  private ReplayStream finalStream;
+  private InputStream wrappedStream;
+
+  public PartiallyRepeatableHttpEntity(final HttpEntity entity, int bufferSize) throws IOException {
+    super( entity );
+    this.wrappedStream = null;
+    this.finalStream = null;
+    this.replayWriteIndex = -1;
+    if( !entity.isRepeatable() ) {
+      this.replayBuffer = new byte[ bufferSize ];
+      this.replayWriteLimit = bufferSize-1;
+    } else {
+      this.replayBuffer = null;
+    }
+  }
+
+  public PartiallyRepeatableHttpEntity(final HttpEntity entity) throws IOException {
+    this( entity, DEFAULT_BUFFER_SIZE );
+  }
+
+  @Override
+  public boolean isRepeatable() {
+    return true;
+  }
+
+  @Override
+  public boolean isStreaming() {
+    return wrappedEntity.isStreaming();
+  }
+
+  @Override
+  public boolean isChunked() {
+    return wrappedEntity.isChunked();
+  }
+
+  @Override
+  public long getContentLength() {
+    return wrappedEntity.getContentLength();
+  }
+
+  // This will throw an IOException if an attempt is made to call getContent a second time after
+  // more bytes than the buffer can hold have been read from the first stream.
+  @Override
+  public InputStream getContent() throws IOException {
+    // If the wrapped stream is repeatable return it directly.
+    if( replayBuffer == null ) {
+      return wrappedEntity.getContent();
+    // Else if the buffer has overflowed
+    } else if( finalStream != null ) {
+      throw new IOException( "TODO - Existing stream already past replay buffer capacity" );
+    } else {
+      if( wrappedStream == null ) {
+         wrappedStream = wrappedEntity.getContent();
+      }
+      return new ReplayStream();
+    }
+  }
+
+  @Override
+  public void writeTo( final OutputStream stream ) throws IOException {
+    IOUtils.copy( getContent(), stream );
+  }
+
+  @Override
+  @SuppressWarnings( "deprecation" )
+  public void consumeContent() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  private class ReplayStream extends InputStream {
+
+    private int replayReadIndex = -1;
+
+    @Override
+    public int read() throws IOException {
+      int b;
+      if( finalStream != null && finalStream != this ) {
+        throw new IOException( "TODO - Replay stream taken over by another consumer." );
+      }
+      // If we can read from the buffer do so.
+      if( replayReadIndex < replayWriteIndex ) {
+        b = replayBuffer[ ++replayReadIndex ];
+      } else {
+        b = wrappedStream.read();
+        // If the underlying stream is not closed.
+        if( b > -1 ) {
+          if( replayWriteIndex < replayWriteLimit ) {
+            replayBuffer[ ++replayWriteIndex ] = (byte)b;
+            replayReadIndex++;
+          } else {
+            finalStream = this;
+          }
+        }
+      }
+      return b;
+    }
+
+    public int read( byte buffer[], int offset, int limit ) throws IOException {
+      int count = -1;
+      if( finalStream != null && finalStream != this ) {
+        throw new IOException( "TODO - Replay stream taken over by another consumer." );
+      }
+      // If we can read from the buffer do so.
+      if( replayReadIndex < replayWriteIndex ) {
+        count = replayWriteIndex - replayReadIndex;
+        count = Math.min( limit, count );
+        System.arraycopy( replayBuffer, replayReadIndex+1, buffer, offset, count );
+        replayReadIndex += count;
+      } else {
+        count = wrappedStream.read( buffer, offset, limit );
+        // If the underlying stream is not closed.
+        if( count > -1 ) {
+          if( replayWriteIndex+count < replayWriteLimit ) {
+            System.arraycopy( buffer, offset, replayBuffer, replayWriteIndex+1, count );
+            replayReadIndex += count;
+            replayWriteIndex += count;
+          } else {
+            finalStream = this;
+          }
+        }
+      }
+      return count;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/9aaeeed1/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/registry/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/registry/ServiceRegistry.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/registry/ServiceRegistry.java
index fc41c57..65a4ad9 100644
--- a/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/registry/ServiceRegistry.java
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/registry/ServiceRegistry.java
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.gateway.services.registry;
 
+import java.util.List;
+
 public interface ServiceRegistry {
 
   String getRegistrationCode(String clusterName);
   
-  boolean registerService(String regCode, String ClusterName, String serviceName, String url);
+  boolean registerService(String regCode, String clusterName, String serviceName, List<String> urls);
   
-  String lookupServiceURL(String ClusterName, String serviceName);
+  String lookupServiceURL(String clusterName, String serviceName);
+
+  List<String> lookupServiceURLs( String clusterName, String serviceName );
   
   void removeClusterServices(String clusterName);
 

http://git-wip-us.apache.org/repos/asf/knox/blob/9aaeeed1/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/Service.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/Service.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/Service.java
index 7d3dcb6..b55fa3e 100644
--- a/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/Service.java
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/topology/Service.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.gateway.topology;
 
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 public class Service {
@@ -25,6 +27,7 @@ public class Service {
   private String role;
   private String name;
   private Map<String, String> params = new LinkedHashMap<String, String>();
+  private List<String> urls;
 
   public String getRole() {
     return role;
@@ -42,14 +45,26 @@ public class Service {
     this.name = name;
   }
 
-  private String url;
+  public List<String> getUrls() {
+    if ( urls == null ) {
+      urls = new ArrayList<String>();
+    }
+    return urls;
+  }
+
+  public void setUrls( List<String> urls ) {
+    this.urls = urls;
+  }
 
   public String getUrl() {
-    return url;
+    if ( !getUrls().isEmpty() ) {
+      return getUrls().get( 0 );
+    }
+    return null;
   }
 
-  public void setUrl( String url ) {
-    this.url = url;
+  public void addUrl( String url) {
+    getUrls().add( url );
   }
 
   public Map<String, String> getParams() {

http://git-wip-us.apache.org/repos/asf/knox/blob/9aaeeed1/gateway-spi/src/test/java/org/apache/hadoop/gateway/dispatch/PartiallyRepeatableHttpEntityTest.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/test/java/org/apache/hadoop/gateway/dispatch/PartiallyRepeatableHttpEntityTest.java b/gateway-spi/src/test/java/org/apache/hadoop/gateway/dispatch/PartiallyRepeatableHttpEntityTest.java
new file mode 100644
index 0000000..42528bb
--- /dev/null
+++ b/gateway-spi/src/test/java/org/apache/hadoop/gateway/dispatch/PartiallyRepeatableHttpEntityTest.java
@@ -0,0 +1,874 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.dispatch;
+
+import org.apache.http.entity.BasicHttpEntity;
+import org.apache.http.entity.BufferedHttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.InputStreamEntity;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.fail;
+
+public class PartiallyRepeatableHttpEntityTest {
+
+  private static Charset UTF8 = Charset.forName( "UTF-8" );
+
+  // Variables
+  // Consumers: C1, C2
+  // Reads: FC - Full Content, PC - Partial Content, AC - Any Content
+  // Reads: IB - In Buffer, OB - Overflow Buffer
+  // Close: XC
+  // Expect: EE
+
+  // Test Cases
+  // C1 FC
+  //   C1 FC/IB.
+  //   C1 FC/OB.
+  //   C1 FC/IB; C2 FC.
+  //   C1 FC/OB; C2 AC; EE
+  //   C1 FC/IB; C1 XC; C2 FC.
+  //   C1 FC/OB; C1 XC; C2 AC; EE
+  // C1 PC
+  //   C1 PC/IB.
+  //   C1 PC/OB.
+  //   C1 PC/IB; C2 FC.
+  //   C1 PC/OB; C2 AC; EE
+  //   C1 PC/IB; C1 XC; C2 FC.
+  //   C1 PC/OB; C1 XC; C2 AC; EE
+  // C1 C2 C1
+  //   C1 PC/IB; C2 PC/IB; C1 PC/IB; C2 PC/IB - Back and forth before buffer overflow is OK.
+  //   C1 PC/IB; C2 PC/OB; C1 AC; EE
+
+  @Test
+  public void testS__C1_FC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    String output;
+
+    output = byteRead( replay.getContent(), -1 );
+    assertThat( output, is( data ) );
+  }
+
+  @Test
+  public void testB__C1_FC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    String output;
+
+    output = blockRead( replay.getContent(), UTF8, -1, 3 );
+    assertThat( output, is( data ) );
+  }
+
+  @Test
+  public void testS__C1_FC_OB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    String output;
+
+    output = byteRead( replay.getContent(), -1 );
+    assertThat( output, is( data ) );
+  }
+
+  @Test
+  public void testB__C1_FC_OB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    String output;
+
+    output = blockRead( replay.getContent(), UTF8, -1, 3 );
+    assertThat( output, is( data ) );
+  }
+
+  @Test
+  public void testS_C1_FC_IB__C2_FC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    String output;
+
+    output = byteRead( replay.getContent(), -1 );
+    assertThat( output, is( data ) );
+
+    output = byteRead( replay.getContent(), -1 );
+    assertThat( output, is( data ) );
+  }
+
+  @Test
+  public void testB_C1_FC_IB__C2_FC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    String output;
+
+    output = blockRead( replay.getContent(), UTF8, -1, 3 );
+    assertThat( output, is( data ) );
+
+    output = blockRead( replay.getContent(), UTF8, -1, 3 );
+    assertThat( output, is( data ) );
+  }
+
+  @Test
+  public void testS_C1_FC_OB__C2_AC__EE() throws Exception {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    String output;
+
+    output = byteRead( replay.getContent(), -1 );
+    assertThat( output, is( data ) );
+
+    try {
+      replay.getContent();
+      fail( "Expected IOException" );
+    } catch( IOException e ) {
+      // Expected.
+    }
+  }
+
+  @Test
+  public void testB_C1_FC_OB__C2_AC__EE() throws Exception {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    String output;
+
+    output = blockRead( replay.getContent(), UTF8, -1, 3 );
+    assertThat( output, is( data ) );
+
+    try {
+      replay.getContent();
+      fail( "Expected IOException" );
+    } catch( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 FC/IB; C1 XC; C2 FC.
+  @Test
+  public void testS_C1_FC_IB__C1_XC__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+    stream = replay.getContent();
+    text = byteRead( stream, -1 );
+    assertThat( text, is( "0123456789" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = byteRead( stream, -1 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 FC/IB; C1 XC; C2 FC.
+  @Test
+  public void testB_C1_FC_IB__C1_XC__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, -1, 3 );
+    assertThat( text, is( "0123456789" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, -1, 3 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 FC/OB; C1 XC; C2 AC; EE
+  @Test
+  public void testS_C1_FC_OB__C1_XC__C2_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, -1 );
+    assertThat( text, is( "0123456789" ) );
+    stream.close();
+
+    try {
+      replay.getContent();
+      fail( "Expected IOException" );
+    } catch( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 FC/OB; C1 XC; C2 AC; EE
+  @Test
+  public void testB_C1_FC_OB__C1_XC__C2_AC_EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, -1, 3 );
+    assertThat( text, is( "0123456789" ) );
+    stream.close();
+
+    try {
+      replay.getContent();
+      fail( "Expected IOException" );
+    } catch( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 PC/IB.
+  @Test
+  public void testS_C1_PC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, 3 );
+    assertThat( text, is( "012" ) );
+  }
+
+  //   C1 PC/IB.
+  @Test
+  public void testB_C1_PC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, 3, 3 );
+    assertThat( text, is( "012" ) );
+  }
+
+  //   C1 PC/OB.
+  @Test
+  public void testS_C1_PC_OB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, -1 );
+    assertThat( text, is( "0123456789" ) );
+    stream.close();
+  }
+
+  //   C1 PC/OB.
+  @Test
+  public void testB_C1_PC_OB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, -1, 4 );
+    assertThat( text, is( "0123456789" ) );
+    stream.close();
+  }
+
+  //   C1 PC/IB; C2 FC.
+  @Test
+  public void testS_C1_PC_IB__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, 4 );
+    assertThat( text, is( "0123" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = byteRead( stream, -1 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 PC/IB; C2 FC.
+  @Test
+  public void testB_C1_PC_IB__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, 4, 1 );
+    assertThat( text, is( "0123" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, -1, 7 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 PC/OB; C2 AC; EE
+  @Test
+  public void testS_C1_PC_OB__C2_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, 7 );
+    assertThat( text, is( "0123456" ) );
+    stream.close();
+
+    try {
+      replay.getContent();
+      fail( "Expected IOException" );
+    } catch ( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 PC/OB; C2 AC; EE
+  @Test
+  public void testB_C1_PC_OB__C2_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, 7, 2 );
+    assertThat( text, is( "0123456" ) );
+    stream.close();
+
+    try {
+      replay.getContent();
+      fail( "Expected IOException" );
+    } catch ( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 PC/IB; C1 XC; C2 FC.
+  @Test
+  public void testS_C1_PC_IB__C1_XC__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, 7 );
+    assertThat( text, is( "0123456" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = byteRead( stream, -1 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 PC/IB; C1 XC; C2 FC.
+  @Test
+  public void testB_C1_PC_IB__C1_XC__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, 7, 2 );
+    assertThat( text, is( "0123456" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, -1, 7 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 PC/OB; C1 XC; C2 AC; EE
+  @Test
+  public void testS_C1_PC_OB__C1_XC__C2_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, 7 );
+    assertThat( text, is( "0123456" ) );
+    stream.close();
+
+    try {
+      replay.getContent();
+      fail( "Expected IOException" );
+    } catch ( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 PC/OB; C1 XC; C2 AC; EE
+  @Test
+  public void testB_C1_PC_OB__C1_XC__C2_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, 7, 2 );
+    assertThat( text, is( "0123456" ) );
+    stream.close();
+
+    try {
+      replay.getContent();
+      fail( "Expected IOException" );
+    } catch ( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 PC/IB; C2 PC/IB; C1 PC/IB; C2 PC/IB - Back and forth before buffer overflow is OK.
+  @Test
+  public void testS_C1_PC_IB__C2_PC_IB__C2_PC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream1, stream2;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    stream1 = replay.getContent();
+    text = byteRead( stream1, 3 );
+    assertThat( text, is( "012" ) );
+
+    stream2 = replay.getContent();
+    text = byteRead( stream2, 4 );
+    assertThat( text, is( "0123" ) );
+
+    text = byteRead( stream1, 3 );
+    assertThat( text, is( "345" ) );
+  }
+
+  //   C1 PC/IB; C2 PC/IB; C1 PC/IB; C2 PC/IB - Back and forth before buffer overflow is OK.
+  @Test
+  public void testB_C1_PC_IB__C2_PC_IB__C2_PC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream1, stream2;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+    stream1 = replay.getContent();
+    text = blockRead( stream1, UTF8, 3, 2 );
+    assertThat( text, is( "012" ) );
+
+    stream2 = replay.getContent();
+    text = blockRead( stream2, UTF8, 4, 3 );
+    assertThat( text, is( "0123" ) );
+
+    text = blockRead( stream1, UTF8, 3, 2 );
+    assertThat( text, is( "345" ) );
+  }
+
+  //   C1 PC/IB; C2 PC/OB; C1 AC; EE
+  @Test
+  public void testS_C1_PC_IB__C2_PC_OB__C1_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream1, stream2;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream1 = replay.getContent();
+    text = byteRead( stream1, 3 );
+    assertThat( text, is( "012" ) );
+
+    stream2 = replay.getContent();
+    text = byteRead( stream2, 6 );
+    assertThat( text, is( "012345" ) );
+
+    try {
+      byteRead( stream1, 1 );
+      fail( "Expected IOException" );
+    } catch ( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 PC/IB; C2 PC/OB; C1 AC; EE
+  @Test
+  public void testB_C1_PC_IB__C2_PC_OB__C1_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream1, stream2;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream1 = replay.getContent();
+    text = blockRead( stream1, UTF8, 3, 2 );
+    assertThat( text, is( "012" ) );
+
+    stream2 = replay.getContent();
+    text = blockRead( stream2, UTF8, 6, 4 );
+    assertThat( text, is( "012345" ) );
+
+    try {
+      blockRead( stream1, UTF8, 6, 4 );
+      fail( "Expected IOException" );
+    } catch ( IOException e ) {
+      // Expected.
+    }
+  }
+
+  @Test
+  public void testWriteTo() throws Exception {
+    // writeTo must emit the complete underlying content even though the
+    // replay window (5) is smaller than the content.
+    BasicHttpEntity source = new BasicHttpEntity();
+    source.setContent( new ByteArrayInputStream( "0123456789".getBytes( UTF8 ) ) );
+    PartiallyRepeatableHttpEntity entity = new PartiallyRepeatableHttpEntity( source, 5 );
+
+    ByteArrayOutputStream sink = new ByteArrayOutputStream();
+    entity.writeTo( sink );
+    assertThat( new String( sink.toByteArray(), UTF8 ), is( "0123456789" ) );
+  }
+
+  @Test
+  public void testIsRepeatable() throws Exception {
+    // The wrapper reports repeatable for both a plain (non-repeatable)
+    // source entity and an already-buffered one.
+    BasicHttpEntity source = new BasicHttpEntity();
+    source.setContent( new ByteArrayInputStream( "0123456789".getBytes( UTF8 ) ) );
+    assertThat( new PartiallyRepeatableHttpEntity( source ).isRepeatable(), is( true ) );
+
+    source = new BasicHttpEntity();
+    source.setContent( new ByteArrayInputStream( "0123456789".getBytes( UTF8 ) ) );
+    BufferedHttpEntity buffered = new BufferedHttpEntity( source );
+    assertThat( new PartiallyRepeatableHttpEntity( buffered ).isRepeatable(), is( true ) );
+  }
+
+  @Test
+  public void testIsChunked() throws Exception {
+    // isChunked delegates to the wrapped entity's chunked flag.
+    BasicHttpEntity source = new BasicHttpEntity();
+    source.setContent( new ByteArrayInputStream( "0123456789".getBytes( UTF8 ) ) );
+    assertThat( new PartiallyRepeatableHttpEntity( source, 5 ).isChunked(), is( false ) );
+
+    source = new BasicHttpEntity();
+    source.setContent( new ByteArrayInputStream( "0123456789".getBytes( UTF8 ) ) );
+    source.setChunked( true );
+    assertThat( new PartiallyRepeatableHttpEntity( source, 5 ).isChunked(), is( true ) );
+  }
+
+  @Test
+  public void testGetContentLength() throws Exception {
+    // getContentLength delegates to the wrapped entity: -1 when unknown,
+    // the declared length otherwise.
+    String input = "0123456789";
+    BasicHttpEntity source = new BasicHttpEntity();
+    source.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    assertThat( new PartiallyRepeatableHttpEntity( source, 5 ).getContentLength(), is( -1L ) );
+
+    source = new BasicHttpEntity();
+    source.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    source.setContentLength( input.length() );
+    assertThat( new PartiallyRepeatableHttpEntity( source, 5 ).getContentLength(), is( 10L ) );
+  }
+
+  @Test
+  public void testGetContentType() throws Exception {
+    // getContentType delegates to the wrapped entity: null when unset,
+    // the declared mime type otherwise.
+    BasicHttpEntity source = new BasicHttpEntity();
+    source.setContent( new ByteArrayInputStream( "0123456789".getBytes( UTF8 ) ) );
+    assertThat( new PartiallyRepeatableHttpEntity( source, 5 ).getContentType(), nullValue() );
+
+    source = new BasicHttpEntity();
+    source.setContent( new ByteArrayInputStream( "0123456789".getBytes( UTF8 ) ) );
+    source.setContentType( ContentType.APPLICATION_JSON.getMimeType() );
+    assertThat( new PartiallyRepeatableHttpEntity( source, 5 ).getContentType().getValue(), is( "application/json" ) );
+  }
+
+  @Test
+  public void testGetContentEncoding() throws Exception {
+    // getContentEncoding delegates to the wrapped entity: null when unset,
+    // the declared encoding header otherwise.
+    BasicHttpEntity source = new BasicHttpEntity();
+    source.setContent( new ByteArrayInputStream( "0123456789".getBytes( UTF8 ) ) );
+    assertThat( new PartiallyRepeatableHttpEntity( source, 5 ).getContentEncoding(), nullValue() );
+
+    source = new BasicHttpEntity();
+    source.setContent( new ByteArrayInputStream( "0123456789".getBytes( UTF8 ) ) );
+    source.setContentEncoding( "UTF-8" );
+    assertThat( new PartiallyRepeatableHttpEntity( source, 5 ).getContentEncoding().getValue(), is( "UTF-8" ) );
+  }
+
+  @Test
+  public void testIsStreaming() throws Exception {
+    // isStreaming delegates to the wrapped entity: true when content is
+    // set, false when null, and true for a streaming InputStreamEntity.
+    BasicHttpEntity source = new BasicHttpEntity();
+    source.setContent( new ByteArrayInputStream( "0123456789".getBytes( UTF8 ) ) );
+    assertThat( new PartiallyRepeatableHttpEntity( source, 5 ).isStreaming(), is( true ) );
+
+    source = new BasicHttpEntity();
+    source.setContent( null );
+    assertThat( new PartiallyRepeatableHttpEntity( source, 5 ).isStreaming(), is( false ) );
+
+    InputStreamEntity streaming = new InputStreamEntity( new ByteArrayInputStream( "0123456789".getBytes( UTF8 ) ), 10, ContentType.TEXT_PLAIN );
+    assertThat( new PartiallyRepeatableHttpEntity( streaming, 5 ).isStreaming(), is( true ) );
+  }
+
+  @Test
+  public void testConsumeContent() throws Exception {
+    // The deprecated consumeContent() is intentionally unsupported by the
+    // wrapper and must throw.
+    BasicHttpEntity source = new BasicHttpEntity();
+    source.setContent( new ByteArrayInputStream( "0123456789".getBytes( UTF8 ) ) );
+    PartiallyRepeatableHttpEntity entity = new PartiallyRepeatableHttpEntity( source, 5 );
+
+    try {
+      entity.consumeContent();
+      fail( "Expected UnsupportedOperationException" );
+    } catch ( UnsupportedOperationException e ) {
+      // Expected.
+    }
+  }
+
+  /**
+   * Reads up to {@code total} bytes one byte at a time and returns them as
+   * a string, mapping each byte value directly to a char (effectively a
+   * Latin-1 style decode -- fine for the ASCII data used in these tests).
+   * A negative {@code total} means read to end of stream.  Returns null
+   * when not a single byte could be read.
+   */
+  private static String byteRead( InputStream stream, int total ) throws IOException {
+    int remaining = total < 0 ? Integer.MAX_VALUE : total;
+    StringBuilder collected = null;
+    int b = 0;
+    while( remaining > 0 && b >= 0 ) {
+      b = stream.read();
+      if( b >= 0 ) {
+        remaining--;
+        if( collected == null ) {
+          collected = new StringBuilder();
+        }
+        collected.append( (char)b );
+      }
+    }
+    return collected == null ? null : collected.toString();
+  }
+
+  /**
+   * Reads up to {@code total} bytes in chunks of at most {@code chunk}
+   * bytes and decodes each chunk with the given charset.  A negative
+   * {@code total} means read to end of stream.  Returns null when nothing
+   * could be read.
+   * NOTE(review): each chunk is decoded independently, so a multi-byte
+   * character split across chunk boundaries would be corrupted -- harmless
+   * for the ASCII data used in these tests.
+   */
+  private static String blockRead( InputStream stream, Charset charset, int total, int chunk ) throws IOException {
+    byte[] buffer = new byte[ chunk ];
+    int remaining = total < 0 ? Integer.MAX_VALUE : total;
+    StringBuilder collected = null;
+    int count = 0;
+    while( remaining > 0 && count >= 0 ) {
+      count = stream.read( buffer, 0, Math.min( buffer.length, remaining ) );
+      if( count >= 0 ) {
+        remaining -= count;
+        if( collected == null ) {
+          collected = new StringBuilder();
+        }
+        collected.append( new String( buffer, 0, count, charset ) );
+      }
+    }
+    return collected == null ? null : collected.toString();
+  }
+
+}


Mime
View raw message