jakarta-jcs-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asm...@apache.org
Subject cvs commit: jakarta-turbine-jcs/src/experimental/org/apache/jcs/engine CacheEventQueue.java
Date Tue, 20 Apr 2004 02:22:31 GMT
asmuts      2004/04/19 19:22:31

  Added:       src/experimental/org/apache/jcs/engine CacheEventQueue.java
  Log:
  Adding a modified version of Travis's CacheEventQueue.
  
  After some testing this can go into the main src.
  
  Revision  Changes    Path
  1.1                  jakarta-turbine-jcs/src/experimental/org/apache/jcs/engine/CacheEventQueue.java
  
  Index: CacheEventQueue.java
  ===================================================================
  package org.apache.jcs.engine;
  
  /*
   * Copyright 2001-2004 The Apache Software Foundation.
   *
   * Licensed under the Apache License, Version 2.0 (the "License")
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  import java.io.IOException;
  import java.io.Serializable;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.jcs.engine.behavior.ICacheElement;
  import org.apache.jcs.engine.behavior.ICacheEventQueue;
  import org.apache.jcs.engine.behavior.ICacheListener;
  
  /**
   * An event queue is used to propagate ordered cache events to one and only one
   * target listener.
   *
   * This is a modified version of the experimental version offered by Travis.
   * It should lazily initialize the processor thread, and kill the thread if
   * the queue goes empty for a specified period, now set to 1 minute.  If
   * something comes in after that a new processor thread should be created.
   *
   * I didn't get all of Hanson's changes in yet, but I did add the synchronization.
   */
  public class CacheEventQueue
      implements ICacheEventQueue
  {
    private static final Log log = LogFactory.getLog( CacheEventQueue.class );

    // private LinkedQueue queue = new LinkedQueue();

    // Time in milliseconds the processor thread waits for a new event on an
    // empty queue before it shuts itself down.
    // make configurable later
    private int waitToDieMillis = 60000;

    // Target of every event run by this queue.
    private ICacheListener listener;
    private byte listenerId;
    private String cacheName;

    private int failureCount;

    // Number of failed attempts after which an event is dropped and the queue
    // is marked as non-functional.
    private int maxFailure;

    // in milliseconds
    private int waitBeforeRetry;

    // Both flags are read without a lock by isAlive()/isWorking() from the
    // producer threads and from the processor thread, so they are volatile to
    // make a write by one thread visible to the others.
    // destroyed starts out true: the processor thread is created lazily by put().
    private volatile boolean destroyed = true;
    private volatile boolean working = true;
    private Thread processorThread;

    // Internal queue implementation

    // Guards head and tail and is the monitor the processor thread waits on.
    private final Object queueLock = new Object();

    // Dummy node: head never carries an event, the first real event is head.next.

    private Node head = new Node();
    private Node tail = head;
  
    /**
     * Creates a queue for the given listener using the default retry settings:
     * a failure limit of 10 and a pause of 500 milliseconds between retries.
     *
     * @param listener the listener that receives the events; must not be null
     * @param listenerId the id reported by {@link #getListenerId()}
     * @param cacheName the cache name passed to the listener's remove,
     *          removeAll and dispose handlers
     */
    public CacheEventQueue(
        ICacheListener listener,
        byte listenerId,
        String cacheName )
    {
      this( listener, listenerId, cacheName, 10, 500 );
    }
  
    /**
     * Creates a queue for the given listener with an explicit retry policy.
     * No processor thread is started here; that happens when the first event
     * is added.
     *
     * @param listener the listener that receives the events; must not be null
     * @param listenerId the id reported by {@link #getListenerId()}
     * @param cacheName the cache name passed to the listener's remove,
     *          removeAll and dispose handlers
     * @param maxFailure failure limit per event; a value of zero or less is
     *          replaced by 3
     * @param waitBeforeRetry pause between retries in milliseconds; a value of
     *          zero or less is replaced by 500
     * @throws IllegalArgumentException if listener is null
     */
    public CacheEventQueue( ICacheListener listener, byte listenerId,
                            String cacheName, int maxFailure,
                            int waitBeforeRetry )
    {
      if ( listener == null )
      {
        throw new IllegalArgumentException( "listener must not be null" );
      }

      this.listener = listener;
      this.listenerId = listenerId;
      this.cacheName = cacheName;

      // Fall back to built-in values when the caller passes nonsense.
      if ( maxFailure > 0 )
      {
        this.maxFailure = maxFailure;
      }
      else
      {
        this.maxFailure = 3;
      }

      if ( waitBeforeRetry > 0 )
      {
        this.waitBeforeRetry = waitBeforeRetry;
      }
      else
      {
        this.waitBeforeRetry = 500;
      }

      if ( log.isDebugEnabled() )
      {
        log.debug( "Constructed: " + this );
      }
    }
  
    /**
     * Marks the queue as having no live processor thread and drops the
     * reference to that thread. The processor thread calls this on itself
     * when no event arrived within the wait-to-die period.
     */
    public synchronized void stopProcessing()
    {
      this.destroyed = true;
      this.processorThread = null;
    }
  
    /**
     * Gets how long the background thread waits on an empty queue before it
     * stops.
     *
     * @return the wait time in milliseconds
     */
    public int getWaitToDieMillis()
    {
      return this.waitToDieMillis;
    }
  
    /**
     * Sets how long the background thread waits on an empty queue before it
     * stops.
     *
     * @param wtdm the wait time in milliseconds
     */
    public void setWaitToDieMillis( int wtdm )
    {
      this.waitToDieMillis = wtdm;
    }
  
    /**
     * Describes this queue by its listener id and cache name.
     *
     * @return a short description for log messages
     */
    public String toString()
    {
      StringBuffer description = new StringBuffer( "CacheEventQueue [listenerId=" );
      description.append( listenerId );
      description.append( ", cacheName=" );
      description.append( cacheName );
      description.append( "]" );
      return description.toString();
    }
  
    /**
     * Tells whether the queue is currently not marked as destroyed.
     *
     * @return true unless the destroyed flag is set
     */
    public boolean isAlive()
    {
      if ( destroyed )
      {
        return false;
      }
      return true;
    }
  
    /**
     * Sets the destroyed flag to the inverse of the given state.
     *
     * @param aState true to mark the queue as alive, false to mark it as
     *          destroyed
     */
    public void setAlive( boolean aState )
    {
      if ( aState )
      {
        destroyed = false;
      }
      else
      {
        destroyed = true;
      }
    }
  
    /**
     * Gets the listener id given to the constructor.
     *
     * @return the listener id
     */
    public byte getListenerId()
    {
      return this.listenerId;
    }
  
    /**
     * Shuts the queue down: sets the destroyed flag, interrupts the processor
     * thread if there is one and drops the reference to it. Does nothing when
     * the queue is already marked as destroyed.
     */
    public synchronized void destroy()
    {
      if ( destroyed )
      {
        return;
      }

      destroyed = true;

      // Interrupt while holding the queue lock so a processor thread blocked
      // in wait() is woken up instead of sleeping out its full timeout.
      if ( processorThread != null )
      {
        synchronized ( queueLock )
        {
          processorThread.interrupt();
        }
      }
      processorThread = null;

      log.info( "Cache event queue destroyed: " + this );
    }
  
    /**
     * Queues a put of the given element for the listener. The event is
     * discarded with a warning when the queue is no longer working.
     *
     * @param ce the element to hand to the listener
     * @exception IOException
     */
    public synchronized void addPutEvent( ICacheElement ce ) throws IOException
    {
      if ( !isWorking() )
      {
        if ( log.isWarnEnabled() )
        {
          log.warn( "Not enqueuing Put Event for [" + this
                    + "] because it's non-functional." );
        }
        return;
      }

      put( new PutEvent( ce ) );
    }
  
    /**
     * Queues a removal of the given key for the listener. The event is
     * discarded with a warning when the queue is no longer working.
     *
     * @param key the key to remove
     * @exception IOException
     */
    public synchronized void addRemoveEvent( Serializable key ) throws IOException
    {
      if ( !isWorking() )
      {
        if ( log.isWarnEnabled() )
        {
          log.warn( "Not enqueuing Remove Event for [" + this
                    + "] because it's non-functional." );
        }
        return;
      }

      put( new RemoveEvent( key ) );
    }
  
    /**
     * Queues a remove-all for the listener. The event is discarded with a
     * warning when the queue is no longer working.
     *
     * @exception IOException
     */
    public synchronized void addRemoveAllEvent() throws IOException
    {
      if ( !isWorking() )
      {
        if ( log.isWarnEnabled() )
        {
          log.warn( "Not enqueuing RemoveAll Event for [" + this
                    + "] because it's non-functional." );
        }
        return;
      }

      put( new RemoveAllEvent() );
    }
  
    /**
     * Queues a dispose for the listener. The event is discarded with a
     * warning when the queue is no longer working.
     *
     * @exception IOException
     */
    public synchronized void addDisposeEvent() throws IOException
    {
      if ( !isWorking() )
      {
        if ( log.isWarnEnabled() )
        {
          log.warn( "Not enqueuing Dispose Event for [" + this
                    + "] because it's non-functional." );
        }
        return;
      }

      put( new DisposeEvent() );
    }
  
    /**
     * Appends an event to the tail of the queue. While the queue is working,
     * this either starts a new processor thread (when none is alive) or
     * notifies the one waiting on the queue lock.
     *
     * @param event the event to append
     */
    private void put( AbstractCacheEvent event )
    {
      if ( log.isDebugEnabled() )
      {
        log.debug( "Event entering Queue for " + cacheName + ": " + event );
      }

      Node appended = new Node();
      appended.event = event;

      synchronized ( queueLock )
      {
        tail.next = appended;
        tail = appended;

        if ( !isWorking() )
        {
          return;
        }

        if ( isAlive() )
        {
          // A processor thread exists; wake it in case it is waiting.
          queueLock.notify();
          return;
        }

        // No processor thread yet, or the last one stopped: start a new one.
        destroyed = false;
        processorThread = new QProcessor( this );
        processorThread.start();
      }
    }
  
    /**
     * Removes and returns the oldest event in the queue. This never blocks.
     *
     * @return the next event, or null if the queue is empty
     */
    private AbstractCacheEvent take()
    {
      synchronized ( queueLock )
      {
        // head is a dummy node, so head == tail means there is nothing queued
        if ( head == tail )
        {
          return null;
        }

        Node first = head.next;
        AbstractCacheEvent next = first.event;

        if ( log.isDebugEnabled() )
        {
          log.debug( "head.event = " + head.event );
          log.debug( "node.event = " + first.event );
        }

        // The node just read becomes the new dummy head, so it gives up its event.
        first.event = null;
        head = first;

        return next;
      }
    }
  
    ///////////////////////////// Inner classes /////////////////////////////
  
    /**
     * Singly linked list cell of the internal queue. Both fields start out
     * null.
     */
    private static class Node
    {
      Node next;
      AbstractCacheEvent event;
    }
  
    /**
     * Background thread that takes events off the queue and runs them.
     *
     * @author asmuts @created January 15, 2002
     */
    private class QProcessor
        extends Thread
    {
      CacheEventQueue queue;

      /**
       * Creates a daemon thread named after the cache name of the queue.
       *
       * @param aQueue the queue this thread drains
       */
      QProcessor( CacheEventQueue aQueue )
      {

        super( "CacheEventQueue.QProcessor-" + aQueue.cacheName );

        setDaemon( true );
        queue = aQueue;
      }

      /**
       * Main processing method for the QProcessor object.
       *
       * Runs events while the queue is alive. When the queue is empty it waits
       * up to waitToDieMillis for a new event; if none arrives the queue is
       * told to stop processing and this thread exits. An interrupt received
       * while waiting also ends the thread.
       *
       * Locks are always taken in the order "queue monitor, then queueLock",
       * the same order the producers use, so the two sides cannot deadlock.
       */
      public void run()
      {
        AbstractCacheEvent r = null;

        while ( queue.isAlive() )
        {
          r = queue.take();

          if ( log.isDebugEnabled() )
          {
            log.debug( "Event from queue = " + r );
          }

          if ( r == null )
          {
            synchronized ( queueLock )
            {
              // Check again under the lock: an event added (and notified)
              // after take() returned would otherwise go unnoticed until the
              // wait times out.
              if ( queue.isEmpty() )
              {
                try
                {
                  queueLock.wait( queue.getWaitToDieMillis() );
                }
                catch ( InterruptedException e )
                {
                  log.warn(
                      "Interrupted while waiting for another event to come in before we die." );
                  return;
                }
              }
            }

            // Decide about stopping while holding the queue monitor. The
            // producers hold that monitor for the whole add, so no event can
            // slip in between the emptiness check and stopProcessing().
            // queueLock is NOT held here; stopProcessing() needs the queue
            // monitor and must not be called with queueLock held.
            synchronized ( queue )
            {
              r = queue.take();
              if ( log.isDebugEnabled() )
              {
                log.debug( "Event from queue after sleep = " + r );
              }
              if ( r == null )
              {
                queue.stopProcessing();
                // Leave right away. Re-testing isAlive() could see the flag
                // already reset by a put() that started a new processor and
                // would leave two threads draining the same queue.
                break;
              }
            }
          }

          if ( queue.isWorking() && queue.isAlive() && r != null )
          {
            r.run();
          }
        }
        if ( log.isInfoEnabled() )
        {
          log.info( "QProcessor exiting for " + queue );
        }
      }
    }
  
    /**
     * Base class of all queued events. Runs the event and retries it on
     * IOException before declaring failure.
     *
     * @author asmuts @created January 15, 2002
     */
    private abstract class AbstractCacheEvent
        implements Runnable
    {
      int failures = 0;
      boolean done = false;

      /**
       * Runs the event, retrying after waitBeforeRetry milliseconds each time
       * doRun() throws an IOException. Once the failure count reaches
       * maxFailure the event is dropped and the queue is marked as not working
       * and not alive. The same happens when the thread is interrupted while
       * pausing before a retry; in that case the interrupt status is restored
       * for the caller.
       */
      public void run()
      {
        // Retry in a loop rather than by recursion so the stack depth does not
        // grow with the number of retries.
        while ( true )
        {
          try
          {
            doRun();
            return;
          }
          catch ( IOException e )
          {
            if ( log.isWarnEnabled() )
            {
              log.warn( e );
            }
            if ( ++failures >= maxFailure )
            {
              if ( log.isWarnEnabled() )
              {
                log.warn(
                    "Error while running event from Queue: "
                    + this
                    + ". Dropping Event and marking Event Queue as non-functional." );
              }
              setWorking( false );
              setAlive( false );
              return;
            }

            if ( log.isInfoEnabled() )
            {
              log.info( "Error while running event from Queue: " +
                           this + ". Retrying..." );
            }
          }

          try
          {
            Thread.sleep( waitBeforeRetry );
          }
          catch ( InterruptedException ie )
          {
            if ( log.isWarnEnabled() )
            {
              log.warn( "Interrupted while sleeping for retry on event " +
                           this + "." );
            }
            setWorking( false );
            setAlive( false );
            // Keep the interrupt visible to whoever runs this event.
            Thread.currentThread().interrupt();
            return;
          }
        }
      }

      /**
       * Performs the actual listener call for this event.
       *
       * @exception IOException
       */
      protected abstract void doRun() throws IOException;
    }
  
    /**
     * Event that hands a cache element to the listener's put handler.
     *
     * @author asmuts @created January 15, 2002
     */
    private class PutEvent
        extends AbstractCacheEvent
    {

      private ICacheElement ice;

      /**
       * Creates the event for the given element. The element is stored as is,
       * without copying.
       *
       * @param ice the element to put
       * @exception IOException
       */
      PutEvent( ICacheElement ice ) throws IOException
      {
        this.ice = ice;
      }

      /**
       * Passes the element to the listener.
       *
       * @exception IOException
       */
      protected void doRun() throws IOException
      {
        listener.handlePut( ice );
      }

      /**
       * @return the key and value of the element, for log messages
       */
      public String toString()
      {
        return "PutEvent for key: " + ice.getKey() + " value: " + ice.getVal();
      }

    }
  
    /**
     * Event that asks the listener to remove a single key.
     *
     * @author asmuts @created January 15, 2002
     */
    private class RemoveEvent
        extends AbstractCacheEvent
    {
      private Serializable key;

      /**
       * Creates the event for the given key.
       *
       * @param key the key to remove
       * @exception IOException
       */
      RemoveEvent( Serializable key ) throws IOException
      {
        this.key = key;
      }

      /**
       * Passes the cache name and the key to the listener's remove handler.
       *
       * @exception IOException
       */
      protected void doRun() throws IOException
      {
        listener.handleRemove( cacheName, key );
      }

      /**
       * @return the key of this event, for log messages
       */
      public String toString()
      {
        return "RemoveEvent for " + key;
      }

    }
  
    /**
     * Event that asks the listener to remove everything from the cache.
     *
     * @author asmuts @created January 15, 2002
     */
    private class RemoveAllEvent
        extends AbstractCacheEvent
    {

      /**
       * Passes the cache name to the listener's remove-all handler.
       *
       * @exception IOException
       */
      protected void doRun() throws IOException
      {
        listener.handleRemoveAll( cacheName );
      }

      /**
       * @return the fixed name of this event type, for log messages
       */
      public String toString()
      {
        return "RemoveAllEvent";
      }

    }
  
    /**
     * Event that asks the listener to dispose of the cache.
     *
     * @author asmuts @created January 15, 2002
     */
    private class DisposeEvent
        extends AbstractCacheEvent
    {

      /**
       * Passes the cache name to the listener's dispose handler.
       *
       * @exception IOException
       */
      protected void doRun() throws IOException
      {
        listener.handleDispose( cacheName );
      }

      /**
       * @return the fixed name of this event type, for log messages
       */
      public String toString()
      {
        return "DisposeEvent";
      }
    }
  
    /**
     * Tells whether the queue still accepts and runs events. The flag is
     * cleared when an event is dropped after repeated failures or after an
     * interrupt during a retry pause.
     *
     * @return the current value of the working flag
     */
    public boolean isWorking()
    {
      return this.working;
    }
  
    /**
     * Sets the working flag.
     *
     * @param b true if the queue should accept and run events, false to mark
     *          it as non-functional
     */
    public void setWorking( boolean b )
    {
      this.working = b;
    }
  
    /**
     * Tells whether the queue currently holds no events.
     *
     * head and tail are only ever changed while holding the queue lock, so
     * they are read under the same lock here to get a consistent, up-to-date
     * answer.
     *
     * @return true if there is no event waiting to be processed
     */
    public boolean isEmpty()
    {
      synchronized ( queueLock )
      {
        return tail == head;
      }
    }
  
  }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: turbine-jcs-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: turbine-jcs-dev-help@jakarta.apache.org


Mime
View raw message