servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gno...@apache.org
Subject svn commit: r357945 - in /incubator/servicemix/trunk/servicemix-wsn2005/src: main/java/org/servicemix/wsn/jms/JmsSubscription.java test/java/org/servicemix/wsn/component/WSNComponentTest.java
Date Tue, 20 Dec 2005 10:02:33 GMT
Author: gnodet
Date: Tue Dec 20 02:02:26 2005
New Revision: 357945

URL: http://svn.apache.org/viewcvs?rev=357945&view=rev
Log:
Add xpath filtering to wsn2005 component

Modified:
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsSubscription.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/component/WSNComponentTest.java

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsSubscription.java?rev=357945&r1=357944&r2=357945&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsSubscription.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/servicemix/wsn/jms/JmsSubscription.java Tue Dec 20 02:02:26 2005
@@ -1,5 +1,8 @@
 package org.servicemix.wsn.jms;
 
+import java.io.IOException;
+import java.io.StringReader;
+
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -9,6 +12,13 @@
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.xml.datatype.XMLGregorianCalendar;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,6 +42,11 @@
 import org.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault;
 import org.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
 import org.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
 
 public abstract class JmsSubscription extends AbstractSubscription implements MessageListener
{
 
@@ -114,10 +129,7 @@
 	public void onMessage(Message message) {
 		try {
 			TextMessage text = (TextMessage) message;
-	        boolean match = true;
-			if (contentFilter != null) {
-				match = doFilter(text.getText());
-			}
+	        boolean match = doFilter(text.getText());
 			if (match) {
 				doNotify(text.getText());
 			}
@@ -127,6 +139,44 @@
 	}
 	
 	protected boolean doFilter(String notify) {
+		if (contentFilter != null) {
+			if (!contentFilter.getDialect().equals(XPATH1_URI)) {
+				throw new IllegalStateException("Unsupported dialect: " + contentFilter.getDialect());
+			}
+			try {
+				DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+				factory.setNamespaceAware(true);
+				Document doc = factory.newDocumentBuilder().parse(new InputSource(new StringReader(notify)));
+				Element root = doc.getDocumentElement();
+				Element holder = (Element) root.getElementsByTagNameNS("http://docs.oasis-open.org/wsn/b-1", "NotificationMessage").item(0);
+				Element message = (Element) holder.getElementsByTagNameNS("http://docs.oasis-open.org/wsn/b-1", "Message").item(0);
+				Element content = null;
+				for (int i = 0; i < message.getChildNodes().getLength(); i++) {
+					if (message.getChildNodes().item(i) instanceof Element) {
+						content = (Element) message.getChildNodes().item(i);
+						break;
+					}
+				}
+				XPathFactory xpfactory = XPathFactory.newInstance();
+				XPath xpath = xpfactory.newXPath();
+				XPathExpression exp = xpath.compile(contentFilter.getContent().get(0).toString());
+				Boolean ret = (Boolean) exp.evaluate(content, XPathConstants.BOOLEAN);
+				return ret.booleanValue();
+			} catch (SAXException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			} catch (IOException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			} catch (ParserConfigurationException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			} catch (XPathExpressionException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+			return false;
+		}
 		return true;
 	}
 	

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/component/WSNComponentTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/component/WSNComponentTest.java?rev=357945&r1=357944&r2=357945&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/component/WSNComponentTest.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/servicemix/wsn/component/WSNComponentTest.java Tue Dec 20 02:02:26 2005
@@ -1,8 +1,13 @@
 package org.servicemix.wsn.component;
 
+import java.io.StringReader;
+import java.math.BigInteger;
+import java.util.List;
+
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBElement;
 import javax.xml.namespace.QName;
+import javax.xml.parsers.DocumentBuilder;
 
 import junit.framework.TestCase;
 
@@ -15,39 +20,57 @@
 import org.oasis_open.docs.wsn.b_1.GetMessagesResponse;
 import org.oasis_open.docs.wsn.b_1.NotificationMessageHolderType;
 import org.oasis_open.docs.wsn.b_1.Notify;
+import org.oasis_open.docs.wsn.b_1.QueryExpressionType;
 import org.oasis_open.docs.wsn.b_1.Subscribe;
 import org.oasis_open.docs.wsn.b_1.SubscribeResponse;
 import org.oasis_open.docs.wsn.b_1.TopicExpressionType;
 import org.servicemix.client.DefaultServiceMixClient;
 import org.servicemix.jbi.container.ActivationSpec;
 import org.servicemix.jbi.container.JBIContainer;
+import org.servicemix.jbi.jaxp.SourceTransformer;
 import org.servicemix.jbi.resolver.EndpointResolver;
 import org.servicemix.jbi.resolver.ServiceAndEndpointNameResolver;
 import org.servicemix.jbi.resolver.ServiceNameEndpointResolver;
 import org.servicemix.tck.ReceiverComponent;
+import org.servicemix.wsn.AbstractSubscription;
 import org.servicemix.wsn.JAXBMarshaller;
 import org.w3._2005._03.addressing.AttributedURIType;
 import org.w3._2005._03.addressing.EndpointReferenceType;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.xml.sax.InputSource;
 
 public class WSNComponentTest extends TestCase {
 	
-	public static QName NOTIFICATION_BROKER = QName.valueOf("{http://servicemix.org/wsnotification}NotificationBroker");

+	public static QName NOTIFICATION_BROKER = new QName("http://servicemix.org/wsnotification",
"NotificationBroker"); 
 	
 	private JBIContainer jbi;
 	private BrokerService broker;
 	private JAXBContext context;
+	private DefaultServiceMixClient client;
 	
 	protected void setUp() throws Exception {
-		jbi = new JBIContainer();
-		jbi.setEmbedded(true);
-		jbi.init();
-		
 		broker = new BrokerService();
 		broker.setPersistent(false);
 		broker.addConnector("vm://localhost");
 		broker.start();
 
+		jbi = new JBIContainer();
+		jbi.setEmbedded(true);
+		jbi.init();
+		jbi.start();
+		
+		WSNComponent component = new WSNComponent();
+		component.setConnectionFactory(new ActiveMQConnectionFactory("vm://localhost"));
+		ActivationSpec as = new ActivationSpec();
+		as.setComponentName("servicemix-wsn2005");
+		as.setComponent(component);
+		jbi.activateComponent(as);
+		
 		context = JAXBContext.newInstance(Subscribe.class, SubscribeResponse.class);
+
+		client = new DefaultServiceMixClient(jbi);
+		client.setMarshaler(new JAXBMarshaller(context));
 	}
 	
 	protected void tearDown() throws Exception {
@@ -60,42 +83,15 @@
 	}
 	
 	public void testNB() throws Exception {
-		jbi.start();
-		
-		WSNComponent component = new WSNComponent();
-		component.setConnectionFactory(new ActiveMQConnectionFactory("vm://localhost"));
-		ActivationSpec as = new ActivationSpec();
-		as.setComponentName("broker");
-		as.setComponent(component);
-		jbi.activateComponent(as);
-		
 		ReceiverComponent receiver = new ReceiverComponent();
 		jbi.activateComponent(receiver, "receiver");
 		
-		DefaultServiceMixClient client = new DefaultServiceMixClient(jbi);
-		EndpointResolver resolver = new ServiceNameEndpointResolver(NOTIFICATION_BROKER);
-		client.setMarshaler(new JAXBMarshaller(context));
-		
-		Subscribe subscribeRequest = new Subscribe();
-		EndpointReferenceType consumerReference = new EndpointReferenceType();
-		consumerReference.setAddress(new AttributedURIType());
-		consumerReference.getAddress().setValue(ReceiverComponent.SERVICE.getNamespaceURI() + "/"
+ ReceiverComponent.SERVICE.getLocalPart() + "/" + ReceiverComponent.ENDPOINT);
-		subscribeRequest.setConsumerReference(consumerReference);
-		subscribeRequest.setFilter(new FilterType());
-		TopicExpressionType topic = new TopicExpressionType();
-		topic.getContent().add("myTopic");
-		subscribeRequest.getFilter().getAny().add(new JAXBElement<TopicExpressionType>(new
QName("http://docs.oasis-open.org/wsn/b-1", "TopicExpression"), TopicExpressionType.class,
topic));
-		SubscribeResponse subscribeResponse = (SubscribeResponse) client.request(resolver, null,
null, subscribeRequest);
-		
-		Thread.sleep(500);
-		
-		Notify notify = new Notify();
-		NotificationMessageHolderType holder = new NotificationMessageHolderType();
-		holder.setTopic(topic);
-		holder.setMessage(new NotificationMessageHolderType.Message());
-		holder.getMessage().setAny(new Notify());
-		notify.getNotificationMessage().add(holder);
-		client.send(resolver, null, null, notify);
+		EndpointReferenceType consumer = createEPR(ReceiverComponent.SERVICE, ReceiverComponent.ENDPOINT);
+		EndpointReferenceType subscription = subscribe(consumer, "myTopic", null);
+
+		notify("myTopic", parse("<hello>world</hello>"));
+		// Wait for notification
+		Thread.sleep(50);
 		
 		receiver.getMessageList().assertMessagesReceived(1);
 		
@@ -104,56 +100,117 @@
 	}
 
 	public void testPull() throws Exception {
-		jbi.start();
-		
-		WSNComponent component = new WSNComponent();
-		component.setConnectionFactory(new ActiveMQConnectionFactory("vm://localhost"));
-		ActivationSpec as = new ActivationSpec();
-		as.setComponentName("broker");
-		as.setComponent(component);
-		jbi.activateComponent(as);
+		EndpointReferenceType pullPoint = createPullPoint();
+		subscribe(pullPoint, "myTopic", null);
 		
-		DefaultServiceMixClient client = new DefaultServiceMixClient(jbi);
-		EndpointResolver resolver = new ServiceNameEndpointResolver(NOTIFICATION_BROKER);
-		client.setMarshaler(new JAXBMarshaller(context));
+		notify("myTopic", new Notify());
+		// Wait for notification
+		Thread.sleep(50);
 		
-		CreatePullPoint createPullPoint = new CreatePullPoint();
-		CreatePullPointResponse createPullPointResponse = (CreatePullPointResponse) client.request(resolver,
null, null, createPullPoint);
+		List<NotificationMessageHolderType> msgs = getMessages(pullPoint, 0);
+		assertNotNull(msgs);
+		assertEquals(1, msgs.size());
+
+		// Wait for acks to be processed
+		Thread.sleep(50);
+	}
+	
+	public void testPullWithFilter() throws Exception {
+		EndpointReferenceType pullPoint1 = createPullPoint();
+		EndpointReferenceType pullPoint2 = createPullPoint();
+		EndpointReferenceType subscription1 = subscribe(pullPoint1, "myTopic", "@type = 'a'");
+		EndpointReferenceType subscription2 = subscribe(pullPoint2, "myTopic", "@type = 'b'");
 		
-		Subscribe subscribeRequest = new Subscribe();
-		subscribeRequest.setConsumerReference(createPullPointResponse.getPullPoint());
-		subscribeRequest.setFilter(new FilterType());
-		TopicExpressionType topic = new TopicExpressionType();
-		topic.getContent().add("myTopic");
-		subscribeRequest.getFilter().getAny().add(new JAXBElement<TopicExpressionType>(new
QName("http://docs.oasis-open.org/wsn/b-1", "TopicExpression"), TopicExpressionType.class,
topic));
-		SubscribeResponse subscribeResponse = (SubscribeResponse) client.request(resolver, null,
null, subscribeRequest);
+		notify("myTopic", parse("<msg type='a'/>"));
+		// Wait for notification
+		Thread.sleep(50);
+
+		assertEquals(1, getMessages(pullPoint1, 0).size());
+		assertEquals(0, getMessages(pullPoint2, 0).size());
 		
+		notify("myTopic", parse("<msg type='b'/>"));
 		// Wait for notification
-		Thread.sleep(500);
+		Thread.sleep(50);
+
+		assertEquals(0, getMessages(pullPoint1, 0).size());
+		assertEquals(1, getMessages(pullPoint2, 0).size());
 		
+		notify("myTopic", parse("<msg type='c'/>"));
+		// Wait for notification
+		Thread.sleep(50);
+
+		assertEquals(0, getMessages(pullPoint1, 0).size());
+		assertEquals(0, getMessages(pullPoint2, 0).size());
+	}
+	
+	protected Element parse(String txt) throws Exception {
+		DocumentBuilder builder = new SourceTransformer().createDocumentBuilder();
+		InputSource is = new InputSource(new StringReader(txt));
+		Document doc = builder.parse(is);
+		return doc.getDocumentElement();
+	}
+	
+	protected EndpointReferenceType createEPR(QName service, String endpoint) {
+		EndpointReferenceType epr = new EndpointReferenceType();
+		epr.setAddress(new AttributedURIType());
+		epr.getAddress().setValue(service.getNamespaceURI() + "/" + service.getLocalPart() + "/"
+ endpoint);
+		return epr;
+	}
+	
+	protected EndpointReferenceType createPullPoint() throws Exception {
+		EndpointResolver resolver = new ServiceNameEndpointResolver(NOTIFICATION_BROKER);
+		CreatePullPointResponse response = (CreatePullPointResponse) client.request(resolver, null,
null, new CreatePullPoint());
+		return response.getPullPoint();
+	}
+	
+	protected EndpointReferenceType subscribe(EndpointReferenceType consumer, 
+											  String topic,
+											  String xpath) throws Exception {
+		EndpointResolver resolver = new ServiceNameEndpointResolver(NOTIFICATION_BROKER);
+		Subscribe subscribeRequest = new Subscribe();
+		subscribeRequest.setConsumerReference(consumer);
+		subscribeRequest.setFilter(new FilterType());
+		if (topic != null) {
+			TopicExpressionType topicExp = new TopicExpressionType();
+			topicExp.getContent().add(topic);
+			subscribeRequest.getFilter().getAny().add(new JAXBElement<TopicExpressionType>(AbstractSubscription.QNAME_TOPIC_EXPRESSION,
TopicExpressionType.class, topicExp));
+		}
+		if (xpath != null) {
+			QueryExpressionType xpathExp = new QueryExpressionType();
+			xpathExp.setDialect(AbstractSubscription.XPATH1_URI);
+			xpathExp.getContent().add(xpath);
+			subscribeRequest.getFilter().getAny().add(new JAXBElement<QueryExpressionType>(AbstractSubscription.QNAME_MESSAGE_CONTENT,
QueryExpressionType.class, xpathExp));
+		}
+		SubscribeResponse response = (SubscribeResponse) client.request(resolver, null, null, subscribeRequest);
+		return response.getSubscriptionReference();
+	}
+	
+	protected void notify(String topic, Object msg) throws Exception {
+		EndpointResolver resolver = new ServiceNameEndpointResolver(NOTIFICATION_BROKER);
 		Notify notify = new Notify();
 		NotificationMessageHolderType holder = new NotificationMessageHolderType();
-		holder.setTopic(topic);
+		if (topic != null) {
+			TopicExpressionType topicExp = new TopicExpressionType();
+			topicExp.getContent().add(topic);
+			holder.setTopic(topicExp);
+		}
 		holder.setMessage(new NotificationMessageHolderType.Message());
-		holder.getMessage().setAny(new Notify());
+		holder.getMessage().setAny(msg);
 		notify.getNotificationMessage().add(holder);
 		client.send(resolver, null, null, notify);
-		
-		// Wait for notification
-		Thread.sleep(500);
-		
-		String[] parts = split(createPullPointResponse.getPullPoint().getAddress().getValue());
-		resolver = new ServiceAndEndpointNameResolver(new QName(parts[0], parts[1]), parts[2]);
-		
+	}
+	
+	protected List<NotificationMessageHolderType> getMessages(EndpointReferenceType pullPoint,
int max) throws Exception {
+		EndpointResolver resolver = resolveWSA(pullPoint);
 		GetMessages getMessages = new GetMessages();
-		GetMessagesResponse getMessagesResponse = (GetMessagesResponse) client.request(resolver,
null, null, getMessages);
-
-		assertNotNull(getMessagesResponse);
-		assertNotNull(getMessagesResponse.getNotificationMessage());
-		assertEquals(1, getMessagesResponse.getNotificationMessage().size());
-		
-		// Wait for acks to be processed
-		Thread.sleep(50);
+		getMessages.setMaximumNumber(BigInteger.valueOf(max));
+		GetMessagesResponse response = (GetMessagesResponse) client.request(resolver, null, null,
getMessages);
+		return response.getNotificationMessage();
+	}
+	
+	protected EndpointResolver resolveWSA(EndpointReferenceType ref) {
+		String[] parts = split(ref.getAddress().getValue());
+		return new ServiceAndEndpointNameResolver(new QName(parts[0], parts[1]), parts[2]);
 	}
 
     protected String[] split(String uri) {



Mime
View raw message