river-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From peter_firmst...@apache.org
Subject svn commit: r1554723 [5/5] - in /river/jtsk/skunk/qa_refactor/trunk: qa/src/com/sun/jini/qa/harness/ qa/src/com/sun/jini/test/impl/mahalo/ qa/src/com/sun/jini/test/impl/reggie/ qa/src/com/sun/jini/test/resources/ qa/src/com/sun/jini/test/share/ qa/src/...
Date Thu, 02 Jan 2014 02:45:09 GMT
Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java?rev=1554723&r1=1554722&r2=1554723&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java Thu Jan  2 02:45:07 2014
@@ -23,6 +23,7 @@ import com.sun.jini.thread.RetryTask;
 import com.sun.jini.thread.TaskManager;
 import com.sun.jini.thread.WakeupManager;
 import com.sun.jini.logging.LogUtil;
+import com.sun.jini.thread.TaskManager.Task;
 
 import net.jini.config.Configuration;
 import net.jini.config.ConfigurationException;
@@ -50,7 +51,9 @@ import java.io.IOException;
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -560,14 +563,14 @@ public class JoinManager {
      */
 
     /** Abstract base class from which all of the task classes are derived. */
-    private class ProxyRegTask extends RetryTask {
+    private class ProxyRegTask extends RetryTask implements Task {
         private final long[] sleepTime = { 5*1000, 10*1000, 15*1000,
                                           20*1000, 25*1000, 30*1000 };
         // volatile fields only mutated while synchronized on proxyReg.taskList
         private volatile int tryIndx  = 0;
         private volatile int nRetries = 0;
         private final ProxyReg proxyReg;
-        private volatile int seqN;
+        private final int seqN;
 
         /** Basic constructor; simply stores the input parameters */
         ProxyRegTask(ProxyReg proxyReg, int seqN) {
@@ -604,17 +607,18 @@ public class JoinManager {
                         proxyReg.proxyRegTask = null;
                         return true;
                     }//endif
-                    t = (JoinTask)proxyReg.taskList.get(0);
+                    t = proxyReg.taskList.get(0);
                 }//end sync
                 try {
                     t.run();
                     synchronized(proxyReg.taskList) {
                         if( !proxyReg.taskList.isEmpty() ) {
-                            proxyReg.taskList.remove(0);
+                            JoinTask task = proxyReg.taskList.get(0);
+                            if (task == t) proxyReg.taskList.remove(0);
                         }//endif
-                    /* reset the retry info for the next task in the list */
-                    tryIndx  = 0;
-                    nRetries = 0;
+                        /* reset the retry info for the next task in the list */
+                        tryIndx  = 0;
+                        nRetries = 0;
                     }//end sync
                     
                 } catch (Exception e) {
@@ -630,8 +634,8 @@ public class JoinManager {
         public long retryTime() {
 	    long nextTryTime = System.currentTimeMillis() + sleepTime[tryIndx];
             synchronized (proxyReg.taskList){
-	    if(tryIndx < sleepTime.length-1)  tryIndx++;//don't go past end
-            nRetries++;
+                if(tryIndx < sleepTime.length-1)  tryIndx++;//don't go past end
+                    nRetries++;
             }
             return nextTryTime;
         }//end retryTime
@@ -679,16 +683,16 @@ public class JoinManager {
              * to run all ProxyRegTask's in parallel, otherwise, the
              * ProxyRegTask with the lowest sequence number should be run.
              */
-            synchronized(serviceItem) {//accessing serviceItem.serviceID
-                if(serviceItem.serviceID != null)  return false;
-                /* For task with lowest seq #, run it now; else run it later */
-                for(int i=0; i<size; i++) {
-                    TaskManager.Task t = (TaskManager.Task)tasks.get(i);
+            if(serviceItem.serviceID != null)  return false;
+            /* For task with lowest seq #, run it now; else run it later */
+            for(int i=0; i<size; i++) {
+                Object t = tasks.get(i);
+                if (t instanceof ProxyRegTask){
                     int nextTaskSeqN = ((ProxyRegTask)t).getSeqN();
                     if( seqN > nextTaskSeqN )  return true;
-                }//end loop
-                return false;
-	    }//end sync(serviceItem)
+                }
+            }//end loop
+            return false;
         }//end runAfter
 
         /** Accessor method that returns the instance of <code>ProxyReg</code>
@@ -725,9 +729,7 @@ public class JoinManager {
             if(    (exCat != ThrowableConstants.INDEFINITE)
                 || (nRetries >= maxNRetries) )
             {
-                synchronized(joinSet) {
-                    removeTasks(proxyReg);//cancel and clear all related tasks
-                }//end sync(joinSet)
+                removeTasks(proxyReg);//cancel and clear all related tasks
                 proxyReg.fail(e);
                 return true;//don't try again
             }//endif
@@ -738,7 +740,7 @@ public class JoinManager {
     }//end class ProxyRegTask
 
     /** Abstract base class from which all the sub-task classes are derived. */
-    private abstract class JoinTask {
+    private static abstract class JoinTask {
 
         /** Data structure referencing the task's associated lookup service */
         protected final ProxyReg proxyReg;
@@ -757,7 +759,7 @@ public class JoinManager {
      *  join manager with the lookup service referenced by the current
      *  instance of this class.
      */
-    private class RegisterTask extends JoinTask {
+    private static class RegisterTask extends JoinTask {
         /** Attributes with which to register the service. These attributes
          *  must not change during the registration process performed in
          *  this this task.
@@ -827,11 +829,7 @@ public class JoinManager {
          */
         public void run() throws Exception {
             logger.finest("JoinManager - LeaseExpireNotifyTask started");
-            boolean tryIt = false;
-            synchronized(joinSet) {
-                tryIt = joinSet.contains(proxyReg);
-            }//end sync(joinSet)
-            if(tryIt)  proxyReg.register(regAttrs);
+            if(joinSet.contains(proxyReg)) proxyReg.register(regAttrs);
             logger.finest("JoinManager - LeaseExpireNotifyTask completed");
 	}//end run
 
@@ -908,11 +906,11 @@ public class JoinManager {
      *  join manager's service in the lookup service referenced by the
      *  current instance of this class.
      */
-    private class AddAttributesTask extends JoinTask {
+    private static class AddAttributesTask extends JoinTask {
         /** The new attribute values with which the service's current
          *  attributes will be augmented, replaced, or changed.
          */
-	protected Entry[] attrSets;
+	protected final Entry[] attrSets;
 
         /** Constructor that associates this task with the lookup service
          *  referenced in the given <code>ProxyReg</code> parameter.
@@ -954,7 +952,7 @@ public class JoinManager {
      *  join manager's service in the lookup service referenced by the
      *  current instance of this class.
      */
-    private final class SetAttributesTask extends AddAttributesTask {
+    private static final class SetAttributesTask extends AddAttributesTask {
         /** Constructor that associates this task with the lookup service
          *  referenced in the given <code>ProxyReg</code> parameter.
          *
@@ -981,8 +979,8 @@ public class JoinManager {
      *  join manager's service in the lookup service referenced by the
      *  current instance of this class.
      */
-    private final class ModifyAttributesTask extends AddAttributesTask {
-	private Entry[] attrSetTemplates;
+    private static final class ModifyAttributesTask extends AddAttributesTask {
+	private final Entry[] attrSetTemplates;
         /** Constructor that associates this task with the lookup service
          *  referenced in the given <code>ProxyReg</code> parameter.
          *
@@ -1121,21 +1119,20 @@ public class JoinManager {
   	    public void notify(LeaseRenewalEvent e) {
                 Throwable ex = e.getException();
 		if ( (ex == null) || (ex instanceof UnknownLeaseException) ) {
-                    synchronized(joinSet) {
-                        removeTasks(ProxyReg.this);
-                        Lease expiredLease = e.getLease();
-                        // Maybe re-register
-                        int indx = joinSet.indexOf(ProxyReg.this);
-                        if(indx >= 0) {//new proxyReg/old ProxyReg.this in set
-                            ProxyReg curProxyReg = (ProxyReg)joinSet.get(indx);
-                            if(expiredLease.equals(curProxyReg.serviceLease)) {
-                                // Okay to re-register
-                                addTask(new LeaseExpireNotifyTask
-                                                (ProxyReg.this,
-                                                 (Entry[])lookupAttr.clone()));
-                            }//endif
+                    removeTasks(ProxyReg.this);
+                    Lease expiredLease = e.getLease();
+                    // Maybe re-register
+                    Iterator<ProxyReg> it = joinSet.iterator();
+                    while (it.hasNext()){
+                        ProxyReg next = it.next();
+                        if (!ProxyReg.this.equals(next)) continue;
+                        if(expiredLease.equals(next.serviceLease)) {
+                            // Okay to re-register
+                            addTask(
+                                new LeaseExpireNotifyTask (ProxyReg.this,
+                                             (Entry[])lookupAttr.clone()));
                         }//endif
-                    }//end sync(joinSet)
+                    }
 		} else {
 		    fail(ex);
                 }//endif
@@ -1145,7 +1142,7 @@ public class JoinManager {
         /** The <code>ProxyRegTask</code> that instantiated this
          *  <code>ProxyReg</code>.
          */
-        volatile ProxyRegTask proxyRegTask;
+        volatile ProxyRegTask proxyRegTask;// writes sync on taskList
         /** The <i>prepared</i> proxy to the lookup service referenced by
          *  this class, and with which this join manager's service will be
          *  registered.
@@ -1155,10 +1152,10 @@ public class JoinManager {
          *  associated lookup service when this join manager registers its
          *  associated service.
          * 
-         * Access to reference synchronized on joinSet, but not referent
+         * Writes to reference synchronized on JoinManager.this, but not referent
          * as it has foreign remote methods.
          */
-	ServiceRegistration srvcRegistration = null;
+	volatile ServiceRegistration srvcRegistration = null;
         /* The <i>prepared</i> proxy to the lease on the registration of this
          * join manager's service with the this class' associated lookup
          * service.
@@ -1167,7 +1164,7 @@ public class JoinManager {
         /** The set of sub-tasks that are to be executed in order for the
          *  lookup service associated with the current instance of this class.
          */
-        final List taskList = new ArrayList(1);
+        final List<JoinTask> taskList = new ArrayList<JoinTask>(1);
         /** The instance of <code>DiscLeaseListener</code> that is registered
          *  with the lease renewal manager that handles the lease of this join
          *  manger's service.
@@ -1193,16 +1190,12 @@ public class JoinManager {
          *  @param task the task to add to the task queue
          */
         public void addTask(JoinTask task) {
-            synchronized(JoinManager.this) {
-                if(bTerminated) return;
-            }//end sync
+            if(bTerminated) return;
             synchronized(taskList) {
                 taskList.add(task);
                 if(this.proxyRegTask == null) {
                     this.proxyRegTask = new ProxyRegTask(this,taskSeqN++);
-                    synchronized (taskMgr) {
-                        taskMgr.add(this.proxyRegTask);
-                    }//end sync(taskMgr)
+                    taskMgr.add(this.proxyRegTask);
                 }//endif
             }//end sync(taskList)
         }//end addTask
@@ -1219,14 +1212,13 @@ public class JoinManager {
             if(proxy == null) throw new RuntimeException("proxy is null");
             /* The lookup service proxy was already prepared at discovery */
             ServiceItem tmpSrvcItem = null;
-            synchronized(joinSet) {
-                srvcRegistration = null;
-                synchronized(serviceItem) {//accessing serviceItem.serviceID
-                    tmpSrvcItem = new ServiceItem(serviceItem.serviceID,
-                                                  serviceItem.service,
-                                                  srvcAttrs);
-                }//end sync(serviceItem)
-            }//end sync(joinSet)
+            ServiceItem item = null;
+            srvcRegistration = null;
+            //accessing serviceItem.serviceID
+            item = serviceItem;
+            tmpSrvcItem = new ServiceItem(item.serviceID,
+                                              item.service,
+                                              srvcAttrs);
             /* Retrieve and prepare the proxy to the service registration */
             ServiceRegistration tmpSrvcRegistration 
                                 = proxy.register(tmpSrvcItem, renewalDuration);
@@ -1259,16 +1251,15 @@ public class JoinManager {
             leaseRenewalMgr.renewUntil(svcLease, Lease.FOREVER,
                                        renewalDuration, dListener);
             ServiceID tmpID = null;
-            synchronized(joinSet) {
-                srvcRegistration = tmpSrvcRegistration;
-                synchronized(serviceItem) {//accessing serviceItem.serviceID
-                    if(serviceItem.serviceID == null) {
-                        serviceItem.serviceID 
-                                            = srvcRegistration.getServiceID();
-                        tmpID = serviceItem.serviceID;
-                    }//endif
-                }//end sync(serviceItem)
-            }//end sync(joinSet)
+            srvcRegistration = tmpSrvcRegistration;
+            ServiceID id = srvcRegistration.getServiceID();
+            synchronized (JoinManager.this){
+                item = serviceItem;
+                if(item.serviceID == null) {
+                    serviceItem = new ServiceItem(id, item.service, item.attributeSets);
+                    tmpID = id;
+                }//endif
+            }
             if( (tmpID != null) && (callback != null) )  {
                 callback.serviceIDNotify(tmpID);
             }//endif
@@ -1280,11 +1271,8 @@ public class JoinManager {
          *  addition to that service's current set of attributes.
          */
         public void addAttributes(Entry[] attSet) throws Exception {
-            ServiceRegistration sr;
-            synchronized (joinSet){
-                sr = srvcRegistration;
-            }
-            sr.addAttributes(attSet);
+            ServiceRegistration sr = srvcRegistration;
+            if (sr != null) sr.addAttributes(attSet);
 	}//end ProxyReg.addAttributes
 
         /** With respect to the lookup service referenced in this class
@@ -1297,11 +1285,8 @@ public class JoinManager {
         public void modifyAttributes(Entry[] templ, Entry[] attSet)
                                                              throws Exception
         {
-            ServiceRegistration sr;
-            synchronized (joinSet){
-               sr = srvcRegistration;
-            }
-            sr.modifyAttributes(templ, attSet);
+            ServiceRegistration sr = srvcRegistration;
+            if (sr != null) sr.modifyAttributes(templ, attSet);
 	}//end ProxyReg.modifyAttributes		    
 
         /** With respect to the lookup service referenced in this class
@@ -1310,11 +1295,8 @@ public class JoinManager {
          *  set of attributes.
          */
         public void setAttributes(Entry[] attSet) throws Exception {
-            ServiceRegistration sr;
-            synchronized (joinSet){
-               sr = srvcRegistration;
-            }
-            sr.setAttributes(attSet);
+            ServiceRegistration sr = srvcRegistration;
+            if (sr != null) sr.setAttributes(attSet);
 	}//end ProxyReg.setAttributes
 
         /** Convenience method that encapsulates appropriate behavior when
@@ -1339,23 +1321,18 @@ public class JoinManager {
          * For more information, refer to Bug 4490355.
          */
 	public void fail(Throwable e) {
-	    synchronized(JoinManager.this) {
-		if(bTerminated) {
-		    return;
-		} else {
-		    LogUtil.logThrow(logger, Level.INFO, ProxyReg.class, "fail",
-			"JoinManager - failure for lookup service proxy: {0}",
-			new Object[] { proxy }, e);
-		    try {
-			discMgr.discard(proxy);
-		    } catch(IllegalStateException e1) {
-		       logger.log(Level.FINEST,
-				  "JoinManager - cannot discard lookup, "
-				  +"discovery manager already terminated",
-				  e1);
-		    }
-		}//endif
-	    }//end sync(JoinManager.this)
+		if(bTerminated) return;
+                LogUtil.logThrow(logger, Level.INFO, ProxyReg.class, "fail",
+                    "JoinManager - failure for lookup service proxy: {0}",
+                    new Object[] { proxy }, e);
+                try {
+                    discMgr.discard(proxy);
+                } catch(IllegalStateException e1) {
+                   logger.log(Level.FINEST,
+                              "JoinManager - cannot discard lookup, "
+                              +"discovery manager already terminated",
+                              e1);
+                }
 	}//end ProxyReg.fail
 
 	/** Returns true if the both objects' associated proxies are equal. */
@@ -1378,10 +1355,10 @@ public class JoinManager {
     private class DiscMgrListener implements DiscoveryListener {
 	/* Invoked when new or previously discarded lookup is discovered. */
 	public void discovered(DiscoveryEvent e) {
-	    synchronized(joinSet) {
 		ServiceRegistrar[] proxys
 				       = (ServiceRegistrar[])e.getRegistrars();
-		for(int i=0;i<proxys.length;i++) {
+                int l = proxys.length;
+		for(int i=0;i<l;i++) {
 		    /* Prepare the proxy to the discovered lookup service
 					 * before interacting with it.
 					 */
@@ -1414,26 +1391,24 @@ public class JoinManager {
 			}//endif
 		    }//endif
 		}//end loop
-	    }//end sync(joinSet)
 	}//end discovered
 
 	/* Invoked when previously discovered lookup is discarded. */
 	public void discarded(DiscoveryEvent e) {
-            synchronized(joinSet) {
-                ServiceRegistrar[] proxys
-                                      = (ServiceRegistrar[])e.getRegistrars();
-                for(int i=0;i<proxys.length;i++) {
-                    ProxyReg proxyReg = findReg(proxys[i]);
-		    if(proxyReg != null) {
-                        removeTasks(proxyReg);
-                        joinSet.remove(proxyReg);
-                        try {
-                            leaseRenewalMgr.remove( proxyReg.serviceLease );
-                        } catch(UnknownLeaseException ex) { /*ignore*/ }
-                        proxyReg.addTask(new DiscardProxyTask(proxyReg));
-		    }//endif
-                }//end loop
-            }//end sync(joinSet)
+            ServiceRegistrar[] proxys
+                                  = (ServiceRegistrar[])e.getRegistrars();
+            int l = proxys.length;
+            for(int i=0;i<l;i++) {
+                ProxyReg proxyReg = findReg(proxys[i]);
+                if(proxyReg != null) {
+                    removeTasks(proxyReg);
+                    joinSet.remove(proxyReg);
+                    try {
+                        leaseRenewalMgr.remove( proxyReg.serviceLease );
+                    } catch(UnknownLeaseException ex) { /*ignore*/ }
+                    proxyReg.addTask(new DiscardProxyTask(proxyReg));
+                }//endif
+            }//end loop
 	}//end discarded
     }//end class DiscMgrListener
 
@@ -1480,12 +1455,12 @@ public class JoinManager {
     /** Contains the reference to the service that is to be registered with
      *  all of the desired lookup services referenced by <code>discMgr</code>.
      */
-    private final ServiceItem serviceItem;
+    private volatile ServiceItem serviceItem = null; // writes sync on JoinManager.this
     /** Contains the attributes with which to associate the service in each
      *  of the lookup services with which this join manager registers the
      *  service.
      */
-    private Entry[] lookupAttr = null; // access sync on joinSet
+    private volatile Entry[] lookupAttr = null; // writes sync on JoinManager.this
     /** Contains the listener -- instantiated by the entity that constructs
      *  this join manager -- that will receive an event containing the
      *  service ID assigned to this join manager's service by one of the
@@ -1496,7 +1471,7 @@ public class JoinManager {
      *  references a proxy to one of the lookup services with which this
      *  join manager's service is registered.
      */
-    private final ArrayList joinSet = new ArrayList(1);
+    private final List<ProxyReg> joinSet = new CopyOnWriteArrayList<ProxyReg>();
     /** Contains the discovery manager that discovers the lookup services
      *  with which this join manager will register its associated service.
      */
@@ -1527,7 +1502,7 @@ public class JoinManager {
      */
     private final long renewalDuration;
     /** Flag that indicates if this join manager has been terminated. */
-    private boolean bTerminated = false; // All access sync on this.
+    private volatile boolean bTerminated = false; // write access sync on this.
     /* Preparer for the proxies to the lookup services that are discovered
      * and used by this utility.
      */
@@ -1916,11 +1891,8 @@ public class JoinManager {
      * @see net.jini.discovery.LookupDiscoveryManager
      */
     public DiscoveryManagement getDiscoveryManager(){
-        synchronized(this) {
-            if(bTerminated) {
-                throw new IllegalStateException("join manager was terminated");
-            }//endif
-        }//end sync
+        if(bTerminated) 
+            throw new IllegalStateException("join manager was terminated");
 	return discMgr; 
     }//end getDiscoveryManager
 
@@ -1945,11 +1917,8 @@ public class JoinManager {
      * @see net.jini.lease.LeaseRenewalManager
      */
     public LeaseRenewalManager getLeaseRenewalManager(){
-        synchronized(this) {
-            if(bTerminated) {
-                throw new IllegalStateException("join manager was terminated");
-            }//endif
-        }//end sync
+        if(bTerminated) 
+            throw new IllegalStateException("join manager was terminated");
 	return leaseRenewalMgr;
     }//end getLeaseRenewalManager
 
@@ -1967,23 +1936,15 @@ public class JoinManager {
      * @see net.jini.core.lookup.ServiceRegistrar
      */
     public ServiceRegistrar[] getJoinSet() {
-        synchronized(this) {
-            if(bTerminated) {
-                throw new IllegalStateException("join manager was terminated");
+        if(bTerminated) throw new IllegalStateException("join manager was terminated");
+        List<ServiceRegistrar> retList = new LinkedList<ServiceRegistrar>();
+        for (Iterator<ProxyReg> iter = joinSet.iterator(); iter.hasNext(); ) {
+            ProxyReg proxyReg = iter.next();
+            if(proxyReg.srvcRegistration != null) {//test registration flag
+                retList.add(proxyReg.proxy);
             }//endif
-        }//end sync
-	synchronized(joinSet) {
-            ArrayList retList = new ArrayList(joinSet.size());
-	    int k = 0;
-	    for (Iterator iter = joinSet.iterator(); iter.hasNext(); ) {
-                ProxyReg proxyReg = (ProxyReg)iter.next();
-                if(proxyReg.srvcRegistration != null) {//test registration flag
-                    retList.add(proxyReg.proxy);
-                }//endif
-	    }//end loop
-            return ( (ServiceRegistrar[])(retList.toArray
-                                 (new ServiceRegistrar[retList.size()]) ) );
-	}//end sync(joinSet)
+        }//end loop
+        return ( (retList.toArray(new ServiceRegistrar[retList.size()]) ) );
     }//end getJoinSet
 
     /** 
@@ -2000,14 +1961,8 @@ public class JoinManager {
      * @see #setAttributes
      */
     public Entry[] getAttributes() {
-        synchronized(this) {
-            if(bTerminated) {
-                throw new IllegalStateException("join manager was terminated");
-            }//endif
-        }//end sync
-	synchronized(joinSet) {
-	    return (Entry[])lookupAttr.clone();
-	}//end sync(joinSet)
+        if(bTerminated) throw new IllegalStateException("join manager was terminated");
+        return (Entry[])lookupAttr.clone();
     }//end getAttributes
 
     /** 
@@ -2132,19 +2087,16 @@ public class JoinManager {
      * @see net.jini.lookup.entry.ServiceControlled
      */
     public void addAttributes(Entry[] attrSets, boolean checkSC) {
-        synchronized(this) {
-            if(bTerminated) {
-                throw new IllegalStateException("join manager was terminated");
-            }//endif
-        }//end sync
-	synchronized(joinSet) {
+        if(bTerminated) throw new IllegalStateException("join manager was terminated");
+	synchronized(this) {
 	    lookupAttr = LookupAttributes.add(lookupAttr, attrSets, checkSC);
-            serviceItem.attributeSets = lookupAttr;
-            for(int i=0;i<joinSet.size();i++) {
-                ProxyReg proxyReg = (ProxyReg)joinSet.get(i);
-                proxyReg.addTask(new AddAttributesTask(proxyReg,attrSets));
-            }//end loop
-	}//end sync(joinSet)
+            serviceItem = new ServiceItem(serviceItem.serviceID, serviceItem.service, lookupAttr);
+        }
+        Iterator<ProxyReg> it = joinSet.iterator();
+        while (it.hasNext()){
+            ProxyReg proxyReg = it.next();
+            proxyReg.addTask(new AddAttributesTask(proxyReg,attrSets));
+        }//end loop
     }//end addAttributes
 
     /** 
@@ -2186,20 +2138,19 @@ public class JoinManager {
      * @see #getAttributes
      */
     public void setAttributes(Entry[] attrSets) {
-        synchronized(this) {
-            if(bTerminated) {
-                throw new IllegalStateException("join manager was terminated");
-            }//endif
-        }//end sync
+        if(bTerminated) 
+            throw new IllegalStateException("join manager was terminated");
         testForNullElement(attrSets);
-	synchronized(joinSet) {
+	synchronized(this) {
             lookupAttr = (Entry[]) attrSets.clone();
-            serviceItem.attributeSets = lookupAttr;
-            for(int i=0;i<joinSet.size();i++) {
-                ProxyReg proxyReg = (ProxyReg)joinSet.get(i);
-                proxyReg.addTask(new SetAttributesTask(proxyReg,attrSets));
-            }//end loop
-	}//end sync(joinSet)
+            serviceItem = new ServiceItem(serviceItem.serviceID, 
+                                          serviceItem.service, lookupAttr);
+        }
+        Iterator<ProxyReg> it = joinSet.iterator();
+        while (it.hasNext()){
+            ProxyReg proxyReg = it.next();
+            proxyReg.addTask(new SetAttributesTask(proxyReg,attrSets));
+        }
     }//end setAttributes
 
     /** 
@@ -2323,22 +2274,21 @@ public class JoinManager {
                                  Entry[] attrSets,
                                  boolean checkSC)
     {
-        synchronized(this) {
-            if(bTerminated) {
-                throw new IllegalStateException("join manager was terminated");
-            }//endif
-        }//end sync
-	synchronized(joinSet) {
+        if(bTerminated) 
+            throw new IllegalStateException("join manager was terminated");
+	synchronized(this) {
 	    lookupAttr = LookupAttributes.modify(lookupAttr, attrSetTemplates,
                                                  attrSets, checkSC);
-            serviceItem.attributeSets = lookupAttr;
-            for(int i=0;i<joinSet.size();i++) {
-                ProxyReg proxyReg = (ProxyReg)joinSet.get(i);
-                proxyReg.addTask(new ModifyAttributesTask(proxyReg,
-                                                          attrSetTemplates,
-                                                          attrSets));
-            }//end loop
-	}//end sync(joinSet)
+            serviceItem = new ServiceItem(serviceItem.serviceID,
+                                          serviceItem.service, lookupAttr);
+        }//end sync
+        Iterator<ProxyReg> it = joinSet.iterator();
+        while (it.hasNext()){
+            ProxyReg proxyReg = it.next();
+            proxyReg.addTask(new ModifyAttributesTask(proxyReg,
+                                                      attrSetTemplates,
+                                                      attrSets));
+        }//end loop
     }//end modifyAttributes
 
     /**
@@ -2378,28 +2328,20 @@ public class JoinManager {
         synchronized(this) {
             if(bTerminated) return;//allow for multiple terminations
             bTerminated = true;
-            /* Terminate discovery and task management */
-            discMgr.removeDiscoveryListener(discMgrListener);
-            if(bCreateDiscMgr)  discMgr.terminate();
         }//end sync(this)
+        /* Terminate discovery and task management */
+        discMgr.removeDiscoveryListener(discMgrListener);
+        if(bCreateDiscMgr)  discMgr.terminate();
         terminateTaskMgr();
         /* Clear the joinSet and cancel all leases held by the service */
-        ArrayList srvcLeases = null;//store leases for use outside of sync blk
-	synchronized(joinSet) {
-            srvcLeases = new ArrayList(joinSet.size());
-	    for (Iterator iter = joinSet.iterator(); iter.hasNext(); ) {
-                srvcLeases.add
-                    ( (((ProxyReg)iter.next()).serviceLease) );
-	    }//end loop
-	    joinSet.clear();
-	}//end sync(joinSet)
-        /* Must cancel leases outside of sync block because of remote call */
-        if(srvcLeases == null) return;
-        for(int i=0;i<srvcLeases.size();i++) {
+        Iterator<ProxyReg> iter = joinSet.iterator();
+        while (iter.hasNext()) {
             try {
-                leaseRenewalMgr.cancel((Lease)srvcLeases.get(i));
-            } catch (Exception e) { }
+                leaseRenewalMgr.cancel(iter.next().serviceLease );
+            } catch (Exception e){}
         }//end loop
+        leaseRenewalMgr.close();
+        joinSet.clear();
     }//end terminate
 
     /** 
@@ -2705,9 +2647,7 @@ public class JoinManager {
         if(taskMgr == null) return;
         synchronized(proxyReg.taskList) {
             if(proxyReg.proxyRegTask != null) {
-                synchronized(taskMgr) {
-                    taskMgr.remove(proxyReg.proxyRegTask);
-                }//end sync(taskMgr)
+                taskMgr.remove(proxyReg.proxyRegTask);
                 proxyReg.proxyRegTask.cancel();//cancel retry in WakeupMgr
                 proxyReg.proxyRegTask = null;  //don't reuse because of seq#
             }//endif
@@ -2725,21 +2665,23 @@ public class JoinManager {
             /* Cancel all tasks scheduled for future retry by the wakeup mgr */
             wakeupMgr.cancelAll();//cancel all tickets
             wakeupMgr.stop();//stop execution of the wakeup manager
-            synchronized(taskMgr) {
-                /* Remove all pending tasks */
-                ArrayList pendingTasks = taskMgr.getPending();
-                for(int i=0;i<pendingTasks.size();i++) {
-                    RetryTask pendingTask = (RetryTask)pendingTasks.get(i);
-                    pendingTask.cancel();//cancel wakeup ticket
-                    taskMgr.remove(pendingTask);//remove from task mgr
-                }//end loop
-                /* Interrupt all active tasks, prepare taskMgr for GC. */
-                taskMgr.terminate();
+        }
+        synchronized(taskMgr) {
+            /* Remove all pending tasks */
+            ArrayList pendingTasks = taskMgr.getPending();
+            for(int i=0;i<pendingTasks.size();i++) {
+                RetryTask pendingTask = (RetryTask)pendingTasks.get(i);
+                pendingTask.cancel();//cancel wakeup ticket
+                taskMgr.remove(pendingTask);//remove from task mgr
+            }//end loop
+            /* Interrupt all active tasks, prepare taskMgr for GC. */
+            taskMgr.terminate();
+        }
         // Too lazy to put out the trash.
 //                taskMgr = null;
-            }//end sync(taskMgr)
+//            }//end sync(taskMgr)
 //            wakeupMgr = null;
-        }//end sync(wakeupMgr)
+//        }//end sync(wakeupMgr)
     }//end terminateTaskMgr
 
     /** Examines the elements of the input set and, upon finding at least one
@@ -2768,16 +2710,13 @@ public class JoinManager {
                                        Entry[] attrSets,
                                        boolean doAttrs)
     {
-        synchronized(this) {
-            if(bTerminated) {
-                throw new IllegalStateException("join manager was terminated");
-            }//endif
-        }//end sync
+        if(bTerminated) 
+            throw new IllegalStateException("join manager was terminated");
 	if(!(serviceProxy instanceof java.io.Serializable)) {
             throw new IllegalArgumentException
                                         ("serviceProxy must be Serializable");
 	}//endif
-	synchronized(joinSet) {
+	synchronized(this) {
             if(doAttrs) {
                 if(attrSets == null) {
                     lookupAttr = new Entry[0];
@@ -2787,18 +2726,18 @@ public class JoinManager {
                     lookupAttr = attrSets;
                 }//endif
             }//endif
-            serviceItem.service = serviceProxy;
-            serviceItem.attributeSets = lookupAttr;
-            for(int i=0;i<joinSet.size();i++) {
-                ProxyReg proxyReg = (ProxyReg)(joinSet.get(i));
-                removeTasks(proxyReg);
-                try {
-                    leaseRenewalMgr.remove( proxyReg.serviceLease );
-                } catch (UnknownLeaseException e) { }
-                proxyReg.addTask(new RegisterTask(proxyReg,
+            serviceItem = new ServiceItem(serviceItem.serviceID, serviceProxy, lookupAttr);
+        }
+        Iterator<ProxyReg> it = joinSet.iterator();
+        while (it.hasNext()){
+            ProxyReg proxyReg = it.next();
+            removeTasks(proxyReg);
+            try {
+                leaseRenewalMgr.remove( proxyReg.serviceLease );
+            } catch (UnknownLeaseException e) { }
+            proxyReg.addTask(new RegisterTask(proxyReg,
                                                  (Entry[])lookupAttr.clone()));
-            }//end loop
-	}//end sync(joinSet)
+        }//end loop
     }//end replaceRegistrationDo
 
 }//end class JoinManager

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java?rev=1554723&r1=1554722&r2=1554723&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java Thu Jan  2 02:45:07 2014
@@ -21,6 +21,7 @@ import com.sun.jini.logging.Levels;
 import com.sun.jini.lookup.entry.LookupAttributes;
 import com.sun.jini.proxy.BasicProxyTrustVerifier;
 import com.sun.jini.thread.TaskManager;
+import com.sun.jini.thread.TaskManager.Task;
 import java.io.IOException;
 import java.rmi.RemoteException;
 import java.rmi.server.ExportException;
@@ -605,7 +606,7 @@ import net.jini.security.proxytrust.Serv
 public class ServiceDiscoveryManager {
 
     /** Class for implementing register/lookup/notify/dropProxy/discard tasks*/
-    private static abstract class CacheTask implements TaskManager.Task {
+    private static abstract class CacheTask implements Runnable {
         protected final ProxyReg reg;
         protected volatile long thisTaskSeqN;
         public CacheTask(ProxyReg reg, long seqN) {
@@ -617,14 +618,6 @@ public class ServiceDiscoveryManager {
             if(this.reg == null) return false;
             return (this.reg).equals(reg);
         }//end isFromProxy
-        /** Returns true if current instance must be run after task(s) in
-         *  task manager queue.
-         *  @param tasks the tasks to consider.
-         *  @param size elements with index less than size are considered.
-         */
-        public boolean runAfter(List tasks, int size) {
-            return false;
-        }//end runAfter
 
         /** Returns the ProxyReg associated with this task (if any). */
         public ProxyReg getProxyReg() {
@@ -643,7 +636,7 @@ public class ServiceDiscoveryManager {
      *  corresponding to a particular serviceID associated with a particular
      *  lookup service.
      */
-    private static abstract class ServiceIdTask extends CacheTask {
+    private static abstract class ServiceIdTask extends CacheTask implements Task {
         protected final ServiceID thisTaskSid;
         ServiceIdTask(ServiceID srvcId, ProxyReg reg, long seqN) {
             super(reg, seqN);
@@ -666,7 +659,7 @@ public class ServiceDiscoveryManager {
          */
         public boolean runAfter(List tasks, int size) {
             for(int i=0; i<size; i++) {
-                TaskManager.Task t = (TaskManager.Task)tasks.get(i);
+                Runnable t = (Runnable) tasks.get(i);
                 //Compare only instances of this task class
                 if( !(t instanceof ServiceIdTask) )  continue;
                 ServiceID otherTaskSid = ((ServiceIdTask)t).getServiceID();
@@ -1001,7 +994,7 @@ public class ServiceDiscoveryManager {
 	}//end class LookupCacheImpl.RegisterListenerTask
 
 	/** This class requests a "snapshot" of the given registrar's state.*/
-        private final class LookupTask extends CacheTask {
+        private final class LookupTask extends CacheTask implements Task {
 	    private final EventReg eReg;
             public LookupTask(ProxyReg reg, long seqN, EventReg eReg) {
                 super(reg, seqN);
@@ -1104,7 +1097,7 @@ public class ServiceDiscoveryManager {
                         }//endif
                     }//endif
                 }//end loop
-                return super.runAfter(tasks, size);
+                return false;
             }//end runAfter
 
 	}//end class LookupCacheImpl.LookupTask
@@ -1239,13 +1232,13 @@ public class ServiceDiscoveryManager {
              */
             public boolean runAfter(List tasks, int size) {
                 for(int i=0; i<size; i++) {
-                    CacheTask t = (CacheTask)tasks.get(i);
+                    Runnable t = (Runnable)tasks.get(i);
                     if(   t instanceof RegisterListenerTask
                        || t instanceof LookupTask )
                     {
-                        ProxyReg otherReg = t.getProxyReg();
+                        ProxyReg otherReg = ((CacheTask)t).getProxyReg();
                         if( reg.equals(otherReg) ) {
-                            if(thisTaskSeqN > t.getSeqN()) return true;
+                            if(thisTaskSeqN > ((CacheTask)t).getSeqN()) return true;
                         }//endif
                     }//endif
                 }//end loop
@@ -1260,7 +1253,7 @@ public class ServiceDiscoveryManager {
          *  a filter retry on an item in which the cache's filter initially
          *  returned indefinite.
          */
-        private final class ServiceDiscardTimerTask implements TaskManager.Task
+        private final class ServiceDiscardTimerTask implements Runnable
         {
             private final ServiceID serviceID;
 	    private final long endTime;
@@ -1367,14 +1360,6 @@ public class ServiceDiscoveryManager {
                 logger.finest("ServiceDiscoveryManager - "
                               +"ServiceDiscardTimerTask completed");
             }//end run
-            /** Returns true if current instance must be run after task(s) in
-             *  task manager queue.
-             *  @param tasks the tasks to consider.
-             *  @param size elements with index less than size are considered.
-             */
-            public boolean runAfter(List tasks, int size) {
-                return false;
-            }//end runAfter
         }//end class LookupCacheImpl.ServiceDiscardTimerTask
 
 	/** Task class used to asynchronously process the service state
@@ -1932,7 +1917,7 @@ public class ServiceDiscoveryManager {
          *  when the given ProxyReg has been discarded.
 	 */
 	private void removeUselessTask(ProxyReg reg) {
-            ArrayList pendingTasks = cacheTaskMgr.getPending();
+            List pendingTasks = cacheTaskMgr.getPending();
             for(int i=0;i<pendingTasks.size();i++) {
                 CacheTask t = (CacheTask)pendingTasks.get(i);
                 if(t.isFromProxy(reg)) cacheTaskMgr.remove(t);
@@ -1945,7 +1930,7 @@ public class ServiceDiscoveryManager {
         private void terminateTaskMgr(TaskManager taskMgr) {
             synchronized(taskMgr) {
                 /* Remove all pending tasks */
-                ArrayList pendingTasks = taskMgr.getPending();
+                List pendingTasks = taskMgr.getPending();
                 for(int i=0;i<pendingTasks.size();i++) {
                     taskMgr.remove((TaskManager.Task)pendingTasks.get(i));
                 }//end loop
@@ -3250,6 +3235,7 @@ public class ServiceDiscoveryManager {
             LookupCacheImpl cache = (LookupCacheImpl)iter.next();
             cache.terminate();
         }//end loop
+        leaseRenewalMgr.close();
     }//end terminate
 
     /**

Added: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java?rev=1554723&view=auto
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java (added)
+++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java Thu Jan  2 02:45:07 2014
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.river.impl.thread;
+
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.river.api.util.Startable;
+
+/**
+ * The intent of this Executor is to groups tasks into synchronous queues.
+ * @author peter
+ */
+public class SynchronousExecutors implements Startable {
+    private static final Logger logger = Logger.getLogger("org.apache.river.impl");
+    private final Lock distributorLock;
+    private final Condition workToDo;
+    private List<Queue<Callable>> queues ;
+    private volatile int limit = -1;
+    private final Distributor distributor;
+    private final Thread distributorThread;
+    private final ScheduledExecutorService pool;
+    private final AtomicBoolean distributorWaiting;
+    
+    public SynchronousExecutors(ScheduledExecutorService pool){
+        queues = new ArrayList<Queue<Callable>>(24);
+        this.pool = pool;
+        distributorLock = new ReentrantLock();
+        workToDo = distributorLock.newCondition();
+        distributorWaiting = new AtomicBoolean(false);
+        distributor = new Distributor(queues, pool, distributorLock, workToDo, distributorWaiting);
+        distributorThread = new Thread(distributor ,"SynchronousQueueArray distributor");
+        distributorThread.setDaemon(true);
+    }
+    
+    void addQueue(Queue<Callable> queue){
+        synchronized (queues){
+            queues.add(queue);
+        }
+    }
+    
+    boolean removeQueue(Object queue){
+        synchronized (queues){
+            return queues.remove(queue);
+        }
+    }
+
+    @Override
+    public void start() throws Exception {
+        distributorThread.start();
+    }
+    
+    public void shutdown() {
+        distributorThread.interrupt();
+    }
+    
+    public <T> ExecutorService newExecutor(){
+        QueueWrapper que = new QueueWrapper<T>(new LinkedBlockingQueue<Callable<T>>());
+        ExecutorService serv = new SynchronousExecutor<T>(que, distributorWaiting, distributorLock, workToDo, pool);
+        addQueue(que);
+        return serv;
+    }
+    
+    private static class SynchronousExecutor<T> implements ExecutorService {
+        
+        QueueWrapper<T> queue;
+        AtomicBoolean waiting;
+        final Lock lock;
+        final Condition workToDo;
+        final ScheduledExecutorService pool;
+
+        SynchronousExecutor(QueueWrapper<T> queue, AtomicBoolean waiting, Lock lock, Condition cond, ScheduledExecutorService pool){
+            this.queue = queue;
+            this.waiting = waiting;
+            this.lock = lock;
+            workToDo = cond;
+            this.pool = pool;
+        }
+        @Override
+        public void shutdown() {
+            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+        }
+
+        @Override
+        public boolean isShutdown() {
+            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+        }
+
+        @Override
+        public boolean isTerminated() {
+            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            if (task == null) throw new NullPointerException("task cannot be null");
+            Task t = new Task<T>(task, queue, lock, workToDo, pool);
+            if (queue.offer(t)){
+                if (waiting.get() && !queue.stalled){
+                    lock.lock();
+                    try {
+                        workToDo.signalAll();
+                    } finally {
+                        lock.unlock();
+                    }
+                }
+                return t;
+            }
+            throw new RejectedExecutionException("task rejected, queue likely full");
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            throw new UnsupportedOperationException("Not supported."); 
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            throw new UnsupportedOperationException("Not supported."); 
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+            List<Future<T>> result = new ArrayList<Future<T>>(tasks.size());
+            Iterator<? extends Callable<T>> it = tasks.iterator();
+            while (it.hasNext()){
+                result.add(submit(it.next()));
+            }
+            return result;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+            throw new UnsupportedOperationException("Not supported yet."); 
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+            throw new UnsupportedOperationException("Not supported yet."); 
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            throw new UnsupportedOperationException("Not supported.");
+        }
+        
+    }
+    
+    private static class Distributor implements Runnable {
+        
+        private final Random selector = new Random();
+        private final List<Queue<Callable>> queues;
+        private final ScheduledExecutorService executor;
+        private final Lock lock;
+        private final Condition workToDo;
+        private final AtomicBoolean waiting;
+        
+        Distributor(List<Queue<Callable>> queues, ScheduledExecutorService executor, Lock lock, Condition workToDo, AtomicBoolean waiting){
+            this.queues = queues;
+            this.executor = executor;
+            this.lock = lock;
+            this.workToDo = workToDo;
+            this.waiting = waiting;
+        }
+
+        @Override
+        public void run() {
+            int nullCount = 0; // sequence of null tasks
+            int size = 0;
+            try {
+                while (!Thread.currentThread().isInterrupted()){
+                    try {
+                        Queue<Callable> queue = null;
+                        synchronized (queues){
+                            size = queues.size();
+                            if (size > 0){
+                                int index = selector.nextInt(size);
+                                queue = queues.get(index);
+                            }
+                        }
+                        Callable task = queue != null ? queue.peek() : null;
+                        if (task != null){ // Also checks for null
+                            long delay = 0;
+                            if (task instanceof Task) delay = ((Task)task).delay();
+                            if (delay == 0) executor.submit(task);
+                            else executor.schedule(task, delay, TimeUnit.MILLISECONDS);
+                            nullCount = 0; // reset null count
+                            continue;
+                        }
+                        // If we get here, we've hit either a task that's executing
+                        // or a null task.
+                        nullCount++;
+                        if (nullCount > size * 2) {
+                            // Time for a nap.
+                            lock.lock();
+                            try {
+                                waiting.set(true);
+                                workToDo.await(2, TimeUnit.SECONDS);
+                            } catch (InterruptedException ex) {
+                                Thread.currentThread().interrupt(); // restore
+                            } finally {
+                                waiting.set(false);
+                                lock.unlock();
+                            }
+                        }
+                    } catch (Exception e){
+                        System.out.println(e);
+                        logger.log(Level.FINE, "Exception thrown by distributor: {0}", e);
+                    }
+                }// end while
+            } finally {
+                executor.shutdown();
+            }
+        }
+    }
+    
+    private static class QueueWrapper<T> extends AbstractQueue<Callable<T>> implements Queue<Callable<T>>{
+        
+        final ReentrantLock lock; // lock to control the head of the queue.
+        final Queue<Callable<T>> queue;
+        Callable<T> peek;// Only ever accessed by distributor thread.
+        boolean stalled;
+        
+        QueueWrapper(Queue<Callable<T>> queue){
+            this.queue = queue;
+            lock = new ReentrantLock();
+            peek = null;
+            stalled = false;
+        }
+
+        @Override
+        public Iterator<Callable<T>> iterator() {
+            return queue.iterator();
+        }
+
+        @Override
+        public int size() {
+            return queue.size();
+        }
+
+        @Override
+        public boolean offer(Callable<T> e) {
+            return queue.offer(e);
+        }
+
+        @Override
+        public Callable<T> poll() {
+            lock.lock();
+            try {
+                return queue.poll();
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        @Override
+        public Callable<T> peek() {
+            boolean locked = lock.tryLock();
+            if (!locked) return null; // Pretend queue empty so director tries another queue.
+            try {
+                if (peek == null) {
+                    peek = queue.peek();
+                    return peek;
+                } else {
+                    stalled = true;
+                    return null; // Pretend queue empty so director tries another queue.
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+        
+    }
+    
+    private static class Task<T> implements Callable<T>, Future<T> {
+        
+        /**
+         * Default delay backoff times. 
+         *
+         * @see #retryTime 
+         */
+        private static final long[] delays = {
+             0, // First value is never read
+             TimeUnit.SECONDS.toMillis(1),
+             TimeUnit.SECONDS.toMillis(5),
+             TimeUnit.SECONDS.toMillis(10),
+             TimeUnit.MINUTES.toMillis(1),
+             TimeUnit.MINUTES.toMillis(1),
+             TimeUnit.MINUTES.toMillis(5)
+        };
+        
+        
+        volatile boolean complete = false;
+        volatile boolean cancelled = false;
+        volatile T result = null;
+        volatile Exception exception = null;
+        volatile Thread executorThread;
+        private final Callable<T> task;
+        private final QueueWrapper queue;
+        private final Lock executorLock;
+        private final Condition waiting;
+        private final Condition resultAwait;
+        private final ScheduledExecutorService exec;
+        private int attempt;
+        private volatile long retryTime;
+        
+        Task(Callable<T> c, QueueWrapper wrapper, Lock executorLock, Condition distributorWaiting, ScheduledExecutorService exec){
+            task = c;
+            queue = wrapper;
+            this.waiting = distributorWaiting;
+            resultAwait = queue.lock.newCondition();
+            attempt = 0;
+            this.executorLock = executorLock;
+            this.exec = exec;
+        }
+        
+        /**
+         * Return the next time at which we should make another attempt.
+         * This is <em>not</em> an interval, but the actual time.
+         * <p>
+         * The implementation is free to do as it pleases with the policy
+         * here.  The default implementation is to delay using intervals of
+         * 1 second, 5 seconds, 10 seconds, and 1 minute between
+         * attempts, and then retrying every five minutes forever.
+         * <p>
+         * The default implementation assumes it is being called from
+         * the default <code>run</code> method and that the current thread
+         * holds the lock on this object. If the caller does
+         * not own the lock the result is undefined and could result in an
+         * exception.
+         */
+        long delay() {
+            int index = (attempt < delays.length ? attempt : delays.length - 1); 
+            return delays[index];
+        }
+
+        public T call() throws Exception {
+            if (cancelled) return null;
+            boolean reschedule = false;
+            queue.lock.lock();
+            try {
+                result = task.call();
+                if (((Task)queue.peek).task == task 
+                        && task == ((Task)queue.queue.peek()).task)
+                {
+                    queue.queue.poll(); // Remove successfully completed task from queue.
+                } 
+                queue.peek = null; // set peek to null to unblock queue.
+                queue.stalled = false;
+                executorLock.lock();
+                try {
+                    waiting.signalAll();
+                } finally {
+                    executorLock.unlock();
+                }
+                complete = true;
+                resultAwait.signalAll();
+                return result;
+            } catch (Exception e) {
+                exception = e;
+                reschedule = true;
+                throw e;
+            } finally {
+                if (reschedule) {
+                    attempt++;
+                    queue.peek = null; // set peek to null to unblock queue.
+                    executorLock.lock();
+                    try {
+                        waiting.signalAll();
+                    } finally {
+                        executorLock.unlock();
+                    }
+                }
+                queue.lock.unlock();
+            }
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return cancelled;
+        }
+
+        @Override
+        public boolean isDone() {
+            return complete;
+        }
+
+        @Override
+        public T get() throws InterruptedException, ExecutionException {
+            if (complete) {
+                if (exception != null) throw new ExecutionException(exception);
+                return result;
+            }
+            queue.lock.lock();
+            try {
+                while (!complete){
+                    resultAwait.await();
+                }
+                return result;
+            } finally {
+                queue.lock.unlock();
+            }
+        }
+
+        @Override
+        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            if (complete) return result;
+            long begin = System.currentTimeMillis();
+            if (queue.lock.tryLock(timeout, unit)){
+                try {
+                    while (!complete){
+                        long remain = unit.toMillis(timeout) - (System.currentTimeMillis() - begin);
+                        if ( 1L > remain ) {
+                            if (exception != null) throw new ExecutionException(exception);
+                            throw new TimeoutException(
+                                    "Timed out while waiting for result");
+                        }
+                        resultAwait.await(remain, TimeUnit.MILLISECONDS);
+                    }
+                    return result;
+                } finally {
+                    queue.lock.unlock();
+                }
+            } else {
+                throw new TimeoutException("Timed out while waiting for lock");
+            }
+        }
+
+        
+    
+    }
+    
+}

Added: river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronouExecutorsTest.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronouExecutorsTest.java?rev=1554723&view=auto
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronouExecutorsTest.java (added)
+++ river/jtsk/skunk/qa_refactor/trunk/test/src/org/apache/river/impl/thread/SynchronouExecutorsTest.java Thu Jan  2 02:45:07 2014
@@ -0,0 +1,186 @@
+/*
+ * To change this license header, choose License Headers in Project Properties.
+ * To change this template file, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.apache.river.impl.thread;
+
+import java.rmi.RemoteException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ *
+ * @author peter
+ */
+public class SynchronouExecutorsTest {
+    
+    
+    
+    public SynchronouExecutorsTest() {
+    }
+    
+    @Before
+    public void setUp() {
+    }
+    
+    @After
+    public void tearDown() {
+    }
+
+    /**
+     * Test of newExecutor method, of class SynchronousQueueArrayExecutor.
+     */
+    @Test
+    public void testNewExecutor() {
+        System.out.println("newExecutor");
+        SynchronousExecutors instance = new SynchronousExecutors(new Exec());
+        try {
+            instance.start();
+        } catch (Exception ex) {
+            ex.printStackTrace(System.out);
+        }
+        ExecutorService exec = instance.newExecutor();
+        Future future = exec.submit(new Exceptional());
+        Object result = null;
+        try {
+            result = future.get(8, TimeUnit.MINUTES);
+        } catch (InterruptedException ex) {
+            ex.printStackTrace(System.out);
+        } catch (ExecutionException ex) {
+            ex.printStackTrace(System.out);
+        } catch (TimeoutException ex) {
+            ex.printStackTrace(System.out);
+        }
+        assertEquals("success", result);
+        instance.shutdown();
+    }
+    
+    private static class Exec implements ScheduledExecutorService {
+        
+        private final ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
+
+        @Override
+        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+            return ses.schedule(command, delay, unit);
+        }
+
+        @Override
+        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+            System.out.println(System.currentTimeMillis());
+            System.out.println("schedule:" + delay + unit);
+            return ses.schedule(callable, delay, unit);
+        }
+
+        @Override
+        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+            return ses.scheduleAtFixedRate(command, initialDelay, period, unit);
+        }
+
+        @Override
+        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+            return ses.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+        }
+
+        @Override
+        public void shutdown() {
+            System.out.println("shutdown called at:");
+            System.out.println(System.currentTimeMillis());
+            ses.shutdown();
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return ses.shutdownNow();
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return ses.isShutdown();
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return ses.isTerminated();
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            return ses.awaitTermination(timeout, unit);
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            System.out.println("submit called");
+            return ses.submit(task);
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            return ses.submit(task, result);
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            return ses.submit(task);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+            return ses.invokeAll(tasks);
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+            return ses.invokeAll(tasks, timeout, unit);
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+        }
+        
+    }
+    
+    private static class Exceptional implements Callable {
+        private final AtomicInteger tries = new AtomicInteger(0);
+        @Override
+        public Object call() throws Exception {
+            System.out.println("Task called at:");
+            System.out.println(System.currentTimeMillis());
+            int tri = tries.incrementAndGet();
+            if (tri < 7) throw new RemoteException("Dummy communication problem");
+            return "success";
+        }
+        
+    }
+}



Mime
View raw message