qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "David Kellum (JIRA)" <j...@apache.org>
Subject [jira] Created: (QPID-3016) Java JMS client ReplyTo memory leak
Date Sun, 23 Jan 2011 01:00:45 GMT
Java JMS client ReplyTo memory leak
-----------------------------------

                 Key: QPID-3016
                 URL: https://issues.apache.org/jira/browse/QPID-3016
             Project: Qpid
          Issue Type: Bug
          Components: Java Client
    Affects Versions: 0.8
         Environment: Fedora 14: 2.6.35.10-74.fc14.x86_64

Broker: qpidd (qpidc) version 0.8

Java Client 0.8

OpenJDK Runtime Environment (IcedTea6 1.9.3) (fedora-49.1.9.3.fc14-x86_64)
OpenJDK 64-Bit Server VM (build 19.0-b09, mixed mode)

            Reporter: David Kellum


I'm using the Java 0.8 client JMS interface. Relevant Client code is below.

If I make async requests to a queue with setJMSReplyTo as temporary queue for responses, the
client process will run out of memory after about 3.2M request/response message pairs. This
is at 512MB max java heap, though growth appears to be consistently linear.  Note that I use
a semaphore between request and response to keep ~1000 unanswered requests open in the client
and block before sending more. Thus this should not be a matter of simply saturating the client
with unsent messages.

Here is the top of the jhat histogram from jmap heap dump shortly before client runs out of
memory:

class org.apache.qpid.collections.ReferenceMap$SoftRef  3253120         143137280
class org.apache.qpid.collections.ReferenceMap$Entry    3253120         117112320
class [Lorg.apache.qpid.collections.ReferenceMap$Entry;       1         67108880
class org.apache.qpid.transport.ReplyTo                 3253120         61809280

Note that 3253120 appears to match the total number of request/replies successfully processed
by the client up to this point. Or in other words, its leaking one ReplyTo object and associated
(not so?) soft references per request/response.

If instead I replace the temporary queue with a fixed response and drop use of setJMSReplyTo(),
the client works fine, no memory leak.

Below are more details when running with the temp response queue:

% qpid-config queues

Queue Name                                     Attributes
======================================================================
TempQueued4051d9d-37d3-4306-a1fc-91b93f7082c8  auto-del excl
iudex-brutefuzzy-request                       --max-queue-size=100000 --limit-policy=reject

Client startup Log:

635  [main] INFO  o.a.q.j.PropertiesFileInitialContextFactory - No Provider URL specified.
726  [main] INFO  o.a.qpid.client.AMQConnection - Connection:amqp://qpid:********@default-client/default-vhost?brokerlist='tcp://localhost:5672'
973  [main] INFO  o.a.q.c.p.AMQProtocolSession - Using ProtocolVersion for Session:0-10
990  [main] INFO  o.a.q.c.h.ClientMethodDispatcherImpl - New Method Dispatcher:AMQProtocolSession[null]
1002 [main] INFO  o.a.qpid.client.AMQConnection - Connecting with ProtocolHandler Version:0-10
1150 [main] INFO  o.a.qpid.client.AMQConnection - Connected with ProtocolHandler Version:0-10
1192 [main] INFO  o.a.qpid.client.AMQSession - Created session:org.apache.qpid.client.AMQSession_0_10@1b7c63f
1280 [main] INFO  o.a.q.c.BasicMessageProducer_0_10 - MessageProducer org.apache.qpid.client.BasicMessageProducer_0_10@1727745
using publish mode : ASYNC_PUBLISH_ALL
1503 [main] INFO  o.a.qpid.client.AMQSession - Prefetching delayed existing messages will
not flow until requested via receive*() or setML().
1586 [main] INFO  o.a.q.c.AMQSession.Dispatcher - Dispatcher-Channel-1 created
1586 [Dispatcher-Channel-1] INFO  o.a.q.c.AMQSession.Dispatcher - Dispatcher-Channel-1 started


Relevent client code:

public class Client
    implements MessageListener, Closeable, ExceptionListener
{
    public Client( JMSContext context )
        throws JMSException, NamingException
    {
        _connection = context.createConnection();

        _session = context.createSession( _connection );

        Destination requestQueue =
            context.lookupDestination( "iudex-brutefuzzy-request" );
        _producer = _session.createProducer( requestQueue );

        context.close();

        _responseQueue = _session.createTemporaryQueue();
        _session.createConsumer( _responseQueue ).setMessageListener(this);

        _connection.start();

    }

    public void sendRequest( long simhash, boolean doAdd )
        throws JMSException, InterruptedException
    {
        Builder bldr = Request.newBuilder();
        bldr.setSimhash( simhash );
        bldr.setAction( doAdd ? RequestAction.ADD : RequestAction.CHECK_ONLY );

        BytesMessage response = _session.createBytesMessage();
        response.writeBytes( bldr.build().toByteArray() );

        if( _responseQueue != null ) {
            response.setJMSReplyTo( _responseQueue );
        }
        _semaphore.acquire();

        _producer.send( response );
    }

    @Override
    public void onMessage( Message msg )
    {
        try {
            //Handle response
            msg.acknowledge();
            _semaphore.release();
        }
        catch( JMSException x ) {
            if( _log.isDebugEnabled() ) _log.error( "onMessage:", x );
            else _log.error( "onMessage: {}", x.toString() );
        }
    }

    private Session _session;
    private MessageProducer _producer;
    private boolean _createTemporaryResponseQueue = true;
    private Destination _responseQueue = null;
    private Connection _connection = null;
    private final Semaphore _semaphore = new Semaphore( 1000 );
    private Logger _log = LoggerFactory.getLogger( getClass() );
}

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Mime
View raw message