sqoop-commits mailing list archives

From: b...@apache.org
Subject: svn commit: r1196272 [2/4] - in /incubator/sqoop/trunk/src/java: com/cloudera/sqoop/manager/ org/apache/sqoop/manager/
Date: Tue, 01 Nov 2011 21:01:11 GMT
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java?rev=1196272&r1=1196271&r2=1196272&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java Tue Nov  1 21:01:09 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,865 +18,30 @@
 
 package com.cloudera.sqoop.manager;
 
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Timestamp;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.SqoopOptions.UpdateMode;
-import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
-import com.cloudera.sqoop.mapreduce.JdbcExportJob;
-import com.cloudera.sqoop.mapreduce.JdbcUpsertExportJob;
-import com.cloudera.sqoop.mapreduce.OracleUpsertOutputFormat;
-import com.cloudera.sqoop.mapreduce.db.OracleDataDrivenDBInputFormat;
-import com.cloudera.sqoop.util.ExportException;
-import com.cloudera.sqoop.util.ImportException;
 
 /**
- * Manages connections to Oracle databases.
- * Requires the Oracle JDBC driver.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class OracleManager extends GenericJdbcManager {
-
-  public static final Log LOG = LogFactory.getLog(
-      OracleManager.class.getName());
+public class OracleManager
+    extends org.apache.sqoop.manager.OracleManager {
 
-  /**
-   * ORA-00942: Table or view does not exist. Indicates that the user does
-   * not have permissions.
-   */
-  public static final int ERROR_TABLE_OR_VIEW_DOES_NOT_EXIST = 942;
-
-  /**
-   * This is a catalog view query to list the databases. For Oracle we map the
-   * concept of a database to a schema, and a schema is identified by a user.
-   * In order for the catalog view DBA_USERS to be visible to the user who
-   * executes this query, they must have the DBA privilege.
-   */
+  public static final int ERROR_TABLE_OR_VIEW_DOES_NOT_EXIST =
+    org.apache.sqoop.manager.OracleManager.ERROR_TABLE_OR_VIEW_DOES_NOT_EXIST;
   public static final String QUERY_LIST_DATABASES =
-    "SELECT USERNAME FROM DBA_USERS";
-
-  /**
-   * Query to list all tables visible to the current user. Note that this list
-   * does not identify the table owners, information that is required to
-   * ensure that the table can be operated on for import/export purposes.
-   */
+    org.apache.sqoop.manager.OracleManager.QUERY_LIST_DATABASES;
   public static final String QUERY_LIST_TABLES =
-    "SELECT TABLE_NAME FROM ALL_TABLES";
-
-  /**
-   * Query to list all columns of the given table. Even if the user has the
-   * privileges to access table objects from another schema, this query will
-   * restrict it to tables within the active schema.
-   */
+    org.apache.sqoop.manager.OracleManager.QUERY_LIST_TABLES;
   public static final String QUERY_COLUMNS_FOR_TABLE =
-          "SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS WHERE "
-        + "OWNER = ? AND TABLE_NAME = ?";
-
-  /**
-   * Query to find the primary key column name for a given table. This query
-   * is restricted to the current schema.
-   */
+    org.apache.sqoop.manager.OracleManager.QUERY_COLUMNS_FOR_TABLE;
   public static final String QUERY_PRIMARY_KEY_FOR_TABLE =
-    "SELECT ALL_CONS_COLUMNS.COLUMN_NAME FROM ALL_CONS_COLUMNS, "
-     + "ALL_CONSTRAINTS WHERE ALL_CONS_COLUMNS.CONSTRAINT_NAME = "
-     + "ALL_CONSTRAINTS.CONSTRAINT_NAME AND "
-     + "ALL_CONSTRAINTS.CONSTRAINT_TYPE = 'P' AND "
-     + "ALL_CONS_COLUMNS.TABLE_NAME = ? AND "
-     + "ALL_CONS_COLUMNS.OWNER = ?";
-
-  // Driver class to ensure is loaded when making a DB connection.
-  private static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver";
-
-  // Configuration key to use to set the session timezone.
-  public static final String ORACLE_TIMEZONE_KEY = "oracle.sessionTimeZone";
-
-  // Oracle XE does a poor job of releasing server-side resources for
-  // closed connections. So we actually want to cache connections as
-  // much as possible. This is especially important for JUnit tests which
-  // may need to make 60 or more connections (serially), since each test
-  // uses a different OracleManager instance.
-  private static class ConnCache {
-
-    public static final Log LOG = LogFactory.getLog(ConnCache.class.getName());
-
-    private static class CacheKey {
-      private final String connectString;
-      private final String username;
-
-      public CacheKey(String connect, String user) {
-        this.connectString = connect;
-        this.username = user; // note: may be null.
-      }
-
-      @Override
-      public boolean equals(Object o) {
-        if (o instanceof CacheKey) {
-          CacheKey k = (CacheKey) o;
-          if (null == username) {
-            return k.username == null && k.connectString.equals(connectString);
-          } else {
-            return k.username.equals(username)
-                && k.connectString.equals(connectString);
-          }
-        } else {
-          return false;
-        }
-      }
-
-      @Override
-      public int hashCode() {
-        if (null == username) {
-          return connectString.hashCode();
-        } else {
-          return username.hashCode() ^ connectString.hashCode();
-        }
-      }
-
-      @Override
-      public String toString() {
-        return connectString + "/" + username;
-      }
-    }
-
-    private Map<CacheKey, Connection> connectionMap;
-
-    public ConnCache() {
-      LOG.debug("Instantiated new connection cache.");
-      connectionMap = new HashMap<CacheKey, Connection>();
-    }
-
-    /**
-     * @return a Connection instance that can be used to connect to the
-     * given database, if a previously-opened connection is available in
-     * the cache. Returns null if none is available in the map.
-     */
-    public synchronized Connection getConnection(String connectStr,
-        String username) throws SQLException {
-      CacheKey key = new CacheKey(connectStr, username);
-      Connection cached = connectionMap.get(key);
-      if (null != cached) {
-        connectionMap.remove(key);
-        if (cached.isReadOnly()) {
-          // Read-only mode? Don't want it.
-          cached.close();
-        }
-
-        if (cached.isClosed()) {
-          // This connection isn't usable.
-          return null;
-        }
-
-        cached.rollback(); // Reset any transaction state.
-        cached.clearWarnings();
-
-        LOG.debug("Got cached connection for " + key);
-      }
-
-      return cached;
-    }
-
-    /**
-     * Returns a connection to the cache pool for future use. If a connection
-     * is already cached for the connectstring/username pair, then this
-     * connection is closed and discarded.
-     */
-    public synchronized void recycle(String connectStr, String username,
-        Connection conn) throws SQLException {
-
-      CacheKey key = new CacheKey(connectStr, username);
-      Connection existing = connectionMap.get(key);
-      if (null != existing) {
-        // Cache is already full for this entry.
-        LOG.debug("Discarding additional connection for " + key);
-        conn.close();
-        return;
-      }
-
-      // Put it in the map for later use.
-      LOG.debug("Caching released connection for " + key);
-      connectionMap.put(key, conn);
-    }
-
-    @Override
-    protected synchronized void finalize() throws Throwable {
-      for (Connection c : connectionMap.values()) {
-        c.close();
-      }
-
-      super.finalize();
-    }
-  }
-
-  private static final ConnCache CACHE;
-  static {
-    CACHE = new ConnCache();
-  }
+    org.apache.sqoop.manager.OracleManager.QUERY_PRIMARY_KEY_FOR_TABLE;
+  public static final String ORACLE_TIMEZONE_KEY =
+    org.apache.sqoop.manager.OracleManager.ORACLE_TIMEZONE_KEY;
 
   public OracleManager(final SqoopOptions opts) {
-    super(DRIVER_CLASS, opts);
+    super(opts);
   }
 
-  public void close() throws SQLException {
-    release(); // Release any open statements associated with the connection.
-    if (hasOpenConnection()) {
-      // Release our open connection back to the cache.
-      CACHE.recycle(options.getConnectString(), options.getUsername(),
-          getConnection());
-      discardConnection(false);
-    }
-  }
-
-  protected String getColNamesQuery(String tableName) {
-    // SqlManager uses "tableName AS t" which doesn't work in Oracle.
-    String query =  "SELECT t.* FROM " + escapeTableName(tableName)
-            + " t WHERE 1=0";
-
-    LOG.debug("Using column names query: " + query);
-    return query;
-  }
-
-  /**
-   * Create a connection to the database; usually used only from within
-   * getConnection(), which enforces a singleton guarantee around the
-   * Connection object.
-   *
-   * Oracle-specific driver uses READ_COMMITTED, which is the weakest
-   * semantics Oracle supports.
-   */
-  protected Connection makeConnection() throws SQLException {
-
-    Connection connection;
-    String driverClass = getDriverClass();
-
-    try {
-      Class.forName(driverClass);
-    } catch (ClassNotFoundException cnfe) {
-      throw new RuntimeException("Could not load db driver class: "
-          + driverClass);
-    }
-
-    String username = options.getUsername();
-    String password = options.getPassword();
-    String connectStr = options.getConnectString();
-
-    connection = CACHE.getConnection(connectStr, username);
-    if (null == connection) {
-      // Couldn't pull one from the cache. Get a new one.
-      LOG.debug("Creating a new connection for "
-              + connectStr + ", using username: " + username);
-      Properties connectionParams = options.getConnectionParams();
-      if (connectionParams != null && connectionParams.size() > 0) {
-        LOG.debug("User specified connection params. "
-                  + "Using properties specific API for making connection.");
-
-        Properties props = new Properties();
-        if (username != null) {
-          props.put("user", username);
-        }
-
-        if (password != null) {
-          props.put("password", password);
-        }
-
-        props.putAll(connectionParams);
-        connection = DriverManager.getConnection(connectStr, props);
-      } else {
-        LOG.debug("No connection paramenters specified. "
-                + "Using regular API for making connection.");
-        if (username == null) {
-          connection = DriverManager.getConnection(connectStr);
-        } else {
-          connection = DriverManager.getConnection(
-                              connectStr, username, password);
-        }
-      }
-    }
-
-    // We only use this for metadata queries. Loosest semantics are okay.
-    connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
-
-    // Setting session time zone
-    setSessionTimeZone(connection);
-
-    return connection;
-  }
-
-  /**
-   * Set session time zone.
-   * @param conn      Connection object
-   * @throws          SQLException instance
-   */
-  private void setSessionTimeZone(Connection conn) throws SQLException {
-    // Need to use reflection to call the method setSessionTimeZone on the
-    // OracleConnection class because oracle specific java libraries are not
-    // accessible in this context.
-    Method method;
-    try {
-      method = conn.getClass().getMethod(
-              "setSessionTimeZone", new Class [] {String.class});
-    } catch (Exception ex) {
-      LOG.error("Could not find method setSessionTimeZone in "
-          + conn.getClass().getName(), ex);
-      // rethrow SQLException
-      throw new SQLException(ex);
-    }
-
-    // Need to set the time zone in order for Java to correctly access the
-    // column "TIMESTAMP WITH LOCAL TIME ZONE".  The user may have set this in
-    // the configuration as 'oracle.sessionTimeZone'.
-    String clientTimeZoneStr = options.getConf().get(ORACLE_TIMEZONE_KEY,
-        "GMT");
-    try {
-      method.setAccessible(true);
-      method.invoke(conn, clientTimeZoneStr);
-      LOG.info("Time zone has been set to " + clientTimeZoneStr);
-    } catch (Exception ex) {
-      LOG.warn("Time zone " + clientTimeZoneStr
-               + " could not be set on Oracle database.");
-      LOG.info("Setting default time zone: GMT");
-      try {
-        // Per the documentation at:
-        // http://download-west.oracle.com/docs/cd/B19306_01
-        //     /server.102/b14225/applocaledata.htm#i637736
-        // The "GMT" timezone is guaranteed to exist in the available timezone
-        // regions, whereas others (e.g., "UTC") are not.
-        method.invoke(conn, "GMT");
-      } catch (Exception ex2) {
-        LOG.error("Could not set time zone for oracle connection", ex2);
-        // rethrow SQLException
-        throw new SQLException(ex);
-      }
-    }
-  }
-
-  @Override
-  public void importTable(ImportJobContext context)
-      throws IOException, ImportException {
-    context.setConnManager(this);
-    // Specify the Oracle-specific DBInputFormat for import.
-    context.setInputFormat(OracleDataDrivenDBInputFormat.class);
-    super.importTable(context);
-  }
-
-  /**
-   * Export data stored in HDFS into a table in a database.
-   */
-  public void exportTable(ExportJobContext context)
-      throws IOException, ExportException {
-    context.setConnManager(this);
-    JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
-                                  ExportBatchOutputFormat.class);
-    exportJob.runExport();
-  }
-
-  @Override
-  /**
-   * {@inheritDoc}
-   */
-  public void upsertTable(ExportJobContext context)
-      throws IOException, ExportException {
-    context.setConnManager(this);
-    JdbcUpsertExportJob exportJob =
-      new JdbcUpsertExportJob(context, OracleUpsertOutputFormat.class);
-    exportJob.runExport();
-  }
-
-  @Override
-  /**
-   * {@inheritDoc}
-   */
-  public void configureDbOutputColumns(SqoopOptions options) {
-    if (options.getUpdateMode() == UpdateMode.UpdateOnly) {
-      super.configureDbOutputColumns(options);
-    } else {
-      // We're in upsert mode. We need to explicitly set
-      // the database output column ordering in the codeGenerator.
-      Set<String> updateKeys = new LinkedHashSet<String>();
-      Set<String> updateKeysUppercase = new HashSet<String>();
-      String updateKeyValue = options.getUpdateKeyCol();
-      StringTokenizer stok = new StringTokenizer(updateKeyValue, ",");
-      while (stok.hasMoreTokens()) {
-        String nextUpdateColumn = stok.nextToken().trim();
-        if (nextUpdateColumn.length() > 0) {
-          updateKeys.add(nextUpdateColumn);
-          updateKeysUppercase.add(nextUpdateColumn.toUpperCase());
-        }  else {
-          throw new RuntimeException("Invalid update key column value specified"
-              + ": '" + updateKeyValue + "'");
-        }
-      }
-
-      String [] allColNames = getColumnNames(options.getTableName());
-      List<String> dbOutCols = new ArrayList<String>();
-      dbOutCols.addAll(updateKeys);
-      for (String col : allColNames) {
-        if (!updateKeysUppercase.contains(col.toUpperCase())) {
-          dbOutCols.add(col); // add update columns to the output order list.
-        }
-      }
-      for (String col : allColNames) {
-        dbOutCols.add(col); // add insert columns to the output order list.
-      }
-      options.setDbOutputColumns(dbOutCols.toArray(
-          new String[dbOutCols.size()]));
-    }
-  }
-
-  @Override
-  public ResultSet readTable(String tableName, String[] columns)
-      throws SQLException {
-    if (columns == null) {
-      columns = getColumnNames(tableName);
-    }
-
-    StringBuilder sb = new StringBuilder();
-    sb.append("SELECT ");
-    boolean first = true;
-    for (String col : columns) {
-      if (!first) {
-        sb.append(", ");
-      }
-      sb.append(escapeColName(col));
-      first = false;
-    }
-    sb.append(" FROM ");
-    sb.append(escapeTableName(tableName));
-
-    String sqlCmd = sb.toString();
-    LOG.debug("Reading table with command: " + sqlCmd);
-    return execute(sqlCmd);
-  }
-
-  /**
-   * Resolve a database-specific type to the Java type that should contain it.
-   * @param sqlType
-   * @return the name of a Java type to hold the sql datatype, or null if none.
-   */
-  public String toJavaType(int sqlType) {
-    String defaultJavaType = super.toJavaType(sqlType);
-    return (defaultJavaType == null) ? dbToJavaType(sqlType) : defaultJavaType;
-  }
-
-  /**
-   * Attempt to map sql type to java type.
-   * @param sqlType     sql type
-   * @return            java type
-   */
-  private String dbToJavaType(int sqlType) {
-    // load class oracle.jdbc.OracleTypes
-    // need to use reflection because oracle specific libraries
-    // are not accessible in this context
-    Class typeClass = getTypeClass("oracle.jdbc.OracleTypes");
-
-    // check if it is TIMESTAMPTZ
-    int dbType = getDatabaseType(typeClass, "TIMESTAMPTZ");
-    if (sqlType == dbType) {
-      return "java.sql.Timestamp";
-    }
-
-    // check if it is TIMESTAMPLTZ
-    dbType = getDatabaseType(typeClass, "TIMESTAMPLTZ");
-    if (sqlType == dbType) {
-      return "java.sql.Timestamp";
-    }
-
-    // return null if no java type was found for sqlType
-    return null;
-  }
-
-  /**
-   * Attempt to map sql type to hive type.
-   * @param sqlType     sql data type
-   * @return            hive data type
-   */
-  public String toHiveType(int sqlType) {
-    String defaultHiveType = super.toHiveType(sqlType);
-    return (defaultHiveType == null) ? dbToHiveType(sqlType) : defaultHiveType;
-  }
-
-  /**
-   * Resolve a database-specific type to Hive type.
-   * @param sqlType     sql type
-   * @return            hive type
-   */
-  private String dbToHiveType(int sqlType) {
-    // load class oracle.jdbc.OracleTypes
-    // need to use reflection because oracle specific libraries
-    // are not accessible in this context
-    Class typeClass = getTypeClass("oracle.jdbc.OracleTypes");
-
-    // check if it is TIMESTAMPTZ
-    int dbType = getDatabaseType(typeClass, "TIMESTAMPTZ");
-    if (sqlType == dbType) {
-      return "STRING";
-    }
-
-    // check if it is TIMESTAMPLTZ
-    dbType = getDatabaseType(typeClass, "TIMESTAMPLTZ");
-    if (sqlType == dbType) {
-      return "STRING";
-    }
-
-    // return null if no hive type was found for sqlType
-    return null;
-  }
-
-  /**
-   * Get database type.
-   * @param clazz         oracle class representing sql types
-   * @param fieldName     field name
-   * @return              value of database type constant
-   */
-  private int getDatabaseType(Class clazz, String fieldName) {
-    // Need to use reflection to extract constant values because the database
-    // specific java libraries are not accessible in this context.
-    int value = -1;
-    try {
-      java.lang.reflect.Field field = clazz.getDeclaredField(fieldName);
-      value = field.getInt(null);
-    } catch (NoSuchFieldException ex) {
-      LOG.error("Could not retrieve value for field " + fieldName, ex);
-    } catch (IllegalAccessException ex) {
-      LOG.error("Could not retrieve value for field " + fieldName, ex);
-    }
-    return value;
-  }
-
-  /**
-   * Load class by name.
-   * @param className     class name
-   * @return              class instance
-   */
-  private Class getTypeClass(String className) {
-    // Need to use reflection to load class because the database specific java
-    // libraries are not accessible in this context.
-    Class typeClass = null;
-    try {
-      typeClass = Class.forName(className);
-    } catch (ClassNotFoundException ex) {
-      LOG.error("Could not load class " + className, ex);
-    }
-    return typeClass;
-  }
-
-  @Override
-  protected void finalize() throws Throwable {
-    close();
-    super.finalize();
-  }
-
-  @Override
-  protected String getCurTimestampQuery() {
-    return "SELECT SYSDATE FROM dual";
-  }
-
-  @Override
-  public String timestampToQueryString(Timestamp ts) {
-    return "TO_TIMESTAMP('" + ts + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
-  }
-
-  @Override
-  public String datetimeToQueryString(String datetime, int columnType) {
-    if (columnType == Types.TIMESTAMP) {
-      return "TO_TIMESTAMP('" + datetime + "', 'YYYY-MM-DD HH24:MI:SS.FF')";
-    } else if (columnType == Types.DATE) {
-      return "TO_DATE('" + datetime + "', 'YYYY-MM-DD HH24:MI:SS')";
-    } else {
-      String msg = "Column type is neither timestamp nor date!";
-      LOG.error(msg);
-      throw new RuntimeException(msg);
-    }
-  }
-
-  @Override
-  public boolean supportsStagingForExport() {
-    return true;
-  }
-
-  /**
-   * The concept of database in Oracle is mapped to schemas. Each schema
-   * is identified by the corresponding username.
-   */
-  @Override
-  public String[] listDatabases() {
-    Connection conn = null;
-    Statement stmt = null;
-    ResultSet rset = null;
-    List<String> databases = new ArrayList<String>();
-
-    try {
-      conn = getConnection();
-      stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
-              ResultSet.CONCUR_READ_ONLY);
-      rset = stmt.executeQuery(QUERY_LIST_DATABASES);
-
-      while (rset.next()) {
-        databases.add(rset.getString(1));
-      }
-      conn.commit();
-    } catch (SQLException e) {
-      try {
-        conn.rollback();
-      } catch (Exception ex) {
-        LOG.error("Failed to rollback transaction", ex);
-      }
-
-      if (e.getErrorCode() == ERROR_TABLE_OR_VIEW_DOES_NOT_EXIST) {
-        LOG.error("The catalog view DBA_USERS was not found. "
-            + "This may happen if the user does not have DBA privileges. "
-            + "Please check privileges and try again.");
-        LOG.debug("Full trace for ORA-00942 exception", e);
-      } else {
-        LOG.error("Failed to list databases", e);
-      }
-    } finally {
-      if (rset != null) {
-        try {
-          rset.close();
-        } catch (SQLException ex) {
-          LOG.error("Failed to close resultset", ex);
-        }
-      }
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (Exception ex) {
-          LOG.error("Failed to close statement", ex);
-        }
-      }
-
-      try {
-        close();
-      } catch (SQLException ex) {
-        LOG.error("Unable to discard connection", ex);
-      }
-    }
-
-    return databases.toArray(new String[databases.size()]);
-  }
-
-  @Override
-  public String[] listTables() {
-    Connection conn = null;
-    Statement stmt = null;
-    ResultSet rset = null;
-    List<String> tables = new ArrayList<String>();
-
-    try {
-      conn = getConnection();
-      stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
-              ResultSet.CONCUR_READ_ONLY);
-      rset = stmt.executeQuery(QUERY_LIST_TABLES);
-
-      while (rset.next()) {
-        tables.add(rset.getString(1));
-      }
-      conn.commit();
-    } catch (SQLException e) {
-      try {
-        conn.rollback();
-      } catch (Exception ex) {
-        LOG.error("Failed to rollback transaction", ex);
-      }
-      LOG.error("Failed to list tables", e);
-    } finally {
-      if (rset != null) {
-        try {
-          rset.close();
-        } catch (SQLException ex) {
-          LOG.error("Failed to close resultset", ex);
-        }
-      }
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (Exception ex) {
-          LOG.error("Failed to close statement", ex);
-        }
-      }
-
-      try {
-        close();
-      } catch (SQLException ex) {
-        LOG.error("Unable to discard connection", ex);
-      }
-    }
-
-    return tables.toArray(new String[tables.size()]);
-  }
-
-  @Override
-  public String[] getColumnNames(String tableName) {
-    Connection conn = null;
-    PreparedStatement pStmt = null;
-    ResultSet rset = null;
-    List<String> columns = new ArrayList<String>();
-
-    String tableOwner = this.options.getUsername();
-    String shortTableName = tableName;
-    int qualifierIndex = tableName.indexOf('.');
-    if (qualifierIndex != -1) {
-      tableOwner = tableName.substring(0, qualifierIndex);
-      shortTableName = tableName.substring(qualifierIndex + 1);
-    }
-
-    try {
-      conn = getConnection();
-
-      pStmt = conn.prepareStatement(QUERY_COLUMNS_FOR_TABLE,
-                  ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-
-      pStmt.setString(1, tableOwner);
-
-      pStmt.setString(2, shortTableName);
-      rset = pStmt.executeQuery();
-
-      while (rset.next()) {
-        columns.add(rset.getString(1));
-      }
-      conn.commit();
-    } catch (SQLException e) {
-      try {
-        conn.rollback();
-      } catch (Exception ex) {
-        LOG.error("Failed to rollback transaction", ex);
-      }
-      LOG.error("Failed to list columns", e);
-    } finally {
-      if (rset != null) {
-        try {
-          rset.close();
-        } catch (SQLException ex) {
-          LOG.error("Failed to close resultset", ex);
-        }
-      }
-      if (pStmt != null) {
-        try {
-          pStmt.close();
-        } catch (Exception ex) {
-          LOG.error("Failed to close statement", ex);
-        }
-      }
-
-      try {
-        close();
-      } catch (SQLException ex) {
-        LOG.error("Unable to discard connection", ex);
-      }
-    }
-
-    return columns.toArray(new String[columns.size()]);
-  }
-
-  @Override
-  public String getPrimaryKey(String tableName) {
-    Connection conn = null;
-    PreparedStatement pStmt = null;
-    ResultSet rset = null;
-    List<String> columns = new ArrayList<String>();
-
-    String tableOwner = this.options.getUsername();
-    String shortTableName = tableName;
-    int qualifierIndex = tableName.indexOf('.');
-    if (qualifierIndex != -1) {
-      tableOwner = tableName.substring(0, qualifierIndex);
-      shortTableName = tableName.substring(qualifierIndex + 1);
-    }
-
-    try {
-      conn = getConnection();
-
-      pStmt = conn.prepareStatement(QUERY_PRIMARY_KEY_FOR_TABLE,
-                  ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-      pStmt.setString(1, shortTableName);
-      pStmt.setString(2, tableOwner);
-      rset = pStmt.executeQuery();
-
-      while (rset.next()) {
-        columns.add(rset.getString(1));
-      }
-      conn.commit();
-    } catch (SQLException e) {
-      try {
-        conn.rollback();
-      } catch (Exception ex) {
-        LOG.error("Failed to rollback transaction", ex);
-      }
-      LOG.error("Failed to list columns", e);
-    } finally {
-      if (rset != null) {
-        try {
-          rset.close();
-        } catch (SQLException ex) {
-          LOG.error("Failed to close resultset", ex);
-        }
-      }
-      if (pStmt != null) {
-        try {
-          pStmt.close();
-        } catch (Exception ex) {
-          LOG.error("Failed to close statement", ex);
-        }
-      }
-
-      try {
-        close();
-      } catch (SQLException ex) {
-        LOG.error("Unable to discard connection", ex);
-      }
-    }
-
-    if (columns.size() == 0) {
-      // Table has no primary key
-      return null;
-    }
-
-    if (columns.size() > 1) {
-      // The primary key is multi-column primary key. Warn the user.
-      // TODO select the appropriate column instead of the first column based
-      // on the datatype - giving preference to numerics over other types.
-      LOG.warn("The table " + tableName + " "
-          + "contains a multi-column primary key. Sqoop will default to "
-          + "the column " + columns.get(0) + " only for this job.");
-    }
-
-    return columns.get(0);
-  }
-
-  @Override
-  public String getInputBoundsQuery(String splitByCol, String sanitizedQuery) {
-      /*
-       * The default input bounds query generated by DataDrivenImportJob
-       * is of the form:
-       *  SELECT MIN(splitByCol), MAX(splitByCol) FROM (sanitizedQuery) AS t1
-       *
-       * This works for most databases but not Oracle since Oracle does not
-       * allow the use of "AS" to project the subquery as a table. Instead the
-       * correct format for use with Oracle is as follows:
-       *  SELECT MIN(splitByCol), MAX(splitByCol) FROM (sanitizedQuery) t1
-       */
-      return "SELECT MIN(" + splitByCol + "), MAX(" + splitByCol + ") FROM ("
-                   + sanitizedQuery + ") t1";
-  }
 }
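
The configureDbOutputColumns logic removed above builds the column order that
feeds Oracle's upsert export: update-key columns first, then the remaining
columns for the update branch, then every column again for the insert branch.
A minimal standalone sketch of that ordering, using hypothetical columns ID,
NAME, AGE with update key ID:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    final class UpsertOrderSketch {
      public static void main(String[] args) {
        // Hypothetical inputs; the real values come from SqoopOptions.
        Set<String> updateKeys = new LinkedHashSet<String>(Arrays.asList("ID"));
        String[] allColNames = {"ID", "NAME", "AGE"};

        List<String> dbOutCols = new ArrayList<String>(updateKeys);
        for (String col : allColNames) {
          if (!updateKeys.contains(col.toUpperCase())) {
            dbOutCols.add(col); // non-key columns, for the UPDATE branch
          }
        }
        dbOutCols.addAll(Arrays.asList(allColNames)); // all columns, for INSERT

        System.out.println(dbOutCols); // prints [ID, NAME, AGE, ID, NAME, AGE]
      }
    }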
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java?rev=1196272&r1=1196271&r2=1196272&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/PostgresqlManager.java Tue Nov  1 21:01:09 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -155,5 +153,6 @@ public class PostgresqlManager extends C
   protected String getCurTimestampQuery() {
     return "SELECT CURRENT_TIMESTAMP";
   }
+
 }
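
This PostgresqlManager hunk only drops the per-file copyright line, but the
larger pattern of the commit is visible in the OracleManager diff above and
the SQLServerManager and SqlManager diffs below: each implementation moves to
org.apache.sqoop.manager, and the old com.cloudera.sqoop.manager class shrinks
to a deprecated subclass that re-exposes its public constants. A minimal
sketch of the pattern, using a hypothetical FooManager:

    package com.cloudera.sqoop.manager;

    import com.cloudera.sqoop.SqoopOptions;

    /**
     * @deprecated Moving to use org.apache.sqoop namespace.
     */
    public class FooManager extends org.apache.sqoop.manager.FooManager {

      // Re-declare old public constants so code compiled against the
      // com.cloudera.sqoop name still resolves them at the old location.
      public static final String SOME_QUERY =
          org.apache.sqoop.manager.FooManager.SOME_QUERY;

      public FooManager(final SqoopOptions opts) {
        super(opts);
      }
    }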
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SQLServerManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SQLServerManager.java?rev=1196272&r1=1196271&r2=1196272&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SQLServerManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SQLServerManager.java Tue Nov  1 21:01:09 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,63 +18,16 @@
 
 package com.cloudera.sqoop.manager;
 
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
-import com.cloudera.sqoop.mapreduce.JdbcExportJob;
-import com.cloudera.sqoop.util.ExportException;
 
 /**
- * Manages connections to SQLServer databases. Requires the SQLServer JDBC
- * driver.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class SQLServerManager extends InformationSchemaManager {
-
-  public static final Log LOG = LogFactory.getLog(
-      SQLServerManager.class.getName());
-
-  // Driver class to ensure is loaded when making a DB connection.
-  private static final String DRIVER_CLASS =
-      "com.microsoft.sqlserver.jdbc.SQLServerDriver";
+public class SQLServerManager
+    extends org.apache.sqoop.manager.SQLServerManager {
 
   public SQLServerManager(final SqoopOptions opts) {
-    super(DRIVER_CLASS, opts);
-  }
-
-  /**
-   * Export data stored in HDFS into a table in a database.
-   */
-  @Override
-  public void exportTable(ExportJobContext context)
-      throws IOException, ExportException {
-    context.setConnManager(this);
-    JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
-      ExportBatchOutputFormat.class);
-    exportJob.runExport();
-  }
-
-  /**
-   * SQLServer does not support the CURRENT_TIMESTAMP() function. Instead
-   * it has the notion of keyword CURRENT_TIMESTAMP that resolves to the
-   * current time stamp for the database system.
-   */
-  @Override
-  public String getCurTimestampQuery() {
-      return "SELECT CURRENT_TIMESTAMP";
-  }
-
-  @Override
-  protected String getListDatabasesQuery() {
-    return "SELECT NAME FROM SYS.DATABASES";
-  }
-
-  @Override
-  protected String getSchemaQuery() {
-    return "SELECT SCHEMA_NAME()";
+    super(opts);
   }
 }
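
The getCurTimestampQuery override removed here is one of several in this
message: the generic SqlManager default is "SELECT CURRENT_TIMESTAMP()",
PostgreSQL and SQL Server use the bare CURRENT_TIMESTAMP keyword, and
OracleManager uses SYSDATE FROM dual. A condensed, hypothetical dispatch
summarizing those overrides (the real code expresses them as per-manager
method overrides, not a switch on a name):

    // Hypothetical flat dispatch; Sqoop uses getCurTimestampQuery() overrides.
    static String curTimestampQuery(String db) {
      if ("oracle".equals(db)) {
        return "SELECT SYSDATE FROM dual";
      } else if ("postgresql".equals(db) || "sqlserver".equals(db)) {
        return "SELECT CURRENT_TIMESTAMP"; // keyword, not a function
      }
      return "SELECT CURRENT_TIMESTAMP()"; // generic SqlManager default
    }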
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SqlManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SqlManager.java?rev=1196272&r1=1196271&r2=1196272&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SqlManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SqlManager.java Tue Nov  1 21:01:09 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,872 +18,19 @@
 
 package com.cloudera.sqoop.manager;
 
-import java.sql.Timestamp;
-
 import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.hbase.HBaseUtil;
-import com.cloudera.sqoop.hive.HiveTypes;
-import com.cloudera.sqoop.lib.BlobRef;
-import com.cloudera.sqoop.lib.ClobRef;
-import com.cloudera.sqoop.mapreduce.DataDrivenImportJob;
-import com.cloudera.sqoop.mapreduce.HBaseImportJob;
-import com.cloudera.sqoop.mapreduce.ImportJobBase;
-import com.cloudera.sqoop.mapreduce.JdbcExportJob;
-import com.cloudera.sqoop.mapreduce.JdbcUpdateExportJob;
-import com.cloudera.sqoop.util.ExportException;
-import com.cloudera.sqoop.util.ImportException;
-import com.cloudera.sqoop.util.ResultSetPrinter;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.Statement;
-import java.sql.SQLException;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.util.StringUtils;
-import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
 
 /**
- * ConnManager implementation for generic SQL-compliant database.
- * This is an abstract class; it requires a database-specific
- * ConnManager implementation to actually create the connection.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public abstract class SqlManager extends ConnManager {
+public abstract class SqlManager
+    extends org.apache.sqoop.manager.SqlManager {
 
-  public static final Log LOG = LogFactory.getLog(SqlManager.class.getName());
-
-  /** Substring that must appear in free-form queries submitted by users.
-   * This is the string '$CONDITIONS'.
-   */
   public static final String SUBSTITUTE_TOKEN =
-      DataDrivenDBInputFormat.SUBSTITUTE_TOKEN;
-
-  protected static final int DEFAULT_FETCH_SIZE = 1000;
+          org.apache.sqoop.manager.SqlManager.SUBSTITUTE_TOKEN;
 
-  protected SqoopOptions options;
-  private Statement lastStatement;
-
-  /**
-   * Constructs the SqlManager.
-   * @param opts the SqoopOptions describing the user's requested action.
-   */
   public SqlManager(final SqoopOptions opts) {
-    this.options = opts;
-    initOptionDefaults();
-  }
-
-  /**
-   * Sets default values for values that were not provided by the user.
-   * Only options with database-specific defaults should be configured here.
-   */
-  protected void initOptionDefaults() {
-    if (options.getFetchSize() == null) {
-      LOG.info("Using default fetchSize of " + DEFAULT_FETCH_SIZE);
-      options.setFetchSize(DEFAULT_FETCH_SIZE);
-    }
-  }
-
-  /**
-   * @return the SQL query to use in getColumnNames() in case this logic must
-   * be tuned per-database, but the main extraction loop is still inheritable.
-   */
-  protected String getColNamesQuery(String tableName) {
-    // adding where clause to prevent loading a big table
-    return "SELECT t.* FROM " + escapeTableName(tableName) + " AS t WHERE 1=0";
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public String[] getColumnNames(String tableName) {
-    String stmt = getColNamesQuery(tableName);
-    return getColumnNamesForRawQuery(stmt);
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public String [] getColumnNamesForQuery(String query) {
-    String rawQuery = query.replace(SUBSTITUTE_TOKEN, " (1 = 0) ");
-    return getColumnNamesForRawQuery(rawQuery);
-  }
-
-  /**
-   * Get column names for a query statement that we do not modify further.
-   */
-  public String[] getColumnNamesForRawQuery(String stmt) {
-    ResultSet results;
-    try {
-      results = execute(stmt);
-    } catch (SQLException sqlE) {
-      LOG.error("Error executing statement: " + sqlE.toString(), sqlE);
-      release();
-      return null;
-    }
-
-    try {
-      int cols = results.getMetaData().getColumnCount();
-      ArrayList<String> columns = new ArrayList<String>();
-      ResultSetMetaData metadata = results.getMetaData();
-      for (int i = 1; i < cols + 1; i++) {
-        String colName = metadata.getColumnName(i);
-        if (colName == null || colName.equals("")) {
-          colName = metadata.getColumnLabel(i);
-          if (null == colName) {
-            colName = "_RESULT_" + i;
-          }
-        }
-        columns.add(colName);
-      }
-      return columns.toArray(new String[0]);
-    } catch (SQLException sqlException) {
-      LOG.error("Error reading from database: "
-          + sqlException.toString(), sqlException);
-      return null;
-    } finally {
-      try {
-        results.close();
-        getConnection().commit();
-      } catch (SQLException sqlE) {
-        LOG.warn("SQLException closing ResultSet: " + sqlE.toString(), sqlE);
-      }
-
-      release();
-    }
-  }
-
-  /**
-   * @return the SQL query to use in getColumnTypes() in case this logic must
-   * be tuned per-database, but the main extraction loop is still inheritable.
-   */
-  protected String getColTypesQuery(String tableName) {
-    return getColNamesQuery(tableName);
-  }
-
-  @Override
-  public Map<String, Integer> getColumnTypes(String tableName) {
-    String stmt = getColTypesQuery(tableName);
-    return getColumnTypesForRawQuery(stmt);
-  }
-
-  @Override
-  public Map<String, Integer> getColumnTypesForQuery(String query) {
-    // Manipulate the query to return immediately, with zero rows.
-    String rawQuery = query.replace(SUBSTITUTE_TOKEN, " (1 = 0) ");
-    return getColumnTypesForRawQuery(rawQuery);
-  }
-
-  /**
-   * Get column types for a query statement that we do not modify further.
-   */
-  protected Map<String, Integer> getColumnTypesForRawQuery(String stmt) {
-    ResultSet results;
-    try {
-      results = execute(stmt);
-    } catch (SQLException sqlE) {
-      LOG.error("Error executing statement: " + sqlE.toString(), sqlE);
-      release();
-      return null;
-    }
-
-    try {
-      Map<String, Integer> colTypes = new HashMap<String, Integer>();
-
-      int cols = results.getMetaData().getColumnCount();
-      ResultSetMetaData metadata = results.getMetaData();
-      for (int i = 1; i < cols + 1; i++) {
-        int typeId = metadata.getColumnType(i);
-        // If we have an unsigned int we need to make extra room by
-        // plopping it into a bigint
-        if (typeId == Types.INTEGER &&  !metadata.isSigned(i)){
-            typeId = Types.BIGINT;
-        }
-
-        String colName = metadata.getColumnName(i);
-        if (colName == null || colName.equals("")) {
-          colName = metadata.getColumnLabel(i);
-        }
-
-        colTypes.put(colName, Integer.valueOf(typeId));
-      }
-
-      return colTypes;
-    } catch (SQLException sqlException) {
-      LOG.error("Error reading from database: " + sqlException.toString());
-      return null;
-    } finally {
-      try {
-        results.close();
-        getConnection().commit();
-      } catch (SQLException sqlE) {
-        LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
-      }
-
-      release();
-    }
-  }
-
-  @Override
-  public ResultSet readTable(String tableName, String[] columns)
-      throws SQLException {
-    if (columns == null) {
-      columns = getColumnNames(tableName);
-    }
-
-    StringBuilder sb = new StringBuilder();
-    sb.append("SELECT ");
-    boolean first = true;
-    for (String col : columns) {
-      if (!first) {
-        sb.append(", ");
-      }
-      sb.append(escapeColName(col));
-      first = false;
-    }
-    sb.append(" FROM ");
-    sb.append(escapeTableName(tableName));
-    sb.append(" AS ");   // needed for hsqldb; doesn't hurt anyone else.
-    sb.append(escapeTableName(tableName));
-
-    String sqlCmd = sb.toString();
-    LOG.debug("Reading table with command: " + sqlCmd);
-    return execute(sqlCmd);
-  }
-
-  @Override
-  public String[] listDatabases() {
-    // TODO(aaron): Implement this!
-    LOG.error("Generic SqlManager.listDatabases() not implemented.");
-    return null;
-  }
-
-  @Override
-  public String[] listTables() {
-    ResultSet results = null;
-    String [] tableTypes = {"TABLE"};
-    try {
-      try {
-        DatabaseMetaData metaData = this.getConnection().getMetaData();
-        results = metaData.getTables(null, null, null, tableTypes);
-      } catch (SQLException sqlException) {
-        LOG.error("Error reading database metadata: "
-            + sqlException.toString());
-        return null;
-      }
-
-      if (null == results) {
-        return null;
-      }
-
-      try {
-        ArrayList<String> tables = new ArrayList<String>();
-        while (results.next()) {
-          String tableName = results.getString("TABLE_NAME");
-          tables.add(tableName);
-        }
-
-        return tables.toArray(new String[0]);
-      } catch (SQLException sqlException) {
-        LOG.error("Error reading from database: " + sqlException.toString());
-        return null;
-      }
-    } finally {
-      if (null != results) {
-        try {
-          results.close();
-          getConnection().commit();
-        } catch (SQLException sqlE) {
-          LOG.warn("Exception closing ResultSet: " + sqlE.toString());
-        }
-      }
-    }
-  }
-
-  @Override
-  public String getPrimaryKey(String tableName) {
-    try {
-      DatabaseMetaData metaData = this.getConnection().getMetaData();
-      ResultSet results = metaData.getPrimaryKeys(null, null, tableName);
-      if (null == results) {
-        return null;
-      }
-
-      try {
-        if (results.next()) {
-          return results.getString("COLUMN_NAME");
-        } else {
-          return null;
-        }
-      } finally {
-        results.close();
-        getConnection().commit();
-      }
-    } catch (SQLException sqlException) {
-      LOG.error("Error reading primary key metadata: "
-          + sqlException.toString());
-      return null;
-    }
+    super(opts);
   }
 
-  /**
-   * Retrieve the actual connection from the outer ConnManager.
-   */
-  public abstract Connection getConnection() throws SQLException;
-
-  /**
-   * Determine what column to use to split the table.
-   * @param opts the SqoopOptions controlling this import.
-   * @param tableName the table to import.
-   * @return the splitting column, if one is set or inferrable, or null
-   * otherwise.
-   */
-  protected String getSplitColumn(SqoopOptions opts, String tableName) {
-    String splitCol = opts.getSplitByCol();
-    if (null == splitCol && null != tableName) {
-      // If the user didn't specify a splitting column, try to infer one.
-      splitCol = getPrimaryKey(tableName);
-    }
-
-    return splitCol;
-  }
-
-  /**
-   * Offers the ConnManager an opportunity to validate that the
-   * options specified in the ImportJobContext are valid.
-   * @throws ImportException if the import is misconfigured.
-   */
-  protected void checkTableImportOptions(ImportJobContext context)
-      throws IOException, ImportException {
-    String tableName = context.getTableName();
-    SqoopOptions opts = context.getOptions();
-
-    // Default implementation: check that the split column is set
-    // correctly.
-    String splitCol = getSplitColumn(opts, tableName);
-    if (null == splitCol && opts.getNumMappers() > 1) {
-      // Can't infer a primary key.
-      throw new ImportException("No primary key could be found for table "
-          + tableName + ". Please specify one with --split-by or perform "
-          + "a sequential import with '-m 1'.");
-    }
-  }
-
-  /**
-   * Default implementation of importTable() is to launch a MapReduce job
-   * via DataDrivenImportJob to read the table with DataDrivenDBInputFormat.
-   */
-  public void importTable(ImportJobContext context)
-      throws IOException, ImportException {
-    String tableName = context.getTableName();
-    String jarFile = context.getJarFile();
-    SqoopOptions opts = context.getOptions();
-
-    context.setConnManager(this);
-
-    ImportJobBase importer;
-    if (opts.getHBaseTable() != null) {
-      // Import to HBase.
-      if (!HBaseUtil.isHBaseJarPresent()) {
-        throw new ImportException("HBase jars are not present in "
-            + "classpath, cannot import to HBase!");
-      }
-      importer = new HBaseImportJob(opts, context);
-    } else {
-      // Import to HDFS.
-      importer = new DataDrivenImportJob(opts, context.getInputFormat(),
-          context);
-    }
-
-    checkTableImportOptions(context);
-
-    String splitCol = getSplitColumn(opts, tableName);
-    importer.runImport(tableName, jarFile, splitCol, opts.getConf());
-  }
-
-  /**
-   * Default implementation of importQuery() is to launch a MapReduce job
-   * via DataDrivenImportJob to read the table with DataDrivenDBInputFormat,
-   * using its free-form query importer.
-   */
-  public void importQuery(ImportJobContext context)
-      throws IOException, ImportException {
-    String jarFile = context.getJarFile();
-    SqoopOptions opts = context.getOptions();
-
-    context.setConnManager(this);
-
-    ImportJobBase importer;
-    if (opts.getHBaseTable() != null) {
-      // Import to HBase.
-      if (!HBaseUtil.isHBaseJarPresent()) {
-        throw new ImportException("HBase jars are not present in classpath,"
-            + " cannot import to HBase!");
-      }
-      importer = new HBaseImportJob(opts, context);
-    } else {
-      // Import to HDFS.
-      importer = new DataDrivenImportJob(opts, context.getInputFormat(),
-          context);
-    }
-
-    String splitCol = getSplitColumn(opts, null);
-    if (splitCol == null) {
-      String boundaryQuery = opts.getBoundaryQuery();
-      if (opts.getNumMappers() > 1) {
-        // Can't infer a primary key.
-        throw new ImportException("A split-by column must be specified for "
-            + "parallel free-form query imports. Please specify one with "
-            + "--split-by or perform a sequential import with '-m 1'.");
-      } else if (boundaryQuery != null && !boundaryQuery.isEmpty()) {
-        // Query import with boundary query and no split column specified
-        throw new ImportException("Using a boundary query for a query based "
-            + "import requires specifying the split by column as well. Please "
-            + "specify a column name using --split-by and try again.");
-      }
-    }
-
-    importer.runImport(null, jarFile, splitCol, opts.getConf());
-  }
-
-  /**
-   * Executes an arbitrary SQL statement.
-   * @param stmt The SQL statement to execute
-   * @param fetchSize Overrides default or parameterized fetch size
-   * @return A ResultSet encapsulating the results or null on error
-   */
-  protected ResultSet execute(String stmt, Integer fetchSize, Object... args)
-      throws SQLException {
-    // Release any previously-open statement.
-    release();
-
-    PreparedStatement statement = null;
-    statement = this.getConnection().prepareStatement(stmt,
-        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-    if (fetchSize != null) {
-      LOG.debug("Using fetchSize for next query: " + fetchSize);
-      statement.setFetchSize(fetchSize);
-    }
-    this.lastStatement = statement;
-    if (null != args) {
-      for (int i = 0; i < args.length; i++) {
-        statement.setObject(i + 1, args[i]);
-      }
-    }
-
-    LOG.info("Executing SQL statement: " + stmt);
-    return statement.executeQuery();
-  }
-
-  /**
-   * Executes an arbitrary SQL Statement.
-   * @param stmt The SQL statement to execute
-   * @return A ResultSet encapsulating the results or null on error
-   */
-  protected ResultSet execute(String stmt, Object... args) throws SQLException {
-    return execute(stmt, options.getFetchSize(), args);
-  }
-
-  /**
-   * Resolve a database-specific type to the Java type that should contain it.
-   * @param sqlType
-   * @return the name of a Java type to hold the sql datatype, or null if none.
-   */
-  public String toJavaType(int sqlType) {
-    // Mappings taken from:
-    // http://java.sun.com/j2se/1.3/docs/guide/jdbc/getstart/mapping.html
-    if (sqlType == Types.INTEGER) {
-      return "Integer";
-    } else if (sqlType == Types.VARCHAR) {
-      return "String";
-    } else if (sqlType == Types.CHAR) {
-      return "String";
-    } else if (sqlType == Types.LONGVARCHAR) {
-      return "String";
-    } else if (sqlType == Types.NVARCHAR) {
-      return "String";
-    } else if (sqlType == Types.NCHAR) {
-      return "String";
-    } else if (sqlType == Types.LONGNVARCHAR) {
-      return "String";
-    } else if (sqlType == Types.NUMERIC) {
-      return "java.math.BigDecimal";
-    } else if (sqlType == Types.DECIMAL) {
-      return "java.math.BigDecimal";
-    } else if (sqlType == Types.BIT) {
-      return "Boolean";
-    } else if (sqlType == Types.BOOLEAN) {
-      return "Boolean";
-    } else if (sqlType == Types.TINYINT) {
-      return "Integer";
-    } else if (sqlType == Types.SMALLINT) {
-      return "Integer";
-    } else if (sqlType == Types.BIGINT) {
-      return "Long";
-    } else if (sqlType == Types.REAL) {
-      return "Float";
-    } else if (sqlType == Types.FLOAT) {
-      return "Double";
-    } else if (sqlType == Types.DOUBLE) {
-      return "Double";
-    } else if (sqlType == Types.DATE) {
-      return "java.sql.Date";
-    } else if (sqlType == Types.TIME) {
-      return "java.sql.Time";
-    } else if (sqlType == Types.TIMESTAMP) {
-      return "java.sql.Timestamp";
-    } else if (sqlType == Types.BINARY
-        || sqlType == Types.VARBINARY) {
-      return BytesWritable.class.getName();
-    } else if (sqlType == Types.CLOB) {
-      return ClobRef.class.getName();
-    } else if (sqlType == Types.BLOB
-        || sqlType == Types.LONGVARBINARY) {
-      return BlobRef.class.getName();
-    } else {
-      // TODO(aaron): Support DISTINCT, ARRAY, STRUCT, REF, JAVA_OBJECT.
-      // Return null indicating database-specific manager should return a
-      // java data type if it can find one for any nonstandard type.
-      return null;
-    }
-  }
-
-  /**
-   * Resolve a database-specific type to Hive data type.
-   * @param sqlType     sql type
-   * @return            hive type
-   */
-  public String toHiveType(int sqlType) {
-    return HiveTypes.toHiveType(sqlType);
-  }
-
-  public void close() throws SQLException {
-    release();
-  }
-
-  /**
-   * Prints the contents of a ResultSet to the specified PrintWriter.
-   * The ResultSet is closed at the end of this method.
-   * @param results the ResultSet to print.
-   * @param pw the location to print the data to.
-   */
-  protected void formatAndPrintResultSet(ResultSet results, PrintWriter pw) {
-    try {
-      try {
-        int cols = results.getMetaData().getColumnCount();
-        pw.println("Got " + cols + " columns back");
-        if (cols > 0) {
-          ResultSetMetaData rsmd = results.getMetaData();
-          String schema = rsmd.getSchemaName(1);
-          String table = rsmd.getTableName(1);
-          if (null != schema) {
-            pw.println("Schema: " + schema);
-          }
-
-          if (null != table) {
-            pw.println("Table: " + table);
-          }
-        }
-      } catch (SQLException sqlE) {
-        LOG.error("SQLException reading result metadata: " + sqlE.toString());
-      }
-
-      try {
-        new ResultSetPrinter().printResultSet(pw, results);
-      } catch (IOException ioe) {
-        LOG.error("IOException writing results: " + ioe.toString());
-        return;
-      }
-    } finally {
-      try {
-        results.close();
-        getConnection().commit();
-      } catch (SQLException sqlE) {
-        LOG.warn("SQLException closing ResultSet: " + sqlE.toString());
-      }
-
-      release();
-    }
-  }
-
-  /**
-   * Poor man's SQL query interface; used for debugging.
-   * @param s the SQL statement to execute.
-   */
-  public void execAndPrint(String s) {
-    ResultSet results = null;
-    try {
-      results = execute(s);
-    } catch (SQLException sqlE) {
-      LOG.error("Error executing statement: "
-          + StringUtils.stringifyException(sqlE));
-      release();
-      return;
-    }
-
-    PrintWriter pw = new PrintWriter(System.out, true);
-    try {
-      formatAndPrintResultSet(results, pw);
-    } finally {
-      pw.close();
-    }
-  }
-
-  /**
-   * Create a connection to the database; usually used only from within
-   * getConnection(), which enforces a singleton guarantee around the
-   * Connection object.
-   */
-  protected Connection makeConnection() throws SQLException {
-
-    Connection connection;
-    String driverClass = getDriverClass();
-
-    try {
-      Class.forName(driverClass);
-    } catch (ClassNotFoundException cnfe) {
-      throw new RuntimeException("Could not load db driver class: "
-          + driverClass);
-    }
-
-    String username = options.getUsername();
-    String password = options.getPassword();
-    String connectString = options.getConnectString();
-    Properties connectionParams = options.getConnectionParams();
-    if (connectionParams != null && connectionParams.size() > 0) {
-      LOG.debug("User specified connection params. "
-              + "Using properties specific API for making connection.");
-
-      Properties props = new Properties();
-      if (username != null) {
-        props.put("user", username);
-      }
-
-      if (password != null) {
-        props.put("password", password);
-      }
-
-      props.putAll(connectionParams);
-      connection = DriverManager.getConnection(connectString, props);
-    } else {
-      LOG.debug("No connection paramenters specified. "
-              + "Using regular API for making connection.");
-      if (username == null) {
-        connection = DriverManager.getConnection(connectString);
-      } else {
-        connection = DriverManager.getConnection(
-                        connectString, username, password);
-      }
-    }
-
-    // We only use this for metadata queries. Loosest semantics are okay.
-    connection.setTransactionIsolation(getMetadataIsolationLevel());
-    connection.setAutoCommit(false);
-
-    return connection;
-  }
-
-  /**
-   * @return the transaction isolation level to use for metadata queries
-   * (queries executed by the ConnManager itself).
-   */
-  protected int getMetadataIsolationLevel() {
-    return Connection.TRANSACTION_READ_COMMITTED;
-  }
-
-  /**
-   * Export data stored in HDFS into a table in a database.
-   */
-  public void exportTable(ExportJobContext context)
-      throws IOException, ExportException {
-    context.setConnManager(this);
-    JdbcExportJob exportJob = new JdbcExportJob(context);
-    exportJob.runExport();
-  }
-
-  public void release() {
-    if (null != this.lastStatement) {
-      try {
-        this.lastStatement.close();
-      } catch (SQLException e) {
-        LOG.warn("Exception closing executed Statement: " + e);
-      }
-
-      this.lastStatement = null;
-    }
-  }
-
-  @Override
-  /**
-   * {@inheritDoc}
-   */
-  public void updateTable(ExportJobContext context)
-      throws IOException, ExportException {
-    context.setConnManager(this);
-    JdbcUpdateExportJob exportJob = new JdbcUpdateExportJob(context);
-    exportJob.runExport();
-  }
-
-  /**
-   * @return a SQL query to retrieve the current timestamp from the db.
-   */
-  protected String getCurTimestampQuery() {
-    return "SELECT CURRENT_TIMESTAMP()";
-  }
-
-  @Override
-  /**
-   * {@inheritDoc}
-   */
-  public Timestamp getCurrentDbTimestamp() {
-    release(); // Release any previous ResultSet.
-
-    Statement s = null;
-    ResultSet rs = null;
-    try {
-      Connection c = getConnection();
-      s = c.createStatement();
-      rs = s.executeQuery(getCurTimestampQuery());
-      if (rs == null || !rs.next()) {
-        return null; // empty ResultSet.
-      }
-
-      return rs.getTimestamp(1);
-    } catch (SQLException sqlE) {
-      LOG.warn("SQL exception accessing current timestamp: " + sqlE);
-      return null;
-    } finally {
-      try {
-        if (null != rs) {
-          rs.close();
-        }
-      } catch (SQLException sqlE) {
-        LOG.warn("SQL Exception closing resultset: " + sqlE);
-      }
-
-      try {
-        if (null != s) {
-          s.close();
-        }
-      } catch (SQLException sqlE) {
-        LOG.warn("SQL Exception closing statement: " + sqlE);
-      }
-    }
-  }
-
-  @Override
-  public long getTableRowCount(String tableName) throws SQLException {
-    release(); // Release any previous ResultSet
-    long result = -1;
-    String countQuery = "SELECT COUNT(*) FROM " + tableName;
-    Statement stmt = null;
-    ResultSet rset = null;
-    try {
-      Connection conn = getConnection();
-      stmt = conn.createStatement();
-      rset = stmt.executeQuery(countQuery);
-      rset.next();
-      result = rset.getLong(1);
-    } catch (SQLException ex) {
-      LOG.error("Unable to query COUNT(*) for table " + tableName, ex);
-      throw ex;
-    } finally {
-      if (rset != null) {
-        try {
-          rset.close();
-        } catch (SQLException ex) {
-          LOG.error("Unable to close result set", ex);
-        }
-      }
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException ex) {
-          LOG.error("Unable to close statement", ex);
-        }
-      }
-    }
-    return result;
-  }
-
-  @Override
-  public void deleteAllRecords(String tableName) throws SQLException {
-    release(); // Release any previous ResultSet
-    String deleteQuery = "DELETE FROM " + tableName;
-    Statement stmt = null;
-    try {
-      Connection conn = getConnection();
-      stmt = conn.createStatement();
-      int updateCount = stmt.executeUpdate(deleteQuery);
-      conn.commit();
-      LOG.info("Deleted " + updateCount + " records from " + tableName);
-    } catch (SQLException ex) {
-      LOG.error("Unable to execute delete query: " + deleteQuery, ex);
-      throw ex;
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException ex) {
-          LOG.error("Unable to close statement", ex);
-        }
-      }
-    }
-  }
-
-  @Override
-  public void migrateData(String fromTable, String toTable)
-    throws SQLException {
-    release(); // Release any previous ResultSet
-    String updateQuery = "INSERT INTO " + toTable
-          + " ( SELECT * FROM " + fromTable + " )";
-
-    String deleteQuery = "DELETE FROM " + fromTable;
-    Statement stmt = null;
-    try {
-      Connection conn = getConnection();
-      stmt = conn.createStatement();
-
-      // Insert data from the fromTable to the toTable
-      int updateCount = stmt.executeUpdate(updateQuery);
-      LOG.info("Migrated " + updateCount + " records from " + fromTable
-                  + " to " + toTable);
-
-      // Delete the records from the fromTable
-      int deleteCount = stmt.executeUpdate(deleteQuery);
-
-      // If the counts do not match, fail the transaction
-      if (updateCount != deleteCount) {
-        conn.rollback();
-        throw new RuntimeException("Inconsistent record counts");
-      }
-      conn.commit();
-    } catch (SQLException ex) {
-      LOG.error("Unable to migrate data from "
-          + fromTable + " to " + toTable, ex);
-      throw ex;
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException ex) {
-          LOG.error("Unable to close statement", ex);
-        }
-      }
-    }
-  }
-
-  public String getInputBoundsQuery(String splitByCol, String sanitizedQuery) {
-    return options.getBoundaryQuery();
-  }
 }
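
The makeConnection() logic removed above is part of the namespace migration rather than a deletion: when the user supplies extra connection parameters, they are folded into a java.util.Properties object alongside the credentials and passed through the Properties-based DriverManager API. A minimal standalone sketch of that branch; the driver class, connect string, credentials, and the extra property are illustrative placeholders, not defaults:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.util.Properties;

    public class ConnectionSketch {
      public static Connection connect() throws SQLException {
        try {
          Class.forName("oracle.jdbc.OracleDriver"); // assumed driver class
        } catch (ClassNotFoundException cnfe) {
          throw new RuntimeException("Could not load db driver class", cnfe);
        }

        Properties props = new Properties();
        props.put("user", "scott");     // hypothetical username
        props.put("password", "tiger"); // hypothetical password
        // Any user-specified connection params ride along in the same bag.
        props.put("oracle.jdbc.ReadTimeout", "60000"); // example extra param

        Connection conn = DriverManager.getConnection(
            "jdbc:oracle:thin:@//dbhost:1521/orcl", props); // assumed URL
        // Mirror the manager's metadata-query setup.
        conn.setAutoCommit(false);
        return conn;
      }
    }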

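Likewise, the staging-table migration removed above follows a simple contract: copy every row from the staging table into the target table, then clear the staging table, rolling both statements back if the two counts disagree. A minimal sketch of that contract against a plain JDBC connection, with table names as placeholders:

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class MigrateSketch {
      public static void migrate(Connection conn, String from, String to)
          throws SQLException {
        conn.setAutoCommit(false); // both statements share one transaction
        Statement stmt = conn.createStatement();
        try {
          int inserted = stmt.executeUpdate(
              "INSERT INTO " + to + " ( SELECT * FROM " + from + " )");
          int deleted = stmt.executeUpdate("DELETE FROM " + from);
          if (inserted != deleted) {
            conn.rollback(); // partial copy: leave both tables as they were
            throw new SQLException("Inconsistent record counts");
          }
          conn.commit();
        } finally {
          stmt.close();
        }
      }
    }
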
Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/CatalogQueryManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/CatalogQueryManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/CatalogQueryManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/CatalogQueryManager.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+
+/**
+ * Database manager that queries catalog tables directly
+ * (instead of metadata calls) to retrieve information.
+ */
+public abstract class CatalogQueryManager
+    extends com.cloudera.sqoop.manager.GenericJdbcManager {
+
+  public static final Log LOG = LogFactory.getLog(
+    CatalogQueryManager.class.getName());
+
+  public CatalogQueryManager(final String driverClass,
+    final SqoopOptions opts) {
+    super(driverClass, opts);
+  }
+
+  protected abstract String getListDatabasesQuery();
+  @Override
+  public String[] listDatabases() {
+    Connection c = null;
+    Statement s = null;
+    ResultSet rs = null;
+    List<String> databases = new ArrayList<String>();
+    try {
+      c = getConnection();
+      s = c.createStatement();
+      rs = s.executeQuery(getListDatabasesQuery());
+      while (rs.next()) {
+        databases.add(rs.getString(1));
+      }
+      c.commit();
+    } catch (SQLException sqle) {
+      try {
+        if (c != null) {
+          c.rollback();
+        }
+      } catch (SQLException ce) {
+        LOG.error("Failed to rollback transaction", ce);
+      }
+      LOG.error("Failed to list databases", sqle);
+      throw new RuntimeException(sqle);
+    } finally {
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException re) {
+          LOG.error("Failed to close resultset", re);
+        }
+      }
+      if (s != null) {
+        try {
+          s.close();
+        } catch (SQLException se) {
+          LOG.error("Failed to close statement", se);
+        }
+      }
+    }
+
+    return databases.toArray(new String[databases.size()]);
+  }
+
+  protected abstract String getListTablesQuery();
+  @Override
+  public String[] listTables() {
+    Connection c = null;
+    Statement s = null;
+    ResultSet rs = null;
+    List<String> tables = new ArrayList<String>();
+    try {
+      c = getConnection();
+      s = c.createStatement();
+      rs = s.executeQuery(getListTablesQuery());
+      while (rs.next()) {
+        tables.add(rs.getString(1));
+      }
+      c.commit();
+    } catch (SQLException sqle) {
+      try {
+        if (c != null) {
+          c.rollback();
+        }
+      } catch (SQLException ce) {
+        LOG.error("Failed to rollback transaction", ce);
+      }
+      LOG.error("Failed to list tables", sqle);
+      throw new RuntimeException(sqle);
+    } finally {
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException re) {
+          LOG.error("Failed to close resultset", re);
+        }
+      }
+      if (s != null) {
+        try {
+          s.close();
+        } catch (SQLException se) {
+          LOG.error("Failed to close statement", se);
+        }
+      }
+    }
+
+    return tables.toArray(new String[tables.size()]);
+  }
+
+  protected abstract String getListColumnsQuery(String tableName);
+  @Override
+  public String[] getColumnNames(String tableName) {
+    Connection c = null;
+    Statement s = null;
+    ResultSet rs = null;
+    List<String> columns = new ArrayList<String>();
+    try {
+      c = getConnection();
+      s = c.createStatement();
+      rs = s.executeQuery(getListColumnsQuery(tableName));
+      while (rs.next()) {
+        columns.add(rs.getString(1));
+      }
+      c.commit();
+    } catch (SQLException sqle) {
+      try {
+        if (c != null) {
+          c.rollback();
+        }
+      } catch (SQLException ce) {
+        LOG.error("Failed to rollback transaction", ce);
+      }
+      LOG.error("Failed to list columns", sqle);
+      throw new RuntimeException(sqle);
+    } finally {
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException re) {
+          LOG.error("Failed to close resultset", re);
+        }
+      }
+      if (s != null) {
+        try {
+          s.close();
+        } catch (SQLException se) {
+          LOG.error("Failed to close statement", se);
+        }
+      }
+    }
+
+    return columns.toArray(new String[columns.size()]);
+  }
+
+  protected abstract String getPrimaryKeyQuery(String tableName);
+  @Override
+  public String getPrimaryKey(String tableName) {
+    Connection c = null;
+    Statement s = null;
+    ResultSet rs = null;
+    List<String> columns = new ArrayList<String>();
+    try {
+      c = getConnection();
+      s = c.createStatement();
+      rs = s.executeQuery(getPrimaryKeyQuery(tableName));
+      while (rs.next()) {
+        columns.add(rs.getString(1));
+      }
+      c.commit();
+    } catch (SQLException sqle) {
+      try {
+        if (c != null) {
+          c.rollback();
+        }
+      } catch (SQLException ce) {
+        LOG.error("Failed to rollback transaction", ce);
+      }
+      LOG.error("Failed to list primary key", sqle);
+      throw new RuntimeException(sqle);
+    } finally {
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException re) {
+          LOG.error("Failed to close resultset", re);
+        }
+      }
+      if (s != null) {
+        try {
+          s.close();
+        } catch (SQLException se) {
+          LOG.error("Failed to close statement", se);
+        }
+      }
+    }
+
+    if (columns.size() == 0) {
+      // Table has no primary key
+      return null;
+    }
+
+    if (columns.size() > 1) {
+      // The table has a multi-column primary key. Warn the user.
+      LOG.warn("The table " + tableName + " "
+        + "contains a multi-column primary key. Sqoop will default to "
+        + "the column " + columns.get(0) + " only for this job.");
+    }
+
+    return columns.get(0);
+  }
+}
+

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/CatalogQueryManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
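
CatalogQueryManager reduces each metadata operation to a single catalog query supplied by a subclass, so adding catalog-driven metadata support for a database comes down to four SQL strings. A hypothetical subclass against an INFORMATION_SCHEMA-style catalog; the driver class and queries below are illustrative, not ones Sqoop ships:

    import com.cloudera.sqoop.SqoopOptions;

    public class ExampleCatalogManager
        extends org.apache.sqoop.manager.CatalogQueryManager {

      public ExampleCatalogManager(final SqoopOptions opts) {
        super("com.example.jdbc.Driver", opts); // hypothetical driver class
      }

      @Override
      protected String getListDatabasesQuery() {
        return "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA";
      }

      @Override
      protected String getListTablesQuery() {
        return "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES";
      }

      @Override
      protected String getListColumnsQuery(String tableName) {
        return "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS "
            + "WHERE TABLE_NAME = '" + tableName + "'";
      }

      @Override
      protected String getPrimaryKeyQuery(String tableName) {
        return "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE "
            + "WHERE TABLE_NAME = '" + tableName + "' "
            + "AND CONSTRAINT_NAME = 'PRIMARY'";
      }
    }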

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ConnManager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ConnManager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ConnManager.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * Abstract interface that manages connections to a database.
+ * The implementations of this class drive the actual interaction with
+ * the database about table formats, etc.
+ */
+public abstract class ConnManager {
+
+  public static final Log LOG = LogFactory.getLog(SqlManager.class.getName());
+
+  /**
+   * Return a list of all databases on a server.
+   */
+  public abstract String [] listDatabases();
+
+  /**
+   * Return a list of all tables in a database.
+   */
+  public abstract String [] listTables();
+
+  /**
+   * Return a list of column names in a table in the order returned by the db.
+   */
+  public abstract String [] getColumnNames(String tableName);
+
+  /**
+   * Return a list of column names in a query in the order returned by the db.
+   */
+  public String [] getColumnNamesForQuery(String query) {
+    LOG.error("This database does not support free-form query column names.");
+    return null;
+  }
+
+  /**
+   * Return the name of the primary key for a table, or null if there is none.
+   */
+  public abstract String getPrimaryKey(String tableName);
+
+  /**
+   * Return java type for SQL type.
+   * @param sqlType     sql type
+   * @return            java type
+   */
+  public abstract String toJavaType(int sqlType);
+
+  /**
+   * Return the Hive type for a SQL type.
+   * @param sqlType   sql type
+   * @return          hive type
+   */
+  public abstract String toHiveType(int sqlType);
+
+  /**
+   * Return an unordered mapping from colname to sqltype for
+   * all columns in a table.
+   *
+   * The Integer type id is a constant from java.sql.Types
+   */
+  public abstract Map<String, Integer> getColumnTypes(String tableName);
+
+  /**
+   * Return an unordered mapping from colname to sqltype for
+   * all columns in a table or query.
+   *
+   * The Integer type id is a constant from java.sql.Types
+   *
+   * @param tableName the name of the table
+   * @param sqlQuery the SQL query to use if tableName is null
+   */
+  public Map<String, Integer> getColumnTypes(String tableName,
+      String sqlQuery) throws IOException {
+    Map<String, Integer> columnTypes;
+    if (null != tableName) {
+      // We're generating a class based on a table import.
+      columnTypes = getColumnTypes(tableName);
+    } else {
+      // This is based on an arbitrary query.
+      String query = sqlQuery;
+      if (query.indexOf(SqlManager.SUBSTITUTE_TOKEN) == -1) {
+        throw new IOException("Query [" + query + "] must contain '"
+            + SqlManager.SUBSTITUTE_TOKEN + "' in WHERE clause.");
+      }
+
+      columnTypes = getColumnTypesForQuery(query);
+    }
+    return columnTypes;
+  }
+
+  /**
+   * This method allows various connection managers to indicate if they support
+   * staging data for export jobs. The managers that do support this must
+   * override this method and return <tt>true</tt>.
+   *
+   * @return true if the connection manager supports staging data for the
+   * export use-case.
+   */
+  public boolean supportsStagingForExport() {
+    return false;
+  }
+
+  /**
+   * Returns the count of all rows that exist in the given table.
+   * @param tableName the name of the table which will be queried.
+   * @return the number of rows present in the given table.
+   * @throws SQLException if an error occurs during execution
+   * @throws UnsupportedOperationException if the connection manager does not
+   * support this operation.
+   */
+  public long getTableRowCount(String tableName) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Deletes all records from the given table. This method is invoked during
+   * an export run when a staging table is specified. The staging table is
+   * cleaned before the commencement of the export job, and after the data
+   * has been moved to the target table.
+   * @param tableName name of the table which will be emptied.
+   * @throws SQLException if an error occurs during execution
+   * @throws UnsupportedOperationException if the connection manager does not
+   * support this operation.
+   */
+  public void deleteAllRecords(String tableName) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Migrates all records from the given <tt>fromTable</tt> to the target
+   * <tt>toTable</tt>. This method is invoked as a last step of an export
+   * run where the staging table is used to collect data before pushing it
+   * into the target table.
+   * @param fromTable the name of the staging table
+   * @param toTable the name of the target table
+   * @throws SQLException if an error occurs during execution
+   * @throws UnsupportedOperationException if the connection manager does not
+   * support this operation.
+   */
+  public void migrateData(String fromTable, String toTable)
+    throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Return an unordered mapping from colname to sqltype for
+   * all columns in a query.
+   *
+   * The Integer type id is a constant from java.sql.Types
+   */
+  public Map<String, Integer> getColumnTypesForQuery(String query) {
+    LOG.error("This database does not support free-form query column types.");
+    return null;
+  }
+
+  /**
+   * Execute a SQL statement to read the named set of columns from a table.
+   * If columns is null, all columns from the table are read. This is a direct
+   * (non-parallelized) read of the table back to the current client.
+   * The client is responsible for calling ResultSet.close() when done with the
+   * returned ResultSet object, and for calling release() after that to free
+   * internal state.
+   */
+  public abstract ResultSet readTable(String tableName, String [] columns)
+      throws SQLException;
+
+  /**
+   * @return the actual database connection.
+   */
+  public abstract Connection getConnection() throws SQLException;
+
+  /**
+   * @return a string identifying the driver class to load for this
+   * JDBC connection type.
+   */
+  public abstract String getDriverClass();
+
+  /**
+   * Execute a SQL statement 's' and print its results to stdout.
+   */
+  public abstract void execAndPrint(String s);
+
+  /**
+   * Perform an import of a table from the database into HDFS.
+   */
+  public abstract void importTable(
+          com.cloudera.sqoop.manager.ImportJobContext context)
+      throws IOException, ImportException;
+
+  /**
+   * Perform an import of a free-form query from the database into HDFS.
+   */
+  public void importQuery(com.cloudera.sqoop.manager.ImportJobContext context)
+      throws IOException, ImportException {
+    throw new ImportException(
+        "This database only supports table-based imports.");
+  }
+
+  /**
+   * When using a column name in a generated SQL query, how (if at all)
+   * should we escape that column name? e.g., a column named "table"
+   * may need to be quoted with backticks: "`table`".
+   *
+   * @param colName the column name as provided by the user, etc.
+   * @return how the column name should be rendered in the sql text.
+   */
+  public String escapeColName(String colName) {
+    return colName;
+  }
+
+  /**
+   * When using a table name in a generated SQL query, how (if at all)
+   * should we escape that table name? e.g., a table named "table"
+   * may need to be quoted with backticks: "`table`".
+   *
+   * @param tableName the table name as provided by the user, etc.
+   * @return how the table name should be rendered in the sql text.
+   */
+  public String escapeTableName(String tableName) {
+    return tableName;
+  }
+
+  /**
+   * Perform any shutdown operations on the connection.
+   */
+  public abstract void close() throws SQLException;
+
+  /**
+   * Export data stored in HDFS into a table in a database.
+   * This inserts new rows into the target table.
+   */
+  public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    throw new ExportException("This database does not support exports");
+  }
+
+  /**
+   * Export updated data stored in HDFS into a database table.
+   * This updates existing rows in the target table, based on the
+   * updateKeyCol specified in the context's SqoopOptions.
+   */
+  public void updateTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    throw new ExportException("This database does not support updates");
+  }
+
+  /**
+   * Export data stored in HDFS into a table in a database.
+   * This may update or insert rows into the target table depending on
+   * whether rows already exist in the target table or not.
+   */
+  public void upsertTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    throw new ExportException("Mixed update/insert is not supported"
+        + " against the target database yet");
+  }
+
+  /**
+   * Configure database output column ordering explicitly for code generator.
+   * The code generator should generate the DBWritable.write(PreparedStatement)
+   * method with columns exported in this order.
+   */
+  public void configureDbOutputColumns(SqoopOptions options) {
+    // We're in update mode. We need to explicitly set the database output
+    // column ordering in the codeGenerator.  The UpdateKeyCol must come
+    // last, because the UPDATE-based OutputFormat will generate the SET
+    // clause followed by the WHERE clause, and the SqoopRecord needs to
+    // serialize to this layout.
+    Set<String> updateKeys = new LinkedHashSet<String>();
+    Set<String> updateKeysUppercase = new HashSet<String>();
+    String updateKeyValue = options.getUpdateKeyCol();
+    StringTokenizer stok = new StringTokenizer(updateKeyValue, ",");
+    while (stok.hasMoreTokens()) {
+      String nextUpdateColumn = stok.nextToken().trim();
+      if (nextUpdateColumn.length() > 0) {
+        updateKeys.add(nextUpdateColumn);
+        updateKeysUppercase.add(nextUpdateColumn.toUpperCase());
+      } else {
+        throw new RuntimeException("Invalid update key column value specified"
+                    + ": '" + updateKeyValue + "'");
+      }
+    }
+    String [] allColNames = getColumnNames(options.getTableName());
+    List<String> dbOutCols = new ArrayList<String>();
+    for (String col : allColNames) {
+      if (!updateKeysUppercase.contains(col.toUpperCase())) {
+        dbOutCols.add(col); // add non-key columns to the output order list.
+      }
+    }
+
+    // Then add the update key column last.
+    dbOutCols.addAll(updateKeys);
+    options.setDbOutputColumns(dbOutCols.toArray(
+        new String[dbOutCols.size()]));
+  }
+
+  /**
+   * If a method of this ConnManager has returned a ResultSet to you,
+   * you are responsible for calling release() after you close the
+   * ResultSet object, to free internal resources. ConnManager
+   * implementations do not guarantee the ability to have multiple
+   * returned ResultSets available concurrently. Requesting a new
+   * ResultSet from a ConnManager may cause other open ResultSets
+   * to close.
+   */
+  public abstract void release();
+
+  /**
+   * Return the current time from the perspective of the database server.
+   * Return null if this cannot be accessed.
+   */
+  public Timestamp getCurrentDbTimestamp() {
+    LOG.warn("getCurrentDbTimestamp(): Using local system timestamp.");
+    return new Timestamp(System.currentTimeMillis());
+  }
+
+  /**
+   * Given a non-null Timestamp, return the quoted string that can
+   * be inserted into a SQL statement, representing that timestamp.
+   */
+  public String timestampToQueryString(Timestamp ts) {
+    return "'" + ts + "'";
+  }
+
+  /**
+   * Given a date/time, return the quoted string that can
+   * be inserted into a SQL statement, representing that date/time.
+   */
+  public String datetimeToQueryString(String datetime, int columnType) {
+    return "'" + datetime + "'";
+  }
+
+  /**
+   * This method allows the ConnManager to override the creation of an
+   * input-bounds query that is used to create splits when running an import
+   * based on a free-form query. Any non-null return value is used, whereas a null
+   * return value indicates that the default input bounds query should be
+   * used.
+   * @param splitByCol the column name to split on.
+   * @param sanitizedQuery the sanitized input query specified by user.
+   * @return an input-bounds query or <tt>null</tt> if default query is
+   * acceptable.
+   */
+  public String getInputBoundsQuery(String splitByCol, String sanitizedQuery) {
+    return null;
+  }
+}
+

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/ConnManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
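
The column ordering that configureDbOutputColumns() establishes is easiest to see with a concrete input. For a table (ID, NAME, SALARY) exported with --update-key ID, the non-key columns come first and the key column last, matching the SET-then-WHERE layout of the generated UPDATE statement. A self-contained sketch of just the ordering step, with made-up column names:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    public class ColumnOrderSketch {
      public static void main(String[] args) {
        String[] allColNames = {"ID", "NAME", "SALARY"};
        Set<String> updateKeys =
            new LinkedHashSet<String>(Arrays.asList("ID"));

        List<String> dbOutCols = new ArrayList<String>();
        for (String col : allColNames) {
          if (!updateKeys.contains(col.toUpperCase())) {
            dbOutCols.add(col); // non-key columns feed the SET clause
          }
        }
        dbOutCols.addAll(updateKeys); // key columns feed the WHERE clause

        // Prints [NAME, SALARY, ID]: the layout expected by
        // "UPDATE t SET NAME = ?, SALARY = ? WHERE ID = ?".
        System.out.println(dbOutCols);
      }
    }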

Added: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/Db2Manager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/Db2Manager.java?rev=1196272&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/Db2Manager.java (added)
+++ incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/Db2Manager.java Tue Nov  1 21:01:09 2011
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
+import com.cloudera.sqoop.mapreduce.JdbcExportJob;
+import com.cloudera.sqoop.util.ExportException;
+
+/**
+ * Manages connections to DB2 databases. Requires the DB2 JDBC driver.
+ */
+public class Db2Manager
+    extends com.cloudera.sqoop.manager.GenericJdbcManager {
+
+  public static final Log LOG = LogFactory.getLog(
+      Db2Manager.class.getName());
+
+  // Driver class that must be loaded when making a db connection.
+  private static final String DRIVER_CLASS =
+      "com.ibm.db2.jcc.DB2Driver";
+
+  public Db2Manager(final SqoopOptions opts) {
+    super(DRIVER_CLASS, opts);
+  }
+
+  /**
+   * Export data stored in HDFS into a table in a database.
+   */
+  @Override
+  public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    context.setConnManager(this);
+    JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
+      ExportBatchOutputFormat.class);
+    exportJob.runExport();
+  }
+
+  /**
+   * DB2 does not support the CURRENT_TIMESTAMP() function. Instead,
+   * it uses the SYSIBM schema for the timestamp lookup.
+   */
+  @Override
+  public String getCurTimestampQuery() {
+    return "SELECT CURRENT TIMESTAMP FROM SYSIBM.SYSDUMMY1 WITH UR";
+  }
+
+  @Override
+  public String[] listDatabases() {
+    Connection conn = null;
+    ResultSet rset = null;
+    List<String> databases = new ArrayList<String>();
+    try {
+      conn = getConnection();
+      rset = conn.getMetaData().getSchemas();
+      while (rset.next()) {
+        // The ResultSet contains two columns: TABLE_SCHEM(1) and
+        // TABLE_CATALOG(2). We are only interested in TABLE_SCHEM, which
+        // represents the schema name.
+        databases.add(rset.getString(1));
+      }
+      conn.commit();
+    } catch (SQLException sqle) {
+      try {
+        if (conn != null) {
+          conn.rollback();
+        }
+      } catch (SQLException ce) {
+        LOG.error("Failed to rollback transaction", ce);
+      }
+      LOG.error("Failed to list databases", sqle);
+      throw new RuntimeException(sqle);
+    } finally {
+      if (rset != null) {
+        try {
+          rset.close();
+        } catch (SQLException re) {
+          LOG.error("Failed to close resultset", re);
+        }
+      }
+    }
+
+    return databases.toArray(new String[databases.size()]);
+  }
+}

Propchange: incubator/sqoop/trunk/src/java/org/apache/sqoop/manager/Db2Manager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain
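
As a quick smoke test, Db2Manager can be driven directly from SqoopOptions; listDatabases() reports the schemas returned by DatabaseMetaData.getSchemas(). A hedged sketch, assuming a reachable DB2 instance, an illustrative connect string and username, and the DB2 JDBC driver jar on the classpath:

    import com.cloudera.sqoop.SqoopOptions;

    public class Db2Sketch {
      public static void main(String[] args) throws Exception {
        SqoopOptions opts = new SqoopOptions();
        opts.setConnectString("jdbc:db2://db2host:50000/SAMPLE"); // assumed
        opts.setUsername("db2inst1");                             // assumed

        org.apache.sqoop.manager.Db2Manager manager =
            new org.apache.sqoop.manager.Db2Manager(opts);
        try {
          // Each schema is reported as a "database".
          for (String schema : manager.listDatabases()) {
            System.out.println(schema);
          }
        } finally {
          manager.close(); // shuts down the shared connection
        }
      }
    }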


