Updated Branches:
refs/heads/sqoop2 3b8e8d15d -> 3d9aaa0d0
SQOOP-783: Sqoop2: Merge HdfsSequenceExportExtractor and HdfsTextExportExtractor into one Extractor
(Vasanth kumar RJ via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3d9aaa0d
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3d9aaa0d
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3d9aaa0d
Branch: refs/heads/sqoop2
Commit: 3d9aaa0d0d8f559798f667749ebd406d9a20af91
Parents: 3b8e8d1
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Sun Mar 17 12:27:21 2013 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Sun Mar 17 12:27:21 2013 -0700
----------------------------------------------------------------------
.../mapreduce/MapreduceExecutionEngine.java | 6 +-
.../apache/sqoop/job/etl/HdfsExportExtractor.java | 203 +++++++++++++++
.../sqoop/job/etl/HdfsSequenceExportExtractor.java | 101 -------
.../sqoop/job/etl/HdfsTextExportExtractor.java | 131 ----------
.../java/org/apache/sqoop/job/TestHdfsExtract.java | 13 +-
5 files changed, 212 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
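In short, the two format-specific export extractors are replaced by a single HdfsExportExtractor that sniffs the file type at extract time and dispatches to a sequence-file or text-file read path. A minimal sketch of that dispatch pattern, for orientation only (the class name FileTypeDispatchSketch is hypothetical; the Hadoop calls mirror the ones in the new extractor below):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

// Illustrative sketch only; the real logic lives in HdfsExportExtractor below.
public class FileTypeDispatchSketch {

  // A file is treated as a sequence file if it can be opened with
  // SequenceFile.Reader; opening a plain text file that way throws IOException.
  static boolean isSequenceFile(Configuration conf, Path file) {
    try {
      new SequenceFile.Reader(file.getFileSystem(conf), file, conf).close();
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  static void extract(Configuration conf, Path file) throws IOException {
    if (isSequenceFile(conf, file)) {
      // read records via SequenceFile.Reader (see extractSequenceFile below)
    } else {
      // read lines via LineReader, honoring any compression codec
      // (see extractTextFile below)
    }
  }
}
----------------------------------------------------------------------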
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d9aaa0d/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index b201a8d..767080c 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -29,9 +29,9 @@ import org.apache.sqoop.framework.configuration.OutputFormat;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.MapreduceExecutionError;
import org.apache.sqoop.job.etl.Exporter;
+import org.apache.sqoop.job.etl.HdfsExportExtractor;
import org.apache.sqoop.job.etl.HdfsExportPartitioner;
import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
-import org.apache.sqoop.job.etl.HdfsTextExportExtractor;
import org.apache.sqoop.job.etl.HdfsTextImportLoader;
import org.apache.sqoop.job.etl.Importer;
import org.apache.sqoop.job.io.Data;
@@ -128,8 +128,8 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
context.setString(JobConstants.JOB_ETL_LOADER, exporter.getLoader().getName());
context.setString(JobConstants.JOB_ETL_DESTROYER, exporter.getDestroyer().getName());
- // We should make one extractor that will be able to read all supported file types
- context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsTextExportExtractor.class.getName());
+ // Extractor that will be able to read all supported file types
+ context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsExportExtractor.class.getName());
context.setString(JobConstants.HADOOP_INPUTDIR, jobConf.input.inputDirectory);
if(request.getExtractors() != null) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d9aaa0d/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
new file mode 100644
index 0000000..9281bb4
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.util.LineReader;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
+import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
+import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * Extracts records from HDFS, handling both text and sequence files.
+ * The default field delimiter of a record is a comma.
+ */
+public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
+
+ public static final Log LOG = LogFactory.getLog(HdfsExportExtractor.class.getName());
+
+ private Configuration conf;
+ private DataWriter dataWriter;
+ private long rowRead = 0;
+
+ private final char fieldDelimiter;
+
+ public HdfsExportExtractor() {
+ fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
+ }
+
+ @Override
+ public void extract(ExtractorContext context,
+ ConnectionConfiguration connectionConfiguration,
+ ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
+
+ conf = ((PrefixContext) context.getContext()).getConfiguration();
+ dataWriter = context.getDataWriter();
+ dataWriter.setFieldDelimiter(fieldDelimiter);
+
+ try {
+ HdfsExportPartition p = partition;
+ LOG.info("Working on partition: " + p);
+ int numFiles = p.getNumberOfFiles();
+ for (int i = 0; i < numFiles; i++) {
+ extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
+ }
+ } catch (IOException e) {
+ throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
+ }
+ }
+
+ private void extractFile(Path file, long start, long length)
+ throws IOException {
+ long end = start + length;
+ LOG.info("Extracting file " + file);
+ LOG.info("\t from offset " + start);
+ LOG.info("\t to offset " + end);
+ LOG.info("\t of length " + length);
+ if(isSequenceFile(file)) {
+ extractSequenceFile(file, start, length);
+ } else {
+ extractTextFile(file, start, length);
+ }
+ }
+
+ /**
+ * Extracts records from a SequenceFile split.
+ * @param file path of the sequence file to read
+ * @param start byte offset at which to start reading
+ * @param length number of bytes to read
+ * @throws IOException if the file cannot be read
+ */
+ private void extractSequenceFile(Path file, long start, long length)
+ throws IOException {
+ LOG.info("Extracting sequence file");
+ long end = start + length;
+ SequenceFile.Reader filereader = new SequenceFile.Reader(
+ file.getFileSystem(conf), file, conf);
+
+ if (start > filereader.getPosition()) {
+ filereader.sync(start); // sync to start
+ }
+
+ Text line = new Text();
+ boolean hasNext = filereader.next(line);
+ while (hasNext) {
+ rowRead++;
+ dataWriter.writeCsvRecord(line.toString());
+ line = new Text();
+ hasNext = filereader.next(line);
+ if (filereader.getPosition() >= end && filereader.syncSeen()) {
+ break;
+ }
+ }
+ filereader.close();
+ }
+
+ /**
+ * Extracts records from a text file split.
+ * @param file path of the text file to read
+ * @param start byte offset at which to start reading
+ * @param length number of bytes to read
+ * @throws IOException if the file cannot be read
+ */
+ private void extractTextFile(Path file, long start, long length)
+ throws IOException {
+ LOG.info("Extracting text file");
+ long end = start + length;
+ FileSystem fs = file.getFileSystem(conf);
+ FSDataInputStream filestream = fs.open(file);
+ CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
+ LineReader filereader;
+ Seekable fileseeker = filestream;
+
+ // Hadoop 1.0 does not have support for a custom record delimiter
+ // and thus we are supporting only the default one.
+ // We might add another "else if" case for
+ // SplittableCompressionCodec once we drop support
+ // for Hadoop 1.0.
+ if (codec == null) {
+ filestream.seek(start);
+ filereader = new LineReader(filestream);
+ } else {
+ filereader = new LineReader(codec.createInputStream(filestream,
+ codec.createDecompressor()), conf);
+ fileseeker = filestream;
+ }
+ if (start != 0) {
+ // always throw away first record because
+ // one extra line is read in previous split
+ start += filereader.readLine(new Text(), 0);
+ }
+ int size;
+ LOG.info("Start position: " + String.valueOf(start));
+ long next = start;
+ while (next <= end) {
+ Text line = new Text();
+ size = filereader.readLine(line, Integer.MAX_VALUE);
+ if (size == 0) {
+ break;
+ }
+ if (codec == null) {
+ next += size;
+ } else {
+ next = fileseeker.getPos();
+ }
+ rowRead++;
+ dataWriter.writeCsvRecord(line.toString());
+ }
+ LOG.info("Extracting ended on position: " + fileseeker.getPos());
+ filestream.close();
+ }
+
+ @Override
+ public long getRowsRead() {
+ return rowRead;
+ }
+
+ /**
+ * Returns true if the given file can be opened as a SequenceFile.
+ * @param file path of the file to probe
+ * @return true if the file is a sequence file, false otherwise
+ */
+ private boolean isSequenceFile(Path file) {
+ SequenceFile.Reader filereader = null;
+ try {
+ filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
+ filereader.close();
+ } catch (IOException e) {
+ return false;
+ }
+ return true;
+ }
+}
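A design note on the merged class: isSequenceFile() decides the format by opening the file with SequenceFile.Reader and treating an IOException as "not a sequence file", so every file is opened once just to sniff its type. An equivalent, lighter-weight probe would be to read the 3-byte SequenceFile magic header ("SEQ") directly; a hedged sketch of that alternative, which is not part of this commit:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;

// Hypothetical alternative to the probe used by HdfsExportExtractor above:
// sequence files start with the magic bytes 'S', 'E', 'Q'.
final class SequenceFileSniffer {
  static boolean hasSequenceFileMagic(Configuration conf, Path file) {
    byte[] magic = new byte[3];
    try (FSDataInputStream in = file.getFileSystem(conf).open(file)) {
      in.readFully(0, magic);
      return magic[0] == 'S' && magic[1] == 'E' && magic[2] == 'Q';
    } catch (IOException e) {
      return false;
    }
  }
}
----------------------------------------------------------------------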
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d9aaa0d/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
deleted file mode 100644
index 2280828..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.etl.io.DataWriter;
-
-public class HdfsSequenceExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
-
- public static final Log LOG =
- LogFactory.getLog(HdfsSequenceExportExtractor.class.getName());
-
- private Configuration conf;
- private DataWriter dataWriter;
-
- private final char fieldDelimiter;
-
- public HdfsSequenceExportExtractor() {
- fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
- }
-
- @Override
- public void extract(ExtractorContext context, ConnectionConfiguration connectionConfiguration,
- ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
-
- conf = ((PrefixContext)context.getContext()).getConfiguration();
- dataWriter = context.getDataWriter();
- dataWriter.setFieldDelimiter(fieldDelimiter);
-
- try {
- LOG.info("Working on partition: " + partition);
- int numFiles = partition.getNumberOfFiles();
- for (int i=0; i<numFiles; i++) {
- extractFile(partition.getFile(i), partition.getOffset(i), partition.getLength(i));
- }
- } catch (IOException e) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
- }
- }
-
- private void extractFile(Path file, long start, long length)
- throws IOException {
- long end = start + length;
- LOG.info("Extracting file " + file);
- LOG.info("\t from offset " + start);
- LOG.info("\t to offset " + end);
- LOG.info("\t of length " + length);
-
- SequenceFile.Reader filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
-
- if (start > filereader.getPosition()) {
- filereader.sync(start); // sync to start
- }
-
- Text line = new Text();
- boolean hasNext = filereader.next(line);
- while (hasNext) {
- dataWriter.writeCsvRecord(line.toString());
- line = new Text();
- hasNext = filereader.next(line);
- if(filereader.getPosition() >= end && filereader.syncSeen()) {
- break;
- }
- }
- }
-
- @Override
- public long getRowsRead() {
- // TODO need to return the rows read
- return 0;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d9aaa0d/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
deleted file mode 100644
index ae419ff..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.util.LineReader;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.etl.io.DataWriter;
-
-public class HdfsTextExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
-
- public static final Log LOG =
- LogFactory.getLog(HdfsTextExportExtractor.class.getName());
-
- private Configuration conf;
- private DataWriter dataWriter;
-
- private final char fieldDelimiter;
-
- public HdfsTextExportExtractor() {
- fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
- }
-
- @Override
- public void extract(ExtractorContext context, ConnectionConfiguration connectionConfiguration,
- ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
-
- conf = ((PrefixContext)context.getContext()).getConfiguration();
- dataWriter = context.getDataWriter();
- dataWriter.setFieldDelimiter(fieldDelimiter);
-
- try {
- HdfsExportPartition p = partition;
- LOG.info("Working on partition: " + p);
- int numFiles = p.getNumberOfFiles();
- for (int i=0; i<numFiles; i++) {
- extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
- }
- } catch (IOException e) {
- throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
- }
- }
-
- private void extractFile(Path file, long start, long length)
- throws IOException {
- long end = start + length;
- LOG.info("Extracting file " + file);
- LOG.info("\t from offset " + start);
- LOG.info("\t to offset " + end);
- LOG.info("\t of length " + length);
-
- FileSystem fs = file.getFileSystem(conf);
- FSDataInputStream filestream = fs.open(file);
- CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
- LineReader filereader;
- Seekable fileseeker = filestream;
-
- // Hadoop 1.0 does not have support for custom record delimiter and thus we
- // are supporting only default one.
- // We might add another "else if" case for SplittableCompressionCodec once
- // we drop support for Hadoop 1.0.
- if (codec == null) {
- filestream.seek(start);
- filereader = new LineReader(filestream);
- } else {
- filereader = new LineReader(
- codec.createInputStream(filestream, codec.createDecompressor()), conf);
- fileseeker = filestream;
- }
-
- if (start != 0) {
- // always throw away first record because
- // one extra line is read in previous split
- start += filereader.readLine(new Text(), 0);
- }
- int size;
- LOG.info("Start position: " + String.valueOf(start));
- long next = start;
- while (next <= end) {
- Text line = new Text();
- size = filereader.readLine(line, Integer.MAX_VALUE);
- if (size == 0) {
- break;
- }
- if (codec == null) {
- next += size;
- } else {
- next = fileseeker.getPos();
- }
- dataWriter.writeCsvRecord(line.toString());
- }
- LOG.info("Extracting ended on position: " + fileseeker.getPos());
- }
-
- @Override
- public long getRowsRead() {
- // TODO need to return the rows read
- return 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d9aaa0d/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index fae6573..62f3a03 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -33,10 +33,9 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.job.etl.HdfsExportExtractor;
import org.apache.sqoop.job.etl.HdfsExportPartitioner;
-import org.apache.sqoop.job.etl.HdfsSequenceExportExtractor;
import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
-import org.apache.sqoop.job.etl.HdfsTextExportExtractor;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
import org.apache.sqoop.job.io.Data;
@@ -66,7 +65,7 @@ public class TestHdfsExtract extends TestCase {
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
- HdfsTextExportExtractor.class.getName());
+ HdfsExportExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
conf.set(JobConstants.HADOOP_INPUTDIR, indir);
@@ -84,7 +83,7 @@ public class TestHdfsExtract extends TestCase {
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
- HdfsTextExportExtractor.class.getName());
+ HdfsExportExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
conf.set(JobConstants.HADOOP_INPUTDIR, indir);
@@ -97,7 +96,7 @@ public class TestHdfsExtract extends TestCase {
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
- HdfsTextExportExtractor.class.getName());
+ HdfsExportExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
conf.set(JobConstants.HADOOP_INPUTDIR, indir);
@@ -115,7 +114,7 @@ public class TestHdfsExtract extends TestCase {
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
- HdfsSequenceExportExtractor.class.getName());
+ HdfsExportExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
conf.set(JobConstants.HADOOP_INPUTDIR, indir);
@@ -133,7 +132,7 @@ public class TestHdfsExtract extends TestCase {
conf.set(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR,
- HdfsSequenceExportExtractor.class.getName());
+ HdfsExportExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
conf.set(JobConstants.HADOOP_INPUTDIR, indir);