qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Rajith Attapattu" <rajit...@gmail.com>
Subject Re: svn commit: r562489 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms: MessageActor.java SessionImpl.java
Date Fri, 03 Aug 2007 19:54:16 GMT
I figured that part out the hard way.

On 8/3/07, Rafael Schloming <rafaels@redhat.com> wrote:
>
> This change broke the build. Changing the _destination field in
> MessageActor to private caused compilation errors in other classes that
> depend on package access to that field.
>
> I've changed it back to package access for the moment, but you may want
> to add a getter or alter the other classes if you intended to make it
> private.
>
> Please remember that Maven doesn't understand Java dependencies; it just
> rebuilds based on timestamps. That means when you change an interface or
> alter the non-private signature of any class or method, you MUST do an
> "mvn clean; mvn install" in order to be sure your change really does
> compile.
>
> --Rafael
>
> arnaudsimon@apache.org wrote:
> > Author: arnaudsimon
> > Date: Fri Aug  3 07:52:43 2007
> > New Revision: 562489
> >
> > URL: http://svn.apache.org/viewvc?view=rev&rev=562489
> > Log:
> > implemented message dispatching thread
> >
> > Modified:
> >
> incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
> >
> incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
> >
> > Modified:
> incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
> > URL:
> http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java?view=diff&rev=562489&r1=562488&r2=562489
> >
> ==============================================================================
> > ---
> incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
> (original)
> > +++
> incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
> Fri Aug  3 07:52:43 2007
> > @@ -37,17 +37,23 @@
> >      /**
> >       * Indicates whether this MessageActor is closed.
> >       */
> > -    boolean _isClosed = false;
> > +    private boolean _isClosed = false;
> >
> >      /**
> >       * This messageActor's session
> >       */
> > -    SessionImpl _session;
> > +    private SessionImpl _session;
> >
> >      /**
> >       * The JMS destination this actor is set for.
> >       */
> > -    DestinationImpl _destination;
> > +    private DestinationImpl _destination;
> > +
> > +
> > +    /**
> > +     * The ID of this actor for the session.
> > +     */
> > +    private String _messageActorID;
> >
> >      //-- Constructor
> >
> > @@ -140,5 +146,16 @@
> >      {
> >          return _session;
> >      }
> > +
> > +    /**
> > +     * Get the ID of this actor within its session.
> > +     *
> > +     * @return This actor ID.
> > +     */
> > +    protected String getMessageActorID()
> > +    {
> > +        return _messageActorID;
> > +    }
> > +
> >
> >  }
> >
> > Modified:
> incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
> > URL:
> http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java?view=diff&rev=562489&r1=562488&r2=562489
> >
> ==============================================================================
> > ---
> incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
> (original)
> > +++
> incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
> Fri Aug  3 07:52:43 2007
> > @@ -28,9 +28,9 @@
> >  import javax.jms.Message;
> >  import javax.jms.MessageListener;
> >  import java.io.Serializable;
> > -import java.util.ArrayList;
> >  import java.util.Vector;
> >  import java.util.LinkedList;
> > +import java.util.HashMap;
> >
> >  /**
> >   * Implementation of the JMS Session interface
> > @@ -54,7 +54,7 @@
> >      private boolean _hasStopped = false;
> >
> >      /**
> > -     * lock for the sessionThread to wiat on when the session is
> stopped
> > +     * lock for the sessionThread to wait until the session is stopped
> >       */
> >      private Object _stoppingLock = new Object();
> >
> > @@ -63,11 +63,16 @@
> >       */
> >      private Object _stoppingJoin = new Object();
> >
> > +    /**
> > +     * thread to dispatch messages to async consumers
> > +     */
> > +    private MessageDispatcherThread _messageDispatcherThread = null;
> > +
> >
> >      /**
> >       * The messageActors of this session.
> >       */
> > -    private ArrayList<MessageActor> _messageActors = new
> ArrayList<MessageActor>();
> > +    private HashMap<String, MessageActor> _messageActors = new
> HashMap<String, MessageActor>();
> >
> >      /**
> >       * All the not yet acknoledged messages
> > @@ -151,6 +156,10 @@
> >          {
> >              throw ExceptionHelper.convertQpidExceptionToJMSException
> (e);
> >          }
> > +        // Create and start a MessageDispatcherThread
> > +        // This thread is dispatching messages to the async consumers
> > +        _messageDispatcherThread = new MessageDispatcherThread();
> > +        _messageDispatcherThread.start();
> >      }
> >
> >      //--- javax.jms.Session API
> > @@ -362,10 +371,43 @@
> >      {
> >          if (!_isClosed)
> >          {
> > +            _messageDispatcherThread.interrupt();
> > +            if (!_isClosing)
> > +            {
> > +                _isClosing = true;
> > +                // if the session is stopped then restart it before
> notifying on the lock
> > +                // that will stop the sessionThread
> > +                if (_isStopped)
> > +                {
> > +                    start();
> > +                }
> > +
> > +                //stop the sessionThread
> > +                synchronized (_incomingAsynchronousMessages)
> > +                {
> > +                    _incomingAsynchronousMessages.notifyAll();
> > +                }
> > +
> > +                try
> > +                {
> > +                    _messageDispatcherThread.join();
> > +                    _messageDispatcherThread = null;
> > +                }
> > +                catch (InterruptedException ie)
> > +                {
> > +                    /* ignore */
> > +                }
> > +            }
> >              // from now all the session methods will throw a
> IllegalStateException
> >              _isClosed = true;
> >              // close all the actors
> >              closeAllActors();
> > +            _messageActors.clear();
> > +            synchronized (_incomingAsynchronousMessages)
> > +            {
> > +                _incomingAsynchronousMessages.clear();
> > +                _incomingAsynchronousMessages.notifyAll();
> > +            }
> >              // close the underlaying QpidSession
> >              try
> >              {
> > @@ -375,6 +417,7 @@
> >              {
> >                  throw
> ExceptionHelper.convertQpidExceptionToJMSException(e);
> >              }
> > +
> >          }
> >      }
> >
> > @@ -466,7 +509,7 @@
> >          checkNotClosed();
> >          MessageProducerImpl producer = new MessageProducerImpl(this,
> (DestinationImpl) destination);
> >          // register this actor with the session
> > -        _messageActors.add(producer);
> > +        _messageActors.put(producer.getMessageActorID(), producer);
> >          return producer;
> >      }
> >
> > @@ -523,7 +566,7 @@
> >          checkDestination(destination);
> >          MessageConsumerImpl consumer = new MessageConsumerImpl(this,
> (DestinationImpl) destination, messageSelector, noLocal, null);
> >          // register this actor with the session
> > -        _messageActors.add(consumer);
> > +        _messageActors.put(consumer.getMessageActorID(), consumer);
> >          return consumer;
> >      }
> >
> > @@ -610,7 +653,7 @@
> >          checkNotClosed();
> >          checkDestination(topic);
> >          TopicSubscriberImpl subscriber = new TopicSubscriberImpl(this,
> topic, messageSelector, noLocal, _connection.getClientID() + ":" + name);
> > -        _messageActors.add(subscriber);
> > +        _messageActors.put(subscriber.getMessageActorID(), subscriber);
> >          return subscriber;
> >      }
> >
> > @@ -643,7 +686,7 @@
> >          checkDestination(queue);
> >          QueueBrowserImpl browser = new QueueBrowserImpl(this, queue,
> messageSelector);
> >          // register this actor with the session
> > -        _messageActors.add(browser);
> > +        _messageActors.put(browser.getMessageActorID(), browser);
> >          return browser;
> >      }
> >
> > @@ -710,7 +753,18 @@
> >       */
> >      protected void start() throws JMSException
> >      {
> > -        // TODO: make sure that the correct options are used
> > +        if (_isStopped)
> > +        {
> > +            synchronized (_stoppingLock)
> > +            {
> > +                _isStopped = false;
> > +                _stoppingLock.notify();
> > +            }
> > +            synchronized (_stoppingJoin)
> > +            {
> > +                _hasStopped = false;
> > +            }
> > +        }
> >      }
> >
> >      /**
> > @@ -720,7 +774,30 @@
> >       */
> >      protected void stop() throws JMSException
> >      {
> > -        // TODO: make sure that the correct options are used
> > +        if (!_isClosing && !_isStopped)
> > +        {
> > +            synchronized (_incomingAsynchronousMessages)
> > +            {
> > +                _isStopped = true;
> > +                // unlock the sessionThread that will then wait on
> _stoppingLock
> > +                _incomingAsynchronousMessages.notifyAll();
> > +            }
> > +            // wait for the sessionThread to stop processing messages
> > +            synchronized (_stoppingJoin)
> > +            {
> > +                while (!_hasStopped)
> > +                {
> > +                    try
> > +                    {
> > +                        _stoppingJoin.wait();
> > +                    }
> > +                    catch (InterruptedException e)
> > +                    {
> > +                        /* ignore */
> > +                    }
> > +                }
> > +            }
> > +        }
> >      }
> >
> >      /**
> > @@ -847,7 +924,7 @@
> >       */
> >      private void closeAllActors() throws JMSException
> >      {
> > -        for (MessageActor messageActor : _messageActors)
> > +        for (MessageActor messageActor : _messageActors.values())
> >          {
> >              messageActor.closeMessageActor();
> >          }
> > @@ -861,8 +938,6 @@
> >       * This thread is responsible for removing messages from
> m_incomingMessages and
> >       * dispatching them to the appropriate MessageConsumer.
> >       * <p> Messages have to be dispatched serially.
> > -     *
> > -     * @message runtimeExceptionThrownByOnMessage Warning! Asynchronous
> message consumer {0} from session {1} has thrown a RunTimeException "{2}".
> >       */
> >      private class MessageDispatcherThread extends Thread
> >      {
> > @@ -932,27 +1007,27 @@
> >                      }
> >                  }
> >
> > -              /*  if (message != null)
> > +                if (message != null)
> >                  {
> >                      MessageConsumerImpl mc;
> > -                    synchronized (_actors)
> > +                    synchronized (_messageActors)
> >                      {
> > -                        mc = (MessageConsumerImpl) m_actors.get(
> actorMessage.consumerID);
> > +                        mc = null; // todo _messageActors.get(
> message.consumerID);
> >                      }
> >                      boolean consumed = false;
> >                      if (mc != null)
> >                      {
> >                          try
> >                          {
> > -                            consumed = mc.onMessage(
> actorMessage.genericMessage);
> > +                            // todo call onMessage
> >                          }
> >                          catch (RuntimeException t)
> >                          {
> >                              // the JMS specification tells us to flag
> that to the client!
> > -                            log.errorb(SessionThread.class.getName(),
> "runtimeExceptionThrownByOnMessage", new Object[]{mc, m_sessionID, t}, t);
> > +                            _logger.error("Warning! Asynchronous
> message consumer" + mc + " from session " + this + " has thrown a
> RunTimeException " + t);
> >                          }
> >                      }
> > -                } */
> > +                }
> >                  message = null;
> >              }
> >              while (!_isClosing);   // repeat as long as this session is
> not closing
> >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message