servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject svn commit: r358120 - in /incubator/servicemix/trunk/servicemix-wsn2005/src: main/java/org/servicemix/wsn/ main/java/org/servicemix/wsn/client/ main/java/org/servicemix/wsn/component/ main/java/org/servicemix/wsn/jbi/ main/java/org/servicemix/wsn/jms/ ...
Date Tue, 20 Dec 2005 22:00:11 GMT
Author: gnodet
Date: Tue Dec 20 13:59:53 2005
New Revision: 358120

URL: http://svn.apache.org/viewcvs?rev=358120&view=rev
Log:
Add demand-based publishing to wsn2005 component

Added:
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jbi/JbiPublisher.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/InvalidTopicException.java
Modified:
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractNotificationBroker.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractPublisher.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractPullPoint.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractSubscription.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/AbstractWSAClient.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/NotificationBroker.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/Publisher.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/component/WSNConfiguration.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/component/WSNConfigurationMBean.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/component/WSNLifeCycle.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jbi/JbiNotificationBroker.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsNotificationBroker.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsPublisher.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsSubscription.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsTopicExpressionConverter.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/SubscriptionTest.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/component/WSNComponentTest.java

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractNotificationBroker.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractNotificationBroker.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractNotificationBroker.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractNotificationBroker.java Tue Dec 20 13:59:53 2005
@@ -70,8 +70,8 @@
 	}
 
     public void init() throws Exception {
-        anonymousPublisher = createPublisher("Anonymous", null);
         register();
+        anonymousPublisher = createPublisher("Anonymous");
         anonymousPublisher.register();
     }
     
@@ -148,9 +148,9 @@
 		AbstractSubscription subscription = null;
 		boolean success = false;
 		try {
-			subscription = createSubcription(idGenerator.generateSanitizedId(), subscribeRequest);
+			subscription = createSubcription(idGenerator.generateSanitizedId());
 			subscriptions.put(subscription.getAddress(), subscription);
-			subscription.subscribe(subscribeRequest);
+			subscription.create(subscribeRequest);
 			subscription.register();
 			SubscribeResponse response = new SubscribeResponse();
 			response.setSubscriptionReference(createEndpointReference(subscription.getAddress()));
@@ -217,9 +217,10 @@
     	AbstractPublisher publisher = null;
     	boolean success = false;
     	try {
-    		publisher = createPublisher(idGenerator.generateSanitizedId(), registerPublisherRequest);
+    		publisher = createPublisher(idGenerator.generateSanitizedId());
     		publishers.put(publisher.getAddress(), publisher);
     		publisher.register();
+    		publisher.create(registerPublisherRequest);
     		RegisterPublisherResponse response = new RegisterPublisherResponse(); 
     		response.setPublisherRegistrationReference(createEndpointReference(publisher.getAddress()));
     		success = true;
@@ -258,8 +259,9 @@
     	AbstractPullPoint pullPoint = null;
     	boolean success = false;
     	try {
-    		pullPoint = createPullPoint(idGenerator.generateSanitizedId(), createPullPointRequest);
+    		pullPoint = createPullPoint(idGenerator.generateSanitizedId());
     		pullPoints.put(pullPoint.getAddress(), pullPoint);
+    		pullPoint.create(createPullPointRequest);
     		pullPoint.register();
     		CreatePullPointResponse response = new CreatePullPointResponse(); 
     		response.setPullPoint(createEndpointReference(pullPoint.getAddress()));
@@ -288,10 +290,10 @@
 		return epr;
 	}
 
-	protected abstract AbstractPublisher createPublisher(String name, RegisterPublisher registerPublisherRequest);
+	protected abstract AbstractPublisher createPublisher(String name);
 	
-	protected abstract AbstractPullPoint createPullPoint(String name, CreatePullPoint createPullPointRequest);
+	protected abstract AbstractPullPoint createPullPoint(String name);
 	
-	protected abstract AbstractSubscription createSubcription(String name, Subscribe subscribeRequest);
+	protected abstract AbstractSubscription createSubcription(String name);
 
 }

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractPublisher.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractPublisher.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractPublisher.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractPublisher.java Tue Dec 20 13:59:53 2005
@@ -1,22 +1,37 @@
 package org.servicemix.wsn;
 
+import java.util.List;
+
 import javax.jws.WebMethod;
 import javax.jws.WebParam;
 import javax.jws.WebResult;
 import javax.jws.WebService;
 
+import org.oasis_open.docs.wsn.b_1.InvalidTopicExpressionFaultType;
 import org.oasis_open.docs.wsn.b_1.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_1.TopicExpressionType;
 import org.oasis_open.docs.wsn.br_1.Destroy;
 import org.oasis_open.docs.wsn.br_1.DestroyResponse;
+import org.oasis_open.docs.wsn.br_1.PublisherRegistrationFailedFaultType;
+import org.oasis_open.docs.wsn.br_1.RegisterPublisher;
 import org.oasis_open.docs.wsn.br_1.ResourceNotDestroyedFaultType;
+import org.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
+import org.servicemix.wsn.jaxws.PublisherRegistrationFailedFault;
 import org.servicemix.wsn.jaxws.PublisherRegistrationManager;
+import org.servicemix.wsn.jaxws.PublisherRegistrationRejectedFault;
 import org.servicemix.wsn.jaxws.ResourceNotDestroyedFault;
 import org.servicemix.wsn.jaxws.ResourceUnknownFault;
+import org.servicemix.wsn.jaxws.TopicNotSupportedFault;
+import org.w3._2005._03.addressing.EndpointReferenceType;
 
 @WebService(endpointInterface = "org.servicemix.wsn.jaxws.PublisherRegistrationManager")
 public abstract class AbstractPublisher extends AbstractEndpoint 
 									    implements PublisherRegistrationManager {
 
+	protected EndpointReferenceType publisherReference;
+	protected boolean demand;
+	protected List<TopicExpressionType> topic;
+	
 	public AbstractPublisher(String name) {
 		super(name);
 	}
@@ -54,4 +69,31 @@
 	protected String createAddress() {
 		return "http://servicemix.org/wsnotification/Publisher/" + getName();
 	}
+
+	public void create(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
+		validatePublisher(registerPublisherRequest);
+		start();
+	}
+	
+	protected void validatePublisher(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
+		// Check consumer reference
+		publisherReference = registerPublisherRequest.getPublisherReference();
+		// Check topic
+		topic = registerPublisherRequest.getTopic();
+		// Check demand based
+		demand = registerPublisherRequest.isDemand() != null ? registerPublisherRequest.isDemand().booleanValue() : false;
+		// Check all parameters
+		if (publisherReference == null) {
+			PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
+			throw new PublisherRegistrationFailedFault("Invalid PublisherReference: null", fault);
+		}
+		if (demand) {
+			if (topic == null || topic.size() == 0) {
+				InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+				throw new InvalidTopicExpressionFault("Must specify at least one topic for demand-based publishing", fault);
+			}
+		}
+	}
+	
+	protected abstract void start() throws PublisherRegistrationFailedFault;
 }

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractPullPoint.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractPullPoint.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractPullPoint.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractPullPoint.java Tue Dec 20 13:59:53 2005
@@ -11,6 +11,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.oasis_open.docs.wsn.b_1.CreatePullPoint;
 import org.oasis_open.docs.wsn.b_1.Destroy;
 import org.oasis_open.docs.wsn.b_1.DestroyResponse;
 import org.oasis_open.docs.wsn.b_1.GetMessages;
@@ -21,6 +22,7 @@
 import org.servicemix.wsn.jaxws.NotificationConsumer;
 import org.servicemix.wsn.jaxws.PullPoint;
 import org.servicemix.wsn.jaxws.ResourceUnknownFault;
+import org.servicemix.wsn.jaxws.UnableToCreatePullPoint;
 import org.servicemix.wsn.jaxws.UnableToDestroyPullPoint;
 
 @WebService(endpointInterface = "org.servicemix.wsn.PullPointConsumer")
@@ -88,6 +90,9 @@
     	log.debug("Destroy");
     	destroy();
     	return new DestroyResponse();
+    }
+    
+    public void create(CreatePullPoint createPullPointRequest) throws UnableToCreatePullPoint {
     }
     
 	protected abstract void store(NotificationMessageHolderType messageHolder);

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractSubscription.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractSubscription.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/AbstractSubscription.java Tue Dec 20 13:59:53 2005
@@ -236,7 +236,12 @@
 		this.terminationTime = terminationTime;
 	}
 	
-	public abstract void subscribe(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, InvalidUseRawValueFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault;
+	public void create(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, InvalidUseRawValueFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
+		validateSubscription(subscribeRequest);
+		start();
+	}
+	
+	protected abstract void start() throws SubscribeCreationFailedFault;
 	
 	protected abstract void pause() throws PauseFailedFault;
 	

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/AbstractWSAClient.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/AbstractWSAClient.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/AbstractWSAClient.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/AbstractWSAClient.java Tue Dec 20 13:59:53 2005
@@ -6,6 +6,7 @@
 import org.servicemix.client.ServiceMixClient;
 import org.servicemix.jbi.resolver.EndpointResolver;
 import org.servicemix.jbi.resolver.ServiceAndEndpointNameResolver;
+import org.w3._2005._03.addressing.AttributedURIType;
 import org.w3._2005._03.addressing.EndpointReferenceType;
 
 public abstract class AbstractWSAClient {
@@ -23,12 +24,20 @@
 		this.client = client;
 	}
 
-	protected EndpointResolver resolveWSA(EndpointReferenceType ref) {
-		String[] parts = split(ref.getAddress().getValue());
+	public static EndpointReferenceType createWSA(String address) {
+		EndpointReferenceType epr = new EndpointReferenceType();
+		AttributedURIType attUri = new AttributedURIType();
+		attUri.setValue(address);
+		epr.setAddress(attUri);
+		return epr;
+	}
+	
+	public static EndpointResolver resolveWSA(EndpointReferenceType ref) {
+		String[] parts = splitUri(ref.getAddress().getValue());
 		return new ServiceAndEndpointNameResolver(new QName(parts[0], parts[1]), parts[2]);
 	}
 
-    protected String[] split(String uri) {
+	public static String[] splitUri(String uri) {
 		char sep;
 		if (uri.indexOf('/') > 0) {
 			sep = '/';
@@ -66,8 +75,13 @@
 	public void setClient(ServiceMixClient client) {
 		this.client = client;
 	}
+	
 	protected Object request(Object request) throws JBIException {
 		return client.request(resolver, null, null, request);
+	}
+	
+	protected void send(Object request) throws JBIException {
+		client.sendSync(resolver, null, null, request);
 	}
 
 }

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/NotificationBroker.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/NotificationBroker.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/NotificationBroker.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/NotificationBroker.java Tue Dec 20 13:59:53 2005
@@ -1,5 +1,7 @@
 package org.servicemix.wsn.client;
 
+import java.util.List;
+
 import javax.jbi.JBIException;
 import javax.jbi.component.ComponentContext;
 import javax.xml.bind.JAXBContext;
@@ -10,6 +12,7 @@
 import org.oasis_open.docs.wsn.b_1.CreatePullPoint;
 import org.oasis_open.docs.wsn.b_1.CreatePullPointResponse;
 import org.oasis_open.docs.wsn.b_1.FilterType;
+import org.oasis_open.docs.wsn.b_1.GetCurrentMessage;
 import org.oasis_open.docs.wsn.b_1.GetCurrentMessageResponse;
 import org.oasis_open.docs.wsn.b_1.NotificationMessageHolderType;
 import org.oasis_open.docs.wsn.b_1.Notify;
@@ -23,34 +26,56 @@
 import org.servicemix.client.ServiceMixClient;
 import org.servicemix.client.ServiceMixClientFacade;
 import org.servicemix.jbi.container.JBIContainer;
-import org.servicemix.jbi.resolver.EndpointResolver;
 import org.servicemix.jbi.resolver.ServiceNameEndpointResolver;
 import org.servicemix.wsn.AbstractSubscription;
 import org.w3._2005._03.addressing.EndpointReferenceType;
 
-public class NotificationBroker {
+public class NotificationBroker extends AbstractWSAClient {
 
-	public static QName NOTIFICATION_BROKER = new QName("http://servicemix.org/wsnotification", "NotificationBroker"); 
+	public static String WSN_URI = "http://servicemix.org/wsnotification";
+	public static String WSN_SERVICE = "NotificationBroker";
 	
-	private ServiceMixClient client;
-	private EndpointResolver resolver;
-
+	public static QName NOTIFICATION_BROKER = new QName(WSN_URI, WSN_SERVICE); 
 	
-	public NotificationBroker(ComponentContext context) {
-		this.client = new ServiceMixClientFacade(context);
-		resolver = new ServiceNameEndpointResolver(NOTIFICATION_BROKER);
+	public NotificationBroker(ComponentContext context) throws JAXBException {
+		ServiceMixClientFacade client = new ServiceMixClientFacade(context); 
+		client.setMarshaler(new JAXBMarshaller(JAXBContext.newInstance(Subscribe.class, RegisterPublisher.class)));
+		setClient(client);
+		setResolver(new ServiceNameEndpointResolver(NOTIFICATION_BROKER));
+	}
+	
+	public NotificationBroker(ComponentContext context, String brokerName) throws JAXBException {
+		ServiceMixClientFacade client = new ServiceMixClientFacade(context); 
+		client.setMarshaler(new JAXBMarshaller(JAXBContext.newInstance(Subscribe.class, RegisterPublisher.class)));
+		setClient(client);
+		setEndpoint(createWSA(WSN_URI + "/" + WSN_SERVICE + "/" + brokerName));
+		setResolver(resolveWSA(getEndpoint()));
 	}
 	
 	public NotificationBroker(JBIContainer container) throws JBIException, JAXBException {
 		DefaultServiceMixClient client = new DefaultServiceMixClient(container);
+		client.setMarshaler(new JAXBMarshaller(JAXBContext.newInstance(Subscribe.class, RegisterPublisher.class)));
+		setClient(client);
+		setResolver(new ServiceNameEndpointResolver(NOTIFICATION_BROKER));
+	}
+	
+	public NotificationBroker(JBIContainer container, String brokerName) throws JBIException, JAXBException {
+		DefaultServiceMixClient client = new DefaultServiceMixClient(container);
 		client.setMarshaler(new JAXBMarshaller(JAXBContext.newInstance(Subscribe.class)));
-		this.client = client;
-		resolver = new ServiceNameEndpointResolver(NOTIFICATION_BROKER);
+		setClient(client);
+		setEndpoint(createWSA(WSN_URI + "/" + WSN_SERVICE + "/" + brokerName));
+		setResolver(resolveWSA(getEndpoint()));
 	}
 	
 	public NotificationBroker(ServiceMixClient client) {
-		this.client = client;
-		resolver = new ServiceNameEndpointResolver(NOTIFICATION_BROKER);
+		setClient(client);
+		setResolver(new ServiceNameEndpointResolver(NOTIFICATION_BROKER));
+	}
+
+	public NotificationBroker(ServiceMixClient client, String brokerName) {
+		setClient(client);
+		setEndpoint(createWSA(WSN_URI + "/" + WSN_SERVICE + "/" + brokerName));
+		setResolver(resolveWSA(getEndpoint()));
 	}
 
 	public void notify(String topic, Object msg) throws JBIException {
@@ -64,7 +89,7 @@
 		holder.setMessage(new NotificationMessageHolderType.Message());
 		holder.getMessage().setAny(msg);
 		notify.getNotificationMessage().add(holder);
-		client.send(resolver, null, null, notify);
+		send(notify);
 	}
 
 	public Subscription subscribe(EndpointReferenceType consumer, 
@@ -85,25 +110,40 @@
 			xpathExp.getContent().add(xpath);
 			subscribeRequest.getFilter().getAny().add(new JAXBElement<QueryExpressionType>(AbstractSubscription.QNAME_MESSAGE_CONTENT, QueryExpressionType.class, xpathExp));
 		}
-		SubscribeResponse response = (SubscribeResponse) client.request(resolver, null, null, subscribeRequest);
-		return new Subscription(response.getSubscriptionReference(), client);
+		SubscribeResponse response = (SubscribeResponse) request(subscribeRequest);
+		return new Subscription(response.getSubscriptionReference(), getClient());
 	}
 
-	public GetCurrentMessageResponse getCurrentMessage(String topic) throws JBIException {
-		return null;
+	public List<Object> getCurrentMessage(String topic) throws JBIException {
+		GetCurrentMessage getCurrentMessageRequest = new GetCurrentMessage();
+		if (topic != null) {
+			TopicExpressionType topicExp = new TopicExpressionType();
+			topicExp.getContent().add(topic);
+			getCurrentMessageRequest.setTopic(topicExp);
+		}
+		GetCurrentMessageResponse response = (GetCurrentMessageResponse) request(getCurrentMessageRequest);
+		return response.getAny();
 	}
 
 	public Publisher registerPublisher(EndpointReferenceType publisherReference,
-									   String topic) throws JBIException {
+									   String topic,
+									   boolean demand) throws JBIException {
 		
 		RegisterPublisher registerPublisherRequest = new RegisterPublisher();
-		RegisterPublisherResponse response = (RegisterPublisherResponse) client.request(resolver, null, null, registerPublisherRequest);
-		return new Publisher(response.getPublisherRegistrationReference(), client);
+		registerPublisherRequest.setPublisherReference(publisherReference);
+		if (topic != null) {
+			TopicExpressionType topicExp = new TopicExpressionType();
+			topicExp.getContent().add(topic);
+			registerPublisherRequest.getTopic().add(topicExp);
+		}
+		registerPublisherRequest.setDemand(Boolean.valueOf(demand));
+		RegisterPublisherResponse response = (RegisterPublisherResponse) request(registerPublisherRequest);
+		return new Publisher(response.getPublisherRegistrationReference(), getClient());
 	}
 
 	public PullPoint createPullPoint() throws JBIException {
-		CreatePullPointResponse response = (CreatePullPointResponse) client.request(resolver, null, null, new CreatePullPoint());
-		return new PullPoint(response.getPullPoint(), client);
+		CreatePullPointResponse response = (CreatePullPointResponse) request(new CreatePullPoint());
+		return new PullPoint(response.getPullPoint(), getClient());
 	}
 
 }

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/Publisher.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/Publisher.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/Publisher.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/client/Publisher.java Tue Dec 20 13:59:53 2005
@@ -1,5 +1,8 @@
 package org.servicemix.wsn.client;
 
+import javax.jbi.JBIException;
+
+import org.oasis_open.docs.wsn.br_1.Destroy;
 import org.servicemix.client.ServiceMixClient;
 import org.w3._2005._03.addressing.EndpointReferenceType;
 
@@ -9,4 +12,8 @@
 		super(publisherRegistrationReference, client);
 	}
 
+	public void destroy() throws JBIException {
+		request(new Destroy());
+	}
+	
 }

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/component/WSNConfiguration.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/component/WSNConfiguration.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/component/WSNConfiguration.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/component/WSNConfiguration.java Tue Dec 20 13:59:53 2005
@@ -6,6 +6,8 @@
 	private String jndiProviderURL;
 	private String jndiConnectionFactoryName;
 	
+	private String brokerName = "Broker";
+	
 	public String getInitialContextFactory() {
 		return initialContextFactory;
 	}
@@ -23,5 +25,11 @@
 	}
 	public void setJndiProviderURL(String jndiProviderURL) {
 		this.jndiProviderURL = jndiProviderURL;
+	}
+	public String getBrokerName() {
+		return brokerName;
+	}
+	public void setBrokerName(String brokerName) {
+		this.brokerName = brokerName;
 	}
 }

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/component/WSNConfigurationMBean.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/component/WSNConfigurationMBean.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/component/WSNConfigurationMBean.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/component/WSNConfigurationMBean.java Tue Dec 20 13:59:53 2005
@@ -11,4 +11,6 @@
 	String getJndiConnectionFactoryName();
 	void setJndiConnectionFactoryName(String jndiConnectionFactoryName);
 	
+	String getBrokerName();
+	void setBrokerName(String brokerName);
 }

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/component/WSNLifeCycle.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/component/WSNLifeCycle.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/component/WSNLifeCycle.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/component/WSNLifeCycle.java Tue Dec 20 13:59:53 2005
@@ -35,7 +35,7 @@
 	@Override
 	protected void doInit() throws Exception {
 		super.doInit();
-		notificationBroker = new JbiNotificationBroker("Broker");
+		notificationBroker = new JbiNotificationBroker(configuration.getBrokerName());
 		notificationBroker.setContext(context);
 		notificationBroker.setManager(new WSNEndpointManager());
 		if (connectionFactory == null) {

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jbi/JbiNotificationBroker.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jbi/JbiNotificationBroker.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jbi/JbiNotificationBroker.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jbi/JbiNotificationBroker.java Tue Dec 20 13:59:53 2005
@@ -3,6 +3,7 @@
 import javax.jbi.component.ComponentContext;
 
 import org.servicemix.wsn.jms.JmsNotificationBroker;
+import org.servicemix.wsn.jms.JmsPublisher;
 import org.servicemix.wsn.jms.JmsSubscription;
 
 public class JbiNotificationBroker extends JmsNotificationBroker {
@@ -18,6 +19,14 @@
 		JbiSubscription subscription = new JbiSubscription(name);
 		subscription.setContext(context);
 		return subscription;
+	}
+
+	@Override
+	protected JmsPublisher createJmsPublisher(String name) {
+		JbiPublisher publisher = new JbiPublisher(name);
+		publisher.setContext(context);
+		publisher.setNotificationBrokerAddress(address);
+		return publisher;
 	}
 
 	public ComponentContext getContext() {

Added: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jbi/JbiPublisher.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jbi/JbiPublisher.java?rev=358120&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jbi/JbiPublisher.java (added)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jbi/JbiPublisher.java Tue Dec 20 13:59:53 2005
@@ -0,0 +1,97 @@
+package org.servicemix.wsn.jbi;
+
+import javax.jbi.JBIException;
+import javax.jbi.component.ComponentContext;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.xml.bind.JAXBException;
+import javax.xml.namespace.QName;
+
+import org.oasis_open.docs.wsn.br_1.PublisherRegistrationFailedFaultType;
+import org.oasis_open.docs.wsn.br_1.RegisterPublisher;
+import org.servicemix.wsn.client.AbstractWSAClient;
+import org.servicemix.wsn.client.NotificationBroker;
+import org.servicemix.wsn.client.Subscription;
+import org.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
+import org.servicemix.wsn.jaxws.PublisherRegistrationFailedFault;
+import org.servicemix.wsn.jaxws.PublisherRegistrationRejectedFault;
+import org.servicemix.wsn.jaxws.ResourceUnknownFault;
+import org.servicemix.wsn.jaxws.TopicNotSupportedFault;
+import org.servicemix.wsn.jms.JmsPublisher;
+
+public class JbiPublisher extends JmsPublisher {
+
+	private ComponentContext context;
+	private ServiceEndpoint endpoint;
+	private String notificationBrokerAddress;
+	
+	public JbiPublisher(String name) {
+		super(name);
+	}
+
+	public void setContext(ComponentContext context) {
+		this.context = context;
+	}
+
+	public String getNotificationBrokerAddress() {
+		return notificationBrokerAddress;
+	}
+
+	public void setNotificationBrokerAddress(String notificationBrokerAddress) {
+		this.notificationBrokerAddress = notificationBrokerAddress;
+	}
+	
+	@Override
+	protected Object startSubscription() {
+		Subscription subscription = null;
+		try {
+			NotificationBroker broker = new NotificationBroker(context);
+			broker.setResolver(AbstractWSAClient.resolveWSA(publisherReference));
+			subscription = broker.subscribe(AbstractWSAClient.createWSA(notificationBrokerAddress), 
+														 "noTopic", null);
+		} catch (JBIException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		} catch (JAXBException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+		return subscription;
+	}
+
+	@Override
+	protected void destroySubscription(Object subscription) {
+		try {
+			((Subscription) subscription).unsubscribe();
+		} catch (JBIException e) {
+			// TODO Auto-generated catch block
+			e.printStackTrace();
+		}
+	}
+	
+	@Override
+	protected void validatePublisher(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
+		super.validatePublisher(registerPublisherRequest);
+		String[] parts = split(publisherReference.getAddress().getValue());
+		endpoint = context.getEndpoint(new QName(parts[0], parts[1]), parts[2]);
+		if (endpoint == null) {
+			PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
+			throw new PublisherRegistrationFailedFault("Unable to resolve consumer reference endpoint", fault);
+		}
+	}
+	
+    protected String[] split(String uri) {
+		char sep;
+		if (uri.indexOf('/') > 0) {
+			sep = '/';
+		} else {
+			sep = ':';
+		}
+		int idx1 = uri.lastIndexOf(sep);
+		int idx2 = uri.lastIndexOf(sep, idx1 - 1);
+		String epName = uri.substring(idx1 + 1);
+		String svcName = uri.substring(idx2 + 1, idx1);
+		String nsUri   = uri.substring(0, idx2);
+    	return new String[] { nsUri, svcName, epName };
+    }
+
+}

Added: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/InvalidTopicException.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/InvalidTopicException.java?rev=358120&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/InvalidTopicException.java (added)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/InvalidTopicException.java Tue Dec 20 13:59:53 2005
@@ -0,0 +1,23 @@
+package org.servicemix.wsn.jms;
+
+public class InvalidTopicException extends Exception {
+
+	private static final long serialVersionUID = -3708397351142080702L;
+
+	public InvalidTopicException() {
+		super();
+	}
+
+	public InvalidTopicException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public InvalidTopicException(String message) {
+		super(message);
+	}
+
+	public InvalidTopicException(Throwable cause) {
+		super(cause);
+	}
+
+}

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsNotificationBroker.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsNotificationBroker.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsNotificationBroker.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsNotificationBroker.java Tue Dec 20 13:59:53 2005
@@ -3,9 +3,6 @@
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 
-import org.oasis_open.docs.wsn.b_1.CreatePullPoint;
-import org.oasis_open.docs.wsn.b_1.Subscribe;
-import org.oasis_open.docs.wsn.br_1.RegisterPublisher;
 import org.servicemix.wsn.AbstractNotificationBroker;
 import org.servicemix.wsn.AbstractPublisher;
 import org.servicemix.wsn.AbstractPullPoint;
@@ -29,15 +26,15 @@
     }
 	
 	@Override
-	protected AbstractPublisher createPublisher(String name, RegisterPublisher registerPublisherRequest) {
-		JmsPublisher publisher = new JmsPublisher(name);
+	protected AbstractPublisher createPublisher(String name) {
+		JmsPublisher publisher = createJmsPublisher(name);
 		publisher.setManager(getManager());
 		publisher.setConnection(connection);
 		return publisher;
 	}
 
 	@Override
-	protected AbstractPullPoint createPullPoint(String name, CreatePullPoint createPullPointRequest) {
+	protected AbstractPullPoint createPullPoint(String name) {
 		JmsPullPoint pullPoint = new JmsPullPoint(name);
 		pullPoint.setManager(getManager());
 		pullPoint.setConnection(connection);
@@ -45,7 +42,7 @@
 	}
 
 	@Override
-	protected AbstractSubscription createSubcription(String name, Subscribe subscribeRequest) {
+	protected AbstractSubscription createSubcription(String name) {
 		JmsSubscription subscription = createJmsSubscription(name);
 		subscription.setManager(getManager());
 		subscription.setConnection(connection);
@@ -53,6 +50,8 @@
 	}
 	
 	protected abstract JmsSubscription createJmsSubscription(String name);
+
+	protected abstract JmsPublisher createJmsPublisher(String name);
 
 	public ConnectionFactory getConnectionFactory() {
 		return connectionFactory;

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsPublisher.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsPublisher.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsPublisher.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsPublisher.java Tue Dec 20 13:59:53 2005
@@ -11,20 +11,35 @@
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 
+import org.activemq.advisory.ConsumerEvent;
+import org.activemq.advisory.ConsumerEventSource;
+import org.activemq.advisory.ConsumerListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.oasis_open.docs.wsn.b_1.InvalidTopicExpressionFaultType;
 import org.oasis_open.docs.wsn.b_1.NotificationMessageHolderType;
 import org.oasis_open.docs.wsn.b_1.Notify;
+import org.oasis_open.docs.wsn.br_1.PublisherRegistrationFailedFaultType;
+import org.oasis_open.docs.wsn.br_1.RegisterPublisher;
+import org.oasis_open.docs.wsn.br_1.ResourceNotDestroyedFaultType;
 import org.servicemix.wsn.AbstractPublisher;
-import org.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
+import org.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
+import org.servicemix.wsn.jaxws.PublisherRegistrationFailedFault;
+import org.servicemix.wsn.jaxws.PublisherRegistrationRejectedFault;
+import org.servicemix.wsn.jaxws.ResourceNotDestroyedFault;
+import org.servicemix.wsn.jaxws.ResourceUnknownFault;
+import org.servicemix.wsn.jaxws.TopicNotSupportedFault;
 
-public class JmsPublisher extends AbstractPublisher {
+public abstract class JmsPublisher extends AbstractPublisher implements ConsumerListener {
 
 	private static Log log = LogFactory.getLog(JmsPublisher.class);
 	
 	private Connection connection;
 	private JmsTopicExpressionConverter topicConverter;
 	private JAXBContext jaxbContext;
+    private Topic jmsTopic;
+    private ConsumerEventSource advisory;
+    private Object subscription;
 
 	public JmsPublisher(String name) {
 		super(name);
@@ -36,6 +51,14 @@
 		}
 	}
 
+	public Connection getConnection() {
+		return connection;
+	}
+
+	public void setConnection(Connection connection) {
+		this.connection = connection;
+	}
+
 	@Override
 	public void notify(NotificationMessageHolderType messageHolder) {
 		Session session = null;
@@ -53,7 +76,7 @@
 			log.warn("Error dispatching message", e);
 		} catch (JAXBException e) {
 			log.warn("Error dispatching message", e);
-		} catch (TopicExpressionDialectUnknownFault e) {
+		} catch (InvalidTopicException e) {
 			log.warn("Error dispatching message", e);
 		} finally {
 			if (session != null) {
@@ -66,12 +89,63 @@
 		}
 	}
 
-	public Connection getConnection() {
-		return connection;
+	@Override
+	protected void validatePublisher(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
+		super.validatePublisher(registerPublisherRequest);
+		try {
+			jmsTopic = topicConverter.toActiveMQTopic(topic);
+		} catch (InvalidTopicException e) {
+			InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+			throw new InvalidTopicExpressionFault(e.getMessage(), fault);
+		}
+	}
+	
+	@Override
+	protected void start() throws PublisherRegistrationFailedFault {
+		if (demand) {
+			try {
+				advisory = new ConsumerEventSource(connection, jmsTopic);
+				advisory.setConsumerListener(this);
+				advisory.start();
+			} catch (Exception e) {
+				PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
+				throw new PublisherRegistrationFailedFault("Error starting demand-based publisher", fault, e);
+			}
+		}
 	}
 
-	public void setConnection(Connection connection) {
-		this.connection = connection;
+    protected void destroy() throws ResourceNotDestroyedFault {
+		try {
+			if (advisory != null) {
+				advisory.stop();
+	    	}
+		} catch (Exception e) {
+			ResourceNotDestroyedFaultType fault = new ResourceNotDestroyedFaultType();
+			throw new ResourceNotDestroyedFault("Error destroying publisher", fault, e);
+		} finally {
+			super.destroy();
+		}
+    }
+	
+	public void onConsumerEvent(ConsumerEvent event) {
+		if (event.getConsumerCount() > 0) {
+			if (subscription == null) {
+				// start subscription
+				subscription = startSubscription();
+			}
+		} else {
+			if (subscription != null) {
+				// destroy subscription
+				Object sub = subscription;
+				subscription = null;
+				destroySubscription(sub);
+			}
+		}
 	}
+
+	protected abstract void destroySubscription(Object subscription);
+
+	protected abstract Object startSubscription();
+	
 
 }

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsSubscription.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsSubscription.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsSubscription.java Tue Dec 20 13:59:53 2005
@@ -22,6 +22,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.oasis_open.docs.wsn.b_1.InvalidTopicExpressionFaultType;
 import org.oasis_open.docs.wsn.b_1.PauseFailedFaultType;
 import org.oasis_open.docs.wsn.b_1.ResumeFailedFaultType;
 import org.oasis_open.docs.wsn.b_1.Subscribe;
@@ -75,15 +76,14 @@
 	@Override
 	protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, InvalidUseRawValueFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
 		super.validateSubscription(subscribeRequest);
-		jmsTopic = topicConverter.toActiveMQTopic(topic);
+		try {
+			jmsTopic = topicConverter.toActiveMQTopic(topic);
+		} catch (InvalidTopicException e) {
+			InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+			throw new InvalidTopicExpressionFault(e.getMessage(), fault);
+		}
 	}
 	
-	@Override
-	public void subscribe(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, InvalidUseRawValueFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
-		validateSubscription(subscribeRequest);
-		start();
-	}
-
 	@Override
 	protected void pause() throws PauseFailedFault {
 		if (session == null) {

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsTopicExpressionConverter.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsTopicExpressionConverter.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsTopicExpressionConverter.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsTopicExpressionConverter.java Tue Dec 20 13:59:53 2005
@@ -24,9 +24,7 @@
 import javax.xml.namespace.QName;
 
 import org.activemq.command.ActiveMQTopic;
-import org.oasis_open.docs.wsn.b_1.TopicExpressionDialectUnknownFaultType;
 import org.oasis_open.docs.wsn.b_1.TopicExpressionType;
-import org.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
 
 public class JmsTopicExpressionConverter {
 
@@ -47,7 +45,7 @@
         return answer;
     }
 
-    public ActiveMQTopic toActiveMQTopic(List<TopicExpressionType> topics) throws TopicExpressionDialectUnknownFault {
+    public ActiveMQTopic toActiveMQTopic(List<TopicExpressionType> topics) throws InvalidTopicException {
         if (topics == null || topics.size() == 0) {
             return null;
         }
@@ -59,11 +57,10 @@
 
         ActiveMQTopic topic = new ActiveMQTopic();
         topic.setCompositeDestinations(childrenDestinations);
-        //topic.setChildDestinations(childrenDestinations);
         return topic;
     }
 
-    public ActiveMQTopic toActiveMQTopic(TopicExpressionType topic) throws TopicExpressionDialectUnknownFault {
+    public ActiveMQTopic toActiveMQTopic(TopicExpressionType topic) throws InvalidTopicException {
         String dialect = topic.getDialect();
         if (dialect == null || SIMPLE_DIALECT.equals(dialect)) {
             for (Iterator iter = topic.getContent().iterator(); iter.hasNext();) {
@@ -72,11 +69,10 @@
                     return answer;
                 }
             }
-            throw new RuntimeException("No topic name available topic: " + topic);
+            throw new InvalidTopicException("No topic name available topic: " + topic);
         }
         else {
-        	TopicExpressionDialectUnknownFaultType fault = new TopicExpressionDialectUnknownFaultType();
-        	throw new TopicExpressionDialectUnknownFault("Topic dialect: " + dialect + " not supported", fault);
+        	throw new InvalidTopicException("Topic dialect: " + dialect + " not supported");
         }
     }
 

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/SubscriptionTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/SubscriptionTest.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/SubscriptionTest.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/SubscriptionTest.java Tue Dec 20 13:59:53 2005
@@ -12,15 +12,11 @@
 
 import org.oasis_open.docs.wsn.b_1.Subscribe;
 import org.servicemix.wsn.jaxws.InvalidFilterFault;
-import org.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
 import org.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
-import org.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
 import org.servicemix.wsn.jaxws.InvalidUseRawValueFault;
 import org.servicemix.wsn.jaxws.PauseFailedFault;
 import org.servicemix.wsn.jaxws.ResumeFailedFault;
 import org.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
-import org.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
-import org.servicemix.wsn.jaxws.TopicNotSupportedFault;
 import org.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
 import org.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault;
 
@@ -115,7 +111,7 @@
 		}
 
 		@Override
-		public void subscribe(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, InvalidUseRawValueFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
+		protected void start() throws SubscribeCreationFailedFault {
 		}
 
 		@Override

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/component/WSNComponentTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/component/WSNComponentTest.java?rev=358120&r1=358119&r2=358120&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/component/WSNComponentTest.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/component/WSNComponentTest.java Tue Dec 20 13:59:53 2005
@@ -1,11 +1,18 @@
 package org.servicemix.wsn.component;
 
 import java.io.StringReader;
+import java.io.StringWriter;
 import java.util.List;
 
 import javax.jbi.JBIException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.bind.JAXBContext;
 import javax.xml.namespace.QName;
 import javax.xml.parsers.DocumentBuilder;
+import javax.xml.transform.Source;
 
 import junit.framework.TestCase;
 
@@ -13,11 +20,20 @@
 import org.activemq.broker.BrokerService;
 import org.oasis_open.docs.wsn.b_1.NotificationMessageHolderType;
 import org.oasis_open.docs.wsn.b_1.Notify;
+import org.oasis_open.docs.wsn.b_1.Subscribe;
+import org.oasis_open.docs.wsn.b_1.SubscribeResponse;
+import org.oasis_open.docs.wsn.b_1.Unsubscribe;
+import org.oasis_open.docs.wsn.b_1.UnsubscribeResponse;
+import org.servicemix.MessageExchangeListener;
+import org.servicemix.components.util.ComponentSupport;
 import org.servicemix.jbi.container.ActivationSpec;
 import org.servicemix.jbi.container.JBIContainer;
 import org.servicemix.jbi.jaxp.SourceTransformer;
+import org.servicemix.jbi.jaxp.StringSource;
 import org.servicemix.tck.ReceiverComponent;
+import org.servicemix.wsn.client.AbstractWSAClient;
 import org.servicemix.wsn.client.NotificationBroker;
+import org.servicemix.wsn.client.Publisher;
 import org.servicemix.wsn.client.PullPoint;
 import org.servicemix.wsn.client.Subscription;
 import org.w3._2005._03.addressing.AttributedURIType;
@@ -90,6 +106,28 @@
 		Thread.sleep(50);
 	}
 
+	public void testUnsubscribe() throws Exception {
+		PullPoint pullPoint = wsnBroker.createPullPoint();
+		Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
+		
+		wsnBroker.notify("myTopic", new Notify());
+		// Wait for notification
+		Thread.sleep(50);
+		
+		assertEquals(1, pullPoint.getMessages(0).size());
+
+		subscription.unsubscribe();
+		
+		wsnBroker.notify("myTopic", new Notify());
+		// Wait for notification
+		Thread.sleep(50);
+		
+		assertEquals(0, pullPoint.getMessages(0).size());
+
+		// Wait for acks to be processed
+		Thread.sleep(50);
+	}
+
 	public void testPauseResume() throws Exception {
 		PullPoint pullPoint = wsnBroker.createPullPoint();
 		Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
@@ -164,6 +202,33 @@
 		assertEquals(0, pullPoint2.getMessages(0).size());
 	}
 	
+	public void testDemandeBasedPublisher() throws Exception {
+		PublisherComponent publisherComponent = new PublisherComponent();
+		jbi.activateComponent(publisherComponent, "publisher");
+		
+		Publisher publisher = wsnBroker.registerPublisher(
+									AbstractWSAClient.createWSA(PublisherComponent.SERVICE.getNamespaceURI() + "/" + PublisherComponent.SERVICE.getLocalPart() + "/" + PublisherComponent.ENDPOINT), 
+									"myTopic", true);
+		
+		Thread.sleep(50);
+		assertNull(publisherComponent.getSubscription());
+
+		PullPoint pullPoint = wsnBroker.createPullPoint();
+		Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
+
+		Thread.sleep(50);
+		assertNotNull(publisherComponent.getSubscription());
+		
+		subscription.unsubscribe();
+		
+		Thread.sleep(50);
+		assertNull(publisherComponent.getSubscription());
+		
+		publisher.destroy();
+		
+		Thread.sleep(50);
+	}
+	
 	protected Element parse(String txt) throws Exception {
 		DocumentBuilder builder = new SourceTransformer().createDocumentBuilder();
 		InputSource is = new InputSource(new StringReader(txt));
@@ -176,5 +241,54 @@
 		epr.setAddress(new AttributedURIType());
 		epr.getAddress().setValue(service.getNamespaceURI() + "/" + service.getLocalPart() + "/" + endpoint);
 		return epr;
+	}
+	
+	public static class PublisherComponent extends ComponentSupport implements MessageExchangeListener {
+	    public static final QName SERVICE = new QName("http://servicemix.org/example", "publisher");
+	    public static final String ENDPOINT = "publisher";
+	    private Object subscription;
+	    public PublisherComponent() {
+	    	super(SERVICE, ENDPOINT);
+	    }
+		public Object getSubscription() {
+			return subscription;
+		}
+		public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+			if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+				try {
+					JAXBContext jaxbContext = JAXBContext.newInstance(Subscribe.class);
+					Source src = exchange.getMessage("in").getContent();
+					Object input = jaxbContext.createUnmarshaller().unmarshal(src);
+					if (input instanceof Subscribe) {
+						subscription = input;
+						SubscribeResponse response = new SubscribeResponse();
+						response.setSubscriptionReference(AbstractWSAClient.createWSA(PublisherComponent.SERVICE.getNamespaceURI() + "/" + PublisherComponent.SERVICE.getLocalPart() + "/" + PublisherComponent.ENDPOINT));
+						StringWriter writer = new StringWriter();
+						jaxbContext.createMarshaller().marshal(response, writer);
+						NormalizedMessage out = exchange.createMessage();
+						out.setContent(new StringSource(writer.toString()));
+						exchange.setMessage(out, "out");
+						send(exchange);
+					} else if (input instanceof Unsubscribe) {
+						subscription = null;
+						UnsubscribeResponse response = new UnsubscribeResponse();
+						StringWriter writer = new StringWriter();
+						jaxbContext.createMarshaller().marshal(response, writer);
+						NormalizedMessage out = exchange.createMessage();
+						out.setContent(new StringSource(writer.toString()));
+						exchange.setMessage(out, "out");
+						send(exchange);
+					} else {
+						throw new Exception("Unkown request");
+					}
+				} catch (Exception e) {
+					exchange.setError(e);
+					send(exchange);
+				}
+			} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+				exchange.setStatus(ExchangeStatus.DONE);
+				send(exchange);
+			}
+		}
 	}
 }



Mime
View raw message