tuscany-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From isil...@apache.org
Subject svn commit: r573053 - in /incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification: NotificationReferenceBindingInvoker.java NotificationReferenceBindingProvider.java
Date Wed, 05 Sep 2007 21:00:43 GMT
Author: isilval
Date: Wed Sep  5 14:00:42 2007
New Revision: 573053

URL: http://svn.apache.org/viewvc?rev=573053&view=rev
Log:
Move subscriber and broker id state from invoker to provider

Modified:
    incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java
    incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java

Modified: incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java?rev=573053&r1=573052&r2=573053&view=diff
==============================================================================
--- incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java
(original)
+++ incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java
Wed Sep  5 14:00:42 2007
@@ -19,16 +19,11 @@
 package org.apache.tuscany.sca.binding.notification;
 
 import java.io.OutputStream;
-import java.net.URL;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 
 import org.apache.axiom.om.OMElement;
-import org.apache.tuscany.sca.binding.notification.encoding.BrokerID;
+import org.apache.tuscany.sca.binding.notification.NotificationReferenceBindingProvider.SubscriberInfo;
 import org.apache.tuscany.sca.binding.notification.encoding.Constants;
-import org.apache.tuscany.sca.binding.notification.encoding.EncodingUtils;
-import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference;
 import org.apache.tuscany.sca.binding.notification.util.IOUtils;
 import org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException;
 import org.apache.tuscany.sca.binding.notification.util.IOUtils.Writeable;
@@ -47,21 +42,12 @@
     private static final Message RESPONSE = new ImmutableMessage();
     private Operation operation;
     
-    private List<SubscriberInfo> subscribers;
-    private String brokerID;
+    private NotificationReferenceBindingProvider notificationReferenceBindingProvider;
 
-    public NotificationReferenceBindingInvoker(Operation operation) {
+    public NotificationReferenceBindingInvoker(Operation operation,
+                                               NotificationReferenceBindingProvider notificationReferenceBindingProvider)
{
         this.operation = operation;
-        this.subscribers = new ArrayList<SubscriberInfo>();
-        this.brokerID = null;
-    }
-    
-    public void setBrokerID(String brokerID) {
-        this.brokerID = brokerID;
-    }
-    
-    public String getBrokerID() {
-        return brokerID;
+        this.notificationReferenceBindingProvider = notificationReferenceBindingProvider;
     }
     
     public Message invoke(Message msg) {
@@ -90,13 +76,14 @@
 
         try {
             synchronized(this) {
-                for (SubscriberInfo subscriber : subscribers) {
+                for (SubscriberInfo subscriber : notificationReferenceBindingProvider.getSubscribers())
{
                     // check for each subscriber's broker id and skip if equal
                     if (incomingBrokerID != null && subscriber.brokerID != null &&
incomingBrokerID.equals(subscriber.brokerID)) {
                         continue;
                     }
                     HashMap<String, String> headers = new HashMap<String, String>();
                     headers.put(IOUtils.Notification_Operation, operation.getName());
+                    String brokerID = notificationReferenceBindingProvider.getBrokerID();
                     if (brokerID != null) {
                         headers.put(Constants.Broker_ID, brokerID);
                     }
@@ -142,103 +129,5 @@
             }
         };
         return writeable;
-    }
-    
-    public void addSubscriberUrl(URL subscriberUrl) {
-        addSubscriber(subscriberUrl, null);
-    }
-    
-    public void addSubscriber(EndpointReference subscriberEPR) {
-        BrokerID brokerID = null;
-        if (subscriberEPR.getReferenceProperties() != null) {
-            brokerID = subscriberEPR.getReferenceProperties().getProperty(BrokerID.class);
-        }
-        addSubscriber(subscriberEPR.getEndpointAddress().getAddress(), (brokerID != null
? brokerID.getID() : null));
-    }
-
-    public void addSubscriber(URL address, String brokerID) {
-        synchronized(this) {
-            SubscriberInfo si = new SubscriberInfo(address);
-            si.brokerID = brokerID;
-            if (subscribers == null) {
-                subscribers = new ArrayList<SubscriberInfo>();
-            }
-            subscribers.add(si);
-        }
-    }
-    
-    public void replaceSubscribers(EndpointReference brokerConsumerEPR) {
-        synchronized(this) {
-            subscribers = null;
-        }
-        addSubscriber(brokerConsumerEPR);
-    }
-    
-    public void replaceBrokerSubscriber(URL removedBrokerConsumerUrl, EndpointReference chosenBrokerConsumerEpr)
{
-        synchronized(this) {
-            if (subscribers == null) {
-                throw new RuntimeException("No subscribers");
-            }
-            SubscriberInfo siToRemove = null;
-            for (SubscriberInfo si : subscribers) {
-                if (si.address.equals(removedBrokerConsumerUrl)) {
-                    siToRemove = si;
-                }
-            }
-            if (siToRemove == null) {
-                throw new RuntimeException("Can't find info for broker to remove [" + removedBrokerConsumerUrl
+ "]");
-            }
-            if (!subscribers.remove(siToRemove)) {
-                throw new RuntimeException("Can't remove info for [" + siToRemove.address
+ "]");
-            }
-        }
-        if (chosenBrokerConsumerEpr != null) {
-            addSubscriber(chosenBrokerConsumerEpr);
-        }
-    }
-    
-    public List<EndpointReference> getNeighborBrokerConsumerEprs() {
-        synchronized(this) {
-            if (subscribers == null) {
-                throw new RuntimeException("No subscribers");
-            }
-            List<EndpointReference> neighborBrokerConsumerEprs = new ArrayList<EndpointReference>();
-            for(SubscriberInfo si : subscribers) {
-                if (si.brokerID != null) {
-                    neighborBrokerConsumerEprs.add(EncodingUtils.createEndpointReference(si.address,
si.brokerID));
-                }
-            }
-            
-            return neighborBrokerConsumerEprs;
-        }
-    }
-    
-    public void removeBrokerSubscribers() {
-        synchronized(this) {
-            if (subscribers == null) {
-                throw new RuntimeException("No subscribers");
-            }
-            List<SubscriberInfo> sisToRemove = new ArrayList<SubscriberInfo>();
-            for (SubscriberInfo si : subscribers) {
-                if (si.brokerID != null) {
-                    sisToRemove.add(si);
-                }
-            }
-            for(SubscriberInfo si : sisToRemove) {
-                if (!subscribers.remove(si)) {
-                    throw new RuntimeException("Can't remove broker subscriber [" + si.address
+ "]");
-                }
-            }
-        }
-    }
-    
-    class SubscriberInfo {
-        public URL address;
-        public String brokerID;
-        
-        public SubscriberInfo(URL address) {
-            this.address = address;
-            this.brokerID = null;
-        }
     }
 }

Modified: incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java
URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java?rev=573053&r1=573052&r2=573053&view=diff
==============================================================================
--- incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java
(original)
+++ incubator/tuscany/java/sca/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java
Wed Sep  5 14:00:42 2007
@@ -29,6 +29,7 @@
 
 import org.apache.axiom.om.OMElement;
 import org.apache.tuscany.sca.binding.notification.encoding.Broker;
+import org.apache.tuscany.sca.binding.notification.encoding.BrokerID;
 import org.apache.tuscany.sca.binding.notification.encoding.ConnectionOverride;
 import org.apache.tuscany.sca.binding.notification.encoding.Constants;
 import org.apache.tuscany.sca.binding.notification.encoding.EncodingObject;
@@ -70,6 +71,9 @@
     private boolean started;
     private NotificationBrokerManager brokerManager;
 
+    private List<SubscriberInfo> subscribers;
+    private String brokerID;
+    
     public NotificationReferenceBindingProvider(NotificationBinding notificationBinding,
                                                 RuntimeComponent component,
                                                 RuntimeComponentReference reference,
@@ -107,6 +111,9 @@
         for (Operation operation : interfaze.getOperations()) {
             operation.setNonBlocking(false);
         }
+
+        this.subscribers = new ArrayList<SubscriberInfo>();
+        this.brokerID = null;
     }
     
     public NotificationBinding getBinding() {
@@ -120,17 +127,33 @@
     public boolean isStarted() {
         return started;
     }
+    
+    public void setBrokerID(String brokerID) {
+        this.brokerID = brokerID;
+    }
+    
+    public String getBrokerID() {
+        return brokerID;
+    }
 
     public Invoker createInvoker(Operation operation, boolean isCallback) {
         if (isCallback) {
             throw new UnsupportedOperationException();
         }
+        return createInvoker(operation);
+    }
+
+    public Invoker createInvoker(Operation operation) {
         if (invoker == null) {
-            invoker = new NotificationReferenceBindingInvoker(operation);
+            invoker = new NotificationReferenceBindingInvoker(operation, this);
         }
         return invoker;
     }
 
+    public boolean supportsAsyncOneWayInvocation() {
+        return false;
+    }
+
     public InterfaceContract getBindingInterfaceContract() {
         return reference.getInterfaceContract();
     }
@@ -157,13 +180,13 @@
         }
         if (Constants.EndConsumers.equals(sequenceType)) {
             for (URL consumerUrl : consumerList) {
-                invoker.addSubscriberUrl(consumerUrl);
+                addSubscriberUrl(consumerUrl);
             }
         }
         else if (Constants.BrokerConsumers.equals(sequenceType)) {
             // Pick a broker consumer, for now the first one
             URL consumerUrl = consumerList.get(0);
-            invoker.addSubscriberUrl(consumerUrl);
+            addSubscriberUrl(consumerUrl);
         }
 
         servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this));
@@ -171,21 +194,21 @@
     
     public void deployBroker(String brokerID, EndpointReference brokerConsumerEPR, List<EndpointReference>
consumerList) {
         if (brokerConsumerEPR != null) {
-            invoker.addSubscriber(brokerConsumerEPR);            
+            addSubscriber(brokerConsumerEPR);            
         }
         if (consumerList != null && !consumerList.isEmpty()) {
             for (EndpointReference consumerEPR : consumerList) {
-                invoker.addSubscriber(consumerEPR);
+                addSubscriber(consumerEPR);
             }
         }
-        invoker.setBrokerID(brokerID);
+        setBrokerID(brokerID);
         servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this));
     }
     
     public void undeployBroker(URL brokerConsumerUrl) {
-        EndpointReference brokerConsumerEpr = EncodingUtils.createEndpointReference(brokerConsumerUrl,
invoker.getBrokerID());
-        ntm.removeBroker(brokerConsumerEpr, invoker.getNeighborBrokerConsumerEprs(), remoteNtmUrl);
-        invoker.removeBrokerSubscribers();
+        EndpointReference brokerConsumerEpr = EncodingUtils.createEndpointReference(brokerConsumerUrl,
getBrokerID());
+        ntm.removeBroker(brokerConsumerEpr, getNeighborBrokerConsumerEprs(), remoteNtmUrl);
+        removeBrokerSubscribers();
     }
     
     public void handle(Map<String, String> headers, ServletInputStream istream, int
contentLength, ServletOutputStream ostream) {
@@ -194,11 +217,11 @@
             EncodingObject eo = EncodingUtils.decodeFromStream(encodingRegistry, istream);
             if (eo instanceof Subscribe) {
                 Subscribe sub = (Subscribe)eo;
-                invoker.addSubscriber(sub.getConsumerReference().getReference());
+                addSubscriber(sub.getConsumerReference().getReference());
             }
             else if (eo instanceof ConnectionOverride) {
                 ConnectionOverride co = (ConnectionOverride)eo;
-                invoker.replaceSubscribers(co.getBrokerConsumerReference().getReference());
+                replaceSubscribers(co.getBrokerConsumerReference().getReference());
             }
             else if (eo instanceof ReplaceBrokerConnection) {
                 ReplaceBrokerConnection rbc = (ReplaceBrokerConnection)eo;
@@ -206,13 +229,13 @@
                 if (rbc.getNeighbors() != null) {
                     int choice = rbc.getNeighbors().getBrokerSequence().size() - 1;
                     Broker chosenBroker = rbc.getNeighbors().getBrokerSequence().get(choice);
-                    invoker.replaceBrokerSubscriber(removedBrokerConsumerEpr,
+                    replaceBrokerSubscriber(removedBrokerConsumerEpr,
                                                     chosenBroker.getBrokerConsumerReference().getReference());
                     brokerManager.replaceConsumersBrokerConnection(notificationType,
                                                                    chosenBroker.getBrokerProducerReference().getReference());
                 }
                 else {
-                    invoker.replaceBrokerSubscriber(removedBrokerConsumerEpr, null);
+                    replaceBrokerSubscriber(removedBrokerConsumerEpr, null);
                 }
             }
             else {
@@ -221,6 +244,108 @@
         } catch(Throwable e) {
             e.printStackTrace();
             throw new RuntimeException(e);
+        }
+    }
+
+    public List<SubscriberInfo> getSubscribers() {
+        return subscribers;
+    }
+    
+    private void addSubscriberUrl(URL subscriberUrl) {
+        addSubscriber(subscriberUrl, null);
+    }
+    
+    private void addSubscriber(EndpointReference subscriberEPR) {
+        BrokerID brokerID = null;
+        if (subscriberEPR.getReferenceProperties() != null) {
+            brokerID = subscriberEPR.getReferenceProperties().getProperty(BrokerID.class);
+        }
+        addSubscriber(subscriberEPR.getEndpointAddress().getAddress(), (brokerID != null
? brokerID.getID() : null));
+    }
+
+    private void addSubscriber(URL address, String brokerID) {
+        synchronized(this) {
+            SubscriberInfo si = new SubscriberInfo(address);
+            si.brokerID = brokerID;
+            if (subscribers == null) {
+                subscribers = new ArrayList<SubscriberInfo>();
+            }
+            subscribers.add(si);
+        }
+    }
+    
+    private void replaceSubscribers(EndpointReference brokerConsumerEPR) {
+        synchronized(this) {
+            subscribers = null;
+        }
+        addSubscriber(brokerConsumerEPR);
+    }
+    
+    private void replaceBrokerSubscriber(URL removedBrokerConsumerUrl, EndpointReference
chosenBrokerConsumerEpr) {
+        synchronized(this) {
+            if (subscribers == null) {
+                throw new RuntimeException("No subscribers");
+            }
+            SubscriberInfo siToRemove = null;
+            for (SubscriberInfo si : subscribers) {
+                if (si.address.equals(removedBrokerConsumerUrl)) {
+                    siToRemove = si;
+                }
+            }
+            if (siToRemove == null) {
+                throw new RuntimeException("Can't find info for broker to remove [" + removedBrokerConsumerUrl
+ "]");
+            }
+            if (!subscribers.remove(siToRemove)) {
+                throw new RuntimeException("Can't remove info for [" + siToRemove.address
+ "]");
+            }
+        }
+        if (chosenBrokerConsumerEpr != null) {
+            addSubscriber(chosenBrokerConsumerEpr);
+        }
+    }
+    
+    private List<EndpointReference> getNeighborBrokerConsumerEprs() {
+        synchronized(this) {
+            if (subscribers == null) {
+                throw new RuntimeException("No subscribers");
+            }
+            List<EndpointReference> neighborBrokerConsumerEprs = new ArrayList<EndpointReference>();
+            for(SubscriberInfo si : subscribers) {
+                if (si.brokerID != null) {
+                    neighborBrokerConsumerEprs.add(EncodingUtils.createEndpointReference(si.address,
si.brokerID));
+                }
+            }
+            
+            return neighborBrokerConsumerEprs;
+        }
+    }
+    
+    private void removeBrokerSubscribers() {
+        synchronized(this) {
+            if (subscribers == null) {
+                throw new RuntimeException("No subscribers");
+            }
+            List<SubscriberInfo> sisToRemove = new ArrayList<SubscriberInfo>();
+            for (SubscriberInfo si : subscribers) {
+                if (si.brokerID != null) {
+                    sisToRemove.add(si);
+                }
+            }
+            for(SubscriberInfo si : sisToRemove) {
+                if (!subscribers.remove(si)) {
+                    throw new RuntimeException("Can't remove broker subscriber [" + si.address
+ "]");
+                }
+            }
+        }
+    }
+    
+    class SubscriberInfo {
+        public URL address;
+        public String brokerID;
+        
+        public SubscriberInfo(URL address) {
+            this.address = address;
+            this.brokerID = null;
         }
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: tuscany-commits-unsubscribe@ws.apache.org
For additional commands, e-mail: tuscany-commits-help@ws.apache.org


Mime
View raw message