sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject sqoop git commit: SQOOP-1741: Port SQOOP-1736 to sqoop2 branch
Date Wed, 19 Nov 2014 16:52:57 GMT
Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 dfd1fd348 -> 272fc2f8a


SQOOP-1741: Port SQOOP-1736 to sqoop2 branch

(Abraham Elmahrek via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/272fc2f8
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/272fc2f8
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/272fc2f8

Branch: refs/heads/sqoop2
Commit: 272fc2f8af735dac21e095328722878bc1ddc4f5
Parents: dfd1fd3
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Wed Nov 19 08:52:11 2014 -0800
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Wed Nov 19 08:52:11 2014 -0800

----------------------------------------------------------------------
 .../apache/sqoop/repository/JdbcRepository.java |  8 +-
 .../common/CommonRepositoryHandler.java         |  2 +-
 .../sqoop/repository/derby/DerbyRepoError.java  |  3 +
 .../derby/DerbyRepositoryHandler.java           | 99 +++++++++++++++++++-
 .../derby/DerbySchemaUpgradeQuery.java          | 26 +++++
 .../apache/sqoop/tools/tool/UpgradeTool.java    |  6 +-
 6 files changed, 132 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/272fc2f8/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index f41e60e..e5415e8 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -220,14 +220,14 @@ public class JdbcRepository extends Repository {
     return (MDriver) doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) {
-        MDriver existingDriverConfig = handler.findDriver(mDriver.getUniqueName(), conn);
-        if (existingDriverConfig == null) {
+        MDriver existingDriver = handler.findDriver(mDriver.getUniqueName(), conn);
+        if (existingDriver == null) {
           handler.registerDriver(mDriver, conn);
           return mDriver;
         } else {
           // We're currently not serializing version into repository
           // so let's just compare the structure to see if we need upgrade.
-          if(!mDriver.equals(existingDriverConfig)) {
+          if(!mDriver.equals(existingDriver)) {
             if (autoUpgrade) {
               upgradeDriver(mDriver);
               return mDriver;
@@ -236,7 +236,7 @@ public class JdbcRepository extends Repository {
                 "Driver: " + mDriver.getPersistenceId());
             }
           }
-          return existingDriverConfig;
+          return existingDriver;
         }
       }
     });

http://git-wip-us.apache.org/repos/asf/sqoop/blob/272fc2f8/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java
b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java
index 3ae2bfc..c278406 100644
--- a/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java
+++ b/repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java
@@ -263,7 +263,7 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler
{
       baseInputStmt = conn.prepareStatement(CommonRepositoryInsertUpdateDeleteSelectQuery.STMT_INSERT_INTO_INPUT,
           Statement.RETURN_GENERATED_KEYS);
 
-      // Register the job config type, since driver config is per job
+      // Register a driver config as a job type with no direction
       registerConfigs(null, null, mDriver.getDriverConfig().getConfigs(),
           MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/272fc2f8/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
index 769544b..6bc5674 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
@@ -53,6 +53,9 @@ public enum DerbyRepoError implements ErrorCode {
   /** Can't get ID of direction **/
   DERBYREPO_0008("Could not get ID of recently added direction"),
 
+  /** The system was unable to register driver due to a server error **/
+  DERBYREPO_0009("Registration of driver failed"),
+
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/272fc2f8/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index d792554..907978f 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -44,7 +44,10 @@ import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.ConnectorHandler;
 import org.apache.sqoop.connector.ConnectorManagerUtils;
+import org.apache.sqoop.driver.Driver;
 import org.apache.sqoop.model.MConfigType;
+import org.apache.sqoop.model.MConfigurableType;
+import org.apache.sqoop.model.MDriver;
 import org.apache.sqoop.model.MInputType;
 import org.apache.sqoop.repository.JdbcRepositoryContext;
 import org.apache.sqoop.repository.common.CommonRepoConstants;
@@ -242,7 +245,10 @@ public class DerbyRepositoryHandler extends CommonRepositoryHandler {
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_FROM, conn);
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO, conn);
 
-      // Data modifications only for non-fresh install.
+      // force register HDFS-connector as a first class citizen in the connector list
+      // and re-associate old frameworks configs and connections/links with the new hdfs
connector
+      // Data modifications only for non-fresh install hence the > 0 check
+
       if (repositoryVersion > 0) {
         LOG.info("Force registering the HDFS connector as a new configurable");
         long hdfsConnectorId = registerHdfsConnector(conn);
@@ -267,6 +273,21 @@ public class DerbyRepositoryHandler extends CommonRepositoryHandler {
       runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME, conn);
       runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME, conn);
       runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIGURABLE_ADD_UNIQUE_CONSTRAINT_NAME, conn);
+
+      // force register the driver as a first class citizen and re-associate the old framework
configs with the new driver Id
+      // Data modifications only for non-fresh install hence the > 0 check
+      if (repositoryVersion > 0) {
+        LOG.info("Force registering the Driver as a new configurable");
+        long driverId = registerDriver(conn);
+        LOG.info("Finished Force registering of the driver as a new configurable");
+
+        LOG.info("Updating config and inputs for the driver.");
+        updateDriverConfigInput(conn, driverId);
+        LOG.info("Finished Updating config and inputs for the driver.");
+      }
+    }
+
+    if (repositoryVersion <= 4) {
       runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_ADD_UNIQUE_CONSTRAINT_NAME_TYPE_AND_CONFIGURABLE_ID,
           conn);
       runQuery(QUERY_UPGRADE_TABLE_SQ_INPUT_ADD_UNIQUE_CONSTRAINT_NAME_TYPE_AND_CONFIG_ID,
conn);
@@ -498,6 +519,7 @@ public class DerbyRepositoryHandler extends CommonRepositoryHandler {
    * 10. Update 'table' config names to 'fromJobConfig' and 'toJobConfig'.
    *     Also update the relevant inputs as well.
    * @param conn
+   * @param hdfsConnectorId
    */
   // NOTE: This upgrade code happened before the SQOOP-1498 renaming, hence it uses the form/connection
   // tables instead of the latest config/link tables
@@ -558,6 +580,74 @@ public class DerbyRepositoryHandler extends CommonRepositoryHandler {
         "toJobConfig", "toJobConfig", Direction.TO.toString());
   }
 
+  // NOTE: This upgrade code happened after the SQOOP-1498 renaming, hence it
+  // uses the configurable and config
+  @Deprecated
+  private void updateDriverConfigInput(Connection conn, long driverId) {
+
+    // update configs and inputs for driver
+    // update the name from throttling ==> throttlingConfig config and associate
+    // it with the driverId
+    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_NAME_FOR_DRIVER, conn,
+        "throttlingConfig", "throttling");
+
+    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_CONFIGURABLE_ID_FOR_DRIVER, conn,
+        driverId, "throttlingConfig");
+
+    // nuke security.maxConnections
+    runQuery(QUERY_UPGRADE_TABLE_SQ_INPUT_REMOVE_SECURITY_CONFIG_INPUT_FOR_DRIVER, conn,
+        "security.maxConnections");
+
+    // nuke the security config since 1.99.3 we do not use it
+    runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_REMOVE_SECURITY_CONFIG_FOR_DRIVER, conn,
+        "security");
+
+    // update throttling.extractors ==> throttlingConfig.numExtractors
+    runQuery(QUERY_UPGRADE_TABLE_SQ_INPUT_UPDATE_CONFIG_INPUT_FOR_DRIVER, conn,
+         "throttlingConfig.numExtractors", "throttling.extractors");
+
+   // update throttling.loaders ==> throttlingConfig.numLoaders
+    runQuery(QUERY_UPGRADE_TABLE_SQ_INPUT_UPDATE_CONFIG_INPUT_FOR_DRIVER, conn,
+         "throttlingConfig.numLoaders", "throttling.loaders");
+
+  }
+
+  /**
+   * Pre-register Driver since the 1.99.3 release NOTE: This should be used only
+   * in the upgrade path
+   */
+  @Deprecated
+  protected long registerDriver(Connection conn) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Begin Driver loading.");
+    }
+
+    PreparedStatement baseDriverStmt = null;
+    try {
+      baseDriverStmt = conn.prepareStatement(STMT_INSERT_INTO_CONFIGURABLE,
+          Statement.RETURN_GENERATED_KEYS);
+      baseDriverStmt.setString(1, MDriver.DRIVER_NAME);
+      baseDriverStmt.setString(2, Driver.getClassName());
+      baseDriverStmt.setString(3, "1");
+      baseDriverStmt.setString(4, MConfigurableType.DRIVER.name());
+
+      int baseDriverCount = baseDriverStmt.executeUpdate();
+      if (baseDriverCount != 1) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0003, Integer.toString(baseDriverCount));
+      }
+
+      ResultSet rsetDriverId = baseDriverStmt.getGeneratedKeys();
+
+      if (!rsetDriverId.next()) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0004);
+      }
+      return rsetDriverId.getLong(1);
+    } catch (SQLException ex) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0009, ex);
+    } finally {
+      closeStatements(baseDriverStmt);
+    }
+  }
 
   /**
    * Pre-register HDFS Connector so that config upgrade will work.
@@ -582,10 +672,10 @@ public class DerbyRepositoryHandler extends CommonRepositoryHandler
{
       if (handler.getConnectorConfigurable().getPersistenceId() != -1) {
         return handler.getConnectorConfigurable().getPersistenceId();
       }
-
+      PreparedStatement baseConnectorStmt = null;
       if (handler.getUniqueName().equals(CONNECTOR_HDFS)) {
         try {
-          PreparedStatement baseConnectorStmt = conn.prepareStatement(
+          baseConnectorStmt = conn.prepareStatement(
               STMT_INSERT_INTO_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS,
               Statement.RETURN_GENERATED_KEYS);
           baseConnectorStmt.setString(1, handler.getConnectorConfigurable().getUniqueName());
@@ -602,6 +692,8 @@ public class DerbyRepositoryHandler extends CommonRepositoryHandler {
           }
         } catch (SQLException e) {
           throw new SqoopException(DerbyRepoError.DERBYREPO_0004);
+        } finally {
+          closeStatements(baseConnectorStmt);
         }
 
         break;
@@ -660,7 +752,6 @@ public class DerbyRepositoryHandler extends CommonRepositoryHandler {
     }
   }
 
-
   /**
    * We are creating the LINK FORM for HDFS and later it the schema will
    * be renamed to LINK CONFIG

http://git-wip-us.apache.org/repos/asf/sqoop/blob/272fc2f8/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java
b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java
index 51024c8..fb48daf 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaUpgradeQuery.java
@@ -200,6 +200,32 @@ public final class DerbySchemaUpgradeQuery {
       "UPDATE " + TABLE_SQ_FORM + " SET "
         + COLUMN_SQF_DIRECTION + "= NULL"
         + " WHERE " + COLUMN_SQF_NAME + "= ?";
+
+   /** Intended for force driver creation and its related upgrades*/
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_NAME_FOR_DRIVER =
+      "UPDATE " + TABLE_SQ_CONFIG + " SET "
+        + COLUMN_SQ_CFG_NAME + "= ?"
+        + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_CONFIGURABLE_ID_FOR_DRIVER =
+      "UPDATE " + TABLE_SQ_CONFIG + " SET "
+        + COLUMN_SQ_CFG_CONFIGURABLE + "= ?"
+        + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_REMOVE_SECURITY_CONFIG_FOR_DRIVER
=
+      "DELETE FROM " + TABLE_SQ_CONFIG
+        + " WHERE " + COLUMN_SQ_CFG_NAME + "= ?";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_INPUT_REMOVE_SECURITY_CONFIG_INPUT_FOR_DRIVER
=
+      "DELETE FROM " + TABLE_SQ_INPUT
+        + " WHERE " + COLUMN_SQI_NAME + "= ?";
+
+  public static final String QUERY_UPGRADE_TABLE_SQ_INPUT_UPDATE_CONFIG_INPUT_FOR_DRIVER
=
+      "UPDATE " + TABLE_SQ_INPUT + " SET "
+          + COLUMN_SQI_NAME + "= ?"
+          + " WHERE " + COLUMN_SQI_NAME + "= ?";
+
   /**
    * Intended to change SQ_JOB_INPUT.SQBI_INPUT from EXPORT
    * throttling form, to IMPORT throttling form.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/272fc2f8/tools/src/main/java/org/apache/sqoop/tools/tool/UpgradeTool.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/sqoop/tools/tool/UpgradeTool.java b/tools/src/main/java/org/apache/sqoop/tools/tool/UpgradeTool.java
index f117411..ba88ddd 100644
--- a/tools/src/main/java/org/apache/sqoop/tools/tool/UpgradeTool.java
+++ b/tools/src/main/java/org/apache/sqoop/tools/tool/UpgradeTool.java
@@ -38,12 +38,12 @@ public class UpgradeTool extends ConfiguredTool {
       LOG.info("Initializing the RepositoryManager with immutable option turned off.");
       RepositoryManager.getInstance().initialize(false);
 
-      LOG.info("Initializing the Driver with upgrade option turned on.");
-      Driver.getInstance().initialize(true);
-
       LOG.info("Initializing the Connection Manager with upgrade option turned on.");
       ConnectorManager.getInstance().initialize(true);
 
+      LOG.info("Initializing the Driver with upgrade option turned on.");
+      Driver.getInstance().initialize(true);
+
       LOG.info("Upgrade completed successfully.");
 
       LOG.info("Tearing all managers down.");


Mime
View raw message