activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/5] activemq-artemis git commit: Add PostGres Driver
Date Wed, 01 Jun 2016 21:33:17 GMT
Add PostGres Driver


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/79904aeb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/79904aeb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/79904aeb

Branch: refs/heads/master
Commit: 79904aeb64a5e5b7bc9aa8f2060d32327e9092ca
Parents: 466d43c
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Tue May 31 12:19:29 2016 +0100
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Wed Jun 1 16:09:42 2016 +0100

----------------------------------------------------------------------
 artemis-jdbc-store/pom.xml                      |   6 +
 .../activemq/artemis/jdbc/store/JDBCUtils.java  |  31 +-
 .../jdbc/store/drivers/AbstractJDBCDriver.java  |  59 +++-
 .../drivers/postgres/PostgresSQLProvider.java   |  53 +++
 .../PostgresSequentialSequentialFileDriver.java | 169 +++++++++
 .../jdbc/store/file/JDBCFileFactoryDriver.java  | 344 -------------------
 .../jdbc/store/file/JDBCSequentialFile.java     |   6 +-
 .../store/file/JDBCSequentialFileFactory.java   |   3 +-
 .../file/JDBCSequentialFileFactoryDriver.java   | 323 +++++++++++++++++
 .../jdbc/store/journal/JDBCJournalImpl.java     |   3 +-
 pom.xml                                         |   8 +
 tests/integration-tests/pom.xml                 |  10 +-
 12 files changed, 646 insertions(+), 369 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/pom.xml b/artemis-jdbc-store/pom.xml
index 86fe1a6..bb54c12 100644
--- a/artemis-jdbc-store/pom.xml
+++ b/artemis-jdbc-store/pom.xml
@@ -53,6 +53,7 @@
          <scope>test</scope>
       </dependency>
 
+      <!-- Database driver support -->
       <dependency>
          <groupId>org.apache.derby</groupId>
          <artifactId>derby</artifactId>
@@ -60,6 +61,11 @@
       </dependency>
 
       <dependency>
+         <groupId>org.postgresql</groupId>
+         <artifactId>postgresql</artifactId>
+      </dependency>
+
+      <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>artemis-journal</artifactId>
          <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
index a1bde56..04ac242 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/JDBCUtils.java
@@ -24,7 +24,9 @@ import java.sql.SQLException;
 import java.sql.Statement;
 
 import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
-import org.apache.activemq.artemis.jdbc.store.file.JDBCFileFactoryDriver;
+import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSQLProvider;
+import org.apache.activemq.artemis.jdbc.store.drivers.postgres.PostgresSequentialSequentialFileDriver;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
 import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 
@@ -70,24 +72,35 @@ public class JDBCUtils {
       if (driverClass.contains("derby")) {
          return new DerbySQLProvider(tableName);
       }
+      else if (driverClass.contains("postgres")) {
+         return new PostgresSQLProvider(tableName);
+      }
       else {
          return new GenericSQLProvider(tableName);
       }
    }
 
-   public static JDBCFileFactoryDriver getDBFileDriver(String driverClass, String tableName, String jdbcConnectionUrl) throws SQLException {
-      JDBCFileFactoryDriver dbDriver;
+   public static JDBCSequentialFileFactoryDriver getDBFileDriver(String driverClass,
+                                                                 String tableName,
+                                                                 String jdbcConnectionUrl) throws SQLException {
+      JDBCSequentialFileFactoryDriver dbDriver;
       if (driverClass.contains("derby")) {
-         dbDriver = new JDBCFileFactoryDriver();
+         dbDriver = new JDBCSequentialFileFactoryDriver();
          dbDriver.setSqlProvider(new DerbySQLProvider(tableName));
-         dbDriver.setConnectionURL(jdbcConnectionUrl);
-         dbDriver.setDriverClass(driverClass);
+         dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
+         dbDriver.setJdbcDriverClass(driverClass);
+      }
+      else if (driverClass.contains("postgres")) {
+         dbDriver = new PostgresSequentialSequentialFileDriver();
+         dbDriver.setSqlProvider(new PostgresSQLProvider(tableName));
+         dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
+         dbDriver.setJdbcDriverClass(driverClass);
       }
       else {
-         dbDriver = new JDBCFileFactoryDriver();
+         dbDriver = new JDBCSequentialFileFactoryDriver();
          dbDriver.setSqlProvider(new GenericSQLProvider(tableName));
-         dbDriver.setConnectionURL(jdbcConnectionUrl);
-         dbDriver.setDriverClass(driverClass);
+         dbDriver.setJdbcConnectionUrl(jdbcConnectionUrl);
+         dbDriver.setJdbcDriverClass(driverClass);
       }
       return dbDriver;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
index 1c282c0..6d8be71 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
@@ -33,14 +33,17 @@ public abstract class AbstractJDBCDriver {
 
    protected Connection connection;
 
-   protected final SQLProvider sqlProvider;
+   protected SQLProvider sqlProvider;
 
-   protected final String jdbcConnectionUrl;
+   protected String jdbcConnectionUrl;
 
-   protected final String jdbcDriverClass;
+   protected String jdbcDriverClass;
 
    protected Driver dbDriver;
 
+   public AbstractJDBCDriver() {
+   }
+
    public AbstractJDBCDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) {
       this.jdbcConnectionUrl = jdbcConnectionUrl;
       this.jdbcDriverClass = jdbcDriverClass;
@@ -53,7 +56,7 @@ public abstract class AbstractJDBCDriver {
       prepareStatements();
    }
 
-   public void stop() throws Exception {
+   public void stop() throws SQLException {
       if (sqlProvider.closeConnectionOnShutdown()) {
          connection.close();
       }
@@ -79,10 +82,48 @@ public abstract class AbstractJDBCDriver {
    }
 
    public void destroy() throws Exception {
-      connection.setAutoCommit(false);
-      Statement statement = connection.createStatement();
-      statement.executeUpdate("DROP TABLE " + sqlProvider.getTableName());
-      statement.close();
-      connection.commit();
+      try {
+         connection.setAutoCommit(false);
+         Statement statement = connection.createStatement();
+         statement.executeUpdate("DROP TABLE " + sqlProvider.getTableName());
+         statement.close();
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   public Connection getConnection() {
+      return connection;
+   }
+
+   public void setConnection(Connection connection) {
+      this.connection = connection;
+   }
+
+   public SQLProvider getSqlProvider() {
+      return sqlProvider;
+   }
+
+   public void setSqlProvider(SQLProvider sqlProvider) {
+      this.sqlProvider = sqlProvider;
+   }
+
+   public String getJdbcConnectionUrl() {
+      return jdbcConnectionUrl;
+   }
+
+   public void setJdbcConnectionUrl(String jdbcConnectionUrl) {
+      this.jdbcConnectionUrl = jdbcConnectionUrl;
+   }
+
+   public String getJdbcDriverClass() {
+      return jdbcDriverClass;
+   }
+
+   public void setJdbcDriverClass(String jdbcDriverClass) {
+      this.jdbcDriverClass = jdbcDriverClass;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java
new file mode 100644
index 0000000..664202b
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSQLProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.drivers.postgres;
+
+import org.apache.activemq.artemis.jdbc.store.sql.GenericSQLProvider;
+
+public class PostgresSQLProvider extends GenericSQLProvider {
+
+   // BYTEA Size used in Journal
+   private static final int MAX_BLOB_SIZE = 1024 * 1024 * 1024; // 1GB
+
+   private final String createFileTableSQL;
+
+   private final String createJournalTableSQL;
+
+   public PostgresSQLProvider(String tName) {
+      super(tName.toLowerCase());
+      createFileTableSQL = "CREATE TABLE " + tableName +
+         "(ID SERIAL, FILENAME VARCHAR(255), EXTENSION VARCHAR(10), DATA OID, PRIMARY KEY(ID))";
+
+      createJournalTableSQL = "CREATE TABLE " + tableName + "(id BIGINT,recordType SMALLINT,compactCount SMALLINT,txId BIGINT,userRecordType SMALLINT,variableSize INTEGER,record BYTEA,txDataSize INTEGER,txData BYTEA,txCheckNoRecords INTEGER,seq BIGINT)";
+   }
+
+   @Override
+   public String getCreateFileTableSQL() {
+      return createFileTableSQL;
+   }
+
+   @Override
+   public String getCreateJournalTableSQL() {
+      return createJournalTableSQL;
+   }
+
+   @Override
+   public int getMaxBlobSize() {
+      return MAX_BLOB_SIZE;
+   }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSequentialSequentialFileDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSequentialSequentialFileDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSequentialSequentialFileDriver.java
new file mode 100644
index 0000000..4d42d7f
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/postgres/PostgresSequentialSequentialFileDriver.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.drivers.postgres;
+
+import java.nio.ByteBuffer;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFileFactoryDriver;
+import org.apache.activemq.artemis.jdbc.store.file.JDBCSequentialFile;
+import org.postgresql.PGConnection;
+import org.postgresql.largeobject.LargeObject;
+import org.postgresql.largeobject.LargeObjectManager;
+
+public class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver {
+
+   private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY";
+
+   public PostgresSequentialSequentialFileDriver() throws SQLException {
+      super();
+   }
+
+   @Override
+   public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+
+         LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
+         long oid = lobjManager.createLO();
+
+         createFile.setString(1, file.getFileName());
+         createFile.setString(2, file.getExtension());
+         createFile.setLong(3, oid);
+         createFile.executeUpdate();
+
+         try (ResultSet keys = createFile.getGeneratedKeys()) {
+            keys.next();
+            file.setId(keys.getInt(1));
+         }
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   @Override
+   public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
+      connection.setAutoCommit(false);
+      readLargeObject.setInt(1, file.getId());
+
+      try (ResultSet rs = readLargeObject.executeQuery()) {
+         if (rs.next()) {
+            file.setWritePosition(getPostGresLargeObjectSize(file));
+         }
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   @Override
+   public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
+      LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
+      LargeObject largeObject = null;
+
+      Long oid = getOID(file);
+      try {
+         connection.setAutoCommit(false);
+         largeObject = lobjManager.open(oid, LargeObjectManager.WRITE);
+         largeObject.seek(largeObject.size());
+         largeObject.write(data);
+         largeObject.close();
+         connection.commit();
+      }
+      catch (Exception e) {
+         connection.rollback();
+         throw e;
+      }
+      return data.length;
+   }
+
+   @Override
+   public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
+      LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
+      LargeObject largeObject = null;
+      long oid = getOID(file);
+      try {
+         connection.setAutoCommit(false);
+         largeObject = lobjManager.open(oid, LargeObjectManager.READ);
+         int readLength = (int) calculateReadLength(largeObject.size(), bytes.remaining(), file.position());
+
+         if (readLength > 0) {
+            if (file.position() > 0) largeObject.seek((int) file.position());
+            byte[] data = largeObject.read(readLength);
+            bytes.put(data);
+         }
+
+         largeObject.close();
+         connection.commit();
+
+         return readLength;
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   private synchronized Long getOID(JDBCSequentialFile file) throws SQLException {
+      Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY);
+      if (oid == null) {
+         connection.setAutoCommit(false);
+         readLargeObject.setInt(1, file.getId());
+         try (ResultSet rs = readLargeObject.executeQuery()) {
+            if (rs.next()) {
+               file.addMetaData(POSTGRES_OID_KEY, rs.getLong(1));
+            }
+            connection.commit();
+         }
+         catch (SQLException e) {
+            connection.rollback();
+            throw e;
+         }
+      }
+      if ((Long) file.getMetaData(POSTGRES_OID_KEY) == 0) {
+         System.out.println("FD");
+      }
+      return (Long) file.getMetaData(POSTGRES_OID_KEY);
+   }
+
+   private synchronized int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException {
+      LargeObjectManager lobjManager = ((PGConnection) connection).getLargeObjectAPI();
+
+      int size = 0;
+      Long oid = getOID(file);
+      if (oid != null) {
+         try {
+            connection.setAutoCommit(false);
+            LargeObject largeObject = lobjManager.open(oid, LargeObjectManager.READ);
+            size = largeObject.size();
+            largeObject.close();
+            connection.commit();
+         }
+         catch (SQLException e) {
+            connection.rollback();
+            throw e;
+         }
+      }
+      return size;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java
deleted file mode 100644
index 04af009..0000000
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCFileFactoryDriver.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.jdbc.store.file;
-
-import java.nio.ByteBuffer;
-import java.sql.Blob;
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.activemq.artemis.jdbc.store.JDBCUtils;
-import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
-
-public class JDBCFileFactoryDriver {
-
-   protected Connection connection;
-
-   protected SQLProvider sqlProvider;
-
-   protected PreparedStatement deleteFile;
-
-   protected PreparedStatement createFile;
-
-   protected PreparedStatement selectFileByFileName;
-
-   protected PreparedStatement copyFileRecord;
-
-   protected PreparedStatement renameFile;
-
-   protected PreparedStatement readLargeObject;
-
-   protected PreparedStatement appendToLargeObject;
-
-   protected PreparedStatement selectFileNamesByExtension;
-
-   protected String connectionUrl;
-
-   protected String driverClass;
-
-   public JDBCFileFactoryDriver() {
-   }
-
-   public void setConnectionURL(String connectionUrl) {
-      this.connectionUrl = connectionUrl;
-   }
-
-   public void setSqlProvider(SQLProvider sqlProvider) {
-      this.sqlProvider = sqlProvider;
-   }
-
-   public void setDriverClass(String driverClass) {
-      this.driverClass = driverClass;
-   }
-
-   public void start() throws Exception {
-      Driver driver = JDBCUtils.getDriver(driverClass);
-      connection = driver.connect(connectionUrl, new Properties());
-      JDBCUtils.createTableIfNotExists(connection, sqlProvider.getTableName(), sqlProvider.getCreateFileTableSQL());
-      prepareStatements();
-   }
-
-   public void stop() throws SQLException {
-      if (sqlProvider.closeConnectionOnShutdown())
-         connection.close();
-   }
-
-   protected void prepareStatements() throws SQLException {
-      this.deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
-      this.createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
-      this.selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
-      this.copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
-      this.renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
-      this.readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL());
-      this.appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL());
-      this.selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
-   }
-
-   public synchronized List<String> listFiles(String extension) throws Exception {
-      List<String> fileNames = new ArrayList<>();
-      try {
-         connection.setAutoCommit(false);
-         selectFileNamesByExtension.setString(1, extension);
-         try (ResultSet rs = selectFileNamesByExtension.executeQuery()) {
-            while (rs.next()) {
-               fileNames.add(rs.getString(1));
-            }
-         }
-         connection.commit();
-      }
-      catch (SQLException e) {
-         connection.rollback();
-         throw e;
-      }
-      return fileNames;
-   }
-
-   /**
-    * Opens the supplied file.  If the file does not exist in the database it will create a new one.
-    *
-    * @param file
-    * @return
-    * @throws SQLException
-    */
-   public void openFile(JDBCSequentialFile file) throws SQLException {
-      int fileId = fileExists(file);
-      if (fileId < 0) {
-         createFile(file);
-      }
-      else {
-         file.setId(fileId);
-         loadFile(file);
-      }
-   }
-
-   /**
-    * Checks to see if a file with filename and extension exists.  If so returns the ID of the file or returns -1.
-    *
-    * @param file
-    * @return
-    * @throws SQLException
-    */
-   public synchronized int fileExists(JDBCSequentialFile file) throws SQLException {
-      connection.setAutoCommit(false);
-      selectFileByFileName.setString(1, file.getFileName());
-      try (ResultSet rs = selectFileByFileName.executeQuery()) {
-         int id = rs.next() ? rs.getInt(1) : -1;
-         connection.commit();
-         return id;
-      }
-      catch (Exception e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   /**
-    * Loads an existing file.
-    *
-    * @param file
-    * @throws SQLException
-    */
-   public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
-      connection.setAutoCommit(false);
-      readLargeObject.setInt(1, file.getId());
-
-      try (ResultSet rs = readLargeObject.executeQuery()) {
-         if (rs.next()) {
-            file.setWritePosition((int) rs.getBlob(1).length());
-         }
-         connection.commit();
-      }
-      catch (SQLException e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   /**
-    * Creates a new database row representing the supplied file.
-    *
-    * @param file
-    * @throws SQLException
-    */
-   public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         createFile.setString(1, file.getFileName());
-         createFile.setString(2, file.getExtension());
-         createFile.setBytes(3, new byte[0]);
-         createFile.executeUpdate();
-         try (ResultSet keys = createFile.getGeneratedKeys()) {
-            keys.next();
-            file.setId(keys.getInt(1));
-         }
-         connection.commit();
-      }
-      catch (SQLException e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   /**
-    * Updates the fileName field to the new value.
-    *
-    * @param file
-    * @param newFileName
-    * @throws SQLException
-    */
-   public synchronized void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         renameFile.setString(1, newFileName);
-         renameFile.setInt(2, file.getId());
-         renameFile.executeUpdate();
-         connection.commit();
-      }
-      catch (SQLException e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   /**
-    * Deletes the associated row in the database.
-    *
-    * @param file
-    * @throws SQLException
-    */
-   public synchronized void deleteFile(JDBCSequentialFile file) throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         deleteFile.setInt(1, file.getId());
-         deleteFile.executeUpdate();
-         connection.commit();
-      }
-      catch (SQLException e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   /**
-    * Persists data to this files associated database mapping.
-    *
-    * @param file
-    * @param data
-    * @return
-    * @throws Exception
-    */
-   public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         appendToLargeObject.setBytes(1, data);
-         appendToLargeObject.setInt(2, file.getId());
-         appendToLargeObject.executeUpdate();
-         connection.commit();
-         return data.length;
-      }
-      catch (SQLException e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   /**
-    * Reads data from the file (at file.readPosition) into the byteBuffer.
-    *
-    * @param file
-    * @param bytes
-    * @return
-    * @throws Exception
-    */
-   public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
-      connection.setAutoCommit(false);
-      readLargeObject.setInt(1, file.getId());
-      int readLength = 0;
-      try (ResultSet rs = readLargeObject.executeQuery()) {
-         if (rs.next()) {
-            Blob blob = rs.getBlob(1);
-            readLength = (int) calculateReadLength(blob.length(), bytes.remaining(), file.position());
-            byte[] data = blob.getBytes(file.position() + 1, (int) readLength);
-            bytes.put(data);
-         }
-         connection.commit();
-         return readLength;
-      }
-      catch (Throwable e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   /**
-    * Copy the data content of FileFrom to FileTo
-    *
-    * @param fileFrom
-    * @param fileTo
-    * @throws SQLException
-    */
-   public synchronized void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         copyFileRecord.setInt(1, fileFrom.getId());
-         copyFileRecord.setInt(2, fileTo.getId());
-         copyFileRecord.executeUpdate();
-         connection.commit();
-      }
-      catch (SQLException e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   /**
-    * Drop all tables and data
-    */
-   public synchronized void destroy() throws SQLException {
-      try {
-         connection.setAutoCommit(false);
-         Statement statement = connection.createStatement();
-         statement.executeUpdate(sqlProvider.getDropFileTableSQL());
-         connection.commit();
-      }
-      catch (SQLException e) {
-         connection.rollback();
-         throw e;
-      }
-   }
-
-   public long calculateReadLength(long objectLength, int bufferSpace, long readPosition) {
-      long bytesRemaining = objectLength - readPosition;
-      if (bytesRemaining > bufferSpace) {
-         return bufferSpace;
-      }
-      else {
-         return bytesRemaining;
-      }
-   }
-
-   public int getMaxSize() {
-      return sqlProvider.getMaxBlobSize();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
index 6b91223..5de8761 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java
@@ -59,9 +59,7 @@ public class JDBCSequentialFile implements SequentialFile {
 
    private final Object writeLock;
 
-   private final JDBCFileFactoryDriver dbDriver;
-
-   private static final Logger log = Logger.getLogger(JDBCSequentialFile.class.getName());
+   private final JDBCSequentialFileFactoryDriver dbDriver;
 
    // Allows DB Drivers to cache meta data.
    private Map<Object, Object> metaData = new ConcurrentHashMap<>();
@@ -69,7 +67,7 @@ public class JDBCSequentialFile implements SequentialFile {
    public JDBCSequentialFile(final JDBCSequentialFileFactory fileFactory,
                              final String filename,
                              final Executor executor,
-                             final JDBCFileFactoryDriver driver,
+                             final JDBCSequentialFileFactoryDriver driver,
                              final Object writeLock) throws SQLException {
       this.fileFactory = fileFactory;
       this.filename = filename;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
index 07e30a9..3454757 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactory.java
@@ -42,7 +42,7 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
 
    private Map<String, Object> fileLocks = new HashMap<>();
 
-   private final JDBCFileFactoryDriver dbDriver;
+   private final JDBCSequentialFileFactoryDriver dbDriver;
 
    public JDBCSequentialFileFactory(final String connectionUrl,
                                     final String tableName,
@@ -184,5 +184,6 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
    }
 
    public synchronized void destroy() throws SQLException {
+      dbDriver.destroy();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
new file mode 100644
index 0000000..f8ad06b
--- /dev/null
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFileFactoryDriver.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jdbc.store.file;
+
+import java.nio.ByteBuffer;
+import java.sql.Blob;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
+
+public class JDBCSequentialFileFactoryDriver extends AbstractJDBCDriver {
+
+   // All statements below are created in prepareStatements() from SQL supplied by sqlProvider.
+
+   // Bound with a file id by deleteFile(JDBCSequentialFile).
+   protected PreparedStatement deleteFile;
+
+   // Prepared with RETURN_GENERATED_KEYS; bound with file name, extension and an empty byte array.
+   protected PreparedStatement createFile;
+
+   // Bound with a file name; the first result column is read as the int file id.
+   protected PreparedStatement selectFileByFileName;
+
+   // Bound with the source file id (parameter 1) and the target file id (parameter 2).
+   protected PreparedStatement copyFileRecord;
+
+   // Bound with the new file name (parameter 1) and the file id (parameter 2).
+   protected PreparedStatement renameFile;
+
+   // Bound with a file id; the first result column is read as a Blob.
+   protected PreparedStatement readLargeObject;
+
+   // Bound with the bytes to append (parameter 1) and the file id (parameter 2).
+   protected PreparedStatement appendToLargeObject;
+
+   // Bound with an extension; the first result column is read as a file name.
+   protected PreparedStatement selectFileNamesByExtension;
+
+   /**
+    * Creates a driver through the no-argument {@link AbstractJDBCDriver} constructor
+    * (invoked implicitly).
+    */
+   public JDBCSequentialFileFactoryDriver() {
+   }
+
+   /**
+    * Creates a driver, passing all three arguments unchanged to the {@link AbstractJDBCDriver} constructor.
+    *
+    * @param tableName         table name handed to the superclass (presumably the file table; confirm in AbstractJDBCDriver)
+    * @param jdbcConnectionUrl JDBC connection URL handed to the superclass
+    * @param jdbcDriverClass   JDBC driver class name handed to the superclass
+    */
+   public JDBCSequentialFileFactoryDriver(String tableName, String jdbcConnectionUrl, String jdbcDriverClass) {
+      super(tableName, jdbcConnectionUrl, jdbcDriverClass);
+   }
+
+   /**
+    * Starts the driver by delegating to {@link AbstractJDBCDriver#start()}.
+    *
+    * @throws Exception if the superclass start-up fails
+    */
+   @Override
+   public void start() throws Exception {
+      super.start();
+   }
+
+   /**
+    * Creates the file table by passing the SQL provider's create-file-table SQL to createTable.
+    *
+    * @throws SQLException presumably propagated from createTable when the table cannot be created (confirm)
+    */
+   @Override
+   protected void createSchema() throws SQLException {
+      createTable(sqlProvider.getCreateFileTableSQL());
+   }
+
+   /**
+    * Prepares every statement used by this driver on the current connection, using the SQL
+    * strings supplied by the SQL provider.
+    *
+    * @throws SQLException if any statement cannot be prepared
+    */
+   @Override
+   protected void prepareStatements() throws SQLException {
+      deleteFile = connection.prepareStatement(sqlProvider.getDeleteFileSQL());
+      // The insert must hand back the generated key so createFile(...) can set the file id.
+      createFile = connection.prepareStatement(sqlProvider.getInsertFileSQL(), Statement.RETURN_GENERATED_KEYS);
+      selectFileByFileName = connection.prepareStatement(sqlProvider.getSelectFileByFileName());
+      copyFileRecord = connection.prepareStatement(sqlProvider.getCopyFileRecordByIdSQL());
+      renameFile = connection.prepareStatement(sqlProvider.getUpdateFileNameByIdSQL());
+      readLargeObject = connection.prepareStatement(sqlProvider.getReadLargeObjectSQL());
+      appendToLargeObject = connection.prepareStatement(sqlProvider.getAppendToLargeObjectSQL());
+      selectFileNamesByExtension = connection.prepareStatement(sqlProvider.getSelectFileNamesByExtensionSQL());
+   }
+
+   /**
+    * Lists the names of the stored files selected by the supplied extension.
+    *
+    * @param extension value bound to the first parameter of the select-by-extension statement
+    * @return the first column of every row returned by the query, in result-set order
+    * @throws Exception if the query fails; on an SQLException the transaction is rolled back before rethrowing
+    */
+   public synchronized List<String> listFiles(String extension) throws Exception {
+      final List<String> matches = new ArrayList<>();
+      try {
+         connection.setAutoCommit(false);
+         selectFileNamesByExtension.setString(1, extension);
+         try (ResultSet rows = selectFileNamesByExtension.executeQuery()) {
+            while (rows.next()) {
+               final String name = rows.getString(1);
+               matches.add(name);
+            }
+         }
+         connection.commit();
+         return matches;
+      }
+      catch (SQLException failure) {
+         connection.rollback();
+         throw failure;
+      }
+   }
+
+   /**
+    * Opens the supplied file.  If the file does not exist in the database a new row is created for it;
+    * otherwise the file is given the id of the existing row and its write position is loaded.
+    * <p>
+    * Note: this method is not itself synchronized; the existence check and the subsequent
+    * create/load are separate synchronized calls.
+    *
+    * @param file the file to open
+    * @throws SQLException if any of the underlying database operations fails
+    */
+   public void openFile(JDBCSequentialFile file) throws SQLException {
+      final int existingId = fileExists(file);
+      if (existingId < 0) {
+         // A negative id means no row was found for this file name, so create one.
+         createFile(file);
+         return;
+      }
+      file.setId(existingId);
+      loadFile(file);
+   }
+
+   /**
+    * Looks the supplied file up by its file name.
+    *
+    * @param file the file whose name is bound to the lookup query
+    * @return the int value of the first column of the matching row (the file ID), or -1 if no row matches
+    * @throws SQLException if the lookup fails; the transaction is rolled back before rethrowing
+    */
+   public synchronized int fileExists(JDBCSequentialFile file) throws SQLException {
+      connection.setAutoCommit(false);
+      selectFileByFileName.setString(1, file.getFileName());
+      try (ResultSet match = selectFileByFileName.executeQuery()) {
+         final int id;
+         if (match.next()) {
+            id = match.getInt(1);
+         }
+         else {
+            id = -1;
+         }
+         connection.commit();
+         return id;
+      }
+      catch (Exception failure) {
+         connection.rollback();
+         throw failure;
+      }
+   }
+
+   /**
+    * Loads an existing file: reads the file's blob by id and sets the file's write position
+    * to the blob length (narrowed to an int).  Nothing is set when no row is returned.
+    *
+    * @param file the file to load; its id selects the row
+    * @throws SQLException if the read fails; the transaction is rolled back before rethrowing
+    */
+   public synchronized void loadFile(JDBCSequentialFile file) throws SQLException {
+      connection.setAutoCommit(false);
+      readLargeObject.setInt(1, file.getId());
+
+      try (ResultSet rows = readLargeObject.executeQuery()) {
+         if (rows.next()) {
+            final Blob content = rows.getBlob(1);
+            final long storedLength = content.length();
+            file.setWritePosition((int) storedLength);
+         }
+         connection.commit();
+      }
+      catch (SQLException failure) {
+         connection.rollback();
+         throw failure;
+      }
+   }
+
+   /**
+    * Creates a new database row representing the supplied file and stores the generated key
+    * on the file as its id.  The row is inserted with the file's name, its extension and an
+    * empty byte array as content.
+    *
+    * @param file the file to create a row for
+    * @throws SQLException if the insert fails or the database returns no generated key;
+    *                      the transaction is rolled back before rethrowing
+    */
+   public synchronized void createFile(JDBCSequentialFile file) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         createFile.setString(1, file.getFileName());
+         createFile.setString(2, file.getExtension());
+         createFile.setBytes(3, new byte[0]);
+         createFile.executeUpdate();
+         try (ResultSet keys = createFile.getGeneratedKeys()) {
+            // Fail with a clear message instead of reading from an empty result set.
+            if (!keys.next()) {
+               throw new SQLException("No generated key returned when creating file " + file.getFileName());
+            }
+            file.setId(keys.getInt(1));
+         }
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Updates the stored file name of the row identified by the file's id.
+    *
+    * @param file        the file whose row is updated
+    * @param newFileName the name to store
+    * @throws SQLException if the update fails; the transaction is rolled back before rethrowing
+    */
+   public synchronized void renameFile(JDBCSequentialFile file, String newFileName) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         final int id = file.getId();
+         renameFile.setString(1, newFileName);
+         renameFile.setInt(2, id);
+         renameFile.executeUpdate();
+         connection.commit();
+      }
+      catch (SQLException failure) {
+         connection.rollback();
+         throw failure;
+      }
+   }
+
+   /**
+    * Deletes the database row identified by the file's id.
+    *
+    * @param file the file whose row is removed
+    * @throws SQLException if the delete fails; the transaction is rolled back before rethrowing
+    */
+   public synchronized void deleteFile(JDBCSequentialFile file) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         final int id = file.getId();
+         deleteFile.setInt(1, id);
+         deleteFile.executeUpdate();
+         connection.commit();
+      }
+      catch (SQLException failure) {
+         connection.rollback();
+         throw failure;
+      }
+   }
+
+   /**
+    * Persists data to this file's associated database row via the append statement.
+    *
+    * @param file the file whose row (selected by id) receives the data
+    * @param data the bytes bound to the append statement
+    * @return the length of {@code data}
+    * @throws SQLException if the update fails; the transaction is rolled back before rethrowing
+    */
+   public synchronized int writeToFile(JDBCSequentialFile file, byte[] data) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         appendToLargeObject.setBytes(1, data);
+         appendToLargeObject.setInt(2, file.getId());
+         appendToLargeObject.executeUpdate();
+         connection.commit();
+      }
+      catch (SQLException failure) {
+         connection.rollback();
+         throw failure;
+      }
+      return data.length;
+   }
+
+   /**
+    * Reads data from the file (starting at {@code file.position()}) into the byte buffer.
+    * At most {@code bytes.remaining()} bytes are read.  When the position is at or beyond the
+    * end of the stored blob nothing is read and 0 is returned.
+    *
+    * @param file  the file to read; its id selects the row and its position the offset
+    * @param bytes the buffer that receives the data
+    * @return the number of bytes actually put into {@code bytes} (0 if there was nothing to read)
+    * @throws SQLException if the read fails; the transaction is rolled back before rethrowing
+    */
+   public synchronized int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLException {
+      connection.setAutoCommit(false);
+      readLargeObject.setInt(1, file.getId());
+      int readLength = 0;
+      try (ResultSet rs = readLargeObject.executeQuery()) {
+         if (rs.next()) {
+            Blob blob = rs.getBlob(1);
+            long available = calculateReadLength(blob.length(), bytes.remaining(), file.position());
+            // A position at/after the end of the blob yields a zero or negative length; asking the
+            // driver for that many bytes is an error, so only read when there is something to read.
+            if (available > 0) {
+               // Blob positions are 1-based, hence the + 1.  available <= bytes.remaining(), so the cast is safe.
+               byte[] data = blob.getBytes(file.position() + 1, (int) available);
+               bytes.put(data);
+               // getBytes may return fewer bytes than requested; report what was really copied.
+               readLength = data.length;
+            }
+         }
+         connection.commit();
+         return readLength;
+      }
+      catch (Throwable e) {
+         // Roll back on any failure (including unchecked ones from the buffer) before rethrowing.
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Copies the data content of {@code fileFrom} to {@code fileTo} by executing the
+    * copy-by-id statement with both file ids.
+    *
+    * @param fileFrom the source file (id bound as parameter 1)
+    * @param fileTo   the target file (id bound as parameter 2)
+    * @throws SQLException if the update fails; the transaction is rolled back before rethrowing
+    */
+   public synchronized void copyFileData(JDBCSequentialFile fileFrom, JDBCSequentialFile fileTo) throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         final int sourceId = fileFrom.getId();
+         copyFileRecord.setInt(1, sourceId);
+         final int targetId = fileTo.getId();
+         copyFileRecord.setInt(2, targetId);
+         copyFileRecord.executeUpdate();
+         connection.commit();
+      }
+      catch (SQLException failure) {
+         connection.rollback();
+         throw failure;
+      }
+   }
+
+   /**
+    * Drops the file table, and with it all stored file data, by executing the SQL provider's
+    * drop-file-table statement.
+    *
+    * @throws SQLException if the drop fails; the transaction is rolled back before rethrowing
+    */
+   public synchronized void destroy() throws SQLException {
+      try {
+         connection.setAutoCommit(false);
+         // try-with-resources so the one-off Statement is closed even when the drop fails.
+         try (Statement statement = connection.createStatement()) {
+            statement.executeUpdate(sqlProvider.getDropFileTableSQL());
+         }
+         connection.commit();
+      }
+      catch (SQLException e) {
+         connection.rollback();
+         throw e;
+      }
+   }
+
+   /**
+    * Computes how many bytes a read may transfer: the bytes left in the object after the read
+    * position, capped at the space left in the buffer.  The result is negative when
+    * {@code readPosition} is greater than {@code objectLength}.
+    *
+    * @param objectLength total length of the stored object
+    * @param bufferSpace  space available in the destination buffer
+    * @param readPosition offset at which the read starts
+    * @return the smaller of {@code objectLength - readPosition} and {@code bufferSpace}
+    */
+   public long calculateReadLength(long objectLength, int bufferSpace, long readPosition) {
+      return Math.min(objectLength - readPosition, bufferSpace);
+   }
+
+   /**
+    * Returns the maximum blob size reported by the SQL provider.
+    *
+    * @return the value of {@code sqlProvider.getMaxBlobSize()} (unit presumably bytes; confirm in the provider)
+    */
+   public int getMaxSize() {
+      return sqlProvider.getMaxBlobSize();
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
index 1e6f393..6c05112 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/journal/JDBCJournalImpl.java
@@ -95,6 +95,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
       createTable(sqlProvider.getCreateJournalTableSQL());
    }
 
+   @Override
    protected void prepareStatements() throws SQLException {
       insertJournalRecords = connection.prepareStatement(sqlProvider.getInsertJournalRecordsSQL());
       selectJournalRecords = connection.prepareStatement(sqlProvider.getSelectJournalRecordsSQL());
@@ -104,7 +105,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
    }
 
    @Override
-   public synchronized void stop() throws Exception {
+   public synchronized void stop() throws SQLException {
       if (started) {
          synchronized (journalLock) {
             syncTimer.cancel();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ba4d6a5..d8e4f11 100644
--- a/pom.xml
+++ b/pom.xml
@@ -230,6 +230,14 @@
          </dependency>
 
          <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>9.4-1205-jdbc4</version>
+            <scope>provided</scope>
+            <!-- postgresql license -->
+         </dependency>
+
+         <dependency>
             <groupId>commons-collections</groupId>
             <artifactId>commons-collections-testframework</artifactId>
             <version>${commons.collections.version}</version>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/79904aeb/tests/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 90f5425..2d62ce0 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -240,10 +240,18 @@
          <artifactId>jboss-javaee</artifactId>
          <version>5.0.0.GA</version>
       </dependency>
+
+      <!-- DB Test Deps -->
       <dependency>
          <groupId>org.apache.derby</groupId>
          <artifactId>derby</artifactId>
-         <version>${apache.derby.version}</version>
+         <scope>test</scope>
+      </dependency>
+
+      <dependency>
+         <groupId>org.postgresql</groupId>
+         <artifactId>postgresql</artifactId>
+         <scope>test</scope>
       </dependency>
       <!--Vertx provided dependencies-->
       <dependency>


Mime
View raw message