qpid-dev mailing list archives

From "Jesse W. Hathaway" <je...@mbuki-mvuki.org>
Subject Re: ActiveConsumerCount vs ConsumerCount
Date Fri, 05 Feb 2010 16:53:23 GMT
> Hi Jesse,
> Sorry for not picking this up on qpid-users. Please see embedded responses.

no problem, thanks for the reply

> >  1. What does the ActiveConsumerCount represent?
> 
> The number of connected consumers that have available space in their
> prefetch buffer to receive messages.

Why would ActiveConsumerCount continually increase and ConsumerCount
stay steady?

> >  2. Is it possible that the increasing of the ActiveConsumerCount is
> >     causing the broker to exhaust its memory?
> 
> Without knowing more about what you are doing it is difficult to say
> but certainly the broker cannot service an infinite number of
> consumers.
> 
> >  3. What might be the reason my jruby JMS process is causing this value
> >     to increase?
> 
> I am not familiar with jruby jms (I'll try and take a look at the
> weekend). Tools like spring by default will create a new session and
> consumer for each message received. If jruby is doing something similar
> and you are sending a lot of messages then I would expect the
> behaviour you are experiencing with a large number of active
> consumers.

I am using Sparrow, http://github.com/leandrosilva/sparrow/

Here is the function from Sparrow I am using:

class Receiver < Base
  def receive_message(criteria_for_receiving = {:timeout => DEFAULT_RECEIVER_TIMEOUT, :selector => ''}, &message_handler)
    # Creates a connection, a session, and a consumer for any type of message
    connection = @connection_factory.create_connection
    session    = connection.create_session(false, Session::AUTO_ACKNOWLEDGE)
    consumer   = session.create_consumer(@destination, criteria_for_receiving[:selector])

    # Prepares the connection to receive messages
    connection.start

    # Starts receiving messages
    timeout = criteria_for_receiving[:timeout] || DEFAULT_RECEIVER_TIMEOUT

    while (received_message = consumer.receive(timeout))
      # Includes the message identification module, useful for the message_handler
      class << received_message
        include MessageType
      end

      # Delegates handling of the message to the given block
      message_handler.call(received_message)
    end

    # Closes the connection
    connection.close
  end
end

I suspected that I was leaking connections, since it appears Sparrow
creates a new connection each time I call receive_message, but from
what I have read, `connection.close` should perform all the necessary
cleanup.
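
One gap I can see in the function above: `connection.close` is only
reached when the receive loop ends normally, so an exception raised
inside the message handler would skip it. I have not confirmed that
this is what is happening in my case. A minimal sketch of guarding the
close with `ensure` follows; `FakeConnection` and `with_connection` are
hypothetical names of mine, not part of Sparrow or JMS, used only so
the pattern can be run without a broker:

```ruby
# Hypothetical stand-in for a JMS connection; it only records whether
# close was called, so the cleanup pattern can be exercised locally.
class FakeConnection
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Yields the connection to the block and closes it afterwards, even if
# the block raises (for example, from inside a message handler).
def with_connection(connection)
  yield connection
ensure
  connection.close
end

conn = FakeConnection.new
begin
  with_connection(conn) { |_c| raise "handler failed" }
rescue RuntimeError
  # The error still reaches the caller; the connection is closed anyway.
end
puts conn.closed
```

In receive_message the same idea would mean wrapping everything after
create_connection in a begin/ensure block that calls connection.close.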

> Are you using topics by chance? If you are then every one of those
> consumers will be receiving a copy of the sent message which will
> greatly contribute to your OOM problems.

No, these are direct messages.

> If you have a heap dump of the broker to hand I shall put up some
> details of how you can interrogate the heap to understand what has
> happened.

Here is a heap dump: http://mbuki-mvuki.org/java_pid15443.hprof.bz2
I tried analyzing it with the Eclipse Memory Analyzer, but my knowledge
of Java and Qpid was too nascent to figure out the cause.

thanks for the help, Jesse

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org

