servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ge...@apache.org
Subject svn commit: r1149275 - in /servicemix/smx5/trunk/core/src: main/scala/org/apache/servicemix/core/ test/scala/org/apache/servicemix/core/
Date Thu, 21 Jul 2011 17:24:00 GMT
Author: gertv
Date: Thu Jul 21 17:23:59 2011
New Revision: 1149275

URL: http://svn.apache.org/viewvc?rev=1149275&view=rev
Log:
Use a global ProcessorFactory instead of a global InterceptStrategy

Added:
    servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala
      - copied, changed from r1149199, servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala
    servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/DelegateProcessorFactory.scala
    servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
    servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala
      - copied, changed from r1149199, servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbStrategyTest.scala
Removed:
    servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala
    servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalInterceptStrategy.scala
    servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbStrategyTest.scala
Modified:
    servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala

Copied: servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala
(from r1149199, servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala)
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala?p2=servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala&p1=servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala&r1=1149199&r2=1149275&rev=1149275&view=diff
==============================================================================
--- servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/BreadcrumbStrategy.scala (original)
+++ servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala Thu Jul 21 17:23:59 2011
@@ -16,30 +16,28 @@
  */
 package org.apache.servicemix.core
 
-import org.apache.camel.spi.InterceptStrategy
-import org.apache.camel.model.ProcessorDefinition
-import org.apache.camel.{Exchange, Processor, CamelContext}
+import org.apache.camel.processor.DelegateAsyncProcessor
+import org.apache.camel.{AsyncCallback, Exchange, Processor, CamelContext}
 
 /**
  * The ServiceMix bread crumb strategy adds a header to the message to ensure we can follow the message throughout
  * different routes and processors.
  */
-class BreadcrumbStrategy extends InterceptStrategy {
+class Breadcrumbs extends DelegateProcessorFactory {
 
-  import BreadcrumbStrategy.{hasBreadCrumb, addBreadCrumb}
+  import Breadcrumbs.{hasBreadCrumb, addBreadCrumb}
 
-  def wrapProcessorInInterceptors(context: CamelContext, definition: ProcessorDefinition[_], target: Processor, nextTarget: Processor) : Processor =
-    new Processor() {
-      def process(exchange: Exchange) {
-        if (!hasBreadCrumb(exchange)) {
-          addBreadCrumb(exchange)
-        }
-        target.process(exchange)
+  def create(delegate: Processor) = new DelegateAsyncProcessor(delegate) {
+    override def process(exchange: Exchange, callback: AsyncCallback) = {
+      if (!hasBreadCrumb(exchange)) {
+        addBreadCrumb(exchange)
       }
+      processNext(exchange, callback)
     }
+  }
 }
 
-object BreadcrumbStrategy {
+object Breadcrumbs {
 
   /**
    * ServiceMix bread crumb header name
@@ -62,4 +60,26 @@ object BreadcrumbStrategy {
   def addBreadCrumb(exchange: Exchange) : Unit = exchange.getIn.setHeader(SERVICEMIX_BREAD_CRUMB,
                                                                           exchange.getContext.getUuidGenerator.generateUuid())
 
+  /**
+   * Enable bread crumbs on the target CamelContext
+   */
+  def enable(context: CamelContext) = {
+    context.getProcessorFactory match {
+      case global: GlobalProcessorFactory => global.addFactory(new Breadcrumbs)
+      case _ => //unable to enable bread crumbs
+    }
+  }
+
+  /**
+   * Disable bread crumbs on the target CamelContext
+   */
+  def disable(context: CamelContext) = {
+    context.getProcessorFactory match {
+      case global: GlobalProcessorFactory => for (breadcrumb <- global.factories.filter(_.isInstanceOf[Breadcrumbs])) {
+        global.removeFactory(breadcrumb)
+      }
+      case _ => //unable to enable bread crumbs
+    }
+  }
+
 }

Added: servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/DelegateProcessorFactory.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/DelegateProcessorFactory.scala?rev=1149275&view=auto
==============================================================================
--- servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/DelegateProcessorFactory.scala (added)
+++ servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/DelegateProcessorFactory.scala Thu Jul 21 17:23:59 2011
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.core
+
+import org.apache.camel.{AsyncProcessor, Processor}
+
+/**
+ * Trait to allow creating delegate processor on demand
+ */
+trait DelegateProcessorFactory {
+
+  /**
+   * Create a new AsyncProcessor instance that can delegate part of its work to the Processor instance provided
+   */
+  def create(delegate: Processor) : AsyncProcessor
+
+}
\ No newline at end of file

Added: servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala?rev=1149275&view=auto
==============================================================================
--- servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala (added)
+++ servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala Thu Jul 21 17:23:59 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.core
+
+import org.apache.camel.spi.{RouteContext, ProcessorFactory}
+import org.apache.camel.model.ProcessorDefinition
+import collection.mutable.ListBuffer
+import org.apache.camel.processor.DelegateAsyncProcessor
+import org.apache.camel.{AsyncProcessor, Processor, AsyncCallback, Exchange}
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * Global ServiceMix ProcessorFactory implementation, which will take care of wrapping processors with the additional
+ * functionality provided by the {@link DelegateProcessorFactory} instances
+ */
+class GlobalProcessorFactory extends ProcessorFactory {
+
+  val factories = new ListBuffer[DelegateProcessorFactory]
+  val version = new AtomicInteger(1)
+
+  def addFactory(factory: DelegateProcessorFactory) = triggerUpdate(factories += factory);
+  def removeFactory(factory: DelegateProcessorFactory) = triggerUpdate(factories -= factory);
+
+  def createChildProcessor(context: RouteContext, definition: ProcessorDefinition[_], mandatory: Boolean) = {
+    nullOrElse(definition.createProcessor(context))(new GlobalDelegateProcessor(context, definition, _))
+  }
+
+  def createProcessor(context: RouteContext, definition: ProcessorDefinition[_]) = {
+    nullOrElse(definition.createProcessor(context))(new GlobalDelegateProcessor(context, definition, _))
+  }
+
+  def nullOrElse[S,T](value: S)(function: S => T) = if (value == null) {
+    null.asInstanceOf[T]
+  } else {
+    function(value)
+  }
+
+  def triggerUpdate(block: => Unit) = {
+    block
+    version.incrementAndGet()
+  }
+
+  def configure(original: AsyncProcessor) : AsyncProcessor = {
+    factories.foldLeft(original){ (delegate: AsyncProcessor, factory: DelegateProcessorFactory) =>
+      factory.create(delegate)
+    }
+  }
+
+  class GlobalDelegateProcessor(context: RouteContext, definition: ProcessorDefinition[_], target: Processor) extends DelegateAsyncProcessor(target) {
+
+    var currentProcessor = GlobalProcessorFactory.this.configure(getProcessor())
+    var version = GlobalProcessorFactory.this.version.get()
+
+    override def process(exchange: Exchange, callback: AsyncCallback) = {
+      // let's check if processor factories have changed and reconfigure things if necessary
+      if (version < GlobalProcessorFactory.this.version.get) {
+        currentProcessor = GlobalProcessorFactory.this.configure(getProcessor())
+      }
+
+      currentProcessor.process(exchange, callback)
+    }
+
+    override def toString = "ServiceMix Wrapper[" + processor + "]"
+  }
+}
\ No newline at end of file

Modified: servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala?rev=1149275&r1=1149274&r2=1149275&view=diff
==============================================================================
--- servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala (original)
+++ servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/ProfilerStrategy.scala Thu Jul 21 17:23:59 2011
@@ -17,7 +17,7 @@
 package org.apache.servicemix.core
 
 import org.apache.camel._
-import model.{RouteDefinition, ChoiceDefinition, WhenDefinition, ProcessorDefinition}
+import model.{RouteDefinition, ProcessorDefinition}
 import processor.DelegateAsyncProcessor
 import spi.{RouteContext, ProcessorFactory}
 import collection.mutable.LinkedHashMap

Copied: servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala
(from r1149199, servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbStrategyTest.scala)
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala?p2=servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala&p1=servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbStrategyTest.scala&r1=1149199&r2=1149275&rev=1149275&view=diff
==============================================================================
--- servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbStrategyTest.scala (original)
+++ servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala Thu Jul 21 17:23:59 2011
@@ -26,20 +26,18 @@ import org.apache.camel.scala.dsl.builde
 import scala.collection.JavaConversions.asScalaBuffer
 import org.apache.camel.impl.{DefaultCamelContext, DefaultProducerTemplate}
 
-import org.apache.servicemix.core.BreadcrumbStrategy.{hasBreadCrumb, getBreadCrumb}
+import org.apache.servicemix.core.Breadcrumbs.{hasBreadCrumb, getBreadCrumb}
 import org.scalatest.Assertions._
 import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite}
 
 @RunWith(classOf[JUnitRunner])
-class BreadcrumbStrategyTest extends FunSuite with RouteBuilderSupport with BeforeAndAfterAll with BeforeAndAfterEach {
+class BreadcrumbsTest extends FunSuite with RouteBuilderSupport with BeforeAndAfterAll with BeforeAndAfterEach {
 
   val messages = List("<gingerbread/>", "<cakes/>", "<sugar/>")
 
-  val globals = new GlobalInterceptStrategy
-
   lazy val context = {
     val result = new DefaultCamelContext()
-    result.addInterceptStrategy(globals)
+    result.setProcessorFactory(new GlobalProcessorFactory)
     result.addRoutes(createRouteBuilder())
     result.start()
     result
@@ -57,7 +55,7 @@ class BreadcrumbStrategyTest extends Fun
   }
 
   test("add breadcrumbs to message headers") {
-    globals.addStrategy(new BreadcrumbStrategy)
+    Breadcrumbs.enable(context)
 
     for (body <- messages) {
       template.sendBody("direct:test", body)
@@ -79,8 +77,7 @@ class BreadcrumbStrategyTest extends Fun
   }
 
   test("bread crumb strategy can be disabled if necessary") {
-    val breadcrumbs = new BreadcrumbStrategy
-    globals.addStrategy(breadcrumbs)
+    Breadcrumbs.enable(context)
 
     for (body <- messages) {
       template.sendBody("direct:test", body)
@@ -101,7 +98,7 @@ class BreadcrumbStrategyTest extends Fun
     assert(hansels == gretels, "Gretel should be able to find all of Hansel's bread crumbs")
 
     // let's now disable the bread crumbs and just continue with same context/processors/...
-    globals.removeStrategy(breadcrumbs)
+    Breadcrumbs.disable(context)
     MockEndpoint.resetMocks(context)
 
     for (body <- messages) {
@@ -123,7 +120,7 @@ class BreadcrumbStrategyTest extends Fun
 
   override protected def afterEach() = {
     MockEndpoint.resetMocks(context)
-    globals.strategies.clear()
+    context.getProcessorFactory.asInstanceOf[GlobalProcessorFactory].factories.clear
   }
 
   def getMockEndpoint(name: String) = context.getEndpoint(name, classOf[MockEndpoint])



Mime
View raw message