servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ge...@apache.org
Subject svn commit: r690960 - in /servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools: DroolsEndpoint.java DroolsExecutionContext.java model/JbiHelper.java
Date Mon, 01 Sep 2008 12:43:18 GMT
Author: gertv
Date: Mon Sep  1 05:43:16 2008
New Revision: 690960

URL: http://svn.apache.org/viewvc?rev=690960&view=rev
Log:
SM-1524: Fix memory leak in servicemix-drools

Added:
    servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsExecutionContext.java
Modified:
    servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
    servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java

Modified: servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java?rev=690960&r1=690959&r2=690960&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
(original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsEndpoint.java
Mon Sep  1 05:43:16 2008
@@ -27,6 +27,7 @@
 import javax.jbi.JBIException;
 import javax.jbi.management.DeploymentException;
 import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.MessageExchange.Role;
@@ -39,10 +40,7 @@
 import org.apache.servicemix.common.ServiceUnit;
 import org.apache.servicemix.common.endpoints.ProviderEndpoint;
 import org.apache.servicemix.common.util.MessageUtil;
-import org.apache.servicemix.drools.model.JbiHelper;
 import org.drools.RuleBase;
-import org.drools.WorkingMemory;
-import org.drools.StatefulSession;
 import org.drools.compiler.RuleBaseLoader;
 import org.springframework.core.io.Resource;
 
@@ -51,6 +49,7 @@
  * @author gnodet
  * @org.apache.xbean.XBean element="endpoint"
  */
+
 public class DroolsEndpoint extends ProviderEndpoint {
 
     private RuleBase ruleBase;
@@ -61,7 +60,19 @@
     private String defaultTargetURI;
     private Map<String, Object> globals;
     private List<Object> assertedObjects;
-    private ConcurrentMap<String, JbiHelper> pending = new ConcurrentHashMap<String, JbiHelper>();
+    
+    @SuppressWarnings("serial")
+    private ConcurrentMap<String, DroolsExecutionContext> pending = new ConcurrentHashMap<String, DroolsExecutionContext>() {
+        public DroolsExecutionContext remove(Object key) {
+            DroolsExecutionContext context = super.remove(key);
+            if (context != null) {
+              // stop the execution context -- updating and disposing of any working memory
+              context.update();
+              context.stop();
+            }
+            return context;
+        };
+    };
 
     public DroolsEndpoint() {
         super();
@@ -194,9 +205,9 @@
      */
     private void handleConsumerExchange(MessageExchange exchange) throws MessagingException {
         String correlation = (String) exchange.getProperty(DroolsComponent.DROOLS_CORRELATION_ID);

-        JbiHelper helper = pending.get(correlation);
-        if (helper != null) {
-            MessageExchange original = helper.getExchange().getInternalExchange();
+        DroolsExecutionContext drools = pending.get(correlation);
+        if (drools != null) {
+            MessageExchange original = drools.getExchange();
             if (exchange.getStatus() == ExchangeStatus.DONE) {
                 done(original);
             } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
@@ -209,8 +220,6 @@
                 }
                 send(original);
             }
-            // update the rule engine's working memory to trigger post-done rules
-            helper.update();
         } else {
             logger.debug("No pending exchange found for " + correlation + ", no additional rules will be triggered");
         }
@@ -219,6 +228,9 @@
     private void handleProviderExchange(MessageExchange exchange) throws Exception {
         if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
             drools(exchange);
+        } else {
+            //must be a DONE/ERROR so removing any pending contexts
+            pending.remove(exchange.getExchangeId());
         }
     }
 
@@ -232,44 +244,25 @@
     }
 
     protected void drools(MessageExchange exchange) throws Exception {
-        StatefulSession memory = createWorkingMemory(exchange);
-        try {
-            JbiHelper helper = populateWorkingMemory(memory, exchange);
-            pending.put(exchange.getExchangeId(), helper);
-            memory.fireAllRules();
-            //no rules were fired --> must be config problem
-            if (helper.getRulesFired() < 1) {
-                fail(exchange, new Exception("No rules have handled the exchange. Check your rule base."));
-            } else {
-                //a rule was triggered and the message has been answered or faulted by the drools endpoint
-                if (helper.isExchangeHandled()) {
-                    pending.remove(exchange);
-                }
+        DroolsExecutionContext drools = startDroolsExecutionContext(exchange);
+        if (drools.getRulesFired() < 1) {
+            fail(exchange, new Exception("No rules have handled the exchange. Check your rule base."));
+        } else {
+            //the exchange has been answered or faulted by the drools endpoint
+            if (drools.isExchangeHandled() && exchange instanceof InOnly) {
+                //only removing InOnly
+                pending.remove(exchange.getExchangeId());
             }
-        } finally {
-            memory.dispose();
         }
     }
     
-    protected StatefulSession createWorkingMemory(MessageExchange exchange) throws Exception {
-        return ruleBase.newStatefulSession();
+    private DroolsExecutionContext startDroolsExecutionContext(MessageExchange exchange) {
+        DroolsExecutionContext drools = new DroolsExecutionContext(this, exchange);
+        pending.put(exchange.getExchangeId(), drools);
+        drools.start();
+        return drools;
     }
 
-    protected JbiHelper populateWorkingMemory(WorkingMemory memory, MessageExchange exchange) throws Exception {
-        JbiHelper helper = new JbiHelper(this, exchange, memory);
-        memory.setGlobal("jbi", helper);
-        if (assertedObjects != null) {
-            for (Object o : assertedObjects) {
-                memory.insert(o);
-            }
-        }
-        if (globals != null) {
-            for (Map.Entry<String, Object> e : globals.entrySet()) {
-                memory.setGlobal(e.getKey(), e.getValue());
-            }
-        }
-        return helper;
-    }
 
     public QName getDefaultTargetService() {
         return defaultTargetService;
@@ -309,8 +302,10 @@
     
     @Override
     protected void send(MessageExchange me) throws MessagingException {
-        //remove the exchange from the list of pending exchanges
-        pending.remove(me.getExchangeId());
+        if (me.getStatus() != ExchangeStatus.ACTIVE) {
+            // must be a DONE/ERROR so removing any pending contexts
+            pending.remove(me.getExchangeId());
+        }
         super.send(me);
     }
  }

Added: servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsExecutionContext.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsExecutionContext.java?rev=690960&view=auto
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsExecutionContext.java
(added)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/DroolsExecutionContext.java
Mon Sep  1 05:43:16 2008
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.drools;
+
+import java.util.Map;
+
+import javax.jbi.messaging.MessageExchange;
+
+import org.apache.servicemix.drools.model.JbiHelper;
+import org.drools.StatefulSession;
+import org.drools.WorkingMemory;
+import org.drools.event.ActivationCreatedEvent;
+import org.drools.event.DefaultAgendaEventListener;
+
+/**
+ * Represents the execution context of the Drools rules for a single {@link MessageExchange}
+ */
+public class DroolsExecutionContext extends DefaultAgendaEventListener {
+    
+    private final StatefulSession memory;
+    private final JbiHelper helper;
+    private int rulesFired;
+    private MessageExchange exchange;
+    
+    /**
+     * Start a new execution for the specified exchange.
+     * 
+     * This will create and fill {@link WorkingMemory} and register listeners on it to keep track of things.
+     * 
+     * @param endpoint
+     * @param exchange
+     */
+    public DroolsExecutionContext(DroolsEndpoint endpoint, MessageExchange exchange) {
+        super();
+        this.memory = endpoint.getRuleBase().newStatefulSession();
+        this.memory.addEventListener(this);
+        this.exchange = exchange;
+        this.helper = new JbiHelper(endpoint, exchange, memory);
+        populateWorkingMemory(endpoint);
+    }
+
+    private void populateWorkingMemory(DroolsEndpoint endpoint) {
+        memory.setGlobal("jbi", helper);
+        if (endpoint.getAssertedObjects() != null) {
+            for (Object o : endpoint.getAssertedObjects()) {
+                memory.insert(o);
+            }
+        }
+        if (endpoint.getGlobals() != null) {
+            for (Map.Entry<String, Object> e : endpoint.getGlobals().entrySet()) {
+                memory.setGlobal(e.getKey(), e.getValue());
+            }
+        }
+    }
+    
+    /**
+     * Start the execution context.
+     * This will fire all rules in the rule base.
+     */
+    public void start() {
+        memory.fireAllRules();
+    }
+    
+    /**
+     * Update the working memory, potentially triggering additional rules
+     */
+    public void update() {
+        helper.update();
+    }
+    
+    /**
+     * Stop the context, disposing of all event listeners and working memory contents
+     */
+    public void stop() {
+        memory.removeEventListener(this);
+        memory.dispose();
+    }
+    
+    /**
+     * Get the number of rules that were fired
+     */
+    public int getRulesFired() {
+        return rulesFired;
+    }
+
+    /**
+     * Returns <code>true</code> if the {@link MessageExchange} was handled by the rules themselves
+     * (e.g. by answering or faulting the exchange)
+     */
+    public boolean isExchangeHandled() {
+        return helper.isExchangeHandled();
+    }
+    
+    /**
+     * Return the {@link MessageExchange} we are evaluating rules on
+     */
+    public MessageExchange getExchange() {
+        return exchange;
+    }
+    
+    // event handler callbacks
+    @Override
+    public void activationCreated(ActivationCreatedEvent event, WorkingMemory workingMemory) {
+        rulesFired++;
+    }
+}

Modified: servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java?rev=690960&r1=690959&r2=690960&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
(original)
+++ servicemix/components/engines/servicemix-drools/trunk/src/main/java/org/apache/servicemix/drools/model/JbiHelper.java
Mon Sep  1 05:43:16 2008
@@ -37,28 +37,24 @@
 import org.apache.servicemix.jbi.jaxp.StringSource;
 import org.drools.FactHandle;
 import org.drools.WorkingMemory;
-import org.drools.event.ActivationCreatedEvent;
-import org.drools.event.DefaultAgendaEventListener;
 
 /**
  * A helper class for use inside a rule to forward a message to an endpoint
  * 
  * @version $Revision: 426415 $
  */
-public class JbiHelper extends DefaultAgendaEventListener {
+public class JbiHelper {
 
     private DroolsEndpoint endpoint;
     private Exchange exchange;
     private WorkingMemory memory;
     private FactHandle exchangeFactHandle;
-    private int rulesFired;
     private boolean exchangeHandled = false;
 
     public JbiHelper(DroolsEndpoint endpoint, MessageExchange exchange, WorkingMemory memory) {
         this.endpoint = endpoint;
         this.exchange = new Exchange(exchange, endpoint.getNamespaceContext());
         this.memory = memory;
-        this.memory.addEventListener(this);
         this.exchangeFactHandle = this.memory.insert(this.exchange);
     }
 
@@ -203,7 +199,7 @@
         NormalizedMessage out = me.createMessage();
         out.setContent(content);
         me.setMessage(out, "out");
-        getChannel().sendSync(me);
+        getChannel().send(me);
         exchangeHandled = true;
         update();
     }
@@ -214,15 +210,6 @@
     public void update() {
         this.memory.update(this.exchangeFactHandle, this.exchange);
     }
-
-    /**
-     * Get the number of rules that were fired
-     * 
-     * @return the number of rules
-     */
-    public int getRulesFired() {
-        return rulesFired;
-    }
     
     /**
      * Has the MessageExchange been handled by the drools endpoint?
@@ -233,10 +220,4 @@
         return exchangeHandled;
     }
 
-    // event handler callbacks
-    @Override
-    public void activationCreated(ActivationCreatedEvent event, WorkingMemory workingMemory) {
-        rulesFired++;
-    }
-
 }



Mime
View raw message