Repository: sqoop
Updated Branches:
refs/heads/trunk e247f76bf -> cfe503744
SQOOP-1411: The number of tasks is not set properly in PGBulkloadExportManager
(Masatake Iwasaki via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/cfe50374
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/cfe50374
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/cfe50374
Branch: refs/heads/trunk
Commit: cfe503744b885defa0998462b4210bee12dec518
Parents: e247f76
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Sun Aug 10 12:48:09 2014 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Sun Aug 10 12:48:09 2014 -0700
----------------------------------------------------------------------
.../apache/sqoop/mapreduce/ExportJobBase.java | 4 ++--
.../org/apache/sqoop/mapreduce/JobBase.java | 22 +++++++++++++++++---
.../postgresql/PGBulkloadExportJob.java | 14 ++++---------
.../manager/PGBulkloadManagerManualTest.java | 6 ++++++
4 files changed, 31 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cfe50374/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
index 9f510b9..54c27ee 100644
--- a/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
@@ -257,8 +257,8 @@ public class ExportJobBase extends JobBase {
}
@Override
- protected int configureNumTasks(Job job) throws IOException {
- int numMaps = super.configureNumTasks(job);
+ protected int configureNumMapTasks(Job job) throws IOException {
+ int numMaps = super.configureNumMapTasks(job);
job.getConfiguration().setInt(EXPORT_MAP_TASKS_KEY, numMaps);
return numMaps;
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cfe50374/src/java/org/apache/sqoop/mapreduce/JobBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JobBase.java b/src/java/org/apache/sqoop/mapreduce/JobBase.java
index 92a78ac..032d408 100644
--- a/src/java/org/apache/sqoop/mapreduce/JobBase.java
+++ b/src/java/org/apache/sqoop/mapreduce/JobBase.java
@@ -285,20 +285,36 @@ public class JobBase {
}
/**
- * Configure the number of map/reduce tasks to use in the job.
+ * Configure the number of map/reduce tasks to use in the job,
+ * returning the number of map tasks for backward compatibility.
*/
protected int configureNumTasks(Job job) throws IOException {
+ int numMapTasks = configureNumMapTasks(job);
+ configureNumReduceTasks(job);
+ return numMapTasks;
+ }
+
+ /**
+ * Configure the number of map tasks to use in the job.
+ */
+ protected int configureNumMapTasks(Job job) throws IOException {
int numMapTasks = options.getNumMappers();
if (numMapTasks < 1) {
numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
}
-
ConfigurationHelper.setJobNumMaps(job, numMapTasks);
- job.setNumReduceTasks(0);
return numMapTasks;
}
+ /**
+ * Configure the number of reduce tasks to use in the job.
+ */
+ protected int configureNumReduceTasks(Job job) throws IOException {
+ job.setNumReduceTasks(0);
+ return 0;
+ }
+
/** Set the main job that will be run. */
protected void setJob(Job job) {
mrJob = job;
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cfe50374/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
index 1e2ad9f..32fe077 100644
--- a/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/postgresql/PGBulkloadExportJob.java
@@ -139,7 +139,6 @@ public class PGBulkloadExportJob extends ExportJobBase {
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
conf.setInt("mapred.map.max.attempts", 1);
conf.setInt("mapred.reduce.max.attempts", 1);
- conf.setIfUnset("mapred.reduce.tasks", "1");
if (context.getOptions().doClearStagingTable()) {
conf.setBoolean("pgbulkload.clear.staging.table", true);
}
@@ -189,16 +188,11 @@ public class PGBulkloadExportJob extends ExportJobBase {
@Override
- protected int configureNumTasks(Job job) throws IOException {
- SqoopOptions options = context.getOptions();
- int numMapTasks = options.getNumMappers();
- if (numMapTasks < 1) {
- numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
- LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
+ protected int configureNumReduceTasks(Job job) throws IOException {
+ if (job.getNumReduceTasks() < 1) {
+ job.setNumReduceTasks(1);
}
-
- ConfigurationHelper.setJobNumMaps(job, numMapTasks);
- return numMapTasks;
+ return job.getNumReduceTasks();
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/cfe50374/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java b/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
index 4d03a8b..fc5fd6d 100644
--- a/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
+++ b/src/test/com/cloudera/sqoop/manager/PGBulkloadManagerManualTest.java
@@ -192,6 +192,12 @@ public class PGBulkloadManagerManualTest extends TestExport {
}
+ public void testMultiReduceExportWithNewProp() throws IOException, SQLException {
+ String[] genericargs = newStrArray(null, "-Dmapreduce.job.reduces=2");
+ multiFileTestWithGenericArgs(2, 10, 2, genericargs);
+ }
+
+
public void testExportWithTablespace() throws IOException, SQLException {
String[] genericargs =
newStrArray(null, "-Dpgbulkload.staging.tablespace=" + TABLESPACE);
|