sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject [4/5] sqoop git commit: SQOOP-1589: Sqoop2: Create common constants, error codes, and queries
Date Tue, 11 Nov 2014 00:37:25 GMT
http://git-wip-us.apache.org/repos/asf/sqoop/blob/5de4b437/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java
new file mode 100644
index 0000000..9f4ecbc
--- /dev/null
+++ b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java
@@ -0,0 +1,2286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.repository.common;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.common.DirectionError;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.common.SupportedDirections;
+import org.apache.sqoop.driver.Driver;
+import org.apache.sqoop.model.MBooleanInput;
+import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MConfigType;
+import org.apache.sqoop.model.MConfigurableType;
+import org.apache.sqoop.model.MConnector;
+import org.apache.sqoop.model.MDriver;
+import org.apache.sqoop.model.MDriverConfig;
+import org.apache.sqoop.model.MEnumInput;
+import org.apache.sqoop.model.MFromConfig;
+import org.apache.sqoop.model.MInput;
+import org.apache.sqoop.model.MInputType;
+import org.apache.sqoop.model.MIntegerInput;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.model.MLinkConfig;
+import org.apache.sqoop.model.MMapInput;
+import org.apache.sqoop.model.MStringInput;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.model.MToConfig;
+import org.apache.sqoop.repository.JdbcRepositoryHandler;
+import org.apache.sqoop.submission.SubmissionStatus;
+import org.apache.sqoop.submission.counter.Counter;
+import org.apache.sqoop.submission.counter.CounterGroup;
+import org.apache.sqoop.submission.counter.Counters;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Set of methods required from each JDBC based repository.
+ */
+public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler {
+
+  private static final Logger LOG =
+      Logger.getLogger(CommonRepositoryHandler.class);
+
+  /**
+   * Name to be used during logging
+   *
+   * @return String
+   */
+  public abstract String name();
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public MConnector findConnector(String shortName, Connection conn) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Looking up connector: " + shortName);
+    }
+    MConnector mc = null;
+    PreparedStatement connectorFetchStmt = null;
+    try {
+      connectorFetchStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_FROM_CONFIGURABLE);
+      connectorFetchStmt.setString(1, shortName);
+
+      List<MConnector> connectors = loadConnectors(connectorFetchStmt, conn);
+
+      if (connectors.size() == 0) {
+        LOG.debug("No connector found by name: " + shortName);
+        return null;
+      } else if (connectors.size() == 1) {
+        LOG.debug("Looking up connector: " + shortName + ", found: " + mc);
+        return connectors.get(0);
+      } else {
+        throw new SqoopException(CommonRepositoryError.COMMON_0002, shortName);
+      }
+
+    } catch (SQLException ex) {
+      logException(ex, shortName);
+      throw new SqoopException(CommonRepositoryError.COMMON_0001, shortName, ex);
+    } finally {
+      closeStatements(connectorFetchStmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<MConnector> findConnectors(Connection conn) {
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_CONFIGURABLE_ALL_FOR_TYPE);
+      stmt.setString(1, MConfigurableType.CONNECTOR.name());
+
+      return loadConnectors(stmt, conn);
+    } catch (SQLException ex) {
+      logException(ex);
+      throw new SqoopException(CommonRepositoryError.COMMON_0041, ex);
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void registerConnector(MConnector mc, Connection conn) {
+    if (mc.hasPersistenceId()) {
+      throw new SqoopException(CommonRepositoryError.COMMON_0008,
+          mc.getUniqueName());
+    }
+    mc.setPersistenceId(insertAndGetConnectorId(mc, conn));
+    insertConfigsForConnector(mc, conn);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<MJob> findJobsForConnector(long connectorId, Connection conn) {
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE);
+      stmt.setLong(1, connectorId);
+      stmt.setLong(2, connectorId);
+      return loadJobs(stmt, conn);
+
+    } catch (SQLException ex) {
+      logException(ex, connectorId);
+      throw new SqoopException(CommonRepositoryError.COMMON_0028, ex);
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void upgradeConnectorAndConfigs(MConnector mConnector, Connection conn) {
+    updateConnectorAndDeleteConfigs(mConnector, conn);
+    insertConfigsForConnector(mConnector, conn);
+  }
+
+  private void updateConnectorAndDeleteConfigs(MConnector mConnector, Connection conn) {
+    PreparedStatement updateConnectorStatement = null;
+    PreparedStatement deleteConfig = null;
+    PreparedStatement deleteInput = null;
+    try {
+      updateConnectorStatement = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_UPDATE_CONFIGURABLE);
+      deleteInput = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_DELETE_INPUTS_FOR_CONFIGURABLE);
+      deleteConfig = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_DELETE_CONFIGS_FOR_CONFIGURABLE);
+      updateConnectorStatement.setString(1, mConnector.getUniqueName());
+      updateConnectorStatement.setString(2, mConnector.getClassName());
+      updateConnectorStatement.setString(3, mConnector.getVersion());
+      updateConnectorStatement.setString(4, mConnector.getType().name());
+      updateConnectorStatement.setLong(5, mConnector.getPersistenceId());
+
+      if (updateConnectorStatement.executeUpdate() != 1) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0035);
+      }
+      deleteInput.setLong(1, mConnector.getPersistenceId());
+      deleteConfig.setLong(1, mConnector.getPersistenceId());
+      deleteInput.executeUpdate();
+      deleteConfig.executeUpdate();
+
+    } catch (SQLException e) {
+      logException(e, mConnector);
+      throw new SqoopException(CommonRepositoryError.COMMON_0035, e);
+    } finally {
+      closeStatements(updateConnectorStatement, deleteConfig, deleteInput);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void upgradeDriverAndConfigs(MDriver mDriver, Connection conn) {
+    updateDriverAndDeleteConfigs(mDriver, conn);
+    insertConfigsForDriver(mDriver, conn);
+  }
+
+  private void updateDriverAndDeleteConfigs(MDriver mDriver, Connection conn) {
+    PreparedStatement updateDriverStatement = null;
+    PreparedStatement deleteConfig = null;
+    PreparedStatement deleteInput = null;
+    try {
+      updateDriverStatement = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_UPDATE_CONFIGURABLE);
+      deleteInput = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_DELETE_INPUTS_FOR_CONFIGURABLE);
+      deleteConfig = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_DELETE_CONFIGS_FOR_CONFIGURABLE);
+      updateDriverStatement.setString(1, mDriver.getUniqueName());
+      updateDriverStatement.setString(2, Driver.getClassName());
+      updateDriverStatement.setString(3, mDriver.getVersion());
+      updateDriverStatement.setString(4, mDriver.getType().name());
+      updateDriverStatement.setLong(5, mDriver.getPersistenceId());
+
+      if (updateDriverStatement.executeUpdate() != 1) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0035);
+      }
+      deleteInput.setLong(1, mDriver.getPersistenceId());
+      deleteConfig.setLong(1, mDriver.getPersistenceId());
+      deleteInput.executeUpdate();
+      deleteConfig.executeUpdate();
+
+    } catch (SQLException e) {
+      logException(e, mDriver);
+      throw new SqoopException(CommonRepositoryError.COMMON_0040, e);
+    } finally {
+      closeStatements(updateDriverStatement, deleteConfig, deleteInput);
+    }
+  }
+
+  /**
+   * Helper method to insert the configs from the  into the
+   * repository.
+   *
+   * @param mDriver The driver instance to use to upgrade.
+   * @param conn    JDBC link to use for updating the configs
+   */
+  private void insertConfigsForDriver(MDriver mDriver, Connection conn) {
+    PreparedStatement baseConfigStmt = null;
+    PreparedStatement baseInputStmt = null;
+    try {
+      baseConfigStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_INTO_CONFIG,
+          Statement.RETURN_GENERATED_KEYS);
+
+      baseInputStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_INTO_INPUT,
+          Statement.RETURN_GENERATED_KEYS);
+
+      // Register the job config type, since driver config is per job
+      registerConfigs(null, null, mDriver.getDriverConfig().getConfigs(),
+          MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
+
+    } catch (SQLException ex) {
+      throw new SqoopException(CommonRepositoryError.COMMON_0011, mDriver.toString(), ex);
+    } finally {
+      closeStatements(baseConfigStmt, baseInputStmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public MDriver findDriver(String shortName, Connection conn) {
+    LOG.debug("Looking up Driver and config ");
+    PreparedStatement driverFetchStmt = null;
+    PreparedStatement driverConfigFetchStmt = null;
+    PreparedStatement driverConfigInputFetchStmt = null;
+
+    MDriver mDriver;
+    try {
+      driverFetchStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_FROM_CONFIGURABLE);
+      driverFetchStmt.setString(1, shortName);
+
+      ResultSet rsDriverSet = driverFetchStmt.executeQuery();
+      if (!rsDriverSet.next()) {
+        return null;
+      }
+      Long driverId = rsDriverSet.getLong(1);
+      String driverVersion = rsDriverSet.getString(4);
+
+      driverConfigFetchStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
+      driverConfigFetchStmt.setLong(1, driverId);
+
+      driverConfigInputFetchStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_INPUT);
+      List<MConfig> driverConfigs = new ArrayList<MConfig>();
+      loadDriverConfigs(driverConfigs, driverConfigFetchStmt, driverConfigInputFetchStmt, 1);
+
+      if (driverConfigs.isEmpty()) {
+        return null;
+      }
+      mDriver = new MDriver(new MDriverConfig(driverConfigs), driverVersion);
+      mDriver.setPersistenceId(driverId);
+
+    } catch (SQLException ex) {
+      throw new SqoopException(CommonRepositoryError.COMMON_0001, "Driver", ex);
+    } finally {
+      if (driverConfigFetchStmt != null) {
+        try {
+          driverConfigFetchStmt.close();
+        } catch (SQLException ex) {
+          LOG.error("Unable to close driver config fetch statement", ex);
+        }
+      }
+      if (driverConfigInputFetchStmt != null) {
+        try {
+          driverConfigInputFetchStmt.close();
+        } catch (SQLException ex) {
+          LOG.error("Unable to close driver input fetch statement", ex);
+        }
+      }
+      if (driverFetchStmt != null) {
+        try {
+          driverFetchStmt.close();
+        } catch (SQLException ex) {
+          LOG.error("Unable to close driver fetch statement", ex);
+        }
+      }
+    }
+
+    LOG.debug("Looked up Driver and config");
+    return mDriver;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void registerDriver(MDriver mDriver, Connection conn) {
+    if (mDriver.hasPersistenceId()) {
+      throw new SqoopException(CommonRepositoryError.COMMON_0008, mDriver.getUniqueName());
+    }
+    mDriver.setPersistenceId(insertAndGetDriverId(mDriver, conn));
+    insertConfigsforDriver(mDriver, conn);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void createLink(MLink link, Connection conn) {
+    PreparedStatement stmt = null;
+    int result;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_LINK,
+          Statement.RETURN_GENERATED_KEYS);
+      stmt.setString(1, link.getName());
+      stmt.setLong(2, link.getConnectorId());
+      stmt.setBoolean(3, link.getEnabled());
+      stmt.setString(4, link.getCreationUser());
+      stmt.setTimestamp(5, new Timestamp(link.getCreationDate().getTime()));
+      stmt.setString(6, link.getLastUpdateUser());
+      stmt.setTimestamp(7, new Timestamp(link.getLastUpdateDate().getTime()));
+
+      result = stmt.executeUpdate();
+      if (result != 1) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0009,
+            Integer.toString(result));
+      }
+
+      ResultSet rsetConnectionId = stmt.getGeneratedKeys();
+
+      if (!rsetConnectionId.next()) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0010);
+      }
+
+      long connectionId = rsetConnectionId.getLong(1);
+
+      createInputValues(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_LINK_INPUT,
+          connectionId,
+          link.getConnectorLinkConfig().getConfigs(),
+          conn);
+      link.setPersistenceId(connectionId);
+
+    } catch (SQLException ex) {
+      logException(ex, link);
+      throw new SqoopException(CommonRepositoryError.COMMON_0016, ex);
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void updateLink(MLink link, Connection conn) {
+    PreparedStatement deleteStmt = null;
+    PreparedStatement updateStmt = null;
+    try {
+      // Firstly remove old values
+      deleteStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_DELETE_LINK_INPUT);
+      deleteStmt.setLong(1, link.getPersistenceId());
+      deleteStmt.executeUpdate();
+
+      // Update LINK_CONFIG table
+      updateStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_UPDATE_LINK);
+      updateStmt.setString(1, link.getName());
+      updateStmt.setString(2, link.getLastUpdateUser());
+      updateStmt.setTimestamp(3, new Timestamp(new Date().getTime()));
+
+      updateStmt.setLong(4, link.getPersistenceId());
+      updateStmt.executeUpdate();
+
+      // And reinsert new values
+      createInputValues(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_LINK_INPUT,
+          link.getPersistenceId(),
+          link.getConnectorLinkConfig().getConfigs(),
+          conn);
+
+    } catch (SQLException ex) {
+      logException(ex, link);
+      throw new SqoopException(CommonRepositoryError.COMMON_0018, ex);
+    } finally {
+      closeStatements(deleteStmt, updateStmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean existsLink(long linkId, Connection conn) {
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_LINK_CHECK_BY_ID);
+      stmt.setLong(1, linkId);
+      rs = stmt.executeQuery();
+
+      // Should be always valid in query with count
+      rs.next();
+
+      return rs.getLong(1) == 1;
+    } catch (SQLException ex) {
+      logException(ex, linkId);
+      throw new SqoopException(CommonRepositoryError.COMMON_0022, ex);
+    } finally {
+      closeResultSets(rs);
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean inUseLink(long linkId, Connection conn) {
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_JOBS_FOR_LINK_CHECK);
+      stmt.setLong(1, linkId);
+      rs = stmt.executeQuery();
+
+      // Should be always valid in case of count(*) query
+      rs.next();
+
+      return rs.getLong(1) != 0;
+
+    } catch (SQLException e) {
+      logException(e, linkId);
+      throw new SqoopException(CommonRepositoryError.COMMON_0029, e);
+    } finally {
+      closeResultSets(rs);
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void enableLink(long linkId, boolean enabled, Connection conn) {
+    PreparedStatement enableConn = null;
+
+    try {
+      enableConn = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_ENABLE_LINK);
+      enableConn.setBoolean(1, enabled);
+      enableConn.setLong(2, linkId);
+      enableConn.executeUpdate();
+    } catch (SQLException ex) {
+      logException(ex, linkId);
+      throw new SqoopException(CommonRepositoryError.COMMON_0038, ex);
+    } finally {
+      closeStatements(enableConn);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void deleteLink(long linkId, Connection conn) {
+    PreparedStatement dltConn = null;
+
+    try {
+      deleteLinkInputs(linkId, conn);
+      dltConn = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_DELETE_LINK);
+      dltConn.setLong(1, linkId);
+      dltConn.executeUpdate();
+    } catch (SQLException ex) {
+      logException(ex, linkId);
+      throw new SqoopException(CommonRepositoryError.COMMON_0019, ex);
+    } finally {
+      closeStatements(dltConn);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void deleteLinkInputs(long id, Connection conn) {
+    PreparedStatement dltConnInput = null;
+    try {
+      dltConnInput = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_DELETE_LINK_INPUT);
+      dltConnInput.setLong(1, id);
+      dltConnInput.executeUpdate();
+    } catch (SQLException ex) {
+      logException(ex, id);
+      throw new SqoopException(CommonRepositoryError.COMMON_0019, ex);
+    } finally {
+      closeStatements(dltConnInput);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public MLink findLink(long linkId, Connection conn) {
+    PreparedStatement linkFetchStmt = null;
+    try {
+      linkFetchStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_LINK_SINGLE);
+      linkFetchStmt.setLong(1, linkId);
+
+      List<MLink> links = loadLinks(linkFetchStmt, conn);
+
+      if (links.size() != 1) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0021, "Couldn't find"
+            + " link with id " + linkId);
+      }
+
+      // Return the first and only one link object with the given id
+      return links.get(0);
+
+    } catch (SQLException ex) {
+      logException(ex, linkId);
+      throw new SqoopException(CommonRepositoryError.COMMON_0020, ex);
+    } finally {
+      closeStatements(linkFetchStmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public MLink findLink(String linkName, Connection conn) {
+    PreparedStatement linkFetchStmt = null;
+    try {
+      linkFetchStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_LINK_SINGLE_BY_NAME);
+      linkFetchStmt.setString(1, linkName);
+
+      List<MLink> links = loadLinks(linkFetchStmt, conn);
+
+      if (links.size() != 1) {
+        return null;
+      }
+
+      // Return the first and only one link object with the given name
+      return links.get(0);
+
+    } catch (SQLException ex) {
+      logException(ex, linkName);
+      throw new SqoopException(CommonRepositoryError.COMMON_0020, ex);
+    } finally {
+      closeStatements(linkFetchStmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<MLink> findLinks(Connection conn) {
+    PreparedStatement linksFetchStmt = null;
+    try {
+      linksFetchStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_LINK_ALL);
+
+      return loadLinks(linksFetchStmt, conn);
+
+    } catch (SQLException ex) {
+      logException(ex);
+      throw new SqoopException(CommonRepositoryError.COMMON_0020, ex);
+    } finally {
+      closeStatements(linksFetchStmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<MLink> findLinksForConnector(long connectorId, Connection conn) {
+    PreparedStatement linkByConnectorFetchStmt = null;
+    try {
+      linkByConnectorFetchStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_LINK_FOR_CONNECTOR_CONFIGURABLE);
+      linkByConnectorFetchStmt.setLong(1, connectorId);
+      return loadLinks(linkByConnectorFetchStmt, conn);
+    } catch (SQLException ex) {
+      logException(ex, connectorId);
+      throw new SqoopException(CommonRepositoryError.COMMON_0020, ex);
+    } finally {
+      closeStatements(linkByConnectorFetchStmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void createJob(MJob job, Connection conn) {
+    PreparedStatement stmt = null;
+    int result;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_JOB, Statement.RETURN_GENERATED_KEYS);
+      stmt.setString(1, job.getName());
+      stmt.setLong(2, job.getLinkId(Direction.FROM));
+      stmt.setLong(3, job.getLinkId(Direction.TO));
+      stmt.setBoolean(4, job.getEnabled());
+      stmt.setString(5, job.getCreationUser());
+      stmt.setTimestamp(6, new Timestamp(job.getCreationDate().getTime()));
+      stmt.setString(7, job.getLastUpdateUser());
+      stmt.setTimestamp(8, new Timestamp(job.getLastUpdateDate().getTime()));
+
+      result = stmt.executeUpdate();
+      if (result != 1) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0009,
+            Integer.toString(result));
+      }
+
+      ResultSet rsetJobId = stmt.getGeneratedKeys();
+
+      if (!rsetJobId.next()) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0010);
+      }
+
+      long jobId = rsetJobId.getLong(1);
+
+      // from config for the job
+      createInputValues(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_JOB_INPUT,
+          jobId,
+          job.getJobConfig(Direction.FROM).getConfigs(),
+          conn);
+      // to config for the job
+      createInputValues(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_JOB_INPUT,
+          jobId,
+          job.getJobConfig(Direction.TO).getConfigs(),
+          conn);
+      // driver config per job
+      createInputValues(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_JOB_INPUT,
+          jobId,
+          job.getDriverConfig().getConfigs(),
+          conn);
+
+      job.setPersistenceId(jobId);
+
+    } catch (SQLException ex) {
+      logException(ex, job);
+      throw new SqoopException(CommonRepositoryError.COMMON_0023, ex);
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void updateJob(MJob job, Connection conn) {
+    PreparedStatement deleteStmt = null;
+    PreparedStatement updateStmt = null;
+    try {
+      // Firstly remove old values
+      deleteStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_DELETE_JOB_INPUT);
+      deleteStmt.setLong(1, job.getPersistenceId());
+      deleteStmt.executeUpdate();
+
+      // Update job table
+      updateStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_UPDATE_JOB);
+      updateStmt.setString(1, job.getName());
+      updateStmt.setString(2, job.getLastUpdateUser());
+      updateStmt.setTimestamp(3, new Timestamp(new Date().getTime()));
+
+      updateStmt.setLong(4, job.getPersistenceId());
+      updateStmt.executeUpdate();
+
+      // And reinsert new values
+      createInputValues(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_JOB_INPUT,
+          job.getPersistenceId(),
+          job.getJobConfig(Direction.FROM).getConfigs(),
+          conn);
+      createInputValues(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_JOB_INPUT,
+          job.getPersistenceId(),
+          job.getJobConfig(Direction.TO).getConfigs(),
+          conn);
+      createInputValues(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_JOB_INPUT,
+          job.getPersistenceId(),
+          job.getDriverConfig().getConfigs(),
+          conn);
+
+    } catch (SQLException ex) {
+      logException(ex, job);
+      throw new SqoopException(CommonRepositoryError.COMMON_0024, ex);
+    } finally {
+      closeStatements(deleteStmt, updateStmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean existsJob(long jobId, Connection conn) {
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_JOB_CHECK_BY_ID);
+      stmt.setLong(1, jobId);
+      rs = stmt.executeQuery();
+
+      // Should be always valid in query with count
+      rs.next();
+
+      return rs.getLong(1) == 1;
+    } catch (SQLException ex) {
+      logException(ex, jobId);
+      throw new SqoopException(CommonRepositoryError.COMMON_0026, ex);
+    } finally {
+      closeResultSets(rs);
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean inUseJob(long jobId, Connection conn) {
+    MSubmission submission = findLastSubmissionForJob(jobId, conn);
+
+    // We have no submissions and thus job can't be in use
+    if (submission == null) {
+      return false;
+    }
+
+    // We can't remove running job
+    if (submission.getStatus().isRunning()) {
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void enableJob(long jobId, boolean enabled, Connection conn) {
+    PreparedStatement enableConn = null;
+
+    try {
+      enableConn = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_ENABLE_JOB);
+      enableConn.setBoolean(1, enabled);
+      enableConn.setLong(2, jobId);
+      enableConn.executeUpdate();
+    } catch (SQLException ex) {
+      logException(ex, jobId);
+      throw new SqoopException(CommonRepositoryError.COMMON_0039, ex);
+    } finally {
+      closeStatements(enableConn);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void deleteJobInputs(long id, Connection conn) {
+    PreparedStatement dltInput = null;
+    try {
+      dltInput = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_DELETE_JOB_INPUT);
+      dltInput.setLong(1, id);
+      dltInput.executeUpdate();
+    } catch (SQLException ex) {
+      logException(ex, id);
+      throw new SqoopException(CommonRepositoryError.COMMON_0025, ex);
+    } finally {
+      closeStatements(dltInput);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void deleteJob(long jobId, Connection conn) {
+    PreparedStatement dlt = null;
+    try {
+      deleteJobInputs(jobId, conn);
+      dlt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_DELETE_JOB);
+      dlt.setLong(1, jobId);
+      dlt.executeUpdate();
+    } catch (SQLException ex) {
+      logException(ex, jobId);
+      throw new SqoopException(CommonRepositoryError.COMMON_0025, ex);
+    } finally {
+      closeStatements(dlt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public MJob findJob(long jobId, Connection conn) {
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_JOB_SINGLE_BY_ID);
+      stmt.setLong(1, jobId);
+
+      List<MJob> jobs = loadJobs(stmt, conn);
+
+      if (jobs.size() != 1) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0027, "Couldn't find"
+            + " job with id " + jobId);
+      }
+
+      // Return the first and only one link object
+      return jobs.get(0);
+
+    } catch (SQLException ex) {
+      logException(ex, jobId);
+      throw new SqoopException(CommonRepositoryError.COMMON_0028, ex);
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public MJob findJob(String name, Connection conn) {
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_JOB_SINGLE_BY_NAME);
+      stmt.setString(1, name);
+
+      List<MJob> jobs = loadJobs(stmt, conn);
+
+      if (jobs.size() != 1) {
+        return null;
+      }
+
+      // Return the first and only one link object
+      return jobs.get(0);
+
+    } catch (SQLException ex) {
+      logException(ex, name);
+      throw new SqoopException(CommonRepositoryError.COMMON_0028, ex);
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<MJob> findJobs(Connection conn) {
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_JOB);
+
+      return loadJobs(stmt, conn);
+
+    } catch (SQLException ex) {
+      logException(ex);
+      throw new SqoopException(CommonRepositoryError.COMMON_0028, ex);
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void createSubmission(MSubmission submission, Connection conn) {
+    PreparedStatement stmt = null;
+    int result;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_SUBMISSION,
+          Statement.RETURN_GENERATED_KEYS);
+      stmt.setLong(1, submission.getJobId());
+      stmt.setString(2, submission.getStatus().name());
+      stmt.setString(3, submission.getCreationUser());
+      stmt.setTimestamp(4, new Timestamp(submission.getCreationDate().getTime()));
+      stmt.setString(5, submission.getLastUpdateUser());
+      stmt.setTimestamp(6, new Timestamp(submission.getLastUpdateDate().getTime()));
+      stmt.setString(7, submission.getExternalId());
+      stmt.setString(8, submission.getExternalLink());
+      stmt.setString(9, submission.getExceptionInfo());
+      stmt.setString(10, submission.getExceptionStackTrace());
+
+      result = stmt.executeUpdate();
+      if (result != 1) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0009,
+            Integer.toString(result));
+      }
+
+      ResultSet rsetSubmissionId = stmt.getGeneratedKeys();
+
+      if (!rsetSubmissionId.next()) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0010);
+      }
+
+      long submissionId = rsetSubmissionId.getLong(1);
+
+      if(submission.getCounters() != null) {
+        createSubmissionCounters(submissionId, submission.getCounters(), conn);
+      }
+
+      // Save created persistence id
+      submission.setPersistenceId(submissionId);
+
+    } catch (SQLException ex) {
+      logException(ex, submission);
+      throw new SqoopException(CommonRepositoryError.COMMON_0031, ex);
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean existsSubmission(long submissionId, Connection conn) {
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_SUBMISSION_CHECK);
+      stmt.setLong(1, submissionId);
+      rs = stmt.executeQuery();
+
+      // Should be always valid in query with count
+      rs.next();
+
+      return rs.getLong(1) == 1;
+    } catch (SQLException ex) {
+      logException(ex, submissionId);
+      throw new SqoopException(CommonRepositoryError.COMMON_0030, ex);
+    } finally {
+      closeResultSets(rs);
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void updateSubmission(MSubmission submission, Connection conn) {
+    PreparedStatement stmt = null;
+    PreparedStatement deleteStmt = null;
+    try {
+      //  Update properties in main table
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_UPDATE_SUBMISSION);
+      stmt.setString(1, submission.getStatus().name());
+      stmt.setString(2, submission.getLastUpdateUser());
+      stmt.setTimestamp(3, new Timestamp(submission.getLastUpdateDate().getTime()));
+      stmt.setString(4, submission.getExceptionInfo());
+      stmt.setString(5, submission.getExceptionStackTrace());
+
+      stmt.setLong(6, submission.getPersistenceId());
+      stmt.executeUpdate();
+
+      // Delete previous counters
+      deleteStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_DELETE_COUNTER_SUBMISSION);
+      deleteStmt.setLong(1, submission.getPersistenceId());
+      deleteStmt.executeUpdate();
+
+      // Reinsert new counters if needed
+      if(submission.getCounters() != null) {
+        createSubmissionCounters(submission.getPersistenceId(), submission.getCounters(), conn);
+      }
+
+    } catch (SQLException ex) {
+      logException(ex, submission);
+      throw new SqoopException(CommonRepositoryError.COMMON_0032, ex);
+    } finally {
+      closeStatements(stmt, deleteStmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void purgeSubmissions(Date threshold, Connection conn) {
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_PURGE_SUBMISSIONS);
+      stmt.setTimestamp(1, new Timestamp(threshold.getTime()));
+      stmt.executeUpdate();
+
+    } catch (SQLException ex) {
+      logException(ex, threshold);
+      throw new SqoopException(CommonRepositoryError.COMMON_0033, ex);
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<MSubmission> findUnfinishedSubmissions(Connection conn) {
+    List<MSubmission> submissions = new LinkedList<MSubmission>();
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_SUBMISSION_UNFINISHED);
+
+      for(SubmissionStatus status : SubmissionStatus.unfinished()) {
+        stmt.setString(1, status.name());
+        rs = stmt.executeQuery();
+
+        while(rs.next()) {
+          submissions.add(loadSubmission(rs, conn));
+        }
+
+        rs.close();
+        rs = null;
+      }
+    } catch (SQLException ex) {
+      logException(ex);
+      throw new SqoopException(CommonRepositoryError.COMMON_0034, ex);
+    } finally {
+      closeResultSets(rs);
+      closeStatements(stmt);
+    }
+
+    return submissions;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<MSubmission> findSubmissions(Connection conn) {
+    List<MSubmission> submissions = new LinkedList<MSubmission>();
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_SUBMISSIONS);
+      rs = stmt.executeQuery();
+
+      while(rs.next()) {
+        submissions.add(loadSubmission(rs, conn));
+      }
+
+      rs.close();
+      rs = null;
+    } catch (SQLException ex) {
+      logException(ex);
+      throw new SqoopException(CommonRepositoryError.COMMON_0036, ex);
+    } finally {
+      closeResultSets(rs);
+      closeStatements(stmt);
+    }
+
+    return submissions;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<MSubmission> findSubmissionsForJob(long jobId, Connection conn) {
+    List<MSubmission> submissions = new LinkedList<MSubmission>();
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_SUBMISSIONS_FOR_JOB);
+      stmt.setLong(1, jobId);
+      rs = stmt.executeQuery();
+
+      while(rs.next()) {
+        submissions.add(loadSubmission(rs, conn));
+      }
+
+      rs.close();
+      rs = null;
+    } catch (SQLException ex) {
+      logException(ex);
+      throw new SqoopException(CommonRepositoryError.COMMON_0037, ex);
+    } finally {
+      closeResultSets(rs);
+      closeStatements(stmt);
+    }
+
+    return submissions;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public MSubmission findLastSubmissionForJob(long jobId, Connection conn) {
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_SUBMISSIONS_FOR_JOB);
+      stmt.setLong(1, jobId);
+      stmt.setMaxRows(1);
+      rs = stmt.executeQuery();
+
+      if(!rs.next()) {
+        return null;
+      }
+
+      return loadSubmission(rs, conn);
+    } catch (SQLException ex) {
+      logException(ex, jobId);
+      throw new SqoopException(CommonRepositoryError.COMMON_0037, ex);
+    } finally {
+      closeResultSets(rs);
+      closeStatements(stmt);
+    }
+  }
+
+  private void insertConnectorDirection(Long connectorId, Direction direction, Connection conn)
+      throws SQLException {
+    PreparedStatement stmt = null;
+
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_SQ_CONNECTOR_DIRECTIONS);
+      stmt.setLong(1, connectorId);
+      stmt.setLong(2, getDirection(direction, conn));
+
+      if (stmt.executeUpdate() != 1) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0043);
+      }
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  private void insertConnectorDirections(Long connectorId, SupportedDirections directions, Connection conn)
+      throws SQLException {
+    if (directions.isDirectionSupported(Direction.FROM)) {
+      insertConnectorDirection(connectorId, Direction.FROM, conn);
+    }
+
+    if (directions.isDirectionSupported(Direction.TO)) {
+      insertConnectorDirection(connectorId, Direction.TO, conn);
+    }
+  }
+
+  private long insertAndGetConnectorId(MConnector mc, Connection conn) {
+    PreparedStatement baseConnectorStmt = null;
+    try {
+      baseConnectorStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_INTO_CONFIGURABLE,
+          Statement.RETURN_GENERATED_KEYS);
+      baseConnectorStmt.setString(1, mc.getUniqueName());
+      baseConnectorStmt.setString(2, mc.getClassName());
+      baseConnectorStmt.setString(3, mc.getVersion());
+      baseConnectorStmt.setString(4, mc.getType().name());
+
+      int baseConnectorCount = baseConnectorStmt.executeUpdate();
+      if (baseConnectorCount != 1) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0009,
+            Integer.toString(baseConnectorCount));
+      }
+
+      ResultSet rsetConnectorId = baseConnectorStmt.getGeneratedKeys();
+
+      if (!rsetConnectorId.next()) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0010);
+      }
+      // connector configurable also have directions
+      insertConnectorDirections(rsetConnectorId.getLong(1), mc.getSupportedDirections(), conn);
+      return rsetConnectorId.getLong(1);
+    } catch (SQLException ex) {
+      throw new SqoopException(CommonRepositoryError.COMMON_0011, mc.toString(), ex);
+    } finally {
+      closeStatements(baseConnectorStmt);
+    }
+  }
+
+  /**
+   * Helper method to insert the configs from the MConnector into the
+   * repository. The job and connector configs within <code>mc</code> will get
+   * updated with the id of the configs when this function returns.
+   * @param mc The connector to use for updating configs
+   * @param conn JDBC connection to use for inserting the configs
+   */
+  private void insertConfigsForConnector(MConnector mc, Connection conn) {
+    long connectorId = mc.getPersistenceId();
+    PreparedStatement baseConfigStmt = null;
+    PreparedStatement baseInputStmt = null;
+    try{
+      baseConfigStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_INTO_CONFIG,
+          Statement.RETURN_GENERATED_KEYS);
+
+      baseInputStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_INTO_INPUT,
+          Statement.RETURN_GENERATED_KEYS);
+
+      // Register link type config
+      registerConfigs(connectorId, null /* No direction for LINK type config*/, mc.getLinkConfig().getConfigs(),
+          MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn);
+
+      // Register both from/to job type config for connector
+      if (mc.getSupportedDirections().isDirectionSupported(Direction.FROM)) {
+        registerConfigs(connectorId, Direction.FROM, mc.getFromConfig().getConfigs(),
+            MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
+      }
+      if (mc.getSupportedDirections().isDirectionSupported(Direction.TO)) {
+        registerConfigs(connectorId, Direction.TO, mc.getToConfig().getConfigs(),
+            MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
+      }
+    } catch (SQLException ex) {
+      throw new SqoopException(CommonRepositoryError.COMMON_0011,
+          mc.toString(), ex);
+    } finally {
+      closeStatements(baseConfigStmt, baseInputStmt);
+    }
+  }
+
+  private long insertAndGetDriverId(MDriver mDriver, Connection conn) {
+    PreparedStatement baseDriverStmt = null;
+    try {
+      baseDriverStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_INTO_CONFIGURABLE,
+          Statement.RETURN_GENERATED_KEYS);
+      baseDriverStmt.setString(1, mDriver.getUniqueName());
+      baseDriverStmt.setString(2, Driver.getClassName());
+      baseDriverStmt.setString(3, mDriver.getVersion());
+      baseDriverStmt.setString(4, mDriver.getType().name());
+
+      int baseDriverCount = baseDriverStmt.executeUpdate();
+      if (baseDriverCount != 1) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0009, Integer.toString(baseDriverCount));
+      }
+
+      ResultSet rsetDriverId = baseDriverStmt.getGeneratedKeys();
+
+      if (!rsetDriverId.next()) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0010);
+      }
+      return rsetDriverId.getLong(1);
+    } catch (SQLException ex) {
+      throw new SqoopException(CommonRepositoryError.COMMON_0044, mDriver.toString(), ex);
+    } finally {
+      closeStatements(baseDriverStmt);
+    }
+  }
+
+  private void insertConfigsforDriver(MDriver mDriver, Connection conn) {
+    PreparedStatement baseConfigStmt = null;
+    PreparedStatement baseInputStmt = null;
+    try {
+      baseConfigStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_INTO_CONFIG,
+          Statement.RETURN_GENERATED_KEYS);
+      baseInputStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_INTO_INPUT,
+          Statement.RETURN_GENERATED_KEYS);
+
+      // Register a driver config as a job type with no owner/connector and direction
+      registerConfigs(mDriver.getPersistenceId(), null /* no direction*/, mDriver.getDriverConfig().getConfigs(),
+          MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
+
+    } catch (SQLException ex) {
+      logException(ex, mDriver);
+      throw new SqoopException(CommonRepositoryError.COMMON_0011, ex);
+    } finally {
+      closeStatements(baseConfigStmt, baseInputStmt);
+    }
+  }
+
+  /**
+   * Stores counters for given submission in repository.
+   *
+   * @param submissionId Submission id
+   * @param counters Counters that should be stored
+   * @param conn Connection to derby repository
+   * @throws java.sql.SQLException
+   */
+  private void createSubmissionCounters(long submissionId, Counters counters, Connection conn) throws SQLException {
+    PreparedStatement stmt = null;
+
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_COUNTER_SUBMISSION);
+
+      for(CounterGroup group : counters) {
+        long groupId = getCounterGroupId(group, conn);
+
+        for(Counter counter: group) {
+          long counterId = getCounterId(counter, conn);
+
+          stmt.setLong(1, groupId);
+          stmt.setLong(2, counterId);
+          stmt.setLong(3, submissionId);
+          stmt.setLong(4, counter.getValue());
+
+          stmt.executeUpdate();
+        }
+      }
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * Resolves counter group database id.
+   *
+   * @param group Given group
+   * @param conn Connection to database
+   * @return Id
+   * @throws java.sql.SQLException
+   */
+  private long getCounterGroupId(CounterGroup group, Connection conn) throws SQLException {
+    PreparedStatement select = null;
+    PreparedStatement insert = null;
+    ResultSet rsSelect = null;
+    ResultSet rsInsert = null;
+
+    try {
+      select = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_COUNTER_GROUP);
+      select.setString(1, group.getName());
+
+      rsSelect = select.executeQuery();
+
+      if(rsSelect.next()) {
+        return rsSelect.getLong(1);
+      }
+
+      insert = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_COUNTER_GROUP, Statement.RETURN_GENERATED_KEYS);
+      insert.setString(1, group.getName());
+      insert.executeUpdate();
+
+      rsInsert = insert.getGeneratedKeys();
+
+      if (!rsInsert.next()) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0010);
+      }
+
+      return rsInsert.getLong(1);
+    } finally {
+      closeResultSets(rsSelect, rsInsert);
+      closeStatements(select, insert);
+    }
+  }
+
+  /**
+   * Resolves counter id.
+   *
+   * @param counter Given counter
+   * @param conn Connection to database
+   * @return Id
+   * @throws java.sql.SQLException
+   */
+  private long getCounterId(Counter counter, Connection conn) throws SQLException {
+    PreparedStatement select = null;
+    PreparedStatement insert = null;
+    ResultSet rsSelect = null;
+    ResultSet rsInsert = null;
+
+    try {
+      select = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_COUNTER);
+      select.setString(1, counter.getName());
+
+      rsSelect = select.executeQuery();
+
+      if(rsSelect.next()) {
+        return rsSelect.getLong(1);
+      }
+
+      insert = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_COUNTER, Statement.RETURN_GENERATED_KEYS);
+      insert.setString(1, counter.getName());
+      insert.executeUpdate();
+
+      rsInsert = insert.getGeneratedKeys();
+
+      if (!rsInsert.next()) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0010);
+      }
+
+      return rsInsert.getLong(1);
+    } finally {
+      closeResultSets(rsSelect, rsInsert);
+      closeStatements(select, insert);
+    }
+  }
+
+  /**
+   * Create MSubmission structure from result set.
+   *
+   * @param rs Result set, only active row will be fetched
+   * @param conn Connection to database
+   * @return Created MSubmission structure
+   * @throws java.sql.SQLException
+   */
+  private MSubmission loadSubmission(ResultSet rs, Connection conn) throws SQLException {
+    MSubmission submission = new MSubmission();
+
+    submission.setPersistenceId(rs.getLong(1));
+    submission.setJobId(rs.getLong(2));
+    submission.setStatus(SubmissionStatus.valueOf(rs.getString(3)));
+    submission.setCreationUser(rs.getString(4));
+    submission.setCreationDate(rs.getTimestamp(5));
+    submission.setLastUpdateUser(rs.getString(6));
+    submission.setLastUpdateDate(rs.getTimestamp(7));
+    submission.setExternalId(rs.getString(8));
+    submission.setExternalLink(rs.getString(9));
+    submission.setExceptionInfo(rs.getString(10));
+    submission.setExceptionStackTrace(rs.getString(11));
+
+    Counters counters = loadCountersSubmission(rs.getLong(1), conn);
+    submission.setCounters(counters);
+
+    return submission;
+  }
+
+  private Counters loadCountersSubmission(long submissionId, Connection conn) throws SQLException {
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_COUNTER_SUBMISSION);
+      stmt.setLong(1, submissionId);
+      rs = stmt.executeQuery();
+
+      Counters counters = new Counters();
+
+      while (rs.next()) {
+        String groupName = rs.getString(1);
+        String counterName = rs.getString(2);
+        long value = rs.getLong(3);
+
+        CounterGroup group = counters.getCounterGroup(groupName);
+        if (group == null) {
+          group = new CounterGroup(groupName);
+          counters.addCounterGroup(group);
+        }
+
+        group.addCounter(new Counter(counterName, value));
+      }
+
+      if (counters.isEmpty()) {
+        return null;
+      } else {
+        return counters;
+      }
+    } finally {
+      closeStatements(stmt);
+      closeResultSets(rs);
+    }
+  }
+
+  private Long getDirection(Direction direction, Connection conn) throws SQLException {
+    PreparedStatement directionStmt = null;
+    ResultSet rs = null;
+
+    try {
+      directionStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_SQD_ID_BY_SQD_NAME);
+      directionStmt.setString(1, direction.toString());
+      rs = directionStmt.executeQuery();
+
+      rs.next();
+      return rs.getLong(1);
+    } finally {
+      if (rs != null) {
+        closeResultSets(rs);
+      }
+      if (directionStmt != null) {
+        closeStatements(directionStmt);
+      }
+    }
+  }
+
+  private Direction getDirection(long directionId, Connection conn) throws SQLException {
+    PreparedStatement directionStmt = null;
+    ResultSet rs = null;
+
+    try {
+      directionStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_SQD_NAME_BY_SQD_ID);
+      directionStmt.setLong(1, directionId);
+      rs = directionStmt.executeQuery();
+
+      rs.next();
+      return Direction.valueOf(rs.getString(1));
+    } finally {
+      if (rs != null) {
+        closeResultSets(rs);
+      }
+      if (directionStmt != null) {
+        closeStatements(directionStmt);
+      }
+    }
+  }
+
+  private SupportedDirections findConnectorSupportedDirections(long connectorId, Connection conn) throws SQLException {
+    PreparedStatement connectorDirectionsStmt = null;
+    ResultSet rs = null;
+
+    boolean from = false, to = false;
+
+    try {
+      connectorDirectionsStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_SQ_CONNECTOR_DIRECTIONS);
+      connectorDirectionsStmt.setLong(1, connectorId);
+      rs = connectorDirectionsStmt.executeQuery();
+
+      while(rs.next()) {
+        switch(getDirection(rs.getLong(2), conn)) {
+          case FROM:
+            from = true;
+            break;
+
+          case TO:
+            to = true;
+            break;
+        }
+      }
+    } finally {
+      if (rs != null) {
+        closeResultSets(rs);
+      }
+      if (connectorDirectionsStmt != null) {
+        closeStatements(connectorDirectionsStmt);
+      }
+    }
+
+    return new SupportedDirections(from, to);
+  }
+
+  private List<MConnector> loadConnectors(PreparedStatement stmt, Connection conn) throws SQLException {
+    List<MConnector> connectors = new ArrayList<MConnector>();
+    ResultSet rsConnectors = null;
+    PreparedStatement connectorConfigFetchStmt = null;
+    PreparedStatement connectorConfigInputFetchStmt = null;
+
+    try {
+      rsConnectors = stmt.executeQuery();
+      connectorConfigFetchStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
+      connectorConfigInputFetchStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_INPUT);
+
+      while(rsConnectors.next()) {
+        long connectorId = rsConnectors.getLong(1);
+        String connectorName = rsConnectors.getString(2);
+        String connectorClassName = rsConnectors.getString(3);
+        String connectorVersion = rsConnectors.getString(4);
+
+        connectorConfigFetchStmt.setLong(1, connectorId);
+
+        List<MConfig> linkConfig = new ArrayList<MConfig>();
+        List<MConfig> fromConfig = new ArrayList<MConfig>();
+        List<MConfig> toConfig = new ArrayList<MConfig>();
+
+        loadConnectorConfigTypes(linkConfig, fromConfig, toConfig, connectorConfigFetchStmt,
+            connectorConfigInputFetchStmt, 1, conn);
+
+        SupportedDirections supportedDirections
+            = findConnectorSupportedDirections(connectorId, conn);
+        MFromConfig fromJobConfig = null;
+        MToConfig toJobConfig = null;
+        if (supportedDirections.isDirectionSupported(Direction.FROM)) {
+          fromJobConfig = new MFromConfig(fromConfig);
+        }
+        if (supportedDirections.isDirectionSupported(Direction.TO)) {
+          toJobConfig = new MToConfig(toConfig);
+        }
+        MConnector mc = new MConnector(connectorName, connectorClassName, connectorVersion,
+            new MLinkConfig(linkConfig), fromJobConfig, toJobConfig);
+        mc.setPersistenceId(connectorId);
+
+        connectors.add(mc);
+      }
+    } finally {
+      closeResultSets(rsConnectors);
+      closeStatements(connectorConfigFetchStmt, connectorConfigInputFetchStmt);
+    }
+    return connectors;
+  }
+
+  private List<MLink> loadLinks(PreparedStatement stmt,
+                                Connection conn)
+      throws SQLException {
+    List<MLink> links = new ArrayList<MLink>();
+    ResultSet rsConnection = null;
+    PreparedStatement connectorConfigFetchStatement = null;
+    PreparedStatement connectorConfigInputStatement = null;
+
+    try {
+      rsConnection = stmt.executeQuery();
+
+      //
+      connectorConfigFetchStatement = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
+      connectorConfigInputStatement = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_FETCH_LINK_INPUT);
+
+      while(rsConnection.next()) {
+        long id = rsConnection.getLong(1);
+        String name = rsConnection.getString(2);
+        long connectorId = rsConnection.getLong(3);
+        boolean enabled = rsConnection.getBoolean(4);
+        String creationUser = rsConnection.getString(5);
+        Date creationDate = rsConnection.getTimestamp(6);
+        String updateUser = rsConnection.getString(7);
+        Date lastUpdateDate = rsConnection.getTimestamp(8);
+
+        connectorConfigFetchStatement.setLong(1, connectorId);
+        connectorConfigInputStatement.setLong(1, id);
+        connectorConfigInputStatement.setLong(3, id);
+
+        List<MConfig> connectorLinkConfig = new ArrayList<MConfig>();
+        List<MConfig> fromConfig = new ArrayList<MConfig>();
+        List<MConfig> toConfig = new ArrayList<MConfig>();
+
+        loadConnectorConfigTypes(connectorLinkConfig, fromConfig, toConfig, connectorConfigFetchStatement,
+            connectorConfigInputStatement, 2, conn);
+        MLink link = new MLink(connectorId, new MLinkConfig(connectorLinkConfig));
+
+        link.setPersistenceId(id);
+        link.setName(name);
+        link.setCreationUser(creationUser);
+        link.setCreationDate(creationDate);
+        link.setLastUpdateUser(updateUser);
+        link.setLastUpdateDate(lastUpdateDate);
+        link.setEnabled(enabled);
+
+        links.add(link);
+      }
+    } finally {
+      closeResultSets(rsConnection);
+      closeStatements(connectorConfigFetchStatement, connectorConfigInputStatement);
+    }
+
+    return links;
+  }
+
+  private List<MJob> loadJobs(PreparedStatement stmt,
+                              Connection conn)
+      throws SQLException {
+    List<MJob> jobs = new ArrayList<MJob>();
+    ResultSet rsJob = null;
+    PreparedStatement fromConfigFetchStmt = null;
+    PreparedStatement toConfigFetchStmt = null;
+    PreparedStatement driverConfigfetchStmt = null;
+    PreparedStatement jobInputFetchStmt = null;
+
+    try {
+      rsJob = stmt.executeQuery();
+      // Note: Job does not hold a explicit reference to the driver since every
+      // job has the same driver
+      long driverId = this.findDriver(MDriver.DRIVER_NAME, conn).getPersistenceId();
+      fromConfigFetchStmt  = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
+      toConfigFetchStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
+      driverConfigfetchStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_CONFIG_FOR_CONFIGURABLE);
+      jobInputFetchStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_FETCH_JOB_INPUT);
+
+      while(rsJob.next()) {
+        long fromConnectorId = rsJob.getLong(1);
+        long toConnectorId = rsJob.getLong(2);
+        long id = rsJob.getLong(3);
+        String name = rsJob.getString(4);
+        long fromLinkId = rsJob.getLong(5);
+        long toLinkId = rsJob.getLong(6);
+        boolean enabled = rsJob.getBoolean(7);
+        String createBy = rsJob.getString(8);
+        Date creationDate = rsJob.getTimestamp(9);
+        String updateBy = rsJob.getString(10);
+        Date lastUpdateDate = rsJob.getTimestamp(11);
+
+        fromConfigFetchStmt.setLong(1, fromConnectorId);
+        toConfigFetchStmt.setLong(1,toConnectorId);
+        driverConfigfetchStmt.setLong(1, driverId);
+
+        jobInputFetchStmt.setLong(1, id);
+        jobInputFetchStmt.setLong(3, id);
+
+        // FROM entity configs
+        List<MConfig> fromConnectorLinkConfig = new ArrayList<MConfig>();
+        List<MConfig> fromConnectorFromJobConfig = new ArrayList<MConfig>();
+        List<MConfig> fromConnectorToJobConfig = new ArrayList<MConfig>();
+
+        loadConnectorConfigTypes(fromConnectorLinkConfig, fromConnectorFromJobConfig, fromConnectorToJobConfig,
+            fromConfigFetchStmt, jobInputFetchStmt, 2, conn);
+
+        // TO entity configs
+        List<MConfig> toConnectorLinkConfig = new ArrayList<MConfig>();
+        List<MConfig> toConnectorFromJobConfig = new ArrayList<MConfig>();
+        List<MConfig> toConnectorToJobConfig = new ArrayList<MConfig>();
+
+        // ?? dont we need 2 different driver configs for the from/to?
+        List<MConfig> driverConfig = new ArrayList<MConfig>();
+
+        loadConnectorConfigTypes(toConnectorLinkConfig, toConnectorFromJobConfig, toConnectorToJobConfig,
+            toConfigFetchStmt, jobInputFetchStmt, 2, conn);
+
+        loadDriverConfigs(driverConfig, driverConfigfetchStmt, jobInputFetchStmt, 2);
+
+        MJob job = new MJob(
+            fromConnectorId, toConnectorId,
+            fromLinkId, toLinkId,
+            new MFromConfig(fromConnectorFromJobConfig),
+            new MToConfig(toConnectorToJobConfig),
+            new MDriverConfig(driverConfig));
+
+        job.setPersistenceId(id);
+        job.setName(name);
+        job.setCreationUser(createBy);
+        job.setCreationDate(creationDate);
+        job.setLastUpdateUser(updateBy);
+        job.setLastUpdateDate(lastUpdateDate);
+        job.setEnabled(enabled);
+
+        jobs.add(job);
+      }
+    } finally {
+      closeResultSets(rsJob);
+      closeStatements(fromConfigFetchStmt, toConfigFetchStmt, driverConfigfetchStmt, jobInputFetchStmt);
+    }
+
+    return jobs;
+  }
+
+  private void registerConfigDirection(Long configId, Direction direction, Connection conn)
+      throws SQLException {
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_SQ_CONFIG_DIRECTIONS);
+      stmt.setLong(1, configId);
+      stmt.setLong(2, getDirection(direction, conn));
+      if (stmt.executeUpdate() != 1) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0042);
+      }
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * Register configs in derby database. This method will insert the ids
+   * generated by the repository into the configs passed in itself.
+   *
+   * Use given prepared statements to create entire config structure in database.
+   *
+   * @param configurableId
+   * @param configs
+   * @param type
+   * @param baseConfigStmt
+   * @param baseInputStmt
+   * @param conn
+   * @return short number of configs registered.
+   * @throws java.sql.SQLException
+   */
+  private short registerConfigs(Long configurableId, Direction direction,
+                                List<MConfig> configs, String type, PreparedStatement baseConfigStmt,
+                                PreparedStatement baseInputStmt, Connection conn)
+      throws SQLException {
+    short configIndex = 0;
+
+    for (MConfig config : configs) {
+      if (configurableId == null) {
+        baseConfigStmt.setNull(1, Types.BIGINT);
+      } else {
+        baseConfigStmt.setLong(1, configurableId);
+      }
+
+      baseConfigStmt.setString(2, config.getName());
+      baseConfigStmt.setString(3, type);
+      baseConfigStmt.setShort(4, configIndex++);
+
+      int baseConfigCount = baseConfigStmt.executeUpdate();
+      if (baseConfigCount != 1) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0012,
+            Integer.toString(baseConfigCount));
+      }
+      ResultSet rsetConfigId = baseConfigStmt.getGeneratedKeys();
+      if (!rsetConfigId.next()) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0013);
+      }
+
+      long configId = rsetConfigId.getLong(1);
+      config.setPersistenceId(configId);
+
+      if (direction != null) {
+        registerConfigDirection(configId, direction, conn);
+      }
+
+      // Insert all the inputs
+      List<MInput<?>> inputs = config.getInputs();
+      registerConfigInputs(configId, inputs, baseInputStmt);
+    }
+    return configIndex;
+  }
+
+  /**
+   * Save given inputs to the database.
+   *
+   * Use given prepare statement to save all inputs into repository.
+   *
+   * @param configId Identifier for corresponding config
+   * @param inputs List of inputs that needs to be saved
+   * @param baseInputStmt Statement that we can utilize
+   * @throws java.sql.SQLException In case of any failure on Derby side
+   */
+  private void registerConfigInputs(long configId, List<MInput<?>> inputs,
+                                    PreparedStatement baseInputStmt) throws SQLException {
+    short inputIndex = 0;
+    for (MInput<?> input : inputs) {
+      baseInputStmt.setString(1, input.getName());
+      baseInputStmt.setLong(2, configId);
+      baseInputStmt.setShort(3, inputIndex++);
+      baseInputStmt.setString(4, input.getType().name());
+      baseInputStmt.setBoolean(5, input.isSensitive());
+      // String specific column(s)
+      if (input.getType().equals(MInputType.STRING)) {
+        MStringInput strInput = (MStringInput) input;
+        baseInputStmt.setShort(6, strInput.getMaxLength());
+      } else {
+        baseInputStmt.setNull(6, Types.INTEGER);
+      }
+      // Enum specific column(s)
+      if(input.getType() == MInputType.ENUM) {
+        baseInputStmt.setString(7, StringUtils.join(((MEnumInput) input).getValues(), ","));
+      } else {
+        baseInputStmt.setNull(7, Types.VARCHAR);
+      }
+
+      int baseInputCount = baseInputStmt.executeUpdate();
+      if (baseInputCount != 1) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0014,
+            Integer.toString(baseInputCount));
+      }
+
+      ResultSet rsetInputId = baseInputStmt.getGeneratedKeys();
+      if (!rsetInputId.next()) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0015);
+      }
+
+      long inputId = rsetInputId.getLong(1);
+      input.setPersistenceId(inputId);
+    }
+  }
+
+  /**
+   * Load configs and corresponding inputs from Derby database.
+   *
+   * Use given prepared statements to load all configs and corresponding inputs
+   * from Derby.
+   *
+   * @param driverConfig List of driver configs that will be filled up
+   * @param configFetchStatement Prepared statement for fetching configs
+   * @param inputFetchStmt Prepare statement for fetching inputs
+   * @param configPosition position of the config
+   * @throws java.sql.SQLException In case of any failure on Derby side
+   */
+  public void loadDriverConfigs(List<MConfig> driverConfig,
+                                PreparedStatement configFetchStatement,
+                                PreparedStatement inputFetchStmt,
+                                int configPosition) throws SQLException {
+
+    // Get list of structures from database
+    ResultSet rsetConfig = configFetchStatement.executeQuery();
+    while (rsetConfig.next()) {
+      long configId = rsetConfig.getLong(1);
+      Long fromConnectorId = rsetConfig.getLong(2);
+      String configName = rsetConfig.getString(3);
+      String configTYpe = rsetConfig.getString(4);
+      int configIndex = rsetConfig.getInt(5);
+      List<MInput<?>> configInputs = new ArrayList<MInput<?>>();
+
+      MConfig mDriverConfig = new MConfig(configName, configInputs);
+      mDriverConfig.setPersistenceId(configId);
+
+      inputFetchStmt.setLong(configPosition, configId);
+
+      ResultSet rsetInput = inputFetchStmt.executeQuery();
+      while (rsetInput.next()) {
+        long inputId = rsetInput.getLong(1);
+        String inputName = rsetInput.getString(2);
+        long inputConfig = rsetInput.getLong(3);
+        short inputIndex = rsetInput.getShort(4);
+        String inputType = rsetInput.getString(5);
+        boolean inputSensitivity = rsetInput.getBoolean(6);
+        short inputStrLength = rsetInput.getShort(7);
+        String inputEnumValues = rsetInput.getString(8);
+        String value = rsetInput.getString(9);
+
+        MInputType mit = MInputType.valueOf(inputType);
+
+        MInput input = null;
+        switch (mit) {
+          case STRING:
+            input = new MStringInput(inputName, inputSensitivity, inputStrLength);
+            break;
+          case MAP:
+            input = new MMapInput(inputName, inputSensitivity);
+            break;
+          case BOOLEAN:
+            input = new MBooleanInput(inputName, inputSensitivity);
+            break;
+          case INTEGER:
+            input = new MIntegerInput(inputName, inputSensitivity);
+            break;
+          case ENUM:
+            input = new MEnumInput(inputName, inputSensitivity, inputEnumValues.split(","));
+            break;
+          default:
+            throw new SqoopException(CommonRepositoryError.COMMON_0003,
+                "input-" + inputName + ":" + inputId + ":"
+                    + "config-" + inputConfig + ":" + mit.name());
+        }
+
+        // Set persistent ID
+        input.setPersistenceId(inputId);
+
+        // Set value
+        if(value == null) {
+          input.setEmpty();
+        } else {
+          input.restoreFromUrlSafeValueString(value);
+        }
+
+        if (mDriverConfig.getInputs().size() != inputIndex) {
+          throw new SqoopException(CommonRepositoryError.COMMON_0006,
+              "config: " + mDriverConfig
+                  + "; input: " + input
+                  + "; index: " + inputIndex
+                  + "; expected: " + mDriverConfig.getInputs().size()
+          );
+        }
+
+        mDriverConfig.getInputs().add(input);
+      }
+
+      if (mDriverConfig.getInputs().size() == 0) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0005,
+            "owner-" + fromConnectorId
+                + "; config: " + mDriverConfig
+        );
+      }
+
+      MConfigType configType = MConfigType.valueOf(configTYpe);
+      switch (configType) {
+        case JOB:
+          if (driverConfig.size() != configIndex) {
+            throw new SqoopException(CommonRepositoryError.COMMON_0007,
+                "owner-" + fromConnectorId
+                    + "; config: " + configType
+                    + "; index: " + configIndex
+                    + "; expected: " + driverConfig.size()
+            );
+          }
+          driverConfig.add(mDriverConfig);
+          break;
+        default:
+          throw new SqoopException(CommonRepositoryError.COMMON_0004,
+              "connector-" + fromConnectorId + ":" + configType);
+      }
+    }
+  }
+
+  private Direction findConfigDirection(long configId, Connection conn) throws SQLException {
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+
+    try {
+      stmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_SELECT_SQ_CONFIG_DIRECTIONS);
+      stmt.setLong(1, configId);
+      rs = stmt.executeQuery();
+      rs.next();
+      return getDirection(rs.getLong(2), conn);
+    } finally {
+      if (rs != null) {
+        closeResultSets(rs);
+      }
+      if (stmt != null) {
+        closeStatements(stmt);
+      }
+    }
+  }
+
+  /**
+   * Load configs and corresponding inputs related to a connector.
+   *
+   * Use given prepared statements to load all configs and corresponding inputs
+   * from Derby.
+   *
+   * @param linkConfig List of link configs that will be filled up
+   * @param fromConfig FROM job configs that will be filled up
+   * @param toConfig TO job configs that will be filled up
+   * @param configFetchStmt Prepared statement for fetching configs
+   * @param inputFetchStmt Prepare statement for fetching inputs
+   * @param conn Connection object that is used to find config direction.
+   * @throws java.sql.SQLException In case of any failure on Derby side
+   */
+  public void loadConnectorConfigTypes(List<MConfig> linkConfig, List<MConfig> fromConfig,
+                                       List<MConfig> toConfig, PreparedStatement configFetchStmt, PreparedStatement inputFetchStmt,
+                                       int configPosition, Connection conn) throws SQLException {
+
+    // Get list of structures from database
+    ResultSet rsetConfig = configFetchStmt.executeQuery();
+    while (rsetConfig.next()) {
+      long configId = rsetConfig.getLong(1);
+      Long configConnectorId = rsetConfig.getLong(2);
+      String configName = rsetConfig.getString(3);
+      String configType = rsetConfig.getString(4);
+      int configIndex = rsetConfig.getInt(5);
+      List<MInput<?>> configInputs = new ArrayList<MInput<?>>();
+
+      MConfig config = new MConfig(configName, configInputs);
+      config.setPersistenceId(configId);
+
+      inputFetchStmt.setLong(configPosition, configId);
+
+      ResultSet rsetInput = inputFetchStmt.executeQuery();
+      while (rsetInput.next()) {
+        long inputId = rsetInput.getLong(1);
+        String inputName = rsetInput.getString(2);
+        long inputConfig = rsetInput.getLong(3);
+        short inputIndex = rsetInput.getShort(4);
+        String inputType = rsetInput.getString(5);
+        boolean inputSensitivity = rsetInput.getBoolean(6);
+        short inputStrLength = rsetInput.getShort(7);
+        String inputEnumValues = rsetInput.getString(8);
+        String value = rsetInput.getString(9);
+
+        MInputType mit = MInputType.valueOf(inputType);
+
+        MInput<?> input = null;
+        switch (mit) {
+          case STRING:
+            input = new MStringInput(inputName, inputSensitivity, inputStrLength);
+            break;
+          case MAP:
+            input = new MMapInput(inputName, inputSensitivity);
+            break;
+          case BOOLEAN:
+            input = new MBooleanInput(inputName, inputSensitivity);
+            break;
+          case INTEGER:
+            input = new MIntegerInput(inputName, inputSensitivity);
+            break;
+          case ENUM:
+            input = new MEnumInput(inputName, inputSensitivity, inputEnumValues.split(","));
+            break;
+          default:
+            throw new SqoopException(CommonRepositoryError.COMMON_0003,
+                "input-" + inputName + ":" + inputId + ":"
+                    + "config-" + inputConfig + ":" + mit.name());
+        }
+
+        // Set persistent ID
+        input.setPersistenceId(inputId);
+
+        // Set value
+        if(value == null) {
+          input.setEmpty();
+        } else {
+          input.restoreFromUrlSafeValueString(value);
+        }
+
+        if (config.getInputs().size() != inputIndex) {
+          throw new SqoopException(CommonRepositoryError.COMMON_0006,
+              "config: " + config
+                  + "; input: " + input
+                  + "; index: " + inputIndex
+                  + "; expected: " + config.getInputs().size()
+          );
+        }
+
+        config.getInputs().add(input);
+      }
+
+      if (config.getInputs().size() == 0) {
+        throw new SqoopException(CommonRepositoryError.COMMON_0005,
+            "connector-" + configConnectorId
+                + "; config: " + config
+        );
+      }
+
+      MConfigType mConfigType = MConfigType.valueOf(configType);
+      switch (mConfigType) {
+        case LINK:
+          if (linkConfig.size() != configIndex) {
+            throw new SqoopException(CommonRepositoryError.COMMON_0007,
+                "connector-" + configConnectorId
+                    + "; config: " + config
+                    + "; index: " + configIndex
+                    + "; expected: " + linkConfig.size()
+            );
+          }
+          linkConfig.add(config);
+          break;
+        case JOB:
+          Direction type = findConfigDirection(configId, conn);
+          List<MConfig> jobConfigs;
+          switch(type) {
+            case FROM:
+              jobConfigs = fromConfig;
+              break;
+
+            case TO:
+              jobConfigs = toConfig;
+              break;
+
+            default:
+              throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
+          }
+
+          if (jobConfigs.size() != configIndex) {
+            throw new SqoopException(CommonRepositoryError.COMMON_0007,
+                "connector-" + configConnectorId
+                    + "; config: " + config
+                    + "; index: " + configIndex
+                    + "; expected: " + jobConfigs.size()
+            );
+          }
+
+          jobConfigs.add(config);
+          break;
+        default:
+          throw new SqoopException(CommonRepositoryError.COMMON_0004,
+              "connector-" + configConnectorId + ":" + config);
+      }
+    }
+  }
+
+  private void createInputValues(String query,
+                                 long id,
+                                 List<MConfig> configs,
+                                 Connection conn) throws SQLException {
+    PreparedStatement stmt = null;
+    int result;
+
+    try {
+      stmt = conn.prepareStatement(query);
+
+      for (MConfig config : configs) {
+        for (MInput input : config.getInputs()) {
+          // Skip empty values as we're not interested in storing those in db
+          if (input.isEmpty()) {
+            continue;
+          }
+          stmt.setLong(1, id);
+          stmt.setLong(2, input.getPersistenceId());
+          stmt.setString(3, input.getUrlSafeValueString());
+
+          result = stmt.executeUpdate();
+          if (result != 1) {
+            throw new SqoopException(CommonRepositoryError.COMMON_0017,
+                Integer.toString(result));
+          }
+        }
+      }
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * Execute given query on database.
+   *
+   * @param query Query that should be executed
+   */
+  protected void runQuery(String query, Connection conn, Object... args) {
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(query);
+
+      for (int i = 0; i < args.length; ++i) {
+        if (args[i] instanceof String) {
+          stmt.setString(i + 1, (String)args[i]);
+        } else if (args[i] instanceof Long) {
+          stmt.setLong(i + 1, (Long) args[i]);
+        } else {
+          stmt.setObject(i + 1, args[i]);
+        }
+      }
+
+      if (stmt.execute()) {
+        ResultSet rset = stmt.getResultSet();
+        int count = 0;
+        while (rset.next()) {
+          count++;
+        }
+        LOG.info("QUERY(" + query + ") produced unused resultset with "+ count + " rows");
+      } else {
+        int updateCount = stmt.getUpdateCount();
+        LOG.info("QUERY(" + query + ") Update count: " + updateCount);
+      }
+    } catch (SQLException ex) {
+      throw new SqoopException(CommonRepositoryError.COMMON_0000, query, ex);
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * Close all given Results set.
+   *
+   * Any occurring exception is silently ignored and logged.
+   *
+   * @param resultSets Result sets to close
+   */
+  protected void closeResultSets(ResultSet ... resultSets) {
+    if(resultSets == null) {
+      return;
+    }
+    for (ResultSet rs : resultSets) {
+      if(rs != null) {
+        try {
+          rs.close();
+        } catch(SQLException ex) {
+          LOG.error("Exception during closing result set", ex);
+        }
+      }
+    }
+  }
+
+  /**
+   * Close all given statements.
+   *
+   * Any occurring exception is silently ignored and logged.
+   *
+   * @param stmts Statements to close
+   */
+  protected void closeStatements(Statement... stmts) {
+    if(stmts == null) {
+      return;
+    }
+    for (Statement stmt : stmts) {
+      if(stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException ex) {
+          LOG.error("Exception during closing statement", ex);
+        }
+      }
+    }
+  }
+
+  /**
+   * Log exception and all String variant of arbitrary number of objects.
+   *
+   * This method is useful to log SQLException with all objects that were
+   * used in the query generation to see where is the issue.
+   *
+   * @param throwable Arbitrary throwable object
+   * @param objects Arbitrary array of associated objects
+   */
+  protected void logException(Throwable throwable, Object ...objects) {
+    LOG.error("Exception in repository operation", throwable);
+    LOG.error("Using database " + name());
+    LOG.error("Associated objects: "+ objects.length);
+    for(Object object : objects) {
+      LOG.error("\t" + object.getClass().getSimpleName() + ": " + object.toString());
+    }
+  }
+}


Mime
View raw message