activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: [AMQ-6906] tidy up cleanup on jdbc error and combine updates in single completion to avoid prepared sequence update on non transacted add with error. More jdbc error related tests
Date Thu, 03 May 2018 10:32:32 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 314d5a516 -> bd45d931b


[AMQ-6906] tidy up cleanup on jdbc error and combine updates in single completion to avoid prepared sequence update on non transacted add with error. More jdbc error related tests


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bd45d931
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bd45d931
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bd45d931

Branch: refs/heads/master
Commit: bd45d931ba273be4d94bf213c6befd116f99dcc8
Parents: 314d5a5
Author: gtully <gary.tully@gmail.com>
Authored: Thu May 3 11:32:21 2018 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Thu May 3 11:32:21 2018 +0100

----------------------------------------------------------------------
 .../store/jdbc/JDBCPersistenceAdapter.java      |    6 +-
 .../store/jdbc/JdbcMemoryTransactionStore.java  |   16 +-
 .../activemq/store/jdbc/TransactionContext.java |   73 +-
 .../activemq/broker/XARecoveryBrokerTest.java   |    4 +-
 .../store/jdbc/JDBCCommitExceptionTest.java     |   12 +-
 .../store/jdbc/JDBCXACommitExceptionTest.java   | 1151 ++++++++++++++++++
 .../activemq/store/jdbc/XACompletionTest.java   |    4 +
 7 files changed, 1213 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
index 102dec5..95b1446 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
@@ -785,12 +785,10 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
         }
     }
 
-    public void commitAdd(ConnectionContext context, MessageId messageId, long preparedSequenceId) throws IOException {
+    public void commitAdd(ConnectionContext context, final MessageId messageId, final long preparedSequenceId, final long newSequence) throws IOException {
         TransactionContext c = getTransactionContext(context);
         try {
-            long sequence = (Long)messageId.getEntryLocator();
-            getAdapter().doCommitAddOp(c, preparedSequenceId, sequence);
-            messageId.setEntryLocator(preparedSequenceId);
+            getAdapter().doCommitAddOp(c, preparedSequenceId, newSequence);
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e);

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
index 6df0860..4bbe43d 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
@@ -78,11 +78,12 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
                 cmd.run(ctx);
             }
 
+            persistenceAdapter.commitTransaction(ctx);
+
         } catch ( IOException e ) {
             persistenceAdapter.rollbackTransaction(ctx);
             throw e;
         }
-        persistenceAdapter.commitTransaction(ctx);
 
         ctx.setXid(null);
         // setup for commit outcome
@@ -126,13 +127,15 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
             final Long preparedEntrySequence = (Long) message.getMessageId().getEntryLocator();
             TransactionContext c = jdbcPersistenceAdapter.getTransactionContext(context);
 
+            long newSequence;
             synchronized (jdbcMessageStore.pendingAdditions) {
-                message.getMessageId().setEntryLocator(jdbcPersistenceAdapter.getNextSequenceId());
-
+                newSequence = jdbcPersistenceAdapter.getNextSequenceId();
+                final long sequenceToSet = newSequence;
                 c.onCompletion(new Runnable() {
                     @Override
                     public void run() {
-                        message.getMessageId().setFutureOrSequenceLong(message.getMessageId().getEntryLocator());
+                        message.getMessageId().setEntryLocator(sequenceToSet);
+                        message.getMessageId().setFutureOrSequenceLong(sequenceToSet);
                     }
                 });
 
@@ -141,7 +144,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
                 }
             }
 
-            jdbcPersistenceAdapter.commitAdd(context, message.getMessageId(), preparedEntrySequence);
+            jdbcPersistenceAdapter.commitAdd(context, message.getMessageId(), preparedEntrySequence, newSequence);
             jdbcMessageStore.onAdd(message, (Long)message.getMessageId().getEntryLocator(), message.getPriority());
         }
 
@@ -175,8 +178,9 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
                             ((LastAckCommand)removeMessageCommand).rollback(ctx);
                         } else {
                             MessageId messageId = removeMessageCommand.getMessageAck().getLastMessageId();
+                            long sequence = (Long)messageId.getEntryLocator();
                             // need to unset the txid flag on the existing row
-                            ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx, messageId, (Long)messageId.getEntryLocator());
+                            ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx, messageId, sequence, sequence);
 
                             if (removeMessageCommand instanceof RecoveredRemoveMessageCommand) {
                                 ((JDBCMessageStore) removeMessageCommand.getMessageStore()).trackRollbackAck(((RecoveredRemoveMessageCommand) removeMessageCommand).getMessage());

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
index ab3bef8..db2aace 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
@@ -84,7 +84,7 @@ public class TransactionContext {
                 } catch (IllegalMonitorStateException oops) {
                     LOG.error("Thread does not hold the context lock on close of:"  + connection, oops);
                 }
-                close();
+                silentClose();
                 IOException ioe = IOExceptionSupport.create(e);
                 if (persistenceAdapter.getBrokerService() != null) {
                     persistenceAdapter.getBrokerService().handleIOException(ioe);
@@ -137,45 +137,39 @@ public class TransactionContext {
         } finally {
             try {
                 p.close();
-            } catch (Throwable e) {
-            }
+            } catch (Throwable ignored) {}
         }
     }
 
+    private void silentClose() {
+        silentClosePreparedStatements();
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Throwable ignored) {}
+            connection = null;
+        }
+    }
+
+
     public void close() throws IOException {
         if (!inTx) {
             try {
-
-                /**
-                 * we are not in a transaction so should not be committing ??
-                 * This was previously commented out - but had adverse affects
-                 * on testing - so it's back!
-                 * 
-                 */
-                try {
+                // can be null for topic ops that bypass the store via existing cursor state
+                if (connection != null) {
+                    final boolean needsCommit = !connection.getAutoCommit();
                     executeBatch();
-                } finally {
-                    if (connection != null && !connection.getAutoCommit()) {
+                    if (needsCommit) {
                         connection.commit();
                     }
                 }
-
             } catch (SQLException e) {
                 JDBCPersistenceAdapter.log("Error while closing connection: ", e);
                 IOException ioe = IOExceptionSupport.create(e);
                 persistenceAdapter.getBrokerService().handleIOException(ioe);
                 throw ioe;
             } finally {
-                try {
-                    if (connection != null) {
-                        connection.close();
-                    }
-                } catch (Throwable e) {
-                    // ignore
-                    LOG.trace("Closing connection failed due: " + e.getMessage() + ". This exception is ignored.", e);
-                } finally {
-                    connection = null;
-                }
+                silentClose();
                 for (Runnable completion: completions) {
                     completion.run();
                 }
@@ -197,8 +191,9 @@ public class TransactionContext {
             throw new IOException("Not started.");
         }
         try {
+            final boolean needsCommit = !connection.getAutoCommit();
             executeBatch();
-            if (!connection.getAutoCommit()) {
+            if (needsCommit) {
                 connection.commit();
             }
         } catch (SQLException e) {
@@ -230,19 +225,23 @@ public class TransactionContext {
         }
     }
 
-    private void doRollback() throws SQLException {
-        if (addMessageStatement != null) {
-            addMessageStatement.close();
-            addMessageStatement = null;
-        }
-        if (removedMessageStatement != null) {
-            removedMessageStatement.close();
-            removedMessageStatement = null;
-        }
-        if (updateLastAckStatement != null) {
-            updateLastAckStatement.close();
-            updateLastAckStatement = null;
+    private PreparedStatement silentClosePreparedStatement(PreparedStatement preparedStatement) {
+        if (preparedStatement != null) {
+            try {
+                preparedStatement.close();
+            } catch (Throwable ignored) {}
         }
+        return null;
+    }
+
+    private void silentClosePreparedStatements() {
+        addMessageStatement = silentClosePreparedStatement(addMessageStatement);
+        removedMessageStatement = silentClosePreparedStatement(removedMessageStatement);
+        updateLastAckStatement = silentClosePreparedStatement(updateLastAckStatement);
+    }
+
+    private void doRollback() throws SQLException {
+        silentClosePreparedStatements();
         completions.clear();
         connection.rollback();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index 387e77f..6a8b3f4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
  */
 public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
     protected static final Logger LOG = LoggerFactory.getLogger(XARecoveryBrokerTest.class);
-    public boolean prioritySupport = false;
+    public boolean prioritySupport = true;
 
     public void testPreparedJmxView() throws Exception {
 
@@ -712,7 +712,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
 
     }
 
-    public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {
+    public void x_initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {
         addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
index 6972a14..2b498a3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
@@ -76,19 +76,23 @@ public class JDBCCommitExceptionTest extends TestCase {
         broker.stop();
     }
 
-     protected void dumpMessages() throws Exception {
+     protected int dumpMessages() throws Exception {
+        int count = 0;
         WireFormat wireFormat = new OpenWireFormat();
         java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection();
-        PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG FROM ACTIVEMQ_MSGS");
+        PreparedStatement statement = conn.prepareStatement("SELECT ID, XID, MSG FROM ACTIVEMQ_MSGS");
         ResultSet result = statement.executeQuery();
         LOG.info("Messages left in broker after test");
         while(result.next()) {
             long id = result.getLong(1);
-            org.apache.activemq.command.Message message = (org.apache.activemq.command.Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
-            LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
+            String xid = result.getString(2);
+            org.apache.activemq.command.Message message = (org.apache.activemq.command.Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(3)));
+            LOG.info("id: " + id + ", xid: " + xid + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
+            count++;
         }
         statement.close();
         conn.close();
+        return count;
     }
 
     protected int receiveMessages(int messagesExpected) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
index 046ab81..1529515 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
@@ -21,16 +21,72 @@ import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Session;
 import javax.jms.XAConnection;
 import javax.jms.XASession;
+import javax.management.ObjectName;
+import javax.sql.DataSource;
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLRecoverableException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.DiscardingDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -166,4 +222,1099 @@ public class JDBCXACommitExceptionTest extends JDBCCommitExceptionTest {
         assertEquals("one enque", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
     }
 
+
+    final AtomicInteger getAutoCommitCount = new AtomicInteger();
+    private ArrayList<Integer> getAutoCommitErrors = new ArrayList<Integer>();
+    private ArrayList<Integer> executeUpdateErrorOps = new ArrayList<Integer>();
+    final AtomicInteger executeUpdateErrorOpsCount = new AtomicInteger();
+    private ArrayList<Integer> executeBatchErrorOps = new ArrayList<Integer>();
+    final AtomicInteger executeBatchErrorOpsCount = new AtomicInteger();
+
+    public void testXAEnqueueErrors() throws Exception {
+        getAutoCommitCount.set(0);
+        getAutoCommitErrors.clear();
+        executeUpdateErrorOpsCount.set(0);
+        executeUpdateErrorOps.clear();
+
+        broker.stop();
+        broker = new BrokerService();
+        broker.setAdvisorySupport(false);
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setExpireMessagesPeriod(0);
+        policyMap.setDefaultEntry(policyEntry);
+        broker.setDestinationPolicy(policyMap);
+
+        //broker.setDeleteAllMessagesOnStartup(true);
+
+        JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
+        DataSource realDataSource = jdbc.getDataSource();
+        jdbcPersistenceAdapter.setDataSource(new TestDataSource(realDataSource));
+        jdbcPersistenceAdapter.setUseLock(false);
+        broker.setPersistenceAdapter(jdbcPersistenceAdapter);
+        connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+        broker.start();
+
+
+        // inject error
+        executeUpdateErrorOps.add(5);
+        executeUpdateErrorOps.add(9);
+        executeUpdateErrorOps.add(12);
+
+        getAutoCommitErrors.add(59);
+        getAutoCommitErrors.add(60);
+
+
+        factory = new ActiveMQXAConnectionFactory(connectionUri);
+
+        XAConnection c = factory.createXAConnection();
+        c.start();
+        XASession s = c.createXASession();
+        final XAResource recoveryResource = s.getXAResource();
+
+        for (int i = 0; i < 10; i++) {
+            XAConnection connection = factory.createXAConnection();
+            connection.start();
+            XASession session = connection.createXASession();
+
+            Destination destination = session.createQueue("TEST");
+            MessageProducer producer = session.createProducer(destination);
+
+            XAResource resource = session.getXAResource();
+
+            Xid tid = createXid();
+            resource.start(tid, XAResource.TMNOFLAGS);
+            ActiveMQMessage message = (ActiveMQMessage) session.createMessage();
+            message.setTransactionId(new XATransactionId(tid));
+            producer.send(message);
+
+            resource.end(tid, XAResource.TMSUCCESS);
+            resource.prepare(tid);
+
+            try {
+                resource.commit(tid, false);
+            } catch (Exception expected) {
+                expected.printStackTrace();
+
+                dumpMessages();
+
+                boolean done = false;
+                while (!done) {
+                    // recover
+                    Xid[] recovered = recoveryResource.recover(XAResource.TMSTARTRSCAN);
+                    recoveryResource.recover(XAResource.TMNOFLAGS);
+
+                    try {
+                        recoveryResource.commit(recovered[0], false);
+                        done = true;
+                    } catch (XAException ok) {
+                        ok.printStackTrace();
+                    }
+                }
+            }
+        }
+
+        dumpMessages();
+
+        assertEquals("en-queue", 10, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
+        assertEquals("en-queue", 10, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+
+
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+           .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+        assertEquals("qs", 10, proxy.getQueueSize());
+        assertEquals("enq", 10, proxy.getEnqueueCount());
+        assertEquals("curs", 10, proxy.cursorSize());
+    }
+
+    public void testNonTxEnqueueErrors() throws Exception {
+        getAutoCommitCount.set(0);
+        getAutoCommitErrors.clear();
+        executeUpdateErrorOpsCount.set(0);
+        executeUpdateErrorOps.clear();
+        executeBatchErrorOps.clear();
+        executeBatchErrorOpsCount.set(0);
+
+        broker.stop();
+        broker = new BrokerService();
+        broker.setAdvisorySupport(false);
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setExpireMessagesPeriod(0);
+        policyMap.setDefaultEntry(policyEntry);
+        broker.setDestinationPolicy(policyMap);
+
+
+        JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
+        DataSource realDataSource = jdbc.getDataSource();
+        jdbcPersistenceAdapter.setDataSource(new TestDataSource(realDataSource));
+        jdbcPersistenceAdapter.setUseLock(false);
+        jdbcPersistenceAdapter.setCleanupPeriod(0);
+        broker.setPersistenceAdapter(jdbcPersistenceAdapter);
+        connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+        broker.start();
+
+
+        executeBatchErrorOps.add(2);
+        executeBatchErrorOps.add(3);
+        getAutoCommitCount.set(0);
+        getAutoCommitErrors.add(10);
+
+
+        factory = new ActiveMQXAConnectionFactory(connectionUri);
+
+        for (int i = 0; i < 10; i++) {
+            XAConnection connection = factory.createXAConnection();
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            Destination destination = session.createQueue("TEST");
+            MessageProducer producer = session.createProducer(destination);
+            ActiveMQMessage message = (ActiveMQMessage) session.createMessage();
+
+            try {
+                producer.send(message);
+            } catch (Exception expected) {
+                expected.printStackTrace();
+
+                dumpMessages();
+
+                boolean done = false;
+                while (!done) {
+                    try {
+                        producer.send(message);
+                        done = true;
+                    } catch (Exception ok) {
+                        ok.printStackTrace();
+                    }
+                }
+            }
+        }
+
+        assertEquals("messages in db", 10, dumpMessages());
+
+
+        assertEquals("en-queue", 10, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
+        assertEquals("en-queue", 10, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+
+
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+           .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+        assertEquals("qs", 10, proxy.getQueueSize());
+        assertEquals("enq", 10, proxy.getEnqueueCount());
+        assertEquals("curs", 10, proxy.cursorSize());
+    }
+
+    public void testNonTxEnqueueOverNetworkErrorsRestart() throws Exception {
+        getAutoCommitCount.set(0);
+        getAutoCommitErrors.clear();
+        executeUpdateErrorOpsCount.set(0);
+        executeUpdateErrorOps.clear();
+        executeBatchErrorOps.clear();
+        executeBatchErrorOpsCount.set(0);
+
+        broker.stop();
+
+        final AtomicBoolean done = new AtomicBoolean(false);
+        Thread thread = new Thread() {
+            @Override
+            public void run() {
+
+                while (!done.get()) {
+                    try {
+
+                        broker = new BrokerService();
+                        broker.setAdvisorySupport(false);
+                        PolicyMap policyMap = new PolicyMap();
+                        PolicyEntry policyEntry = new PolicyEntry();
+                        policyEntry.setUseCache(false);
+                        policyEntry.setExpireMessagesPeriod(0);
+                        policyEntry.setDeadLetterStrategy(new DiscardingDeadLetterStrategy());
+                        policyMap.setDefaultEntry(policyEntry);
+                        broker.setDestinationPolicy(policyMap);
+
+                        JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
+                        DataSource realDataSource = jdbc.getDataSource();
+                        jdbcPersistenceAdapter.setDataSource(new TestDataSource(realDataSource));
+                        jdbcPersistenceAdapter.setUseLock(false);
+                        jdbcPersistenceAdapter.setCleanupPeriod(0);
+                        broker.setPersistenceAdapter(jdbcPersistenceAdapter);
+                        TransportConnector transportConnector = broker.addConnector("tcp://localhost:61616");
+                        //transportConnector.setAuditNetworkProducers(true);
+                        connectionUri = transportConnector.getPublishableConnectString();
+                        DefaultIOExceptionHandler stopOnIOEx = new DefaultIOExceptionHandler();
+                        stopOnIOEx.setIgnoreSQLExceptions(false);
+                        stopOnIOEx.setStopStartConnectors(false);
+                        broker.setIoExceptionHandler(stopOnIOEx);
+                        broker.start();
+
+                        broker.waitUntilStopped();
+
+                    } catch (Exception oops) {
+                        oops.printStackTrace();
+                        done.set(true);
+                    }
+                }
+            }
+        };
+        thread.start();
+
+        //executeBatchErrorOps.add(5);
+        //executeBatchErrorOps.add(3);
+        getAutoCommitCount.set(0);
+        getAutoCommitErrors.add(39);
+
+
+        // network broker to push messages
+        final BrokerService other = new BrokerService();
+        other.setBrokerName("other");
+        other.setAdvisorySupport(false);
+        other.setUseJmx(false);
+        other.setPersistent(false);
+        NetworkConnector netwokConnector = other.addNetworkConnector("static://tcp://localhost:61616");
+        netwokConnector.setStaticBridge(true);
+        netwokConnector.setStaticallyIncludedDestinations(Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue("TEST")}));
+        other.start();
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://other");
+        ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();
+        activeMQConnection.setWatchTopicAdvisories(false);
+        Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        activeMQConnection.start();
+        Destination destination = session.createQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+        ActiveMQMessage message = (ActiveMQMessage) session.createMessage();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send(message);
+        }
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("MESSAGES DRAINED :" + ((RegionBroker)other.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+                return 0 == ((RegionBroker)other.getRegionBroker()).getDestinationStatistics().getMessages().getCount();
+            }
+        });
+        activeMQConnection.close();
+
+
+        assertEquals("db", 10, dumpMessages());
+        assertEquals("messages count", 10, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+
+
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+           .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+        assertEquals("qs", 10, proxy.getQueueSize());
+        assertEquals("curs", 10, proxy.cursorSize());
+
+        done.set(true);
+        other.stop();
+    }
+
+
+    private class TestDataSource implements javax.sql.DataSource {
+
+        private final javax.sql.DataSource realDataSource;
+
+        public TestDataSource(javax.sql.DataSource dataSource) {
+            realDataSource = dataSource;
+        }
+
+        @Override
+        public Connection getConnection() throws SQLException {
+            Connection autoCommitCheckConnection = new AutoCommitCheckConnection(realDataSource.getConnection());
+            return autoCommitCheckConnection;
+        }
+
+        @Override
+        public Connection getConnection(String username, String password) throws SQLException {
+            Connection autoCommitCheckConnection = new AutoCommitCheckConnection(realDataSource.getConnection(username, password));
+
+            return autoCommitCheckConnection;
+        }
+
+        @Override
+        public PrintWriter getLogWriter() throws SQLException {
+            return realDataSource.getLogWriter();
+        }
+
+        @Override
+        public void setLogWriter(PrintWriter out) throws SQLException {
+            realDataSource.setLogWriter(out);
+        }
+
+        @Override
+        public void setLoginTimeout(int seconds) throws SQLException {
+            realDataSource.setLoginTimeout(seconds);
+        }
+
+        @Override
+        public int getLoginTimeout() throws SQLException {
+            return realDataSource.getLoginTimeout();
+        }
+
+        @Override
+        public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
+            return realDataSource.getParentLogger();
+        }
+
+        @Override
+        public <T> T unwrap(Class<T> iface) throws SQLException {
+            return realDataSource.unwrap(iface);
+        }
+
+        @Override
+        public boolean isWrapperFor(Class<?> iface) throws SQLException {
+            return realDataSource.isWrapperFor(iface);
+        }
+    }
+
+    private class AutoCommitCheckConnection implements Connection {
+
+        private final Connection realConnection;
+
+        public AutoCommitCheckConnection(Connection connection) {
+            this.realConnection = connection;
+        }
+
+        @Override
+        public void commit() throws SQLException {
+            realConnection.commit();
+        }
+
+        // Just plumbing for wrapper. Might have been better to do a Dynamic Proxy here.
+
+        @Override
+        public Statement createStatement() throws SQLException {
+            return realConnection.createStatement();
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql) throws SQLException {
+            //final AtomicInteger executeCount = new AtomicInteger();
+
+            final PreparedStatement delegate = realConnection.prepareStatement(sql);
+            return new PreparedStatement() {
+                public ResultSet executeQuery() throws SQLException {
+                    return delegate.executeQuery();
+                }
+
+                final
+                public int executeUpdate() throws SQLException {
+                    int ret = delegate.executeUpdate();
+                    if (executeUpdateErrorOps.contains(executeUpdateErrorOpsCount.incrementAndGet())) {
+                        throw new SQLRecoverableException("SOME executeUpdate ERROR[" + executeUpdateErrorOpsCount.get() +"]");
+                    }
+                    return ret;
+                }
+
+                public void setNull(int parameterIndex, int sqlType) throws SQLException {
+                    delegate.setNull(parameterIndex, sqlType);
+                }
+
+                public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+                    delegate.setBoolean(parameterIndex, x);
+                }
+
+                public void setByte(int parameterIndex, byte x) throws SQLException {
+                    delegate.setByte(parameterIndex, x);
+                }
+
+                public void setShort(int parameterIndex, short x) throws SQLException {
+                    delegate.setShort(parameterIndex, x);
+                }
+
+                public void setInt(int parameterIndex, int x) throws SQLException {
+                    delegate.setInt(parameterIndex, x);
+                }
+
+                public void setLong(int parameterIndex, long x) throws SQLException {
+                    delegate.setLong(parameterIndex, x);
+                }
+
+                public void setFloat(int parameterIndex, float x) throws SQLException {
+                    delegate.setFloat(parameterIndex, x);
+                }
+
+                public void setDouble(int parameterIndex, double x) throws SQLException {
+                    delegate.setDouble(parameterIndex, x);
+                }
+
+                public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
+                    delegate.setBigDecimal(parameterIndex, x);
+                }
+
+                public void setString(int parameterIndex, String x) throws SQLException {
+                    delegate.setString(parameterIndex, x);
+                }
+
+                public void setBytes(int parameterIndex, byte[] x) throws SQLException {
+                    delegate.setBytes(parameterIndex, x);
+                }
+
+                public void setDate(int parameterIndex, Date x) throws SQLException {
+                    delegate.setDate(parameterIndex, x);
+                }
+
+                public void setTime(int parameterIndex, Time x) throws SQLException {
+                    delegate.setTime(parameterIndex, x);
+                }
+
+                public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
+                    delegate.setTimestamp(parameterIndex, x);
+                }
+
+                public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
+                    delegate.setAsciiStream(parameterIndex, x, length);
+                }
+
+                @Deprecated
+                public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
+                    delegate.setUnicodeStream(parameterIndex, x, length);
+                }
+
+                public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
+                    delegate.setBinaryStream(parameterIndex, x, length);
+                }
+
+                public void clearParameters() throws SQLException {
+                    delegate.clearParameters();
+                }
+
+                public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
+                    delegate.setObject(parameterIndex, x, targetSqlType);
+                }
+
+                public void setObject(int parameterIndex, Object x) throws SQLException {
+                    delegate.setObject(parameterIndex, x);
+                }
+
+                public boolean execute() throws SQLException {
+                    return delegate.execute();
+                }
+
+                public void addBatch() throws SQLException {
+                    delegate.addBatch();
+                }
+
+                public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
+                    delegate.setCharacterStream(parameterIndex, reader, length);
+                }
+
+                public void setRef(int parameterIndex, Ref x) throws SQLException {
+                    delegate.setRef(parameterIndex, x);
+                }
+
+                public void setBlob(int parameterIndex, Blob x) throws SQLException {
+                    delegate.setBlob(parameterIndex, x);
+                }
+
+                public void setClob(int parameterIndex, Clob x) throws SQLException {
+                    delegate.setClob(parameterIndex, x);
+                }
+
+                public void setArray(int parameterIndex, Array x) throws SQLException {
+                    delegate.setArray(parameterIndex, x);
+                }
+
+                public ResultSetMetaData getMetaData() throws SQLException {
+                    return delegate.getMetaData();
+                }
+
+                public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
+                    delegate.setDate(parameterIndex, x, cal);
+                }
+
+                public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
+                    delegate.setTime(parameterIndex, x, cal);
+                }
+
+                public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
+                    delegate.setTimestamp(parameterIndex, x, cal);
+                }
+
+                public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
+                    delegate.setNull(parameterIndex, sqlType, typeName);
+                }
+
+                public void setURL(int parameterIndex, URL x) throws SQLException {
+                    delegate.setURL(parameterIndex, x);
+                }
+
+                public ParameterMetaData getParameterMetaData() throws SQLException {
+                    return delegate.getParameterMetaData();
+                }
+
+                public void setRowId(int parameterIndex, RowId x) throws SQLException {
+                    delegate.setRowId(parameterIndex, x);
+                }
+
+                public void setNString(int parameterIndex, String value) throws SQLException {
+                    delegate.setNString(parameterIndex, value);
+                }
+
+                public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
+                    delegate.setNCharacterStream(parameterIndex, value, length);
+                }
+
+                public void setNClob(int parameterIndex, NClob value) throws SQLException {
+                    delegate.setNClob(parameterIndex, value);
+                }
+
+                public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
+                    delegate.setClob(parameterIndex, reader, length);
+                }
+
+                public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
+                    delegate.setBlob(parameterIndex, inputStream, length);
+                }
+
+                public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
+                    delegate.setNClob(parameterIndex, reader, length);
+                }
+
+                public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
+                    delegate.setSQLXML(parameterIndex, xmlObject);
+                }
+
+                public void setObject(int parameterIndex,
+                                      Object x,
+                                      int targetSqlType,
+                                      int scaleOrLength) throws SQLException {
+                    delegate.setObject(parameterIndex, x, targetSqlType, scaleOrLength);
+                }
+
+                public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
+                    delegate.setAsciiStream(parameterIndex, x, length);
+                }
+
+                public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
+                    delegate.setBinaryStream(parameterIndex, x, length);
+                }
+
+                public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
+                    delegate.setCharacterStream(parameterIndex, reader, length);
+                }
+
+                public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
+                    delegate.setAsciiStream(parameterIndex, x);
+                }
+
+                public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
+                    delegate.setBinaryStream(parameterIndex, x);
+                }
+
+                public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
+                    delegate.setCharacterStream(parameterIndex, reader);
+                }
+
+                public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
+                    delegate.setNCharacterStream(parameterIndex, value);
+                }
+
+                public void setClob(int parameterIndex, Reader reader) throws SQLException {
+                    delegate.setClob(parameterIndex, reader);
+                }
+
+                public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
+                    delegate.setBlob(parameterIndex, inputStream);
+                }
+
+                public void setNClob(int parameterIndex, Reader reader) throws SQLException {
+                    delegate.setNClob(parameterIndex, reader);
+                }
+/*
+                public void setObject(int parameterIndex,
+                                      Object x,
+                                      SQLType targetSqlType,
+                                      int scaleOrLength) throws SQLException {
+                    delegate.setObject(parameterIndex, x, targetSqlType, scaleOrLength);
+                }
+
+                public void setObject(int parameterIndex, Object x, SQLType targetSqlType) throws SQLException {
+                    delegate.setObject(parameterIndex, x, targetSqlType);
+                }
+
+                public long executeLargeUpdate() throws SQLException {
+                    return delegate.executeLargeUpdate();
+                }
+*/
+                public ResultSet executeQuery(String sql) throws SQLException {
+                    return delegate.executeQuery(sql);
+                }
+
+                public int executeUpdate(String sql) throws SQLException {
+                    return delegate.executeUpdate(sql);
+                }
+
+                public void close() throws SQLException {
+                    delegate.close();
+                }
+
+                public int getMaxFieldSize() throws SQLException {
+                    return delegate.getMaxFieldSize();
+                }
+
+                public void setMaxFieldSize(int max) throws SQLException {
+                    delegate.setMaxFieldSize(max);
+                }
+
+                public int getMaxRows() throws SQLException {
+                    return delegate.getMaxRows();
+                }
+
+                public void setMaxRows(int max) throws SQLException {
+                    delegate.setMaxRows(max);
+                }
+
+                public void setEscapeProcessing(boolean enable) throws SQLException {
+                    delegate.setEscapeProcessing(enable);
+                }
+
+                public int getQueryTimeout() throws SQLException {
+                    return delegate.getQueryTimeout();
+                }
+
+                public void setQueryTimeout(int seconds) throws SQLException {
+                    delegate.setQueryTimeout(seconds);
+                }
+
+                public void cancel() throws SQLException {
+                    delegate.cancel();
+                }
+
+                public SQLWarning getWarnings() throws SQLException {
+                    return delegate.getWarnings();
+                }
+
+                public void clearWarnings() throws SQLException {
+                    delegate.clearWarnings();
+                }
+
+                public void setCursorName(String name) throws SQLException {
+                    delegate.setCursorName(name);
+                }
+
+                public boolean execute(String sql) throws SQLException {
+                    return delegate.execute(sql);
+                }
+
+                public ResultSet getResultSet() throws SQLException {
+                    return delegate.getResultSet();
+                }
+
+                public int getUpdateCount() throws SQLException {
+                    return delegate.getUpdateCount();
+                }
+
+                public boolean getMoreResults() throws SQLException {
+                    return delegate.getMoreResults();
+                }
+
+                public void setFetchDirection(int direction) throws SQLException {
+                    delegate.setFetchDirection(direction);
+                }
+
+                public int getFetchDirection() throws SQLException {
+                    return delegate.getFetchDirection();
+                }
+
+                public void setFetchSize(int rows) throws SQLException {
+                    delegate.setFetchSize(rows);
+                }
+
+                public int getFetchSize() throws SQLException {
+                    return delegate.getFetchSize();
+                }
+
+                public int getResultSetConcurrency() throws SQLException {
+                    return delegate.getResultSetConcurrency();
+                }
+
+                public int getResultSetType() throws SQLException {
+                    return delegate.getResultSetType();
+                }
+
+                public void addBatch(String sql) throws SQLException {
+                    delegate.addBatch(sql);
+                }
+
+                public void clearBatch() throws SQLException {
+                    delegate.clearBatch();
+                }
+
+                public int[] executeBatch() throws SQLException {
+                    if (executeBatchErrorOps.contains(executeBatchErrorOpsCount.incrementAndGet())) {
+                        throw new SQLRecoverableException("SOME executeBatch ERROR[" + executeBatchErrorOpsCount.get() +"]");
+                    }
+                    return delegate.executeBatch();
+                }
+
+                public Connection getConnection() throws SQLException {
+                    return delegate.getConnection();
+                }
+
+                public boolean getMoreResults(int current) throws SQLException {
+                    return delegate.getMoreResults(current);
+                }
+
+                public ResultSet getGeneratedKeys() throws SQLException {
+                    return delegate.getGeneratedKeys();
+                }
+
+                public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+                    return delegate.executeUpdate(sql, autoGeneratedKeys);
+                }
+
+                public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+                    return delegate.executeUpdate(sql, columnIndexes);
+                }
+
+                public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+                    return delegate.executeUpdate(sql, columnNames);
+                }
+
+                public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+                    return delegate.execute(sql, autoGeneratedKeys);
+                }
+
+                public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+                    return delegate.execute(sql, columnIndexes);
+                }
+
+                public boolean execute(String sql, String[] columnNames) throws SQLException {
+                    return delegate.execute(sql, columnNames);
+                }
+
+                public int getResultSetHoldability() throws SQLException {
+                    return delegate.getResultSetHoldability();
+                }
+
+                public boolean isClosed() throws SQLException {
+                    return delegate.isClosed();
+                }
+
+                public void setPoolable(boolean poolable) throws SQLException {
+                    delegate.setPoolable(poolable);
+                }
+
+                public boolean isPoolable() throws SQLException {
+                    return delegate.isPoolable();
+                }
+
+                public void closeOnCompletion() throws SQLException {
+                    delegate.closeOnCompletion();
+                }
+
+                public boolean isCloseOnCompletion() throws SQLException {
+                    return delegate.isCloseOnCompletion();
+                }
+/*
+                public long getLargeUpdateCount() throws SQLException {
+                    return delegate.getLargeUpdateCount();
+                }
+
+                public void setLargeMaxRows(long max) throws SQLException {
+                    delegate.setLargeMaxRows(max);
+                }
+
+                public long getLargeMaxRows() throws SQLException {
+                    return delegate.getLargeMaxRows();
+                }
+
+                public long[] executeLargeBatch() throws SQLException {
+                    return delegate.executeLargeBatch();
+                }
+
+                public long executeLargeUpdate(String sql) throws SQLException {
+                    return delegate.executeLargeUpdate(sql);
+                }
+
+                public long executeLargeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+                    return delegate.executeLargeUpdate(sql, autoGeneratedKeys);
+                }
+
+                public long executeLargeUpdate(String sql, int[] columnIndexes) throws SQLException {
+                    return delegate.executeLargeUpdate(sql, columnIndexes);
+                }
+
+                public long executeLargeUpdate(String sql, String[] columnNames) throws SQLException {
+                    return delegate.executeLargeUpdate(sql, columnNames);
+                }
+*/
+                public <T> T unwrap(Class<T> iface) throws SQLException {
+                    return delegate.unwrap(iface);
+                }
+
+                public boolean isWrapperFor(Class<?> iface) throws SQLException {
+                    return delegate.isWrapperFor(iface);
+                }
+            };
+        }
+
+        @Override
+        public CallableStatement prepareCall(String sql) throws SQLException {
+            return realConnection.prepareCall(sql);
+        }
+
+        @Override
+        public String nativeSQL(String sql) throws SQLException {
+            return realConnection.nativeSQL(sql);
+        }
+
+        @Override
+        public void setAutoCommit(boolean autoCommit) throws SQLException {
+            realConnection.setAutoCommit(autoCommit);
+        }
+
+        @Override
+        public boolean getAutoCommit() throws SQLException {
+            if (getAutoCommitErrors.contains(getAutoCommitCount.incrementAndGet())) {
+                throw new SQLRecoverableException("AutoCommit[" + getAutoCommitCount.get() +"]");
+            }
+            return realConnection.getAutoCommit();
+        }
+
+        @Override
+        public void rollback() throws SQLException {
+            realConnection.rollback();
+        }
+
+        @Override
+        public void close() throws SQLException {
+            realConnection.close();
+        }
+
+        @Override
+        public boolean isClosed() throws SQLException {
+            return realConnection.isClosed();
+        }
+
+        @Override
+        public DatabaseMetaData getMetaData() throws SQLException {
+            return realConnection.getMetaData();
+        }
+
+        @Override
+        public void setReadOnly(boolean readOnly) throws SQLException {
+            realConnection.setReadOnly(readOnly);
+        }
+
+        @Override
+        public boolean isReadOnly() throws SQLException {
+            return realConnection.isReadOnly();
+        }
+
+        @Override
+        public void setCatalog(String catalog) throws SQLException {
+            realConnection.setCatalog(catalog);
+        }
+
+        @Override
+        public String getCatalog() throws SQLException {
+            return realConnection.getCatalog();
+        }
+
+        @Override
+        public void setTransactionIsolation(int level) throws SQLException {
+            realConnection.setTransactionIsolation(level);
+        }
+
+        @Override
+        public int getTransactionIsolation() throws SQLException {
+            return realConnection.getTransactionIsolation();
+        }
+
+        @Override
+        public SQLWarning getWarnings() throws SQLException {
+            return realConnection.getWarnings();
+        }
+
+        @Override
+        public void clearWarnings() throws SQLException {
+            realConnection.clearWarnings();
+        }
+
+        @Override
+        public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+            return realConnection.createStatement(resultSetType, resultSetConcurrency);
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+            return realConnection.prepareStatement(sql, resultSetType, resultSetConcurrency);
+        }
+
+        @Override
+        public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+            return realConnection.prepareCall(sql, resultSetType, resultSetConcurrency);
+        }
+
+        @Override
+        public Map<String, Class<?>> getTypeMap() throws SQLException {
+            return realConnection.getTypeMap();
+        }
+
+        @Override
+        public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+            realConnection.setTypeMap(map);
+        }
+
+        @Override
+        public void setHoldability(int holdability) throws SQLException {
+            realConnection.setHoldability(holdability);
+        }
+
+        @Override
+        public int getHoldability() throws SQLException {
+            return realConnection.getHoldability();
+        }
+
+        @Override
+        public Savepoint setSavepoint() throws SQLException {
+            return realConnection.setSavepoint();
+        }
+
+        @Override
+        public Savepoint setSavepoint(String name) throws SQLException {
+            return realConnection.setSavepoint(name);
+        }
+
+        @Override
+        public void rollback(Savepoint savepoint) throws SQLException {
+            realConnection.rollback();
+        }
+
+        @Override
+        public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+            realConnection.releaseSavepoint(savepoint);
+        }
+
+        @Override
+        public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+            return realConnection.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+            return realConnection.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+        }
+
+        @Override
+        public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+            return realConnection.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+            return realConnection.prepareStatement(sql, autoGeneratedKeys);
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+            return realConnection.prepareStatement(sql, columnIndexes);
+        }
+
+        @Override
+        public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+            return realConnection.prepareStatement(sql, columnNames);
+        }
+
+        @Override
+        public Clob createClob() throws SQLException {
+            return realConnection.createClob();
+        }
+
+        @Override
+        public Blob createBlob() throws SQLException {
+            return realConnection.createBlob();
+        }
+
+        @Override
+        public NClob createNClob() throws SQLException {
+            return realConnection.createNClob();
+        }
+
+        @Override
+        public SQLXML createSQLXML() throws SQLException {
+            return realConnection.createSQLXML();
+        }
+
+        @Override
+        public boolean isValid(int timeout) throws SQLException {
+            return realConnection.isValid(timeout);
+        }
+
+        @Override
+        public void setClientInfo(String name, String value) throws SQLClientInfoException {
+            realConnection.setClientInfo(name, value);
+        }
+
+        @Override
+        public void setClientInfo(Properties properties) throws SQLClientInfoException {
+            realConnection.setClientInfo(properties);
+        }
+
+        @Override
+        public String getClientInfo(String name) throws SQLException {
+            return realConnection.getClientInfo(name);
+        }
+
+        @Override
+        public Properties getClientInfo() throws SQLException {
+            return realConnection.getClientInfo();
+        }
+
+        @Override
+        public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+            return realConnection.createArrayOf(typeName, elements);
+        }
+
+        @Override
+        public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+            return realConnection.createStruct(typeName, attributes);
+        }
+
+        @Override
+        public void setSchema(String schema) throws SQLException {
+            realConnection.setSchema(schema);
+        }
+
+        @Override
+        public String getSchema() throws SQLException {
+            return realConnection.getSchema();
+        }
+
+        @Override
+        public void abort(Executor executor) throws SQLException {
+            realConnection.abort(executor);
+        }
+
+        @Override
+        public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+            realConnection.setNetworkTimeout(executor, milliseconds);
+        }
+
+        @Override
+        public int getNetworkTimeout() throws SQLException {
+            return realConnection.getNetworkTimeout();
+        }
+
+        @Override
+        public <T> T unwrap(Class<T> iface) throws SQLException {
+            return realConnection.unwrap(iface);
+        }
+
+        @Override
+        public boolean isWrapperFor(Class<?> iface) throws SQLException {
+            return realConnection.isWrapperFor(iface);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
index 8da0ff6..ee3a88c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
@@ -318,6 +318,7 @@ public class XACompletionTest extends TestSupport {
         resource.recover(XAResource.TMSTARTRSCAN);
         resource.recover(XAResource.TMNOFLAGS);
 
+        dumpMessages();
         Xid tid = createXid();
 
         resource.start(tid, XAResource.TMNOFLAGS);
@@ -342,6 +343,9 @@ public class XACompletionTest extends TestSupport {
 
         consumer.close();
 
+        LOG.info("after close");
+        dumpMessages();
+
         assertEquals("drain", 5, drainUnack(5, "TEST"));
 
         dumpMessages();


Mime
View raw message