servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject svn commit: r586570 - in /incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src: main/java/org/apache/servicemix/bean/ test/java/org/apache/servicemix/bean/ test/java/org/apache/servicemix/bean/beans/
Date Fri, 19 Oct 2007 18:29:58 GMT
Author: gnodet
Date: Fri Oct 19 11:29:58 2007
New Revision: 586570

URL: http://svn.apache.org/viewvc?rev=586570&view=rev
Log:
SM-1110: ServiceMix is not sending a response back to the calling Service in an In-Out Message Exchange

Added:
    incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java
      - copied, changed from r579576, incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java
Modified:
    incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
    incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java

Modified: incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java?rev=586570&r1=586569&r2=586570&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java (original)
+++ incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/main/java/org/apache/servicemix/bean/BeanEndpoint.java Fri Oct 19 11:29:58 2007
@@ -19,6 +19,8 @@
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.Map;
+import java.util.MissingResourceException;
+import java.util.logging.Logger;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 
@@ -30,10 +32,19 @@
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.InOut;
 import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.messaging.MessageExchangeFactory;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
-import javax.jbi.messaging.MessageExchange.Role;
 import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.jbi.JBIException;
+import javax.jbi.management.MBeanNames;
+import javax.xml.namespace.QName;
+import javax.management.MBeanServer;
+import javax.naming.InitialContext;
+
+import org.w3c.dom.DocumentFragment;
+import org.w3c.dom.Document;
 
 import org.aopalliance.intercept.MethodInvocation;
 import org.apache.commons.jexl.Expression;
@@ -55,8 +66,8 @@
 import org.apache.servicemix.jbi.resolver.URIResolver;
 import org.apache.servicemix.jbi.util.MessageUtil;
 import org.springframework.beans.BeansException;
-import org.springframework.context.ApplicationContextAware;
 import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
 
 /**
  * Represents a bean endpoint which consists of a together with a {@link MethodInvocationStrategy}
@@ -75,7 +86,7 @@
     private String beanClassName;
     private MethodInvocationStrategy methodInvocationStrategy;
     private org.apache.servicemix.expression.Expression correlationExpression;
-    
+
     private Map<String, Holder> exchanges = new ConcurrentHashMap<String, Holder>();
     private Map<Object, Request> requests = new ConcurrentHashMap<Object, Request>();
     private ThreadLocal<Request> currentRequest = new ThreadLocal<Request>();
@@ -87,7 +98,7 @@
 
     public BeanEndpoint(BeanComponent component, ServiceEndpoint serviceEndpoint) {
         super(component, serviceEndpoint);
-        setApplicationContext(component.getApplicationContext());
+        this.applicationContext = component.getApplicationContext();
     }
 
     public void start() throws Exception {
@@ -201,7 +212,7 @@
             throw new IllegalStateException("Unknown role: " + exchange.getRole());
         }
     }
-    
+
     protected void onProviderExchange(MessageExchange exchange) throws Exception {
         Object corId = getCorrelation(exchange);
         Request req = requests.get(corId);
@@ -225,16 +236,16 @@
                 // Exchange is finished
                 if (exchange.getStatus() == ExchangeStatus.DONE) {
                     return;
-                }
                 // Exchange has been aborted with an exception
-                else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
                     return;
-                    // Fault message
+                // Fault message
                 } else if (exchange.getFault() != null) {
                     // TODO: find a way to send it back to the bean before setting the DONE status
                     done(exchange);
                 } else {
-                    MethodInvocation invocation = getMethodInvocationStrategy().createInvocation(req.getBean(),
getBeanInfo(), exchange, this);
+                    MethodInvocation invocation = getMethodInvocationStrategy().createInvocation(
+                            req.getBean(), getBeanInfo(), exchange, this);
                     if (invocation == null) {
                         throw new UnknownMessageExchangeTypeException(exchange, this);
                     }
@@ -258,7 +269,7 @@
             currentRequest.set(null);
         }
     }
-    
+
     protected void onConsumerExchange(MessageExchange exchange) throws Exception {
         Object corId = exchange.getExchangeId();
         Request req = requests.remove(corId);
@@ -281,7 +292,7 @@
         checkEndOfRequest(req, corId);
         currentRequest.set(null);
     }
-    
+
     protected Object getCorrelation(MessageExchange exchange) throws MessagingException {
         return getCorrelationExpression().evaluate(exchange, exchange.getMessage("in"));
     }
@@ -315,30 +326,32 @@
     /**
      * A strategy method to allow implementations to perform some custom JBI based injection of the POJO
      *
-     * @param bean the bean to be injected
+     * @param pojo the bean to be injected
      */
-    protected void injectBean(final Object bean) {
+    protected void injectBean(final Object pojo) {
+        final PojoContext ctx = new PojoContext();
+        final DeliveryChannel ch = ctx.channel;
         // Inject fields
-        ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback()
{
+        ReflectionUtils.doWithFields(pojo.getClass(), new ReflectionUtils.FieldCallback()
{
             public void doWith(Field f) throws IllegalArgumentException, IllegalAccessException
{
                 ExchangeTarget et = f.getAnnotation(ExchangeTarget.class);
                 if (et != null) {
-                    ReflectionUtils.setField(f, bean, new DestinationImpl(et.uri(), BeanEndpoint.this));
+                    ReflectionUtils.setField(f, pojo, new DestinationImpl(et.uri(), BeanEndpoint.this));
                 }
                 if (f.getAnnotation(Resource.class) != null) {
                     if (ComponentContext.class.isAssignableFrom(f.getType())) {
-                        ReflectionUtils.setField(f, bean, context);
+                        ReflectionUtils.setField(f, pojo, ctx);
                     } else if (DeliveryChannel.class.isAssignableFrom(f.getType())) {
-                        ReflectionUtils.setField(f, bean, channel);
+                        ReflectionUtils.setField(f, pojo, ch);
                     }
                 }
             }
         });
     }
-    
+
     protected void evaluateCallbacks(final Request req) {
-        final Object bean = req.getBean();
-        ReflectionUtils.doWithMethods(bean.getClass(), new ReflectionUtils.MethodCallback()
{
+        final Object pojo = req.getBean();
+        ReflectionUtils.doWithMethods(pojo.getClass(), new ReflectionUtils.MethodCallback()
{
             @SuppressWarnings("unchecked")
             public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException
{
                 if (method.getAnnotation(Callback.class) != null) {
@@ -347,14 +360,15 @@
                         JexlContext jc = JexlHelper.createContext();
                         jc.getVars().put("this", bean);
                         Object r = e.evaluate(jc);
-                        if (r instanceof Boolean == false) {
+                        if (!(r instanceof Boolean)) {
                             throw new RuntimeException("Expression did not returned a boolean value but: " + r);
                         }
                         Boolean oldVal = req.getCallbacks().get(method);
                         Boolean newVal = (Boolean) r;
-                        if ((oldVal == null || oldVal == false) && newVal == true)
{
+                        if ((oldVal == null || !oldVal) && newVal) {
                             req.getCallbacks().put(method, newVal);
-                            Object o = method.invoke(bean, new Object[0]);
+                            Object o = method.invoke(pojo, new Object[0]);
+                            o.toString();
                             // TODO: handle return value and sent it as the answer
                         }
                     } catch (Exception e) {
@@ -364,7 +378,7 @@
             }
         });
     }
-    
+
     /**
      * Used by POJOs acting as a consumer
      * @param uri
@@ -376,7 +390,7 @@
             InOut me = getExchangeFactory().createInOutExchange();
             URIResolver.configureExchange(me, getServiceUnit().getComponent().getComponentContext(),
uri);
             MessageUtil.transferTo(message, me, "in");
-            final Holder h = new Holder(); 
+            final Holder h = new Holder();
             requests.put(me.getExchangeId(), currentRequest.get());
             exchanges.put(me.getExchangeId(), h);
             BeanEndpoint.this.send(me);
@@ -385,7 +399,7 @@
             throw new RuntimeException(e);
         }
     }
-    
+
     protected void checkEndOfRequest(Request request, Object corId) {
         if (request.getExchange().getStatus() != ExchangeStatus.ACTIVE) {
             ReflectionUtils.callLifecycleMethod(request.getBean(), PreDestroy.class);
@@ -425,5 +439,138 @@
      */
     public void setCorrelationExpression(org.apache.servicemix.expression.Expression correlationExpression)
{
         this.correlationExpression = correlationExpression;
+    }
+
+    protected class PojoContext implements ComponentContext {
+
+        private DeliveryChannel channel = new PojoChannel();
+
+        public ServiceEndpoint activateEndpoint(QName qName, String s) throws JBIException
{
+            return context.activateEndpoint(qName, s);
+        }
+
+        public void deactivateEndpoint(ServiceEndpoint serviceEndpoint) throws JBIException
{
+            context.deactivateEndpoint(serviceEndpoint);
+        }
+
+        public void registerExternalEndpoint(ServiceEndpoint serviceEndpoint) throws JBIException
{
+            context.registerExternalEndpoint(serviceEndpoint);
+        }
+
+        public void deregisterExternalEndpoint(ServiceEndpoint serviceEndpoint) throws JBIException
{
+            context.deregisterExternalEndpoint(serviceEndpoint);
+        }
+
+        public ServiceEndpoint resolveEndpointReference(DocumentFragment documentFragment)
{
+            return context.resolveEndpointReference(documentFragment);
+        }
+
+        public String getComponentName() {
+            return context.getComponentName();
+        }
+
+        public DeliveryChannel getDeliveryChannel() throws MessagingException {
+            return channel;
+        }
+
+        public ServiceEndpoint getEndpoint(QName qName, String s) {
+            return context.getEndpoint(qName, s);
+        }
+
+        public Document getEndpointDescriptor(ServiceEndpoint serviceEndpoint) throws JBIException
{
+            return context.getEndpointDescriptor(serviceEndpoint);
+        }
+
+        public ServiceEndpoint[] getEndpoints(QName qName) {
+            return context.getEndpoints(qName);
+        }
+
+        public ServiceEndpoint[] getEndpointsForService(QName qName) {
+            return context.getEndpointsForService(qName);
+        }
+
+        public ServiceEndpoint[] getExternalEndpoints(QName qName) {
+            return context.getExternalEndpoints(qName);
+        }
+
+        public ServiceEndpoint[] getExternalEndpointsForService(QName qName) {
+            return context.getExternalEndpointsForService(qName);
+        }
+
+        public String getInstallRoot() {
+            return context.getInstallRoot();
+        }
+
+        public Logger getLogger(String s, String s1) throws MissingResourceException, JBIException
{
+            return context.getLogger(s, s1);
+        }
+
+        public MBeanNames getMBeanNames() {
+            return context.getMBeanNames();
+        }
+
+        public MBeanServer getMBeanServer() {
+            return context.getMBeanServer();
+        }
+
+        public InitialContext getNamingContext() {
+            return context.getNamingContext();
+        }
+
+        public Object getTransactionManager() {
+            return context.getTransactionManager();
+        }
+
+        public String getWorkspaceRoot() {
+            return context.getWorkspaceRoot();
+        }
+    }
+
+    protected class PojoChannel implements DeliveryChannel {
+
+        public void close() throws MessagingException {
+            BeanEndpoint.this.channel.close();
+        }
+
+        public MessageExchangeFactory createExchangeFactory() {
+            return BeanEndpoint.this.channel.createExchangeFactory();
+        }
+
+        public MessageExchangeFactory createExchangeFactory(QName qName) {
+            return BeanEndpoint.this.channel.createExchangeFactory(qName);
+        }
+
+        public MessageExchangeFactory createExchangeFactoryForService(QName qName) {
+            return BeanEndpoint.this.channel.createExchangeFactoryForService(qName);
+        }
+
+        public MessageExchangeFactory createExchangeFactory(ServiceEndpoint serviceEndpoint)
{
+            return BeanEndpoint.this.channel.createExchangeFactory(serviceEndpoint);
+        }
+
+        public MessageExchange accept() throws MessagingException {
+            return BeanEndpoint.this.channel.accept();
+        }
+
+        public MessageExchange accept(long l) throws MessagingException {
+            return BeanEndpoint.this.channel.accept(l);
+        }
+
+        public void send(MessageExchange messageExchange) throws MessagingException {
+            if (messageExchange.getRole() == MessageExchange.Role.CONSUMER
+                    && messageExchange.getStatus() == ExchangeStatus.ACTIVE) {
+                requests.put(messageExchange.getExchangeId(), currentRequest.get());
+            }
+            BeanEndpoint.this.channel.send(messageExchange);
+        }
+
+        public boolean sendSync(MessageExchange messageExchange) throws MessagingException
{
+            return BeanEndpoint.this.channel.sendSync(messageExchange);
+        }
+
+        public boolean sendSync(MessageExchange messageExchange, long l) throws MessagingException
{
+            return BeanEndpoint.this.channel.sendSync(messageExchange, l);
+        }
+
     }
 }

Copied: incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java
(from r579576, incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java)
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java?p2=incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java&p1=incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java&r1=579576&r2=586570&rev=586570&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerBeanTest.java (original)
+++ incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/ConsumerListenerTest.java Fri Oct 19 11:29:58 2007
@@ -24,7 +24,7 @@
 
 import junit.framework.TestCase;
 
-import org.apache.servicemix.bean.beans.ConsumerBean;
+import org.apache.servicemix.bean.beans.ConsumerListener;
 import org.apache.servicemix.client.DefaultServiceMixClient;
 import org.apache.servicemix.client.ServiceMixClient;
 import org.apache.servicemix.components.util.EchoComponent;
@@ -32,30 +32,27 @@
 import org.apache.servicemix.jbi.jaxp.SourceTransformer;
 import org.apache.servicemix.jbi.jaxp.StringSource;
 
-public class ConsumerBeanTest extends TestCase {
+public class ConsumerListenerTest extends TestCase {
 
     protected JBIContainer jbi;
-    
+
     protected void setUp() throws Exception {
         jbi = new JBIContainer();
         jbi.setEmbedded(true);
         jbi.init();
     }
-    
+
     public void test() throws Exception {
         BeanComponent bc = new BeanComponent();
         BeanEndpoint ep = new BeanEndpoint();
         ep.setService(new QName("bean"));
         ep.setEndpoint("endpoint");
-        ep.setBeanType(ConsumerBean.class);
+        ep.setBeanType(ConsumerListener.class);
         bc.setEndpoints(new BeanEndpoint[] { ep });
         jbi.activateComponent(bc, "servicemix-bean");
-        
-        EchoComponent echo1 = new EchoComponent(new QName("urn", "service1"), "endpoint");
-        jbi.activateComponent(echo1, "echo1");
-        
-        EchoComponent echo2 = new EchoComponent(new QName("urn", "service2"), "endpoint");
-        jbi.activateComponent(echo2, "echo2");
+
+        EchoComponent echo = new EchoComponent(new QName("echo"), "endpoint");
+        jbi.activateComponent(echo, "echo");
 
         jbi.start();
 
@@ -69,7 +66,7 @@
         assertExchangeWorked(me);
         client.done(me);
     }
-    
+
     protected void assertExchangeWorked(MessageExchange me) throws Exception {
         if (me.getStatus() == ExchangeStatus.ERROR) {
             if (me.getError() != null) {
@@ -84,4 +81,4 @@
         }
     }
 
-}
+}
\ No newline at end of file

Modified: incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java?rev=586570&r1=586569&r2=586570&view=diff
==============================================================================
--- incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java (original)
+++ incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-bean/src/test/java/org/apache/servicemix/bean/beans/ConsumerListener.java Fri Oct 19 11:29:58 2007
@@ -16,15 +16,17 @@
  */
 package org.apache.servicemix.bean.beans;
 
-import org.apache.servicemix.MessageExchangeListener;
-import org.apache.servicemix.jbi.util.MessageUtil;
-
 import javax.annotation.Resource;
 import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.InOut;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessageExchangeFactory;
 import javax.jbi.messaging.MessagingException;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.jbi.util.MessageUtil;
 
 public class ConsumerListener implements MessageExchangeListener {
 
@@ -32,16 +34,28 @@
     private DeliveryChannel channel;
 
     public void onMessageExchange(MessageExchange exchange) throws MessagingException {
-        MessageExchangeFactory factory = channel.createExchangeFactory();
-        InOut io = factory.createInOutExchange();
-        try {
-            MessageUtil.transferInToIn(exchange, io);
-        }
-        catch (MessagingException e) {
-            throw e;
-        }
-        catch (Exception e) {
-            throw new MessagingException(e);
+        if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                MessageExchange io = (MessageExchange) exchange.getProperty("exchange");
+                MessageUtil.transferOutToOut(exchange, io);
+                io.setProperty("exchange", exchange);
+                channel.send(io);
+            } else if (exchange.getStatus() == ExchangeStatus.DONE) {
+                MessageExchange io = (MessageExchange) exchange.getProperty("exchange");
+                io.setStatus(ExchangeStatus.DONE);
+                channel.send(io);
+            }
+        } else {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                MessageExchangeFactory factory = channel.createExchangeFactory();
+                InOut io = factory.createInOutExchange();
+                MessageUtil.transferInToIn(exchange, io);
+                io.setService(new QName("echo"));
+                io.setProperty("exchange", exchange);
+                channel.send(io);
+            } else if (exchange.getStatus() == ExchangeStatus.DONE) {
+                // Do nothing
+            }
         }
     }
 



Mime
View raw message