sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject sqoop git commit: SQOOP-1632: Add support for index organized tables to direct connector
Date Fri, 07 Nov 2014 19:26:49 GMT
Repository: sqoop
Updated Branches:
  refs/heads/trunk 933e71ef3 -> 7966f1966


SQOOP-1632: Add support for index organized tables to direct connector

(David Robson via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/7966f196
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/7966f196
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/7966f196

Branch: refs/heads/trunk
Commit: 7966f196674808eec641c616511c89fd79498c54
Parents: 933e71e
Author: Abraham Elmahrek <abraham@elmahrek.com>
Authored: Fri Nov 7 11:25:28 2014 -0800
Committer: Abraham Elmahrek <abraham@elmahrek.com>
Committed: Fri Nov 7 11:25:28 2014 -0800

----------------------------------------------------------------------
 src/docs/user/connectors.txt                    |  4 +-
 .../manager/oracle/OraOopManagerFactory.java    | 24 +++--
 .../manager/oracle/OraOopOracleQueries.java     | 20 +++--
 src/test/oraoop/create_users.sql                |  2 +
 src/test/oraoop/pkg_tst_product_gen.pbk         |  5 +-
 src/test/oraoop/table_tst_product_part_iot.xml  | 95 ++++++++++++++++++++
 .../apache/sqoop/manager/oracle/ImportTest.java | 19 ++++
 .../sqoop/manager/oracle/util/OracleData.java   | 67 ++++++++++----
 .../oracle/util/OracleTableDefinition.java      | 17 ++++
 9 files changed, 224 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/docs/user/connectors.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt
index a118249..e59ef9e 100644
--- a/src/docs/user/connectors.txt
+++ b/src/docs/user/connectors.txt
@@ -522,7 +522,8 @@ with the following attributes:
 is a table.
 +
 NOTE: Data Connector for Oracle and Hadoop does not process index-organized
-tables.
+tables unless the table is partitioned and +oraoop.chunk.method+ is set
+to +PARTITION+.
 
 - There are at least 2 mappers — Jobs where the Sqoop command-line does not
 include: +--num-mappers 1+
@@ -613,6 +614,7 @@ select_catalog_role role or all of the following object privileges:
 - +select on dba_objects+
 - +select on dba_extents+
 - +select on dba_segments+ — Required for Sqoop imports only
+- +select on dba_constraints+ — Required for Sqoop imports only
 - +select on v_$database+ — Required for Sqoop imports only
 - +select on v_$parameter+ — Required for Sqoop imports only
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/java/org/apache/sqoop/manager/oracle/OraOopManagerFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopManagerFactory.java b/src/java/org/apache/sqoop/manager/oracle/OraOopManagerFactory.java
index 9d75666..17873bc 100644
--- a/src/java/org/apache/sqoop/manager/oracle/OraOopManagerFactory.java
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopManagerFactory.java
@@ -122,6 +122,25 @@ public class OraOopManagerFactory extends ManagerFactory {
                     result = oraOopConnManager; // <- OraOop accepts
                                                 // responsibility for this Sqoop
                                                 // job!
+                  } else {
+                    OraOopConstants.OraOopOracleDataChunkMethod method =
+                        OraOopUtilities.getOraOopOracleDataChunkMethod(
+                            sqoopOptions.getConf());
+                    if (method == OraOopConstants.
+                                      OraOopOracleDataChunkMethod.PARTITION) {
+                      result = oraOopConnManager;
+                    } else {
+                      LOG.info(String.format("%s will not process this Sqoop"
+                         + " connection, as the Oracle table %s is an"
+                         + " index-organized table. If the table is"
+                         + " partitioned, set "
+                         + OraOopConstants.ORAOOP_ORACLE_DATA_CHUNK_METHOD
+                         + " to "
+                         + OraOopConstants.OraOopOracleDataChunkMethod.PARTITION
+                         + ".",
+                         OraOopConstants.ORAOOP_PRODUCT_NAME,
+                         oraOopConnManager.getOracleTableContext().toString()));
+                    }
                   }
                 }
               } catch (SQLException ex) {
@@ -685,11 +704,6 @@ public class OraOopManagerFactory extends ManagerFactory {
       result =
           OraOopOracleQueries.isTableAnIndexOrganizedTable(connection,
               tableContext);
-      if (result) {
-        LOG.info(String.format("%s will not process this Sqoop connection, "
-            + "as the Oracle table %s is an index-organized table.",
-            OraOopConstants.ORAOOP_PRODUCT_NAME, tableContext.toString()));
-      }
       return result;
     } catch (SQLException ex) {
       LOG.warn(String.format(

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/java/org/apache/sqoop/manager/oracle/OraOopOracleQueries.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopOracleQueries.java b/src/java/org/apache/sqoop/manager/oracle/OraOopOracleQueries.java
index 7fd18a1..b65d009 100644
--- a/src/java/org/apache/sqoop/manager/oracle/OraOopOracleQueries.java
+++ b/src/java/org/apache/sqoop/manager/oracle/OraOopOracleQueries.java
@@ -288,11 +288,21 @@ public final class OraOopOracleQueries {
               + ") ";
     }
 
-    sql +=
-        "  ) pl, " + "  dba_segments s "
-            + "WHERE s.owner       =pl.table_owner "
-            + "AND s.segment_name  =pl.table_name "
-            + "AND s.partition_name=pl.partition_name ";
+    sql += "  ) pl, dba_tables t, dba_segments s "
+        + "WHERE t.owner=pl.table_owner "
+        + "AND t.table_name=pl.table_name "
+        + "AND ( "
+        + "     (t.iot_type='IOT' AND (s.owner,s.segment_name)= "
+        + "                          (SELECT c.index_owner,c.index_name "
+        + "                             FROM dba_constraints c "
+        + "                            WHERE c.owner=pl.table_owner "
+        + "                              AND c.table_name=pl.table_name "
+        + "                              AND c.constraint_type='P')) "
+        + "     OR (t.iot_type IS NULL "
+        + "         AND s.owner=t.owner "
+        + "         AND s.segment_name=t.table_name) "
+        + "    ) "
+        + "AND s.partition_name=pl.partition_name";
 
     PreparedStatement statement = connection.prepareStatement(sql);
     OraOopOracleQueries.setStringAtName(statement, "table_owner", table

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/test/oraoop/create_users.sql
----------------------------------------------------------------------
diff --git a/src/test/oraoop/create_users.sql b/src/test/oraoop/create_users.sql
index ecaa409..a75fd49 100644
--- a/src/test/oraoop/create_users.sql
+++ b/src/test/oraoop/create_users.sql
@@ -24,6 +24,7 @@ grant select on dba_tab_columns to sqooptest;
 grant select on dba_objects to sqooptest;
 grant select on dba_extents to sqooptest;
 grant select on dba_segments to sqooptest;
+grant select on dba_constraints to sqooptest;
 grant select on v_$database to sqooptest;
 grant select on v_$parameter to sqooptest;
 grant select on v_$session to sqooptest;
@@ -49,6 +50,7 @@ grant select on dba_tab_columns to sqooptest2;
 grant select on dba_objects to sqooptest2;
 grant select on dba_extents to sqooptest2;
 grant select on dba_segments to sqooptest2;
+grant select on dba_constraints to sqooptest2;
 grant select on v_$database to sqooptest2;
 grant select on v_$parameter to sqooptest2;
 grant select on v_$session to sqooptest2;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/test/oraoop/pkg_tst_product_gen.pbk
----------------------------------------------------------------------
diff --git a/src/test/oraoop/pkg_tst_product_gen.pbk b/src/test/oraoop/pkg_tst_product_gen.pbk
index 0bc7df7..791687b 100644
--- a/src/test/oraoop/pkg_tst_product_gen.pbk
+++ b/src/test/oraoop/pkg_tst_product_gen.pbk
@@ -51,9 +51,10 @@ AS
          (
             $COLUMN_LIST
          )
+         $TABLE_ORGANIZATION_CLAUSE
+         $PARTITION_CLAUSE
          NOLOGGING
-         PARALLEL
-         $PARTITION_CLAUSE';
+         PARALLEL';
    END;
 
    PROCEDURE prc_insert_data (i_degree NUMBER)

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/test/oraoop/table_tst_product_part_iot.xml
----------------------------------------------------------------------
diff --git a/src/test/oraoop/table_tst_product_part_iot.xml b/src/test/oraoop/table_tst_product_part_iot.xml
new file mode 100644
index 0000000..b09015b
--- /dev/null
+++ b/src/test/oraoop/table_tst_product_part_iot.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<table>
+  <name>TST_PRODUCT_PART_IOT</name>
+  <columns>
+    <column>
+      <name>product_id</name>
+      <dataType>INTEGER</dataType>
+      <dataExpression>id</dataExpression>
+    </column>
+    <column>
+      <name>supplier_code</name>
+      <dataType>VARCHAR2 (30)</dataType>
+      <dataExpression>TO_CHAR (id - MOD (id, 5000),'FMXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX')</dataExpression>
+    </column>
+    <column>
+      <name>product_code</name>
+      <dataType>VARCHAR2 (30)</dataType>
+      <dataExpression>TO_CHAR (MOD (id, 100000), 'FMXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX')</dataExpression>
+    </column>
+    <column>
+      <name>product_descr</name>
+      <dataType>VARCHAR2 (255)</dataType>
+      <dataExpression>DBMS_RANDOM.string ('x', ROUND (DBMS_RANDOM.VALUE (1, 100)))</dataExpression>
+    </column>
+    <column>
+      <name>product_long_descr</name>
+      <dataType>VARCHAR2 (4000)</dataType>
+      <dataExpression>DBMS_RANDOM.string ('x', ROUND (DBMS_RANDOM.VALUE (1, 200)))</dataExpression>
+    </column>
+    <column>
+      <name>product_cost_price</name>
+      <dataType>NUMBER</dataType>
+      <dataExpression>ROUND (DBMS_RANDOM.VALUE (0, 100000), 2)</dataExpression>
+    </column>
+    <column>
+      <name>sell_from_date</name>
+      <dataType>DATE</dataType>
+      <dataExpression>TRUNC (SYSDATE + DBMS_RANDOM.VALUE (-365, 365))</dataExpression>
+    </column>
+    <column>
+      <name>sell_price</name>
+      <dataType>NUMBER</dataType>
+      <dataExpression>ROUND (DBMS_RANDOM.VALUE (0, 200000), 2)</dataExpression>
+    </column>
+    <column>
+      <name>create_user</name>
+      <dataType>VARCHAR2 (30)</dataType>
+      <dataExpression>DBMS_RANDOM.string ('U', 30)</dataExpression>
+    </column>
+    <column>
+      <name>create_time</name>
+      <dataType>TIMESTAMP</dataType>
+      <dataExpression>TO_TIMESTAMP (TO_CHAR (SYSDATE + DBMS_RANDOM.VALUE (-730, 0),'YYYYMMDDHH24MISS')
|| '.' || TRUNC (TO_CHAR (DBMS_RANDOM.VALUE * 999999999)), 'YYYYMMDDHH24MISSXFF')</dataExpression>
+    </column>
+    <column>
+      <name>last_update_user</name>
+      <dataType>VARCHAR2 (30)</dataType>
+      <dataExpression>DBMS_RANDOM.string ('U', 30)</dataExpression>
+    </column>
+    <column>
+      <name>last_update_time</name>
+      <dataType>TIMESTAMP</dataType>
+      <dataExpression>TO_TIMESTAMP (TO_CHAR (SYSDATE + DBMS_RANDOM.VALUE (-730, 0),'YYYYMMDDHH24MISS')
|| '.' || TRUNC (TO_CHAR (DBMS_RANDOM.VALUE * 999999999)), 'YYYYMMDDHH24MISSXFF')</dataExpression>
+    </column>
+  </columns>
+  <primaryKeyColumns>
+    <primaryKeyColumn>product_id</primaryKeyColumn>
+  </primaryKeyColumns>
+  <uniqueKeyColumns>
+    <uniqueKeyColumn>supplier_code</uniqueKeyColumn>
+    <uniqueKeyColumn>product_code</uniqueKeyColumn>
+  </uniqueKeyColumns>
+  <indexOrganizedTable>true</indexOrganizedTable>
+  <partitionClause>
+    PARTITION BY HASH(product_id)
+    PARTITIONS 4
+  </partitionClause>
+</table>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/test/org/apache/sqoop/manager/oracle/ImportTest.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/oracle/ImportTest.java b/src/test/org/apache/sqoop/manager/oracle/ImportTest.java
index d914e3f..b2134c6 100644
--- a/src/test/org/apache/sqoop/manager/oracle/ImportTest.java
+++ b/src/test/org/apache/sqoop/manager/oracle/ImportTest.java
@@ -238,4 +238,23 @@ public class ImportTest extends OraOopTestCase {
     }
   }
 
+  @Test
+  public void testProductPartIotImport() throws Exception {
+    setSqoopTargetDirectory(getSqoopTargetDirectory() + "tst_product_part");
+    createTable("table_tst_product_part_iot.xml");
+
+    Configuration sqoopConf = getSqoopConf();
+    sqoopConf.set(OraOopConstants.ORAOOP_ORACLE_DATA_CHUNK_METHOD,
+        OraOopConstants.OraOopOracleDataChunkMethod.PARTITION.toString());
+
+    try {
+      int retCode = runImport("tst_product_part_iot", sqoopConf, false);
+      Assert.assertEquals("Return code should be 0", 0, retCode);
+
+    } finally {
+      cleanupFolders();
+      closeTestEnvConnection();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/test/org/apache/sqoop/manager/oracle/util/OracleData.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/oracle/util/OracleData.java b/src/test/org/apache/sqoop/manager/oracle/util/OracleData.java
index 871d317..3f16e04 100644
--- a/src/test/org/apache/sqoop/manager/oracle/util/OracleData.java
+++ b/src/test/org/apache/sqoop/manager/oracle/util/OracleData.java
@@ -47,10 +47,10 @@ public final class OracleData {
     }
   }
 
-  private static String getColumnList(List<OracleDataDefinition> columnList) {
+  private static String getColumnList(OracleTableDefinition tableDefinition) {
     StringBuilder result = new StringBuilder();
     String delim = "";
-    for (OracleDataDefinition column : columnList) {
+    for (OracleDataDefinition column : tableDefinition.getColumnList()) {
       result.append(delim).append(column.getColumnName()).append(" ").append(
           column.getDataType());
       delim = ",\n";
@@ -74,8 +74,7 @@ public final class OracleData {
         IOUtils.toString(classLoader.getResource(
             "oraoop/pkg_tst_product_gen.psk").openStream());
     pkgSql =
-        pkgSql.replaceAll("\\$COLUMN_LIST", getColumnList(tableDefinition
-            .getColumnList()));
+        pkgSql.replaceAll("\\$COLUMN_LIST", getColumnList(tableDefinition));
     pkgSql = pkgSql.replaceAll("\\$TABLE_NAME", tableDefinition.getTableName());
     PreparedStatement stmt = conn.prepareStatement(pkgSql);
     stmt.execute();
@@ -86,13 +85,22 @@ public final class OracleData {
     String pkgSql =
         IOUtils.toString(classLoader.getResource(
             "oraoop/pkg_tst_product_gen.pbk").openStream());
+    String columnList = getColumnList(tableDefinition);
+    if (tableDefinition.isIndexOrganizedTable()) {
+      columnList += "\n," + getKeyString(KeyType.PRIMARY, tableDefinition);
+    }
     pkgSql =
-        pkgSql.replaceAll("\\$COLUMN_LIST", getColumnList(tableDefinition
-            .getColumnList()));
+        pkgSql.replaceAll("\\$COLUMN_LIST", columnList);
     pkgSql = pkgSql.replaceAll("\\$TABLE_NAME", tableDefinition.getTableName());
     pkgSql =
         pkgSql.replaceAll("\\$DATA_EXPRESSION_LIST",
             getDataExpression(tableDefinition.getColumnList()));
+
+    pkgSql =
+        pkgSql.replaceAll("\\$TABLE_ORGANIZATION_CLAUSE",
+            tableDefinition.isIndexOrganizedTable()
+                ? "ORGANIZATION INDEX OVERFLOW NOLOGGING" : "");
+
     pkgSql =
         pkgSql.replaceAll("\\$PARTITION_CLAUSE", tableDefinition
             .getPartitionClause());
@@ -100,8 +108,9 @@ public final class OracleData {
     stmt.execute();
   }
 
-  private static void createKey(Connection conn, KeyType keyType,
-      OracleTableDefinition tableDefinition) throws Exception {
+  private static String getKeyColumns(KeyType keyType,
+      OracleTableDefinition tableDefinition) {
+    String result = null;
     List<String> columns = null;
     switch (keyType) {
       case PRIMARY:
@@ -120,16 +129,40 @@ public final class OracleData {
         keyColumnList.append(delim).append(column);
         delim = ",";
       }
-      String keySql =
-          "alter table \"$TABLE_NAME\" add constraint \"$TABLE_NAME_"
+      result = keyColumnList.toString();
+    }
+    return result;
+  }
+
+  private static String getKeyString(KeyType keyType,
+      OracleTableDefinition tableDefinition) {
+    String keySql = null;
+    String keyColumnList = getKeyColumns(keyType, tableDefinition);
+    if (keyColumnList!=null) {
+      keySql = "constraint \"$TABLE_NAME_"
               + ((keyType == KeyType.PRIMARY) ? "PK\" primary key"
-                  : "UK\" unique") + "($PK_COLUMN_LIST) "
-              + "using index (create unique index \"$TABLE_NAME_"
-              + ((keyType == KeyType.PRIMARY) ? "PK\"" : "UK\"")
-              + " on \"$TABLE_NAME\"($PK_COLUMN_LIST) " + "parallel nologging)";
+                  : "UK\" unique") + "($PK_COLUMN_LIST) ";
+
+      keySql = keySql.replaceAll("\\$PK_COLUMN_LIST", keyColumnList);
+      keySql =
+          keySql.replaceAll("\\$TABLE_NAME", tableDefinition.getTableName());
+    }
+    return keySql;
+  }
+
+  private static void createKey(Connection conn, KeyType keyType,
+      OracleTableDefinition tableDefinition) throws Exception {
+    String keySql = getKeyString(keyType, tableDefinition);
+    String keyColumnList = getKeyColumns(keyType, tableDefinition);
+    if (keySql!=null) {
+      keySql = "alter table \"$TABLE_NAME\" add " + keySql
+             + " using index (create unique index \"$TABLE_NAME_"
+             + ((keyType == KeyType.PRIMARY) ? "PK\"" : "UK\"")
+             + " on \"$TABLE_NAME\"($PK_COLUMN_LIST) " + "parallel nologging)";
+      keySql = keySql.replaceAll("\\$PK_COLUMN_LIST", keyColumnList);
       keySql =
           keySql.replaceAll("\\$TABLE_NAME", tableDefinition.getTableName());
-      keySql = keySql.replaceAll("\\$PK_COLUMN_LIST", keyColumnList.toString());
+
       PreparedStatement stmt = conn.prepareStatement(keySql);
       stmt.execute();
     }
@@ -178,7 +211,9 @@ public final class OracleData {
     procStmt.setInt(2, rowsPerSlave);
     procStmt.execute();
 
-    createKey(conn, KeyType.PRIMARY, tableDefinition);
+    if (!tableDefinition.isIndexOrganizedTable()) {
+      createKey(conn, KeyType.PRIMARY, tableDefinition);
+    }
     createKey(conn, KeyType.UNIQUE, tableDefinition);
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/test/org/apache/sqoop/manager/oracle/util/OracleTableDefinition.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/manager/oracle/util/OracleTableDefinition.java b/src/test/org/apache/sqoop/manager/oracle/util/OracleTableDefinition.java
index 5a8c42c..1ce9ff4 100644
--- a/src/test/org/apache/sqoop/manager/oracle/util/OracleTableDefinition.java
+++ b/src/test/org/apache/sqoop/manager/oracle/util/OracleTableDefinition.java
@@ -40,6 +40,7 @@ public class OracleTableDefinition {
   private List<String> primaryKeyColumns = new ArrayList<String>();
   private List<String> uniqueKeyColumns = new ArrayList<String>();
   private String partitionClause;
+  private boolean indexOrganizedTable = false;
 
   public List<String> getUniqueKeyColumns() {
     return uniqueKeyColumns;
@@ -81,6 +82,14 @@ public class OracleTableDefinition {
     this.partitionClause = newPartitionClause;
   }
 
+  public boolean isIndexOrganizedTable() {
+    return indexOrganizedTable;
+  }
+
+  public void setIndexOrganizedTable(boolean newIndexOrganizedTable) {
+    this.indexOrganizedTable = newIndexOrganizedTable;
+  }
+
   public OracleTableDefinition() {
 
   }
@@ -142,6 +151,14 @@ public class OracleTableDefinition {
         this.partitionClause =
             partitionClauseNode.getChildNodes().item(0).getNodeValue();
       }
+
+      Node indexOrganizedTableNode =
+          table.getElementsByTagName("indexOrganizedTable").item(0);
+      if (indexOrganizedTableNode != null) {
+        String indexOrganizedTableStr =
+            indexOrganizedTableNode.getChildNodes().item(0).getNodeValue();
+        this.indexOrganizedTable = Boolean.parseBoolean(indexOrganizedTableStr);
+      }
     } catch (Exception e) {
       throw new RuntimeException("Could not load table configuration", e);
     }


Mime
View raw message