sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject git commit: SQOOP-1411: The number of tasks is not set properly in PGBulkloadExportManager
Date Sun, 10 Aug 2014 19:48:38 GMT
Repository: sqoop
Updated Branches:
  refs/heads/trunk e247f76bf -> cfe503744


SQOOP-1411: The number of tasks is not set properly in PGBulkloadExportManager

(Masatake Iwasaki via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/cfe50374
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/cfe50374
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/cfe50374

Branch: refs/heads/trunk
Commit: cfe503744b885defa0998462b4210bee12dec518
Parents: e247f76
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Sun Aug 10 12:48:09 2014 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Sun Aug 10 12:48:09 2014 -0700

----------------------------------------------------------------------
 .../apache/sqoop/mapreduce/ExportJobBase.java   |  4 ++--
 .../org/apache/sqoop/mapreduce/JobBase.java     | 22 +++++++++++++++++---
 .../postgresql/PGBulkloadExportJob.java         | 14 ++++---------
 .../manager/PGBulkloadManagerManualTest.java    |  6 ++++++
 4 files changed, 31 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/cfe50374/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
index 9f510b9..54c27ee 100644
--- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
@@ -257,8 +257,8 @@ public class ExportJobBase extends JobBase {
   }
 
   @Override
-  protected int configureNumTasks(Job job) throws IOException {
-    int numMaps = super.configureNumTasks(job);
+  protected int configureNumMapTasks(Job job) throws IOException {
+    int numMaps = super.configureNumMapTasks(job);
     job.getConfiguration().setInt(EXPORT_MAP_TASKS_KEY, numMaps);
     return numMaps;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cfe50374/src/java/org/apache/sqoop/mapreduce/JobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JobBase.java b/src/java/org/apache/sqoop/mapreduce/JobBase.java
index 92a78ac..032d408 100644
--- a/src/java/org/apache/sqoop/mapreduce/JobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/JobBase.java
@@ -285,20 +285,36 @@ public class JobBase {
   }
 
   /**
-   * Configure the number of map/reduce tasks to use in the job.
+   * Configure the number of map/reduce tasks to use in the job,
+   * returning the number of map tasks for backward compatibility.
    */
   protected int configureNumTasks(Job job) throws IOException {
+    int numMapTasks = configureNumMapTasks(job);
+    configureNumReduceTasks(job);
+    return numMapTasks;
+  }
+
+  /**
+   * Configure the number of map tasks to use in the job.
+   */
+  protected int configureNumMapTasks(Job job) throws IOException {
     int numMapTasks = options.getNumMappers();
     if (numMapTasks < 1) {
       numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
       LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
     }
-
     ConfigurationHelper.setJobNumMaps(job, numMapTasks);
-    job.setNumReduceTasks(0);
     return numMapTasks;
   }
 
+  /**
+   * Configure the number of reduce tasks to use in the job.
+   */
+  protected int configureNumReduceTasks(Job job) throws IOException {
+    job.setNumReduceTasks(0);
+    return 0;
+  }
+
   /** Set the main job that will be run. */
   protected void setJob(Job job) {
     mrJob = job;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cfe50374/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
index 1e2ad9f..32fe077 100644
--- a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
@@ -139,7 +139,6 @@ public class PGBulkloadExportJob extends ExportJobBase {
     conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
     conf.setInt("mapred.map.max.attempts", 1);
     conf.setInt("mapred.reduce.max.attempts", 1);
-    conf.setIfUnset("mapred.reduce.tasks",  "1");
     if (context.getOptions().doClearStagingTable()) {
       conf.setBoolean("pgbulkload.clear.staging.table", true);
     }
@@ -189,16 +188,11 @@ public class PGBulkloadExportJob extends ExportJobBase {
 
 
   @Override
-  protected int configureNumTasks(Job job) throws IOException {
-    SqoopOptions options = context.getOptions();
-    int numMapTasks = options.getNumMappers();
-    if (numMapTasks < 1) {
-      numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
-      LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
+  protected int configureNumReduceTasks(Job job) throws IOException {
+    if (job.getNumReduceTasks() < 1) {
+      job.setNumReduceTasks(1);
     }
-
-    ConfigurationHelper.setJobNumMaps(job, numMapTasks);
-    return numMapTasks;
+    return job.getNumReduceTasks();
   }
 
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/cfe50374/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java b/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
index 4d03a8b..fc5fd6d 100644
--- a/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
+++ b/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
@@ -192,6 +192,12 @@ public class PGBulkloadManagerManualTest extends TestExport {
   }
 
 
+  public void testMultiReduceExportWithNewProp() throws IOException, SQLException {
+    String[] genericargs = newStrArray(null, "-Dmapreduce.job.reduces=2");
+    multiFileTestWithGenericArgs(2, 10, 2, genericargs);
+  }
+
+
   public void testExportWithTablespace() throws IOException, SQLException {
     String[] genericargs =
       newStrArray(null, "-Dpgbulkload.staging.tablespace=" + TABLESPACE);


Mime
View raw message