activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: AMQ-7000 - ensure server sessions with closed active sessions get removed from the pool on stop, fix and test
Date Tue, 26 Jun 2018 10:17:12 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 9abe2c6f9 -> 5ebee0ace


AMQ-7000 - ensure server sessions with closed active sessions get removed from the pool on stop, fix and test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5ebee0ac
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5ebee0ac
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5ebee0ac

Branch: refs/heads/master
Commit: 5ebee0ace7e5e4d70ea476b93ac091bd452f04ab
Parents: 9abe2c6
Author: gtully <gary.tully@gmail.com>
Authored: Tue Jun 26 11:06:38 2018 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Tue Jun 26 11:07:04 2018 +0100

----------------------------------------------------------------------
 .../org/apache/activemq/ActiveMQSession.java    |   8 +-
 .../activemq/ra/ActiveMQEndpointWorker.java     |   6 +-
 .../apache/activemq/ra/ServerSessionImpl.java   |  21 +-
 .../activemq/ra/ServerSessionPoolImpl.java      |  15 +-
 .../activemq/ra/ServerSessionImplTest.java      | 235 +++++++++++++++++--
 5 files changed, 254 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5ebee0ac/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index af043d0..1b4abc1 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -1043,9 +1043,11 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
                     messageListener.onMessage(message);
 
                 } catch (Throwable e) {
-                    LOG.error("error dispatching message: ", e);
+                    if (!isClosed()) {
+                        LOG.error("{} error dispatching message: {} ", this, message.getMessageId(), e);
+                    }
 
-                    if (getTransactionContext().isInXATransaction()) {
+                    if (getTransactionContext() != null && getTransactionContext().isInXATransaction()) {
                         LOG.debug("Marking transaction: {} rollbackOnly", getTransactionContext());
                         getTransactionContext().setRollbackOnly(true);
                     }
@@ -2168,7 +2170,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
 
     @Override
     public String toString() {
-        return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "} " + sendMutex;
+        return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + ",closed=" + closed + "} " + sendMutex;
     }
 
     public void checkMessageListener() throws JMSException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/5ebee0ac/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
index ac35959..213c1ab 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
@@ -283,7 +283,11 @@ public class ActiveMQEndpointWorker {
             LOG.info("Stopping");
             // wake up pausing reconnect attempt
             shutdownMutex.notifyAll();
-            serverSessionPool.close();
+            try {
+                serverSessionPool.close();
+            } catch (Throwable ignored) {
+                LOG.debug("Unexpected error on server session pool close", ignored);
+            }
         }
         disconnect();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/5ebee0ac/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
index a4382ee..dcf73e9 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
@@ -105,7 +105,7 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
     }
 
     protected boolean isStale() {
-        return stale || !session.isRunning();
+        return stale || !session.isRunning() || !session.isClosed();
     }
 
     public MessageProducer getMessageProducer() throws JMSException {
@@ -159,13 +159,15 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
      * @see java.lang.Runnable#run()
      */
     public void run() {
-        log.debug("Running");
+        log.debug("{} Running", this);
         currentBatchSize = 0;
         while (true) {
-            log.debug("run loop start");
+            log.debug("{} run loop", this);
             try {
                 InboundContextSupport.register(this);
-                if ( session.isRunning() ) {
+                if (session.isClosed()) {
+                    stale = true;
+                } else if (session.isRunning() ) {
                     session.run();
                 } else {
                     log.debug("JMS Session {} with unconsumed {} is no longer running (maybe due to loss of connection?), marking ServerSession as stale", session, session.getUnconsumedMessages().size());
@@ -174,9 +176,9 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
             } catch (Throwable e) {
                 stale = true;
                 if ( log.isDebugEnabled() ) {
-                    log.debug("Endpoint {} failed to process message.", session, e);
+                    log.debug("Endpoint {} failed to process message.", this, e);
                 } else if ( log.isInfoEnabled() ) {
-                    log.info("Endpoint {} failed to process message. Reason: " + e.getMessage(), session);
+                    log.info("Endpoint {} failed to process message. Reason: " + e.getMessage(), this);
                 }
             } finally {
                 InboundContextSupport.unregister(this);
@@ -184,20 +186,23 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
                 synchronized (runControlMutex) {
                     // This endpoint may have gone stale due to error
                     if (stale) {
+                        log.debug("Session {} stale, removing from pool", this);
                         runningFlag = false;
                         pool.removeFromPool(this);
                         break;
                     }
                     if (!session.hasUncomsumedMessages()) {
                         runningFlag = false;
-                        log.debug("Session has no unconsumed message, returning to pool");
+                        log.debug("Session {} has no unconsumed message, returning to pool", this);
                         pool.returnToPool(this);
                         break;
+                    } else {
+                        log.debug("Session has session has more work to do b/c of unconsumed", this);
                     }
                 }
             }
         }
-        log.debug("Run finished");
+        log.debug("{} Run finished", this);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/5ebee0ac/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
index 94bca0c..6c281d8 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
@@ -194,7 +194,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
 
     public void returnToPool(ServerSessionImpl ss) {
         sessionLock.lock();
-            activeSessions.remove(ss);
+        activeSessions.remove(ss);
         try {
             // make sure we only return non-stale sessions to the pool
             if ( ss.isStale() ) {
@@ -226,7 +226,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
         try {
             ActiveMQSession session = (ActiveMQSession)ss.getSession();
             List<MessageDispatch> l = session.getUnconsumedMessages();
-            if (!l.isEmpty()) {
+            if (!isClosing() && !l.isEmpty()) {
                 ActiveMQConnection connection = activeMQAsfEndpointWorker.getConnection();
                 if (connection != null) {
                     for (Iterator<MessageDispatch> i = l.iterator(); i.hasNext();) {
@@ -276,6 +276,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
 
     public void close() {
         closing.set(true);
+        LOG.debug("{} close", this);
         int activeCount = closeSessions();
         // we may have to wait erroneously 250ms if an
         // active session is removed during our wait and we
@@ -300,11 +301,16 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
     protected int closeSessions() {
         sessionLock.lock();
         try {
+            List<ServerSessionImpl> alreadyClosedServerSessions = new ArrayList<>(activeSessions.size());
             for (ServerSessionImpl ss : activeSessions) {
                 try {
                     ActiveMQSession session = (ActiveMQSession) ss.getSession();
                     if (!session.isClosed()) {
                         session.close();
+                    } else {
+                        LOG.debug("Session {} already closed", session);
+                        alreadyClosedServerSessions.add(ss);
+
                     }
                 } catch (JMSException ignored) {
                     if (LOG.isDebugEnabled()) {
@@ -312,6 +318,11 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
                     }
                 }
             }
+            for (ServerSessionImpl ss : alreadyClosedServerSessions) {
+                removeFromPool(ss);
+            }
+            alreadyClosedServerSessions.clear();
+
             for (ServerSessionImpl ss : idleSessions) {
                 ss.close();
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/5ebee0ac/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
index 927a9b3..a9bab8d 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
@@ -16,23 +16,6 @@
  */
 package org.apache.activemq.ra;
 
-import static org.junit.Assert.assertTrue;
-
-import java.lang.reflect.Method;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Session;
-import javax.resource.spi.BootstrapContext;
-import javax.resource.spi.endpoint.MessageEndpointFactory;
-import javax.resource.spi.work.ExecutionContext;
-import javax.resource.spi.work.Work;
-import javax.resource.spi.work.WorkListener;
-import javax.resource.spi.work.WorkManager;
-import javax.transaction.xa.XAResource;
-
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -40,6 +23,7 @@ import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.util.Wait;
 import org.hamcrest.Description;
 import org.hamcrest.Matchers;
@@ -56,6 +40,25 @@ import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.resource.spi.BootstrapContext;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.ExecutionContext;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkListener;
+import javax.resource.spi.work.WorkManager;
+import javax.transaction.xa.XAResource;
+import java.lang.reflect.Method;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertTrue;
+
 @RunWith(JMock.class)
 public class ServerSessionImplTest {
 
@@ -234,4 +237,202 @@ public class ServerSessionImplTest {
         assertTrue("run has completed", runState.await(20, TimeUnit.SECONDS));
         assertTrue("not all messages consumed", messageCount.getCount() > 0);
     }
+
+    @Test
+    public void testGetWhenClosed() throws Exception {
+
+        final int maxMessages = 2000;
+        final AtomicReference<CountDownLatch> messageCountRef = new AtomicReference<CountDownLatch>();
+
+        ExecutorService executorService = Executors.newCachedThreadPool();
+
+
+        final MessageEndpointFactory messageEndpointFactory = context.mock(MessageEndpointFactory.class);
+        final MessageResourceAdapter resourceAdapter = context.mock(MessageResourceAdapter.class);
+        final ActiveMQEndpointActivationKey key = context.mock(ActiveMQEndpointActivationKey.class);
+        messageEndpoint = context.mock(MessageEndpointProxy.class);
+        workManager = context.mock(WorkManager.class);
+        final MessageActivationSpec messageActivationSpec = context.mock(MessageActivationSpec.class);
+        final BootstrapContext boostrapContext = context.mock(BootstrapContext.class);
+        context.checking(new Expectations() {
+            {
+                allowing(boostrapContext).getWorkManager();
+                will(returnValue(workManager));
+                allowing(resourceAdapter).getBootstrapContext();
+                will(returnValue(boostrapContext));
+                allowing(messageEndpointFactory).isDeliveryTransacted(with(any(Method.class)));
+                will(returnValue(Boolean.FALSE));
+                allowing(key).getMessageEndpointFactory();
+                will(returnValue(messageEndpointFactory));
+                allowing(key).getActivationSpec();
+                will(returnValue(messageActivationSpec));
+                allowing(messageActivationSpec).isUseJndi();
+                will(returnValue(Boolean.FALSE));
+                allowing(messageActivationSpec).getDestinationType();
+                will(returnValue("javax.jms.Queue"));
+                allowing(messageActivationSpec).getDestination();
+                will(returnValue("Queue"));
+                allowing(messageActivationSpec).getAcknowledgeModeForSession();
+                will(returnValue(1));
+                allowing(messageActivationSpec).getMaxSessionsIntValue();
+                will(returnValue(10));
+                allowing(messageActivationSpec).getEnableBatchBooleanValue();
+                will(returnValue(Boolean.FALSE));
+                allowing(messageActivationSpec).isUseRAManagedTransactionEnabled();
+                will(returnValue(Boolean.TRUE));
+                allowing(messageEndpointFactory).createEndpoint(with(any(XAResource.class)));
+                will(returnValue(messageEndpoint));
+
+                allowing(workManager).scheduleWork((Work) with(Matchers.instanceOf(Work.class)), with(any(long.class)), with(any(ExecutionContext.class)),
+                        with(any(WorkListener.class)));
+                will(new Action() {
+                    @Override
+                    public Object invoke(Invocation invocation) throws Throwable {
+                        LOG.info("Wok manager invocation: " + invocation);
+
+                        if (invocation.getParameter(0) instanceof ServerSessionImpl) {
+                            final ServerSessionImpl serverSession1 = (ServerSessionImpl)invocation.getParameter(0);
+                            executorService.execute(new Runnable() {
+                                @Override
+                                public void run() {
+                                    try {
+                                        serverSession1.run();
+                                    } catch (Exception e) {
+                                        LOG.error("Error on Work run: {}", serverSession1, e);
+                                        e.printStackTrace();
+                                    }
+                                }
+                            });
+                        }
+                        return null;
+                    }
+
+                    @Override
+                    public void describeTo(Description description) {
+                    }
+                });
+
+                allowing(messageEndpoint).beforeDelivery((Method) with(Matchers.instanceOf(Method.class)));
+                allowing(messageEndpoint).onMessage(with(any(javax.jms.Message.class)));
+                will(new Action() {
+                    @Override
+                    public Object invoke(Invocation invocation) throws Throwable {
+                        messageCountRef.get().countDown();
+                        return null;
+                    }
+
+                    @Override
+                    public void describeTo(Description description) {
+                        description.appendText("Keep message count");
+                    }
+                });
+                allowing(messageEndpoint).afterDelivery();
+                will(new Action() {
+                    @Override
+                    public void describeTo(Description description) {
+                        description.appendText("do sync work on broker");
+                    }
+
+                    @Override
+                    public Object invoke(Invocation invocation) throws Throwable {
+                        TransactionInfo transactionInfo = new TransactionInfo();
+                        transactionInfo.setType(TransactionInfo.END);
+                        LOG.info("AfterDelivery on: " + messageCountRef.get().getCount());
+                        return null;
+                    }
+                });
+                allowing(messageEndpoint).release();
+
+            }
+        });
+
+        endpointWorker = new ActiveMQEndpointWorker(resourceAdapter, key);
+        endpointWorker.setConnection(con);
+
+
+        for (int i=0; i<40; i++) {
+            final int iteration  = i;
+            LOG.info("ITERATION: " +  iteration);
+            pool = new ServerSessionPoolImpl(endpointWorker, 2);
+            endpointWorker.start();
+
+            messageCountRef.set(new CountDownLatch(maxMessages));
+
+            final CountDownLatch senderDone = new CountDownLatch(1);
+            final CountDownLatch messageSent = new CountDownLatch(maxMessages);
+            final AtomicBoolean foundClosedSession = new AtomicBoolean(false);
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        // preload the session dispatch queue to keep the session active
+
+                        for (int i = 0; i < maxMessages; i++) {
+                            MessageDispatch messageDispatch = new MessageDispatch();
+                            ActiveMQMessage message = new ActiveMQTextMessage();
+                            message.setMessageId(new MessageId("0:0:0:" + i));
+                            message.getMessageId().setBrokerSequenceId(i);
+                            messageDispatch.setMessage(message);
+                            messageDispatch.setConsumerId(new ConsumerId("0:0:0"));
+                            ServerSessionImpl serverSession1 = null;
+                            try {
+                                serverSession1 = (ServerSessionImpl) pool.getServerSession();
+                                ActiveMQSession session1 = (ActiveMQSession) serverSession1.getSession();
+                                if (session1.isClosed()) {
+                                    // closed flag is not volatile - ok to give a whirl with it closed
+                                    foundClosedSession.set(true);
+                                }
+                                session1.dispatch(messageDispatch);
+                                messageSent.countDown();
+                                serverSession1.start();
+                            } catch (JMSException okOnClose) {
+                                LOG.info("Exception on dispatch to {}", serverSession1, okOnClose);
+                            }
+                        }
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                    } finally {
+                        senderDone.countDown();
+                    }
+                }
+            });
+
+            assertTrue("[" + iteration + "] Some messages dispatched", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    LOG.info("[" + iteration + "] Wait before close work MessageSent: " + messageSent.getCount() + ", messages got: "+ messageCountRef.get().getCount());
+                    return messageSent.getCount() < maxMessages - 20 && messageCountRef.get().getCount() < maxMessages - 5;
+                }
+            }, 5000, 10));
+
+            assertTrue("some messages consumed", messageCountRef.get().getCount() < maxMessages);
+
+            final CountDownLatch closeDone = new CountDownLatch(1);
+            final CountDownLatch closeSuccess = new CountDownLatch(1);
+
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    LOG.info("[" + iteration + "] Closing pool on delivered {} and dispatched {}", messageSent.getCount(), messageCountRef.get().getCount());
+                    try {
+                        pool.close();
+                        closeSuccess.countDown();
+                    } catch (InvalidMessageEndpointException error) {
+                        LOG.error("Ex on pool close", error);
+                        //error.printStackTrace();
+                    } finally {
+                        closeDone.countDown();
+                    }
+                }
+            });
+
+            assertTrue("[" + iteration + "] Pool close does not block", closeDone.await(10, TimeUnit.SECONDS));
+            assertTrue("[" + iteration + "] Pool close ok", closeSuccess.await(10, TimeUnit.MILLISECONDS));
+
+            assertTrue("[" + iteration + "] not all delivered", messageCountRef.get().getCount() > 0);
+
+            assertTrue("[" + iteration + "] sender complete", senderDone.await(30, TimeUnit.SECONDS));
+        }
+    }
+
 }


Mime
View raw message