servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject svn commit: r1149640 - in /servicemix/smx5/trunk/core/src: main/scala/org/apache/servicemix/core/ test/resources/ test/scala/org/apache/servicemix/core/
Date Fri, 22 Jul 2011 15:48:44 GMT
Author: gnodet
Date: Fri Jul 22 15:48:43 2011
New Revision: 1149640

URL: http://svn.apache.org/viewvc?rev=1149640&view=rev
Log:
Support multiple bread crumbs and wrap the AggregationStrategy accordingly

Added:
    servicemix/smx5/trunk/core/src/test/resources/
    servicemix/smx5/trunk/core/src/test/resources/log4j.properties
Modified:
    servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala
    servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
    servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala

Modified: servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala?rev=1149640&r1=1149639&r2=1149640&view=diff
==============================================================================
--- servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala
(original)
+++ servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/Breadcrumbs.scala
Fri Jul 22 15:48:43 2011
@@ -16,8 +16,11 @@
  */
 package org.apache.servicemix.core
 
-import org.apache.camel.processor.DelegateAsyncProcessor
 import org.apache.camel.{AsyncCallback, Exchange, Processor, CamelContext}
+import org.apache.camel.processor.{DelegateProcessor, DelegateAsyncProcessor}
+import org.apache.camel.processor.aggregate.{AggregationStrategy, AggregateProcessor}
+import collection.mutable.HashSet
+import collection.Iterable
 
 /**
  * The ServiceMix bread crumb strategy adds a header to the message to ensure we can follow
the message throughout
@@ -25,9 +28,9 @@ import org.apache.camel.{AsyncCallback, 
  */
 class Breadcrumbs extends DelegateProcessorFactory {
 
-  import Breadcrumbs.{hasBreadCrumb, addBreadCrumb}
+  import Breadcrumbs.{hasBreadCrumb, addBreadCrumb, getBreadCrumb}
 
-  def create(delegate: Processor) = new DelegateAsyncProcessor(delegate) {
+  def create(delegate: Processor) = new DelegateAsyncProcessor(process(delegate)) {
     override def process(exchange: Exchange, callback: AsyncCallback) = {
       if (!hasBreadCrumb(exchange)) {
         addBreadCrumb(exchange)
@@ -35,6 +38,29 @@ class Breadcrumbs extends DelegateProces
       processNext(exchange, callback)
     }
   }
+
+  private def process(delegate: Processor) : Processor = {
+    var p = delegate
+    if (p.isInstanceOf[DelegateProcessor]) {
+      p = p.asInstanceOf[DelegateProcessor].getProcessor
+    }
+    if (p.isInstanceOf[AggregateProcessor]) {
+      val agg = p.asInstanceOf[AggregateProcessor]
+      val oldstrat = agg.getAggregationStrategy
+      val strategy = new AggregationStrategy {
+        def aggregate(oldExchange: Exchange, newExchange: Exchange) : Exchange = {
+          val ex = oldstrat.aggregate(oldExchange, newExchange)
+          if (oldExchange == null)
+            addBreadCrumb(ex, List(getBreadCrumb(newExchange)))
+          else
+            addBreadCrumb(ex, List(getBreadCrumb(oldExchange), getBreadCrumb(newExchange)))
+          ex
+        }
+      }
+      agg.setAggregationStrategy(strategy)
+    }
+    delegate
+  }
 }
 
 object Breadcrumbs {
@@ -50,15 +76,43 @@ object Breadcrumbs {
   def hasBreadCrumb(exchange: Exchange) : Boolean = getBreadCrumb(exchange) != null
 
   /**
-   * Get the ServiceMix bread crumb value for an Exchange
+   * Get the ServiceMix bread crumb value for an Exchange (possibly a comma-separated
list)
    */
   def getBreadCrumb(exchange: Exchange) : String = exchange.getIn.getHeader(SERVICEMIX_BREAD_CRUMB,
classOf[String])
 
   /**
+   * Get the ServiceMix bread crumb values for an Exchange
+   */
+  def getBreadCrumbs(exchange: Exchange) : Set[String] = getBreadCrumbs(getBreadCrumb(exchange))
+
+  def getBreadCrumbs(breadcrumbs: String) : Set[String] = if (breadcrumbs == null) Set[String]()
else breadcrumbs.split(",").toSet
+
+  /**
    * Add a ServiceMix bread crumb to an Exchange
    */
-  def addBreadCrumb(exchange: Exchange) : Unit = exchange.getIn.setHeader(SERVICEMIX_BREAD_CRUMB,
-                                                                          exchange.getContext.getUuidGenerator.generateUuid())
+  def addBreadCrumb(exchange: Exchange) : Unit = setBreadCrumb(exchange, exchange.getContext.getUuidGenerator.generateUuid())
+
+  /**
+   * Add a number of ServiceMix bread crumbs to an Exchange
+   */
+  def addBreadCrumb(exchange: Exchange, breadcrumbs: Iterable[String]) : Unit = {
+    var bcs = new HashSet[String]()
+    bcs = bcs ++ getBreadCrumbs(exchange)
+    for (bc <- breadcrumbs) {
+      bcs = bcs ++ getBreadCrumbs(bc)
+    }
+    setBreadCrumb(exchange, bcs)
+  }
+
+  /**
+   * Set the ServiceMix bread crumb to an Exchange
+   */
+  def setBreadCrumb(exchange: Exchange, breadcrumb: String) : Unit = exchange.getIn.setHeader(SERVICEMIX_BREAD_CRUMB,
breadcrumb)
+
+  /**
+   * Set the ServiceMix bread crumbs to an Exchange
+   */
+  def setBreadCrumb(exchange: Exchange, breadcrumbs: Iterable[String]) : Unit = setBreadCrumb(exchange,
breadcrumbs.mkString(","))
 
   /**
    * Enable bread crumbs on the target CamelContext
@@ -82,4 +136,15 @@ object Breadcrumbs {
     }
   }
 
+  private def nullOrElse[S,T](value: S)(function: S => T) : T = if (value == null) {
+    null.asInstanceOf[T]
+  } else {
+    function(value)
+  }
+  private def nullOrElse[S,T](value: S, default: T)(function: S => T) : T = if (value
== null) {
+    default
+  } else {
+    function(value)
+  }
+
 }

Modified: servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala?rev=1149640&r1=1149639&r2=1149640&view=diff
==============================================================================
--- servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
(original)
+++ servicemix/smx5/trunk/core/src/main/scala/org/apache/servicemix/core/GlobalProcessorFactory.scala
Fri Jul 22 15:48:43 2011
@@ -36,7 +36,7 @@ class GlobalProcessorFactory extends Pro
   def removeFactory(factory: DelegateProcessorFactory) = triggerUpdate(factories -= factory);
 
   def createChildProcessor(context: RouteContext, definition: ProcessorDefinition[_], mandatory:
Boolean) = {
-    nullOrElse(definition.createProcessor(context))(new GlobalDelegateProcessor(context,
definition, _))
+    nullOrElse(context.createProcessor(definition))(new GlobalDelegateProcessor(context,
definition, _))
   }
 
   def createProcessor(context: RouteContext, definition: ProcessorDefinition[_]) = {

Added: servicemix/smx5/trunk/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/test/resources/log4j.properties?rev=1149640&view=auto
==============================================================================
--- servicemix/smx5/trunk/core/src/test/resources/log4j.properties (added)
+++ servicemix/smx5/trunk/core/src/test/resources/log4j.properties Fri Jul 22 15:48:43 2011
@@ -0,0 +1,26 @@
+#
+# Copyright (C) 2011, FuseSource Corp.  All rights reserved.
+# http://fusesource.com
+#
+# The software in this package is published under the terms of the
+# CDDL license a copy of which has been included with this distribution
+# in the license.txt file.
+#
+
+#
+# The logging properties used during tests.
+#
+log4j.rootLogger=DEBUG, console, file
+
+# Console will only display warnings and errors (WARN and above)
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
+log4j.appender.console.threshold=WARN
+
+# File appender will contain all messages (DEBUG and above)
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
+log4j.appender.file.file=target/test.log
+log4j.appender.file.append=true

Modified: servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala
URL: http://svn.apache.org/viewvc/servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala?rev=1149640&r1=1149639&r2=1149640&view=diff
==============================================================================
--- servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala
(original)
+++ servicemix/smx5/trunk/core/src/test/scala/org/apache/servicemix/core/BreadcrumbsTest.scala
Fri Jul 22 15:48:43 2011
@@ -26,9 +26,9 @@ import org.apache.camel.scala.dsl.builde
 import scala.collection.JavaConversions.asScalaBuffer
 import org.apache.camel.impl.{DefaultCamelContext, DefaultProducerTemplate}
 
-import org.apache.servicemix.core.Breadcrumbs.{hasBreadCrumb, getBreadCrumb}
-import org.scalatest.Assertions._
+import org.apache.servicemix.core.Breadcrumbs.{hasBreadCrumb, getBreadCrumb, getBreadCrumbs}
 import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite}
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy
 
 @RunWith(classOf[JUnitRunner])
 class BreadcrumbsTest extends FunSuite with RouteBuilderSupport with BeforeAndAfterAll with
BeforeAndAfterEach {
@@ -118,6 +118,23 @@ class BreadcrumbsTest extends FunSuite w
       assert(!hasBreadCrumb(exchange), "There should be no more bread crumbs here")
   }
 
+  test("bread crumb strategy with aggregator") {
+    Breadcrumbs.enable(context)
+
+    for (body <- messages) {
+      template.sendBody("direct:aggregate", body)
+    }
+
+    val aggres = getMockEndpoint("mock:aggres")
+    aggres.expectedMessageCount(1)
+    aggres.assertIsSatisfied()
+
+    val exchange = aggres.getExchanges.get(0)
+    val bcs = getBreadCrumbs(exchange)
+    assert(bcs.size == messages.size, "There should be no more bread crumbs here")
+  }
+
+
   override protected def afterEach() = {
     MockEndpoint.resetMocks(context)
     context.getProcessorFactory.asInstanceOf[GlobalProcessorFactory].factories.clear
@@ -126,11 +143,18 @@ class BreadcrumbsTest extends FunSuite w
   def getMockEndpoint(name: String) = context.getEndpoint(name, classOf[MockEndpoint])
 
   def createRouteBuilder() = new RouteBuilder {
+      "direct:aggregate" ==> {
+        aggregate (true, new UseLatestAggregationStrategy()).completionSize(messages.size)
{
+          to("mock:aggres")
+        }
+      }
+
       "direct:test" ==> {
         to("mock:hansel")
         to("seda:forest")
       }
 
       "seda:forest" to "mock:gretel"
+
     }
 }
\ No newline at end of file



Mime
View raw message