sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject [2/5] sqoop git commit: SQOOP-1589: Sqoop2: Create common constants, error codes, and queries
Date Tue, 11 Nov 2014 00:37:23 GMT
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5de4b437/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index 059a827..4d99a52 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -29,8 +29,6 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
 import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -38,47 +36,21 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.common.DirectionError;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.common.SupportedDirections;
 import org.apache.sqoop.connector.ConnectorHandler;
 import org.apache.sqoop.connector.ConnectorManagerUtils;
-import org.apache.sqoop.driver.Driver;
-import org.apache.sqoop.model.MBooleanInput;
-import org.apache.sqoop.model.MConfig;
-import org.apache.sqoop.model.MConfigType;
-import org.apache.sqoop.model.MConfigurableType;
-import org.apache.sqoop.model.MConnector;
-import org.apache.sqoop.model.MDriver;
-import org.apache.sqoop.model.MDriverConfig;
-import org.apache.sqoop.model.MEnumInput;
-import org.apache.sqoop.model.MFromConfig;
-import org.apache.sqoop.model.MInput;
-import org.apache.sqoop.model.MInputType;
-import org.apache.sqoop.model.MIntegerInput;
-import org.apache.sqoop.model.MJob;
-import org.apache.sqoop.model.MLink;
-import org.apache.sqoop.model.MLinkConfig;
-import org.apache.sqoop.model.MMapInput;
-import org.apache.sqoop.model.MStringInput;
-import org.apache.sqoop.model.MSubmission;
-import org.apache.sqoop.model.MToConfig;
 import org.apache.sqoop.repository.JdbcRepositoryContext;
-import org.apache.sqoop.repository.JdbcRepositoryHandler;
-import org.apache.sqoop.submission.SubmissionStatus;
-import org.apache.sqoop.submission.counter.Counter;
-import org.apache.sqoop.submission.counter.CounterGroup;
-import org.apache.sqoop.submission.counter.Counters;
+import org.apache.sqoop.repository.common.CommonRepoConstants;
+import org.apache.sqoop.repository.common.CommonRepositoryHandler;
 
 /**
  * JDBC based repository handler for Derby database.
  *
  * Repository implementation for Derby database.
  */
-public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
+public class DerbyRepositoryHandler extends CommonRepositoryHandler {
 
   private static final Logger LOG =
       Logger.getLogger(DerbyRepositoryHandler.class);
@@ -100,167 +72,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * {@inheritDoc}
    */
   @Override
-  public void registerConnector(MConnector mc, Connection conn) {
-    if (mc.hasPersistenceId()) {
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0011,
-        mc.getUniqueName());
-    }
-    mc.setPersistenceId(insertAndGetConnectorId(mc, conn));
-    insertConfigsForConnector(mc, conn);
-  }
-
-  /**
-   * Helper method to insert the configs from the  into the
-   * repository.
-   * @param mDriver The driver instance to use to upgrade.
-   * @param conn JDBC link to use for updating the configs
-   */
-  private void insertConfigsForDriver(MDriver mDriver, Connection conn) {
-    PreparedStatement baseConfigStmt = null;
-    PreparedStatement baseInputStmt = null;
-    try{
-      baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG,
-        Statement.RETURN_GENERATED_KEYS);
-
-      baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT,
-        Statement.RETURN_GENERATED_KEYS);
-
-      // Register the job config type, since driver config is per job
-      registerConfigs(null, null, mDriver.getDriverConfig().getConfigs(),
-        MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
-
-    } catch (SQLException ex) {
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mDriver.toString(), ex);
-    } finally {
-      closeStatements(baseConfigStmt, baseInputStmt);
-    }
-  }
-
-  /**
-   * Helper method to insert the configs from the MConnector into the
-   * repository. The job and connector configs within <code>mc</code> will get
-   * updated with the id of the configs when this function returns.
-   * @param mc The connector to use for updating configs
-   * @param conn JDBC connection to use for inserting the configs
-   */
-  private void insertConfigsForConnector (MConnector mc, Connection conn) {
-    long connectorId = mc.getPersistenceId();
-    PreparedStatement baseConfigStmt = null;
-    PreparedStatement baseInputStmt = null;
-    try{
-      baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG,
-        Statement.RETURN_GENERATED_KEYS);
-
-      baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT,
-        Statement.RETURN_GENERATED_KEYS);
-
-      // Register link type config
-      registerConfigs(connectorId, null /* No direction for LINK type config*/, mc.getLinkConfig().getConfigs(),
-      MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn);
-
-      // Register both from/to job type config for connector
-      if (mc.getSupportedDirections().isDirectionSupported(Direction.FROM)) {
-        registerConfigs(connectorId, Direction.FROM, mc.getFromConfig().getConfigs(),
-            MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
-      }
-      if (mc.getSupportedDirections().isDirectionSupported(Direction.TO)) {
-        registerConfigs(connectorId, Direction.TO, mc.getToConfig().getConfigs(),
-            MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
-      }
-    } catch (SQLException ex) {
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
-        mc.toString(), ex);
-    } finally {
-      closeStatements(baseConfigStmt, baseInputStmt);
-    }
-
-  }
-
-  private void insertConnectorDirection(Long connectorId, Direction direction, Connection conn)
-      throws SQLException {
-    PreparedStatement stmt = null;
-
-    try {
-      stmt = conn.prepareStatement(STMT_INSERT_SQ_CONNECTOR_DIRECTIONS);
-      stmt.setLong(1, connectorId);
-      stmt.setLong(2, getDirection(direction, conn));
-
-      if (stmt.executeUpdate() != 1) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0049);
-      }
-    } finally {
-      closeStatements(stmt);
-    }
-  }
-
-  private void insertConnectorDirections(Long connectorId, SupportedDirections directions, Connection conn)
-      throws SQLException {
-    if (directions.isDirectionSupported(Direction.FROM)) {
-      insertConnectorDirection(connectorId, Direction.FROM, conn);
-    }
-
-    if (directions.isDirectionSupported(Direction.TO)) {
-      insertConnectorDirection(connectorId, Direction.TO, conn);
-    }
-  }
-
-  private long insertAndGetConnectorId(MConnector mc, Connection conn) {
-    PreparedStatement baseConnectorStmt = null;
-    try {
-      baseConnectorStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIGURABLE,
-          Statement.RETURN_GENERATED_KEYS);
-      baseConnectorStmt.setString(1, mc.getUniqueName());
-      baseConnectorStmt.setString(2, mc.getClassName());
-      baseConnectorStmt.setString(3, mc.getVersion());
-      baseConnectorStmt.setString(4, mc.getType().name());
-
-      int baseConnectorCount = baseConnectorStmt.executeUpdate();
-      if (baseConnectorCount != 1) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0012,
-            Integer.toString(baseConnectorCount));
-      }
-
-      ResultSet rsetConnectorId = baseConnectorStmt.getGeneratedKeys();
-
-      if (!rsetConnectorId.next()) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
-      }
-      // connector configurable also have directions
-      insertConnectorDirections(rsetConnectorId.getLong(1), mc.getSupportedDirections(), conn);
-      return rsetConnectorId.getLong(1);
-    } catch (SQLException ex) {
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mc.toString(), ex);
-    } finally {
-      closeStatements(baseConnectorStmt);
-    }
-  }
-
-  private long insertAndGetDriverId(MDriver mDriver, Connection conn) {
-    PreparedStatement baseDriverStmt = null;
-    try {
-      baseDriverStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIGURABLE,
-          Statement.RETURN_GENERATED_KEYS);
-      baseDriverStmt.setString(1, mDriver.getUniqueName());
-      baseDriverStmt.setString(2, Driver.getClassName());
-      baseDriverStmt.setString(3, mDriver.getVersion());
-      baseDriverStmt.setString(4, mDriver.getType().name());
-
-      int baseDriverCount = baseDriverStmt.executeUpdate();
-      if (baseDriverCount != 1) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0012, Integer.toString(baseDriverCount));
-      }
-
-      ResultSet rsetDriverId = baseDriverStmt.getGeneratedKeys();
-
-      if (!rsetDriverId.next()) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
-      }
-      return rsetDriverId.getLong(1);
-    } catch (SQLException ex) {
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0050, mDriver.toString(), ex);
-    } finally {
-      closeStatements(baseDriverStmt);
-    }
+  public String name() {
+    return "Derby";
   }
 
   /**
@@ -301,7 +114,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
           // Shutdown for one db instance is expected to raise SQL STATE 45000
           if (ex.getErrorCode() != 45000) {
             throw new SqoopException(
-                DerbyRepoError.DERBYREPO_0002, shutDownUrl, ex);
+                DerbyRepoError.DERBYREPO_0001, shutDownUrl, ex);
           }
           LOG.info("Embedded Derby shutdown raised SQL STATE "
               + "45000 as expected.");
@@ -350,7 +163,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       }
 
     } catch (SQLException e) {
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0041, e);
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0006, e);
     } finally {
       closeResultSets(rs);
     }
@@ -360,7 +173,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
       // NOTE: Since we can different types of version stored in system table, we renamed the
       // key name for the repository version from "version" to "repository.version" for clarity
       stmt = conn.prepareStatement(STMT_SELECT_DEPRECATED_OR_NEW_SYSTEM_VERSION);
-      stmt.setString(1, DerbyRepoConstants.SYSKEY_DERBY_REPOSITORY_VERSION);
+      stmt.setString(1, CommonRepoConstants.SYSKEY_VERSION);
       stmt.setString(2, DerbyRepoConstants.SYSKEY_VERSION);
       rs = stmt.executeQuery();
 
@@ -537,8 +350,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
   private void upgradeRepositoryVersion(Connection conn) {
     // remove the deprecated sys version
     runQuery(STMT_DELETE_SYSTEM, conn, DerbyRepoConstants.SYSKEY_VERSION);
-    runQuery(STMT_DELETE_SYSTEM, conn, DerbyRepoConstants.SYSKEY_DERBY_REPOSITORY_VERSION);
-    runQuery(STMT_INSERT_SYSTEM, conn, DerbyRepoConstants.SYSKEY_DERBY_REPOSITORY_VERSION, ""
+    runQuery(STMT_DELETE_SYSTEM, conn, CommonRepoConstants.SYSKEY_VERSION);
+    runQuery(STMT_INSERT_SYSTEM, conn, CommonRepoConstants.SYSKEY_VERSION, ""
         + DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION);
   }
   /**
@@ -556,7 +369,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
         insertDirectionStmt = conn.prepareStatement(STMT_INSERT_DIRECTION, Statement.RETURN_GENERATED_KEYS);
         insertDirectionStmt.setString(1, direction.toString());
         if (insertDirectionStmt.executeUpdate() != 1) {
-          throw new SqoopException(DerbyRepoError.DERBYREPO_0046, "Could not add directions FROM and TO.");
+          throw new SqoopException(DerbyRepoError.DERBYREPO_0007, "Could not add directions FROM and TO.");
         }
 
         ResultSet directionId = insertDirectionStmt.getGeneratedKeys();
@@ -567,7 +380,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
           directionMap.put(direction, directionId.getLong(1));
         } else {
-          throw new SqoopException(DerbyRepoError.DERBYREPO_0047, "Could not get ID of direction " + direction);
+          throw new SqoopException(DerbyRepoError.DERBYREPO_0008, "Could not get ID of direction " + direction);
         }
       }
     } catch (SQLException e) {
@@ -779,7 +592,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
             }
           }
         } catch (SQLException e) {
-          throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
+          throw new SqoopException(DerbyRepoError.DERBYREPO_0004);
         }
 
         break;
@@ -817,13 +630,13 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
       result = stmt.executeUpdate();
       if (result != 1) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0012,
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0003,
             Integer.toString(result));
       }
       ResultSet rsetConnectionId = stmt.getGeneratedKeys();
 
       if (!rsetConnectionId.next()) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0004);
       }
 
       if (LOG.isTraceEnabled()) {
@@ -832,174 +645,12 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
       return rsetConnectionId.getLong(1);
     } catch (SQLException ex) {
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0019, ex);
-    } finally {
-      closeStatements(stmt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public boolean isRespositorySuitableForUse(Connection conn) {
-    // TODO(jarcec): Verify that all structures are present (e.g. something like corruption validation)
-    // NOTE: At this point is is just checking if the repo version matches the version
-    // in the upgraded code
-    return detectRepositoryVersion(conn) == DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public MConnector findConnector(String shortName, Connection conn) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Looking up connector: " + shortName);
-    }
-    MConnector mc = null;
-    PreparedStatement connectorFetchStmt = null;
-    try {
-      connectorFetchStmt = conn.prepareStatement(STMT_SELECT_FROM_CONFIGURABLE);
-      connectorFetchStmt.setString(1, shortName);
-
-      List<MConnector> connectors = loadConnectors(connectorFetchStmt, conn);
-
-      if (connectors.size() == 0) {
-        LOG.debug("No connector found by name: " + shortName);
-        return null;
-      } else if (connectors.size() == 1) {
-        LOG.debug("Looking up connector: " + shortName + ", found: " + mc);
-        return connectors.get(0);
-      } else {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0005, shortName);
-      }
-
-    } catch (SQLException ex) {
-      logException(ex, shortName);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0004, shortName, ex);
-    } finally {
-      closeStatements(connectorFetchStmt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<MConnector> findConnectors(Connection conn) {
-    PreparedStatement stmt = null;
-    try {
-      stmt = conn.prepareStatement(STMT_SELECT_CONFIGURABLE_ALL_FOR_TYPE);
-      stmt.setString(1, MConfigurableType.CONNECTOR.name());
-
-      return loadConnectors(stmt,conn);
-    } catch (SQLException ex) {
-      logException(ex);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0045, ex);
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0005, ex);
     } finally {
       closeStatements(stmt);
     }
   }
 
-   /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void registerDriver(MDriver mDriver, Connection conn) {
-    if (mDriver.hasPersistenceId()) {
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0011, mDriver.getUniqueName());
-    }
-    mDriver.setPersistenceId(insertAndGetDriverId(mDriver, conn));
-    insertConfigsforDriver(mDriver, conn);
-  }
-
-  private void insertConfigsforDriver(MDriver mDriver, Connection conn) {
-    PreparedStatement baseConfigStmt = null;
-    PreparedStatement baseInputStmt = null;
-    try {
-      baseConfigStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIG,
-          Statement.RETURN_GENERATED_KEYS);
-      baseInputStmt = conn.prepareStatement(STMT_INSERT_INTO_INPUT,
-          Statement.RETURN_GENERATED_KEYS);
-
-      // Register a driver config as a job type with no owner/connector and direction
-      registerConfigs(mDriver.getPersistenceId(), null /* no direction*/, mDriver.getDriverConfig().getConfigs(),
-        MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
-
-    } catch (SQLException ex) {
-      logException(ex, mDriver);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0014, ex);
-    } finally {
-      closeStatements(baseConfigStmt, baseInputStmt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public MDriver findDriver(String shortName, Connection conn) {
-    LOG.debug("Looking up Driver and config ");
-    PreparedStatement driverFetchStmt = null;
-    PreparedStatement driverConfigFetchStmt = null;
-    PreparedStatement driverConfigInputFetchStmt = null;
-
-    MDriver mDriver;
-    try {
-      driverFetchStmt = conn.prepareStatement(STMT_SELECT_FROM_CONFIGURABLE);
-      driverFetchStmt.setString(1, shortName);
-
-      ResultSet rsDriverSet = driverFetchStmt.executeQuery();
-      if (!rsDriverSet.next()) {
-        return null;
-      }
-      Long driverId = rsDriverSet.getLong(1);
-      String driverVersion = rsDriverSet.getString(4);
-
-      driverConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
-      driverConfigFetchStmt.setLong(1, driverId);
-
-      driverConfigInputFetchStmt = conn.prepareStatement(STMT_SELECT_INPUT);
-      List<MConfig> driverConfigs = new ArrayList<MConfig>();
-      loadDriverConfigs(driverConfigs, driverConfigFetchStmt, driverConfigInputFetchStmt, 1);
-
-      if (driverConfigs.isEmpty()) {
-        return null;
-      }
-      mDriver = new MDriver(new MDriverConfig(driverConfigs), driverVersion);
-      mDriver.setPersistenceId(driverId);
-
-    } catch (SQLException ex) {
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0004, "Driver", ex);
-    } finally {
-      if (driverConfigFetchStmt != null) {
-        try {
-          driverConfigFetchStmt.close();
-        } catch (SQLException ex) {
-          LOG.error("Unable to close driver config fetch statement", ex);
-        }
-      }
-      if (driverConfigInputFetchStmt != null) {
-        try {
-          driverConfigInputFetchStmt.close();
-        } catch (SQLException ex) {
-          LOG.error("Unable to close driver input fetch statement", ex);
-        }
-      }
-      if (driverFetchStmt != null) {
-        try {
-          driverFetchStmt.close();
-        } catch (SQLException ex) {
-          LOG.error("Unable to close driver fetch statement", ex);
-        }
-      }
-    }
-
-    LOG.debug("Looked up Driver and config");
-    return mDriver;
-  }
-
   /**
    * {@inheritDoc}
    */
@@ -1012,1871 +663,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
    * {@inheritDoc}
    */
   @Override
-  public void createLink(MLink link, Connection conn) {
-    PreparedStatement stmt = null;
-    int result;
-    try {
-      stmt = conn.prepareStatement(STMT_INSERT_LINK,
-        Statement.RETURN_GENERATED_KEYS);
-      stmt.setString(1, link.getName());
-      stmt.setLong(2, link.getConnectorId());
-      stmt.setBoolean(3, link.getEnabled());
-      stmt.setString(4, link.getCreationUser());
-      stmt.setTimestamp(5, new Timestamp(link.getCreationDate().getTime()));
-      stmt.setString(6, link.getLastUpdateUser());
-      stmt.setTimestamp(7, new Timestamp(link.getLastUpdateDate().getTime()));
-
-      result = stmt.executeUpdate();
-      if (result != 1) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0012,
-          Integer.toString(result));
-      }
-
-      ResultSet rsetConnectionId = stmt.getGeneratedKeys();
-
-      if (!rsetConnectionId.next()) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
-      }
-
-      long connectionId = rsetConnectionId.getLong(1);
-
-      createInputValues(STMT_INSERT_LINK_INPUT,
-        connectionId,
-        link.getConnectorLinkConfig().getConfigs(),
-        conn);
-      link.setPersistenceId(connectionId);
-
-    } catch (SQLException ex) {
-      logException(ex, link);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0019, ex);
-    } finally {
-      closeStatements(stmt);
-    }
-  }
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void updateLink(MLink link, Connection conn) {
-    PreparedStatement deleteStmt = null;
-    PreparedStatement updateStmt = null;
-    try {
-      // Firstly remove old values
-      deleteStmt = conn.prepareStatement(STMT_DELETE_LINK_INPUT);
-      deleteStmt.setLong(1, link.getPersistenceId());
-      deleteStmt.executeUpdate();
-
-      // Update LINK_CONFIG table
-      updateStmt = conn.prepareStatement(STMT_UPDATE_LINK);
-      updateStmt.setString(1, link.getName());
-      updateStmt.setString(2, link.getLastUpdateUser());
-      updateStmt.setTimestamp(3, new Timestamp(new Date().getTime()));
-
-      updateStmt.setLong(4, link.getPersistenceId());
-      updateStmt.executeUpdate();
-
-      // And reinsert new values
-      createInputValues(STMT_INSERT_LINK_INPUT,
-        link.getPersistenceId(),
-        link.getConnectorLinkConfig().getConfigs(),
-        conn);
-
-    } catch (SQLException ex) {
-      logException(ex, link);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0021, ex);
-    } finally {
-      closeStatements(deleteStmt, updateStmt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public boolean existsLink(long id, Connection conn) {
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-    try {
-      stmt = conn.prepareStatement(STMT_SELECT_LINK_CHECK_BY_ID);
-      stmt.setLong(1, id);
-      rs = stmt.executeQuery();
-
-      // Should be always valid in query with count
-      rs.next();
-
-      return rs.getLong(1) == 1;
-    } catch (SQLException ex) {
-      logException(ex, id);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0025, ex);
-    } finally {
-      closeResultSets(rs);
-      closeStatements(stmt);
-    }
-  }
-
-  @Override
-  public boolean inUseLink(long connectionId, Connection conn) {
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-
-    try {
-      stmt = conn.prepareStatement(STMT_SELECT_JOBS_FOR_LINK_CHECK);
-      stmt.setLong(1, connectionId);
-      rs = stmt.executeQuery();
-
-      // Should be always valid in case of count(*) query
-      rs.next();
-
-      return rs.getLong(1) != 0;
-
-    } catch (SQLException e) {
-      logException(e, connectionId);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0032, e);
-    } finally {
-      closeResultSets(rs);
-      closeStatements(stmt);
-    }
-  }
-
-  @Override
-  public void enableLink(long connectionId, boolean enabled, Connection conn) {
-    PreparedStatement enableConn = null;
-
-    try {
-      enableConn = conn.prepareStatement(STMT_ENABLE_LINK);
-      enableConn.setBoolean(1, enabled);
-      enableConn.setLong(2, connectionId);
-      enableConn.executeUpdate();
-    } catch (SQLException ex) {
-      logException(ex, connectionId);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0042, ex);
-    } finally {
-      closeStatements(enableConn);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void deleteLink(long id, Connection conn) {
-    PreparedStatement dltConn = null;
-
-    try {
-      deleteLinkInputs(id, conn);
-      dltConn = conn.prepareStatement(STMT_DELETE_LINK);
-      dltConn.setLong(1, id);
-      dltConn.executeUpdate();
-    } catch (SQLException ex) {
-      logException(ex, id);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0022, ex);
-    } finally {
-      closeStatements(dltConn);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void deleteLinkInputs(long id, Connection conn) {
-    PreparedStatement dltConnInput = null;
-    try {
-      dltConnInput = conn.prepareStatement(STMT_DELETE_LINK_INPUT);
-      dltConnInput.setLong(1, id);
-      dltConnInput.executeUpdate();
-    } catch (SQLException ex) {
-      logException(ex, id);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0022, ex);
-    } finally {
-      closeStatements(dltConnInput);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public MLink findLink(long linkId, Connection conn) {
-    PreparedStatement linkFetchStmt = null;
-    try {
-      linkFetchStmt = conn.prepareStatement(STMT_SELECT_LINK_SINGLE);
-      linkFetchStmt.setLong(1, linkId);
-
-      List<MLink> links = loadLinks(linkFetchStmt, conn);
-
-      if (links.size() != 1) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0024, "Couldn't find link with id "
-            + linkId);
-      }
-
-      // Return the first and only one link object with the given id
-      return links.get(0);
-
-    } catch (SQLException ex) {
-      logException(ex, linkId);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0023, ex);
-    } finally {
-      closeStatements(linkFetchStmt);
-    }
-  }
-
-  @Override
-  public MLink findLink(String linkName, Connection conn) {
-    PreparedStatement linkFetchStmt = null;
-    try {
-      linkFetchStmt = conn.prepareStatement(STMT_SELECT_LINK_SINGLE_BY_NAME);
-      linkFetchStmt.setString(1, linkName);
-
-      List<MLink> links = loadLinks(linkFetchStmt, conn);
-
-      if (links.size() != 1) {
-        return null;
-      }
-
-      // Return the first and only one link object with the given name
-      return links.get(0);
-
-    } catch (SQLException ex) {
-      logException(ex, linkName);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0023, ex);
-    } finally {
-      closeStatements(linkFetchStmt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<MLink> findLinks(Connection conn) {
-    PreparedStatement linksFetchStmt = null;
-    try {
-      linksFetchStmt = conn.prepareStatement(STMT_SELECT_LINK_ALL);
-
-      return loadLinks(linksFetchStmt, conn);
-
-    } catch (SQLException ex) {
-      logException(ex);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0023, ex);
-    } finally {
-      closeStatements(linksFetchStmt);
-    }
-  }
-
-
-  /**
-   *
-   * {@inheritDoc}
-   *
-   */
-  @Override
-  public List<MLink> findLinksForConnector(long connectorId, Connection conn) {
-    PreparedStatement linkByConnectorFetchStmt = null;
-    try {
-      linkByConnectorFetchStmt = conn.prepareStatement(STMT_SELECT_LINK_FOR_CONNECTOR_CONFIGURABLE);
-      linkByConnectorFetchStmt.setLong(1, connectorId);
-      return loadLinks(linkByConnectorFetchStmt, conn);
-    } catch (SQLException ex) {
-      logException(ex, connectorId);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0023, ex);
-    } finally {
-      closeStatements(linkByConnectorFetchStmt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void upgradeConnectorAndConfigs(MConnector mConnector, Connection conn) {
-    updateConnectorAndDeleteConfigs(mConnector, conn);
-    insertConfigsForConnector(mConnector, conn);
-  }
-
-  private void updateConnectorAndDeleteConfigs(MConnector mConnector, Connection conn) {
-    PreparedStatement updateConnectorStatement = null;
-    PreparedStatement deleteConfig = null;
-    PreparedStatement deleteInput = null;
-    try {
-      updateConnectorStatement = conn.prepareStatement(STMT_UPDATE_CONFIGURABLE);
-      deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONFIGURABLE);
-      deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONFIGURABLE);
-      updateConnectorStatement.setString(1, mConnector.getUniqueName());
-      updateConnectorStatement.setString(2, mConnector.getClassName());
-      updateConnectorStatement.setString(3, mConnector.getVersion());
-      updateConnectorStatement.setString(4, mConnector.getType().name());
-      updateConnectorStatement.setLong(5, mConnector.getPersistenceId());
-
-      if (updateConnectorStatement.executeUpdate() != 1) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0038);
-      }
-      deleteInput.setLong(1, mConnector.getPersistenceId());
-      deleteConfig.setLong(1, mConnector.getPersistenceId());
-      deleteInput.executeUpdate();
-      deleteConfig.executeUpdate();
-
-    } catch (SQLException e) {
-      logException(e, mConnector);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0038, e);
-    } finally {
-      closeStatements(updateConnectorStatement, deleteConfig, deleteInput);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void upgradeDriverAndConfigs(MDriver mDriver, Connection conn) {
-    updateDriverAndDeleteConfigs(mDriver, conn);
-    insertConfigsForDriver(mDriver, conn);
-  }
-
-  private void updateDriverAndDeleteConfigs(MDriver mDriver, Connection conn) {
-    PreparedStatement updateDriverStatement = null;
-    PreparedStatement deleteConfig = null;
-    PreparedStatement deleteInput = null;
-    try {
-      updateDriverStatement = conn.prepareStatement(STMT_UPDATE_CONFIGURABLE);
-      deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONFIGURABLE);
-      deleteConfig = conn.prepareStatement(STMT_DELETE_CONFIGS_FOR_CONFIGURABLE);
-      updateDriverStatement.setString(1, mDriver.getUniqueName());
-      updateDriverStatement.setString(2, Driver.getClassName());
-      updateDriverStatement.setString(3, mDriver.getVersion());
-      updateDriverStatement.setString(4, mDriver.getType().name());
-      updateDriverStatement.setLong(5, mDriver.getPersistenceId());
-
-      if (updateDriverStatement.executeUpdate() != 1) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0038);
-      }
-      deleteInput.setLong(1, mDriver.getPersistenceId());
-      deleteConfig.setLong(1, mDriver.getPersistenceId());
-      deleteInput.executeUpdate();
-      deleteConfig.executeUpdate();
-
-    } catch (SQLException e) {
-      logException(e, mDriver);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0044, e);
-    } finally {
-      closeStatements(updateDriverStatement, deleteConfig, deleteInput);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void createJob(MJob job, Connection conn) {
-    PreparedStatement stmt = null;
-    int result;
-    try {
-      stmt = conn.prepareStatement(STMT_INSERT_JOB, Statement.RETURN_GENERATED_KEYS);
-      stmt.setString(1, job.getName());
-      stmt.setLong(2, job.getLinkId(Direction.FROM));
-      stmt.setLong(3, job.getLinkId(Direction.TO));
-      stmt.setBoolean(4, job.getEnabled());
-      stmt.setString(5, job.getCreationUser());
-      stmt.setTimestamp(6, new Timestamp(job.getCreationDate().getTime()));
-      stmt.setString(7, job.getLastUpdateUser());
-      stmt.setTimestamp(8, new Timestamp(job.getLastUpdateDate().getTime()));
-
-      result = stmt.executeUpdate();
-      if (result != 1) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0012,
-          Integer.toString(result));
-      }
-
-      ResultSet rsetJobId = stmt.getGeneratedKeys();
-
-      if (!rsetJobId.next()) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
-      }
-
-      long jobId = rsetJobId.getLong(1);
-
-      // from config for the job
-      createInputValues(STMT_INSERT_JOB_INPUT,
-                        jobId,
-                        job.getJobConfig(Direction.FROM).getConfigs(),
-                        conn);
-      // to config for the job
-      createInputValues(STMT_INSERT_JOB_INPUT,
-                        jobId,
-                        job.getJobConfig(Direction.TO).getConfigs(),
-                        conn);
-      // driver config per job
-      createInputValues(STMT_INSERT_JOB_INPUT,
-                        jobId,
-                        job.getDriverConfig().getConfigs(),
-                        conn);
-
-      job.setPersistenceId(jobId);
-
-    } catch (SQLException ex) {
-      logException(ex, job);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0026, ex);
-    } finally {
-      closeStatements(stmt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void updateJob(MJob job, Connection conn) {
-    PreparedStatement deleteStmt = null;
-    PreparedStatement updateStmt = null;
-    try {
-      // Firstly remove old values
-      deleteStmt = conn.prepareStatement(STMT_DELETE_JOB_INPUT);
-      deleteStmt.setLong(1, job.getPersistenceId());
-      deleteStmt.executeUpdate();
-
-      // Update job table
-      updateStmt = conn.prepareStatement(STMT_UPDATE_JOB);
-      updateStmt.setString(1, job.getName());
-      updateStmt.setString(2, job.getLastUpdateUser());
-      updateStmt.setTimestamp(3, new Timestamp(new Date().getTime()));
-
-      updateStmt.setLong(4, job.getPersistenceId());
-      updateStmt.executeUpdate();
-
-      // And reinsert new values
-      createInputValues(STMT_INSERT_JOB_INPUT,
-                        job.getPersistenceId(),
-                        job.getJobConfig(Direction.FROM).getConfigs(),
-                        conn);
-      createInputValues(STMT_INSERT_JOB_INPUT,
-                        job.getPersistenceId(),
-                        job.getJobConfig(Direction.TO).getConfigs(),
-                        conn);
-      createInputValues(STMT_INSERT_JOB_INPUT,
-          job.getPersistenceId(),
-          job.getDriverConfig().getConfigs(),
-          conn);
-
-    } catch (SQLException ex) {
-      logException(ex, job);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0027, ex);
-    } finally {
-      closeStatements(deleteStmt, updateStmt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public boolean existsJob(long id, Connection conn) {
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-    try {
-      stmt = conn.prepareStatement(STMT_SELECT_JOB_CHECK_BY_ID);
-      stmt.setLong(1, id);
-      rs = stmt.executeQuery();
-
-      // Should be always valid in query with count
-      rs.next();
-
-      return rs.getLong(1) == 1;
-    } catch (SQLException ex) {
-      logException(ex, id);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0029, ex);
-    } finally {
-      closeResultSets(rs);
-      closeStatements(stmt);
-    }
-  }
-
-  @Override
-  public boolean inUseJob(long jobId, Connection conn) {
-    MSubmission submission = findLastSubmissionForJob(jobId, conn);
-
-    // We have no submissions and thus job can't be in use
-    if(submission == null) {
-      return false;
-    }
-
-    // We can't remove running job
-    if(submission.getStatus().isRunning()) {
-      return true;
-    }
-
-    return false;
-  }
-
-  @Override
-  public void enableJob(long jobId, boolean enabled, Connection conn) {
-    PreparedStatement enableConn = null;
-
-    try {
-      enableConn = conn.prepareStatement(STMT_ENABLE_JOB);
-      enableConn.setBoolean(1, enabled);
-      enableConn.setLong(2, jobId);
-      enableConn.executeUpdate();
-    } catch (SQLException ex) {
-      logException(ex, jobId);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0043, ex);
-    } finally {
-      closeStatements(enableConn);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void deleteJob(long id, Connection conn) {
-    PreparedStatement dlt = null;
-    try {
-      deleteJobInputs(id, conn);
-      dlt = conn.prepareStatement(STMT_DELETE_JOB);
-      dlt.setLong(1, id);
-      dlt.executeUpdate();
-    } catch (SQLException ex) {
-      logException(ex, id);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0028, ex);
-    } finally {
-      closeStatements(dlt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void deleteJobInputs(long id, Connection conn) {
-    PreparedStatement dltInput = null;
-    try {
-      dltInput = conn.prepareStatement(STMT_DELETE_JOB_INPUT);
-      dltInput.setLong(1, id);
-      dltInput.executeUpdate();
-    } catch (SQLException ex) {
-      logException(ex, id);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0028, ex);
-    } finally {
-      closeStatements(dltInput);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public MJob findJob(long id, Connection conn) {
-    PreparedStatement stmt = null;
-    try {
-      stmt = conn.prepareStatement(STMT_SELECT_JOB_SINGLE_BY_ID);
-      stmt.setLong(1, id);
-
-      List<MJob> jobs = loadJobs(stmt, conn);
-
-      if (jobs.size() != 1) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0030, "Couldn't find job with id "
-            + id);
-      }
-
-      // Return the first and only one link object
-      return jobs.get(0);
-
-    } catch (SQLException ex) {
-      logException(ex, id);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex);
-    } finally {
-      closeStatements(stmt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public MJob findJob(String name, Connection conn) {
-    PreparedStatement stmt = null;
-    try {
-      stmt = conn.prepareStatement(STMT_SELECT_JOB_SINGLE_BY_NAME);
-      stmt.setString(1, name);
-
-      List<MJob> jobs = loadJobs(stmt, conn);
-
-      if (jobs.size() != 1) {
-        return null;
-      }
-
-      // Return the first and only one link object
-      return jobs.get(0);
-
-    } catch (SQLException ex) {
-      logException(ex, name);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex);
-    } finally {
-      closeStatements(stmt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<MJob> findJobs(Connection conn) {
-    PreparedStatement stmt = null;
-    try {
-      stmt = conn.prepareStatement(STMT_SELECT_JOB);
-
-      return loadJobs(stmt, conn);
-
-    } catch (SQLException ex) {
-      logException(ex);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex);
-    } finally {
-      closeStatements(stmt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<MJob> findJobsForConnector(long connectorId, Connection conn) {
-    PreparedStatement stmt = null;
-    try {
-      stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE);
-      stmt.setLong(1, connectorId);
-      stmt.setLong(2, connectorId);
-      return loadJobs(stmt, conn);
-
-    } catch (SQLException ex) {
-      logException(ex, connectorId);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex);
-    } finally {
-      closeStatements(stmt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void createSubmission(MSubmission submission, Connection conn) {
-    PreparedStatement stmt = null;
-    int result;
-    try {
-      stmt = conn.prepareStatement(STMT_INSERT_SUBMISSION,
-        Statement.RETURN_GENERATED_KEYS);
-      stmt.setLong(1, submission.getJobId());
-      stmt.setString(2, submission.getStatus().name());
-      stmt.setString(3, submission.getCreationUser());
-      stmt.setTimestamp(4, new Timestamp(submission.getCreationDate().getTime()));
-      stmt.setString(5, submission.getLastUpdateUser());
-      stmt.setTimestamp(6, new Timestamp(submission.getLastUpdateDate().getTime()));
-      stmt.setString(7, submission.getExternalId());
-      stmt.setString(8, submission.getExternalLink());
-      stmt.setString(9, submission.getExceptionInfo());
-      stmt.setString(10, submission.getExceptionStackTrace());
-
-      result = stmt.executeUpdate();
-      if (result != 1) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0012,
-          Integer.toString(result));
-      }
-
-      ResultSet rsetSubmissionId = stmt.getGeneratedKeys();
-
-      if (!rsetSubmissionId.next()) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
-      }
-
-      long submissionId = rsetSubmissionId.getLong(1);
-
-      if(submission.getCounters() != null) {
-        createSubmissionCounters(submissionId, submission.getCounters(), conn);
-      }
-
-      // Save created persistence id
-      submission.setPersistenceId(submissionId);
-
-    } catch (SQLException ex) {
-      logException(ex, submission);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0034, ex);
-    } finally {
-      closeStatements(stmt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public boolean existsSubmission(long submissionId, Connection conn) {
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-    try {
-      stmt = conn.prepareStatement(STMT_SELECT_SUBMISSION_CHECK);
-      stmt.setLong(1, submissionId);
-      rs = stmt.executeQuery();
-
-      // Should be always valid in query with count
-      rs.next();
-
-      return rs.getLong(1) == 1;
-    } catch (SQLException ex) {
-      logException(ex, submissionId);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0033, ex);
-    } finally {
-      closeResultSets(rs);
-      closeStatements(stmt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void updateSubmission(MSubmission submission, Connection conn) {
-    PreparedStatement stmt = null;
-    PreparedStatement deleteStmt = null;
-    try {
-      //  Update properties in main table
-      stmt = conn.prepareStatement(STMT_UPDATE_SUBMISSION);
-      stmt.setString(1, submission.getStatus().name());
-      stmt.setString(2, submission.getLastUpdateUser());
-      stmt.setTimestamp(3, new Timestamp(submission.getLastUpdateDate().getTime()));
-      stmt.setString(4, submission.getExceptionInfo());
-      stmt.setString(5, submission.getExceptionStackTrace());
-
-      stmt.setLong(6, submission.getPersistenceId());
-      stmt.executeUpdate();
-
-      // Delete previous counters
-      deleteStmt = conn.prepareStatement(STMT_DELETE_COUNTER_SUBMISSION);
-      deleteStmt.setLong(1, submission.getPersistenceId());
-      deleteStmt.executeUpdate();
-
-      // Reinsert new counters if needed
-      if(submission.getCounters() != null) {
-        createSubmissionCounters(submission.getPersistenceId(), submission.getCounters(), conn);
-      }
-
-    } catch (SQLException ex) {
-      logException(ex, submission);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0035, ex);
-    } finally {
-      closeStatements(stmt, deleteStmt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void purgeSubmissions(Date threshold, Connection conn) {
-     PreparedStatement stmt = null;
-    try {
-      stmt = conn.prepareStatement(STMT_PURGE_SUBMISSIONS);
-      stmt.setTimestamp(1, new Timestamp(threshold.getTime()));
-      stmt.executeUpdate();
-
-    } catch (SQLException ex) {
-      logException(ex, threshold);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0036, ex);
-    } finally {
-      closeStatements(stmt);
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<MSubmission> findUnfinishedSubmissions(Connection conn) {
-    List<MSubmission> submissions = new LinkedList<MSubmission>();
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-    try {
-      stmt = conn.prepareStatement(STMT_SELECT_SUBMISSION_UNFINISHED);
-
-      for(SubmissionStatus status : SubmissionStatus.unfinished()) {
-        stmt.setString(1, status.name());
-        rs = stmt.executeQuery();
-
-        while(rs.next()) {
-          submissions.add(loadSubmission(rs, conn));
-        }
-
-        rs.close();
-        rs = null;
-      }
-    } catch (SQLException ex) {
-      logException(ex);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0037, ex);
-    } finally {
-      closeResultSets(rs);
-      closeStatements(stmt);
-    }
-
-    return submissions;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<MSubmission> findSubmissions(Connection conn) {
-    List<MSubmission> submissions = new LinkedList<MSubmission>();
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-    try {
-      stmt = conn.prepareStatement(STMT_SELECT_SUBMISSIONS);
-      rs = stmt.executeQuery();
-
-      while(rs.next()) {
-        submissions.add(loadSubmission(rs, conn));
-      }
-
-      rs.close();
-      rs = null;
-    } catch (SQLException ex) {
-      logException(ex);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0039, ex);
-    } finally {
-      closeResultSets(rs);
-      closeStatements(stmt);
-    }
-
-    return submissions;
-  }
-
-  @Override
-  public List<MSubmission> findSubmissionsForJob(long jobId, Connection conn) {
-    List<MSubmission> submissions = new LinkedList<MSubmission>();
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-    try {
-      stmt = conn.prepareStatement(STMT_SELECT_SUBMISSIONS_FOR_JOB);
-      stmt.setLong(1, jobId);
-      rs = stmt.executeQuery();
-
-      while(rs.next()) {
-        submissions.add(loadSubmission(rs, conn));
-      }
-
-      rs.close();
-      rs = null;
-    } catch (SQLException ex) {
-      logException(ex);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0040, ex);
-    } finally {
-      closeResultSets(rs);
-      closeStatements(stmt);
-    }
-
-    return submissions;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public MSubmission findLastSubmissionForJob(long jobId, Connection conn) {
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-    try {
-      stmt = conn.prepareStatement(STMT_SELECT_SUBMISSIONS_FOR_JOB);
-      stmt.setLong(1, jobId);
-      stmt.setMaxRows(1);
-      rs = stmt.executeQuery();
-
-      if(!rs.next()) {
-        return null;
-      }
-
-      return loadSubmission(rs, conn);
-    } catch (SQLException ex) {
-      logException(ex, jobId);
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0040, ex);
-    } finally {
-      closeResultSets(rs);
-      closeStatements(stmt);
-    }
-  }
-
-  /**
-   * Stores counters for given submission in repository.
-   *
-   * @param submissionId Submission id
-   * @param counters Counters that should be stored
-   * @param conn Connection to derby repository
-   * @throws SQLException
-   */
-  private void createSubmissionCounters(long submissionId, Counters counters, Connection conn) throws SQLException {
-    PreparedStatement stmt = null;
-
-    try {
-      stmt = conn.prepareStatement(STMT_INSERT_COUNTER_SUBMISSION);
-
-      for(CounterGroup group : counters) {
-        long groupId = getCounterGroupId(group, conn);
-
-        for(Counter counter: group) {
-          long counterId = getCounterId(counter, conn);
-
-          stmt.setLong(1, groupId);
-          stmt.setLong(2, counterId);
-          stmt.setLong(3, submissionId);
-          stmt.setLong(4, counter.getValue());
-
-          stmt.executeUpdate();
-        }
-      }
-    } finally {
-      closeStatements(stmt);
-    }
-  }
-
-  /**
-   * Resolves counter group database id.
-   *
-   * @param group Given group
-   * @param conn Connection to database
-   * @return Id
-   * @throws SQLException
-   */
-  private long getCounterGroupId(CounterGroup group, Connection conn) throws SQLException {
-    PreparedStatement select = null;
-    PreparedStatement insert = null;
-    ResultSet rsSelect = null;
-    ResultSet rsInsert = null;
-
-    try {
-      select = conn.prepareStatement(STMT_SELECT_COUNTER_GROUP);
-      select.setString(1, group.getName());
-
-      rsSelect = select.executeQuery();
-
-      if(rsSelect.next()) {
-        return rsSelect.getLong(1);
-      }
-
-      insert = conn.prepareStatement(STMT_INSERT_COUNTER_GROUP, Statement.RETURN_GENERATED_KEYS);
-      insert.setString(1, group.getName());
-      insert.executeUpdate();
-
-      rsInsert = insert.getGeneratedKeys();
-
-      if (!rsInsert.next()) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
-      }
-
-      return rsInsert.getLong(1);
-    } finally {
-      closeResultSets(rsSelect, rsInsert);
-      closeStatements(select, insert);
-    }
-  }
-
-  /**
-   * Resolves counter id.
-   *
-   * @param counter Given counter
-   * @param conn Connection to database
-   * @return Id
-   * @throws SQLException
-   */
-  private long getCounterId(Counter counter, Connection conn) throws SQLException {
-    PreparedStatement select = null;
-    PreparedStatement insert = null;
-    ResultSet rsSelect = null;
-    ResultSet rsInsert = null;
-
-    try {
-      select = conn.prepareStatement(STMT_SELECT_COUNTER);
-      select.setString(1, counter.getName());
-
-      rsSelect = select.executeQuery();
-
-      if(rsSelect.next()) {
-        return rsSelect.getLong(1);
-      }
-
-      insert = conn.prepareStatement(STMT_INSERT_COUNTER, Statement.RETURN_GENERATED_KEYS);
-      insert.setString(1, counter.getName());
-      insert.executeUpdate();
-
-      rsInsert = insert.getGeneratedKeys();
-
-      if (!rsInsert.next()) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
-      }
-
-      return rsInsert.getLong(1);
-    } finally {
-      closeResultSets(rsSelect, rsInsert);
-      closeStatements(select, insert);
-    }
-  }
-
-  /**
-   * Create MSubmission structure from result set.
-   *
-   * @param rs Result set, only active row will be fetched
-   * @param conn Connection to database
-   * @return Created MSubmission structure
-   * @throws SQLException
-   */
-  private MSubmission loadSubmission(ResultSet rs, Connection conn) throws SQLException {
-    MSubmission submission = new MSubmission();
-
-    submission.setPersistenceId(rs.getLong(1));
-    submission.setJobId(rs.getLong(2));
-    submission.setStatus(SubmissionStatus.valueOf(rs.getString(3)));
-    submission.setCreationUser(rs.getString(4));
-    submission.setCreationDate(rs.getTimestamp(5));
-    submission.setLastUpdateUser(rs.getString(6));
-    submission.setLastUpdateDate(rs.getTimestamp(7));
-    submission.setExternalId(rs.getString(8));
-    submission.setExternalLink(rs.getString(9));
-    submission.setExceptionInfo(rs.getString(10));
-    submission.setExceptionStackTrace(rs.getString(11));
-
-    Counters counters = loadCountersSubmission(rs.getLong(1), conn);
-    submission.setCounters(counters);
-
-    return submission;
-  }
-
-  private Counters loadCountersSubmission(long submissionId, Connection conn) throws SQLException {
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-    try {
-      stmt = conn.prepareStatement(STMT_SELECT_COUNTER_SUBMISSION);
-      stmt.setLong(1, submissionId);
-      rs = stmt.executeQuery();
-
-      Counters counters = new Counters();
-
-      while (rs.next()) {
-        String groupName = rs.getString(1);
-        String counterName = rs.getString(2);
-        long value = rs.getLong(3);
-
-        CounterGroup group = counters.getCounterGroup(groupName);
-        if (group == null) {
-          group = new CounterGroup(groupName);
-          counters.addCounterGroup(group);
-        }
-
-        group.addCounter(new Counter(counterName, value));
-      }
-
-      if (counters.isEmpty()) {
-        return null;
-      } else {
-        return counters;
-      }
-    } finally {
-      closeStatements(stmt);
-      closeResultSets(rs);
-    }
-  }
-
-  private Long getDirection(Direction direction, Connection conn) throws SQLException {
-    PreparedStatement directionStmt = null;
-    ResultSet rs = null;
-
-    try {
-      directionStmt = conn.prepareStatement(STMT_SELECT_SQD_ID_BY_SQD_NAME);
-      directionStmt.setString(1, direction.toString());
-      rs = directionStmt.executeQuery();
-
-      rs.next();
-      return rs.getLong(1);
-    } finally {
-      if (rs != null) {
-        closeResultSets(rs);
-      }
-      if (directionStmt != null) {
-        closeStatements(directionStmt);
-      }
-    }
-  }
-
-  private Direction getDirection(long directionId, Connection conn) throws SQLException {
-    PreparedStatement directionStmt = null;
-    ResultSet rs = null;
-
-    try {
-      directionStmt = conn.prepareStatement(STMT_SELECT_SQD_NAME_BY_SQD_ID);
-      directionStmt.setLong(1, directionId);
-      rs = directionStmt.executeQuery();
-
-      rs.next();
-      return Direction.valueOf(rs.getString(1));
-    } finally {
-      if (rs != null) {
-        closeResultSets(rs);
-      }
-      if (directionStmt != null) {
-        closeStatements(directionStmt);
-      }
-    }
-  }
-
-  private SupportedDirections findConnectorSupportedDirections(long connectorId, Connection conn) throws SQLException {
-    PreparedStatement connectorDirectionsStmt = null;
-    ResultSet rs = null;
-
-    boolean from = false, to = false;
-
-    try {
-      connectorDirectionsStmt = conn.prepareStatement(STMT_SELECT_SQ_CONNECTOR_DIRECTIONS);
-      connectorDirectionsStmt.setLong(1, connectorId);
-      rs = connectorDirectionsStmt.executeQuery();
-
-      while(rs.next()) {
-        switch(getDirection(rs.getLong(2), conn)) {
-          case FROM:
-            from = true;
-            break;
-
-          case TO:
-            to = true;
-            break;
-        }
-      }
-    } finally {
-      if (rs != null) {
-        closeResultSets(rs);
-      }
-      if (connectorDirectionsStmt != null) {
-        closeStatements(connectorDirectionsStmt);
-      }
-    }
-
-    return new SupportedDirections(from, to);
-  }
-
-  private List<MConnector> loadConnectors(PreparedStatement stmt, Connection conn) throws SQLException {
-    List<MConnector> connectors = new ArrayList<MConnector>();
-    ResultSet rsConnectors = null;
-    PreparedStatement connectorConfigFetchStmt = null;
-    PreparedStatement connectorConfigInputFetchStmt = null;
-
-    try {
-      rsConnectors = stmt.executeQuery();
-      connectorConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
-      connectorConfigInputFetchStmt = conn.prepareStatement(STMT_SELECT_INPUT);
-
-      while(rsConnectors.next()) {
-        long connectorId = rsConnectors.getLong(1);
-        String connectorName = rsConnectors.getString(2);
-        String connectorClassName = rsConnectors.getString(3);
-        String connectorVersion = rsConnectors.getString(4);
-
-        connectorConfigFetchStmt.setLong(1, connectorId);
-
-        List<MConfig> linkConfig = new ArrayList<MConfig>();
-        List<MConfig> fromConfig = new ArrayList<MConfig>();
-        List<MConfig> toConfig = new ArrayList<MConfig>();
-
-        loadConnectorConfigTypes(linkConfig, fromConfig, toConfig, connectorConfigFetchStmt,
-            connectorConfigInputFetchStmt, 1, conn);
-
-        SupportedDirections supportedDirections
-            = findConnectorSupportedDirections(connectorId, conn);
-        MFromConfig fromJobConfig = null;
-        MToConfig toJobConfig = null;
-        if (supportedDirections.isDirectionSupported(Direction.FROM)) {
-          fromJobConfig = new MFromConfig(fromConfig);
-        }
-        if (supportedDirections.isDirectionSupported(Direction.TO)) {
-          toJobConfig = new MToConfig(toConfig);
-        }
-        MConnector mc = new MConnector(connectorName, connectorClassName, connectorVersion,
-                                       new MLinkConfig(linkConfig), fromJobConfig, toJobConfig);
-        mc.setPersistenceId(connectorId);
-
-        connectors.add(mc);
-      }
-    } finally {
-      closeResultSets(rsConnectors);
-      closeStatements(connectorConfigFetchStmt, connectorConfigInputFetchStmt);
-    }
-    return connectors;
-  }
-
-  private List<MLink> loadLinks(PreparedStatement linkFetchStmt, Connection conn) throws SQLException {
-    List<MLink> links = new ArrayList<MLink>();
-    ResultSet rsConnection = null;
-    PreparedStatement connectorLinkConfigFetchStatement = null;
-    PreparedStatement connectorLinkConfigInputStatement = null;
-
-    try {
-      rsConnection = linkFetchStmt.executeQuery();
-      connectorLinkConfigFetchStatement = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
-      connectorLinkConfigInputStatement = conn.prepareStatement(STMT_FETCH_LINK_INPUT);
-
-      while(rsConnection.next()) {
-        long id = rsConnection.getLong(1);
-        String name = rsConnection.getString(2);
-        long connectorId = rsConnection.getLong(3);
-        boolean enabled = rsConnection.getBoolean(4);
-        String creationUser = rsConnection.getString(5);
-        Date creationDate = rsConnection.getTimestamp(6);
-        String updateUser = rsConnection.getString(7);
-        Date lastUpdateDate = rsConnection.getTimestamp(8);
-
-        connectorLinkConfigFetchStatement.setLong(1, connectorId);
-        connectorLinkConfigInputStatement.setLong(1, id);
-        connectorLinkConfigInputStatement.setLong(3, id);
-
-        List<MConfig> connectorLinkConfig = new ArrayList<MConfig>();
-        List<MConfig> fromConfig = new ArrayList<MConfig>();
-        List<MConfig> toConfig = new ArrayList<MConfig>();
-
-        loadConnectorConfigTypes(connectorLinkConfig, fromConfig, toConfig, connectorLinkConfigFetchStatement,
-            connectorLinkConfigInputStatement, 2, conn);
-        MLink link = new MLink(connectorId, new MLinkConfig(connectorLinkConfig));
-
-        link.setPersistenceId(id);
-        link.setName(name);
-        link.setCreationUser(creationUser);
-        link.setCreationDate(creationDate);
-        link.setLastUpdateUser(updateUser);
-        link.setLastUpdateDate(lastUpdateDate);
-        link.setEnabled(enabled);
-
-        links.add(link);
-      }
-    } finally {
-      closeResultSets(rsConnection);
-      closeStatements(connectorLinkConfigFetchStatement, connectorLinkConfigInputStatement);
-    }
-
-    return links;
-  }
-
-  private List<MJob> loadJobs(PreparedStatement stmt,
-                              Connection conn)
-                              throws SQLException {
-    List<MJob> jobs = new ArrayList<MJob>();
-    ResultSet rsJob = null;
-    PreparedStatement fromConfigFetchStmt = null;
-    PreparedStatement toConfigFetchStmt = null;
-    PreparedStatement driverConfigfetchStmt = null;
-    PreparedStatement jobInputFetchStmt = null;
-
-    try {
-      rsJob = stmt.executeQuery();
-      // Note: Job does not hold a explicit reference to the driver since every
-      // job has the same driver
-      long driverId = this.findDriver(MDriver.DRIVER_NAME, conn).getPersistenceId();
-      fromConfigFetchStmt  = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
-      toConfigFetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
-      driverConfigfetchStmt = conn.prepareStatement(STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
-      jobInputFetchStmt = conn.prepareStatement(STMT_FETCH_JOB_INPUT);
-
-      while(rsJob.next()) {
-        long fromConnectorId = rsJob.getLong(1);
-        long toConnectorId = rsJob.getLong(2);
-        long id = rsJob.getLong(3);
-        String name = rsJob.getString(4);
-        long fromLinkId = rsJob.getLong(5);
-        long toLinkId = rsJob.getLong(6);
-        boolean enabled = rsJob.getBoolean(7);
-        String createBy = rsJob.getString(8);
-        Date creationDate = rsJob.getTimestamp(9);
-        String updateBy = rsJob.getString(10);
-        Date lastUpdateDate = rsJob.getTimestamp(11);
-
-        fromConfigFetchStmt.setLong(1, fromConnectorId);
-        toConfigFetchStmt.setLong(1,toConnectorId);
-        driverConfigfetchStmt.setLong(1, driverId);
-
-        jobInputFetchStmt.setLong(1, id);
-        jobInputFetchStmt.setLong(3, id);
-
-        // FROM entity configs
-        List<MConfig> fromConnectorLinkConfig = new ArrayList<MConfig>();
-        List<MConfig> fromConnectorFromJobConfig = new ArrayList<MConfig>();
-        List<MConfig> fromConnectorToJobConfig = new ArrayList<MConfig>();
-
-        loadConnectorConfigTypes(fromConnectorLinkConfig, fromConnectorFromJobConfig, fromConnectorToJobConfig,
-            fromConfigFetchStmt, jobInputFetchStmt, 2, conn);
-
-        // TO entity configs
-        List<MConfig> toConnectorLinkConfig = new ArrayList<MConfig>();
-        List<MConfig> toConnectorFromJobConfig = new ArrayList<MConfig>();
-        List<MConfig> toConnectorToJobConfig = new ArrayList<MConfig>();
-
-        // ?? dont we need 2 different driver configs for the from/to?
-        List<MConfig> driverConfig = new ArrayList<MConfig>();
-
-        loadConnectorConfigTypes(toConnectorLinkConfig, toConnectorFromJobConfig, toConnectorToJobConfig,
-            toConfigFetchStmt, jobInputFetchStmt, 2, conn);
-
-        loadDriverConfigs(driverConfig, driverConfigfetchStmt, jobInputFetchStmt, 2);
-
-        MJob job = new MJob(
-          fromConnectorId, toConnectorId,
-          fromLinkId, toLinkId,
-          new MFromConfig(fromConnectorFromJobConfig),
-          new MToConfig(toConnectorToJobConfig),
-          new MDriverConfig(driverConfig));
-
-        job.setPersistenceId(id);
-        job.setName(name);
-        job.setCreationUser(createBy);
-        job.setCreationDate(creationDate);
-        job.setLastUpdateUser(updateBy);
-        job.setLastUpdateDate(lastUpdateDate);
-        job.setEnabled(enabled);
-
-        jobs.add(job);
-      }
-    } finally {
-      closeResultSets(rsJob);
-      closeStatements(fromConfigFetchStmt, toConfigFetchStmt, driverConfigfetchStmt, jobInputFetchStmt);
-    }
-
-    return jobs;
-  }
-
-  private void registerConfigDirection(Long configId, Direction direction, Connection conn)
-      throws SQLException {
-    PreparedStatement stmt = null;
-    try {
-      stmt = conn.prepareStatement(STMT_INSERT_SQ_CONFIG_DIRECTIONS);
-      stmt.setLong(1, configId);
-      stmt.setLong(2, getDirection(direction, conn));
-      if (stmt.executeUpdate() != 1) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0048);
-      }
-    } finally {
-      closeStatements(stmt);
-    }
-  }
-
-  /**
-   * Register configs in derby database. This method will insert the ids
-   * generated by the repository into the configs passed in itself.
-   *
-   * Use given prepared statements to create entire config structure in database.
-   *
-   * @param configurableId
-   * @param configs
-   * @param type
-   * @param baseConfigStmt
-   * @param baseInputStmt
-   * @param conn
-   * @return short number of configs registered.
-   * @throws SQLException
-   */
-  private short registerConfigs(Long configurableId, Direction direction,
-      List<MConfig> configs, String type, PreparedStatement baseConfigStmt,
-      PreparedStatement baseInputStmt, Connection conn)
-          throws SQLException {
-    short configIndex = 0;
-
-    for (MConfig config : configs) {
-      if (configurableId == null) {
-        baseConfigStmt.setNull(1, Types.BIGINT);
-      } else {
-        baseConfigStmt.setLong(1, configurableId);
-      }
-
-      baseConfigStmt.setString(2, config.getName());
-      baseConfigStmt.setString(3, type);
-      baseConfigStmt.setShort(4, configIndex++);
-
-      int baseConfigCount = baseConfigStmt.executeUpdate();
-      if (baseConfigCount != 1) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0015,
-          Integer.toString(baseConfigCount));
-      }
-      ResultSet rsetConfigId = baseConfigStmt.getGeneratedKeys();
-      if (!rsetConfigId.next()) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0016);
-      }
-
-      long configId = rsetConfigId.getLong(1);
-      config.setPersistenceId(configId);
-
-      if (direction != null) {
-        registerConfigDirection(configId, direction, conn);
-      }
-
-      // Insert all the inputs
-      List<MInput<?>> inputs = config.getInputs();
-      registerConfigInputs(configId, inputs, baseInputStmt);
-    }
-    return configIndex;
-  }
-
-  /**
-   * Save given inputs to the database.
-   *
-   * Use given prepare statement to save all inputs into repository.
-   *
-   * @param configId Identifier for corresponding config
-   * @param inputs List of inputs that needs to be saved
-   * @param baseInputStmt Statement that we can utilize
-   * @throws SQLException In case of any failure on Derby side
-   */
-  private void registerConfigInputs(long configId, List<MInput<?>> inputs,
-      PreparedStatement baseInputStmt) throws SQLException {
-    short inputIndex = 0;
-    for (MInput<?> input : inputs) {
-      baseInputStmt.setString(1, input.getName());
-      baseInputStmt.setLong(2, configId);
-      baseInputStmt.setShort(3, inputIndex++);
-      baseInputStmt.setString(4, input.getType().name());
-      baseInputStmt.setBoolean(5, input.isSensitive());
-      // String specific column(s)
-      if (input.getType().equals(MInputType.STRING)) {
-        MStringInput  strInput = (MStringInput) input;
-        baseInputStmt.setShort(6, strInput.getMaxLength());
-      } else {
-        baseInputStmt.setNull(6, Types.INTEGER);
-      }
-      // Enum specific column(s)
-      if(input.getType() == MInputType.ENUM) {
-        baseInputStmt.setString(7, StringUtils.join(((MEnumInput)input).getValues(), ","));
-      } else {
-        baseInputStmt.setNull(7, Types.VARCHAR);
-      }
-
-      int baseInputCount = baseInputStmt.executeUpdate();
-      if (baseInputCount != 1) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0017,
-          Integer.toString(baseInputCount));
-      }
-
-      ResultSet rsetInputId = baseInputStmt.getGeneratedKeys();
-      if (!rsetInputId.next()) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0018);
-      }
-
-      long inputId = rsetInputId.getLong(1);
-      input.setPersistenceId(inputId);
-    }
-  }
-
-  /**
-   * Execute given query on database.
-   *
-   * @param query Query that should be executed
-   */
-  private void runQuery(String query, Connection conn, Object... args) {
-    PreparedStatement stmt = null;
-    try {
-      stmt = conn.prepareStatement(query);
-
-      for (int i = 0; i < args.length; ++i) {
-        if (args[i] instanceof String) {
-          stmt.setString(i + 1, (String)args[i]);
-        } else if (args[i] instanceof Long) {
-          stmt.setLong(i + 1, (Long) args[i]);
-        } else {
-          stmt.setObject(i + 1, args[i]);
-        }
-      }
-
-      if (stmt.execute()) {
-        ResultSet rset = stmt.getResultSet();
-        int count = 0;
-        while (rset.next()) {
-          count++;
-        }
-        LOG.info("QUERY(" + query + ") produced unused resultset with "+ count + " rows");
-      } else {
-        int updateCount = stmt.getUpdateCount();
-        LOG.info("QUERY(" + query + ") Update count: " + updateCount);
-      }
-    } catch (SQLException ex) {
-      throw new SqoopException(DerbyRepoError.DERBYREPO_0003, query, ex);
-    } finally {
-      closeStatements(stmt);
-    }
-  }
-
-  /**
-   * Load configs and corresponding inputs from Derby database.
-   *
-   * Use given prepared statements to load all configs and corresponding inputs
-   * from Derby.
-   *
-   * @param driverConfig List of driver configs that will be filled up
-   * @param configFetchStatement Prepared statement for fetching configs
-   * @param inputFetchStmt Prepare statement for fetching inputs
-   * @param configPosition position of the config
-   * @throws SQLException In case of any failure on Derby side
-   */
-  public void loadDriverConfigs(List<MConfig> driverConfig,
-                                 PreparedStatement configFetchStatement,
-                                 PreparedStatement inputFetchStmt,
-                                 int configPosition) throws SQLException {
-
-    // Get list of structures from database
-    ResultSet rsetConfig = configFetchStatement.executeQuery();
-    while (rsetConfig.next()) {
-      long configId = rsetConfig.getLong(1);
-      Long fromConnectorId = rsetConfig.getLong(2);
-      String configName = rsetConfig.getString(3);
-      String configTYpe = rsetConfig.getString(4);
-      int configIndex = rsetConfig.getInt(5);
-      List<MInput<?>> configInputs = new ArrayList<MInput<?>>();
-
-      MConfig mDriverConfig = new MConfig(configName, configInputs);
-      mDriverConfig.setPersistenceId(configId);
-
-      inputFetchStmt.setLong(configPosition, configId);
-
-      ResultSet rsetInput = inputFetchStmt.executeQuery();
-      while (rsetInput.next()) {
-        long inputId = rsetInput.getLong(1);
-        String inputName = rsetInput.getString(2);
-        long inputConfig = rsetInput.getLong(3);
-        short inputIndex = rsetInput.getShort(4);
-        String inputType = rsetInput.getString(5);
-        boolean inputSensitivity = rsetInput.getBoolean(6);
-        short inputStrLength = rsetInput.getShort(7);
-        String inputEnumValues = rsetInput.getString(8);
-        String value = rsetInput.getString(9);
-
-        MInputType mit = MInputType.valueOf(inputType);
-
-        MInput input = null;
-        switch (mit) {
-        case STRING:
-          input = new MStringInput(inputName, inputSensitivity, inputStrLength);
-          break;
-        case MAP:
-          input = new MMapInput(inputName, inputSensitivity);
-          break;
-        case BOOLEAN:
-          input = new MBooleanInput(inputName, inputSensitivity);
-          break;
-        case INTEGER:
-          input = new MIntegerInput(inputName, inputSensitivity);
-          break;
-        case ENUM:
-          input = new MEnumInput(inputName, inputSensitivity, inputEnumValues.split(","));
-          break;
-        default:
-          throw new SqoopException(DerbyRepoError.DERBYREPO_0006,
-              "input-" + inputName + ":" + inputId + ":"
-              + "config-" + inputConfig + ":" + mit.name());
-        }
-
-        // Set persistent ID
-        input.setPersistenceId(inputId);
-
-        // Set value
-        if(value == null) {
-          input.setEmpty();
-        } else {
-          input.restoreFromUrlSafeValueString(value);
-        }
-
-        if (mDriverConfig.getInputs().size() != inputIndex) {
-          throw new SqoopException(DerbyRepoError.DERBYREPO_0009,
-            "config: " + mDriverConfig
-            + "; input: " + input
-            + "; index: " + inputIndex
-            + "; expected: " + mDriverConfig.getInputs().size()
-          );
-        }
-
-        mDriverConfig.getInputs().add(input);
-      }
-
-      if (mDriverConfig.getInputs().size() == 0) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0008,
-          "owner-" + fromConnectorId
-          + "; config: " + mDriverConfig
-        );
-      }
-
-      MConfigType configType = MConfigType.valueOf(configTYpe);
-      switch (configType) {
-      case JOB:
-        if (driverConfig.size() != configIndex) {
-          throw new SqoopException(DerbyRepoError.DERBYREPO_0010,
-            "owner-" + fromConnectorId
-            + "; config: " + configType
-            + "; index: " + configIndex
-            + "; expected: " + driverConfig.size()
-          );
-        }
-        driverConfig.add(mDriverConfig);
-        break;
-      default:
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0007,
-            "connector-" + fromConnectorId + ":" + configType);
-      }
-    }
-  }
-
-  private Direction findConfigDirection(long configId, Connection conn) throws SQLException {
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-
-    try {
-      stmt = conn.prepareStatement(STMT_SELECT_SQ_CONFIG_DIRECTIONS);
-      stmt.setLong(1, configId);
-      rs = stmt.executeQuery();
-      rs.next();
-      return getDirection(rs.getLong(2), conn);
-    } finally {
-      if (rs != null) {
-        closeResultSets(rs);
-      }
-      if (stmt != null) {
-        closeStatements(stmt);
-      }
-    }
-  }
-
-  /**
-   * Load configs and corresponding inputs related to a connector
-   *
-   * Use given prepared statements to load all configs and corresponding inputs
-   * from Derby.
-   *
-   * @param linkConfig List of link configs that will be filled up
-   * @param fromConfig FROM job configs that will be filled up
-   * @param toConfig TO job configs that will be filled up
-   * @param configFetchStmt Prepared statement for fetching configs
-   * @param inputFetchStmt Prepare statement for fetching inputs
-   * @param conn Connection object that is used to find config direction.
-   * @throws SQLException In case of any failure on Derby side
-   */
-  public void loadConnectorConfigTypes(List<MConfig> linkConfig, List<MConfig> fromConfig,
-      List<MConfig> toConfig, PreparedStatement configFetchStmt, PreparedStatement inputFetchStmt,
-      int configPosition, Connection conn) throws SQLException {
-
-    // Get list of structures from database
-    ResultSet rsetConfig = configFetchStmt.executeQuery();
-    while (rsetConfig.next()) {
-      long configId = rsetConfig.getLong(1);
-      Long configConnectorId = rsetConfig.getLong(2);
-      String configName = rsetConfig.getString(3);
-      String configType = rsetConfig.getString(4);
-      int configIndex = rsetConfig.getInt(5);
-      List<MInput<?>> configInputs = new ArrayList<MInput<?>>();
-
-      MConfig config = new MConfig(configName, configInputs);
-      config.setPersistenceId(configId);
-
-      inputFetchStmt.setLong(configPosition, configId);
-
-      ResultSet rsetInput = inputFetchStmt.executeQuery();
-      while (rsetInput.next()) {
-        long inputId = rsetInput.getLong(1);
-        String inputName = rsetInput.getString(2);
-        long inputConfig = rsetInput.getLong(3);
-        short inputIndex = rsetInput.getShort(4);
-        String inputType = rsetInput.getString(5);
-        boolean inputSensitivity = rsetInput.getBoolean(6);
-        short inputStrLength = rsetInput.getShort(7);
-        String inputEnumValues = rsetInput.getString(8);
-        String value = rsetInput.getString(9);
-
-        MInputType mit = MInputType.valueOf(inputType);
-
-        MInput<?> input = null;
-        switch (mit) {
-          case STRING:
-            input = new MStringInput(inputName, inputSensitivity, inputStrLength);
-            break;
-          case MAP:
-            input = new MMapInput(inputName, inputSensitivity);
-            break;
-          case BOOLEAN:
-            input = new MBooleanInput(inputName, inputSensitivity);
-            break;
-          case INTEGER:
-            input = new MIntegerInput(inputName, inputSensitivity);
-            break;
-          case ENUM:
-            input = new MEnumInput(inputName, inputSensitivity, inputEnumValues.split(","));
-            break;
-          default:
-            throw new SqoopException(DerbyRepoError.DERBYREPO_0006,
-                "input-" + inputName + ":" + inputId + ":"
-                    + "config-" + inputConfig + ":" + mit.name());
-        }
-
-        // Set persistent ID
-        input.setPersistenceId(inputId);
-
-        // Set value
-        if(value == null) {
-          input.setEmpty();
-        } else {
-          input.restoreFromUrlSafeValueString(value);
-        }
-
-        if (config.getInputs().size() != inputIndex) {
-          throw new SqoopException(DerbyRepoError.DERBYREPO_0009,
-              "config: " + config
-                  + "; input: " + input
-                  + "; index: " + inputIndex
-                  + "; expected: " + config.getInputs().size()
-          );
-        }
-
-        config.getInputs().add(input);
-      }
-
-      if (config.getInputs().size() == 0) {
-        throw new SqoopException(DerbyRepoError.DERBYREPO_0008,
-            "connector-" + configConnectorId
-                + "; config: " + config
-        );
-      }
-
-      MConfigType mConfigType = MConfigType.valueOf(configType);
-      switch (mConfigType) {
-        case LINK:
-          if (linkConfig.size() != configIndex) {
-            throw new SqoopException(DerbyRepoError.DERBYREPO_0010,
-                "connector-" + configConnectorId
-                    + "; config: " + config
-                    + "; index: " + configIndex
-                    + "; expected: " + linkConfig.size()
-            );
-          }
-          linkConfig.add(config);
-          break;
-        case JOB:
-          Direction type = findConfigDirection(configId, conn);
-          List<MConfig> jobConfigs;
-          switch(type) {
-            case FROM:
-              jobConfigs = fromConfig;
-              break;
-
-            case TO:
-              jobConfigs = toConfig;
-              break;
-
-            default:
-              throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-          }
-
-          if (jobConfigs.size() != configIndex) {
-            throw new SqoopException(DerbyRepoError.DERBYREPO_0010,
-                "connector-" + configConnectorId
-                    + "; config: " + config
-                    + "; index: " + configIndex
-                    + "; expected: " + jobConfigs.size()
-                    + "; configIndex: " + configIndex
-
-                );
-          }
-
-          jobConfigs.add(config);
-          break;
-        default:
-          throw new SqoopException(DerbyRepoError.DERBYREPO_0007,
-              "connector-" + configConnectorId + ":" + config);
-      }
-    }
-  }
-
-  private void createInputValues(String query,
-                                 long id,
-                                 List<MConfig> configs,
-                                 Connection conn) throws SQLException {
-    PreparedStatement stmt = null;
-    int result;
-
-    try {
-      stmt = conn.prepareStatement(query);
-
-      for (MConfig config : configs) {
-        for (MInput input : config.getInputs()) {
-          // Skip empty values as we're not interested in storing those in db
-          if (input.isEmpty()) {
-            continue;
-          }
-          stmt.setLong(1, id);
-          stmt.setLong(2, input.getPersistenceId());
-          stmt.setString(3, input.getUrlSafeValueString());
-
-          result = stmt.executeUpdate();
-          if (result != 1) {
-            throw new SqoopException(DerbyRepoError.DERBYREPO_0020,
-              Integer.toString(result));
-          }
-        }
-      }
-    } finally {
-      closeStatements(stmt);
-    }
-  }
-
-  /**
-   * Close all given Results set.
-   *
-   * Any occurring exception is silently ignored and logged.
-   *
-   * @param resultSets Result sets to close
-   */
-  private void closeResultSets(ResultSet ... resultSets) {
-    if(resultSets == null) {
-      return;
-    }
-    for (ResultSet rs : resultSets) {
-      if(rs != null) {
-        try {
-          rs.close();
-        } catch(SQLException ex) {
-          LOG.error("Exception during closing result set", ex);
-        }
-      }
-    }
-  }
-
-  /**
-   * Close all given statements.
-   *
-   * Any occurring exception is silently ignored and logged.
-   *
-   * @param stmts Statements to close
-   */
-  private void closeStatements(Statement... stmts) {
-    if(stmts == null) {
-      return;
-    }
-    for (Statement stmt : stmts) {
-      if(stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException ex) {
-          LOG.error("Exception during closing statement", ex);
-        }
-      }
-    }
-  }
-
-  /**
-   * Log exception and all String variant of arbitrary number of objects.
-   *
-   * This method is useful to log SQLException with all objects that were
-   * used in the query generation to see where is the issue.
-   *
-   * @param throwable Arbitrary throwable object
-   * @param objects Arbitrary array of associated objects
-   */
-  private void logException(Throwable throwable, Object ...objects) {
-    LOG.error("Exception in repository operation", throwable);
-    LOG.error("Associated objects: "+ objects.length);
-    for(Object object : objects) {
-      LOG.error("\t" + object.getClass().getSimpleName() + ": " + object.toString());
-    }
+  public boolean isRespositorySuitableForUse(Connection conn) {
+    // TODO(jarcec): Verify that all structures are present (e.g. something like corruption validation)
+    // NOTE: At this point is is just checking if the repo version matches the version
+    // in the upgraded code
+    return detectRepositoryVersion(conn) == DerbyRepoConstants.LATEST_DERBY_REPOSITORY_VERSION;
   }
 }
\ No newline at end of file


Mime
View raw message