sqoop-commits mailing list archives

From a..@apache.org
Subject [1/2] sqoop git commit: SQOOP-1348: Sqoop2: Remove Data class
Date Mon, 10 Nov 2014 21:41:30 GMT
Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 49a7431ba -> ace222374


http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
deleted file mode 100644
index 139883e..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.regex.Matcher;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.MRExecutionError;
-
-public class Data implements WritableComparable<Data> {
-
-  // The content is an Object to accommodate different kinds of data.
-  // For example, it can be:
-  // - Object[] for an array of object record
-  // - String for a text of CSV record
-  private volatile Object content = null;
-
-  public static final int EMPTY_DATA = 0;
-  public static final int CSV_RECORD = 1;
-  public static final int ARRAY_RECORD = 2;
-  private int type = EMPTY_DATA;
-
-  public static final String CHARSET_NAME = "UTF-8";
-
-  public static final char DEFAULT_RECORD_DELIMITER = '\n';
-  public static final char DEFAULT_FIELD_DELIMITER = ',';
-  public static final char DEFAULT_STRING_DELIMITER = '\'';
-  public static final char DEFAULT_STRING_ESCAPE = '\\';
-  private char fieldDelimiter = DEFAULT_FIELD_DELIMITER;
-  private char stringDelimiter = DEFAULT_STRING_DELIMITER;
-  private char stringEscape = DEFAULT_STRING_ESCAPE;
-  private String escapedStringDelimiter = String.valueOf(new char[] {
-      stringEscape, stringDelimiter
-  });
-
-  private int[] fieldTypes = null;
-
-  public void setFieldDelimiter(char fieldDelimiter) {
-    this.fieldDelimiter = fieldDelimiter;
-  }
-
-  public void setFieldTypes(int[] fieldTypes) {
-    this.fieldTypes = fieldTypes;
-  }
-
-  public void setContent(Object content, int type) {
-    switch (type) {
-    case EMPTY_DATA:
-    case CSV_RECORD:
-    case ARRAY_RECORD:
-      this.type = type;
-      this.content = content;
-      break;
-    default:
-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
-    }
-  }
-
-  public Object getContent(int targetType) {
-    switch (targetType) {
-    case CSV_RECORD:
-      return format();
-    case ARRAY_RECORD:
-      return parse();
-    default:
-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType));
-    }
-  }
-
-  public int getType() {
-    return type;
-  }
-
-  public boolean isEmpty() {
-    return (type == EMPTY_DATA);
-  }
-
-  @Override
-  public String toString() {
-    return (String)getContent(CSV_RECORD);
-  }
-
-  @Override
-  public int compareTo(Data other) {
-    byte[] myBytes = toString().getBytes(Charset.forName(CHARSET_NAME));
-    byte[] otherBytes = other.toString().getBytes(
-        Charset.forName(CHARSET_NAME));
-    return WritableComparator.compareBytes(
-        myBytes, 0, myBytes.length, otherBytes, 0, otherBytes.length);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof Data)) {
-      return false;
-    }
-
-    Data data = (Data)other;
-    if (type != data.getType()) {
-      return false;
-    }
-
-    return toString().equals(data.toString());
-  }
-
-  @Override
-  public int hashCode() {
-    int result = super.hashCode();
-    switch (type) {
-    case CSV_RECORD:
-      result += 31 * content.hashCode();
-      return result;
-    case ARRAY_RECORD:
-      Object[] array = (Object[])content;
-      for (int i = 0; i < array.length; i++) {
-        result += 31 * array[i].hashCode();
-      }
-      return result;
-    default:
-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    type = readType(in);
-    switch (type) {
-    case CSV_RECORD:
-      readCsv(in);
-      break;
-    case ARRAY_RECORD:
-      readArray(in);
-      break;
-    default:
-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    writeType(out, type);
-    switch (type) {
-    case CSV_RECORD:
-      writeCsv(out);
-      break;
-    case ARRAY_RECORD:
-      writeArray(out);
-      break;
-    default:
-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
-    }
-  }
-
-  private int readType(DataInput in) throws IOException {
-    return WritableUtils.readVInt(in);
-  }
-
-  private void writeType(DataOutput out, int type) throws IOException {
-    WritableUtils.writeVInt(out, type);
-  }
-
-  private void readCsv(DataInput in) throws IOException {
-    content = in.readUTF();
-  }
-
-  private void writeCsv(DataOutput out) throws IOException {
-    out.writeUTF((String)content);
-  }
-
-  private void readArray(DataInput in) throws IOException {
-    // read number of columns
-    int columns = in.readInt();
-    content = new Object[columns];
-    Object[] array = (Object[])content;
-    // read each column
-    for (int i = 0; i < array.length; i++) {
-      int type = readType(in);
-      switch (type) {
-      case FieldTypes.UTF:
-        array[i] = in.readUTF();
-        break;
-
-      case FieldTypes.BIN:
-        int length = in.readInt();
-        byte[] bytes = new byte[length];
-        in.readFully(bytes);
-        array[i] = bytes;
-        break;
-
-      case FieldTypes.DOUBLE:
-        array[i] = in.readDouble();
-        break;
-
-      case FieldTypes.FLOAT:
-        array[i] = in.readFloat();
-        break;
-
-      case FieldTypes.LONG:
-        array[i] = in.readLong();
-        break;
-
-      case FieldTypes.INT:
-        array[i] = in.readInt();
-        break;
-
-      case FieldTypes.SHORT:
-        array[i] = in.readShort();
-        break;
-
-      case FieldTypes.CHAR:
-        array[i] = in.readChar();
-        break;
-
-      case FieldTypes.BYTE:
-        array[i] = in.readByte();
-        break;
-
-      case FieldTypes.BOOLEAN:
-        array[i] = in.readBoolean();
-        break;
-
-      case FieldTypes.NULL:
-        array[i] = null;
-        break;
-
-      default:
-        throw new IOException(
-          new SqoopException(MRExecutionError.MAPRED_EXEC_0012, Integer.toString(type))
-        );
-      }
-    }
-  }
-
-  private void writeArray(DataOutput out) throws IOException {
-    Object[] array = (Object[])content;
-    // write number of columns
-    out.writeInt(array.length);
-    // write each column
-    for (int i = 0; i < array.length; i++) {
-      if (array[i] instanceof String) {
-        writeType(out, FieldTypes.UTF);
-        out.writeUTF((String)array[i]);
-
-      } else if (array[i] instanceof byte[]) {
-        writeType(out, FieldTypes.BIN);
-        out.writeInt(((byte[])array[i]).length);
-        out.write((byte[])array[i]);
-
-      } else if (array[i] instanceof Double) {
-        writeType(out, FieldTypes.DOUBLE);
-        out.writeDouble((Double)array[i]);
-
-      } else if (array[i] instanceof Float) {
-        writeType(out, FieldTypes.FLOAT);
-        out.writeFloat((Float)array[i]);
-
-      } else if (array[i] instanceof Long) {
-        writeType(out, FieldTypes.LONG);
-        out.writeLong((Long)array[i]);
-
-      } else if (array[i] instanceof Integer) {
-        writeType(out, FieldTypes.INT);
-        out.writeInt((Integer)array[i]);
-
-      } else if (array[i] instanceof Short) {
-        writeType(out, FieldTypes.SHORT);
-        out.writeShort((Short)array[i]);
-
-      } else if (array[i] instanceof Character) {
-        writeType(out, FieldTypes.CHAR);
-        out.writeChar((Character)array[i]);
-
-      } else if (array[i] instanceof Byte) {
-        writeType(out, FieldTypes.BYTE);
-        out.writeByte((Byte)array[i]);
-
-      } else if (array[i] instanceof Boolean) {
-        writeType(out, FieldTypes.BOOLEAN);
-        out.writeBoolean((Boolean)array[i]);
-
-      } else if (array[i] == null) {
-        writeType(out, FieldTypes.NULL);
-
-      } else {
-        throw new IOException(
-          new SqoopException(MRExecutionError.MAPRED_EXEC_0012,
-            array[i].getClass().getName()
-          )
-        );
-      }
-    }
-  }
-
-  private String format() {
-    switch (type) {
-    case EMPTY_DATA:
-      return null;
-
-    case CSV_RECORD:
-      if (fieldDelimiter == DEFAULT_FIELD_DELIMITER) {
-        return (String)content;
-      } else {
-        // TODO: need to exclude the case where comma is part of a string.
-        return ((String)content).replaceAll(
-            String.valueOf(DEFAULT_FIELD_DELIMITER),
-            String.valueOf(fieldDelimiter));
-      }
-
-    case ARRAY_RECORD:
-      StringBuilder sb = new StringBuilder();
-      Object[] array = (Object[])content;
-      for (int i = 0; i < array.length; i++) {
-        if (i != 0) {
-          sb.append(fieldDelimiter);
-        }
-
-        if (array[i] instanceof String) {
-          sb.append(stringDelimiter);
-          sb.append(escape((String)array[i]));
-          sb.append(stringDelimiter);
-        } else if (array[i] instanceof byte[]) {
-          sb.append(Arrays.toString((byte[])array[i]));
-        } else {
-          sb.append(String.valueOf(array[i]));
-        }
-      }
-      return sb.toString();
-
-    default:
-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
-    }
-  }
-
-  private Object[] parse() {
-    switch (type) {
-    case EMPTY_DATA:
-      return null;
-
-    case CSV_RECORD:
-      ArrayList<Object> list = new ArrayList<Object>();
-      char[] record = ((String)content).toCharArray();
-      int start = 0;
-      int position = start;
-      boolean stringDelimited = false;
-      boolean arrayDelimited = false;
-      int index = 0;
-      while (position < record.length) {
-        if (record[position] == fieldDelimiter) {
-          if (!stringDelimited && !arrayDelimited) {
-            index = parseField(list, record, start, position, index);
-            start = position + 1;
-          }
-        } else if (record[position] == stringDelimiter) {
-          if (!stringDelimited) {
-            stringDelimited = true;
-          }
-          else if (position > 0 && record[position-1] != stringEscape) {
-            stringDelimited = false;
-          }
-        } else if (record[position] == '[') {
-          if (!stringDelimited) {
-            arrayDelimited = true;
-          }
-        } else if (record[position] == ']') {
-          if (!stringDelimited) {
-            arrayDelimited = false;
-          }
-        }
-        position++;
-      }
-      parseField(list, record, start, position, index);
-      return list.toArray();
-
-    case ARRAY_RECORD:
-      return (Object[])content;
-
-    default:
-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
-    }
-  }
-
-  private int parseField(ArrayList<Object> list, char[] record,
-      int start, int end, int index) {
-    String field = String.valueOf(record, start, end-start).trim();
-
-    int fieldType;
-    if (fieldTypes == null) {
-      fieldType = guessType(field);
-    } else {
-      fieldType = fieldTypes[index];
-    }
-
-    switch (fieldType) {
-    case FieldTypes.UTF:
-      if (field.charAt(0) != stringDelimiter ||
-          field.charAt(field.length()-1) != stringDelimiter) {
-        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
-      }
-      list.add(index, unescape(field.substring(1, field.length()-1)));
-      break;
-
-    case FieldTypes.BIN:
-      if (field.charAt(0) != '[' ||
-          field.charAt(field.length()-1) != ']') {
-        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
-      }
-      String[] splits =
-          field.substring(1, field.length()-1).split(String.valueOf(','));
-      byte[] bytes = new byte[splits.length];
-      for (int i=0; i<bytes.length; i++) {
-        bytes[i] = Byte.parseByte(splits[i].trim());
-      }
-      list.add(index, bytes);
-      break;
-
-    case FieldTypes.DOUBLE:
-      list.add(index, Double.parseDouble(field));
-      break;
-
-    case FieldTypes.FLOAT:
-      list.add(index, Float.parseFloat(field));
-      break;
-
-    case FieldTypes.LONG:
-      list.add(index, Long.parseLong(field));
-      break;
-
-    case FieldTypes.INT:
-      list.add(index, Integer.parseInt(field));
-      break;
-
-    case FieldTypes.SHORT:
-      list.add(index, Short.parseShort(field));
-      break;
-
-    case FieldTypes.CHAR:
-      list.add(index, Character.valueOf(field.charAt(0)));
-      break;
-
-    case FieldTypes.BYTE:
-      list.add(index, Byte.parseByte(field));
-      break;
-
-    case FieldTypes.BOOLEAN:
-      list.add(index, Boolean.parseBoolean(field));
-      break;
-
-    case FieldTypes.NULL:
-      list.add(index, null);
-      break;
-
-    default:
-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(fieldType));
-    }
-
-    return ++index;
-  }
-
-  private int guessType(String field) {
-    char[] value = field.toCharArray();
-
-    if (value[0] == stringDelimiter) {
-      return FieldTypes.UTF;
-    }
-
-    switch (value[0]) {
-    case 'n':
-    case 'N':
-      return FieldTypes.NULL;
-    case '[':
-      return FieldTypes.BIN;
-    case 't':
-    case 'f':
-    case 'T':
-    case 'F':
-      return FieldTypes.BOOLEAN;
-    }
-
-    int position = 1;
-    while (position < value.length) {
-      switch (value[position++]) {
-      case '.':
-        return FieldTypes.DOUBLE;
-      }
-    }
-
-    return FieldTypes.LONG;
-  }
-
-  private String escape(String string) {
-    // TODO: Also need to escape those special characters as documented in:
-    // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
-    String regex = String.valueOf(stringDelimiter);
-    String replacement = Matcher.quoteReplacement(escapedStringDelimiter);
-    return string.replaceAll(regex, replacement);
-  }
-
-  private String unescape(String string) {
-    // TODO: Also need to unescape those special characters as documented in:
-    // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
-    String regex = Matcher.quoteReplacement(escapedStringDelimiter);
-    String replacement = String.valueOf(stringDelimiter);
-    return string.replaceAll(regex, replacement);
-  }
-}
\ No newline at end of file
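
The Data class deleted above carried its own CSV formatting and parsing. With SQOOP-1348 that responsibility moves to the IntermediateDataFormat hierarchy (CSVIntermediateDataFormat), which the rewritten tests below drive through setTextData()/getTextData(). As a rough orientation sketch only, not part of this patch, the replacement pattern looks like the following; the Schema-based constructor and the accessors are the ones used by the new MRJobTestUtil further down, while the class and variable names here are made up for illustration:

import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.FloatingPoint;
import org.apache.sqoop.schema.type.Text;

public class CsvIdfSketch {
  public static void main(String[] args) {
    // Same three-column test schema that MRJobTestUtil.getTestSchema() builds.
    Schema schema = new Schema("Test");
    schema.addColumn(new FixedPoint("1"))
          .addColumn(new FloatingPoint("2"))
          .addColumn(new Text("3"));

    // Replaces Data.setContent(new Object[]{...}, Data.ARRAY_RECORD) /
    // Data.getContent(Data.CSV_RECORD): the record is handled as CSV text.
    IntermediateDataFormat<?> dataFormat = new CSVIntermediateDataFormat(schema);
    dataFormat.setTextData("10,10.0,'10'");
    String csv = dataFormat.getTextData();   // expected to round-trip the CSV text set above
    System.out.println(csv);
  }
}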

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
deleted file mode 100644
index dafdeb4..0000000
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.sqoop.job.io.SqoopWritable;
-import org.apache.sqoop.job.mr.SqoopSplit;
-import org.apache.sqoop.utils.ClassUtils;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class JobUtils {
-
-  public static boolean runJob(Configuration conf,
-    Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
-    Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper,
-    Class<? extends OutputFormat<SqoopWritable, NullWritable>> output)
-    throws IOException, InterruptedException, ClassNotFoundException {
-    Job job = new Job(conf);
-    job.setInputFormatClass(input);
-    job.setMapperClass(mapper);
-    job.setMapOutputKeyClass(SqoopWritable.class);
-    job.setMapOutputValueClass(NullWritable.class);
-    job.setOutputFormatClass(output);
-    job.setOutputKeyClass(SqoopWritable.class);
-    job.setOutputValueClass(NullWritable.class);
-
-    boolean ret = job.waitForCompletion(true);
-
-    // Hadoop 1.0 (and 0.20) have nasty bug when job committer is not called in LocalJobRuner
-    if (isHadoop1()) {
-      callOutputCommitter(job, output);
-    }
-
-    return ret;
-  }
-
-  /**
-   * Call output format on given job manually.
-   */
-  private static void callOutputCommitter(Job job, Class<? extends OutputFormat<SqoopWritable, NullWritable>> outputFormat) throws IOException, InterruptedException {
-    OutputCommitter committer = ((OutputFormat)ClassUtils.instantiate(outputFormat)).getOutputCommitter(null);
-
-    JobContext jobContext = mock(JobContext.class);
-    when(jobContext.getConfiguration()).thenReturn(job.getConfiguration());
-
-    committer.commitJob(jobContext);
-  }
-
-  /**
-   * Detect Hadoop 1.0 installation
-   *
-   * @return True if and only if this is Hadoop 1 and below
-   */
-  public static boolean isHadoop1() {
-    String version = org.apache.hadoop.util.VersionInfo.getVersion();
-    if (version.matches("\\b0\\.20\\..+\\b")
-      || version.matches("\\b1\\.\\d\\.\\d")) {
-      return true;
-    }
-    return false;
-  }
-
-  private JobUtils() {
-    // Disable explicit object creation
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 78ae4ec..bbac7d2 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
 import org.apache.sqoop.job.etl.Extractor;
@@ -46,17 +47,13 @@ import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
-import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.io.SqoopWritable;
 import org.apache.sqoop.job.mr.MRConfigurationUtils;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
 import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
 import org.apache.sqoop.job.mr.SqoopSplit;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.schema.type.FixedPoint;
-import org.apache.sqoop.schema.type.FloatingPoint;
-import org.apache.sqoop.schema.type.Text;
+import org.apache.sqoop.job.util.MRJobTestUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -67,11 +64,10 @@ public class TestMapReduce {
   private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;
 
   @Test
-  public void testInputFormat() throws Exception {
+  public void testSqoopInputFormat() throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
-      CSVIntermediateDataFormat.class.getName());
+    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
     Job job = new Job(conf);
 
     SqoopInputFormat inputformat = new SqoopInputFormat();
@@ -79,51 +75,47 @@ public class TestMapReduce {
     assertEquals(9, splits.size());
 
     for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
-      SqoopSplit split = (SqoopSplit)splits.get(id-1);
-      DummyPartition partition = (DummyPartition)split.getPartition();
+      SqoopSplit split = (SqoopSplit) splits.get(id - 1);
+      DummyPartition partition = (DummyPartition) split.getPartition();
       assertEquals(id, partition.getId());
     }
   }
 
   @Test
-  public void testMapper() throws Exception {
+  public void testSqoopMapper() throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
-    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
-      CSVIntermediateDataFormat.class.getName());
-    Schema schema = new Schema("Test");
-    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
-      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
-
+    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
     Job job = new Job(conf);
-    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
-    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
-    boolean success = JobUtils.runJob(job.getConfiguration(),
-        SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class);
+    // from and to have the same schema in this test case
+    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema());
+    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, MRJobTestUtil.getTestSchema());
+    boolean success = MRJobTestUtil.runJob(job.getConfiguration(),
+                                      SqoopInputFormat.class,
+                                      SqoopMapper.class,
+                                      DummyOutputFormat.class);
     Assert.assertEquals("Job failed!", true, success);
   }
 
   @Test
-  public void testOutputFormat() throws Exception {
+  public void testNullOutputFormat() throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
     conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
-    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
-      CSVIntermediateDataFormat.class.getName());
-    Schema schema = new Schema("Test");
-    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
-      .addColumn(new Text("3"));
+    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
 
     Job job = new Job(conf);
-    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
-    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
-    boolean success = JobUtils.runJob(job.getConfiguration(),
-        SqoopInputFormat.class, SqoopMapper.class,
-        SqoopNullOutputFormat.class);
+    // from and to have the same schema in this test case
+    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema());
+    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, MRJobTestUtil.getTestSchema());
+    boolean success = MRJobTestUtil.runJob(job.getConfiguration(),
+                                     SqoopInputFormat.class,
+                                     SqoopMapper.class,
+                                     SqoopNullOutputFormat.class);
     Assert.assertEquals("Job failed!", true, success);
 
     // Make sure both destroyers get called.
@@ -171,15 +163,17 @@ public class TestMapReduce {
     }
   }
 
-  public static class DummyExtractor extends Extractor<EmptyConfiguration, EmptyConfiguration, DummyPartition> {
+  public static class DummyExtractor extends
+      Extractor<EmptyConfiguration, EmptyConfiguration, DummyPartition> {
     @Override
-    public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, DummyPartition partition) {
-      int id = ((DummyPartition)partition).getId();
+    public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj,
+        DummyPartition partition) {
+      int id = ((DummyPartition) partition).getId();
       for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
-        context.getDataWriter().writeArrayRecord(new Object[] {
-            id * NUMBER_OF_ROWS_PER_PARTITION + row,
-            (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
-            String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
+        context.getDataWriter().writeArrayRecord(
+            new Object[] { id * NUMBER_OF_ROWS_PER_PARTITION + row,
+                (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
+                String.valueOf(id * NUMBER_OF_ROWS_PER_PARTITION + row) });
       }
     }
 
@@ -189,16 +183,14 @@ public class TestMapReduce {
     }
   }
 
-  public static class DummyOutputFormat
-      extends OutputFormat<SqoopWritable, NullWritable> {
+  public static class DummyOutputFormat extends OutputFormat<SqoopWritable, NullWritable> {
     @Override
     public void checkOutputSpecs(JobContext context) {
       // do nothing
     }
 
     @Override
-    public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
-        TaskAttemptContext context) {
+    public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(TaskAttemptContext context) {
       return new DummyRecordWriter();
     }
 
@@ -207,22 +199,17 @@ public class TestMapReduce {
       return new DummyOutputCommitter();
     }
 
-    public static class DummyRecordWriter
-        extends RecordWriter<SqoopWritable, NullWritable> {
-      private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
-      private Data data = new Data();
+    public static class DummyRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {
+      private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION;
+      // should I use a dummy IDF for testing?
+      private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
 
       @Override
       public void write(SqoopWritable key, NullWritable value) {
-
-        data.setContent(new Object[] {
-          index,
-          (double) index,
-          String.valueOf(index)},
-          Data.ARRAY_RECORD);
+        String testData = "" + index + "," +  (double) index + ",'" + String.valueOf(index) + "'";
+        dataFormat.setTextData(testData);
         index++;
-
-        assertEquals(data.toString(), key.toString());
+        assertEquals(dataFormat.getTextData().toString(), key.toString());
       }
 
       @Override
@@ -233,16 +220,20 @@ public class TestMapReduce {
 
     public static class DummyOutputCommitter extends OutputCommitter {
       @Override
-      public void setupJob(JobContext jobContext) { }
+      public void setupJob(JobContext jobContext) {
+      }
 
       @Override
-      public void setupTask(TaskAttemptContext taskContext) { }
+      public void setupTask(TaskAttemptContext taskContext) {
+      }
 
       @Override
-      public void commitTask(TaskAttemptContext taskContext) { }
+      public void commitTask(TaskAttemptContext taskContext) {
+      }
 
       @Override
-      public void abortTask(TaskAttemptContext taskContext) { }
+      public void abortTask(TaskAttemptContext taskContext) {
+      }
 
       @Override
       public boolean needsTaskCommit(TaskAttemptContext taskContext) {
@@ -251,39 +242,34 @@ public class TestMapReduce {
     }
   }
 
+  // it is writing to the target.
   public static class DummyLoader extends Loader<EmptyConfiguration, EmptyConfiguration> {
-    private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
-    private Data expected = new Data();
+    private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION;
+    private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
 
     @Override
-    public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj) throws Exception{
+    public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj)
+        throws Exception {
       String data;
       while ((data = context.getDataReader().readTextRecord()) != null) {
-        expected.setContent(new Object[] {
-          index,
-          (double) index,
-          String.valueOf(index)},
-          Data.ARRAY_RECORD);
+        String testData = "" + index + "," +  (double) index + ",'" + String.valueOf(index) + "'";
+        dataFormat.setTextData(testData);
         index++;
-        assertEquals(expected.toString(), data);
+        assertEquals(dataFormat.getTextData().toString(), data);
       }
     }
   }
 
   public static class DummyFromDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> {
-
     public static int count = 0;
-
     @Override
     public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) {
       count++;
     }
   }
 
-  public static class DummyToDestroyer extends Destroyer<EmptyConfiguration,EmptyConfiguration> {
-
+  public static class DummyToDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> {
     public static int count = 0;
-
     @Override
     public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) {
       count++;
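
For reference, the expected record that the rewritten DummyRecordWriter and DummyLoader above build inline is plain CSV text rather than an Object[] handed to Data. A small sketch (not part of the patch) of what that string evaluates to:

// For index = 10 the test expression above yields the record "10,10.0,'10'":
// numbers stay unquoted, the text column is wrapped in the single-quote
// delimiter used by the removed Data class and by the test CSV records.
int index = 10;
String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'";
// testData.equals("10,10.0,'10'") == true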

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
index 04fb692..a64a4a6 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
@@ -38,16 +38,17 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
-import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.io.SqoopWritable;
 import org.apache.sqoop.job.mr.MRConfigurationUtils;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
+import org.apache.sqoop.job.util.MRJobTestUtil;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.FloatingPoint;
@@ -121,6 +122,7 @@ public class TestMatching {
     return parameters;
   }
 
+  @SuppressWarnings("deprecation")
   @Test
   public void testSchemaMatching() throws Exception {
     Configuration conf = new Configuration();
@@ -132,9 +134,9 @@ public class TestMatching {
     Job job = new Job(conf);
     MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
     MRConfigurationUtils.setConnectorSchema(Direction.TO, job, to);
-    JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
+    MRJobTestUtil.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
         DummyOutputFormat.class);
-    boolean success = JobUtils.runJob(job.getConfiguration(),
+    boolean success = MRJobTestUtil.runJob(job.getConfiguration(),
         SqoopInputFormat.class, SqoopMapper.class,
         DummyOutputFormat.class);
     if (from.getName().split("-")[1].equals("EMPTY")) {
@@ -233,19 +235,14 @@ public class TestMatching {
     public static class DummyRecordWriter
         extends RecordWriter<SqoopWritable, NullWritable> {
       private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
-      private Data data = new Data();
+      private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
 
       @Override
       public void write(SqoopWritable key, NullWritable value) {
-
-        data.setContent(new Object[] {
-                index,
-                (double) index,
-                String.valueOf(index)},
-            Data.ARRAY_RECORD);
+        String testData = "" + index + "," +  (double) index + ",'" + String.valueOf(index) + "'";
+        dataFormat.setTextData(testData);
         index++;
-
-        assertEquals(data.toString(), key.toString());
+        assertEquals(dataFormat.getTextData().toString(), key.toString());
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
deleted file mode 100644
index 68ce5ed..0000000
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sqoop.job.io;
-
-import com.google.common.base.Charsets;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
-import org.apache.sqoop.job.MRJobConstants;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class SqoopWritableTest {
-
-  private final SqoopWritable writable = new SqoopWritable();
-
-  @Test
-  public void testStringInStringOut() {
-    String testData = "Live Long and prosper";
-    writable.setString(testData);
-    Assert.assertEquals(testData,writable.getString());
-  }
-
-  @Test
-  public void testDataWritten() throws IOException {
-    String testData = "One ring to rule them all";
-    byte[] testDataBytes = testData.getBytes(Charsets.UTF_8);
-    writable.setString(testData);
-    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
-    DataOutput out = new DataOutputStream(ostream);
-    writable.write(out);
-    byte[] written = ostream.toByteArray();
-    InputStream instream = new ByteArrayInputStream(written);
-    DataInput in = new DataInputStream(instream);
-    String readData = in.readUTF();
-    Assert.assertEquals(testData, readData);
-  }
-
-  @Test
-  public void testDataRead() throws IOException {
-    String testData = "Brandywine Bridge - 20 miles!";
-    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
-    DataOutput out = new DataOutputStream(ostream);
-    out.writeUTF(testData);
-    InputStream instream = new ByteArrayInputStream(ostream.toByteArray());
-    DataInput in = new DataInputStream(instream);
-    writable.readFields(in);
-    Assert.assertEquals(testData, writable.getString());
-  }
-
-  @Test
-  public void testWriteReadUsingStream() throws IOException {
-    String testData = "You shall not pass";
-    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
-    DataOutput out = new DataOutputStream(ostream);
-    writable.setString(testData);
-    writable.write(out);
-    byte[] written = ostream.toByteArray();
-
-    //Don't test what the data is, test that SqoopWritable can read it.
-    InputStream instream = new ByteArrayInputStream(written);
-    SqoopWritable newWritable = new SqoopWritable();
-    DataInput in = new DataInputStream(instream);
-    newWritable.readFields(in);
-    Assert.assertEquals(testData, newWritable.getString());
-    ostream.close();
-    instream.close();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
deleted file mode 100644
index 4e23bcb..0000000
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.io;
-
-import java.util.Arrays;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestData {
-
-  private static final double TEST_NUMBER = Math.PI + 100;
-  @Test
-  public void testArrayToCsv() throws Exception {
-    Data data = new Data();
-    String expected;
-    String actual;
-
-    // with special characters:
-    expected =
-        Long.valueOf((long)TEST_NUMBER) + "," +
-        Double.valueOf(TEST_NUMBER) + "," +
-        "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
-        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
-    data.setContent(new Object[] {
-        Long.valueOf((long)TEST_NUMBER),
-        Double.valueOf(TEST_NUMBER),
-        String.valueOf(TEST_NUMBER) + "',s",
-        new byte[] {1, 2, 3, 4, 5} },
-        Data.ARRAY_RECORD);
-    actual = (String)data.getContent(Data.CSV_RECORD);
-    assertEquals(expected, actual);
-
-    // with null characters:
-    expected =
-        Long.valueOf((long)TEST_NUMBER) + "," +
-        Double.valueOf(TEST_NUMBER) + "," +
-        "null" + "," +
-        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
-    data.setContent(new Object[] {
-        Long.valueOf((long)TEST_NUMBER),
-        Double.valueOf(TEST_NUMBER),
-        null,
-        new byte[] {1, 2, 3, 4, 5} },
-        Data.ARRAY_RECORD);
-    actual = (String)data.getContent(Data.CSV_RECORD);
-    assertEquals(expected, actual);
-  }
-
-  @Test
-  public void testCsvToArray() throws Exception {
-    Data data = new Data();
-    Object[] expected;
-    Object[] actual;
-
-    // with special characters:
-    expected = new Object[] {
-        Long.valueOf((long)TEST_NUMBER),
-        Double.valueOf(TEST_NUMBER),
-        String.valueOf(TEST_NUMBER) + "',s",
-        new byte[] {1, 2, 3, 4, 5} };
-    data.setContent(
-        Long.valueOf((long)TEST_NUMBER) + "," +
-        Double.valueOf(TEST_NUMBER) + "," +
-        "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
-        Arrays.toString(new byte[] {1, 2, 3, 4, 5}),
-        Data.CSV_RECORD);
-    actual = (Object[])data.getContent(Data.ARRAY_RECORD);
-    assertEquals(expected.length, actual.length);
-    for (int c=0; c<expected.length; c++) {
-      assertEquals(expected[c], actual[c]);
-    }
-
-    // with null characters:
-    expected = new Object[] {
-        Long.valueOf((long)TEST_NUMBER),
-        Double.valueOf(TEST_NUMBER),
-        null,
-        new byte[] {1, 2, 3, 4, 5} };
-    data.setContent(
-        Long.valueOf((long)TEST_NUMBER) + "," +
-        Double.valueOf(TEST_NUMBER) + "," +
-        "null" + "," +
-        Arrays.toString(new byte[] {1, 2, 3, 4, 5}),
-        Data.CSV_RECORD);
-    actual = (Object[])data.getContent(Data.ARRAY_RECORD);
-    assertEquals(expected.length, actual.length);
-    for (int c=0; c<expected.length; c++) {
-      assertEquals(expected[c], actual[c]);
-    }
-  }
-
-  public static void assertEquals(Object expected, Object actual) {
-    if (expected instanceof byte[]) {
-      Assert.assertEquals(Arrays.toString((byte[])expected),
-          Arrays.toString((byte[])actual));
-    } else {
-      Assert.assertEquals(expected, actual);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
new file mode 100644
index 0000000..3207e53
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.job.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestSqoopWritable {
+
+  private final SqoopWritable writable = new SqoopWritable();
+
+  @Test
+  public void testStringInStringOut() {
+    String testData = "Live Long and prosper";
+    writable.setString(testData);
+    Assert.assertEquals(testData,writable.getString());
+  }
+
+  @Test
+  public void testDataWritten() throws IOException {
+    String testData = "One ring to rule them all";
+    writable.setString(testData);
+    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(ostream);
+    writable.write(out);
+    byte[] written = ostream.toByteArray();
+    InputStream instream = new ByteArrayInputStream(written);
+    DataInput in = new DataInputStream(instream);
+    String readData = in.readUTF();
+    Assert.assertEquals(testData, readData);
+  }
+
+  @Test
+  public void testDataRead() throws IOException {
+    String testData = "Brandywine Bridge - 20 miles!";
+    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(ostream);
+    out.writeUTF(testData);
+    InputStream instream = new ByteArrayInputStream(ostream.toByteArray());
+    DataInput in = new DataInputStream(instream);
+    writable.readFields(in);
+    Assert.assertEquals(testData, writable.getString());
+  }
+
+  @Test
+  public void testWriteReadUsingStream() throws IOException {
+    String testData = "You shall not pass";
+    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(ostream);
+    writable.setString(testData);
+    writable.write(out);
+    byte[] written = ostream.toByteArray();
+
+    //Don't test what the data is, test that SqoopWritable can read it.
+    InputStream instream = new ByteArrayInputStream(written);
+    SqoopWritable newWritable = new SqoopWritable();
+    DataInput in = new DataInputStream(instream);
+    newWritable.readFields(in);
+    Assert.assertEquals(testData, newWritable.getString());
+    ostream.close();
+    instream.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index 5bd11f0..67e965d 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -18,6 +18,10 @@
  */
 package org.apache.sqoop.job.mr;
 
+import java.util.ConcurrentModificationException;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -28,14 +32,11 @@ import org.apache.sqoop.job.MRJobConstants;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.job.io.SqoopWritable;
+import org.apache.sqoop.job.util.MRJobTestUtil;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ConcurrentModificationException;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.TimeUnit;
-
 public class TestSqoopOutputFormatLoadExecutor {
 
   private Configuration conf;
@@ -130,12 +131,12 @@ public class TestSqoopOutputFormatLoadExecutor {
     SqoopOutputFormatLoadExecutor executor = new
         SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
-    IntermediateDataFormat data = new CSVIntermediateDataFormat();
+    IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
     SqoopWritable writable = new SqoopWritable();
     try {
       for (int count = 0; count < 100; count++) {
-        data.setTextData(String.valueOf(count));
-        writable.setString(data.getTextData());
+        dataFormat.setTextData(String.valueOf(count));
+        writable.setString(dataFormat.getTextData());
         writer.write(writable, null);
       }
     } catch (SqoopException ex) {
@@ -149,7 +150,7 @@ public class TestSqoopOutputFormatLoadExecutor {
     SqoopOutputFormatLoadExecutor executor = new
         SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
-    IntermediateDataFormat data = new CSVIntermediateDataFormat();
+    IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
     SqoopWritable writable = new SqoopWritable();
     for (int i = 0; i < 10; i++) {
       StringBuilder builder = new StringBuilder();
@@ -159,8 +160,8 @@ public class TestSqoopOutputFormatLoadExecutor {
           builder.append(",");
         }
       }
-      data.setTextData(builder.toString());
-      writable.setString(data.getTextData());
+      dataFormat.setTextData(builder.toString());
+      writable.setString(dataFormat.getTextData());
       writer.write(writable, null);
     }
     writer.close(null);
@@ -171,7 +172,7 @@ public class TestSqoopOutputFormatLoadExecutor {
     SqoopOutputFormatLoadExecutor executor = new
         SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
-    IntermediateDataFormat data = new CSVIntermediateDataFormat();
+    IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
     SqoopWritable writable = new SqoopWritable();
     StringBuilder builder = new StringBuilder();
     for (int count = 0; count < 100; count++) {
@@ -180,8 +181,8 @@ public class TestSqoopOutputFormatLoadExecutor {
         builder.append(",");
       }
     }
-    data.setTextData(builder.toString());
-    writable.setString(data.getTextData());
+    dataFormat.setTextData(builder.toString());
+    writable.setString(dataFormat.getTextData());
     writer.write(writable, null);
 
     //Allow writer to complete.
@@ -196,7 +197,7 @@ public class TestSqoopOutputFormatLoadExecutor {
     SqoopOutputFormatLoadExecutor executor = new
         SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
-    IntermediateDataFormat data = new CSVIntermediateDataFormat();
+    IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
     SqoopWritable writable = new SqoopWritable();
     try {
       for (int i = 0; i < 10; i++) {
@@ -207,8 +208,8 @@ public class TestSqoopOutputFormatLoadExecutor {
             builder.append(",");
           }
         }
-        data.setTextData(builder.toString());
-        writable.setString(data.getTextData());
+        dataFormat.setTextData(builder.toString());
+        writable.setString(dataFormat.getTextData());
         writer.write(writable, null);
       }
       writer.close(null);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java
new file mode 100644
index 0000000..5d5359e
--- /dev/null
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+import org.apache.sqoop.job.io.SqoopWritable;
+import org.apache.sqoop.job.mr.SqoopSplit;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.apache.sqoop.utils.ClassUtils;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MRJobTestUtil {
+
+  @SuppressWarnings("deprecation")
+  public static boolean runJob(Configuration conf,
+      Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
+      Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper,
+      Class<? extends OutputFormat<SqoopWritable, NullWritable>> output) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    Job job = new Job(conf);
+    job.setInputFormatClass(input);
+    job.setMapperClass(mapper);
+    job.setMapOutputKeyClass(SqoopWritable.class);
+    job.setMapOutputValueClass(NullWritable.class);
+    job.setOutputFormatClass(output);
+    job.setOutputKeyClass(SqoopWritable.class);
+    job.setOutputValueClass(NullWritable.class);
+
+    boolean ret = job.waitForCompletion(true);
+
+    // Hadoop 1.0 (and 0.20) have nasty bug when job committer is not called in
+    // LocalJobRuner
+    if (isHadoop1()) {
+      callOutputCommitter(job, output);
+    }
+
+    return ret;
+  }
+
+  public static Schema getTestSchema() {
+    Schema schema = new Schema("Test");
+    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+        .addColumn(new Text("3"));
+    return schema;
+  }
+
+  public static IntermediateDataFormat<?> getTestIDF() {
+    return new CSVIntermediateDataFormat(getTestSchema());
+  }
+
+  /**
+   * Call output format on given job manually.
+   */
+  private static void callOutputCommitter(Job job,
+      Class<? extends OutputFormat<SqoopWritable, NullWritable>> outputFormat) throws IOException,
+      InterruptedException {
+    OutputCommitter committer = ((OutputFormat<?,?>) ClassUtils.instantiate(outputFormat))
+        .getOutputCommitter(null);
+
+    JobContext jobContext = mock(JobContext.class);
+    when(jobContext.getConfiguration()).thenReturn(job.getConfiguration());
+
+    committer.commitJob(jobContext);
+  }
+
+  /**
+   * Detect Hadoop 1.0 installation
+   *
+   * @return True if and only if this is Hadoop 1 and below
+   */
+  public static boolean isHadoop1() {
+    String version = org.apache.hadoop.util.VersionInfo.getVersion();
+    if (version.matches("\\b0\\.20\\..+\\b") || version.matches("\\b1\\.\\d\\.\\d")) {
+      return true;
+    }
+    return false;
+  }
+
+  private MRJobTestUtil() {
+    // Disable explicit object creation
+  }
+
+}
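
A quick note on MRJobTestUtil.isHadoop1() above: String.matches() requires the whole version string to match, so the two patterns accept the 0.20.x and 1.x.y release lines and reject Hadoop 2 versions. A throwaway sketch (not part of the patch; the sample version strings are only examples):

// Sketch of which version strings the isHadoop1() regexes accept.
String[] versions = { "0.20.205.0", "1.2.1", "2.5.1" };
for (String v : versions) {
  boolean hadoop1 = v.matches("\\b0\\.20\\..+\\b") || v.matches("\\b1\\.\\d\\.\\d");
  System.out.println(v + " -> " + hadoop1);   // true, true, false
}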

