servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject svn commit: r405781 - in /incubator/servicemix/trunk/servicemix-eip: ./ src/main/java/org/apache/servicemix/eip/ src/main/java/org/apache/servicemix/eip/patterns/ src/main/java/org/apache/servicemix/eip/support/ src/test/java/org/apache/servicemix/eip/...
Date Wed, 10 May 2006 15:37:58 GMT
Author: gnodet
Date: Wed May 10 08:37:54 2006
New Revision: 405781

URL: http://svn.apache.org/viewcvs?rev=405781&view=rev
Log:
SM-429: add an aggregator pattern

Added:
    incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/SplitAggregator.java
    incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
    incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java
Modified:
    incubator/servicemix/trunk/servicemix-eip/pom.xml
    incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/EIPEndpoint.java
    incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
    incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/MessageUtil.java
    incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/SpringConfigurationTest.java
    incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/VMEIPTests.java
    incubator/servicemix/trunk/servicemix-eip/src/test/resources/org/apache/servicemix/eip/spring.xml

Modified: incubator/servicemix/trunk/servicemix-eip/pom.xml
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/pom.xml?rev=405781&r1=405780&r2=405781&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/pom.xml (original)
+++ incubator/servicemix/trunk/servicemix-eip/pom.xml Wed May 10 08:37:54 2006
@@ -41,8 +41,26 @@
     </dependency>
     <dependency>
       <groupId>${pom.groupId}</groupId>
+      <artifactId>servicemix-services</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${pom.groupId}</groupId>
       <artifactId>servicemix-core</artifactId>
       <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-commonj_1.1_spec</artifactId>
+    </dependency>
+    <dependency>        
+      <groupId>org.apache.geronimo.modules</groupId>
+      <artifactId>geronimo-commonj</artifactId>
+      <version>1.2-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
     </dependency>
   </dependencies>
 

Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/EIPEndpoint.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/EIPEndpoint.java?rev=405781&r1=405780&r2=405781&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/EIPEndpoint.java
(original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/EIPEndpoint.java
Wed May 10 08:37:54 2006
@@ -25,13 +25,18 @@
 import javax.jbi.messaging.MessageExchange.Role;
 import javax.jbi.servicedesc.ServiceEndpoint;
 
+import org.apache.geronimo.commonj.timers.TimerManagerImpl;
 import org.apache.servicemix.common.BaseLifeCycle;
 import org.apache.servicemix.common.Endpoint;
 import org.apache.servicemix.common.ExchangeProcessor;
+import org.apache.servicemix.locks.LockManager;
+import org.apache.servicemix.locks.impl.SimpleLockManager;
 import org.apache.servicemix.store.Store;
 import org.apache.servicemix.store.StoreFactory;
 import org.apache.servicemix.store.memory.MemoryStoreFactory;
 
+import commonj.timers.TimerManager;
+
 /**
  * @author gnodet
  * @version $Revision: 376451 $
@@ -50,6 +55,14 @@
      */
     protected StoreFactory storeFactory;
     /**
+     * The lock manager.
+     */
+    protected LockManager lockManager;
+    /**
+     * The timer manager.
+     */
+    protected TimerManager timerManager;
+    /**
      * The exchange factory
      */
     protected MessageExchangeFactory exchangeFactory;
@@ -90,6 +103,30 @@
     public void setStoreFactory(StoreFactory storeFactory) {
         this.storeFactory = storeFactory;
     }
+    /**
+     * @return the lockManager
+     */
+    public LockManager getLockManager() {
+        return lockManager;
+    }
+    /**
+     * @param lockManager the lockManager to set
+     */
+    public void setLockManager(LockManager lockManager) {
+        this.lockManager = lockManager;
+    }
+    /**
+     * @return the timerManager
+     */
+    public TimerManager getTimerManager() {
+        return timerManager;
+    }
+    /**
+     * @param timerManager the timerManager to set
+     */
+    public void setTimerManager(TimerManager timerManager) {
+        this.timerManager = timerManager;
+    }
 
     /* (non-Javadoc)
      * @see org.apache.servicemix.common.Endpoint#getRole()
@@ -110,10 +147,19 @@
             }
             store = storeFactory.open(getService().toString() + getEndpoint());
         }
+        if (lockManager == null) {
+            lockManager = new SimpleLockManager();
+        }
+        if (timerManager == null) {
+            timerManager = new TimerManagerImpl();
+        }
         start();
     }
 
     public void deactivate() throws Exception {
+        if (timerManager != null) {
+            timerManager.stop();
+        }
         stop();
         ServiceEndpoint ep = activated;
         activated = null;

Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/SplitAggregator.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/SplitAggregator.java?rev=405781&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/SplitAggregator.java
(added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/SplitAggregator.java
Wed May 10 08:37:54 2006
@@ -0,0 +1,326 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.patterns;
+
+import java.io.Serializable;
+import java.util.Date;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+import javax.xml.transform.dom.DOMSource;
+
+import org.apache.servicemix.eip.support.AbstractAggregator;
+import org.apache.servicemix.eip.support.AbstractSplitter;
+import org.apache.servicemix.expression.Expression;
+import org.apache.servicemix.expression.PropertyExpression;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+/**
+ * Aggregator can be used to wait and combine several messages.
+ * This component implements the  
+ * <a href="http://www.enterpriseintegrationpatterns.com/Aggregator.html">Aggregator</a>

+ * pattern.
+ * 
+ * This aggregator collect  messages with a count, index and correlationId properties.
+ * These properties are automatically set by splitters.
+ * A timeout may be specified so that the aggregator will not keep data forever if a message
is missing.
+ * 
+ * @author gnodet
+ * @version $Revision: 376451 $
+ * @org.apache.xbean.XBean element="split-aggregator"
+ */
+public class SplitAggregator extends AbstractAggregator {
+
+    private Expression count = new PropertyExpression(AbstractSplitter.SPLITTER_COUNT);
+    private Expression index = new PropertyExpression(AbstractSplitter.SPLITTER_INDEX);
+    private Expression corrId = new PropertyExpression(AbstractSplitter.SPLITTER_CORRID);
+    
+    private QName aggregateElementName = new QName("aggregate");
+    private QName messageElementName = new QName("message");
+    private String countAttribute = "count";
+    private String indexAttribute = "index";
+    
+    private long timeout;
+    
+    /**
+     * @return the aggregateElementName
+     */
+    public QName getAggregateElementName() {
+        return aggregateElementName;
+    }
+
+    /**
+     * @param aggregateElementName the aggregateElementName to set
+     */
+    public void setAggregateElementName(QName aggregateElementName) {
+        this.aggregateElementName = aggregateElementName;
+    }
+
+    /**
+     * @return the corrId
+     */
+    public Expression getCorrId() {
+        return corrId;
+    }
+
+    /**
+     * @param corrId the corrId to set
+     */
+    public void setCorrId(Expression corrId) {
+        this.corrId = corrId;
+    }
+
+    /**
+     * @return the count
+     */
+    public Expression getCount() {
+        return count;
+    }
+
+    /**
+     * @param count the count to set
+     */
+    public void setCount(Expression count) {
+        this.count = count;
+    }
+
+    /**
+     * @return the countAttribute
+     */
+    public String getCountAttribute() {
+        return countAttribute;
+    }
+
+    /**
+     * @param countAttribute the countAttribute to set
+     */
+    public void setCountAttribute(String countAttribute) {
+        this.countAttribute = countAttribute;
+    }
+
+    /**
+     * @return the index
+     */
+    public Expression getIndex() {
+        return index;
+    }
+
+    /**
+     * @param index the index to set
+     */
+    public void setIndex(Expression index) {
+        this.index = index;
+    }
+
+    /**
+     * @return the indexAttribute
+     */
+    public String getIndexAttribute() {
+        return indexAttribute;
+    }
+
+    /**
+     * @param indexAttribute the indexAttribute to set
+     */
+    public void setIndexAttribute(String indexAttribute) {
+        this.indexAttribute = indexAttribute;
+    }
+
+    /**
+     * @return the messageElementName
+     */
+    public QName getMessageElementName() {
+        return messageElementName;
+    }
+
+    /**
+     * @param messageElementName the messageElementName to set
+     */
+    public void setMessageElementName(QName messageElementName) {
+        this.messageElementName = messageElementName;
+    }
+
+    /**
+     * @return the timeout
+     */
+    public long getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * @param timeout the timeout to set
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    /*(non-Javadoc)
+     * @see org.apache.servicemix.eip.support.AggregationFactory#createAggregation(java.lang.String)
+     */
+    public Object createAggregation(String correlationID) {
+        return new SplitterAggregation(correlationID);
+    }
+
+    /*(non-Javadoc)
+     * @see org.apache.servicemix.eip.support.AggregationFactory#getCorrelationID(javax.jbi.messaging.MessageExchange,
javax.jbi.messaging.NormalizedMessage)
+     */
+    public String getCorrelationID(MessageExchange exchange, NormalizedMessage message) throws
Exception {
+        return (String) corrId.evaluate(exchange, message);
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.support.Aggregation#addMessage(javax.jbi.messaging.NormalizedMessage,
javax.jbi.messaging.MessageExchange)
+     */
+    public boolean addMessage(Object aggregation, NormalizedMessage message, MessageExchange
exchange) throws Exception {
+        NormalizedMessage[] messages = ((SplitterAggregation) aggregation).messages;
+        // Retrieve count, index
+        Integer count = (Integer) SplitAggregator.this.count.evaluate(exchange, message);
+        if (count == null) {
+            throw new IllegalArgumentException("Property " + AbstractSplitter.SPLITTER_COUNT
+ " not specified on message");
+        }
+        if (messages == null) {
+            messages = new NormalizedMessage[count.intValue()];
+            ((SplitterAggregation) aggregation).messages = messages;
+        } else if (count.intValue() != messages.length) {
+            throw new IllegalArgumentException("Property " + AbstractSplitter.SPLITTER_COUNT
+ " is not consistent (received " + count.intValue() + ", was " + messages.length + ")");
+        }
+        Integer index = (Integer) SplitAggregator.this.index.evaluate(exchange, message);
+        if (index == null) {
+            throw new IllegalArgumentException("Property " + AbstractSplitter.SPLITTER_INDEX
+ " not specified on message");
+        }
+        if (index.intValue() < 0 || index.intValue() >= messages.length) {
+            throw new IllegalArgumentException("Index is ouf of bound: " + index + " [0.."
+ messages.length + "]");
+        }
+        if (messages[index.intValue()] != null) {
+            throw new IllegalStateException("Message with index " + index.intValue() + "
has already been received");
+        }
+        // Store message
+        messages[index.intValue()] = message;
+        // Check if all messages have been received
+        for (int i = 0; i < messages.length; i++) {
+            if (messages[i] == null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.support.Aggregation#buildAggregate(javax.jbi.messaging.NormalizedMessage,
javax.jbi.messaging.MessageExchange, boolean)
+     */
+    public void buildAggregate(Object aggregation, NormalizedMessage message, MessageExchange
exchange, boolean timeout) throws Exception {
+        NormalizedMessage[] messages = ((SplitterAggregation) aggregation).messages;
+        String correlationId = ((SplitterAggregation) aggregation).correlationId;
+        SourceTransformer st = new SourceTransformer();
+        Document doc = st.createDocument();
+        Element root = createChildElement(aggregateElementName, doc);
+        root.setAttribute(countAttribute, Integer.toString(messages.length));
+        for (int i = 0; i < messages.length; i++) {
+            if (messages[i] != null) {
+                Element msg = createChildElement(messageElementName, root);
+                msg.setAttribute(indexAttribute, Integer.toString(i));
+                Node node = st.toDOMNode(messages[i]);
+                Element elem;
+                if (node instanceof Document) {
+                    elem = ((Document) node).getDocumentElement();
+                } else if (node instanceof Element) {
+                    elem = (Element) node;
+                } else {
+                    throw new UnsupportedOperationException();
+                }
+                msg.appendChild(doc.importNode(elem, true));
+            }
+        }
+        message.setContent(new DOMSource(doc));
+        message.setProperty(AbstractSplitter.SPLITTER_CORRID, correlationId);
+    }
+    
+    private Element createChildElement(QName name, Node parent) {
+        Document doc = parent instanceof Document ? (Document) parent : parent.getOwnerDocument();
+        Element elem;
+        if ("".equals(name.getNamespaceURI())) {
+            elem = doc.createElement(name.getLocalPart());   
+        } else {
+            elem = doc.createElementNS(name.getNamespaceURI(),
+                                       name.getPrefix() + ":" + name.getLocalPart());
+        }
+        parent.appendChild(elem);
+        return elem;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.servicemix.eip.support.Aggregation#getTimeout()
+     */
+    public Date getTimeout(Object aggregation) {
+        if (timeout > 0) {
+            return new Date(System.currentTimeMillis() + timeout);
+        }
+        return null;
+    }
+    
+    /**
+     * 
+     * @author gnodet
+     */
+    protected static class SplitterAggregation implements Serializable {
+
+        /**
+         * Serial version UID 
+         */
+        private static final long serialVersionUID = 8555934895155403923L;
+        
+        protected NormalizedMessage[] messages;
+        protected String correlationId;
+      
+        public SplitterAggregation(String correlationId) {
+            this.correlationId = correlationId;
+        }
+        
+        /**
+         * @return the correlationId
+         */
+        public String getCorrelationId() {
+            return correlationId;
+        }
+
+        /**
+         * @param correlationId the correlationId to set
+         */
+        public void setCorrelationId(String correlationId) {
+            this.correlationId = correlationId;
+        }
+
+        /**
+         * @return the messages
+         */
+        public NormalizedMessage[] getMessages() {
+            return messages;
+        }
+
+        /**
+         * @param messages the messages to set
+         */
+        public void setMessages(NormalizedMessage[] messages) {
+            this.messages = messages;
+        }
+
+    }
+    
+}

Added: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=405781&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
(added)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
Wed May 10 08:37:54 2006
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support;
+
+import java.util.Date;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.RobustInOnly;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.eip.EIPEndpoint;
+
+import commonj.timers.Timer;
+import commonj.timers.TimerListener;
+
+import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
+
+/**
+ * Aggregator can be used to wait and combine several messages.
+ * This component implements the  
+ * <a href="http://www.enterpriseintegrationpatterns.com/Aggregator.html">Aggregator</a>

+ * pattern.
+ * 
+ * TODO: keep list of closed aggregations for a certain time
+ * TODO: distributed lock manager
+ * TODO: persistent / transactional timer
+ * 
+ * @author gnodet
+ * @version $Revision: 376451 $
+ */
+public abstract class AbstractAggregator extends EIPEndpoint {
+
+    private static final Log log = LogFactory.getLog(AbstractAggregator.class);
+
+    private ExchangeTarget target;
+    
+    /**
+     * @return the target
+     */
+    public ExchangeTarget getTarget() {
+        return target;
+    }
+
+    /**
+     * @param target the target to set
+     */
+    public void setTarget(ExchangeTarget target) {
+        this.target = target;
+    }
+    
+    /*(non-Javadoc)
+     * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+     */
+    public void process(MessageExchange exchange) throws Exception {
+        try {
+            // Skip DONE
+            if (exchange.getStatus() == ExchangeStatus.DONE) {
+                return;
+            // Skip ERROR
+            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+                return;
+            // Handle an ACTIVE exchange as a PROVIDER
+            } else if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+                if (exchange instanceof InOnly == false &&
+                    exchange instanceof RobustInOnly == false) {
+                    fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly
MEP"));
+                } else {
+                    NormalizedMessage in = MessageUtil.copyIn(exchange);
+                    final String correlationId = getCorrelationID(exchange, in);
+                    if (correlationId == null || correlationId.length() == 0) {
+                        throw new IllegalArgumentException("Could not retrieve correlation
id for incoming exchange");
+                    }
+                    // Load existing aggregation
+                    Lock lock = getLockManager().getLock(correlationId);
+                    lock.lock();
+                    try {
+                        Object aggregation = store.load(correlationId);
+                        // Create a new aggregate
+                        if (aggregation == null) {
+                            aggregation = createAggregation(correlationId);
+                            Date timeout = getTimeout(aggregation);
+                            if (timeout != null) {
+                                getTimerManager().schedule(new TimerListener() {
+                                    public void timerExpired(Timer timer) {
+                                        AbstractAggregator.this.onTimeout(correlationId);
+                                    }
+                                }, timeout);
+                            }
+                        }
+                        if (addMessage(aggregation, MessageUtil.copyIn(exchange), exchange))
{
+                            sendAggregate(aggregation, false);
+                        } else {
+                            store.store(correlationId, aggregation);
+                        }
+                        done(exchange);
+                    } finally {
+                        lock.unlock();
+                    }
+                }
+            // Handle an ACTIVE exchange as a CONSUMER
+            } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                done(exchange);
+            }
+        // If an error occurs, log it and report the error back to the sender
+        // if the exchange is still ACTIVE 
+        } catch (Exception e) {
+            log.error("An exception occured while processing exchange", e);
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                fail(exchange, e);
+            }
+        }
+    }
+    
+    protected void sendAggregate(Object aggregation,
+                                 boolean timeout) throws Exception {
+        InOnly me = exchangeFactory.createInOnlyExchange();
+        target.configureTarget(me, getContext());
+        NormalizedMessage nm = me.createMessage();
+        me.setInMessage(nm);
+        buildAggregate(aggregation, nm, me, timeout);
+        send(me);
+    }
+
+    protected void onTimeout(String correlationId) {
+        Lock lock = getLockManager().getLock(correlationId);
+        lock.lock();
+        try {
+            Object aggregation = store.load(correlationId);
+            sendAggregate(aggregation, true);
+        } catch (Exception e) {
+            log.info("Caught exception while processing timeout aggregation", e);
+        } finally {
+            lock.unlock();
+        }
+    }
+    
+    /**
+     * Retrieve the correlation ID of the given exchange
+     * @param exchange
+     * @param message
+     * @return the correlationID
+     * @throws Exception 
+     */
+    protected abstract String getCorrelationID(MessageExchange exchange, NormalizedMessage
message) throws Exception;
+    
+    /**
+     * Creates a new empty aggregation.
+     * @param correlationID
+     * @return a newly created aggregation
+     */
+    protected abstract Object createAggregation(String correlationID) throws Exception;
+
+    /**
+     * Returns the date when the onTimeout method should be called if the aggregation is
not completed yet,
+     * or null if the aggregation has no timeout.
+     *
+     * @param aggregate
+     * @return
+     */
+    protected abstract Date getTimeout(Object aggregate);
+
+    /**
+     * Add a newly received message to this aggregation
+     * 
+     * @param aggregate
+     * @param message
+     * @param exchange
+     * @return <code>true</code> if the aggregate id complete
+     */
+    protected abstract boolean addMessage(Object aggregate,
+                                          NormalizedMessage message, 
+                                          MessageExchange exchange) throws Exception;
+    
+    /**
+     * Fill the given JBI message with the aggregation result.
+     * 
+     * @param aggregate
+     * @param message
+     * @param exchange
+     * @param timeout <code>false</code> if the aggregation has completed or
<code>true</code> if this aggregation has timed out
+     */
+    protected abstract void buildAggregate(Object aggregate,
+                                           NormalizedMessage message, 
+                                           MessageExchange exchange,
+                                           boolean timeout) throws Exception;
+}

Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java?rev=405781&r1=405780&r2=405781&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
(original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java
Wed May 10 08:37:54 2006
@@ -42,6 +42,10 @@
  * @version $Revision: 376451 $
  */
 public abstract class AbstractSplitter extends EIPEndpoint {
+    
+    public static final String SPLITTER_COUNT = "org.apache.servicemix.eip.splitter.count";
+    public static final String SPLITTER_INDEX = "org.apache.servicemix.eip.splitter.index";
+    public static final String SPLITTER_CORRID = "org.apache.servicemix.eip.splitter.corrid";
 
     private static final Log log = LogFactory.getLog(AbstractSplitter.class);
 
@@ -190,6 +194,10 @@
         MessageExchange[] parts = new MessageExchange[srcParts.length];
         for (int i = 0; i < srcParts.length; i++) {
             parts[i] = createPart(exchange.getPattern(), in, srcParts[i]);
+            NormalizedMessage msg = parts[i].getMessage("in");
+            msg.setProperty(SPLITTER_COUNT, new Integer(srcParts.length));
+            msg.setProperty(SPLITTER_INDEX, new Integer(i));
+            msg.setProperty(SPLITTER_CORRID, exchange.getExchangeId());
         }
         return parts;
     }

Modified: incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/MessageUtil.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/MessageUtil.java?rev=405781&r1=405780&r2=405781&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/MessageUtil.java
(original)
+++ incubator/servicemix/trunk/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/MessageUtil.java
Wed May 10 08:37:54 2006
@@ -16,6 +16,7 @@
 package org.apache.servicemix.eip.support;
 
 import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -119,8 +120,10 @@
         dest.setMessage(destMsg, name);
     }
     
-    private static class NormalizedMessageImpl implements NormalizedMessage {
+    private static class NormalizedMessageImpl implements NormalizedMessage, Serializable
{
 
+        private static final long serialVersionUID = -5813947566001096708L;
+        
         private Subject subject;
         private Source content;
         private Map properties = new HashMap();
@@ -195,6 +198,8 @@
     }
     
     private static class FaultImpl extends NormalizedMessageImpl implements Fault {
+        private static final long serialVersionUID = -6076815664102825860L;
+
         public FaultImpl(Fault fault) throws Exception {
             super(fault);
         }

Added: incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java?rev=405781&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java
(added)
+++ incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/SplitAggregatorTest.java
Wed May 10 08:37:54 2006
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import org.apache.activemq.util.IdGenerator;
+import org.apache.servicemix.eip.patterns.SplitAggregator;
+import org.apache.servicemix.eip.support.AbstractSplitter;
+import org.apache.servicemix.store.memory.MemoryStore;
+import org.apache.servicemix.tck.ReceiverComponent;
+
+public class SplitAggregatorTest extends AbstractEIPTest {
+
+    private SplitAggregator aggregator;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        aggregator = new SplitAggregator();
+        aggregator.setTarget(createServiceExchangeTarget(new QName("target")));
+        configureAggregator();
+        activateComponent(aggregator, "aggregator");
+    }
+    
+    protected void configureAggregator() throws Exception {
+        aggregator.setStore(new MemoryStore(new IdGenerator()) {
+            public void store(String id, Object exchange) throws IOException {
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                new ObjectOutputStream(baos).writeObject(exchange);
+                super.store(id, exchange);
+            }
+        });
+    }
+    
+    protected NormalizedMessage testRun(boolean[] msgs) throws Exception {
+        ReceiverComponent rec = activateReceiver("target");
+        
+        int nbMessages = 3;
+        for (int i = 0; i < 3; i++) {
+            if (msgs == null || msgs[i]) {
+                InOnly me = client.createInOnlyExchange();
+                me.setService(new QName("aggregator"));
+                me.getInMessage().setContent(createSource("<hello id='" + i + "' />"));
+                me.getInMessage().setProperty(AbstractSplitter.SPLITTER_COUNT, new Integer(nbMessages));
+                me.getInMessage().setProperty(AbstractSplitter.SPLITTER_INDEX, new Integer(i));
+                me.getInMessage().setProperty(AbstractSplitter.SPLITTER_CORRID, "corrId");
+                client.send(me);
+            }
+        }        
+        
+        rec.getMessageList().assertMessagesReceived(1);
+        return (NormalizedMessage) rec.getMessageList().flushMessages().get(0);
+    }
+    
+    public void testSimple() throws Exception {
+        testRun(null);
+    }
+    
+    public void testSimpleWithQNames() throws Exception {
+        aggregator.setAggregateElementName(new QName("uri:test", "agg", "sm"));
+        aggregator.setMessageElementName(new QName("uri:test", "msg", "sm"));
+        testRun(null);
+    }
+    
+    public void testWithTimeout() throws Exception {
+        aggregator.setTimeout(500);
+        testRun(new boolean[] { true, false, true });
+    }
+}

Modified: incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/SpringConfigurationTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/SpringConfigurationTest.java?rev=405781&r1=405780&r2=405781&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/SpringConfigurationTest.java
(original)
+++ incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/SpringConfigurationTest.java
Wed May 10 08:37:54 2006
@@ -34,7 +34,7 @@
         as.setComponentName("client");
         ServiceMixClient client = new DefaultServiceMixClient(jbi, as);
         InOnly me = client.createInOnlyExchange();
-        me.setService(new QName("http://test", "wireTap"));
+        me.setService(new QName("http://test", "entryPoint"));
         me.getInMessage().setContent(new StringSource("<test xmlns=\"http://test\"><echo/><world/><earth/></test>"));
         client.sendSync(me);
         
@@ -42,6 +42,7 @@
         ((Receiver) getBean("trace2")).getMessageList().assertMessagesReceived(1);
         ((Receiver) getBean("trace3")).getMessageList().assertMessagesReceived(1);
         ((Receiver) getBean("trace4")).getMessageList().assertMessagesReceived(2);
+        ((Receiver) getBean("trace5")).getMessageList().assertMessagesReceived(1);
         
         // Wait for all messages to be processed
         Thread.sleep(50);

Modified: incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/VMEIPTests.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/VMEIPTests.java?rev=405781&r1=405780&r2=405781&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/VMEIPTests.java
(original)
+++ incubator/servicemix/trunk/servicemix-eip/src/test/java/org/apache/servicemix/eip/VMEIPTests.java
Wed May 10 08:37:54 2006
@@ -15,6 +15,7 @@
         suite.addTestSuite(WireTapTest.class);
         suite.addTestSuite(StaticRecipientListTest.class);
         suite.addTestSuite(MessageFilterTest.class);
+        suite.addTestSuite(SplitAggregatorTest.class);
         //$JUnit-END$
         return suite;
     }

Modified: incubator/servicemix/trunk/servicemix-eip/src/test/resources/org/apache/servicemix/eip/spring.xml
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-eip/src/test/resources/org/apache/servicemix/eip/spring.xml?rev=405781&r1=405780&r2=405781&view=diff
==============================================================================
    (empty)



Mime
View raw message