Author: chirino Date: Fri Jun 8 16:04:00 2012 New Revision: 1348133 URL: http://svn.apache.org/viewvc?rev=1348133&view=rev Log: Network bridging now working with a large load of messages. Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala?rev=1348133&r1=1348132&r2=1348133&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala (original) +++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerLoadMonitor.scala Fri Jun 8 16:04:00 2012 @@ -39,16 +39,15 @@ trait BrokerLoadListener { def on_load_change(broker_load:LoadStatusDTO) } object RestLoadMonitor extends Log -class RestLoadMonitor extends BaseService with BrokerLoadMonitor { +class RestLoadMonitor(manager:NetworkManager) extends BaseService with BrokerLoadMonitor { import collection.JavaConversions._ import RestLoadMonitor._ val dispatch_queue = createQueue("rest load monitor") val members = HashMap[String, LoadMonitor]() - var poll_interval = 5*1000; protected def _start(on_completed: Task) = { - schedule_reoccurring(1, SECONDS) { + schedule_reoccurring(manager.monitoring_interval, SECONDS) { for(monitor <- members.values) { monitor.poll } @@ -110,7 +109,9 @@ class RestLoadMonitor extends BaseServic dispatch_queue { for(service <- member.services) { if( service.kind == "web_admin" ) { - members.put(member.id, LoadMonitor(member.id, new URL(service.address))) + var monitor = LoadMonitor(member.id, new URL(service.address)) + members.put(member.id, monitor) + monitor.poll } } } Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala?rev=1348133&r1=1348132&r2=1348133&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala (original) +++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/BrokerMetrics.scala Fri Jun 8 16:04:00 2012 @@ -63,7 +63,7 @@ class BrokerMetrics() { dest_load.message_size = dest.message_size // Lets not include the network consumers in the the consumer rates.. - val consumers = dest.consumers.filter(_.user == network_user).toArray + val consumers = dest.consumers.filter(_.user != network_user).toArray dest_load.consumer_count = consumers.size dest_load.dequeue_size_rate = 0 Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala?rev=1348133&r1=1348132&r2=1348133&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala (original) +++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/NetworkManager.scala Fri Jun 8 16:04:00 2012 @@ -56,6 +56,10 @@ class NetworkManager(broker: Broker) ext var metrics_map = HashMap[String, BrokerMetrics]() val bridges = HashMap[BridgeInfo, BridgeDeployer]() + def network_user = Option(config.user).getOrElse("network") + def network_password = config.password + def monitoring_interval = OptionSupport(config.monitoring_interval).getOrElse(5) + protected def _start(on_completed: Task) = { import collection.JavaConversions._ @@ -65,7 +69,7 @@ class NetworkManager(broker: Broker) ext membership_monitor.listener = this membership_monitor.start(NOOP) - load_monitor = new RestLoadMonitor + load_monitor = new RestLoadMonitor(this) load_monitor.listener = this load_monitor.start(NOOP) @@ -93,7 +97,7 @@ class NetworkManager(broker: Broker) ext } def on_load_change(dto: LoadStatusDTO) = dispatch_queue { - metrics_map.getOrElseUpdate(dto.id, new BrokerMetrics()).update(dto, config.user) + metrics_map.getOrElseUpdate(dto.id, new BrokerMetrics()).update(dto, network_user) } def load_analysis = { Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala?rev=1348133&r1=1348132&r2=1348133&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala (original) +++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/StompBridgingStrategy.scala Fri Jun 8 16:04:00 2012 @@ -40,20 +40,22 @@ class StompBridgingStrategy(val manager: val bridges = HashMap[(String, String), Bridge]() - def bridge_user = manager.config.user - def bridge_password = manager.config.password + def network_user = manager.network_user + def network_password = manager.network_password - def deploy(info:BridgeInfo) = { + def deploy(bridge_info:BridgeInfo) = { dispatch_queue.assertExecuting() - val bridge = bridges.getOrElseUpdate((info.from, info.to), new Bridge(info.from, info.to)) - bridge.deploy(info.kind, info.dest) + val bridge = bridges.getOrElseUpdate((bridge_info.from, bridge_info.to), new Bridge(bridge_info.from, bridge_info.to)) + info("Deploying bridge for destination %s, from %s to %s", bridge_info.dest, bridge_info.from, bridge_info.to) + bridge.deploy(bridge_info.kind, bridge_info.dest) } - def undeploy(info:BridgeInfo) = { + def undeploy(bridge_info:BridgeInfo) = { dispatch_queue.assertExecuting() - for( bridge <- bridges.get((info.from, info.to)) ) { - bridge.undeploy(info.kind, info.dest) + for( bridge <- bridges.get((bridge_info.from, bridge_info.to)) ) { + info("Undeploying bridge for destination %s, from %s to %s", bridge_info.dest, bridge_info.from, bridge_info.to) + bridge.undeploy(bridge_info.kind, bridge_info.dest) } } @@ -69,7 +71,8 @@ class StompBridgingStrategy(val manager: case MESSAGE => // forward it.. frame.action(SEND) - println("forwarding message: "+frame.getHeader(MESSAGE_ID)) + var msgid = frame.getHeader(MESSAGE_ID) + debug("forwarding message: %s", msgid) to_connection.send(frame, ()=>{ // Ack it if the original connection is still up... // TODO: if it's not a we will probably get a dup/redelivery. @@ -77,13 +80,13 @@ class StompBridgingStrategy(val manager: if( from_connection.state eq original_state ) { val ack = new StompFrame(ACK); ack.addHeader(SUBSCRIPTION, frame.getHeader(SUBSCRIPTION)) - ack.addHeader(MESSAGE_ID, frame.getHeader(MESSAGE_ID)) + ack.addHeader(MESSAGE_ID, msgid) from_connection.send(ack, null) - println("forwarded message, now acking: "+frame.getHeader(MESSAGE_ID)) + debug("forwarded message, now acking: %s", msgid) } }) case _ => - println("unhandled stomp frame: "+frame) + println("unhandled stomp frame: %s", frame) } } @@ -128,8 +131,8 @@ class StompBridgingStrategy(val manager: val to_stomp = new Stomp() to_stomp.setDispatchQueue(dispatch_queue) to_stomp.setRemoteURI(uri) - to_stomp.setLogin(bridge_user) - to_stomp.setPasscode(bridge_password) + to_stomp.setLogin(network_user) + to_stomp.setPasscode(network_password) to_stomp.setBlockingExecutor(Broker.BLOCKABLE_THREAD_POOL) val headers = new Properties() headers.put("client-type", "apollo-bridge") @@ -175,17 +178,19 @@ class StompBridgingStrategy(val manager: // Reconnect any subscriptions. subscriptions.keySet.foreach(subscribe(_)) // Re-send messages.. - pending_sends.values.foreach(x => do_send(x._1, x._2)) + pending_sends.values.foreach(x => request(x._1, x._2)) } - def do_send(frame:StompFrame, on_complete: ()=>Unit) = { + def request(frame:StompFrame, on_complete: ()=>Unit) = { connection.request(frame, new org.fusesource.stomp.client.Callback[StompFrame] { override def onSuccess(response: StompFrame) = on_complete() override def onFailure(value: Throwable) = failed(value) }) } + def send(frame:StompFrame) = connection.send(frame, null) + def failed(value: Throwable)= { debug("Bridge connection to %s failed due to: ", uri, value) close(ReconnectDelayState(1000)) @@ -235,14 +240,18 @@ class StompBridgingStrategy(val manager: } } - def send(destination:StompFrame, on_complete: ()=>Unit) = { - val id = next_id - val cb = ()=>{ - pending_sends.remove(id) - on_complete() + def send(frame:StompFrame, on_complete: ()=>Unit) = { + if( on_complete!=null ) { + val id = next_id + val cb = ()=>{ + pending_sends.remove(id) + on_complete() + } + pending_sends.put(id, (frame, cb)) + react[ConnectedState] { state => state.request(frame, cb) } + } else { + react[ConnectedState] { state => state.send(frame) } } - pending_sends.put(id, (destination, cb)) - react[ConnectedState] { state => state.do_send(destination, cb) } } } Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java?rev=1348133&r1=1348132&r2=1348133&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java (original) +++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/dto/NetworkManagerDTO.java Fri Jun 8 16:04:00 2012 @@ -43,6 +43,9 @@ public class NetworkManagerDTO extends C @XmlAttribute(name="duplex") public Boolean duplex; + @XmlAttribute(name="monitoring_interval") + public Integer monitoring_interval; + @XmlElement(name="member") public ArrayList members = new ArrayList(); } Modified: activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala?rev=1348133&r1=1348132&r2=1348133&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala (original) +++ activemq/activemq-apollo/trunk/apollo-network/src/test/scala/org/apache/activemq/apollo/broker/network/NetworkTest.scala Fri Jun 8 16:04:00 2012 @@ -19,12 +19,13 @@ package org.apache.activemq.apollo.broke import org.scalatest.matchers.ShouldMatchers import org.scalatest.BeforeAndAfterEach -import org.apache.activemq.apollo.broker.MultiBrokerTestSupport import javax.jms.Session._ import org.fusesource.stomp.jms.{StompJmsDestination, StompJmsConnectionFactory} import collection.mutable.ListBuffer import javax.jms.{Message, TextMessage, Connection, ConnectionFactory} import java.util.concurrent.TimeUnit._ +import org.apache.activemq.apollo.broker.{Broker, MultiBrokerTestSupport} +import org.fusesource.hawtdispatch._ class NetworkTest extends MultiBrokerTestSupport with ShouldMatchers with BeforeAndAfterEach { @@ -69,17 +70,27 @@ class NetworkTest extends MultiBrokerTes case _ => None } - test("forward one message") { + test("forward 10000 messages") { val connections = create_connections - - val s0 = connections(0).createSession(false, AUTO_ACKNOWLEDGE) - val p0 = s0.createProducer(test_destination()) - p0.send(s0.createTextMessage("1")) + val message_count = 10000; + + var dest = test_destination() + val data = "x" * 1024 + + Broker.BLOCKABLE_THREAD_POOL { + val s0 = connections(0).createSession(false, AUTO_ACKNOWLEDGE) + val p0 = s0.createProducer(dest) + for( i <- 0 until message_count ) { + p0.send(s0.createTextMessage(i+":"+data)) + } + } val s1 = connections(1).createSession(false, AUTO_ACKNOWLEDGE) - val c1 = s1.createConsumer(test_destination()) + val c1 = s1.createConsumer(dest) within(30, SECONDS) { - text(c1.receive()) should be(Some("1")) + for( i <- 0 until message_count ) { + text(c1.receive()) should be(Some(i+":"+data)) + } } }