Add PostGres Driver
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/79904aeb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/79904aeb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/79904aeb
Branch: refs/heads/master
Commit: 79904aeb64a5e5b7bc9aa8f2060d32327e9092ca
Parents: 466d43c
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Tue May 31 12:19:29 2016 +0100
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Wed Jun 1 16:09:42 2016 +0100
----------------------------------------------------------------------
artemis-jdbc-store/pom.xml | 6 +
.../activemq/artemis/jdbc/store/JDBCUtils.java | 31 +-
.../jdbc/store/drivers/AbstractJDBCDriver.java | 59 +++-
.../drivers/postgres/PostgresSQLProvider.java | 53 +++
.../PostgresSequentialSequentialFileDriver.java | 169 +++++++++
.../jdbc/store/file/JDBCFileFactoryDriver.java | 344 -------------------
.../jdbc/store/file/JDBCSequentialFile.java | 6 +-
.../store/file/JDBCSequentialFileFactory.java | 3 +-
.../file/JDBCSequentialFileFactoryDriver.java | 323 +++++++++++++++++
.../jdbc/store/journal/JDBCJournalImpl.java | 3 +-
pom.xml | 8 +
tests/integration-tests/pom.xml | 10 +-
12 files changed, 646 insertions(+), 369 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/pom.xml b/artemis-jdbc-store/pom.xml
index 86fe1a6..bb54c12 100644
--- a/artemis-jdbc-store/pom.xml
+++ b/artemis-jdbc-store/pom.xml
@@ -53,6 +53,7 @@
<scope>test</scope>
</dependency>
+ <!-- Database driver support -->
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
@@ -60,6 +61,11 @@
</dependency>
<dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-journal</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
index a1bde56..04ac242 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
@@ -24,7 +24,9 @@ import java.sql.SQLException;
import java.sql.Statement;
import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
-import org.apache.activemq.artemis.jdbc.store.file.JDBCFileFactoryDriver;
+import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
+import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSequentialSequentialFileDriver;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
@@ -70,24 +72,35 @@ public class JDBCUtils {
if (driverClass.contains("derby")) {
return new DerbySQLProvider(tableName);
}
+ else if (driverClass.contains("postgres")) {
+ return new PostgresSQLProvider(tableName);
+ }
else {
return new GenericSQLProvider(tableName);
}
}
- public static JDBCFileFactoryDriver getDBFileDriver(String driverClass, String tableName, String jdbcConnectionUrl) throws SQLException {
- JDBCFileFactoryDriver dbDriver;
+ public static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass,
+ String tableName,
+ String jdbcConnectionUrl) throws SQLException {
+ JDBCSequentialFileFactoryDriver dbDriver;
if (driverClass.contains("derby")) {
- dbDriver = new JDBCFileFactoryDriver();
+ dbDriver = new JDBCSequentialFileFactoryDriver();
dbDriver.setSqlProvider(new DerbySQLProvider(tableName));
- dbDriver.setConnectionURL(jdbcConnectionUrl);
- dbDriver.setDriverClass(driverClass);
+ dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
+ dbDriver.setJdbcDriverClass(driverClass);
+ }
+ else if (driverClass.contains("postgres")) {
+ dbDriver = new PostgresSequentialSequentialFileDriver();
+ dbDriver.setSqlProvider(new PostgresSQLProvider(tableName));
+ dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
+ dbDriver.setJdbcDriverClass(driverClass);
}
else {
- dbDriver = new JDBCFileFactoryDriver();
+ dbDriver = new JDBCSequentialFileFactoryDriver();
dbDriver.setSqlProvider(new GenericSQLProvider(tableName));
- dbDriver.setConnectionURL(jdbcConnectionUrl);
- dbDriver.setDriverClass(driverClass);
+ dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
+ dbDriver.setJdbcDriverClass(driverClass);
}
return dbDriver;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
index 1c282c0..6d8be71 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
@@ -33,14 +33,17 @@ public abstract class AbstractJDBCDriver {
protected Connection connection;
- protected final SQLProvider sqlProvider;
+ protected SQLProvider sqlProvider;
- protected final String jdbcConnectionUrl;
+ protected String jdbcConnectionUrl;
- protected final String jdbcDriverClass;
+ protected String jdbcDriverClass;
protected Driver dbDriver;
+ public AbstractJDBCDriver() {
+ }
+
public AbstractJDBCDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) {
this.jdbcConnectionUrl = jdbcConnectionUrl;
this.jdbcDriverClass = jdbcDriverClass;
@@ -53,7 +56,7 @@ public abstract class AbstractJDBCDriver {
prepareStatements();
}
- public void stop() throws Exception {
+ public void stop() throws SQLException {
if (sqlProvider.closeConnectionOnShutdown()) {
connection.close();
}
@@ -79,10 +82,48 @@ public abstract class AbstractJDBCDriver {
}
public void destroy() throws Exception {
- connection.setAutoCommit(false);
- Statement statement = connection.createStatement();
- statement.executeUpdate("DROP TABLE " + sqlProvider.getTableName());
- statement.close();
- connection.commit();
+ try {
+ connection.setAutoCommit(false);
+ Statement statement = connection.createStatement();
+ statement.executeUpdate("DROP TABLE " + sqlProvider.getTableName());
+ statement.close();
+ connection.commit();
+ }
+ catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
+ }
+
+ public Connection getConnection() {
+ return connection;
+ }
+
+ public void setConnection(Connection connection) {
+ this.connection = connection;
+ }
+
+ public SQLProvider getSqlProvider() {
+ return sqlProvider;
+ }
+
+ public void setSqlProvider(SQLProvider sqlProvider) {
+ this.sqlProvider = sqlProvider;
+ }
+
+ public String getJdbcConnectionUrl() {
+ return jdbcConnectionUrl;
+ }
+
+ public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
+ this.jdbcConnectionUrl = jdbcConnectionUrl;
+ }
+
+ public String getJdbcDriverClass() {
+ return jdbcDriverClass;
+ }
+
+ public void setJdbcDriverClass(String jdbcDriverClass) {
+ this.jdbcDriverClass = jdbcDriverClass;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java
new file mode 100644
index 0000000..664202b
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.drivers.postgres;
+
+import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
+
+public class PostgresSQLProvider extends GenericSQLProvider {
+
+ // BYTEA Size used in Journal
+ private static final int MAX_BLOB_SIZE = 1024 * 1024 * 1024; // 1GB
+
+ private final String createFileTableSQL;
+
+ private final String createJournalTableSQL;
+
+ public PostgresSQLProvider(String tName) {
+ super(tName.toLowerCase());
+ createFileTableSQL = "CREATE TABLE " + tableName +
+ "(ID SERIAL, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA OID, PRIMARY KEY(ID))";
+
+ createJournalTableSQL = "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BYTEA,txDataSize INTEGER,txData BYTEA,txCheckNoRecords INTEGER,seq BIGINT)";
+ }
+
+ @Override
+ public String getCreateFileTableSQL() {
+ return createFileTableSQL;
+ }
+
+ @Override
+ public String getCreateJournalTableSQL() {
+ return createJournalTableSQL;
+ }
+
+ @Override
+ public int getMaxBlobSize() {
+ return MAX_BLOB_SIZE;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSequentialSequentialFileDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSequentialSequentialFileDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSequentialSequentialFileDriver.java
new file mode 100644
index 0000000..4d42d7f
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSequentialSequentialFileDriver.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.drivers.postgres;
+
+import java.nio.ByteBuffer;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
+import org.postgresql.PGConnection;
+import org.postgresql.largeobject.LargeObject;
+import org.postgresql.largeobject.LargeObjectManager;
+
+public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver {
+
+ private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY";
+
+ public PostgresSequentialSequentialFileDriver() throws SQLException {
+ super();
+ }
+
+ @Override
+ public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
+ try {
+ connection.setAutoCommit(false);
+
+ LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
+ long oid = lobjManager.createLO();
+
+ createFile.setString(1, file.getFileName());
+ createFile.setString(2, file.getExtension());
+ createFile.setLong(3, oid);
+ createFile.executeUpdate();
+
+ try (ResultSet keys = createFile.getGeneratedKeys()) {
+ keys.next();
+ file.setId(keys.getInt(1));
+ }
+ connection.commit();
+ }
+ catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
+ }
+
+ @Override
+ public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
+ connection.setAutoCommit(false);
+ readLargeObject.setInt(1, file.getId());
+
+ try (ResultSet rs = readLargeObject.executeQuery()) {
+ if (rs.next()) {
+ file.setWritePosition(getPostGresLargeObjectSize(file));
+ }
+ connection.commit();
+ }
+ catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
+ }
+
+ @Override
+ public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
+ LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
+ LargeObject largeObject = null;
+
+ Long oid = getOID(file);
+ try {
+ connection.setAutoCommit(false);
+ largeObject = lobjManager.open(oid, LargeObjectManager.WRITE);
+ largeObject.seek(largeObject.size());
+ largeObject.write(data);
+ largeObject.close();
+ connection.commit();
+ }
+ catch (Exception e) {
+ connection.rollback();
+ throw e;
+ }
+ return data.length;
+ }
+
+ @Override
+ public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
+ LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
+ LargeObject largeObject = null;
+ long oid = getOID(file);
+ try {
+ connection.setAutoCommit(false);
+ largeObject = lobjManager.open(oid, LargeObjectManager.READ);
+ int readLength = (int) calculateReadLength(largeObject.size(), bytes.remaining(), file.position());
+
+ if (readLength > 0) {
+ if (file.position() > 0) largeObject.seek((int) file.position());
+ byte[] data = largeObject.read(readLength);
+ bytes.put(data);
+ }
+
+ largeObject.close();
+ connection.commit();
+
+ return readLength;
+ }
+ catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
+ }
+
+ private synchronized Long getOID(JDBCSequentialFile file) throws SQLException {
+ Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY);
+ if (oid == null) {
+ connection.setAutoCommit(false);
+ readLargeObject.setInt(1, file.getId());
+ try (ResultSet rs = readLargeObject.executeQuery()) {
+ if (rs.next()) {
+ file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1));
+ }
+ connection.commit();
+ }
+ catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
+ }
+ if ((Long) file.getMetaData(POSTGRES_OID_KEY) == 0) {
+ System.out.println("FD");
+ }
+ return (Long) file.getMetaData(POSTGRES_OID_KEY);
+ }
+
+ private synchronized int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException {
+ LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
+
+ int size = 0;
+ Long oid = getOID(file);
+ if (oid != null) {
+ try {
+ connection.setAutoCommit(false);
+ LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ);
+ size = largeObject.size();
+ largeObject.close();
+ connection.commit();
+ }
+ catch (SQLException e) {
+ connection.rollback();
+ throw e;
+ }
+ }
+ return size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java
deleted file mode 100644
index 04af009..0000000
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jdbc.store.file;
-
-import java.nio.ByteBuffer;
-import java.sql.Blob;
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
-import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
-
-public class JDBCFileFactoryDriver {
-
- protected Connection connection;
-
- protected SQLProvider sqlProvider;
-
- protected PreparedStatement deleteFile;
-
- protected PreparedStatement createFile;
-
- protected PreparedStatement selectFileByFileName;
-
- protected PreparedStatement copyFileRecord;
-
- protected PreparedStatement renameFile;
-
- protected PreparedStatement readLargeObject;
-
- protected PreparedStatement appendToLargeObject;
-
- protected PreparedStatement selectFileNamesByExtension;
-
- protected String connectionUrl;
-
- protected String driverClass;
-
- public JDBCFileFactoryDriver() {
- }
-
- public void setConnectionURL(String connectionUrl) {
- this.connectionUrl = connectionUrl;
- }
-
- public void setSqlProvider(SQLProvider sqlProvider) {
- this.sqlProvider = sqlProvider;
- }
-
- public void setDriverClass(String driverClass) {
- this.driverClass = driverClass;
- }
-
- public void start() throws Exception {
- Driver driver = JDBCUtils.getDriver(driverClass);
- connection = driver.connect(connectionUrl, new Properties());
- JDBCUtils.createTableIfNotExists(connection, sqlProvider.getTableName(), sqlProvider.getCreateFileTableSQL());
- prepareStatements();
- }
-
- public void stop() throws SQLException {
- if (sqlProvider.closeConnectionOnShutdown())
- connection.close();
- }
-
- protected void prepareStatements() throws SQLException {
- this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
- this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
- this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
- this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
- this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
- this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL());
- this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL());
- this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
- }
-
- public synchronized List<String> listFiles(String extension) throws Exception {
- List<String> fileNames = new ArrayList<>();
- try {
- connection.setAutoCommit(false);
- selectFileNamesByExtension.setString(1, extension);
- try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
- while (rs.next()) {
- fileNames.add(rs.getString(1));
- }
- }
- connection.commit();
- }
- catch (SQLException e) {
- connection.rollback();
- throw e;
- }
- return fileNames;
- }
-
- /**
- * Opens the supplied file. If the file does not exist in the database it will create a new one.
- *
- * @param file
- * @return
- * @throws SQLException
- */
- public void openFile(JDBCSequentialFile file) throws SQLException {
- int fileId = fileExists(file);
- if (fileId < 0) {
- createFile(file);
- }
- else {
- file.setId(fileId);
- loadFile(file);
- }
- }
-
- /**
- * Checks to see if a file with filename and extension exists. If so returns the ID of the file or returns -1.
- *
- * @param file
- * @return
- * @throws SQLException
- */
- public synchronized int fileExists(JDBCSequentialFile file) throws SQLException {
- connection.setAutoCommit(false);
- selectFileByFileName.setString(1, file.getFileName());
- try (ResultSet rs = selectFileByFileName.executeQuery()) {
- int id = rs.next() ? rs.getInt(1) : -1;
- connection.commit();
- return id;
- }
- catch (Exception e) {
- connection.rollback();
- throw e;
- }
- }
-
- /**
- * Loads an existing file.
- *
- * @param file
- * @throws SQLException
- */
- public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
- connection.setAutoCommit(false);
- readLargeObject.setInt(1, file.getId());
-
- try (ResultSet rs = readLargeObject.executeQuery()) {
- if (rs.next()) {
- file.setWritePosition((int) rs.getBlob(1).length());
- }
- connection.commit();
- }
- catch (SQLException e) {
- connection.rollback();
- throw e;
- }
- }
-
- /**
- * Creates a new database row representing the supplied file.
- *
- * @param file
- * @throws SQLException
- */
- public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
- try {
- connection.setAutoCommit(false);
- createFile.setString(1, file.getFileName());
- createFile.setString(2, file.getExtension());
- createFile.setBytes(3, new byte[0]);
- createFile.executeUpdate();
- try (ResultSet keys = createFile.getGeneratedKeys()) {
- keys.next();
- file.setId(keys.getInt(1));
- }
- connection.commit();
- }
- catch (SQLException e) {
- connection.rollback();
- throw e;
- }
- }
-
- /**
- * Updates the fileName field to the new value.
- *
- * @param file
- * @param newFileName
- * @throws SQLException
- */
- public synchronized void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
- try {
- connection.setAutoCommit(false);
- renameFile.setString(1, newFileName);
- renameFile.setInt(2, file.getId());
- renameFile.executeUpdate();
- connection.commit();
- }
- catch (SQLException e) {
- connection.rollback();
- throw e;
- }
- }
-
- /**
- * Deletes the associated row in the database.
- *
- * @param file
- * @throws SQLException
- */
- public synchronized void deleteFile(JDBCSequentialFile file) throws SQLException {
- try {
- connection.setAutoCommit(false);
- deleteFile.setInt(1, file.getId());
- deleteFile.executeUpdate();
- connection.commit();
- }
- catch (SQLException e) {
- connection.rollback();
- throw e;
- }
- }
-
- /**
- * Persists data to this files associated database mapping.
- *
- * @param file
- * @param data
- * @return
- * @throws Exception
- */
- public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
- try {
- connection.setAutoCommit(false);
- appendToLargeObject.setBytes(1, data);
- appendToLargeObject.setInt(2, file.getId());
- appendToLargeObject.executeUpdate();
- connection.commit();
- return data.length;
- }
- catch (SQLException e) {
- connection.rollback();
- throw e;
- }
- }
-
- /**
- * Reads data from the file (at file.readPosition) into the byteBuffer.
- *
- * @param file
- * @param bytes
- * @return
- * @throws Exception
- */
- public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
- connection.setAutoCommit(false);
- readLargeObject.setInt(1, file.getId());
- int readLength = 0;
- try (ResultSet rs = readLargeObject.executeQuery()) {
- if (rs.next()) {
- Blob blob = rs.getBlob(1);
- readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position());
- byte[] data = blob.getBytes(file.position() + 1, (int) readLength);
- bytes.put(data);
- }
- connection.commit();
- return readLength;
- }
- catch (Throwable e) {
- connection.rollback();
- throw e;
- }
- }
-
- /**
- * Copy the data content of FileFrom to FileTo
- *
- * @param fileFrom
- * @param fileTo
- * @throws SQLException
- */
- public synchronized void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
- try {
- connection.setAutoCommit(false);
- copyFileRecord.setInt(1, fileFrom.getId());
- copyFileRecord.setInt(2, fileTo.getId());
- copyFileRecord.executeUpdate();
- connection.commit();
- }
- catch (SQLException e) {
- connection.rollback();
- throw e;
- }
- }
-
- /**
- * Drop all tables and data
- */
- public synchronized void destroy() throws SQLException {
- try {
- connection.setAutoCommit(false);
- Statement statement = connection.createStatement();
- statement.executeUpdate(sqlProvider.getDropFileTableSQL());
- connection.commit();
- }
- catch (SQLException e) {
- connection.rollback();
- throw e;
- }
- }
-
- public long calculateReadLength(long objectLength, int bufferSpace, long readPosition) {
- long bytesRemaining = objectLength - readPosition;
- if (bytesRemaining > bufferSpace) {
- return bufferSpace;
- }
- else {
- return bytesRemaining;
- }
- }
-
- public int getMaxSize() {
- return sqlProvider.getMaxBlobSize();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 6b91223..5de8761 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -59,9 +59,7 @@ public class JDBCSequentialFile implements SequentialFile {
private final Object writeLock;
- private final JDBCFileFactoryDriver dbDriver;
-
- private static final Logger log = Logger.getLogger(JDBCSequentialFile.class.getName());
+ private final JDBCSequentialFileFactoryDriver dbDriver;
// Allows DB Drivers to cache meta data.
private Map<Object, Object> metaData = new ConcurrentHashMap<>();
@@ -69,7 +67,7 @@ public class JDBCSequentialFile implements SequentialFile {
public JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory,
final String filename,
final Executor executor,
- final JDBCFileFactoryDriver driver,
+ final JDBCSequentialFileFactoryDriver driver,
final Object writeLock) throws SQLException {
this.fileFactory = fileFactory;
this.filename = filename;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
index 07e30a9..3454757 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
@@ -42,7 +42,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
private Map<String, Object> fileLocks = new HashMap<>();
- private final JDBCFileFactoryDriver dbDriver;
+ private final JDBCSequentialFileFactoryDriver dbDriver;
public JDBCSequentialFileFactory(final String connectionUrl,
final String tableName,
@@ -184,5 +184,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
}
public synchronized void destroy() throws SQLException {
+ dbDriver.destroy();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
new file mode 100644
index 0000000..f8ad06b
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.file;
+
+import java.nio.ByteBuffer;
+import java.sql.Blob;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
+
+public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
+
+ // Prepared from sqlProvider.getDeleteFileSQL(); deleteFile(...) binds the file id as parameter 1.
+ protected PreparedStatement deleteFile;
+
+ // Prepared from sqlProvider.getInsertFileSQL() with Statement.RETURN_GENERATED_KEYS;
+ // createFile(...) binds file name (1), extension (2) and an empty byte array (3).
+ protected PreparedStatement createFile;
+
+ // Prepared from sqlProvider.getSelectFileByFileName(); fileExists(...) binds the file name
+ // and reads column 1 of the first row as the file id.
+ protected PreparedStatement selectFileByFileName;
+
+ // Prepared from sqlProvider.getCopyFileRecordByIdSQL(); copyFileData(...) binds the source id (1)
+ // and the target id (2).
+ protected PreparedStatement copyFileRecord;
+
+ // Prepared from sqlProvider.getUpdateFileNameByIdSQL(); renameFile(...) binds the new name (1)
+ // and the file id (2).
+ protected PreparedStatement renameFile;
+
+ // Prepared from sqlProvider.getReadLargeObjectSQL(); loadFile(...) and readFromFile(...) bind the
+ // file id and read column 1 as a Blob.
+ protected PreparedStatement readLargeObject;
+
+ // Prepared from sqlProvider.getAppendToLargeObjectSQL(); writeToFile(...) binds the data bytes (1)
+ // and the file id (2).
+ protected PreparedStatement appendToLargeObject;
+
+ // Prepared from sqlProvider.getSelectFileNamesByExtensionSQL(); listFiles(...) binds the extension
+ // and reads column 1 of every row as a file name.
+ protected PreparedStatement selectFileNamesByExtension;
+
+ /**
+  * Creates a driver via the superclass no-argument constructor; no table name or
+  * connection details are supplied here.
+  */
+ public JDBCSequentialFileFactoryDriver() {
+    // The superclass no-argument constructor is invoked implicitly.
+ }
+
+ /**
+  * Creates a driver, handing all three arguments unchanged to the superclass constructor.
+  *
+  * @param tableName         table name passed to the superclass
+  * @param jdbcConnectionUrl JDBC connection URL passed to the superclass
+  * @param jdbcDriverClass   JDBC driver class name passed to the superclass
+  */
+ public JDBCSequentialFileFactoryDriver(String tableName,
+                                        String jdbcConnectionUrl,
+                                        String jdbcDriverClass) {
+    super(tableName, jdbcConnectionUrl, jdbcDriverClass);
+ }
+
+ /**
+  * Starts the driver by delegating to the superclass {@code start()}.
+  *
+  * @throws Exception if the superclass start fails
+  */
+ @Override
+ public void start() throws Exception {
+    super.start();
+ }
+
+ /**
+  * Creates the file table by passing the provider's create-file-table SQL to {@code createTable}.
+  *
+  * @throws SQLException if {@code createTable} fails
+  */
+ @Override
+ protected void createSchema() throws SQLException {
+ createTable(sqlProvider.getCreateFileTableSQL());
+ }
+
+ /**
+  * Prepares, on the current connection, every statement this driver uses. The SQL text of
+  * each statement comes from {@code sqlProvider}.
+  *
+  * @throws SQLException if any statement cannot be prepared
+  */
+ @Override
+ protected void prepareStatements() throws SQLException {
+    // Row lifecycle: delete, insert (asking for generated keys so the new id can be read back), lookup.
+    deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
+    createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
+    selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
+
+    // Row-to-row copy and rename.
+    copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
+    renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
+
+    // Content access.
+    readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL());
+    appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL());
+
+    // Directory-style listing.
+    selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
+ }
+
+ /**
+  * Runs the select-by-extension statement and collects the first column of every row.
+  *
+  * @param extension value bound to the statement's single parameter
+  * @return the collected names, in result-set order; empty when no row matches
+  * @throws Exception if the query fails; the transaction is rolled back before rethrowing
+  */
+ public synchronized List<String> listFiles(String extension) throws Exception {
+    final List<String> matches = new ArrayList<>();
+    try {
+       connection.setAutoCommit(false);
+       selectFileNamesByExtension.setString(1, extension);
+       try (ResultSet rows = selectFileNamesByExtension.executeQuery()) {
+          while (rows.next()) {
+             final String name = rows.getString(1);
+             matches.add(name);
+          }
+       }
+       connection.commit();
+    }
+    catch (SQLException e) {
+       connection.rollback();
+       throw e;
+    }
+    return matches;
+ }
+
+ /**
+  * Opens the supplied file. When {@link #fileExists} finds a row for it, the file is given
+  * that id and loaded; otherwise a new row is created.
+  * <p>
+  * This method itself is not synchronized: the lookup and the create/load step are separate
+  * synchronized calls.
+  *
+  * @param file the file to open
+  * @throws SQLException if the lookup, load or creation fails
+  */
+ public void openFile(JDBCSequentialFile file) throws SQLException {
+    final int existingId = fileExists(file);
+    if (existingId >= 0) {
+       file.setId(existingId);
+       loadFile(file);
+       return;
+    }
+    createFile(file);
+ }
+
+ /**
+  * Looks up the file by its file name.
+  *
+  * @param file the file whose name is bound to the lookup statement
+  * @return the value of column 1 of the first matching row, or -1 when there is no row
+  * @throws SQLException if the query fails; the transaction is rolled back before rethrowing
+  */
+ public synchronized int fileExists(JDBCSequentialFile file) throws SQLException {
+    connection.setAutoCommit(false);
+    selectFileByFileName.setString(1, file.getFileName());
+    try (ResultSet rows = selectFileByFileName.executeQuery()) {
+       final int fileId;
+       if (rows.next()) {
+          fileId = rows.getInt(1);
+       }
+       else {
+          fileId = -1;
+       }
+       connection.commit();
+       return fileId;
+    }
+    catch (Exception e) {
+       connection.rollback();
+       throw e;
+    }
+ }
+
+ /**
+  * Loads an existing file: reads the stored blob for the file's id and sets the file's
+  * write position to the blob length (narrowed to {@code int}).
+  *
+  * @param file the file to load; its id selects the row
+  * @throws SQLException if the query fails; the transaction is rolled back before rethrowing
+  */
+ public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
+    connection.setAutoCommit(false);
+    readLargeObject.setInt(1, file.getId());
+
+    try (ResultSet rows = readLargeObject.executeQuery()) {
+       if (rows.next()) {
+          final Blob content = rows.getBlob(1);
+          final long storedLength = content.length();
+          file.setWritePosition((int) storedLength);
+       }
+       connection.commit();
+    }
+    catch (SQLException e) {
+       connection.rollback();
+       throw e;
+    }
+ }
+
+ /**
+  * Creates a new database row representing the supplied file, with empty content, and
+  * stores the generated key on the file as its id.
+  *
+  * @param file the file to insert; its file name and extension are bound to the insert
+  * @throws SQLException if the insert fails or the database returns no generated key;
+  *                      the transaction is rolled back before rethrowing
+  */
+ public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
+    try {
+       connection.setAutoCommit(false);
+       createFile.setString(1, file.getFileName());
+       createFile.setString(2, file.getExtension());
+       createFile.setBytes(3, new byte[0]);
+       createFile.executeUpdate();
+       try (ResultSet keys = createFile.getGeneratedKeys()) {
+          // Fail with a clear message instead of reading from a result set that has no current row.
+          if (!keys.next()) {
+             throw new SQLException("No generated key returned for file: " + file.getFileName());
+          }
+          file.setId(keys.getInt(1));
+       }
+       connection.commit();
+    }
+    catch (SQLException e) {
+       connection.rollback();
+       throw e;
+    }
+ }
+
+ /**
+  * Updates the stored file name of the row identified by the file's id.
+  *
+  * @param file        the file whose id selects the row
+  * @param newFileName the new file name value
+  * @throws SQLException if the update fails; the transaction is rolled back before rethrowing
+  */
+ public synchronized void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
+    try {
+       connection.setAutoCommit(false);
+       renameFile.setString(1, newFileName);
+       final int fileId = file.getId();
+       renameFile.setInt(2, fileId);
+       renameFile.executeUpdate();
+       connection.commit();
+    }
+    catch (SQLException e) {
+       connection.rollback();
+       throw e;
+    }
+ }
+
+ /**
+  * Deletes the database row identified by the file's id.
+  *
+  * @param file the file whose id selects the row to delete
+  * @throws SQLException if the delete fails; the transaction is rolled back before rethrowing
+  */
+ public synchronized void deleteFile(JDBCSequentialFile file) throws SQLException {
+    try {
+       connection.setAutoCommit(false);
+       final int fileId = file.getId();
+       deleteFile.setInt(1, fileId);
+       deleteFile.executeUpdate();
+       connection.commit();
+    }
+    catch (SQLException e) {
+       connection.rollback();
+       throw e;
+    }
+ }
+
+ /**
+  * Persists data to the database row mapped to this file by running the append statement.
+  *
+  * @param file the file whose id selects the row
+  * @param data the bytes bound to the append statement
+  * @return {@code data.length}
+  * @throws SQLException if the update fails; the transaction is rolled back before rethrowing
+  */
+ public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
+    try {
+       connection.setAutoCommit(false);
+       appendToLargeObject.setBytes(1, data);
+       final int fileId = file.getId();
+       appendToLargeObject.setInt(2, fileId);
+       appendToLargeObject.executeUpdate();
+       connection.commit();
+       final int bytesWritten = data.length;
+       return bytesWritten;
+    }
+    catch (SQLException e) {
+       connection.rollback();
+       throw e;
+    }
+ }
+
+ /**
+  * Reads data from the file, starting at {@code file.position()}, into the byte buffer.
+  * At most {@code bytes.remaining()} bytes are read. Nothing is read when the position is
+  * at or beyond the end of the stored blob.
+  *
+  * @param file  the file to read; its id selects the row and its position the start offset
+  * @param bytes the destination buffer
+  * @return the number of bytes put into {@code bytes}; 0 when no row exists or nothing is left to read
+  * @throws SQLException if the query or blob access fails; the transaction is rolled back before rethrowing
+  */
+ public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
+    connection.setAutoCommit(false);
+    readLargeObject.setInt(1, file.getId());
+    int readLength = 0;
+    try (ResultSet rs = readLargeObject.executeQuery()) {
+       if (rs.next()) {
+          Blob blob = rs.getBlob(1);
+          // Clamp at zero: a position past the end of the blob would otherwise give a negative
+          // length, which Blob.getBytes does not accept.
+          readLength = (int) Math.max(0L, calculateReadLength(blob.length(), bytes.remaining(), file.position()));
+          if (readLength > 0) {
+             // Blob offsets are 1-based, hence position + 1.
+             bytes.put(blob.getBytes(file.position() + 1, readLength));
+          }
+       }
+       connection.commit();
+       return readLength;
+    }
+    catch (Throwable e) {
+       // Throwable is caught deliberately so that runtime failures (e.g. from bytes.put) also roll back.
+       connection.rollback();
+       throw e;
+    }
+ }
+
+ /**
+  * Copies the data content of {@code fileFrom} to {@code fileTo} by running the copy statement
+  * with both file ids.
+  *
+  * @param fileFrom the source file; its id is bound as parameter 1
+  * @param fileTo   the target file; its id is bound as parameter 2
+  * @throws SQLException if the update fails; the transaction is rolled back before rethrowing
+  */
+ public synchronized void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
+    try {
+       connection.setAutoCommit(false);
+       final int sourceId = fileFrom.getId();
+       copyFileRecord.setInt(1, sourceId);
+       final int targetId = fileTo.getId();
+       copyFileRecord.setInt(2, targetId);
+       copyFileRecord.executeUpdate();
+       connection.commit();
+    }
+    catch (SQLException e) {
+       connection.rollback();
+       throw e;
+    }
+ }
+
+ /**
+  * Drops the file table, and with it all stored data, by executing the provider's
+  * drop-file-table SQL.
+  *
+  * @throws SQLException if the drop fails; the transaction is rolled back before rethrowing
+  */
+ public synchronized void destroy() throws SQLException {
+    try {
+       connection.setAutoCommit(false);
+       // try-with-resources closes the statement whether or not the drop succeeds.
+       try (Statement statement = connection.createStatement()) {
+          statement.executeUpdate(sqlProvider.getDropFileTableSQL());
+       }
+       connection.commit();
+    }
+    catch (SQLException e) {
+       connection.rollback();
+       throw e;
+    }
+ }
+
+ /**
+  * Calculates how many bytes can be read from an object of {@code objectLength} bytes,
+  * starting at {@code readPosition}, into a buffer with {@code bufferSpace} bytes free.
+  *
+  * @param objectLength total length of the stored object
+  * @param bufferSpace  free space in the destination buffer
+  * @param readPosition zero-based offset at which reading starts
+  * @return the smaller of the bytes remaining after {@code readPosition} and {@code bufferSpace};
+  *         never negative, so 0 when {@code readPosition} is at or past the end
+  */
+ public long calculateReadLength(long objectLength, int bufferSpace, long readPosition) {
+    final long bytesRemaining = objectLength - readPosition;
+    // Clamp at zero so a position beyond the end of the object never yields a negative length.
+    return Math.max(0L, Math.min(bytesRemaining, bufferSpace));
+ }
+
+ /**
+  * Returns the value reported by {@code sqlProvider.getMaxBlobSize()}.
+  *
+  * @return the provider's maximum blob size
+  */
+ public int getMaxSize() {
+    final int maxBlobSize = sqlProvider.getMaxBlobSize();
+    return maxBlobSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index 1e6f393..6c05112 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -95,6 +95,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal
{
createTable(sqlProvider.getCreateJournalTableSQL());
}
+ @Override
protected void prepareStatements() throws SQLException {
insertJournalRecords = connection.prepareStatement(sqlProvider.getInsertJournalRecordsSQL());
selectJournalRecords = connection.prepareStatement(sqlProvider.getSelectJournalRecordsSQL());
@@ -104,7 +105,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal
{
}
@Override
- public synchronized void stop() throws Exception {
+ public synchronized void stop() throws SQLException {
if (started) {
synchronized (journalLock) {
syncTimer.cancel();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ba4d6a5..d8e4f11 100644
--- a/pom.xml
+++ b/pom.xml
@@ -230,6 +230,14 @@
</dependency>
<dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>9.4-1205-jdbc4</version>
+ <scope>provided</scope>
+ <!-- postgresql license -->
+ </dependency>
+
+ <dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections-testframework</artifactId>
<version>${commons.collections.version}</version>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 90f5425..2d62ce0 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -240,10 +240,18 @@
<artifactId>jboss-javaee</artifactId>
<version>5.0.0.GA</version>
</dependency>
+
+ <!-- DB Test Deps -->
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
- <version>${apache.derby.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <scope>test</scope>
</dependency>
<!--Vertx provided dependencies-->
<dependency>
|