Author: gnodet
Date: Tue May 9 01:50:48 2006
New Revision: 405347
URL: http://svn.apache.org/viewcvs?rev=405347&view=rev
Log:
Try to fix test (timing issue, I guess)
Modified:
incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java?rev=405347&r1=405346&r2=405347&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java Tue May 9 01:50:48 2006
@@ -55,230 +55,229 @@
import org.xml.sax.InputSource;
public class WSNComponentTest extends TestCase {
-
- public static QName NOTIFICATION_BROKER = new QName("http://servicemix.org/wsnotification",
"NotificationBroker");
-
- private JBIContainer jbi;
- private BrokerService jmsBroker;
- private NotificationBroker wsnBroker;
+
+ public static QName NOTIFICATION_BROKER = new QName("http://servicemix.org/wsnotification",
"NotificationBroker");
+
+ private JBIContainer jbi;
+ private BrokerService jmsBroker;
+ private NotificationBroker wsnBroker;
private CreatePullPoint wsnCreatePullPoint;
private WSNComponent wsnComponent;
-
- protected void setUp() throws Exception {
- jmsBroker = new BrokerService();
- jmsBroker.setPersistent(false);
- jmsBroker.addConnector("vm://localhost");
- jmsBroker.start();
-
- jbi = new JBIContainer();
- jbi.setEmbedded(true);
- jbi.init();
- jbi.start();
-
- wsnComponent = new WSNComponent();
+
+ protected void setUp() throws Exception {
+ jmsBroker = new BrokerService();
+ jmsBroker.setPersistent(false);
+ jmsBroker.addConnector("vm://localhost");
+ jmsBroker.start();
+
+ jbi = new JBIContainer();
+ jbi.setEmbedded(true);
+ jbi.init();
+ jbi.start();
+
+ wsnComponent = new WSNComponent();
wsnComponent.setConnectionFactory(new ActiveMQConnectionFactory("vm://localhost"));
- ActivationSpec as = new ActivationSpec();
- as.setComponentName("servicemix-wsn2005");
- as.setComponent(wsnComponent);
- jbi.activateComponent(as);
-
- wsnBroker = new NotificationBroker(jbi);
+ ActivationSpec as = new ActivationSpec();
+ as.setComponentName("servicemix-wsn2005");
+ as.setComponent(wsnComponent);
+ jbi.activateComponent(as);
+
+ wsnBroker = new NotificationBroker(jbi);
wsnCreatePullPoint = new CreatePullPoint(jbi);
- }
-
- protected void tearDown() throws Exception {
- if (jbi != null) {
- jbi.shutDown();
- }
- if (jmsBroker != null) {
- jmsBroker.stop();
- }
- }
-
+ }
+
+ protected void tearDown() throws Exception {
+ if (jbi != null) {
+ jbi.shutDown();
+ }
+ if (jmsBroker != null) {
+ jmsBroker.stop();
+ }
+ }
+
public void testWSDL() throws Exception {
ServiceEndpoint[] ses = jbi.getRegistry().getEndpointsForInterface(
- new QName("http://docs.oasis-open.org/wsn/brw-2", "NotificationBroker"));
+ new QName("http://docs.oasis-open.org/wsn/brw-2", "NotificationBroker"));
assertNotNull(ses);
assertEquals(1, ses.length);
}
-
- public void testInvalidSubscribription() throws Exception {
- try {
- wsnBroker.subscribe(null, null, null);
- fail("Expected an exception");
- } catch (JBIException e) {
- // ok
- }
- }
-
- public void testNotify() throws Exception {
- ReceiverComponent receiver = new ReceiverComponent();
- jbi.activateComponent(receiver, "receiver");
-
- EndpointReferenceType consumer = createEPR(ReceiverComponent.SERVICE, ReceiverComponent.ENDPOINT);
- wsnBroker.subscribe(consumer, "myTopic", null);
-
- wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
- // Wait for notification
- Thread.sleep(50);
-
- receiver.getMessageList().assertMessagesReceived(1);
- NormalizedMessage msg = (NormalizedMessage) receiver.getMessageList().getMessages().get(0);
- Node node = new SourceTransformer().toDOMNode(msg);
- assertEquals("Notify", node.getLocalName());
-
- // Wait for acks to be processed
- Thread.sleep(50);
- }
-
- public void testRawNotify() throws Exception {
- ReceiverComponent receiver = new ReceiverComponent();
- jbi.activateComponent(receiver, "receiver");
-
-
- // START SNIPPET: notify
- EndpointReferenceType consumer = createEPR(ReceiverComponent.SERVICE, ReceiverComponent.ENDPOINT);
- wsnBroker.subscribe(consumer, "myTopic", null, true);
-
- Element body = parse("<hello>world</hello>");
+
+ public void testInvalidSubscribription() throws Exception {
+ try {
+ wsnBroker.subscribe(null, null, null);
+ fail("Expected an exception");
+ } catch (JBIException e) {
+ // ok
+ }
+ }
+
+ public void testNotify() throws Exception {
+ ReceiverComponent receiver = new ReceiverComponent();
+ jbi.activateComponent(receiver, "receiver");
+
+ EndpointReferenceType consumer = createEPR(ReceiverComponent.SERVICE, ReceiverComponent.ENDPOINT);
+ wsnBroker.subscribe(consumer, "myTopic", null);
+
+ wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
+ // Wait for notification
+ Thread.sleep(50);
+
+ receiver.getMessageList().assertMessagesReceived(1);
+ NormalizedMessage msg = (NormalizedMessage) receiver.getMessageList().getMessages().get(0);
+ Node node = new SourceTransformer().toDOMNode(msg);
+ assertEquals("Notify", node.getLocalName());
+
+ // Wait for acks to be processed
+ Thread.sleep(50);
+ }
+
+ public void testRawNotify() throws Exception {
+ ReceiverComponent receiver = new ReceiverComponent();
+ jbi.activateComponent(receiver, "receiver");
+
+ // START SNIPPET: notify
+ EndpointReferenceType consumer = createEPR(ReceiverComponent.SERVICE, ReceiverComponent.ENDPOINT);
+ wsnBroker.subscribe(consumer, "myTopic", null, true);
+
+ Element body = parse("<hello>world</hello>");
wsnBroker.notify("myTopic", body);
// END SNIPPET: notify
-
- // Wait for notification
- Thread.sleep(50);
-
- receiver.getMessageList().assertMessagesReceived(1);
- NormalizedMessage msg = (NormalizedMessage) receiver.getMessageList().getMessages().get(0);
- Node node = new SourceTransformer().toDOMNode(msg);
- assertEquals("hello", node.getLocalName());
-
- // Wait for acks to be processed
- Thread.sleep(50);
- }
- public void testUnsubscribe() throws Exception {
+ // Wait for notification
+ Thread.sleep(50);
+
+ receiver.getMessageList().assertMessagesReceived(1);
+ NormalizedMessage msg = (NormalizedMessage) receiver.getMessageList().getMessages().get(0);
+ Node node = new SourceTransformer().toDOMNode(msg);
+ assertEquals("hello", node.getLocalName());
+
+ // Wait for acks to be processed
+ Thread.sleep(50);
+ }
+
+ public void testUnsubscribe() throws Exception {
// START SNIPPET: sub
- PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
- Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
+ PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
+ Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic",
null);
// END SNIPPET: sub
-
- wsnBroker.notify("myTopic", new Notify());
- // Wait for notification
- Thread.sleep(50);
-
- assertEquals(1, pullPoint.getMessages(0).size());
-
- subscription.unsubscribe();
-
- wsnBroker.notify("myTopic", new Notify());
- // Wait for notification
- Thread.sleep(50);
-
- assertEquals(0, pullPoint.getMessages(0).size());
-
- // Wait for acks to be processed
- Thread.sleep(50);
- }
-
- public void testPauseResume() throws Exception {
- PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
- Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
-
- wsnBroker.notify("myTopic", new Notify());
- // Wait for notification
- Thread.sleep(50);
-
- assertEquals(1, pullPoint.getMessages(0).size());
-
- subscription.pause();
-
- wsnBroker.notify("myTopic", new Notify());
- // Wait for notification
- Thread.sleep(50);
-
- assertEquals(0, pullPoint.getMessages(0).size());
-
- subscription.resume();
-
- wsnBroker.notify("myTopic", new Notify());
- // Wait for notification
- Thread.sleep(50);
-
- assertEquals(1, pullPoint.getMessages(0).size());
-
- // Wait for acks to be processed
- Thread.sleep(50);
- }
-
- public void testPull() throws Exception {
- PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
- wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
-
- wsnBroker.notify("myTopic", new Notify());
- // Wait for notification
- Thread.sleep(50);
-
- List<NotificationMessageHolderType> msgs = pullPoint.getMessages(0);
- assertNotNull(msgs);
- assertEquals(1, msgs.size());
-
- // Wait for acks to be processed
- Thread.sleep(50);
- }
-
- public void testPullWithFilter() throws Exception {
- PullPoint pullPoint1 = wsnCreatePullPoint.createPullPoint();
- PullPoint pullPoint2 = wsnCreatePullPoint.createPullPoint();
- wsnBroker.subscribe(pullPoint1.getEndpoint(), "myTopic", "@type = 'a'");
- wsnBroker.subscribe(pullPoint2.getEndpoint(), "myTopic", "@type = 'b'");
-
- wsnBroker.notify("myTopic", parse("<msg type='a'/>"));
- // Wait for notification
- Thread.sleep(500);
-
- assertEquals(1, pullPoint1.getMessages(0).size());
- assertEquals(0, pullPoint2.getMessages(0).size());
-
- wsnBroker.notify("myTopic", parse("<msg type='b'/>"));
- // Wait for notification
- Thread.sleep(500);
-
- assertEquals(0, pullPoint1.getMessages(0).size());
- assertEquals(1, pullPoint2.getMessages(0).size());
-
- wsnBroker.notify("myTopic", parse("<msg type='c'/>"));
- // Wait for notification
- Thread.sleep(500);
-
- assertEquals(0, pullPoint1.getMessages(0).size());
- assertEquals(0, pullPoint2.getMessages(0).size());
- }
-
- public void testDemandBasedPublisher() throws Exception {
- PublisherComponent publisherComponent = new PublisherComponent();
+
+ wsnBroker.notify("myTopic", new Notify());
+ // Wait for notification
+ Thread.sleep(50);
+
+ assertEquals(1, pullPoint.getMessages(0).size());
+
+ subscription.unsubscribe();
+
+ wsnBroker.notify("myTopic", new Notify());
+ // Wait for notification
+ Thread.sleep(50);
+
+ assertEquals(0, pullPoint.getMessages(0).size());
+
+ // Wait for acks to be processed
+ Thread.sleep(50);
+ }
+
+ public void testPauseResume() throws Exception {
+ PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
+ Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic",
null);
+
+ wsnBroker.notify("myTopic", new Notify());
+ // Wait for notification
+ Thread.sleep(50);
+
+ assertEquals(1, pullPoint.getMessages(0).size());
+
+ subscription.pause();
+
+ wsnBroker.notify("myTopic", new Notify());
+ // Wait for notification
+ Thread.sleep(50);
+
+ assertEquals(0, pullPoint.getMessages(0).size());
+
+ subscription.resume();
+
+ wsnBroker.notify("myTopic", new Notify());
+ // Wait for notification
+ Thread.sleep(50);
+
+ assertEquals(1, pullPoint.getMessages(0).size());
+
+ // Wait for acks to be processed
+ Thread.sleep(50);
+ }
+
+ public void testPull() throws Exception {
+ PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
+ wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
+
+ wsnBroker.notify("myTopic", new Notify());
+ // Wait for notification
+ Thread.sleep(50);
+
+ List<NotificationMessageHolderType> msgs = pullPoint.getMessages(0);
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+
+ // Wait for acks to be processed
+ Thread.sleep(50);
+ }
+
+ public void testPullWithFilter() throws Exception {
+ PullPoint pullPoint1 = wsnCreatePullPoint.createPullPoint();
+ PullPoint pullPoint2 = wsnCreatePullPoint.createPullPoint();
+ wsnBroker.subscribe(pullPoint1.getEndpoint(), "myTopic", "@type = 'a'");
+ wsnBroker.subscribe(pullPoint2.getEndpoint(), "myTopic", "@type = 'b'");
+
+ wsnBroker.notify("myTopic", parse("<msg type='a'/>"));
+ // Wait for notification
+ Thread.sleep(500);
+
+ assertEquals(1, pullPoint1.getMessages(0).size());
+ assertEquals(0, pullPoint2.getMessages(0).size());
+
+ wsnBroker.notify("myTopic", parse("<msg type='b'/>"));
+ // Wait for notification
+ Thread.sleep(500);
+
+ assertEquals(0, pullPoint1.getMessages(0).size());
+ assertEquals(1, pullPoint2.getMessages(0).size());
+
+ wsnBroker.notify("myTopic", parse("<msg type='c'/>"));
+ // Wait for notification
+ Thread.sleep(500);
+
+ assertEquals(0, pullPoint1.getMessages(0).size());
+ assertEquals(0, pullPoint2.getMessages(0).size());
+ }
+
+ public void testDemandBasedPublisher() throws Exception {
+ PublisherComponent publisherComponent = new PublisherComponent();
publisherComponent.setService(new QName("http://servicemix.org/example", "publisher"));
publisherComponent.setEndpoint("publisher");
publisherComponent.setTopic("myTopic");
publisherComponent.setDemand(true);
- jbi.activateComponent(publisherComponent, "publisher");
-
- Thread.sleep(50);
- assertNull(publisherComponent.getSubscription());
-
- PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
- Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
-
- Thread.sleep(500);
- assertNotNull(publisherComponent.getSubscription());
-
- subscription.unsubscribe();
-
- Thread.sleep(500);
- assertNull(publisherComponent.getSubscription());
-
- Thread.sleep(50);
- }
-
+ jbi.activateComponent(publisherComponent, "publisher");
+
+ Thread.sleep(50);
+ assertNull(publisherComponent.getSubscription());
+
+ PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
+ Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic",
null);
+
+ Thread.sleep(500);
+ assertNotNull(publisherComponent.getSubscription());
+
+ subscription.unsubscribe();
+
+ Thread.sleep(500);
+ assertNull(publisherComponent.getSubscription());
+
+ Thread.sleep(50);
+ }
+
public void testDeployPullPoint() throws Exception {
URL url = getClass().getClassLoader().getResource("pullpoint/pullpoint.xml");
File path = new File(new URI(url.toString()));
@@ -286,26 +285,28 @@
wsnComponent.getServiceUnitManager().deploy("pullpoint", path.getAbsolutePath());
wsnComponent.getServiceUnitManager().start("pullpoint");
-
+
wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
- PullPoint pullPoint = new PullPoint(AbstractWSAClient.createWSA("http://www.consumer.org/service/endpoint"),
- jbi);
+ PullPoint pullPoint = new PullPoint(
+ AbstractWSAClient.createWSA("http://www.consumer.org/service/endpoint"),
+ jbi);
+ Thread.sleep(50);
assertEquals(1, pullPoint.getMessages(0).size());
}
-
+
public void testDeploySubscription() throws Exception {
URL url = getClass().getClassLoader().getResource("subscription/subscribe.xml");
File path = new File(new URI(url.toString()));
path = path.getParentFile();
wsnComponent.getServiceUnitManager().deploy("subscription", path.getAbsolutePath());
-
+
ActivationSpec consumer = new ActivationSpec();
consumer.setService(new QName("http://www.consumer.org", "service"));
consumer.setEndpoint("endpoint");
Receiver receiver = new ReceiverComponent();
consumer.setComponent(receiver);
jbi.activateComponent(consumer);
-
+
wsnComponent.getServiceUnitManager().start("subscription");
wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
@@ -313,14 +314,14 @@
Thread.sleep(50);
receiver.getMessageList().assertMessagesReceived(1);
receiver.getMessageList().flushMessages();
-
+
wsnComponent.getServiceUnitManager().stop("subscription");
wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
// Wait for notification
Thread.sleep(50);
assertEquals(0, receiver.getMessageList().flushMessages().size());
-
+
wsnComponent.getServiceUnitManager().start("subscription");
wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
@@ -329,53 +330,53 @@
receiver.getMessageList().assertMessagesReceived(1);
receiver.getMessageList().flushMessages();
}
-
+
public void testDynamicSubscription() throws Exception {
HttpSpringComponent httpComponent = new HttpSpringComponent();
-
+
HttpEndpoint httpWSNBroker = new HttpEndpoint();
httpWSNBroker.setService(new QName("http://servicemix.org/wsnotification", "NotificationBroker"));
httpWSNBroker.setEndpoint("Broker");
httpWSNBroker.setRoleAsString("consumer");
httpWSNBroker.setLocationURI("http://localhost:8192/WSNBroker/");
httpWSNBroker.setSoap(true);
-
+
HttpEndpoint httpReceiver = new HttpEndpoint();
httpReceiver.setService(new QName("receiver"));
httpReceiver.setEndpoint("endpoint");
httpReceiver.setLocationURI("http://localhost:8192/Receiver/");
httpReceiver.setDefaultMep(SoapHelper.IN_ONLY);
httpReceiver.setSoap(true);
-
+
httpComponent.setEndpoints(new HttpEndpoint[] { httpWSNBroker, httpReceiver });
jbi.activateComponent(new ActivationSpec("servicemix-http", httpComponent));
-
+
ReceiverComponent receiver = new ReceiverComponent();
receiver.setService(new QName("receiver"));
receiver.setEndpoint("endpoint");
jbi.activateComponent(new ActivationSpec("receiver", receiver));
-
+
EndpointReferenceType epr = new EndpointReferenceType();
epr.setAddress(new AttributedURIType());
epr.getAddress().setValue("http://localhost:8192/Receiver/?http.soap=true");
wsnBroker.subscribe(epr, "myTopic", null);
wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
-
+
receiver.getMessageList().assertMessagesReceived(1);
}
-
- protected Element parse(String txt) throws Exception {
- DocumentBuilder builder = new SourceTransformer().createDocumentBuilder();
- InputSource is = new InputSource(new StringReader(txt));
- Document doc = builder.parse(is);
- return doc.getDocumentElement();
- }
-
- protected EndpointReferenceType createEPR(QName service, String endpoint) {
- EndpointReferenceType epr = new EndpointReferenceType();
- epr.setAddress(new AttributedURIType());
- epr.getAddress().setValue(service.getNamespaceURI() + "/" + service.getLocalPart() + "/"
+ endpoint);
- return epr;
- }
+
+ protected Element parse(String txt) throws Exception {
+ DocumentBuilder builder = new SourceTransformer().createDocumentBuilder();
+ InputSource is = new InputSource(new StringReader(txt));
+ Document doc = builder.parse(is);
+ return doc.getDocumentElement();
+ }
+
+ protected EndpointReferenceType createEPR(QName service, String endpoint) {
+ EndpointReferenceType epr = new EndpointReferenceType();
+ epr.setAddress(new AttributedURIType());
+ epr.getAddress().setValue(service.getNamespaceURI() + "/" + service.getLocalPart()
+ "/" + endpoint);
+ return epr;
+ }
}
|