river-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From peter_firmst...@apache.org
Subject svn commit: r1557469 - /river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java
Date Sat, 11 Jan 2014 21:23:29 GMT
Author: peter_firmstone
Date: Sat Jan 11 21:23:28 2014
New Revision: 1557469

URL: http://svn.apache.org/r1557469
Log:
JoinManager - replace TaskManager with ExecutorService

Modified:
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java?rev=1557469&r1=1557468&r2=1557469&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java Sat Jan 11 21:23:28 2014
@@ -20,10 +20,8 @@ package net.jini.lookup;
 import com.sun.jini.constants.ThrowableConstants;
 import com.sun.jini.lookup.entry.LookupAttributes;
 import com.sun.jini.thread.RetryTask;
-import com.sun.jini.thread.TaskManager;
 import com.sun.jini.thread.WakeupManager;
 import com.sun.jini.logging.LogUtil;
-import com.sun.jini.thread.TaskManager.Task;
 
 import net.jini.config.Configuration;
 import net.jini.config.ConfigurationException;
@@ -50,13 +48,24 @@ import net.jini.core.lookup.ServiceRegis
 import java.io.IOException;
 
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import org.apache.river.api.util.FutureObserver;
+import org.apache.river.impl.thread.ExtensibleExecutorService;
+import org.apache.river.impl.thread.ExtensibleExecutorService.RunnableFutureFactory;
+import org.apache.river.impl.thread.NamedThreadFactory;
 
 /**
  * A goal of any well-behaved service is to advertise the facilities and
@@ -441,6 +450,23 @@ import java.util.logging.Logger;
  * @see java.util.logging.Logger
  */
 public class JoinManager {
+    
+    /**
+     * executorService requires a PriorityBlockingQueue, this is the 
+     * comparator for that queue.
+     */
+    public static class ExecutorQueueComparator implements Comparator {
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            int one = ((ProxyRegTask)o1).getSeqN();
+            int two = ((ProxyRegTask)o2).getSeqN();
+            if (one < two) return -1;
+            if (one > two) return 1;
+            return 0;
+        }
+        
+    }
 
     /** Implementation Note:
      *
@@ -564,7 +590,7 @@ public class JoinManager {
      */
 
     /** Abstract base class from which all of the task classes are derived. */
-    private class ProxyRegTask extends RetryTask implements Task {
+    private class ProxyRegTask extends RetryTask {
         private final long[] sleepTime = { 5*1000, 10*1000, 15*1000,
                                           20*1000, 25*1000, 30*1000 };
         // volatile fields only mutated while synchronized on proxyReg.taskList
@@ -679,22 +705,22 @@ public class JoinManager {
          *  @param tasks the tasks with which to compare the current task
          *  @param size  elements with index less than size are considered
          */
-        public boolean runAfter(List tasks, int size) {
-            /* If the service's ID has already been set, then it's okay
-             * to run all ProxyRegTask's in parallel, otherwise, the
-             * ProxyRegTask with the lowest sequence number should be run.
-             */
-            if(serviceItem.serviceID != null)  return false;
-            /* For task with lowest seq #, run it now; else run it later */
-            for(int i=0; i<size; i++) {
-                Object t = tasks.get(i);
-                if (t instanceof ProxyRegTask){
-                    int nextTaskSeqN = ((ProxyRegTask)t).getSeqN();
-                    if( seqN > nextTaskSeqN )  return true;
-                }
-            }//end loop
-            return false;
-        }//end runAfter
+//        public boolean runAfter(List tasks, int size) {
+//            /* If the service's ID has already been set, then it's okay
+//             * to run all ProxyRegTask's in parallel, otherwise, the
+//             * ProxyRegTask with the lowest sequence number should be run.
+//             */
+//            if(serviceItem.serviceID != null)  return false;
+//            /* For task with lowest seq #, run it now; else run it later */
+//            for(int i=0; i<size; i++) {
+//                Object t = tasks.get(i);
+//                if (t instanceof ProxyRegTask){
+//                    int nextTaskSeqN = ((ProxyRegTask)t).getSeqN();
+//                    if( seqN > nextTaskSeqN )  return true;
+//                }
+//            }//end loop
+//            return false;
+//        }//end runAfter
 
         /** Accessor method that returns the instance of <code>ProxyReg</code>
          *  (the lookup service) associated with the task represented by
@@ -1019,7 +1045,8 @@ public class JoinManager {
      *  service to discover, and with which this join manager's service
      *  should be registered.
      */
-    private class ProxyReg {
+    private class ProxyReg implements FutureObserver{
+       
         /** Class that is registered as a listener with this join manager's
          *  lease renewal manager. That lease renewal manager manages the
          *  lease granted to this join manager's associated service by the
@@ -1165,11 +1192,13 @@ public class JoinManager {
         /** The set of sub-tasks that are to be executed in order for the
          *  lookup service associated with the current instance of this class.
          */
-        final List<JoinTask> taskList = new ArrayList<JoinTask>(1);
+        final List<JoinTask> taskList = new ArrayList<JoinTask>();
         /** The instance of <code>DiscLeaseListener</code> that is registered
          *  with the lease renewal manager that handles the lease of this join
          *  manger's service.
          */
+        final List<Future> runningTasks = new ArrayList<Future>();
+        
 	private final DiscLeaseListener dListener = new DiscLeaseListener();
 
         /** Constructor that associates this class with the lookup service
@@ -1183,7 +1212,24 @@ public class JoinManager {
 	    if(proxy == null)  throw new IllegalArgumentException
                                                       ("proxy can't be null");
 	    this.proxy = proxy;
-	}//end constructor	    
+	}//end constructor	
+        
+        @Override
+        public void futureCompleted(Future e) {
+            synchronized (runningTasks){
+                runningTasks.remove(e);
+            }
+        }
+        
+        public void terminate(){
+            synchronized (runningTasks){
+                Iterator<Future> it = runningTasks.iterator();
+                while (it.hasNext()){
+                    it.next().cancel(false);
+                }
+                runningTasks.clear();
+            }
+        }
 
         /** Convenience method that adds new sub-tasks to this class' 
          *  task queue.
@@ -1192,13 +1238,18 @@ public class JoinManager {
          */
         public void addTask(JoinTask task) {
             if(bTerminated) return;
+            Future future = null;
             synchronized(taskList) {
                 taskList.add(task);
                 if(this.proxyRegTask == null) {
                     this.proxyRegTask = new ProxyRegTask(this,taskSeqN++);
-                    taskMgr.add(this.proxyRegTask);
+                    this.proxyRegTask.addObserver(this);
+                    future = taskMgr.submit(this.proxyRegTask);
                 }//endif
             }//end sync(taskList)
+            synchronized (runningTasks){
+                runningTasks.add(future);
+            }
         }//end addTask
 
         /** Registers the service associated with this join manager with the
@@ -1438,7 +1489,7 @@ public class JoinManager {
      *  "backoff strategy") - the re-execution of each failed task in this
      *  <code>TaskManager</code>.
      */
-    private final TaskManager taskMgr;
+    private final ExecutorService taskMgr;
     /** Maximum number of times a failed task is allowed to be re-executed. */
     private final int maxNRetries;
     /** Wakeup manager for the various tasks executed by this join manager.
@@ -2426,7 +2477,7 @@ public class JoinManager {
         ProxyPreparer registrarPreparer;
         ProxyPreparer registrationPreparer;
         ProxyPreparer serviceLeasePreparer;
-        TaskManager taskManager;
+        ExecutorService taskManager;
         WakeupManager wakeupManager;
         Integer maxNretrys;
         LeaseRenewalManager leaseRenewalManager;
@@ -2437,7 +2488,7 @@ public class JoinManager {
         Conf (  ProxyPreparer registrarPreparer,
                 ProxyPreparer registrationPreparer,
                 ProxyPreparer serviceLeasePreparer,
-                TaskManager taskManager,
+                ExecutorService taskManager,
                 WakeupManager wakeupManager,
                 Integer maxNretrys,
                 LeaseRenewalManager leaseRenewalManager,
@@ -2512,29 +2563,36 @@ public class JoinManager {
         /* Retrieve configuration items if applicable */
         if(config == null)  throw new NullPointerException("config is null");
         /* Proxy preparers */
-        ProxyPreparer registrarPreparer = (ProxyPreparer)config.getEntry
+        ProxyPreparer registrarPreparer = config.getEntry
                                                    (COMPONENT_NAME,
                                                     "registrarPreparer",
                                                     ProxyPreparer.class,
                                                     new BasicProxyPreparer());
-        ProxyPreparer registrationPreparer = (ProxyPreparer)config.getEntry
+        ProxyPreparer registrationPreparer = config.getEntry
                                                    (COMPONENT_NAME,
                                                     "registrationPreparer",
                                                     ProxyPreparer.class,
                                                     new BasicProxyPreparer());
-        ProxyPreparer serviceLeasePreparer = (ProxyPreparer)config.getEntry
+        ProxyPreparer serviceLeasePreparer = config.getEntry
                                                    (COMPONENT_NAME,
                                                     "serviceLeasePreparer",
                                                     ProxyPreparer.class,
                                                     new BasicProxyPreparer());
         /* Task manager */
-        TaskManager taskMgr;
+        ExecutorService taskMgr;
         try {
             taskMgr = config.getEntry(COMPONENT_NAME,
-                                                   "taskManager",
-                                                   TaskManager.class);
+                                       "executorService",
+                                       ExecutorService.class);
         } catch(NoSuchEntryException e) { /* use default */
-            taskMgr = new TaskManager(MAX_N_TASKS,(15*1000),1.0f);
+            taskMgr = new ThreadPoolExecutor(
+                    1,
+                    MAX_N_TASKS,
+                    15,
+                    TimeUnit.SECONDS,
+                    new PriorityBlockingQueue(100, new ExecutorQueueComparator()),
+                    new NamedThreadFactory("JoinManager executor thread", false)
+            );
         }
         /* Wakeup manager */
         WakeupManager wakeupMgr;
@@ -2547,7 +2605,7 @@ public class JoinManager {
                                     (new WakeupManager.ThreadDesc(null,true));
         }
         /* Max number of times to re-schedule tasks in thru wakeup manager */
-        Integer maxNRetries = (config.getEntry
+        int maxNRetries = (config.getEntry
                                         (COMPONENT_NAME,
                                          "wakeupRetries",
                                          int.class,
@@ -2555,7 +2613,7 @@ public class JoinManager {
         /* Lease renewal manager */
 	if(leaseMgr == null) {
             try {
-                leaseMgr = (LeaseRenewalManager)config.getEntry
+                leaseMgr = config.getEntry
                                                   (COMPONENT_NAME,
                                                    "leaseManager",
                                                    LeaseRenewalManager.class);
@@ -2563,7 +2621,7 @@ public class JoinManager {
                 leaseMgr = new LeaseRenewalManager(config);
             }
         }//endif
-        Long renewalDuration = (config.getEntry
+        long renewalDuration = (config.getEntry
                                       (COMPONENT_NAME,
                                        "maxLeaseDuration",
                                        long.class,
@@ -2606,7 +2664,22 @@ public class JoinManager {
 	registrarPreparer = conf.registrarPreparer;
         registrationPreparer = conf.registrationPreparer;
         serviceLeasePreparer = conf.serviceLeasePreparer;
-        taskMgr = conf.taskManager;
+        taskMgr = new ExtensibleExecutorService(conf.taskManager, 
+                new RunnableFutureFactory(){
+
+            @Override
+            public <T> RunnableFuture<T> newTaskFor(Runnable r, T value) {
+                if (r instanceof ProxyRegTask) return (RunnableFuture<T>) r;
+                throw new IllegalStateException("Runnable not instance of ProxyRegTask");
+            }
+
+            @Override
+            public <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
+                if (c instanceof ProxyRegTask) return (RunnableFuture<T>) c;
+                throw new IllegalStateException("Callable not instance of ProxyRegTask");
+            }
+            
+        });
         wakeupMgr = conf.wakeupManager;
         maxNRetries = conf.maxNretrys;
         leaseRenewalMgr = conf.leaseRenewalManager;
@@ -2648,12 +2721,12 @@ public class JoinManager {
         if(taskMgr == null) return;
         synchronized(proxyReg.taskList) {
             if(proxyReg.proxyRegTask != null) {
-                taskMgr.remove(proxyReg.proxyRegTask);
                 proxyReg.proxyRegTask.cancel(false);                
                 proxyReg.proxyRegTask = null;  //don't reuse because of seq#
             }//endif
             proxyReg.taskList.clear();
         }//end sync(proxyReg.taskList)
+        proxyReg.terminate();
     }//end removeTasks
 
     /** Removes from the task manager, all pending tasks regardless of the
@@ -2667,22 +2740,8 @@ public class JoinManager {
             wakeupMgr.cancelAll();//cancel all tickets
             wakeupMgr.stop();//stop execution of the wakeup manager
         }
-        synchronized(taskMgr) {
-            /* Remove all pending tasks */
-            ArrayList pendingTasks = taskMgr.getPending();
-            for(int i=0;i<pendingTasks.size();i++) {
-                RetryTask pendingTask = (RetryTask)pendingTasks.get(i);
-                pendingTask.cancel(false);                
-                taskMgr.remove(pendingTask);//remove from task mgr
-            }//end loop
-            /* Interrupt all active tasks, prepare taskMgr for GC. */
-            taskMgr.terminate();
-        }
-        // Too lazy to put out the trash.
-//                taskMgr = null;
-//            }//end sync(taskMgr)
-//            wakeupMgr = null;
-//        }//end sync(wakeupMgr)
+        /* Interrupt all active tasks, prepare taskMgr for GC. */
+        taskMgr.shutdownNow();
     }//end terminateTaskMgr
 
     /** Examines the elements of the input set and, upon finding at least one



Mime
View raw message