servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject svn commit: r358003 - in /incubator/servicemix/trunk/servicemix-wsn2005: ./ src/main/java/org/servicemix/wsn/client/ src/test/java/org/servicemix/wsn/ src/test/java/org/servicemix/wsn/component/
Date Tue, 20 Dec 2005 14:47:37 GMT
Author: gnodet
Date: Tue Dec 20 06:47:29 2005
New Revision: 358003

URL: http://svn.apache.org/viewcvs?rev=358003&view=rev
Log:
Add a client api for wsn2005 component

Added:
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/AbstractWSAClient.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/JAXBMarshaller.java
      - copied, changed from r357814, incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/JAXBMarshaller.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/NotificationBroker.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/Publisher.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/PullPoint.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/Subscription.java
Removed:
    incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/JAXBMarshaller.java
Modified:
    incubator/servicemix/trunk/servicemix-wsn2005/project.xml
    incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/component/WSNComponentTest.java

Modified: incubator/servicemix/trunk/servicemix-wsn2005/project.xml
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/project.xml?rev=358003&r1=358002&r2=358003&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/project.xml (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/project.xml Tue Dec 20 06:47:29 2005
@@ -94,7 +94,7 @@
       <artifactId>servicemix-jbi</artifactId>
       <version>${pom.currentVersion}</version>
       <properties>
-        <_eclipse.dependency>true</_eclipse.dependency>
+        <eclipse.dependency>true</eclipse.dependency>
       </properties>
     </dependency>
     <dependency>
@@ -102,7 +102,7 @@
       <artifactId>servicemix-core</artifactId>
       <version>${pom.currentVersion}</version>
       <properties>
-        <_eclipse.dependency>true</_eclipse.dependency>
+        <eclipse.dependency>true</eclipse.dependency>
       </properties>
     </dependency>
     <dependency>
@@ -118,7 +118,7 @@
       <artifactId>servicemix-components</artifactId>
       <version>${pom.currentVersion}</version>
       <properties>
-        <_eclipse.dependency>true</_eclipse.dependency>
+        <eclipse.dependency>true</eclipse.dependency>
       </properties>
     </dependency>
     <dependency>

Added: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/AbstractWSAClient.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/AbstractWSAClient.java?rev=358003&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/AbstractWSAClient.java (added)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/AbstractWSAClient.java Tue Dec 20 06:47:29 2005
@@ -0,0 +1,73 @@
+package org.servicemix.wsn.client;
+
+import javax.jbi.JBIException;
+import javax.xml.namespace.QName;
+
+import org.servicemix.client.ServiceMixClient;
+import org.servicemix.jbi.resolver.EndpointResolver;
+import org.servicemix.jbi.resolver.ServiceAndEndpointNameResolver;
+import org.w3._2005._03.addressing.EndpointReferenceType;
+
+public abstract class AbstractWSAClient {
+
+	private EndpointReferenceType endpoint;
+	private EndpointResolver resolver;
+	private ServiceMixClient client;
+	
+	public AbstractWSAClient() {
+	}
+	
+	public AbstractWSAClient(EndpointReferenceType endpoint, ServiceMixClient client) {
+		this.endpoint = endpoint;
+		this.resolver = resolveWSA(endpoint);
+		this.client = client;
+	}
+
+	protected EndpointResolver resolveWSA(EndpointReferenceType ref) {
+		String[] parts = split(ref.getAddress().getValue());
+		return new ServiceAndEndpointNameResolver(new QName(parts[0], parts[1]), parts[2]);
+	}
+
+    protected String[] split(String uri) {
+		char sep;
+		if (uri.indexOf('/') > 0) {
+			sep = '/';
+		} else {
+			sep = ':';
+		}
+		int idx1 = uri.lastIndexOf(sep);
+		int idx2 = uri.lastIndexOf(sep, idx1 - 1);
+		String epName = uri.substring(idx1 + 1);
+		String svcName = uri.substring(idx2 + 1, idx1);
+		String nsUri   = uri.substring(0, idx2);
+    	return new String[] { nsUri, svcName, epName };
+    }
+
+	public EndpointReferenceType getEndpoint() {
+		return endpoint;
+	}
+
+	public void setEndpoint(EndpointReferenceType endpoint) {
+		this.endpoint = endpoint;
+	}
+
+	public EndpointResolver getResolver() {
+		return resolver;
+	}
+
+	public void setResolver(EndpointResolver resolver) {
+		this.resolver = resolver;
+	}
+	
+	public ServiceMixClient getClient() {
+		return client;
+	}
+
+	public void setClient(ServiceMixClient client) {
+		this.client = client;
+	}
+	protected Object request(Object request) throws JBIException {
+		return client.request(resolver, null, null, request);
+	}
+
+}

Copied: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/JAXBMarshaller.java
(from r357814, incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/JAXBMarshaller.java)
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/JAXBMarshaller.java?p2=incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/JAXBMarshaller.java&p1=incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/JAXBMarshaller.java&r1=357814&r2=358003&rev=358003&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/JAXBMarshaller.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/JAXBMarshaller.java Tue Dec 20 06:47:29 2005
@@ -1,4 +1,4 @@
-package org.servicemix.wsn;
+package org.servicemix.wsn.client;
 
 import java.io.StringWriter;
 

Added: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/NotificationBroker.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/NotificationBroker.java?rev=358003&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/NotificationBroker.java (added)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/NotificationBroker.java Tue Dec 20 06:47:29 2005
@@ -0,0 +1,109 @@
+package org.servicemix.wsn.client;
+
+import javax.jbi.JBIException;
+import javax.jbi.component.ComponentContext;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.namespace.QName;
+
+import org.oasis_open.docs.wsn.b_1.CreatePullPoint;
+import org.oasis_open.docs.wsn.b_1.CreatePullPointResponse;
+import org.oasis_open.docs.wsn.b_1.FilterType;
+import org.oasis_open.docs.wsn.b_1.GetCurrentMessageResponse;
+import org.oasis_open.docs.wsn.b_1.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_1.Notify;
+import org.oasis_open.docs.wsn.b_1.QueryExpressionType;
+import org.oasis_open.docs.wsn.b_1.Subscribe;
+import org.oasis_open.docs.wsn.b_1.SubscribeResponse;
+import org.oasis_open.docs.wsn.b_1.TopicExpressionType;
+import org.oasis_open.docs.wsn.br_1.RegisterPublisher;
+import org.oasis_open.docs.wsn.br_1.RegisterPublisherResponse;
+import org.servicemix.client.DefaultServiceMixClient;
+import org.servicemix.client.ServiceMixClient;
+import org.servicemix.client.ServiceMixClientFacade;
+import org.servicemix.jbi.container.JBIContainer;
+import org.servicemix.jbi.resolver.EndpointResolver;
+import org.servicemix.jbi.resolver.ServiceNameEndpointResolver;
+import org.servicemix.wsn.AbstractSubscription;
+import org.w3._2005._03.addressing.EndpointReferenceType;
+
+public class NotificationBroker {
+
+	public static QName NOTIFICATION_BROKER = new QName("http://servicemix.org/wsnotification", "NotificationBroker"); 
+	
+	private ServiceMixClient client;
+	private EndpointResolver resolver;
+
+	
+	public NotificationBroker(ComponentContext context) {
+		this.client = new ServiceMixClientFacade(context);
+		resolver = new ServiceNameEndpointResolver(NOTIFICATION_BROKER);
+	}
+	
+	public NotificationBroker(JBIContainer container) throws JBIException, JAXBException {
+		DefaultServiceMixClient client = new DefaultServiceMixClient(container);
+		client.setMarshaler(new JAXBMarshaller(JAXBContext.newInstance(Subscribe.class)));
+		this.client = client;
+		resolver = new ServiceNameEndpointResolver(NOTIFICATION_BROKER);
+	}
+	
+	public NotificationBroker(ServiceMixClient client) {
+		this.client = client;
+		resolver = new ServiceNameEndpointResolver(NOTIFICATION_BROKER);
+	}
+
+	public void notify(String topic, Object msg) throws JBIException {
+		Notify notify = new Notify();
+		NotificationMessageHolderType holder = new NotificationMessageHolderType();
+		if (topic != null) {
+			TopicExpressionType topicExp = new TopicExpressionType();
+			topicExp.getContent().add(topic);
+			holder.setTopic(topicExp);
+		}
+		holder.setMessage(new NotificationMessageHolderType.Message());
+		holder.getMessage().setAny(msg);
+		notify.getNotificationMessage().add(holder);
+		client.send(resolver, null, null, notify);
+	}
+
+	public Subscription subscribe(EndpointReferenceType consumer, 
+			  					  String topic,
+			  					  String xpath) throws JBIException {
+		
+		Subscribe subscribeRequest = new Subscribe();
+		subscribeRequest.setConsumerReference(consumer);
+		subscribeRequest.setFilter(new FilterType());
+		if (topic != null) {
+			TopicExpressionType topicExp = new TopicExpressionType();
+			topicExp.getContent().add(topic);
+			subscribeRequest.getFilter().getAny().add(new JAXBElement<TopicExpressionType>(AbstractSubscription.QNAME_TOPIC_EXPRESSION, TopicExpressionType.class, topicExp));
+		}
+		if (xpath != null) {
+			QueryExpressionType xpathExp = new QueryExpressionType();
+			xpathExp.setDialect(AbstractSubscription.XPATH1_URI);
+			xpathExp.getContent().add(xpath);
+			subscribeRequest.getFilter().getAny().add(new JAXBElement<QueryExpressionType>(AbstractSubscription.QNAME_MESSAGE_CONTENT, QueryExpressionType.class, xpathExp));
+		}
+		SubscribeResponse response = (SubscribeResponse) client.request(resolver, null, null, subscribeRequest);
+		return new Subscription(response.getSubscriptionReference(), client);
+	}
+
+	public GetCurrentMessageResponse getCurrentMessage(String topic) throws JBIException {
+		return null;
+	}
+
+	public Publisher registerPublisher(EndpointReferenceType publisherReference,
+									   String topic) throws JBIException {
+		
+		RegisterPublisher registerPublisherRequest = new RegisterPublisher();
+		RegisterPublisherResponse response = (RegisterPublisherResponse) client.request(resolver, null, null, registerPublisherRequest);
+		return new Publisher(response.getPublisherRegistrationReference(), client);
+	}
+
+	public PullPoint createPullPoint() throws JBIException {
+		CreatePullPointResponse response = (CreatePullPointResponse) client.request(resolver, null, null, new CreatePullPoint());
+		return new PullPoint(response.getPullPoint(), client);
+	}
+
+}

Added: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/Publisher.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/Publisher.java?rev=358003&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/Publisher.java (added)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/Publisher.java Tue Dec 20 06:47:29 2005
@@ -0,0 +1,12 @@
+package org.servicemix.wsn.client;
+
+import org.servicemix.client.ServiceMixClient;
+import org.w3._2005._03.addressing.EndpointReferenceType;
+
+public class Publisher extends AbstractWSAClient {
+
+	public Publisher(EndpointReferenceType publisherRegistrationReference, ServiceMixClient client) {
+		super(publisherRegistrationReference, client);
+	}
+
+}

Added: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/PullPoint.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/PullPoint.java?rev=358003&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/PullPoint.java (added)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/PullPoint.java Tue Dec 20 06:47:29 2005
@@ -0,0 +1,32 @@
+package org.servicemix.wsn.client;
+
+import java.math.BigInteger;
+import java.util.List;
+
+import javax.jbi.JBIException;
+
+import org.oasis_open.docs.wsn.b_1.Destroy;
+import org.oasis_open.docs.wsn.b_1.GetMessages;
+import org.oasis_open.docs.wsn.b_1.GetMessagesResponse;
+import org.oasis_open.docs.wsn.b_1.NotificationMessageHolderType;
+import org.servicemix.client.ServiceMixClient;
+import org.w3._2005._03.addressing.EndpointReferenceType;
+
+public class PullPoint extends AbstractWSAClient {
+
+	public PullPoint(EndpointReferenceType pullPoint, ServiceMixClient client) {
+		super(pullPoint, client);
+	}
+
+	public List<NotificationMessageHolderType> getMessages(int max) throws JBIException {
+		GetMessages getMessages = new GetMessages();
+		getMessages.setMaximumNumber(BigInteger.valueOf(max));
+		GetMessagesResponse response = (GetMessagesResponse) request(getMessages);
+		return response.getNotificationMessage();
+	}
+	
+	public void destroy() throws JBIException {
+		request(new Destroy());
+	}
+	
+}

Added: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/Subscription.java?rev=358003&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/Subscription.java (added)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/Subscription.java Tue Dec 20 06:47:29 2005
@@ -0,0 +1,28 @@
+package org.servicemix.wsn.client;
+
+import javax.jbi.JBIException;
+
+import org.oasis_open.docs.wsn.b_1.PauseSubscription;
+import org.oasis_open.docs.wsn.b_1.ResumeSubscription;
+import org.oasis_open.docs.wsn.b_1.Unsubscribe;
+import org.servicemix.client.ServiceMixClient;
+import org.w3._2005._03.addressing.EndpointReferenceType;
+
+public class Subscription extends AbstractWSAClient {
+
+	public Subscription(EndpointReferenceType subscriptionReference, ServiceMixClient client) {
+		super(subscriptionReference, client);
+	}
+	
+	public void pause() throws JBIException {
+		request(new PauseSubscription());
+	}
+	
+	public void resume() throws JBIException {
+		request(new ResumeSubscription());
+	}
+	
+	public void unsubscribe() throws JBIException {
+		request(new Unsubscribe());
+	}
+}

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/component/WSNComponentTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/component/WSNComponentTest.java?rev=358003&r1=358002&r2=358003&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/component/WSNComponentTest.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/component/WSNComponentTest.java Tue Dec 20 06:47:29 2005
@@ -1,11 +1,9 @@
 package org.servicemix.wsn.component;
 
 import java.io.StringReader;
-import java.math.BigInteger;
 import java.util.List;
 
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
+import javax.jbi.JBIException;
 import javax.xml.namespace.QName;
 import javax.xml.parsers.DocumentBuilder;
 
@@ -13,27 +11,15 @@
 
 import org.activemq.ActiveMQConnectionFactory;
 import org.activemq.broker.BrokerService;
-import org.oasis_open.docs.wsn.b_1.CreatePullPoint;
-import org.oasis_open.docs.wsn.b_1.CreatePullPointResponse;
-import org.oasis_open.docs.wsn.b_1.FilterType;
-import org.oasis_open.docs.wsn.b_1.GetMessages;
-import org.oasis_open.docs.wsn.b_1.GetMessagesResponse;
 import org.oasis_open.docs.wsn.b_1.NotificationMessageHolderType;
 import org.oasis_open.docs.wsn.b_1.Notify;
-import org.oasis_open.docs.wsn.b_1.QueryExpressionType;
-import org.oasis_open.docs.wsn.b_1.Subscribe;
-import org.oasis_open.docs.wsn.b_1.SubscribeResponse;
-import org.oasis_open.docs.wsn.b_1.TopicExpressionType;
-import org.servicemix.client.DefaultServiceMixClient;
 import org.servicemix.jbi.container.ActivationSpec;
 import org.servicemix.jbi.container.JBIContainer;
 import org.servicemix.jbi.jaxp.SourceTransformer;
-import org.servicemix.jbi.resolver.EndpointResolver;
-import org.servicemix.jbi.resolver.ServiceAndEndpointNameResolver;
-import org.servicemix.jbi.resolver.ServiceNameEndpointResolver;
 import org.servicemix.tck.ReceiverComponent;
-import org.servicemix.wsn.AbstractSubscription;
-import org.servicemix.wsn.JAXBMarshaller;
+import org.servicemix.wsn.client.NotificationBroker;
+import org.servicemix.wsn.client.PullPoint;
+import org.servicemix.wsn.client.Subscription;
 import org.w3._2005._03.addressing.AttributedURIType;
 import org.w3._2005._03.addressing.EndpointReferenceType;
 import org.w3c.dom.Document;
@@ -45,15 +31,14 @@
 	public static QName NOTIFICATION_BROKER = new QName("http://servicemix.org/wsnotification", "NotificationBroker"); 
 	
 	private JBIContainer jbi;
-	private BrokerService broker;
-	private JAXBContext context;
-	private DefaultServiceMixClient client;
+	private BrokerService jmsBroker;
+	private NotificationBroker wsnBroker;
 	
 	protected void setUp() throws Exception {
-		broker = new BrokerService();
-		broker.setPersistent(false);
-		broker.addConnector("vm://localhost");
-		broker.start();
+		jmsBroker = new BrokerService();
+		jmsBroker.setPersistent(false);
+		jmsBroker.addConnector("vm://localhost");
+		jmsBroker.start();
 
 		jbi = new JBIContainer();
 		jbi.setEmbedded(true);
@@ -67,29 +52,35 @@
 		as.setComponent(component);
 		jbi.activateComponent(as);
 		
-		context = JAXBContext.newInstance(Subscribe.class, SubscribeResponse.class);
-
-		client = new DefaultServiceMixClient(jbi);
-		client.setMarshaler(new JAXBMarshaller(context));
+		wsnBroker = new NotificationBroker(jbi);
 	}
 	
 	protected void tearDown() throws Exception {
 		if (jbi != null) {
 			jbi.shutDown();
 		}
-		if (broker != null) {
-			broker.stop();
+		if (jmsBroker != null) {
+			jmsBroker.stop();
 		}
 	}
 	
-	public void testNB() throws Exception {
+	public void testInvalidSubscribription() throws Exception {
+		try {
+			wsnBroker.subscribe(null, null, null);
+			fail("Expected an exception");
+		} catch (JBIException e) {
+			// ok
+		}
+	}
+	
+	public void testNotify() throws Exception {
 		ReceiverComponent receiver = new ReceiverComponent();
 		jbi.activateComponent(receiver, "receiver");
 		
 		EndpointReferenceType consumer = createEPR(ReceiverComponent.SERVICE, ReceiverComponent.ENDPOINT);
-		EndpointReferenceType subscription = subscribe(consumer, "myTopic", null);
-
-		notify("myTopic", parse("<hello>world</hello>"));
+		wsnBroker.subscribe(consumer, "myTopic", null);
+		
+		wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
 		// Wait for notification
 		Thread.sleep(50);
 		
@@ -99,15 +90,45 @@
 		Thread.sleep(50);
 	}
 
+	public void testPauseResume() throws Exception {
+		PullPoint pullPoint = wsnBroker.createPullPoint();
+		Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
+		
+		wsnBroker.notify("myTopic", new Notify());
+		// Wait for notification
+		Thread.sleep(50);
+		
+		assertEquals(1, pullPoint.getMessages(0).size());
+
+		subscription.pause();
+		
+		wsnBroker.notify("myTopic", new Notify());
+		// Wait for notification
+		Thread.sleep(50);
+		
+		assertEquals(0, pullPoint.getMessages(0).size());
+
+		subscription.resume();
+		
+		wsnBroker.notify("myTopic", new Notify());
+		// Wait for notification
+		Thread.sleep(50);
+		
+		assertEquals(1, pullPoint.getMessages(0).size());
+
+		// Wait for acks to be processed
+		Thread.sleep(50);
+	}
+
 	public void testPull() throws Exception {
-		EndpointReferenceType pullPoint = createPullPoint();
-		subscribe(pullPoint, "myTopic", null);
+		PullPoint pullPoint = wsnBroker.createPullPoint();
+		wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
 		
-		notify("myTopic", new Notify());
+		wsnBroker.notify("myTopic", new Notify());
 		// Wait for notification
 		Thread.sleep(50);
 		
-		List<NotificationMessageHolderType> msgs = getMessages(pullPoint, 0);
+		List<NotificationMessageHolderType> msgs = pullPoint.getMessages(0);
 		assertNotNull(msgs);
 		assertEquals(1, msgs.size());
 
@@ -116,31 +137,31 @@
 	}
 	
 	public void testPullWithFilter() throws Exception {
-		EndpointReferenceType pullPoint1 = createPullPoint();
-		EndpointReferenceType pullPoint2 = createPullPoint();
-		EndpointReferenceType subscription1 = subscribe(pullPoint1, "myTopic", "@type = 'a'");
-		EndpointReferenceType subscription2 = subscribe(pullPoint2, "myTopic", "@type = 'b'");
+		PullPoint pullPoint1 = wsnBroker.createPullPoint();
+		PullPoint pullPoint2 = wsnBroker.createPullPoint();
+		wsnBroker.subscribe(pullPoint1.getEndpoint(), "myTopic", "@type = 'a'");
+		wsnBroker.subscribe(pullPoint2.getEndpoint(), "myTopic", "@type = 'b'");
 		
-		notify("myTopic", parse("<msg type='a'/>"));
+		wsnBroker.notify("myTopic", parse("<msg type='a'/>"));
 		// Wait for notification
 		Thread.sleep(50);
 
-		assertEquals(1, getMessages(pullPoint1, 0).size());
-		assertEquals(0, getMessages(pullPoint2, 0).size());
+		assertEquals(1, pullPoint1.getMessages(0).size());
+		assertEquals(0, pullPoint2.getMessages(0).size());
 		
-		notify("myTopic", parse("<msg type='b'/>"));
+		wsnBroker.notify("myTopic", parse("<msg type='b'/>"));
 		// Wait for notification
 		Thread.sleep(50);
 
-		assertEquals(0, getMessages(pullPoint1, 0).size());
-		assertEquals(1, getMessages(pullPoint2, 0).size());
-		
-		notify("myTopic", parse("<msg type='c'/>"));
+		assertEquals(0, pullPoint1.getMessages(0).size());
+		assertEquals(1, pullPoint2.getMessages(0).size());
+
+		wsnBroker.notify("myTopic", parse("<msg type='c'/>"));
 		// Wait for notification
 		Thread.sleep(50);
 
-		assertEquals(0, getMessages(pullPoint1, 0).size());
-		assertEquals(0, getMessages(pullPoint2, 0).size());
+		assertEquals(0, pullPoint1.getMessages(0).size());
+		assertEquals(0, pullPoint2.getMessages(0).size());
 	}
 	
 	protected Element parse(String txt) throws Exception {
@@ -156,75 +177,4 @@
 		epr.getAddress().setValue(service.getNamespaceURI() + "/" + service.getLocalPart() + "/" + endpoint);
 		return epr;
 	}
-	
-	protected EndpointReferenceType createPullPoint() throws Exception {
-		EndpointResolver resolver = new ServiceNameEndpointResolver(NOTIFICATION_BROKER);
-		CreatePullPointResponse response = (CreatePullPointResponse) client.request(resolver, null, null, new CreatePullPoint());
-		return response.getPullPoint();
-	}
-	
-	protected EndpointReferenceType subscribe(EndpointReferenceType consumer, 
-											  String topic,
-											  String xpath) throws Exception {
-		EndpointResolver resolver = new ServiceNameEndpointResolver(NOTIFICATION_BROKER);
-		Subscribe subscribeRequest = new Subscribe();
-		subscribeRequest.setConsumerReference(consumer);
-		subscribeRequest.setFilter(new FilterType());
-		if (topic != null) {
-			TopicExpressionType topicExp = new TopicExpressionType();
-			topicExp.getContent().add(topic);
-			subscribeRequest.getFilter().getAny().add(new JAXBElement<TopicExpressionType>(AbstractSubscription.QNAME_TOPIC_EXPRESSION, TopicExpressionType.class, topicExp));
-		}
-		if (xpath != null) {
-			QueryExpressionType xpathExp = new QueryExpressionType();
-			xpathExp.setDialect(AbstractSubscription.XPATH1_URI);
-			xpathExp.getContent().add(xpath);
-			subscribeRequest.getFilter().getAny().add(new JAXBElement<QueryExpressionType>(AbstractSubscription.QNAME_MESSAGE_CONTENT, QueryExpressionType.class, xpathExp));
-		}
-		SubscribeResponse response = (SubscribeResponse) client.request(resolver, null, null, subscribeRequest);
-		return response.getSubscriptionReference();
-	}
-	
-	protected void notify(String topic, Object msg) throws Exception {
-		EndpointResolver resolver = new ServiceNameEndpointResolver(NOTIFICATION_BROKER);
-		Notify notify = new Notify();
-		NotificationMessageHolderType holder = new NotificationMessageHolderType();
-		if (topic != null) {
-			TopicExpressionType topicExp = new TopicExpressionType();
-			topicExp.getContent().add(topic);
-			holder.setTopic(topicExp);
-		}
-		holder.setMessage(new NotificationMessageHolderType.Message());
-		holder.getMessage().setAny(msg);
-		notify.getNotificationMessage().add(holder);
-		client.send(resolver, null, null, notify);
-	}
-	
-	protected List<NotificationMessageHolderType> getMessages(EndpointReferenceType pullPoint, int max) throws Exception {
-		EndpointResolver resolver = resolveWSA(pullPoint);
-		GetMessages getMessages = new GetMessages();
-		getMessages.setMaximumNumber(BigInteger.valueOf(max));
-		GetMessagesResponse response = (GetMessagesResponse) client.request(resolver, null, null, getMessages);
-		return response.getNotificationMessage();
-	}
-	
-	protected EndpointResolver resolveWSA(EndpointReferenceType ref) {
-		String[] parts = split(ref.getAddress().getValue());
-		return new ServiceAndEndpointNameResolver(new QName(parts[0], parts[1]), parts[2]);
-	}
-
-    protected String[] split(String uri) {
-		char sep;
-		if (uri.indexOf('/') > 0) {
-			sep = '/';
-		} else {
-			sep = ':';
-		}
-		int idx1 = uri.lastIndexOf(sep);
-		int idx2 = uri.lastIndexOf(sep, idx1 - 1);
-		String epName = uri.substring(idx1 + 1);
-		String svcName = uri.substring(idx2 + 1, idx1);
-		String nsUri   = uri.substring(0, idx2);
-    	return new String[] { nsUri, svcName, epName };
-    }
 }



Mime
View raw message