servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ge...@apache.org
Subject svn commit: r1132898 - in /servicemix/components/trunk/bindings/servicemix-http/src: main/java/org/apache/servicemix/http/endpoints/ main/java/org/apache/servicemix/http/jetty/ test/java/org/apache/servicemix/http/jetty/
Date Tue, 07 Jun 2011 07:59:22 GMT
Author: gertv
Date: Tue Jun  7 07:59:21 2011
New Revision: 1132898

URL: http://svn.apache.org/viewvc?rev=1132898&view=rev
Log:
SMXCOMP-880: Refactor servicemix-http component to avoid use of wrapper for locking

Added:
    servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/jetty/ContinuationHelper.java
    servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/jetty/ContinuationHelperTest.java
Modified:
    servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java

Modified: servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java?rev=1132898&r1=1132897&r2=1132898&view=diff
==============================================================================
--- servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
(original)
+++ servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
Tue Jun  7 07:59:21 2011
@@ -55,6 +55,8 @@ import org.mortbay.util.ajax.Continuatio
 import org.mortbay.util.ajax.ContinuationSupport;
 import org.mortbay.util.ajax.WaitingContinuation;
 
+import static org.apache.servicemix.http.jetty.ContinuationHelper.isNewContinuation;
+
 /**
  * Plain HTTP consumer endpoint. This endpoint can be used to handle plain HTTP request (without
SOAP) or to be able to
  * process the request in a non standard way. For HTTP requests, a WSDL2 HTTP binding can
be used.
@@ -73,7 +75,8 @@ public class HttpConsumerEndpoint extend
     private long timeout; // 0 => default to the timeout configured on component
     private URI defaultMep = JbiConstants.IN_OUT;
     private Map<String, Object> resources = new HashMap<String, Object>();
-    private Map<String, Continuation> locks = new ConcurrentHashMap<String, Continuation>();
+    private Map<String, Continuation> continuations = new ConcurrentHashMap<String,
Continuation>();
+    private Map<String, Object> mutexes = new ConcurrentHashMap<String, Object>();
     private Object httpContext;
     private boolean started = false;
 
@@ -224,139 +227,200 @@ public class HttpConsumerEndpoint extend
         super.stop();
     }
 
+    /*
+     * Process the response message exchange
+     */
     public void process(MessageExchange exchange) throws Exception {
-        // Receive the exchange response
-        // First, check if the continuation has not been removed from the map,
-        // which would mean it has timed out.  If this is the case, throw an exception
-        // that will set the exchange status to ERROR.
-        Continuation cont = locks.get(exchange.getExchangeId());
-        if (cont == null) {
-            throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
-        }
-        synchronized (cont) {
-            logger.debug("Resuming continuation for exchange: {}", exchange.getExchangeId());
-            // In case of the SEDA flow isn't used, the exchange could be a different instance,
so it should be updated.
-            cont.setObject(exchange);
-            // Resume continuation
-            cont.resume();
-            if (!cont.isResumed()) {
-                logger.debug("Could not resume continuation for exchange: {}", exchange.getExchangeId());
-                throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
+        final String id = exchange.getExchangeId();
+        final Object mutex = mutexes.get(id);
+
+        // if the mutex is no longer available, the HTTP request timed out before the message
exchange got handled by the ESB
+        if (mutex == null) {
+            handleLateResponse(exchange);
+            return;
+        }
+
+        // Synchronize on the mutex object while we're tinkering with the continuation object
+        synchronized (mutex) {
+            final Continuation continuation = continuations.get(id);
+            if (continuation != null && continuation.isPending()) {
+                logger.debug("Resuming continuation for exchange: {}", exchange.getExchangeId());
+
+                // in case of the JMS/JCA flow, you might have a different instance of the
message exchange here
+                continuation.setObject(exchange);
+
+                continuation.resume();
+
+                // if the continuation could no longer be resumed, the HTTP request might
have timed out before the message
+                // exchange got handled by the ESB
+                if (!continuation.isResumed()) {
+                    handleLateResponse(exchange);
+                }
+            } else {
+                // if the continuation is no longer available or no longer pending, the HTTP
request has timed out before
+                // the message exchange got handled by the ESB
+                handleLateResponse(exchange);
             }
         }
     }
 
+    /*
+     * Process the HTTP request/response - this method gets invoked:
+     * - when a new HTTP request is received
+     * - when a suspended HTTP request is being resumed
+     *   (either because the exchange was received or because the request timed out)
+     */
     public void process(HttpServletRequest request, HttpServletResponse response) throws
Exception {
-        logger.debug("Receiving HTTP request: {}", request);
+
         MessageExchange exchange = null;
+
         try {
             // Handle WSDLs, XSDs
             if (handleStaticResource(request, response)) {
                 return;
             }
-            Continuation cont = createContinuation(request);
 
+            // configure the timeout
             long to = this.timeout;
             if (to == 0) {
                 to = ((HttpComponent) getServiceUnit().getComponent()).getConfiguration().getConsumerProcessorSuspendTime();
             }
 
-            if (!cont.isPending()) {
-                // Check endpoint is started
-                if (!started) {
-                    response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Endpoint
is stopped");
-                    return;
-                }
-                // Create the exchange
-                exchange = createExchange(request);
-                // Put the exchange into the continuation for retrieval later.
-                cont.setObject(exchange);
-                // Put the continuation in a map under the exchange id key
-                locks.put(exchange.getExchangeId(), cont);
-                logger.debug("Suspending continuation for exchange: {}", exchange.getExchangeId());
-                synchronized (cont) {
-                    // Send the exchange and then suspend the request.
-                    send(exchange);
-                    // Suspend the continuation for the configured timeout
-                    // If a SelectConnector is used, the call to suspend
-                    // will throw a RetryRequest exception
-                    // else, the call will block until the continuation is
-                    // resumed
-                    boolean istimeout = !cont.suspend(to);
-                    // The call has not thrown a RetryRequest, which means
-                    // we don't use a SelectConnector
-                    // and we must handle the exchange in this very method
-                    // call.
-                    // If result is false, the continuation has timed out.
-                    locks.remove(exchange.getExchangeId());
-
-                    // Timeout if SelectConnector is not used
-                    if (istimeout) {
-                        throw new Exception("HTTP request has timed out for exchange: " +
exchange.getExchangeId());
+            final Continuation continuation = ContinuationSupport.getContinuation(request,
null);
+            exchange = (MessageExchange) continuation.getObject();
+            final Object mutex = getOrCreateMutex(exchange);
+
+            // Synchronize on the mutex object while we're tinkering with the continuation
object
+            synchronized (mutex) {
+                if (isNewContinuation(continuation)) {
+                    logger.debug("Receiving HTTP request: {}", request);
+
+                    // send back HTTP status 503 (Service Unavailable) to reject any new requests
if the endpoint is not started
+                    if (!started) {
+                        response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Endpoint
is stopped");
+                        return;
                     }
+
+                    // Create the exchange
+                    exchange = createExchange(request);
+                    final String id = exchange.getExchangeId();
+
+                    // Put the exchange into the continuation for later retrieval and store
the mutex/continuation objects
+                    continuation.setObject(exchange);
+                    mutexes.put(id, mutex);
+                    continuations.put(id, continuation);
+
+                    // Send the exchange and then suspend the HTTP request until the message
exchange gets answered
+                    send(exchange);
+
+                    // Right after this if-block, we will try suspending the continuation
+                    // If a SelectConnector is being used, the call to suspend will throw
a RetryRequest
+                    // This will free the thread - this method will be invoked again when
the continuation gets resumed
+                    logger.debug("Suspending continuation for exchange: {}", exchange.getExchangeId());
+                } else {
+                    logger.debug("Resuming HTTP request: {}", request);
                 }
-            } else {
-                // The continuation is a retry.
-                // This happens when the SelectConnector is used and in two cases:
-                //  * the continuation has been resumed because the exchange has been received
-                //  * the continuation has timed out
-                boolean istimeout = !cont.suspend(to);
-                exchange = (MessageExchange) cont.getObject();
-                // Remove the continuation from the map, indicating it has been processed
or timed out
-                locks.remove(exchange.getExchangeId());
 
-                // Timeout
+                boolean istimeout = !continuation.suspend(to);
+
+                // Continuation is being ended (either because the message exchange was handled
or the continuation timed out)
+                // Cleaning up the stored objects for this continuation now
+                exchange = doEndContinuation(continuation);
+
+                // Timeout if SelectConnector is not used
                 if (istimeout) {
                     throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
                 }
             }
-            // At this point, we have received the exchange response,
-            // so process it and send back the HTTP response
-            if (exchange.getStatus() == ExchangeStatus.ERROR) {
-                Exception e = exchange.getError();
-                if (e == null) {
-                    e = new Exception("Unkown error (exchange aborted ?)");
-                }
-                throw e;
-            } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-                try {
-                    Fault fault = exchange.getFault();
-                    if (fault != null) {
-                        sendFault(exchange, fault, request, response);
-                    } else {
-                        NormalizedMessage outMsg = exchange.getMessage("out");
-                        if (outMsg != null) {
-                            sendOut(exchange, outMsg, request, response);
-                        }
-                    }
-                    done(exchange);
-                } catch (Exception e) {
-                    fail(exchange, e);
-                    throw e;
-                }
-            } else if (exchange.getStatus() == ExchangeStatus.DONE) {
-                // This happens when there is no response to send back
-                sendAccepted(exchange, request, response);
-            }
+
+            // message exchange has been completed, so we're ready to send back an HTTP response
now
+            handleResponse(exchange, request, response);
         } catch (RetryRequest e) {
+            // rethrow the RetryRequest to allow Jetty to re-invoke this method when the continuation
is being resumed
             throw e;
         } catch (Exception e) {
             sendError(exchange, e, request, response);
         }
     }
 
-    private Continuation createContinuation(HttpServletRequest request) {
-        // not giving a specific mutex will synchronize on the continuation itself
-        Continuation continuation = ContinuationSupport.getContinuation(request, null);
-        if (continuation instanceof WaitingContinuation) {
-            return continuation;
-        } else {
-            // wrap the continuation to avoid a deadlock between this endpoint and the Jetty
continuation timeout mechanism
-            // the endpoint now synchronizes on the wrapper while Jetty synchronizes on the
continuation itself
-            return new ContinuationWrapper(continuation);
+    /*
+     * Handle the HTTP response based on the information in the message exchange we received
+     */
+    private void handleResponse(MessageExchange exchange, HttpServletRequest request, HttpServletResponse
response) throws Exception {
+        // At this point, we have received the exchange response,
+        // so process it and send back the HTTP response
+        if (exchange.getStatus() == ExchangeStatus.ERROR) {
+            Exception e = exchange.getError();
+            if (e == null) {
+                e = new Exception("Unkown error (exchange aborted ?)");
+            }
+            throw e;
+        } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+            try {
+                Fault fault = exchange.getFault();
+                if (fault != null) {
+                    sendFault(exchange, fault, request, response);
+                } else {
+                    NormalizedMessage outMsg = exchange.getMessage("out");
+                    if (outMsg != null) {
+                        sendOut(exchange, outMsg, request, response);
+                    }
+                }
+                done(exchange);
+            } catch (Exception e) {
+                fail(exchange, e);
+                throw e;
+            }
+        } else if (exchange.getStatus() == ExchangeStatus.DONE) {
+            // This happens when there is no response to send back
+            sendAccepted(exchange, request, response);
         }
     }
 
+    /*
+     * Handle a message exchange that is being received after the corresponding HTTP request
has timed out
+     */
+    private void handleLateResponse(MessageExchange exchange) throws Exception {
+        throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
+
+        // TODO: allow multiple options for handling late response from the ESB
+        // - by throwing an exception to make the exchange end in error
+        // - by logging a warning (make sure MEP gets handled appropriately here!)
+    }
+
+    /*
+     * Get or create an object that can be used for synchronizing code blocks for a given
exchange
+     */
+    private Object getOrCreateMutex(MessageExchange exchange) {
+        Object result = null;
+
+        // let's try to find the object that corresponds to the exchange first
+        if (exchange != null) {
+            result = mutexes.get(exchange.getExchangeId());
+        }
+
+        // no luck finding an existing object, let's create a new one
+        if (result == null) {
+            result = new Object();
+        }
+
+        return result;
+    }
+
+    /*
+     * End the continuation by removing all objects stored on/for the continuation
+     * and returning the MessageExchange that was represented by the continuation
+     */
+    private MessageExchange doEndContinuation(Continuation continuation) {
+        final MessageExchange exchange = (MessageExchange) continuation.getObject();
+        final String id = exchange.getExchangeId();
+
+        continuation.setObject(null);
+        mutexes.remove(id);
+        continuations.remove(id);
+        return exchange;
+    }
+
     protected void loadStaticResources() throws Exception {
     }
 
@@ -472,49 +536,4 @@ public class HttpConsumerEndpoint extend
             ((DefaultHttpConsumerMarshaler) marshaler).setDefaultMep(getDefaultMep());
         }
     }
-
-    /*
-    * Continuation wrapper just delegates everything to the underlying Continuation
-    */
-    private static final class ContinuationWrapper implements Continuation {
-
-        private final Continuation continuation;
-
-        private ContinuationWrapper(Continuation continuation) {
-            super();
-            this.continuation = continuation;
-        }
-
-        public Object getObject() {
-            return continuation.getObject();
-        }
-
-        public boolean isNew() {
-            return continuation.isNew();
-        }
-
-        public boolean isPending() {
-            return continuation.isPending();
-        }
-
-        public boolean isResumed() {
-            return continuation.isResumed();
-        }
-
-        public void reset() {
-            continuation.reset();
-        }
-
-        public void resume() {
-            continuation.resume();
-        }
-
-        public void setObject(Object o) {
-            continuation.setObject(o);
-        }
-
-        public boolean suspend(long timeout) {
-            return continuation.suspend(timeout);
-        }
-    }
 }

Added: servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/jetty/ContinuationHelper.java
URL: http://svn.apache.org/viewvc/servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/jetty/ContinuationHelper.java?rev=1132898&view=auto
==============================================================================
--- servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/jetty/ContinuationHelper.java
(added)
+++ servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/jetty/ContinuationHelper.java
Tue Jun  7 07:59:21 2011
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.http.jetty;
+
+import org.mortbay.util.ajax.Continuation;
+
+/**
+ * A few helper methods for working with Jetty continuations
+ */
+public class ContinuationHelper {
+
+    private ContinuationHelper() {
+        // static helper methods only - no need for a public constructor
+    }
+
+    /**
+     * Is this a new continuation object?
+     *
+     * @param continuation the continuation object to inspect
+     * @return <code>true</code> if the continuation reports itself as new, or is neither pending nor resumed
+     */
+    public static boolean isNewContinuation(Continuation continuation) {
+        return continuation.isNew() ||
+               (!continuation.isPending() && !continuation.isResumed());
+    }
+}

Added: servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/jetty/ContinuationHelperTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/jetty/ContinuationHelperTest.java?rev=1132898&view=auto
==============================================================================
--- servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/jetty/ContinuationHelperTest.java
(added)
+++ servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/jetty/ContinuationHelperTest.java
Tue Jun  7 07:59:21 2011
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.http.jetty;
+
+import junit.framework.TestCase;
+import org.mortbay.util.ajax.Continuation;
+
+import static org.apache.servicemix.http.jetty.ContinuationHelper.isNewContinuation;
+
+/**
+ * Test case for {@link ContinuationHelper}
+ */
+public class ContinuationHelperTest extends TestCase {
+
+
+    public void testIsNewContinuation() {
+        assertTrue(isNewContinuation(new MockContinuation(true, null, null)));
+        assertTrue(isNewContinuation(new MockContinuation(false, false, false)));
+
+        assertFalse(isNewContinuation(new MockContinuation(false, true, false)));
+        assertFalse(isNewContinuation(new MockContinuation(false, false, true)));
+    }
+
+    protected static final class MockContinuation implements Continuation {
+
+        private boolean isNew;
+        private Boolean pending;
+        private Boolean resumed;
+
+        protected MockContinuation(boolean isNew, Boolean pending, Boolean resumed) {
+            super();
+            this.isNew = isNew;
+            this.pending = pending;
+            this.resumed = resumed;
+        }
+
+        public boolean suspend(long l) {
+            throw new UnsupportedOperationException("Not yet supported");
+        }
+
+        public void resume() {
+            throw new UnsupportedOperationException("Not yet supported");
+        }
+
+        public void reset() {
+            throw new UnsupportedOperationException("Not yet supported");
+        }
+
+        public boolean isNew() {
+            return isNew;
+        }
+
+        public boolean isPending() {
+            return pending;
+        }
+
+        public boolean isResumed() {
+            return resumed;
+        }
+
+        public Object getObject() {
+            throw new UnsupportedOperationException("Not yet supported");
+        }
+
+        public void setObject(Object o) {
+            throw new UnsupportedOperationException("Not yet supported");
+        }
+    }
+}



Mime
View raw message