Repository: sqoop Updated Branches: refs/heads/trunk 933e71ef3 -> 7966f1966 SQOOP-1632: Add support for index organized tables to direct connector (David Robson via Abraham Elmahrek) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/7966f196 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/7966f196 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/7966f196 Branch: refs/heads/trunk Commit: 7966f196674808eec641c616511c89fd79498c54 Parents: 933e71e Author: Abraham Elmahrek Authored: Fri Nov 7 11:25:28 2014 -0800 Committer: Abraham Elmahrek Committed: Fri Nov 7 11:25:28 2014 -0800 ---------------------------------------------------------------------- src/docs/user/connectors.txt | 4 +- .../manager/oracle/OraOopManagerFactory.java | 24 +++-- .../manager/oracle/OraOopOracleQueries.java | 20 +++-- src/test/oraoop/create_users.sql | 2 + src/test/oraoop/pkg_tst_product_gen.pbk | 5 +- src/test/oraoop/table_tst_product_part_iot.xml | 95 ++++++++++++++++++++ .../apache/sqoop/manager/oracle/ImportTest.java | 19 ++++ .../sqoop/manager/oracle/util/OracleData.java | 67 ++++++++++---- .../oracle/util/OracleTableDefinition.java | 17 ++++ 9 files changed, 224 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/docs/user/connectors.txt ---------------------------------------------------------------------- diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt index a118249..e59ef9e 100644 --- a/src/docs/user/connectors.txt +++ b/src/docs/user/connectors.txt @@ -522,7 +522,8 @@ with the following attributes: is a table. + NOTE: Data Connector for Oracle and Hadoop does not process index-organized -tables. +tables unless the table is partitioned and +oraoop.chunk.method+ is set +to +PARTITION+ - There are at least 2 mappers — Jobs where the Sqoop command-line does not include: +--num-mappers 1+ @@ -613,6 +614,7 @@ select_catalog_role role or all of the following object privileges: - +select on dba_objects+ - +select on dba_extents+ - +select on dba_segments+ — Required for Sqoop imports only +- +select on dba_constraints+ — Required for Sqoop imports only - +select on v_$database+ — Required for Sqoop imports only - +select on v_$parameter+ — Required for Sqoop imports only http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/java/org/apache/sqoop/manager/oracle/OraOopManagerFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopManagerFactory.java b/src/java/org/apache/sqoop/manager/oracle/OraOopManagerFactory.java index 9d75666..17873bc 100644 --- a/src/java/org/apache/sqoop/manager/oracle/OraOopManagerFactory.java +++ b/src/java/org/apache/sqoop/manager/oracle/OraOopManagerFactory.java @@ -122,6 +122,25 @@ public class OraOopManagerFactory extends ManagerFactory { result = oraOopConnManager; // <- OraOop accepts // responsibility for this Sqoop // job! + } else { + OraOopConstants.OraOopOracleDataChunkMethod method = + OraOopUtilities.getOraOopOracleDataChunkMethod( + sqoopOptions.getConf()); + if (method == OraOopConstants. + OraOopOracleDataChunkMethod.PARTITION) { + result = oraOopConnManager; + } else { + LOG.info(String.format("%s will not process this Sqoop" + + " connection, as the Oracle table %s is an" + + " index-organized table. If the table is" + + " partitioned, set " + + OraOopConstants.ORAOOP_ORACLE_DATA_CHUNK_METHOD + + " to " + + OraOopConstants.OraOopOracleDataChunkMethod.PARTITION + + ".", + OraOopConstants.ORAOOP_PRODUCT_NAME, + oraOopConnManager.getOracleTableContext().toString())); + } } } } catch (SQLException ex) { @@ -685,11 +704,6 @@ public class OraOopManagerFactory extends ManagerFactory { result = OraOopOracleQueries.isTableAnIndexOrganizedTable(connection, tableContext); - if (result) { - LOG.info(String.format("%s will not process this Sqoop connection, " - + "as the Oracle table %s is an index-organized table.", - OraOopConstants.ORAOOP_PRODUCT_NAME, tableContext.toString())); - } return result; } catch (SQLException ex) { LOG.warn(String.format( http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/java/org/apache/sqoop/manager/oracle/OraOopOracleQueries.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopOracleQueries.java b/src/java/org/apache/sqoop/manager/oracle/OraOopOracleQueries.java index 7fd18a1..b65d009 100644 --- a/src/java/org/apache/sqoop/manager/oracle/OraOopOracleQueries.java +++ b/src/java/org/apache/sqoop/manager/oracle/OraOopOracleQueries.java @@ -288,11 +288,21 @@ public final class OraOopOracleQueries { + ") "; } - sql += - " ) pl, " + " dba_segments s " - + "WHERE s.owner =pl.table_owner " - + "AND s.segment_name =pl.table_name " - + "AND s.partition_name=pl.partition_name "; + sql += " ) pl, dba_tables t, dba_segments s " + + "WHERE t.owner=pl.table_owner " + + "AND t.table_name=pl.table_name " + + "AND ( " + + " (t.iot_type='IOT' AND (s.owner,s.segment_name)= " + + " (SELECT c.index_owner,c.index_name " + + " FROM dba_constraints c " + + " WHERE c.owner=pl.table_owner " + + " AND c.table_name=pl.table_name " + + " AND c.constraint_type='P')) " + + " OR (t.iot_type IS NULL " + + " AND s.owner=t.owner " + + " AND s.segment_name=t.table_name) " + + " ) " + + "AND s.partition_name=pl.partition_name"; PreparedStatement statement = connection.prepareStatement(sql); OraOopOracleQueries.setStringAtName(statement, "table_owner", table http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/test/oraoop/create_users.sql ---------------------------------------------------------------------- diff --git a/src/test/oraoop/create_users.sql b/src/test/oraoop/create_users.sql index ecaa409..a75fd49 100644 --- a/src/test/oraoop/create_users.sql +++ b/src/test/oraoop/create_users.sql @@ -24,6 +24,7 @@ grant select on dba_tab_columns to sqooptest; grant select on dba_objects to sqooptest; grant select on dba_extents to sqooptest; grant select on dba_segments to sqooptest; +grant select on dba_constraints to sqooptest; grant select on v_$database to sqooptest; grant select on v_$parameter to sqooptest; grant select on v_$session to sqooptest; @@ -49,6 +50,7 @@ grant select on dba_tab_columns to sqooptest2; grant select on dba_objects to sqooptest2; grant select on dba_extents to sqooptest2; grant select on dba_segments to sqooptest2; +grant select on dba_constraints to sqooptest2; grant select on v_$database to sqooptest2; grant select on v_$parameter to sqooptest2; grant select on v_$session to sqooptest2; http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/test/oraoop/pkg_tst_product_gen.pbk ---------------------------------------------------------------------- diff --git a/src/test/oraoop/pkg_tst_product_gen.pbk b/src/test/oraoop/pkg_tst_product_gen.pbk index 0bc7df7..791687b 100644 --- a/src/test/oraoop/pkg_tst_product_gen.pbk +++ b/src/test/oraoop/pkg_tst_product_gen.pbk @@ -51,9 +51,10 @@ AS ( $COLUMN_LIST ) + $TABLE_ORGANIZATION_CLAUSE + $PARTITION_CLAUSE NOLOGGING - PARALLEL - $PARTITION_CLAUSE'; + PARALLEL'; END; PROCEDURE prc_insert_data (i_degree NUMBER) http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/test/oraoop/table_tst_product_part_iot.xml ---------------------------------------------------------------------- diff --git a/src/test/oraoop/table_tst_product_part_iot.xml b/src/test/oraoop/table_tst_product_part_iot.xml new file mode 100644 index 0000000..b09015b --- /dev/null +++ b/src/test/oraoop/table_tst_product_part_iot.xml @@ -0,0 +1,95 @@ + + + + TST_PRODUCT_PART_IOT + + + product_id + INTEGER + id + + + supplier_code + VARCHAR2 (30) + TO_CHAR (id - MOD (id, 5000),'FMXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX') + + + product_code + VARCHAR2 (30) + TO_CHAR (MOD (id, 100000), 'FMXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX') + + + product_descr + VARCHAR2 (255) + DBMS_RANDOM.string ('x', ROUND (DBMS_RANDOM.VALUE (1, 100))) + + + product_long_descr + VARCHAR2 (4000) + DBMS_RANDOM.string ('x', ROUND (DBMS_RANDOM.VALUE (1, 200))) + + + product_cost_price + NUMBER + ROUND (DBMS_RANDOM.VALUE (0, 100000), 2) + + + sell_from_date + DATE + TRUNC (SYSDATE + DBMS_RANDOM.VALUE (-365, 365)) + + + sell_price + NUMBER + ROUND (DBMS_RANDOM.VALUE (0, 200000), 2) + + + create_user + VARCHAR2 (30) + DBMS_RANDOM.string ('U', 30) + + + create_time + TIMESTAMP + TO_TIMESTAMP (TO_CHAR (SYSDATE + DBMS_RANDOM.VALUE (-730, 0),'YYYYMMDDHH24MISS') || '.' || TRUNC (TO_CHAR (DBMS_RANDOM.VALUE * 999999999)), 'YYYYMMDDHH24MISSXFF') + + + last_update_user + VARCHAR2 (30) + DBMS_RANDOM.string ('U', 30) + + + last_update_time + TIMESTAMP + TO_TIMESTAMP (TO_CHAR (SYSDATE + DBMS_RANDOM.VALUE (-730, 0),'YYYYMMDDHH24MISS') || '.' || TRUNC (TO_CHAR (DBMS_RANDOM.VALUE * 999999999)), 'YYYYMMDDHH24MISSXFF') + + + + product_id + + + supplier_code + product_code + + true + + PARTITION BY HASH(product_id) + PARTITIONS 4 + +
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/test/org/apache/sqoop/manager/oracle/ImportTest.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/oracle/ImportTest.java b/src/test/org/apache/sqoop/manager/oracle/ImportTest.java index d914e3f..b2134c6 100644 --- a/src/test/org/apache/sqoop/manager/oracle/ImportTest.java +++ b/src/test/org/apache/sqoop/manager/oracle/ImportTest.java @@ -238,4 +238,23 @@ public class ImportTest extends OraOopTestCase { } } + @Test + public void testProductPartIotImport() throws Exception { + setSqoopTargetDirectory(getSqoopTargetDirectory() + "tst_product_part"); + createTable("table_tst_product_part_iot.xml"); + + Configuration sqoopConf = getSqoopConf(); + sqoopConf.set(OraOopConstants.ORAOOP_ORACLE_DATA_CHUNK_METHOD, + OraOopConstants.OraOopOracleDataChunkMethod.PARTITION.toString()); + + try { + int retCode = runImport("tst_product_part_iot", sqoopConf, false); + Assert.assertEquals("Return code should be 0", 0, retCode); + + } finally { + cleanupFolders(); + closeTestEnvConnection(); + } + } + } http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/test/org/apache/sqoop/manager/oracle/util/OracleData.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/oracle/util/OracleData.java b/src/test/org/apache/sqoop/manager/oracle/util/OracleData.java index 871d317..3f16e04 100644 --- a/src/test/org/apache/sqoop/manager/oracle/util/OracleData.java +++ b/src/test/org/apache/sqoop/manager/oracle/util/OracleData.java @@ -47,10 +47,10 @@ public final class OracleData { } } - private static String getColumnList(List columnList) { + private static String getColumnList(OracleTableDefinition tableDefinition) { StringBuilder result = new StringBuilder(); String delim = ""; - for (OracleDataDefinition column : columnList) { + for (OracleDataDefinition column : tableDefinition.getColumnList()) { result.append(delim).append(column.getColumnName()).append(" ").append( column.getDataType()); delim = ",\n"; @@ -74,8 +74,7 @@ public final class OracleData { IOUtils.toString(classLoader.getResource( "oraoop/pkg_tst_product_gen.psk").openStream()); pkgSql = - pkgSql.replaceAll("\\$COLUMN_LIST", getColumnList(tableDefinition - .getColumnList())); + pkgSql.replaceAll("\\$COLUMN_LIST", getColumnList(tableDefinition)); pkgSql = pkgSql.replaceAll("\\$TABLE_NAME", tableDefinition.getTableName()); PreparedStatement stmt = conn.prepareStatement(pkgSql); stmt.execute(); @@ -86,13 +85,22 @@ public final class OracleData { String pkgSql = IOUtils.toString(classLoader.getResource( "oraoop/pkg_tst_product_gen.pbk").openStream()); + String columnList = getColumnList(tableDefinition); + if (tableDefinition.isIndexOrganizedTable()) { + columnList += "\n," + getKeyString(KeyType.PRIMARY, tableDefinition); + } pkgSql = - pkgSql.replaceAll("\\$COLUMN_LIST", getColumnList(tableDefinition - .getColumnList())); + pkgSql.replaceAll("\\$COLUMN_LIST", columnList); pkgSql = pkgSql.replaceAll("\\$TABLE_NAME", tableDefinition.getTableName()); pkgSql = pkgSql.replaceAll("\\$DATA_EXPRESSION_LIST", getDataExpression(tableDefinition.getColumnList())); + + pkgSql = + pkgSql.replaceAll("\\$TABLE_ORGANIZATION_CLAUSE", + tableDefinition.isIndexOrganizedTable() + ? "ORGANIZATION INDEX OVERFLOW NOLOGGING" : ""); + pkgSql = pkgSql.replaceAll("\\$PARTITION_CLAUSE", tableDefinition .getPartitionClause()); @@ -100,8 +108,9 @@ public final class OracleData { stmt.execute(); } - private static void createKey(Connection conn, KeyType keyType, - OracleTableDefinition tableDefinition) throws Exception { + private static String getKeyColumns(KeyType keyType, + OracleTableDefinition tableDefinition) { + String result = null; List columns = null; switch (keyType) { case PRIMARY: @@ -120,16 +129,40 @@ public final class OracleData { keyColumnList.append(delim).append(column); delim = ","; } - String keySql = - "alter table \"$TABLE_NAME\" add constraint \"$TABLE_NAME_" + result = keyColumnList.toString(); + } + return result; + } + + private static String getKeyString(KeyType keyType, + OracleTableDefinition tableDefinition) { + String keySql = null; + String keyColumnList = getKeyColumns(keyType, tableDefinition); + if (keyColumnList!=null) { + keySql = "constraint \"$TABLE_NAME_" + ((keyType == KeyType.PRIMARY) ? "PK\" primary key" - : "UK\" unique") + "($PK_COLUMN_LIST) " - + "using index (create unique index \"$TABLE_NAME_" - + ((keyType == KeyType.PRIMARY) ? "PK\"" : "UK\"") - + " on \"$TABLE_NAME\"($PK_COLUMN_LIST) " + "parallel nologging)"; + : "UK\" unique") + "($PK_COLUMN_LIST) "; + + keySql = keySql.replaceAll("\\$PK_COLUMN_LIST", keyColumnList); + keySql = + keySql.replaceAll("\\$TABLE_NAME", tableDefinition.getTableName()); + } + return keySql; + } + + private static void createKey(Connection conn, KeyType keyType, + OracleTableDefinition tableDefinition) throws Exception { + String keySql = getKeyString(keyType, tableDefinition); + String keyColumnList = getKeyColumns(keyType, tableDefinition); + if (keySql!=null) { + keySql = "alter table \"$TABLE_NAME\" add " + keySql + + " using index (create unique index \"$TABLE_NAME_" + + ((keyType == KeyType.PRIMARY) ? "PK\"" : "UK\"") + + " on \"$TABLE_NAME\"($PK_COLUMN_LIST) " + "parallel nologging)"; + keySql = keySql.replaceAll("\\$PK_COLUMN_LIST", keyColumnList); keySql = keySql.replaceAll("\\$TABLE_NAME", tableDefinition.getTableName()); - keySql = keySql.replaceAll("\\$PK_COLUMN_LIST", keyColumnList.toString()); + PreparedStatement stmt = conn.prepareStatement(keySql); stmt.execute(); } @@ -178,7 +211,9 @@ public final class OracleData { procStmt.setInt(2, rowsPerSlave); procStmt.execute(); - createKey(conn, KeyType.PRIMARY, tableDefinition); + if (!tableDefinition.isIndexOrganizedTable()) { + createKey(conn, KeyType.PRIMARY, tableDefinition); + } createKey(conn, KeyType.UNIQUE, tableDefinition); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/7966f196/src/test/org/apache/sqoop/manager/oracle/util/OracleTableDefinition.java ---------------------------------------------------------------------- diff --git a/src/test/org/apache/sqoop/manager/oracle/util/OracleTableDefinition.java b/src/test/org/apache/sqoop/manager/oracle/util/OracleTableDefinition.java index 5a8c42c..1ce9ff4 100644 --- a/src/test/org/apache/sqoop/manager/oracle/util/OracleTableDefinition.java +++ b/src/test/org/apache/sqoop/manager/oracle/util/OracleTableDefinition.java @@ -40,6 +40,7 @@ public class OracleTableDefinition { private List primaryKeyColumns = new ArrayList(); private List uniqueKeyColumns = new ArrayList(); private String partitionClause; + private boolean indexOrganizedTable = false; public List getUniqueKeyColumns() { return uniqueKeyColumns; @@ -81,6 +82,14 @@ public class OracleTableDefinition { this.partitionClause = newPartitionClause; } + public boolean isIndexOrganizedTable() { + return indexOrganizedTable; + } + + public void setIndexOrganizedTable(boolean newIndexOrganizedTable) { + this.indexOrganizedTable = newIndexOrganizedTable; + } + public OracleTableDefinition() { } @@ -142,6 +151,14 @@ public class OracleTableDefinition { this.partitionClause = partitionClauseNode.getChildNodes().item(0).getNodeValue(); } + + Node indexOrganizedTableNode = + table.getElementsByTagName("indexOrganizedTable").item(0); + if (indexOrganizedTableNode != null) { + String indexOrganizedTableStr = + indexOrganizedTableNode.getChildNodes().item(0).getNodeValue(); + this.indexOrganizedTable = Boolean.parseBoolean(indexOrganizedTableStr); + } } catch (Exception e) { throw new RuntimeException("Could not load table configuration", e); }