http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index b7079dd..8061c78 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -30,10 +30,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.etl.HdfsExportExtractor;
import org.apache.sqoop.job.etl.HdfsExportPartitioner;
import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
@@ -45,6 +47,9 @@ import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
import org.junit.Test;
public class TestHdfsExtract extends TestCase {
@@ -53,12 +58,22 @@ public class TestHdfsExtract extends TestCase {
private static final int NUMBER_OF_FILES = 5;
private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
- private String indir;
+ private final String indir;
public TestHdfsExtract() {
indir = INPUT_ROOT + getClass().getSimpleName();
}
+ @Override
+ public void setUp() throws IOException {
+ FileUtils.mkdirs(indir);
+ }
+
+ @Override
+ public void tearDown() throws IOException {
+ FileUtils.delete(indir);
+ }
+
/**
* Test case for validating the number of partitions creation
* based on input.
@@ -68,12 +83,12 @@ public class TestHdfsExtract extends TestCase {
*/
@Test
public void testHdfsExportPartitioner() throws Exception {
- FileUtils.delete(indir);
- FileUtils.mkdirs(indir);
createTextInput(null);
Configuration conf = new Configuration();
conf.set(JobConstants.HADOOP_INPUTDIR, indir);
+ conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+ CSVIntermediateDataFormat.class.getName());
HdfsExportPartitioner partitioner = new HdfsExportPartitioner();
PrefixContext prefixContext = new PrefixContext(conf, "");
int[] partitionValues = {2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 17};
@@ -87,87 +102,67 @@ public class TestHdfsExtract extends TestCase {
@Test
public void testUncompressedText() throws Exception {
- FileUtils.delete(indir);
- FileUtils.mkdirs(indir);
createTextInput(null);
- Configuration conf = new Configuration();
- ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
- conf.set(JobConstants.JOB_ETL_PARTITIONER,
- HdfsExportPartitioner.class.getName());
- conf.set(JobConstants.JOB_ETL_EXTRACTOR,
- HdfsExportExtractor.class.getName());
- conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
- conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
- conf.set(JobConstants.HADOOP_INPUTDIR, indir);
- JobUtils.runJob(conf);
+ JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
}
@Test
- public void testCompressedText() throws Exception {
- FileUtils.delete(indir);
- FileUtils.mkdirs(indir);
+ public void testDefaultCompressedText() throws Exception {
createTextInput(SqoopFileOutputFormat.DEFAULT_CODEC);
- Configuration conf = new Configuration();
- ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
- conf.set(JobConstants.JOB_ETL_PARTITIONER,
- HdfsExportPartitioner.class.getName());
- conf.set(JobConstants.JOB_ETL_EXTRACTOR,
- HdfsExportExtractor.class.getName());
- conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
- conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
- conf.set(JobConstants.HADOOP_INPUTDIR, indir);
- JobUtils.runJob(conf);
+ JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
+ }
- FileUtils.delete(indir);
- FileUtils.mkdirs(indir);
+ @Test
+ public void testBZip2CompressedText() throws Exception {
createTextInput(BZip2Codec.class);
- conf.set(JobConstants.JOB_ETL_PARTITIONER,
- HdfsExportPartitioner.class.getName());
- conf.set(JobConstants.JOB_ETL_EXTRACTOR,
- HdfsExportExtractor.class.getName());
- conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
- conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
- conf.set(JobConstants.HADOOP_INPUTDIR, indir);
- JobUtils.runJob(conf);
+ JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
}
@Test
- public void testCompressedSequence() throws Exception {
- FileUtils.delete(indir);
- FileUtils.mkdirs(indir);
+ public void testDefaultCompressedSequence() throws Exception {
createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC);
- Configuration conf = new Configuration();
- ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
- conf.set(JobConstants.JOB_ETL_PARTITIONER,
- HdfsExportPartitioner.class.getName());
- conf.set(JobConstants.JOB_ETL_EXTRACTOR,
- HdfsExportExtractor.class.getName());
- conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
- conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
- conf.set(JobConstants.HADOOP_INPUTDIR, indir);
- JobUtils.runJob(conf);
+ JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
}
@Test
public void testUncompressedSequence() throws Exception {
- FileUtils.delete(indir);
- FileUtils.mkdirs(indir);
createSequenceInput(null);
+ JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
+ }
+
+ private Schema createSchema() {
+ Schema schema = new Schema("Test");
+ schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+ .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+ return schema;
+ }
+
+ private Configuration createConf() {
Configuration conf = new Configuration();
ConfigurationUtils.setJobType(conf, MJob.Type.EXPORT);
- conf.set(JobConstants.JOB_ETL_PARTITIONER,
+ conf.setIfUnset(JobConstants.JOB_ETL_PARTITIONER,
HdfsExportPartitioner.class.getName());
- conf.set(JobConstants.JOB_ETL_EXTRACTOR,
+ conf.setIfUnset(JobConstants.JOB_ETL_EXTRACTOR,
HdfsExportExtractor.class.getName());
- conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
- conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
- conf.set(JobConstants.HADOOP_INPUTDIR, indir);
- JobUtils.runJob(conf);
+ conf.setIfUnset(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+ conf.setIfUnset(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
+ conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT,
+ CSVIntermediateDataFormat.class.getName());
+ conf.setIfUnset(JobConstants.HADOOP_INPUTDIR, indir);
+ return conf;
+ }
+
+ private Job createJob(Configuration conf, Schema schema) throws Exception {
+ Job job = new Job(conf);
+ ConfigurationUtils.setConnectorSchema(job, schema);
+ job.getConfiguration().set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+ CSVIntermediateDataFormat.class.getName());
+ return job;
}
private void createTextInput(Class<? extends CompressionCodec> clz)
@@ -227,11 +222,11 @@ public class TestHdfsExtract extends TestCase {
SequenceFile.Writer filewriter;
if (codec != null) {
filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
- conf, filepath, Text.class, NullWritable.class,
- CompressionType.BLOCK, codec);
+ conf, filepath, Text.class, NullWritable.class,
+ CompressionType.BLOCK, codec);
} else {
filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
- conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
+ conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
}
Text text = new Text();
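
With the per-test boilerplate consolidated into createSchema(), createConf() and createJob(), covering another codec is a two-line test. A hedged sketch of such an addition (GzipCodec is chosen purely as an illustration; it is not part of this change):

  @Test
  public void testGzipCompressedText() throws Exception {
    createTextInput(org.apache.hadoop.io.compress.GzipCodec.class);
    JobUtils.runJob(createJob(createConf(), createSchema()).getConfiguration());
  }
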
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
index f849aae..721bba6 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -26,6 +26,7 @@ import java.io.InputStreamReader;
import java.util.LinkedList;
import java.util.List;
+import com.google.common.base.Charsets;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
@@ -33,7 +34,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
@@ -45,6 +48,9 @@ import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
public class TestHdfsLoad extends TestCase {
@@ -68,13 +74,21 @@ public class TestHdfsLoad extends TestCase {
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+ conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+ CSVIntermediateDataFormat.class.getName());
conf.set(JobConstants.HADOOP_OUTDIR, outdir);
- JobUtils.runJob(conf);
+ Schema schema = new Schema("Test");
+ schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+ .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+
+ Job job = new Job(conf);
+ ConfigurationUtils.setConnectorSchema(job, schema);
+ JobUtils.runJob(job.getConfiguration());
String fileName = outdir + "/" + OUTPUT_FILE;
InputStream filestream = FileUtils.open(fileName);
BufferedReader filereader = new BufferedReader(new InputStreamReader(
- filestream, Data.CHARSET_NAME));
+ filestream, Charsets.UTF_8));
verifyOutputText(filereader);
}
@@ -86,9 +100,18 @@ public class TestHdfsLoad extends TestCase {
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+ conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+ CSVIntermediateDataFormat.class.getName());
conf.set(JobConstants.HADOOP_OUTDIR, outdir);
conf.setBoolean(JobConstants.HADOOP_COMPRESS, true);
- JobUtils.runJob(conf);
+
+ Schema schema = new Schema("Test");
+ schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+ .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+
+ Job job = new Job(conf);
+ ConfigurationUtils.setConnectorSchema(job, schema);
+ JobUtils.runJob(job.getConfiguration());
Class<? extends CompressionCodec> codecClass = conf.getClass(
JobConstants.HADOOP_COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC)
@@ -97,7 +120,7 @@ public class TestHdfsLoad extends TestCase {
String fileName = outdir + "/" + OUTPUT_FILE + codec.getDefaultExtension();
InputStream filestream = codec.createInputStream(FileUtils.open(fileName));
BufferedReader filereader = new BufferedReader(new InputStreamReader(
- filestream, Data.CHARSET_NAME));
+ filestream, Charsets.UTF_8));
verifyOutputText(filereader);
}
@@ -108,7 +131,7 @@ public class TestHdfsLoad extends TestCase {
int index = START_ID*NUMBER_OF_ROWS_PER_ID;
while ((actual = reader.readLine()) != null){
data.setContent(new Object[] {
- index, (double) index, String.valueOf(index) },
+ index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1) },
Data.ARRAY_RECORD);
expected = data.toString();
index++;
@@ -129,8 +152,17 @@ public class TestHdfsLoad extends TestCase {
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
+ conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+ CSVIntermediateDataFormat.class.getName());
conf.set(JobConstants.HADOOP_OUTDIR, outdir);
- JobUtils.runJob(conf);
+
+ Schema schema = new Schema("Test");
+ schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+ .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+
+ Job job = new Job(conf);
+ ConfigurationUtils.setConnectorSchema(job, schema);
+ JobUtils.runJob(job.getConfiguration());
Path filepath = new Path(outdir,
OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
@@ -147,10 +179,18 @@ public class TestHdfsLoad extends TestCase {
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
+ conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+ CSVIntermediateDataFormat.class.getName());
conf.set(JobConstants.HADOOP_OUTDIR, outdir);
conf.setBoolean(JobConstants.HADOOP_COMPRESS, true);
- JobUtils.runJob(conf);
+ Schema schema = new Schema("Test");
+ schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+ .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+
+ Job job = new Job(conf);
+ ConfigurationUtils.setConnectorSchema(job, schema);
+ JobUtils.runJob(job.getConfiguration());
Path filepath = new Path(outdir,
OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
SequenceFile.Reader filereader = new SequenceFile.Reader(filepath.getFileSystem(conf),
filepath, conf);
@@ -164,7 +204,7 @@ public class TestHdfsLoad extends TestCase {
Data data = new Data();
while (reader.next(actual)){
data.setContent(new Object[] {
- index, (double) index, String.valueOf(index) },
+ index, (double) index, new String(new byte[] {(byte)(index + 127)}, Charsets.ISO_8859_1) },
Data.ARRAY_RECORD);
expected.set(data.toString());
index++;
@@ -225,7 +265,7 @@ public class TestHdfsLoad extends TestCase {
Object[] array = new Object[] {
id * NUMBER_OF_ROWS_PER_ID + row,
(double) (id * NUMBER_OF_ROWS_PER_ID + row),
- String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row)
+ new String(new byte[]{(byte)(id * NUMBER_OF_ROWS_PER_ID + row + 127)}, Charsets.ISO_8859_1)
};
context.getDataWriter().writeArrayRecord(array);
}
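
The expected third column above switches from the decimal string to a single character decoded from one ISO-8859-1 byte derived from the row index. A minimal standalone sketch of what that expression yields (illustrative only, not part of the patch):

  import com.google.common.base.Charsets;

  public class ThirdColumnSketch {
    public static void main(String[] args) {
      int index = 1;
      // Same expression as in the test: one byte with value index + 127, decoded as ISO-8859-1.
      String col3 = new String(new byte[] {(byte) (index + 127)}, Charsets.ISO_8859_1);
      System.out.println((int) col3.charAt(0)); // prints 128 for index 1
    }
  }
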
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 7b264c6..ba16b3c 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.etl.Loader;
@@ -42,12 +43,17 @@ import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;
import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.ConfigurationUtils;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
import org.apache.sqoop.job.mr.SqoopSplit;
import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Text;
public class TestMapReduce extends TestCase {
@@ -59,6 +65,8 @@ public class TestMapReduce extends TestCase {
Configuration conf = new Configuration();
ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+ conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+ CSVIntermediateDataFormat.class.getName());
Job job = new Job(conf);
SqoopInputFormat inputformat = new SqoopInputFormat();
@@ -77,8 +85,15 @@ public class TestMapReduce extends TestCase {
ConfigurationUtils.setJobType(conf, MJob.Type.IMPORT);
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+ conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+ CSVIntermediateDataFormat.class.getName());
+ Schema schema = new Schema("Test");
+ schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+ .addColumn(new org.apache.sqoop.schema.type.Text("3"));
- JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
+ Job job = new Job(conf);
+ ConfigurationUtils.setConnectorSchema(job, schema);
+ JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
DummyOutputFormat.class);
}
@@ -88,8 +103,15 @@ public class TestMapReduce extends TestCase {
conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+ conf.set(JobConstants.INTERMEDIATE_DATA_FORMAT,
+ CSVIntermediateDataFormat.class.getName());
+ Schema schema = new Schema("Test");
+ schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+ .addColumn(new Text("3"));
- JobUtils.runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
+ Job job = new Job(conf);
+ ConfigurationUtils.setConnectorSchema(job, schema);
+ JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
SqoopNullOutputFormat.class);
}
@@ -152,14 +174,14 @@ public class TestMapReduce extends TestCase {
}
public static class DummyOutputFormat
- extends OutputFormat<Data, NullWritable> {
+ extends OutputFormat<SqoopWritable, NullWritable> {
@Override
public void checkOutputSpecs(JobContext context) {
// do nothing
}
@Override
- public RecordWriter<Data, NullWritable> getRecordWriter(
+ public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
TaskAttemptContext context) {
return new DummyRecordWriter();
}
@@ -170,12 +192,13 @@ public class TestMapReduce extends TestCase {
}
public static class DummyRecordWriter
- extends RecordWriter<Data, NullWritable> {
+ extends RecordWriter<SqoopWritable, NullWritable> {
private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
private Data data = new Data();
@Override
- public void write(Data key, NullWritable value) {
+ public void write(SqoopWritable key, NullWritable value) {
+
data.setContent(new Object[] {
index,
(double) index,
@@ -215,22 +238,22 @@ public class TestMapReduce extends TestCase {
public static class DummyLoader extends Loader {
private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
private Data expected = new Data();
- private Data actual = new Data();
+ private CSVIntermediateDataFormat actual = new CSVIntermediateDataFormat();
@Override
public void load(LoaderContext context, Object oc, Object oj) throws Exception{
- Object[] array;
- while ((array = context.getDataReader().readArrayRecord()) != null) {
- actual.setContent(array, Data.ARRAY_RECORD);
+ String data;
+ while ((data = context.getDataReader().readTextRecord()) != null) {
+// actual.setSchema(context.getSchema());
+// actual.setObjectData(array, false);
expected.setContent(new Object[] {
index,
(double) index,
String.valueOf(index)},
Data.ARRAY_RECORD);
index++;
-
- assertEquals(expected.toString(), actual.toString());
+ assertEquals(expected.toString(), data);
}
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
new file mode 100644
index 0000000..b78b140
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.job.io;
+
+import com.google.common.base.Charsets;
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.job.JobConstants;
+
+public class SqoopWritableTest extends TestCase {
+
+ private final SqoopWritable writable = new SqoopWritable();
+
+ public void testStringInStringOut() {
+ String testData = "Live Long and prosper";
+ writable.setString(testData);
+ Assert.assertEquals(testData,writable.getString());
+ }
+
+ public void testDataWritten() throws IOException {
+ String testData = "One ring to rule them all";
+ byte[] testDataBytes = testData.getBytes(Charsets.UTF_8);
+ writable.setString(testData);
+ ByteArrayOutputStream ostream = new ByteArrayOutputStream();
+ DataOutput out = new DataOutputStream(ostream);
+ writable.write(out);
+ byte[] written = ostream.toByteArray();
+ InputStream instream = new ByteArrayInputStream(written);
+ DataInput in = new DataInputStream(instream);
+ String readData = in.readUTF();
+ Assert.assertEquals(testData, readData);
+ }
+
+ public void testDataRead() throws IOException {
+ String testData = "Brandywine Bridge - 20 miles!";
+ ByteArrayOutputStream ostream = new ByteArrayOutputStream();
+ DataOutput out = new DataOutputStream(ostream);
+ out.writeUTF(testData);
+ InputStream instream = new ByteArrayInputStream(ostream.toByteArray());
+ DataInput in = new DataInputStream(instream);
+ writable.readFields(in);
+ Assert.assertEquals(testData, writable.getString());
+ }
+
+ public void testWriteReadUsingStream() throws IOException {
+ String testData = "You shall not pass";
+ ByteArrayOutputStream ostream = new ByteArrayOutputStream();
+ DataOutput out = new DataOutputStream(ostream);
+ writable.setString(testData);
+ writable.write(out);
+ byte[] written = ostream.toByteArray();
+
+ //Don't test what the data is, test that SqoopWritable can read it.
+ InputStream instream = new ByteArrayInputStream(written);
+ SqoopWritable newWritable = new SqoopWritable();
+ DataInput in = new DataInputStream(instream);
+ newWritable.readFields(in);
+ Assert.assertEquals(testData, newWritable.getString());
+ ostream.close();
+ instream.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index bee8ab7..1f55f1b 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -23,11 +23,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;
-import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.job.io.SqoopWritable;
import org.junit.Before;
import org.junit.Test;
@@ -47,7 +49,7 @@ public class TestSqoopOutputFormatLoadExecutor {
@Override
public void load(LoaderContext context, Object cc, Object jc) throws Exception {
- context.getDataReader().readContent(Data.CSV_RECORD);
+ context.getDataReader().readTextRecord();
throw new BrokenBarrierException();
}
}
@@ -62,7 +64,7 @@ public class TestSqoopOutputFormatLoadExecutor {
int runCount = 0;
Object o;
String[] arr;
- while ((o = context.getDataReader().readContent(Data.CSV_RECORD)) != null) {
+ while ((o = context.getDataReader().readTextRecord()) != null) {
arr = o.toString().split(",");
Assert.assertEquals(100, arr.length);
for (int i = 0; i < arr.length; i++) {
@@ -84,7 +86,7 @@ public class TestSqoopOutputFormatLoadExecutor {
@Override
public void load(LoaderContext context, Object cc, Object jc) throws Exception {
- String[] arr = context.getDataReader().readContent(Data.CSV_RECORD).toString().split(",");
+ String[] arr = context.getDataReader().readTextRecord().toString().split(",");
Assert.assertEquals(100, arr.length);
for (int i = 0; i < arr.length; i++) {
Assert.assertEquals(i, Integer.parseInt(arr[i]));
@@ -103,7 +105,7 @@ public class TestSqoopOutputFormatLoadExecutor {
int runCount = 0;
Object o;
String[] arr;
- while ((o = context.getDataReader().readContent(Data.CSV_RECORD)) != null) {
+ while ((o = context.getDataReader().readTextRecord()) != null) {
arr = o.toString().split(",");
Assert.assertEquals(100, arr.length);
for (int i = 0; i < arr.length; i++) {
@@ -119,6 +121,7 @@ public class TestSqoopOutputFormatLoadExecutor {
@Before
public void setUp() {
conf = new Configuration();
+ conf.setIfUnset(JobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
}
@@ -128,12 +131,14 @@ public class TestSqoopOutputFormatLoadExecutor {
conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
- RecordWriter<Data, NullWritable> writer = executor.getRecordWriter();
- Data data = new Data();
+ RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+ IntermediateDataFormat data = new CSVIntermediateDataFormat();
+ SqoopWritable writable = new SqoopWritable();
try {
for (int count = 0; count < 100; count++) {
- data.setContent(String.valueOf(count), Data.CSV_RECORD);
- writer.write(data, null);
+ data.setTextData(String.valueOf(count));
+ writable.setString(data.getTextData());
+ writer.write(writable, null);
}
} catch (SqoopException ex) {
throw ex.getCause();
@@ -146,8 +151,9 @@ public class TestSqoopOutputFormatLoadExecutor {
conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
- RecordWriter<Data, NullWritable> writer = executor.getRecordWriter();
- Data data = new Data();
+ RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+ IntermediateDataFormat data = new CSVIntermediateDataFormat();
+ SqoopWritable writable = new SqoopWritable();
for (int i = 0; i < 10; i++) {
StringBuilder builder = new StringBuilder();
for (int count = 0; count < 100; count++) {
@@ -156,8 +162,9 @@ public class TestSqoopOutputFormatLoadExecutor {
builder.append(",");
}
}
- data.setContent(builder.toString(), Data.CSV_RECORD);
- writer.write(data, null);
+ data.setTextData(builder.toString());
+ writable.setString(data.getTextData());
+ writer.write(writable, null);
}
writer.close(null);
}
@@ -166,8 +173,9 @@ public class TestSqoopOutputFormatLoadExecutor {
public void testSuccessfulLoader() throws Throwable {
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName());
- RecordWriter<Data, NullWritable> writer = executor.getRecordWriter();
- Data data = new Data();
+ RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+ IntermediateDataFormat data = new CSVIntermediateDataFormat();
+ SqoopWritable writable = new SqoopWritable();
StringBuilder builder = new StringBuilder();
for (int count = 0; count < 100; count++) {
builder.append(String.valueOf(count));
@@ -175,8 +183,10 @@ public class TestSqoopOutputFormatLoadExecutor {
builder.append(",");
}
}
- data.setContent(builder.toString(), Data.CSV_RECORD);
- writer.write(data, null);
+ data.setTextData(builder.toString());
+ writable.setString(data.getTextData());
+ writer.write(writable, null);
+
//Allow writer to complete.
TimeUnit.SECONDS.sleep(5);
writer.close(null);
@@ -189,8 +199,9 @@ public class TestSqoopOutputFormatLoadExecutor {
conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName());
SqoopOutputFormatLoadExecutor executor = new
SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
- RecordWriter<Data, NullWritable> writer = executor.getRecordWriter();
- Data data = new Data();
+ RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+ IntermediateDataFormat data = new CSVIntermediateDataFormat();
+ SqoopWritable writable = new SqoopWritable();
try {
for (int i = 0; i < 10; i++) {
StringBuilder builder = new StringBuilder();
@@ -200,8 +211,9 @@ public class TestSqoopOutputFormatLoadExecutor {
builder.append(",");
}
}
- data.setContent(builder.toString(), Data.CSV_RECORD);
- writer.write(data, null);
+ data.setTextData(builder.toString());
+ writable.setString(data.getTextData());
+ writer.write(writable, null);
}
writer.close(null);
} catch (SqoopException ex) {
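
Each rewritten test repeats the same handshake: stage the CSV text in the intermediate data format, then hand it to SqoopWritable for the record writer. A minimal sketch of that round trip in isolation, using only the calls exercised above:

  import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
  import org.apache.sqoop.connector.idf.IntermediateDataFormat;
  import org.apache.sqoop.job.io.SqoopWritable;

  public class CsvWritableRoundTrip {
    public static void main(String[] args) {
      IntermediateDataFormat data = new CSVIntermediateDataFormat();
      data.setTextData("0,1,2");               // one comma-separated record, as in the tests
      SqoopWritable writable = new SqoopWritable();
      writable.setString(data.getTextData());  // the writable carries the CSV text to the loader
      System.out.println(writable.getString());
    }
  }
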
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1e2f005..a722c74 100644
--- a/pom.xml
+++ b/pom.xml
@@ -143,12 +143,6 @@ limitations under the License.
</dependency>
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
-
- <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>${hadoop.1.version}</version>
@@ -345,6 +339,11 @@ limitations under the License.
<version>${commons-lang.version}</version>
</dependency>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>${servlet.version}</version>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/spi/pom.xml
----------------------------------------------------------------------
diff --git a/spi/pom.xml b/spi/pom.xml
index 0b240e8..43f17d4 100644
--- a/spi/pom.xml
+++ b/spi/pom.xml
@@ -36,5 +36,10 @@ limitations under the License.
<groupId>org.apache.sqoop</groupId>
<artifactId>sqoop-common</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.sqoop</groupId>
+ <artifactId>connector-sdk</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
index 2becc56..50eb940 100644
--- a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
+++ b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
@@ -20,6 +20,8 @@ package org.apache.sqoop.connector.spi;
import java.util.Locale;
import java.util.ResourceBundle;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.etl.Exporter;
import org.apache.sqoop.job.etl.Importer;
import org.apache.sqoop.model.MJob;
@@ -79,4 +81,14 @@ public abstract class SqoopConnector {
*/
public abstract MetadataUpgrader getMetadataUpgrader();
+ /**
+ * Returns the {@linkplain IntermediateDataFormat} this connector
+ * can return data in natively. The format supports retrieving the data as text
+ * and as an array of objects. This should never return null.
+ *
+ * @return {@linkplain IntermediateDataFormat} object
+ */
+ public Class<? extends IntermediateDataFormat<?>> getIntermediateDataFormat() {
+ return CSVIntermediateDataFormat.class;
+ }
}
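
Connectors inherit the CSV default from this hook, and the execution side selects the class through the INTERMEDIATE_DATA_FORMAT key that the tests above set explicitly. A hedged sketch of that resolution, assuming plain reflection (the framework's actual wiring may differ):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
  import org.apache.sqoop.connector.idf.IntermediateDataFormat;
  import org.apache.sqoop.job.JobConstants;

  public class IdfResolutionSketch {
    // Resolve the configured intermediate data format, falling back to the
    // connector default (CSVIntermediateDataFormat) when the key is unset.
    public static IntermediateDataFormat<?> resolve(Configuration conf) throws Exception {
      String idfClass = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT,
          CSVIntermediateDataFormat.class.getName());
      return (IntermediateDataFormat<?>) Class.forName(idfClass).newInstance();
    }
  }
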
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3c93930b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index bfc28ef..a05274a 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -198,6 +198,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
ConfigurationUtils.setConfigConnectorJob(job, request.getConfigConnectorJob());
ConfigurationUtils.setConfigFrameworkConnection(job, request.getConfigFrameworkConnection());
ConfigurationUtils.setConfigFrameworkJob(job, request.getConfigFrameworkJob());
+ ConfigurationUtils.setConnectorSchema(job, request.getSummary().getConnectorSchema());
if(request.getJobName() != null) {
job.setJobName("Sqoop: " + request.getJobName());