river-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From peter_firmst...@apache.org
Subject svn commit: r1557708 - in /river/jtsk/skunk/qa_refactor/trunk/src: com/sun/jini/fiddler/ com/sun/jini/mahalo/ com/sun/jini/mercury/ com/sun/jini/norm/event/ com/sun/jini/outrigger/ com/sun/jini/reggie/ com/sun/jini/thread/ net/jini/discovery/ net/jini/...
Date Mon, 13 Jan 2014 13:26:23 GMT
Author: peter_firmstone
Date: Mon Jan 13 13:26:23 2014
New Revision: 1557708

URL: http://svn.apache.org/r1557708
Log:
RIVER-344

Implemented Comparable for all tasks relying on order, removed Comparator's to avoid unnecessarily
adding to public API.

Set up sensible defaults for ThreadPoolExecutor implementations of ExecutorService, this will
cause some test failures for tests that make assumptions about serial event generation and
delivery above and beyond the Jini Event specification, for Reggie, note these failures do
not occur when ThreadPoolExecutors are configured to be single threaded.

Documentation of configuration options to follow.

Modified:
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/fiddler/FiddlerInit.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerImplInitializer.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mercury/MailboxImpl.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/event/EventTypeGenerator.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/Notifier.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnMonitor.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java
    river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lease/LeaseRenewalManager.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java
    river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/fiddler/FiddlerInit.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/fiddler/FiddlerInit.java?rev=1557708&r1=1557707&r2=1557708&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/fiddler/FiddlerInit.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/fiddler/FiddlerInit.java Mon Jan 13
13:26:23 2014
@@ -190,12 +190,13 @@ class FiddlerInit {
                                            "leaseMax",
                                            FiddlerImpl.MAX_LEASE, 0, Long.MAX_VALUE);
 
-            /* Get a general-purpose task manager for this service */
+            /* Get a general-purpose task manager for this service 
+             * LinkedBlockingQueue is unbounded for that reason*/
             executorService = Config.getNonNullEntry(config,
                                              FiddlerImpl.COMPONENT_NAME,
                                              "executorService",
                                              ExecutorService.class,
-                                             new ThreadPoolExecutor(1,10,15,TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("Fiddler Executor", false))
);
+                                             new ThreadPoolExecutor(10,10,15,TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("Fiddler Executor", false))
);
             /* Get the discovery manager to pass to this service's join manager. */
             try {
                 joinMgrLDM = Config.getNonNullEntry(config,

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerImplInitializer.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerImplInitializer.java?rev=1557708&r1=1557707&r2=1557708&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerImplInitializer.java
(original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mahalo/TxnManagerImplInitializer.java
Mon Jan 13 13:26:23 2014
@@ -208,11 +208,11 @@ class TxnManagerImplInitializer {
                 "settlerPool", 
                 ExecutorService.class,
                 new ThreadPoolExecutor(
-                    1,
-                    settlerthreads, 
+                    settlerthreads,
+                    settlerthreads, /* Ignored */
                     settlertimeout, 
                     TimeUnit.MILLISECONDS, 
-                    new LinkedBlockingQueue<Runnable>(),
+                    new LinkedBlockingQueue<Runnable>(), /* Unbounded Queue */
                     new NamedThreadFactory("TxnMgr settlerPool", false)
                 )
         );
@@ -222,11 +222,11 @@ class TxnManagerImplInitializer {
                 "taskPool",
                 ExecutorService.class, 
                 new ThreadPoolExecutor(
-                        1,
                         taskthreads,
+                        taskthreads, /* Ignored */
                         tasktimeout,
                         TimeUnit.MILLISECONDS,
-                        new LinkedBlockingQueue<Runnable>(),
+                        new LinkedBlockingQueue<Runnable>(), /* Unbounded Queue */
                         new NamedThreadFactory("TxnMgr taskPool", false)
                 )
         );

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mercury/MailboxImpl.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mercury/MailboxImpl.java?rev=1557708&r1=1557707&r2=1557708&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mercury/MailboxImpl.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/mercury/MailboxImpl.java Mon Jan 13
13:26:23 2014
@@ -2930,9 +2930,16 @@ class MailboxImpl implements MailboxBack
     	    super("Notifier");
     	    taskManager = Config.getNonNullEntry(config,
 	        MERCURY, "notificationsExecutorService",
-	        ExecutorService.class, new ThreadPoolExecutor(1,10,15,TimeUnit.SECONDS, 
-                    new LinkedBlockingQueue<Runnable>(), 
-                    new NamedThreadFactory("EventTypeGenerator", false)));
+	        ExecutorService.class, 
+                new ThreadPoolExecutor(
+                    10,
+                    10, /* Ignored */
+                    15,
+                    TimeUnit.SECONDS, 
+                    new LinkedBlockingQueue<Runnable>(), /* Unbounded Queue */
+                    new NamedThreadFactory("EventTypeGenerator", false)
+                )
+            );
 //TODO - defer TaskManager() creation to catch block of getEntry()
     	    //start();
         }

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/event/EventTypeGenerator.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/event/EventTypeGenerator.java?rev=1557708&r1=1557707&r2=1557708&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/event/EventTypeGenerator.java
(original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/norm/event/EventTypeGenerator.java
Mon Jan 13 13:26:23 2014
@@ -51,9 +51,14 @@ public class EventTypeGenerator implemen
      * Task manager used to send events
      */
     private transient ExecutorService taskManager = 
-            new ThreadPoolExecutor(1,10,15,TimeUnit.SECONDS, 
-                    new LinkedBlockingQueue<Runnable>(), 
-                    new NamedThreadFactory("EventTypeGenerator", false));
+            new ThreadPoolExecutor(
+                    10,
+                    10, /* Ignored */
+                    15,
+                    TimeUnit.SECONDS, 
+                    new LinkedBlockingQueue<Runnable>(), /* Unbounded queue */
+                    new NamedThreadFactory("EventTypeGenerator", false)
+            );
 
     /**
      * Wakeup manager used by the event sending tasks to schedule 
@@ -151,9 +156,14 @@ public class EventTypeGenerator implemen
 	// fill in the object from the stream 
 	in.defaultReadObject();
 
-	taskManager = new ThreadPoolExecutor(1,10,15,TimeUnit.SECONDS, 
-                    new LinkedBlockingQueue<Runnable>(), 
-                    new NamedThreadFactory("EventTypeGenerator", true));
+	taskManager = new ThreadPoolExecutor(
+                    10,
+                    10, /* Ignored */
+                    15,
+                    TimeUnit.SECONDS, 
+                    new LinkedBlockingQueue<Runnable>(), /* Unbounded Queue */
+                    new NamedThreadFactory("EventTypeGenerator", true)
+        );
 	wakeupManager = 
 	    new WakeupManager(new WakeupManager.ThreadDesc(null, true));    
     }

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/Notifier.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/Notifier.java?rev=1557708&r1=1557707&r2=1557708&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/Notifier.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/Notifier.java Mon Jan 13
13:26:23 2014
@@ -99,9 +99,15 @@ class Notifier implements com.sun.jini.c
 	pending = Config.getNonNullEntry(config,
 	    OutriggerServerImpl.COMPONENT_NAME, "notificationsExecutorService", 
 	    ExecutorService.class, 
-            new ThreadPoolExecutor(1,10,15,TimeUnit.SECONDS, 
-                    new LinkedBlockingQueue<Runnable>()), 
-            new NamedThreadFactory("OutriggerServerImpl Notifier", false));
+            new ThreadPoolExecutor(
+                10,
+                10, /* Ignored */
+                15,
+                TimeUnit.SECONDS, 
+                new LinkedBlockingQueue<Runnable>(), /* Unbounded queue */
+                new NamedThreadFactory("OutriggerServerImpl Notifier", false)
+            )
+        );
     }
 
     /**

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnMonitor.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnMonitor.java?rev=1557708&r1=1557707&r2=1557708&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnMonitor.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/outrigger/TxnMonitor.java Mon Jan
13 13:26:23 2014
@@ -116,15 +116,20 @@ class TxnMonitor implements Runnable {
 	this.space = space;
 
 	taskManager = Config.getNonNullEntry(config,
-	    OutriggerServerImpl.COMPONENT_NAME, "txnMonitorTaskManager", 
+	    OutriggerServerImpl.COMPONENT_NAME, "txnMonitorExecutorService", 
 	    ExecutorService.class, 
-            new ThreadPoolExecutor(1,10,15,TimeUnit.SECONDS, 
-                    new LinkedBlockingQueue<Runnable>(), 
-                    new NamedThreadFactory("OutriggerServerImpl TxnMonitor", false)));
+            new ThreadPoolExecutor(
+                    10,
+                    10, /* Ignored */
+                    15,
+                    TimeUnit.SECONDS, 
+                    new LinkedBlockingQueue<Runnable>(), /* Unbounded Queue */
+                    new NamedThreadFactory("OutriggerServerImpl TxnMonitor", false)
+            )
+        );
 
         ourThread = new Thread(this, "TxnMonitor");
 	ourThread.setDaemon(true);
-//        ourThread.start();
     }
     
     public void start(){

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java?rev=1557708&r1=1557707&r2=1557708&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java Mon Jan
13 13:26:23 2014
@@ -17,9 +17,6 @@
  */
 package com.sun.jini.reggie;
 
-import au.net.zeus.collection.RC;
-import au.net.zeus.collection.Ref;
-import au.net.zeus.collection.Referrer;
 import com.sun.jini.config.Config;
 import com.sun.jini.constants.ThrowableConstants;
 import com.sun.jini.constants.VersionConstants;
@@ -92,6 +89,7 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -2011,9 +2009,9 @@ class RegistrarImpl implements Registrar
 	    reg = null;
 	}
     }
-
+    
     /** An event to be sent, and the listener to send it to. */
-    private static final class EventTask implements Runnable {
+    private static final class EventTask implements Runnable, Comparable<EventTask>
{
 
 	/** The event registration */
 	private final EventReg reg;
@@ -2028,9 +2026,11 @@ class RegistrarImpl implements Registrar
         
         private final RegistrarProxy proxy;
         private final Registrar registrar;
+        /* the time of the event */
+        private final long now;
 
 	/** Simple constructor, except increments reg.seqNo. */
-	public EventTask(EventReg reg, ServiceID sid, Item item, int transition, RegistrarProxy
proxy, Registrar registrar)
+	public EventTask(EventReg reg, ServiceID sid, Item item, int transition, RegistrarProxy
proxy, Registrar registrar, long now)
 	{
 	    this.reg = reg;
 	    seqNo = reg.incrementAndGetSeqNo();
@@ -2039,6 +2039,7 @@ class RegistrarImpl implements Registrar
 	    this.transition = transition;
             this.proxy = proxy;
             this.registrar = registrar;
+            this.now = now;
 	}
 
 	/** Send the event */
@@ -2083,6 +2084,19 @@ class RegistrarImpl implements Registrar
 		}
 	    }
 	}
+
+        /**
+         * This is inconsistent with Object.equals, it is simply intended to
+         * order tasks by priority.
+         * @param o
+         * @return 
+         */
+        @Override
+        public int compareTo(EventTask o) {
+            if (this.now < o.now) return -1;
+            if (this.now > o.now) return 1;
+            return 0;
+        }
     }
 
     /** Task for decoding multicast request packets. */
@@ -4888,11 +4902,11 @@ class RegistrarImpl implements Registrar
                 "eventNotifierExecutor",
                 ExecutorService.class, 
                 new ThreadPoolExecutor(
-                    1, 
                     poolSizeLimit, 
+                    poolSizeLimit, /* Ignored */
                     15L, 
                     TimeUnit.MINUTES, 
-                    new LinkedBlockingQueue(),
+                    new PriorityBlockingQueue(poolSizeLimit * 4), /* Unbounded Ordered Queue
*/
                     new NamedThreadFactory("Reggie_Event_Notifier", true)   
                 )
             );
@@ -4903,11 +4917,11 @@ class RegistrarImpl implements Registrar
                 "discoveryResponseExecutor", 
                 ExecutorService.class, 
                 new ThreadPoolExecutor(
-                    1, 
                     poolSizeLimit, 
+                    poolSizeLimit, /* Ignored */
                     15L, 
                     TimeUnit.MINUTES, 
-                    new LinkedBlockingQueue(),
+                    new LinkedBlockingQueue(), /* Unbounded Queue */
                     new NamedThreadFactory("Reggie_Discovery_Response", true)
                 ) 
             );
@@ -5789,30 +5803,27 @@ class RegistrarImpl implements Registrar
 		 (pre == null || !matchItem(reg.tmpl, pre)) &&
 		 (post != null && matchItem(reg.tmpl, post)))
 	    pendingEvent(reg, sid, post,
-			 ServiceRegistrar.TRANSITION_NOMATCH_MATCH);
+			 ServiceRegistrar.TRANSITION_NOMATCH_MATCH, now);
 	else if ((reg.transitions &
 		  ServiceRegistrar.TRANSITION_MATCH_NOMATCH) != 0 &&
 		 (pre != null && matchItem(reg.tmpl, pre)) &&
 		 (post == null || !matchItem(reg.tmpl, post)))
 	    pendingEvent(reg, sid, post,
-			 ServiceRegistrar.TRANSITION_MATCH_NOMATCH);
+			 ServiceRegistrar.TRANSITION_MATCH_NOMATCH, now);
 	else if ((reg.transitions &
 		  ServiceRegistrar.TRANSITION_MATCH_MATCH) != 0 &&
 		 (pre != null && matchItem(reg.tmpl, pre)) &&
 		 (post != null && matchItem(reg.tmpl, post)))
 	    pendingEvent(reg, sid, post,
-			 ServiceRegistrar.TRANSITION_MATCH_MATCH);
+			 ServiceRegistrar.TRANSITION_MATCH_MATCH, now);
     }
 
     /** Add a pending EventTask for this event registration. */
-    private void pendingEvent(EventReg reg,
-			      ServiceID sid,
-			      Item item,
-			      int transition)
+    private void pendingEvent(EventReg reg, ServiceID sid, Item item, int transition, long
now)
     {
 	if (item != null)
 	    item = copyItem(item);
-	eventNotifierExec.execute(new EventTask(reg, sid, item, transition, proxy, this));
+	eventNotifierExec.execute(new EventTask(reg, sid, item, transition, proxy, this, now));
     }
 
     /** Generate a new service ID */

Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java?rev=1557708&r1=1557707&r2=1557708&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/thread/ThreadPool.java Mon Jan 13
13:26:23 2014
@@ -224,20 +224,21 @@ final class ThreadPool implements Execut
                      * REMIND: What if the task changed this thread's
                      * priority? or context class loader?
                      * 
-                     * thread.setName is not thread safe.
+                     * thread.setName is not thread safe, so may not reflect
+                     * most up to date state
                      */
                     try {
                         waitingThreads.getAndIncrement();
                         task = null;
                         task = queue.poll(idleTimeout, TimeUnit.MILLISECONDS);
                         waitingThreads.getAndDecrement();
-//                        thread.setName(NewThreadAction.NAME_PREFIX + task);
+                        thread.setName(NewThreadAction.NAME_PREFIX + task);
                         if (task != null) {
                             task.run();
                         } else {
                             break; //Timeout or spurious wakeup.
                         }
-//                         thread.setName(NewThreadAction.NAME_PREFIX + "Idle");
+                         thread.setName(NewThreadAction.NAME_PREFIX + "Idle");
                     } catch (InterruptedException e){
                         waitingThreads.getAndDecrement();
                         thread.interrupt();

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java?rev=1557708&r1=1557707&r2=1557708&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java
(original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupDiscovery.java
Mon Jan 13 13:26:23 2014
@@ -1425,10 +1425,13 @@ abstract class AbstractLookupDiscovery i
                         "executorService", ExecutorService.class);
             } catch (NoSuchEntryException e) { /* use default */
                 executorServ =
-                    new ThreadPoolExecutor(1, MAX_N_TASKS ,
-                              15L, TimeUnit.SECONDS,
-                              new LinkedBlockingQueue<Runnable>(),
-                              new NamedThreadFactory("LookupDiscovery", false));
+                    new ThreadPoolExecutor(
+                        MAX_N_TASKS, 
+                        MAX_N_TASKS, /* Ignored */
+                        15L, TimeUnit.SECONDS,
+                        new LinkedBlockingQueue<Runnable>(), /* Unbounded Queue */
+                        new NamedThreadFactory("LookupDiscovery", false)
+                    );
             }
             this.executor = executorServ;
 

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java?rev=1557708&r1=1557707&r2=1557708&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java
(original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/discovery/AbstractLookupLocatorDiscovery.java
Mon Jan 13 13:26:23 2014
@@ -1378,10 +1378,14 @@ abstract class AbstractLookupLocatorDisc
                                                             ExecutorService.class);
         } catch(NoSuchEntryException e) { /* use default */
             i.discoveryTaskMgr = 
-            new ThreadPoolExecutor(1, MAX_N_TASKS ,
-                              15L, TimeUnit.SECONDS,
-                              new LinkedBlockingQueue<Runnable>(),
-                              new NamedThreadFactory("LookupLocatorDiscovery", false));
+            new ThreadPoolExecutor(
+                    MAX_N_TASKS, 
+                    MAX_N_TASKS, /* Ignored */
+                    15L,
+                    TimeUnit.SECONDS,
+                    new LinkedBlockingQueue<Runnable>(), /* Unbounded */
+                    new NamedThreadFactory("LookupLocatorDiscovery", false)
+            );
         }
         /* Wakeup manager */
         try {

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lease/LeaseRenewalManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lease/LeaseRenewalManager.java?rev=1557708&r1=1557707&r2=1557708&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lease/LeaseRenewalManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lease/LeaseRenewalManager.java Mon Jan
13 13:26:23 2014
@@ -716,10 +716,15 @@ public class LeaseRenewalManager {
      */
     public LeaseRenewalManager() {
         leaseRenewalExecutor = 
-            new ThreadPoolExecutor(1,11,15,TimeUnit.SECONDS, 
-                    new SynchronousQueue<Runnable>(), 
+            new ThreadPoolExecutor(
+                    1,  /* min threads */
+                    11, /* max threads */
+                    15,
+                    TimeUnit.SECONDS, 
+                    new SynchronousQueue<Runnable>(), /* Queue has no capacity */
                     new NamedThreadFactory("LeaseRenewalManager",true),
-                    new CallerRunsPolicy());
+                    new CallerRunsPolicy()
+            );
     }
 
     /**
@@ -746,11 +751,20 @@ public class LeaseRenewalManager {
 	    config, LRM, "roundTripTime",
 	    renewalRTT, 1, Long.MAX_VALUE);
 	leaseRenewalExecutor = Config.getNonNullEntry(
-	    config, LRM, "executorService", ExecutorService.class,
-                new ThreadPoolExecutor(1,11,15,TimeUnit.SECONDS, 
-                    new SynchronousQueue<Runnable>(), 
+            config, 
+            LRM, 
+            "executorService", 
+            ExecutorService.class,
+            new ThreadPoolExecutor(
+                    1,  /* Min Threads */
+                    11, /* Max Threads */
+                    15,
+                    TimeUnit.SECONDS, 
+                    new SynchronousQueue<Runnable>(), /* No capacity */
                     new NamedThreadFactory("LeaseRenewalManager",false),
-                    new CallerRunsPolicy()) );
+                    new CallerRunsPolicy()
+            ) 
+        );
     }
 
     /**
@@ -778,10 +792,15 @@ public class LeaseRenewalManager {
 			       long desiredExpiration,
 			       LeaseListener listener)
     {
-        leaseRenewalExecutor = new ThreadPoolExecutor(1,11,15,TimeUnit.SECONDS, 
-                    new SynchronousQueue<Runnable>(), 
-                    new NamedThreadFactory("LeaseRenewalManager",true),
-                    new CallerRunsPolicy());
+        leaseRenewalExecutor = new ThreadPoolExecutor(
+                1,  /* Min Threads */
+                11, /* Max Threads */
+                15,
+                TimeUnit.SECONDS, 
+                new SynchronousQueue<Runnable>(), /* No Capacity */
+                new NamedThreadFactory("LeaseRenewalManager",true),
+                new CallerRunsPolicy()
+        );
 	renewUntil(lease, desiredExpiration, listener);
     }
 

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java?rev=1557708&r1=1557707&r2=1557708&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java Mon Jan 13 13:26:23
2014
@@ -451,23 +451,6 @@ import org.apache.river.impl.thread.Name
  */
 public class JoinManager {
     
-    /**
-     * executorService requires a PriorityBlockingQueue, this is the 
-     * comparator for that queue.
-     */
-    public static class ExecutorQueueComparator implements Comparator {
-
-        @Override
-        public int compare(Object o1, Object o2) {
-            int one = ((ProxyRegTask)o1).getSeqN();
-            int two = ((ProxyRegTask)o2).getSeqN();
-            if (one < two) return -1;
-            if (one > two) return 1;
-            return 0;
-        }
-        
-    }
-
     /** Implementation Note:
      *
      *  This class executes a number of tasks asynchronously. Each task is
@@ -590,7 +573,7 @@ public class JoinManager {
      */
 
     /** Abstract base class from which all of the task classes are derived. */
-    private class ProxyRegTask extends RetryTask {
+    private class ProxyRegTask extends RetryTask implements Comparable<ProxyRegTask>
{
         private final long[] sleepTime = { 5*1000, 10*1000, 15*1000,
                                           20*1000, 25*1000, 30*1000 };
         // volatile fields only mutated while synchronized on proxyReg.taskList
@@ -764,6 +747,13 @@ public class JoinManager {
                        "JoinManager - failure, will retry later", e);
             return false;//try this task again later
         }//end stopTrying
+
+        @Override
+        public int compareTo(ProxyRegTask o) {
+            if (seqN < o.seqN) return -1;
+            if (seqN > o.seqN) return 1;
+            return 0;
+        }
     }//end class ProxyRegTask
 
     /** Abstract base class from which all the sub-task classes are derived. */
@@ -2586,12 +2576,12 @@ public class JoinManager {
                                        ExecutorService.class);
         } catch(NoSuchEntryException e) { /* use default */
             taskMgr = new ThreadPoolExecutor(
-                    1,
-                    MAX_N_TASKS,
-                    15,
-                    TimeUnit.SECONDS,
-                    new PriorityBlockingQueue(100, new ExecutorQueueComparator()),
-                    new NamedThreadFactory("JoinManager executor thread", false)
+                MAX_N_TASKS, 
+                MAX_N_TASKS, /* Ignored */
+                15,
+                TimeUnit.SECONDS,
+                new PriorityBlockingQueue(100), /* Unbounded Queue */
+                new NamedThreadFactory("JoinManager executor thread", false)
             );
         }
         /* Wakeup manager */

Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java
URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java?rev=1557708&r1=1557707&r2=1557708&view=diff
==============================================================================
--- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java (original)
+++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java Mon
Jan 13 13:26:23 2014
@@ -927,7 +927,7 @@ public class ServiceDiscoveryManager {
      * 
      * @param <T> 
      */
-    public static final class FutureTaskSeqNo<T> extends FutureTask<T> 
+    public static final class FutureTaskSeqNo<T> extends FutureTask<T> implements
Comparable<FutureTaskSeqNo>
     {
         private final long seqNo;
 
@@ -944,27 +944,12 @@ public class ServiceDiscoveryManager {
             else seqNo = -1;
         }
 
-        private long getSeqNo()
-        {
-            return seqNo;
-        }
-    }
-
-    /**
-     * Comparator for PriorityBlockingQueue used in cacheExecutorService
-     * provided by configuration.
-     */
-    public static final class FutureComparator implements Comparator<FutureTaskSeqNo>
{
-
         @Override
-        public int compare(FutureTaskSeqNo o1, FutureTaskSeqNo o2) {
-            long one = o1.getSeqNo();
-            long two = o2.getSeqNo();
-            if (one < two) return -1;
-            if (one > two) return 1;
+        public int compareTo(FutureTaskSeqNo o) {
+            if (seqNo < o.seqNo) return -1;
+            if (seqNo > o.seqNo) return 1;
             return 0;
         }
-
     }
     
     /** Internal implementation of the LookupCache interface. Instances of
@@ -2293,11 +2278,11 @@ public class ServiceDiscoveryManager {
             } catch(ConfigurationException e) { /* use default */
                 cacheTaskMgr =
                         new ThreadPoolExecutor(
-                                1, 
-                                10, 
+                                10, /* Min Threads */
+                                10, /* Ignored */
                                 15, 
                                 TimeUnit.SECONDS,
-                                new PriorityBlockingQueue(100, new FutureComparator()),
+                                new PriorityBlockingQueue(100), /* Unbounded */
                                 new NamedThreadFactory(
                                         "SDM lookup cache",
                                         false
@@ -2320,17 +2305,17 @@ public class ServiceDiscoveryManager {
             } catch(ConfigurationException e) { /* use default */
                 serviceDiscardTimerTaskMgr = 
 //                        new TaskManager(10,(15*1000),1.0f);
-                        new ThreadPoolExecutor(
-                                1, 
-                                10,
-                                15,
-                                TimeUnit.SECONDS,
-                                new LinkedBlockingQueue<Runnable>(),
-                                new NamedThreadFactory(
-                                        "SDM discard timer",
-                                        false
-                                )
-                        );
+                    new ThreadPoolExecutor(
+                            10, /* Min Threads */
+                            10, /* Ignored */
+                            15,
+                            TimeUnit.SECONDS,
+                            new LinkedBlockingQueue<Runnable>(), /* Unbounded Queue
*/
+                            new NamedThreadFactory(
+                                "SDM discard timer",
+                                false
+                            )
+                    );
             }
             // Moved here from constructor to avoid publishing this reference
             lookupListenerProxy = lookupListener.export();



Mime
View raw message