Repository: activemq
Updated Branches:
refs/heads/master 314d5a516 -> bd45d931b
[AMQ-6906] tidy up cleanup on jdbc error and combine updates in single completion to avoid prepared sequence update on non transacted add with error. More jdbc error related tests
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bd45d931
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bd45d931
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bd45d931
Branch: refs/heads/master
Commit: bd45d931ba273be4d94bf213c6befd116f99dcc8
Parents: 314d5a5
Author: gtully <gary.tully@gmail.com>
Authored: Thu May 3 11:32:21 2018 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Thu May 3 11:32:21 2018 +0100
----------------------------------------------------------------------
.../store/jdbc/JDBCPersistenceAdapter.java | 6 +-
.../store/jdbc/JdbcMemoryTransactionStore.java | 16 +-
.../activemq/store/jdbc/TransactionContext.java | 73 +-
.../activemq/broker/XARecoveryBrokerTest.java | 4 +-
.../store/jdbc/JDBCCommitExceptionTest.java | 12 +-
.../store/jdbc/JDBCXACommitExceptionTest.java | 1151 ++++++++++++++++++
.../activemq/store/jdbc/XACompletionTest.java | 4 +
7 files changed, 1213 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
index 102dec5..95b1446 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
@@ -785,12 +785,10 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
}
}
- public void commitAdd(ConnectionContext context, MessageId messageId, long preparedSequenceId) throws IOException {
+ public void commitAdd(ConnectionContext context, final MessageId messageId, final long preparedSequenceId, final long newSequence) throws IOException {
TransactionContext c = getTransactionContext(context);
try {
- long sequence = (Long)messageId.getEntryLocator();
- getAdapter().doCommitAddOp(c, preparedSequenceId, sequence);
- messageId.setEntryLocator(preparedSequenceId);
+ getAdapter().doCommitAddOp(c, preparedSequenceId, newSequence);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e);
http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
index 6df0860..4bbe43d 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
@@ -78,11 +78,12 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
cmd.run(ctx);
}
+ persistenceAdapter.commitTransaction(ctx);
+
} catch ( IOException e ) {
persistenceAdapter.rollbackTransaction(ctx);
throw e;
}
- persistenceAdapter.commitTransaction(ctx);
ctx.setXid(null);
// setup for commit outcome
@@ -126,13 +127,15 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
final Long preparedEntrySequence = (Long) message.getMessageId().getEntryLocator();
TransactionContext c = jdbcPersistenceAdapter.getTransactionContext(context);
+ long newSequence;
synchronized (jdbcMessageStore.pendingAdditions) {
- message.getMessageId().setEntryLocator(jdbcPersistenceAdapter.getNextSequenceId());
-
+ newSequence = jdbcPersistenceAdapter.getNextSequenceId();
+ final long sequenceToSet = newSequence;
c.onCompletion(new Runnable() {
@Override
public void run() {
- message.getMessageId().setFutureOrSequenceLong(message.getMessageId().getEntryLocator());
+ message.getMessageId().setEntryLocator(sequenceToSet);
+ message.getMessageId().setFutureOrSequenceLong(sequenceToSet);
}
});
@@ -141,7 +144,7 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
}
}
- jdbcPersistenceAdapter.commitAdd(context, message.getMessageId(), preparedEntrySequence);
+ jdbcPersistenceAdapter.commitAdd(context, message.getMessageId(), preparedEntrySequence, newSequence);
jdbcMessageStore.onAdd(message, (Long)message.getMessageId().getEntryLocator(), message.getPriority());
}
@@ -175,8 +178,9 @@ public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
((LastAckCommand)removeMessageCommand).rollback(ctx);
} else {
MessageId messageId = removeMessageCommand.getMessageAck().getLastMessageId();
+ long sequence = (Long)messageId.getEntryLocator();
// need to unset the txid flag on the existing row
- ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx, messageId, (Long)messageId.getEntryLocator());
+ ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx, messageId, sequence, sequence);
if (removeMessageCommand instanceof RecoveredRemoveMessageCommand) {
((JDBCMessageStore) removeMessageCommand.getMessageStore()).trackRollbackAck(((RecoveredRemoveMessageCommand) removeMessageCommand).getMessage());
http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
index ab3bef8..db2aace 100644
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
@@ -84,7 +84,7 @@ public class TransactionContext {
} catch (IllegalMonitorStateException oops) {
LOG.error("Thread does not hold the context lock on close of:" + connection, oops);
}
- close();
+ silentClose();
IOException ioe = IOExceptionSupport.create(e);
if (persistenceAdapter.getBrokerService() != null) {
persistenceAdapter.getBrokerService().handleIOException(ioe);
@@ -137,45 +137,39 @@ public class TransactionContext {
} finally {
try {
p.close();
- } catch (Throwable e) {
- }
+ } catch (Throwable ignored) {}
}
}
+ private void silentClose() {
+ silentClosePreparedStatements();
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (Throwable ignored) {}
+ connection = null;
+ }
+ }
+
+
public void close() throws IOException {
if (!inTx) {
try {
-
- /**
- * we are not in a transaction so should not be committing ??
- * This was previously commented out - but had adverse affects
- * on testing - so it's back!
- *
- */
- try {
+ // can be null for topic ops that bypass the store via existing cursor state
+ if (connection != null) {
+ final boolean needsCommit = !connection.getAutoCommit();
executeBatch();
- } finally {
- if (connection != null && !connection.getAutoCommit()) {
+ if (needsCommit) {
connection.commit();
}
}
-
} catch (SQLException e) {
JDBCPersistenceAdapter.log("Error while closing connection: ", e);
IOException ioe = IOExceptionSupport.create(e);
persistenceAdapter.getBrokerService().handleIOException(ioe);
throw ioe;
} finally {
- try {
- if (connection != null) {
- connection.close();
- }
- } catch (Throwable e) {
- // ignore
- LOG.trace("Closing connection failed due: " + e.getMessage() + ". This exception is ignored.", e);
- } finally {
- connection = null;
- }
+ silentClose();
for (Runnable completion: completions) {
completion.run();
}
@@ -197,8 +191,9 @@ public class TransactionContext {
throw new IOException("Not started.");
}
try {
+ final boolean needsCommit = !connection.getAutoCommit();
executeBatch();
- if (!connection.getAutoCommit()) {
+ if (needsCommit) {
connection.commit();
}
} catch (SQLException e) {
@@ -230,19 +225,23 @@ public class TransactionContext {
}
}
- private void doRollback() throws SQLException {
- if (addMessageStatement != null) {
- addMessageStatement.close();
- addMessageStatement = null;
- }
- if (removedMessageStatement != null) {
- removedMessageStatement.close();
- removedMessageStatement = null;
- }
- if (updateLastAckStatement != null) {
- updateLastAckStatement.close();
- updateLastAckStatement = null;
+ private PreparedStatement silentClosePreparedStatement(PreparedStatement preparedStatement) {
+ if (preparedStatement != null) {
+ try {
+ preparedStatement.close();
+ } catch (Throwable ignored) {}
}
+ return null;
+ }
+
+ private void silentClosePreparedStatements() {
+ addMessageStatement = silentClosePreparedStatement(addMessageStatement);
+ removedMessageStatement = silentClosePreparedStatement(removedMessageStatement);
+ updateLastAckStatement = silentClosePreparedStatement(updateLastAckStatement);
+ }
+
+ private void doRollback() throws SQLException {
+ silentClosePreparedStatements();
completions.clear();
connection.rollback();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index 387e77f..6a8b3f4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
*/
public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(XARecoveryBrokerTest.class);
- public boolean prioritySupport = false;
+ public boolean prioritySupport = true;
public void testPreparedJmxView() throws Exception {
@@ -712,7 +712,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
}
- public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {
+ public void x_initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {
addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE});
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
index 6972a14..2b498a3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCCommitExceptionTest.java
@@ -76,19 +76,23 @@ public class JDBCCommitExceptionTest extends TestCase {
broker.stop();
}
- protected void dumpMessages() throws Exception {
+ protected int dumpMessages() throws Exception {
+ int count = 0;
WireFormat wireFormat = new OpenWireFormat();
java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection();
- PreparedStatement statement = conn.prepareStatement("SELECT ID, MSG FROM ACTIVEMQ_MSGS");
+ PreparedStatement statement = conn.prepareStatement("SELECT ID, XID, MSG FROM ACTIVEMQ_MSGS");
ResultSet result = statement.executeQuery();
LOG.info("Messages left in broker after test");
while(result.next()) {
long id = result.getLong(1);
- org.apache.activemq.command.Message message = (org.apache.activemq.command.Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(2)));
- LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
+ String xid = result.getString(2);
+ org.apache.activemq.command.Message message = (org.apache.activemq.command.Message)wireFormat.unmarshal(new ByteSequence(result.getBytes(3)));
+ LOG.info("id: " + id + ", xid: " + xid + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", MSG: " + message);
+ count++;
}
statement.close();
conn.close();
+ return count;
}
protected int receiveMessages(int messagesExpected) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
index 046ab81..1529515 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCXACommitExceptionTest.java
@@ -21,16 +21,72 @@ import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Session;
import javax.jms.XAConnection;
import javax.jms.XASession;
+import javax.management.ObjectName;
+import javax.sql.DataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLRecoverableException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.DiscardingDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -166,4 +222,1099 @@ public class JDBCXACommitExceptionTest extends JDBCCommitExceptionTest {
assertEquals("one enque", 1, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
}
+
+ final AtomicInteger getAutoCommitCount = new AtomicInteger();
+ private ArrayList<Integer> getAutoCommitErrors = new ArrayList<Integer>();
+ private ArrayList<Integer> executeUpdateErrorOps = new ArrayList<Integer>();
+ final AtomicInteger executeUpdateErrorOpsCount = new AtomicInteger();
+ private ArrayList<Integer> executeBatchErrorOps = new ArrayList<Integer>();
+ final AtomicInteger executeBatchErrorOpsCount = new AtomicInteger();
+
+ public void testXAEnqueueErrors() throws Exception {
+ getAutoCommitCount.set(0);
+ getAutoCommitErrors.clear();
+ executeUpdateErrorOpsCount.set(0);
+ executeUpdateErrorOps.clear();
+
+ broker.stop();
+ broker = new BrokerService();
+ broker.setAdvisorySupport(false);
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry policyEntry = new PolicyEntry();
+ policyEntry.setExpireMessagesPeriod(0);
+ policyMap.setDefaultEntry(policyEntry);
+ broker.setDestinationPolicy(policyMap);
+
+ //broker.setDeleteAllMessagesOnStartup(true);
+
+ JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
+ DataSource realDataSource = jdbc.getDataSource();
+ jdbcPersistenceAdapter.setDataSource(new TestDataSource(realDataSource));
+ jdbcPersistenceAdapter.setUseLock(false);
+ broker.setPersistenceAdapter(jdbcPersistenceAdapter);
+ connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+ broker.start();
+
+
+ // inject error
+ executeUpdateErrorOps.add(5);
+ executeUpdateErrorOps.add(9);
+ executeUpdateErrorOps.add(12);
+
+ getAutoCommitErrors.add(59);
+ getAutoCommitErrors.add(60);
+
+
+ factory = new ActiveMQXAConnectionFactory(connectionUri);
+
+ XAConnection c = factory.createXAConnection();
+ c.start();
+ XASession s = c.createXASession();
+ final XAResource recoveryResource = s.getXAResource();
+
+ for (int i = 0; i < 10; i++) {
+ XAConnection connection = factory.createXAConnection();
+ connection.start();
+ XASession session = connection.createXASession();
+
+ Destination destination = session.createQueue("TEST");
+ MessageProducer producer = session.createProducer(destination);
+
+ XAResource resource = session.getXAResource();
+
+ Xid tid = createXid();
+ resource.start(tid, XAResource.TMNOFLAGS);
+ ActiveMQMessage message = (ActiveMQMessage) session.createMessage();
+ message.setTransactionId(new XATransactionId(tid));
+ producer.send(message);
+
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.prepare(tid);
+
+ try {
+ resource.commit(tid, false);
+ } catch (Exception expected) {
+ expected.printStackTrace();
+
+ dumpMessages();
+
+ boolean done = false;
+ while (!done) {
+ // recover
+ Xid[] recovered = recoveryResource.recover(XAResource.TMSTARTRSCAN);
+ recoveryResource.recover(XAResource.TMNOFLAGS);
+
+ try {
+ recoveryResource.commit(recovered[0], false);
+ done = true;
+ } catch (XAException ok) {
+ ok.printStackTrace();
+ }
+ }
+ }
+ }
+
+ dumpMessages();
+
+ assertEquals("en-queue", 10, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
+ assertEquals("en-queue", 10, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+
+
+ ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+ QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+ .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+ assertEquals("qs", 10, proxy.getQueueSize());
+ assertEquals("enq", 10, proxy.getEnqueueCount());
+ assertEquals("curs", 10, proxy.cursorSize());
+ }
+
+ public void testNonTxEnqueueErrors() throws Exception {
+ getAutoCommitCount.set(0);
+ getAutoCommitErrors.clear();
+ executeUpdateErrorOpsCount.set(0);
+ executeUpdateErrorOps.clear();
+ executeBatchErrorOps.clear();
+ executeBatchErrorOpsCount.set(0);
+
+ broker.stop();
+ broker = new BrokerService();
+ broker.setAdvisorySupport(false);
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry policyEntry = new PolicyEntry();
+ policyEntry.setExpireMessagesPeriod(0);
+ policyMap.setDefaultEntry(policyEntry);
+ broker.setDestinationPolicy(policyMap);
+
+
+ JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
+ DataSource realDataSource = jdbc.getDataSource();
+ jdbcPersistenceAdapter.setDataSource(new TestDataSource(realDataSource));
+ jdbcPersistenceAdapter.setUseLock(false);
+ jdbcPersistenceAdapter.setCleanupPeriod(0);
+ broker.setPersistenceAdapter(jdbcPersistenceAdapter);
+ connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+ broker.start();
+
+
+ executeBatchErrorOps.add(2);
+ executeBatchErrorOps.add(3);
+ getAutoCommitCount.set(0);
+ getAutoCommitErrors.add(10);
+
+
+ factory = new ActiveMQXAConnectionFactory(connectionUri);
+
+ for (int i = 0; i < 10; i++) {
+ XAConnection connection = factory.createXAConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination destination = session.createQueue("TEST");
+ MessageProducer producer = session.createProducer(destination);
+ ActiveMQMessage message = (ActiveMQMessage) session.createMessage();
+
+ try {
+ producer.send(message);
+ } catch (Exception expected) {
+ expected.printStackTrace();
+
+ dumpMessages();
+
+ boolean done = false;
+ while (!done) {
+ try {
+ producer.send(message);
+ done = true;
+ } catch (Exception ok) {
+ ok.printStackTrace();
+ }
+ }
+ }
+ }
+
+ assertEquals("messages in db", 10, dumpMessages());
+
+
+ assertEquals("en-queue", 10, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
+ assertEquals("en-queue", 10, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+
+
+ ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+ QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+ .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+ assertEquals("qs", 10, proxy.getQueueSize());
+ assertEquals("enq", 10, proxy.getEnqueueCount());
+ assertEquals("curs", 10, proxy.cursorSize());
+ }
+
+ public void testNonTxEnqueueOverNetworkErrorsRestart() throws Exception {
+ getAutoCommitCount.set(0);
+ getAutoCommitErrors.clear();
+ executeUpdateErrorOpsCount.set(0);
+ executeUpdateErrorOps.clear();
+ executeBatchErrorOps.clear();
+ executeBatchErrorOpsCount.set(0);
+
+ broker.stop();
+
+ final AtomicBoolean done = new AtomicBoolean(false);
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+
+ while (!done.get()) {
+ try {
+
+ broker = new BrokerService();
+ broker.setAdvisorySupport(false);
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry policyEntry = new PolicyEntry();
+ policyEntry.setUseCache(false);
+ policyEntry.setExpireMessagesPeriod(0);
+ policyEntry.setDeadLetterStrategy(new DiscardingDeadLetterStrategy());
+ policyMap.setDefaultEntry(policyEntry);
+ broker.setDestinationPolicy(policyMap);
+
+ JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
+ DataSource realDataSource = jdbc.getDataSource();
+ jdbcPersistenceAdapter.setDataSource(new TestDataSource(realDataSource));
+ jdbcPersistenceAdapter.setUseLock(false);
+ jdbcPersistenceAdapter.setCleanupPeriod(0);
+ broker.setPersistenceAdapter(jdbcPersistenceAdapter);
+ TransportConnector transportConnector = broker.addConnector("tcp://localhost:61616");
+ //transportConnector.setAuditNetworkProducers(true);
+ connectionUri = transportConnector.getPublishableConnectString();
+ DefaultIOExceptionHandler stopOnIOEx = new DefaultIOExceptionHandler();
+ stopOnIOEx.setIgnoreSQLExceptions(false);
+ stopOnIOEx.setStopStartConnectors(false);
+ broker.setIoExceptionHandler(stopOnIOEx);
+ broker.start();
+
+ broker.waitUntilStopped();
+
+ } catch (Exception oops) {
+ oops.printStackTrace();
+ done.set(true);
+ }
+ }
+ }
+ };
+ thread.start();
+
+ //executeBatchErrorOps.add(5);
+ //executeBatchErrorOps.add(3);
+ getAutoCommitCount.set(0);
+ getAutoCommitErrors.add(39);
+
+
+ // network broker to push messages
+ final BrokerService other = new BrokerService();
+ other.setBrokerName("other");
+ other.setAdvisorySupport(false);
+ other.setUseJmx(false);
+ other.setPersistent(false);
+ NetworkConnector netwokConnector = other.addNetworkConnector("static://tcp://localhost:61616");
+ netwokConnector.setStaticBridge(true);
+ netwokConnector.setStaticallyIncludedDestinations(Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue("TEST")}));
+ other.start();
+
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://other");
+ ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();
+ activeMQConnection.setWatchTopicAdvisories(false);
+ Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ activeMQConnection.start();
+ Destination destination = session.createQueue("TEST");
+ MessageProducer producer = session.createProducer(destination);
+ ActiveMQMessage message = (ActiveMQMessage) session.createMessage();
+
+ for (int i = 0; i < 10; i++) {
+ producer.send(message);
+ }
+
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("MESSAGES DRAINED :" + ((RegionBroker)other.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+ return 0 == ((RegionBroker)other.getRegionBroker()).getDestinationStatistics().getMessages().getCount();
+ }
+ });
+ activeMQConnection.close();
+
+
+ assertEquals("db", 10, dumpMessages());
+ assertEquals("messages count", 10, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+
+
+ ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
+ QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+ .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+
+ assertEquals("qs", 10, proxy.getQueueSize());
+ assertEquals("curs", 10, proxy.cursorSize());
+
+ done.set(true);
+ other.stop();
+ }
+
+
+ private class TestDataSource implements javax.sql.DataSource {
+
+ private final javax.sql.DataSource realDataSource;
+
+ public TestDataSource(javax.sql.DataSource dataSource) {
+ realDataSource = dataSource;
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ Connection autoCommitCheckConnection = new AutoCommitCheckConnection(realDataSource.getConnection());
+ return autoCommitCheckConnection;
+ }
+
+ @Override
+ public Connection getConnection(String username, String password) throws SQLException {
+ Connection autoCommitCheckConnection = new AutoCommitCheckConnection(realDataSource.getConnection(username, password));
+
+ return autoCommitCheckConnection;
+ }
+
+ @Override
+ public PrintWriter getLogWriter() throws SQLException {
+ return realDataSource.getLogWriter();
+ }
+
+ @Override
+ public void setLogWriter(PrintWriter out) throws SQLException {
+ realDataSource.setLogWriter(out);
+ }
+
+ @Override
+ public void setLoginTimeout(int seconds) throws SQLException {
+ realDataSource.setLoginTimeout(seconds);
+ }
+
+ @Override
+ public int getLoginTimeout() throws SQLException {
+ return realDataSource.getLoginTimeout();
+ }
+
+ @Override
+ public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
+ return realDataSource.getParentLogger();
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ return realDataSource.unwrap(iface);
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return realDataSource.isWrapperFor(iface);
+ }
+ }
+
+ private class AutoCommitCheckConnection implements Connection {
+
+ private final Connection realConnection;
+
+ public AutoCommitCheckConnection(Connection connection) {
+ this.realConnection = connection;
+ }
+
+ @Override
+ public void commit() throws SQLException {
+ realConnection.commit();
+ }
+
+ // Just plumbing for wrapper. Might have been better to do a Dynamic Proxy here.
+
+ @Override
+ public Statement createStatement() throws SQLException {
+ return realConnection.createStatement();
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql) throws SQLException {
+ //final AtomicInteger executeCount = new AtomicInteger();
+
+ final PreparedStatement delegate = realConnection.prepareStatement(sql);
+ return new PreparedStatement() {
+ public ResultSet executeQuery() throws SQLException {
+ return delegate.executeQuery();
+ }
+
+ final
+ public int executeUpdate() throws SQLException {
+ int ret = delegate.executeUpdate();
+ if (executeUpdateErrorOps.contains(executeUpdateErrorOpsCount.incrementAndGet())) {
+ throw new SQLRecoverableException("SOME executeUpdate ERROR[" + executeUpdateErrorOpsCount.get() +"]");
+ }
+ return ret;
+ }
+
+ public void setNull(int parameterIndex, int sqlType) throws SQLException {
+ delegate.setNull(parameterIndex, sqlType);
+ }
+
+ public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+ delegate.setBoolean(parameterIndex, x);
+ }
+
+ public void setByte(int parameterIndex, byte x) throws SQLException {
+ delegate.setByte(parameterIndex, x);
+ }
+
+ public void setShort(int parameterIndex, short x) throws SQLException {
+ delegate.setShort(parameterIndex, x);
+ }
+
+ public void setInt(int parameterIndex, int x) throws SQLException {
+ delegate.setInt(parameterIndex, x);
+ }
+
+ public void setLong(int parameterIndex, long x) throws SQLException {
+ delegate.setLong(parameterIndex, x);
+ }
+
+ public void setFloat(int parameterIndex, float x) throws SQLException {
+ delegate.setFloat(parameterIndex, x);
+ }
+
+ public void setDouble(int parameterIndex, double x) throws SQLException {
+ delegate.setDouble(parameterIndex, x);
+ }
+
+ public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
+ delegate.setBigDecimal(parameterIndex, x);
+ }
+
+ public void setString(int parameterIndex, String x) throws SQLException {
+ delegate.setString(parameterIndex, x);
+ }
+
+ public void setBytes(int parameterIndex, byte[] x) throws SQLException {
+ delegate.setBytes(parameterIndex, x);
+ }
+
+ public void setDate(int parameterIndex, Date x) throws SQLException {
+ delegate.setDate(parameterIndex, x);
+ }
+
+ public void setTime(int parameterIndex, Time x) throws SQLException {
+ delegate.setTime(parameterIndex, x);
+ }
+
+ public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
+ delegate.setTimestamp(parameterIndex, x);
+ }
+
+ public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
+ delegate.setAsciiStream(parameterIndex, x, length);
+ }
+
+ @Deprecated
+ public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
+ delegate.setUnicodeStream(parameterIndex, x, length);
+ }
+
+ public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
+ delegate.setBinaryStream(parameterIndex, x, length);
+ }
+
+ public void clearParameters() throws SQLException {
+ delegate.clearParameters();
+ }
+
+ public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
+ delegate.setObject(parameterIndex, x, targetSqlType);
+ }
+
+ public void setObject(int parameterIndex, Object x) throws SQLException {
+ delegate.setObject(parameterIndex, x);
+ }
+
+ public boolean execute() throws SQLException {
+ return delegate.execute();
+ }
+
+ public void addBatch() throws SQLException {
+ delegate.addBatch();
+ }
+
+ public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
+ delegate.setCharacterStream(parameterIndex, reader, length);
+ }
+
+ public void setRef(int parameterIndex, Ref x) throws SQLException {
+ delegate.setRef(parameterIndex, x);
+ }
+
+ public void setBlob(int parameterIndex, Blob x) throws SQLException {
+ delegate.setBlob(parameterIndex, x);
+ }
+
+ public void setClob(int parameterIndex, Clob x) throws SQLException {
+ delegate.setClob(parameterIndex, x);
+ }
+
+ public void setArray(int parameterIndex, Array x) throws SQLException {
+ delegate.setArray(parameterIndex, x);
+ }
+
+ public ResultSetMetaData getMetaData() throws SQLException {
+ return delegate.getMetaData();
+ }
+
+ public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
+ delegate.setDate(parameterIndex, x, cal);
+ }
+
+ public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
+ delegate.setTime(parameterIndex, x, cal);
+ }
+
+ public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
+ delegate.setTimestamp(parameterIndex, x, cal);
+ }
+
+ public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
+ delegate.setNull(parameterIndex, sqlType, typeName);
+ }
+
+ public void setURL(int parameterIndex, URL x) throws SQLException {
+ delegate.setURL(parameterIndex, x);
+ }
+
+ public ParameterMetaData getParameterMetaData() throws SQLException {
+ return delegate.getParameterMetaData();
+ }
+
+ public void setRowId(int parameterIndex, RowId x) throws SQLException {
+ delegate.setRowId(parameterIndex, x);
+ }
+
+ public void setNString(int parameterIndex, String value) throws SQLException {
+ delegate.setNString(parameterIndex, value);
+ }
+
+ public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
+ delegate.setNCharacterStream(parameterIndex, value, length);
+ }
+
+ public void setNClob(int parameterIndex, NClob value) throws SQLException {
+ delegate.setNClob(parameterIndex, value);
+ }
+
+ public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
+ delegate.setClob(parameterIndex, reader, length);
+ }
+
+ public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
+ delegate.setBlob(parameterIndex, inputStream, length);
+ }
+
+ public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
+ delegate.setNClob(parameterIndex, reader, length);
+ }
+
+ public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
+ delegate.setSQLXML(parameterIndex, xmlObject);
+ }
+
+ public void setObject(int parameterIndex,
+ Object x,
+ int targetSqlType,
+ int scaleOrLength) throws SQLException {
+ delegate.setObject(parameterIndex, x, targetSqlType, scaleOrLength);
+ }
+
+ public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
+ delegate.setAsciiStream(parameterIndex, x, length);
+ }
+
+ public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
+ delegate.setBinaryStream(parameterIndex, x, length);
+ }
+
+ public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
+ delegate.setCharacterStream(parameterIndex, reader, length);
+ }
+
+ public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
+ delegate.setAsciiStream(parameterIndex, x);
+ }
+
+ public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
+ delegate.setBinaryStream(parameterIndex, x);
+ }
+
+ public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
+ delegate.setCharacterStream(parameterIndex, reader);
+ }
+
+ public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
+ delegate.setNCharacterStream(parameterIndex, value);
+ }
+
+ public void setClob(int parameterIndex, Reader reader) throws SQLException {
+ delegate.setClob(parameterIndex, reader);
+ }
+
+ public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
+ delegate.setBlob(parameterIndex, inputStream);
+ }
+
+ public void setNClob(int parameterIndex, Reader reader) throws SQLException {
+ delegate.setNClob(parameterIndex, reader);
+ }
+/*
+ public void setObject(int parameterIndex,
+ Object x,
+ SQLType targetSqlType,
+ int scaleOrLength) throws SQLException {
+ delegate.setObject(parameterIndex, x, targetSqlType, scaleOrLength);
+ }
+
+ public void setObject(int parameterIndex, Object x, SQLType targetSqlType) throws SQLException {
+ delegate.setObject(parameterIndex, x, targetSqlType);
+ }
+
+ public long executeLargeUpdate() throws SQLException {
+ return delegate.executeLargeUpdate();
+ }
+*/
+ public ResultSet executeQuery(String sql) throws SQLException {
+ return delegate.executeQuery(sql);
+ }
+
+ public int executeUpdate(String sql) throws SQLException {
+ return delegate.executeUpdate(sql);
+ }
+
+ public void close() throws SQLException {
+ delegate.close();
+ }
+
+ public int getMaxFieldSize() throws SQLException {
+ return delegate.getMaxFieldSize();
+ }
+
+ public void setMaxFieldSize(int max) throws SQLException {
+ delegate.setMaxFieldSize(max);
+ }
+
+ public int getMaxRows() throws SQLException {
+ return delegate.getMaxRows();
+ }
+
+ public void setMaxRows(int max) throws SQLException {
+ delegate.setMaxRows(max);
+ }
+
+ public void setEscapeProcessing(boolean enable) throws SQLException {
+ delegate.setEscapeProcessing(enable);
+ }
+
+ public int getQueryTimeout() throws SQLException {
+ return delegate.getQueryTimeout();
+ }
+
+ public void setQueryTimeout(int seconds) throws SQLException {
+ delegate.setQueryTimeout(seconds);
+ }
+
+ public void cancel() throws SQLException {
+ delegate.cancel();
+ }
+
+ public SQLWarning getWarnings() throws SQLException {
+ return delegate.getWarnings();
+ }
+
+ public void clearWarnings() throws SQLException {
+ delegate.clearWarnings();
+ }
+
+ public void setCursorName(String name) throws SQLException {
+ delegate.setCursorName(name);
+ }
+
+ public boolean execute(String sql) throws SQLException {
+ return delegate.execute(sql);
+ }
+
+ public ResultSet getResultSet() throws SQLException {
+ return delegate.getResultSet();
+ }
+
+ public int getUpdateCount() throws SQLException {
+ return delegate.getUpdateCount();
+ }
+
+ public boolean getMoreResults() throws SQLException {
+ return delegate.getMoreResults();
+ }
+
+ public void setFetchDirection(int direction) throws SQLException {
+ delegate.setFetchDirection(direction);
+ }
+
+ public int getFetchDirection() throws SQLException {
+ return delegate.getFetchDirection();
+ }
+
+ public void setFetchSize(int rows) throws SQLException {
+ delegate.setFetchSize(rows);
+ }
+
+ public int getFetchSize() throws SQLException {
+ return delegate.getFetchSize();
+ }
+
+ public int getResultSetConcurrency() throws SQLException {
+ return delegate.getResultSetConcurrency();
+ }
+
+ public int getResultSetType() throws SQLException {
+ return delegate.getResultSetType();
+ }
+
+ public void addBatch(String sql) throws SQLException {
+ delegate.addBatch(sql);
+ }
+
+ public void clearBatch() throws SQLException {
+ delegate.clearBatch();
+ }
+
+ public int[] executeBatch() throws SQLException {
+ if (executeBatchErrorOps.contains(executeBatchErrorOpsCount.incrementAndGet())) {
+ throw new SQLRecoverableException("SOME executeBatch ERROR[" + executeBatchErrorOpsCount.get() +"]");
+ }
+ return delegate.executeBatch();
+ }
+
+ public Connection getConnection() throws SQLException {
+ return delegate.getConnection();
+ }
+
+ public boolean getMoreResults(int current) throws SQLException {
+ return delegate.getMoreResults(current);
+ }
+
+ public ResultSet getGeneratedKeys() throws SQLException {
+ return delegate.getGeneratedKeys();
+ }
+
+ public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+ return delegate.executeUpdate(sql, autoGeneratedKeys);
+ }
+
+ public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+ return delegate.executeUpdate(sql, columnIndexes);
+ }
+
+ public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+ return delegate.executeUpdate(sql, columnNames);
+ }
+
+ public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+ return delegate.execute(sql, autoGeneratedKeys);
+ }
+
+ public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+ return delegate.execute(sql, columnIndexes);
+ }
+
+ public boolean execute(String sql, String[] columnNames) throws SQLException {
+ return delegate.execute(sql, columnNames);
+ }
+
+ public int getResultSetHoldability() throws SQLException {
+ return delegate.getResultSetHoldability();
+ }
+
+ public boolean isClosed() throws SQLException {
+ return delegate.isClosed();
+ }
+
+ public void setPoolable(boolean poolable) throws SQLException {
+ delegate.setPoolable(poolable);
+ }
+
+ public boolean isPoolable() throws SQLException {
+ return delegate.isPoolable();
+ }
+
+ public void closeOnCompletion() throws SQLException {
+ delegate.closeOnCompletion();
+ }
+
+ public boolean isCloseOnCompletion() throws SQLException {
+ return delegate.isCloseOnCompletion();
+ }
+/*
+ public long getLargeUpdateCount() throws SQLException {
+ return delegate.getLargeUpdateCount();
+ }
+
+ public void setLargeMaxRows(long max) throws SQLException {
+ delegate.setLargeMaxRows(max);
+ }
+
+ public long getLargeMaxRows() throws SQLException {
+ return delegate.getLargeMaxRows();
+ }
+
+ public long[] executeLargeBatch() throws SQLException {
+ return delegate.executeLargeBatch();
+ }
+
+ public long executeLargeUpdate(String sql) throws SQLException {
+ return delegate.executeLargeUpdate(sql);
+ }
+
+ public long executeLargeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+ return delegate.executeLargeUpdate(sql, autoGeneratedKeys);
+ }
+
+ public long executeLargeUpdate(String sql, int[] columnIndexes) throws SQLException {
+ return delegate.executeLargeUpdate(sql, columnIndexes);
+ }
+
+ public long executeLargeUpdate(String sql, String[] columnNames) throws SQLException {
+ return delegate.executeLargeUpdate(sql, columnNames);
+ }
+*/
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ return delegate.unwrap(iface);
+ }
+
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return delegate.isWrapperFor(iface);
+ }
+ };
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql) throws SQLException {
+ return realConnection.prepareCall(sql);
+ }
+
+ @Override
+ public String nativeSQL(String sql) throws SQLException {
+ return realConnection.nativeSQL(sql);
+ }
+
+ @Override
+ public void setAutoCommit(boolean autoCommit) throws SQLException {
+ realConnection.setAutoCommit(autoCommit);
+ }
+
+ @Override
+ public boolean getAutoCommit() throws SQLException {
+ if (getAutoCommitErrors.contains(getAutoCommitCount.incrementAndGet())) {
+ throw new SQLRecoverableException("AutoCommit[" + getAutoCommitCount.get() +"]");
+ }
+ return realConnection.getAutoCommit();
+ }
+
+ @Override
+ public void rollback() throws SQLException {
+ realConnection.rollback();
+ }
+
+ @Override
+ public void close() throws SQLException {
+ realConnection.close();
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return realConnection.isClosed();
+ }
+
+ @Override
+ public DatabaseMetaData getMetaData() throws SQLException {
+ return realConnection.getMetaData();
+ }
+
+ @Override
+ public void setReadOnly(boolean readOnly) throws SQLException {
+ realConnection.setReadOnly(readOnly);
+ }
+
+ @Override
+ public boolean isReadOnly() throws SQLException {
+ return realConnection.isReadOnly();
+ }
+
+ @Override
+ public void setCatalog(String catalog) throws SQLException {
+ realConnection.setCatalog(catalog);
+ }
+
+ @Override
+ public String getCatalog() throws SQLException {
+ return realConnection.getCatalog();
+ }
+
+ @Override
+ public void setTransactionIsolation(int level) throws SQLException {
+ realConnection.setTransactionIsolation(level);
+ }
+
+ @Override
+ public int getTransactionIsolation() throws SQLException {
+ return realConnection.getTransactionIsolation();
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ return realConnection.getWarnings();
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+ realConnection.clearWarnings();
+ }
+
+ @Override
+ public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
+ return realConnection.createStatement(resultSetType, resultSetConcurrency);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+ return realConnection.prepareStatement(sql, resultSetType, resultSetConcurrency);
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
+ return realConnection.prepareCall(sql, resultSetType, resultSetConcurrency);
+ }
+
+ @Override
+ public Map<String, Class<?>> getTypeMap() throws SQLException {
+ return realConnection.getTypeMap();
+ }
+
+ @Override
+ public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+ realConnection.setTypeMap(map);
+ }
+
+ @Override
+ public void setHoldability(int holdability) throws SQLException {
+ realConnection.setHoldability(holdability);
+ }
+
+ @Override
+ public int getHoldability() throws SQLException {
+ return realConnection.getHoldability();
+ }
+
+ @Override
+ public Savepoint setSavepoint() throws SQLException {
+ return realConnection.setSavepoint();
+ }
+
+ @Override
+ public Savepoint setSavepoint(String name) throws SQLException {
+ return realConnection.setSavepoint(name);
+ }
+
+ @Override
+ public void rollback(Savepoint savepoint) throws SQLException {
+ realConnection.rollback();
+ }
+
+ @Override
+ public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+ realConnection.releaseSavepoint(savepoint);
+ }
+
+ @Override
+ public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return realConnection.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return realConnection.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return realConnection.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+ return realConnection.prepareStatement(sql, autoGeneratedKeys);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+ return realConnection.prepareStatement(sql, columnIndexes);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+ return realConnection.prepareStatement(sql, columnNames);
+ }
+
+ @Override
+ public Clob createClob() throws SQLException {
+ return realConnection.createClob();
+ }
+
+ @Override
+ public Blob createBlob() throws SQLException {
+ return realConnection.createBlob();
+ }
+
+ @Override
+ public NClob createNClob() throws SQLException {
+ return realConnection.createNClob();
+ }
+
+ @Override
+ public SQLXML createSQLXML() throws SQLException {
+ return realConnection.createSQLXML();
+ }
+
+ @Override
+ public boolean isValid(int timeout) throws SQLException {
+ return realConnection.isValid(timeout);
+ }
+
+ @Override
+ public void setClientInfo(String name, String value) throws SQLClientInfoException {
+ realConnection.setClientInfo(name, value);
+ }
+
+ @Override
+ public void setClientInfo(Properties properties) throws SQLClientInfoException {
+ realConnection.setClientInfo(properties);
+ }
+
+ @Override
+ public String getClientInfo(String name) throws SQLException {
+ return realConnection.getClientInfo(name);
+ }
+
+ @Override
+ public Properties getClientInfo() throws SQLException {
+ return realConnection.getClientInfo();
+ }
+
+ @Override
+ public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+ return realConnection.createArrayOf(typeName, elements);
+ }
+
+ @Override
+ public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+ return realConnection.createStruct(typeName, attributes);
+ }
+
+ @Override
+ public void setSchema(String schema) throws SQLException {
+ realConnection.setSchema(schema);
+ }
+
+ @Override
+ public String getSchema() throws SQLException {
+ return realConnection.getSchema();
+ }
+
+ @Override
+ public void abort(Executor executor) throws SQLException {
+ realConnection.abort(executor);
+ }
+
+ @Override
+ public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+ realConnection.setNetworkTimeout(executor, milliseconds);
+ }
+
+ @Override
+ public int getNetworkTimeout() throws SQLException {
+ return realConnection.getNetworkTimeout();
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ return realConnection.unwrap(iface);
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return realConnection.isWrapperFor(iface);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bd45d931/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
index 8da0ff6..ee3a88c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
@@ -318,6 +318,7 @@ public class XACompletionTest extends TestSupport {
resource.recover(XAResource.TMSTARTRSCAN);
resource.recover(XAResource.TMNOFLAGS);
+ dumpMessages();
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
@@ -342,6 +343,9 @@ public class XACompletionTest extends TestSupport {
consumer.close();
+ LOG.info("after close");
+ dumpMessages();
+
assertEquals("drain", 5, drainUnack(5, "TEST"));
dumpMessages();
|