jakarta-jcs-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Estefano Eduardo" <eduardo.estef...@siemens.com>
Subject RE: Less expensive EventQueue patch
Date Fri, 14 May 2004 10:37:29 GMT
Use Jalopy, you can configure this kind of code formatting.

-----Original Message-----
From: Travis Savo [mailto:tsavo@IFILM.com] 
Sent: Friday, 14 May, 2004 3:07
To: 'Turbine JCS Developers List'
Cc: 'aasmuts@wisc.edu'
Subject: Less expensive EventQueue patch


This patch makes the CacheEventQueue not spawn unnecessary threads, and
keep them alive for a short time afterwards in case new events come in.

Pay close attention to the (javadoced) THREAD_TIMEOUT constant. This
determines how long a thread lives after emptying out the queue waiting
for more events to come in. It may be desirable at some point in the
future to make this a configuration option.

I did my very best to not let formatting inconsistencies find their way
in... but there's:
A) No way to tell Eclipse to put throws declarations on the next line,
and
B) No standard for throws declaration in the Apache coding standards
(that I could find).

Also, this file isn't whitespace conformant to the Apache standards, so
my patch fixes that as well.

-Travis Savo


Index: CacheEventQueue.java
===================================================================
RCS file:
/home/cvspublic/jakarta-turbine-jcs/src/java/org/apache/jcs/engine/Cache
Even
tQueue.java,v
retrieving revision 1.7
diff -u -r1.7 CacheEventQueue.java
--- CacheEventQueue.java	17 Apr 2004 14:00:11 -0000	1.7
+++ CacheEventQueue.java	14 May 2004 00:56:33 -0000
@@ -1,6 +1,5 @@
 package org.apache.jcs.engine;
 
-
 /*
  * Copyright 2001-2004 The Apache Software Foundation.
  *
@@ -16,11 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 import java.io.IOException;
 import java.io.Serializable;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.jcs.engine.behavior.ICacheElement;
@@ -29,46 +25,45 @@
 
 /**
  * An event queue is used to propagate ordered cache events to one and
only one
- * target listener.
- *
- * <pre>
+ * target listener. 
+ * <pre> *   
  * Changes:<br>
  * 17 April 2004  Hanson Char
  * <ol><li>Bug fix: add missing synchronization to method
addRemoveEvent();</li>
- * <li>Use the light weight new int[0] for creating the object monitor
queueLock,
- * instead of new Object();</li>
+ * <li>Use the light weight new int[0] for creating the object monitor
queueLock, instead of new Object();</li>
  * <li>Explicitely qualify member variables of CacheEventQueue in inner
classes.
  * Hopefully this will help identify any potential concurrency
issue.</li>
  * </ol>
+ * 13 May 2004  Travis Savo<br>
+ * Changed to not spawn a thread when the queue isn't in use, and not 
+ kill
said thread until THREAD_TIMEOUT
+ * has passed without a new event.
  * </pre>
  */
 public class CacheEventQueue implements ICacheEventQueue
 {
-    private final static Log log = LogFactory.getLog(
CacheEventQueue.class
);
 
+    private final static Log log = LogFactory.getLog( 
+ CacheEventQueue.class
);
+    /**
+     * Number of milliseconds after emptying out a queue to wait until
killing the processor thread.
+     */
+    private static final long THREAD_TIMEOUT = 60*1000;
+    
     private static int processorInstanceCount = 0;
-
     // private LinkedQueue queue = new LinkedQueue();
-
     private ICacheListener listener;
     private byte listenerId;
     private String cacheName;
-
     private int failureCount;
     private int maxFailure;
-
     // in milliseconds
     private int waitBeforeRetry;
-
     private boolean destroyed;
-    private Thread t;
-
+    private boolean working;
+    
+    private Thread processorThread;
     // Internal queue implementation
-
     private Object queueLock = new int[0];
-
     // Dummy node
-
     private Node head = new Node();
     private Node tail = head;
 
@@ -79,9 +74,7 @@
      * @param listenerId
      * @param cacheName
      */
-    public CacheEventQueue( ICacheListener listener,
-                            byte listenerId,
-                            String cacheName )
+    public CacheEventQueue( ICacheListener listener, byte listenerId,
String cacheName)
     {
         this( listener, listenerId, cacheName, 10, 500 );
     }
@@ -95,26 +88,19 @@
      * @param maxFailure
      * @param waitBeforeRetry
      */
-    public CacheEventQueue( ICacheListener listener,
-                            byte listenerId,
-                            String cacheName,
-                            int maxFailure,
-                            int waitBeforeRetry )
+    public CacheEventQueue( ICacheListener listener, byte listenerId,
String cacheName, int maxFailure, int waitBeforeRetry)
     {
         if ( listener == null )
         {
             throw new IllegalArgumentException( "listener must not be
null" );
         }
-
         this.listener = listener;
         this.listenerId = listenerId;
         this.cacheName = cacheName;
         this.maxFailure = maxFailure <= 0 ? 10 : maxFailure;
         this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 :
waitBeforeRetry;
-
-        this.t = new QProcessor();
-        this.t.start();
-
+        this.processorThread = new QProcessor();
+        this.processorThread.start();
         if ( log.isDebugEnabled() )
         {
             log.debug( "Constructed: " + this );
@@ -129,21 +115,26 @@
         if ( !this.destroyed )
         {
             this.destroyed = true;
-
             // sychronize on queue so the thread will not wait forever,
             // and then interrupt the QueueProcessor
-
             synchronized ( this.queueLock )
             {
-                this.t.interrupt();
+                this.processorThread.interrupt();
             }
-
-            this.t = null;
-
+            this.processorThread = null;
             log.info( "Cache event queue destroyed: " + this );
         }
     }
-
+    
+    /**
+     * Event Q is emtpy.
+     */
+    public synchronized void stopProcessing()
+    {
+        working = false;
+        processorThread = null;
+    }
+    
     /**
      * @return
      */
@@ -160,6 +151,11 @@
         return ( !this.destroyed );
     }
 
+    private boolean isWorking()
+    {
+        return ( this.working );
+    }
+
     /**
      * @return The {3} value
      */
@@ -172,8 +168,7 @@
      * @param ce The feature to be added to the PutEvent attribute
      * @exception IOException
      */
-    public synchronized void addPutEvent( ICacheElement ce )
-        throws IOException
+    public synchronized void addPutEvent( ICacheElement ce ) throws
IOException
     {
         if ( !this.destroyed )
         {
@@ -185,8 +180,7 @@
      * @param key The feature to be added to the RemoveEvent attribute
      * @exception IOException
      */
-    public synchronized void addRemoveEvent( Serializable key )
-        throws IOException
+    public synchronized void addRemoveEvent( Serializable key ) throws
IOException
     {
         if ( !this.destroyed )
         {
@@ -197,8 +191,7 @@
     /**
      * @exception IOException
      */
-    public synchronized void addRemoveAllEvent()
-        throws IOException
+    public synchronized void addRemoveAllEvent() throws IOException
     {
         if ( !this.destroyed )
         {
@@ -209,8 +202,7 @@
     /**
      * @exception IOException
      */
-    public synchronized void addDisposeEvent()
-        throws IOException
+    public synchronized void addDisposeEvent() throws IOException
     {
         if ( !this.destroyed )
         {
@@ -226,59 +218,55 @@
     private void put( AbstractCacheEvent event )
     {
         Node newNode = new Node();
-
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( "Event entering Queue for " + cacheName + ": " +
event );
+        }
         newNode.event = event;
-
-        synchronized ( this.queueLock )
+        synchronized ( queueLock )
         {
             this.tail.next = newNode;
             this.tail = newNode;
-
-            this.queueLock.notify();
+            if ( isAlive() )
+                if ( !isWorking() )
+                {
+                    this.working = true;
+                    processorThread = new QProcessor();
+                    processorThread.start();
+                } else
+                {
+                    queueLock.notify();
+                }
         }
     }
 
-    private AbstractCacheEvent take() throws InterruptedException
+    private AbstractCacheEvent take()
     {
-        synchronized ( this.queueLock )
+        synchronized ( queueLock )
         {
             // wait until there is something to read
-
-            while ( this.head == this.tail )
+            if ( head == tail )
             {
-                this.queueLock.wait();
+                return null;
             }
-
-            // we have the lock, and the list is not empty
-
-            Node node = this.head.next;
-
-            // This is an awful bug.  This will always return null.
-            // This make the event Q and event destroyer.
-            //AbstractCacheEvent value = head.event;
-
-            // corrected
+            Node node = head.next;
             AbstractCacheEvent value = node.event;
-
             if ( log.isDebugEnabled() )
             {
-              log.debug( "head.event = " + this.head.event );
-              log.debug( "node.event = " + node.event );
+                log.debug( "head.event = " + head.event );
+                log.debug( "node.event = " + node.event );
             }
-
             // Node becomes the new head (head is always empty)
-
             node.event = null;
-            this.head = node;
-
+            head = node;
             return value;
         }
     }
 
     ///////////////////////////// Inner classes
/////////////////////////////
-
     private static class Node
     {
+
         Node next = null;
         AbstractCacheEvent event = null;
     }
@@ -287,70 +275,80 @@
      */
     private class QProcessor extends Thread
     {
+
         /**
          * Constructor for the QProcessor object
          */
         QProcessor()
         {
-            super( "CacheEventQueue.QProcessor-" + (
++CacheEventQueue.this.processorInstanceCount ) );
-
+            super( "CacheEventQueue.QProcessor-" + (
++processorInstanceCount ) );
             setDaemon( true );
         }
 
+        
         /**
          * Main processing method for the QProcessor object
          */
         public void run()
         {
             AbstractCacheEvent r = null;
-
-            while ( !CacheEventQueue.this.destroyed )
+            while ( isAlive() )
             {
-                try
+                r = take();
+                if ( log.isDebugEnabled() )
                 {
-                    r = take();
-
-                    if ( log.isDebugEnabled() )
-                    {
-                        log.debug( "r from take() = " + r );
-                    }
-
+                    log.debug( "Event from queue = " + r );
                 }
-                catch ( InterruptedException e )
+                if ( r == null )
                 {
-                    // We were interrupted, just continue -- the while
loop
-                    // will exit if we have been properly destroyed.
+                    synchronized ( queueLock )
+                    {
+                        try
+                        {
+                            queueLock.wait( THREAD_TIMEOUT );
+                        } catch ( InterruptedException e )
+                        {
+                            log.warn( "Interrupted while waiting for
another event to come in before we die." );
+                            return;
+                        }
+                        r = take();
+                        if ( log.isDebugEnabled() )
+                        {
+                            log.debug( "Event from queue after sleep = 
+ " +
r );
+                        }
+                        if ( r == null )
+                        {
+                            stopProcessing();
+                        }
+                    }
                 }
-
-                if ( !CacheEventQueue.this.destroyed && r != null )
+                if ( isAlive() && r != null )
                 {
                     r.run();
                 }
             }
-            // declare failure as listener is permanently unreachable.
-            // queue = null;
-            CacheEventQueue.this.listener = null;
-            // The listener failure logging more the problem of the
user
-            // of the q.
-            log.info( "QProcessor exiting for " + CacheEventQueue.this
);
+            if ( log.isInfoEnabled() )
+            {
+                log.info( "QProcessor exiting for " + this );
+            }
         }
     }
 
+
     /**
      * Retries before declaring failure.
      *
      */
     private abstract class AbstractCacheEvent implements Runnable
     {
+
         /**
          * Main processing method for the AbstractCacheEvent object
          */
         public void run()
         {
             IOException ex = null;
-
-            while ( !CacheEventQueue.this.destroyed
-                    && CacheEventQueue.this.failureCount <=
CacheEventQueue.this.maxFailure )
+            while ( !CacheEventQueue.this.destroyed &&
CacheEventQueue.this.failureCount <= CacheEventQueue.this.maxFailure )
             {
                 try
                 {
@@ -359,32 +357,28 @@
                     CacheEventQueue.this.failureCount = 0;
                     return;
                     // happy and done.
-                }
-                catch ( IOException e )
+                } catch ( IOException e )
                 {
                     CacheEventQueue.this.failureCount++;
                     ex = e;
                 }
                 // Let's get idle for a while before retry.
-                if ( !CacheEventQueue.this.destroyed
-                     && CacheEventQueue.this.failureCount <=
CacheEventQueue.this.maxFailure )
+                if ( !CacheEventQueue.this.destroyed &&
CacheEventQueue.this.failureCount <= CacheEventQueue.this.maxFailure )
                 {
                     try
                     {
                         log.warn( "...retrying propagation " +
CacheEventQueue.this + "..." + CacheEventQueue.this.failureCount );
-                        Thread.currentThread().sleep(
CacheEventQueue.this.waitBeforeRetry );
-                    }
-                    catch ( InterruptedException ie )
+                        Thread.sleep( 
+ CacheEventQueue.this.waitBeforeRetry
);
+                    } catch ( InterruptedException ie )
                     {
                         // ignore;
                     }
                 }
             }
-            // Too bad.  The remote host is unreachable, so we give up.
+            // Too bad. The remote host is unreachable, so we give up.
             if ( ex != null )
             {
                 log.warn( "Giving up propagation " +
CacheEventQueue.this, ex );
-
                 destroy();
             }
             return;
@@ -395,8 +389,7 @@
          *
          * @exception IOException
          */
-        protected abstract void doRun()
-            throws IOException;
+        protected abstract void doRun() throws IOException;
     }
 
     /**
@@ -412,14 +405,11 @@
          * @param ice
          * @exception IOException
          */
-        PutEvent( ICacheElement ice )
-            throws IOException
+        PutEvent( ICacheElement ice) throws IOException
         {
             this.ice = ice;
             /*
-             * this.key = key;
-             * this.obj = CacheUtils.dup(obj);
-             * this.attr = attr;
+             * this.key = key; this.obj = CacheUtils.dup(obj); 
+ this.attr =
attr;
              * this.groupName = groupName;
              */
         }
@@ -429,13 +419,11 @@
          *
          * @exception IOException
          */
-        protected void doRun()
-            throws IOException
+        protected void doRun() throws IOException
         {
             /*
              * CacheElement ce = new CacheElement(cacheName, key, obj);
-             * ce.setElementAttributes( attr );
-             * ce.setGroupName( groupName );
+             * ce.setElementAttributes( attr ); ce.setGroupName( 
+ groupName
);
              */
             CacheEventQueue.this.listener.handlePut( ice );
         }
@@ -447,6 +435,7 @@
      */
     private class RemoveEvent extends AbstractCacheEvent
     {
+
         private Serializable key;
 
         /**
@@ -455,8 +444,7 @@
          * @param key
          * @exception IOException
          */
-        RemoveEvent( Serializable key )
-            throws IOException
+        RemoveEvent( Serializable key) throws IOException
         {
             this.key = key;
         }
@@ -466,8 +454,7 @@
          *
          * @exception IOException
          */
-        protected void doRun()
-            throws IOException
+        protected void doRun() throws IOException
         {
             CacheEventQueue.this.listener.handleRemove(
CacheEventQueue.this.cacheName, key );
         }
@@ -479,13 +466,13 @@
      */
     private class RemoveAllEvent extends AbstractCacheEvent
     {
+
         /**
          * Description of the Method
          *
          * @exception IOException
          */
-        protected void doRun()
-            throws IOException
+        protected void doRun() throws IOException
         {
             CacheEventQueue.this.listener.handleRemoveAll(
CacheEventQueue.this.cacheName );
         }
@@ -497,16 +484,15 @@
      */
     private class DisposeEvent extends AbstractCacheEvent
     {
+
         /**
          * Description of the Method
          *
          * @exception IOException
          */
-        protected void doRun()
-            throws IOException
+        protected void doRun() throws IOException
         {
             CacheEventQueue.this.listener.handleDispose(
CacheEventQueue.this.cacheName );
         }
     }
 }
-

---------------------------------------------------------------------
To unsubscribe, e-mail: turbine-jcs-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: turbine-jcs-dev-help@jakarta.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: turbine-jcs-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: turbine-jcs-dev-help@jakarta.apache.org


Mime
View raw message