sqoop-commits mailing list archives

From a..@apache.org
Subject [2/2] sqoop git commit: SQOOP-1348: Sqoop2: Remove Data class
Date Mon, 10 Nov 2014 21:41:31 GMT
SQOOP-1348: Sqoop2: Remove Data class

(Veena Basavaraj via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ace22237
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ace22237
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ace22237

Branch: refs/heads/sqoop2
Commit: ace222374b9f70915e16eaa9af8360940951bfd2
Parents: 49a7431
Author: Abraham Elmahrek <abraham@elmahrek.com>
Authored: Mon Nov 10 13:18:38 2014 -0800
Committer: Abraham Elmahrek <abraham@elmahrek.com>
Committed: Mon Nov 10 13:18:38 2014 -0800

----------------------------------------------------------------------
 SQOOP-1348.patch                                | 1844 ++++++++++++++++++
 .../java/org/apache/sqoop/schema/Schema.java    |    7 +-
 .../idf/CSVIntermediateDataFormat.java          |   10 +-
 .../idf/TestCSVIntermediateDataFormat.java      |  106 +-
 .../main/java/org/apache/sqoop/job/io/Data.java |  529 -----
 .../java/org/apache/sqoop/job/JobUtils.java     |   93 -
 .../org/apache/sqoop/job/TestMapReduce.java     |  132 +-
 .../java/org/apache/sqoop/job/TestMatching.java |   21 +-
 .../apache/sqoop/job/io/SqoopWritableTest.java  |   95 -
 .../java/org/apache/sqoop/job/io/TestData.java  |  117 --
 .../apache/sqoop/job/io/TestSqoopWritable.java  |   89 +
 .../mr/TestSqoopOutputFormatLoadExecutor.java   |   33 +-
 .../apache/sqoop/job/util/MRJobTestUtil.java    |  114 ++
 13 files changed, 2201 insertions(+), 989 deletions(-)
----------------------------------------------------------------------
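
This commit retires org.apache.sqoop.job.io.Data, the WritableComparable that carried records either as CSV text or as an Object[] array, in favor of the IntermediateDataFormat (IDF) abstraction driven by an explicit Schema. After the change, a record round-trips through an IDF roughly as follows (a minimal sketch against the API visible in the diff below; the schema and values are illustrative):

    import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
    import org.apache.sqoop.connector.idf.IntermediateDataFormat;
    import org.apache.sqoop.schema.Schema;
    import org.apache.sqoop.schema.type.FixedPoint;
    import org.apache.sqoop.schema.type.FloatingPoint;
    import org.apache.sqoop.schema.type.Text;

    Schema schema = new Schema("example")
        .addColumn(new FixedPoint("1"))
        .addColumn(new FloatingPoint("2"))
        .addColumn(new Text("3"));

    // New convenience constructor introduced by this patch.
    IntermediateDataFormat<?> dataFormat = new CSVIntermediateDataFormat(schema);

    // CSV text in, typed objects out.
    dataFormat.setTextData("10,10.0,'10'");
    Object[] fields = dataFormat.getObjectData();

    // Typed objects in, CSV text out.
    dataFormat.setObjectData(new Object[] { 10L, 10.0d, "10" });
    String csv = dataFormat.getTextData();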


http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/SQOOP-1348.patch
----------------------------------------------------------------------
diff --git a/SQOOP-1348.patch b/SQOOP-1348.patch
new file mode 100644
index 0000000..7834a3f
--- /dev/null
+++ b/SQOOP-1348.patch
@@ -0,0 +1,1844 @@
+diff --git a/common/src/main/java/org/apache/sqoop/schema/Schema.java b/common/src/main/java/org/apache/sqoop/schema/Schema.java
+index 40c362c..3aa3aea 100644
+--- a/common/src/main/java/org/apache/sqoop/schema/Schema.java
++++ b/common/src/main/java/org/apache/sqoop/schema/Schema.java
+@@ -122,12 +122,7 @@ public Schema setCreationDate(Date creationDate) {
+   }
+ 
+   public boolean isEmpty() {
+-    if (columns.size()==0) {
+-      return true;
+-    } else {
+-      return false;
+-    }
+-
++    return columns.size() == 0;
+   }
+ 
+   public String toString() {
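
The isEmpty() cleanup above collapses an if/else that returned boolean literals into a single expression. Assuming columns is a standard java.util collection, the same check could lean on the collection's own predicate (a stylistic alternative, not what the patch does):

    public boolean isEmpty() {
      // Equivalent to columns.size() == 0, but states the intent directly.
      return columns.isEmpty();
    }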
+diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+index e0e4061..e65edd9 100644
+--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
++++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+@@ -67,9 +67,15 @@
+ 
+   private final List<Integer> stringFieldIndices = new ArrayList<Integer>();
+   private final List<Integer> byteFieldIndices = new ArrayList<Integer>();
+-
+   private Schema schema;
+ 
++  public CSVIntermediateDataFormat() {
++  }
++
++  public CSVIntermediateDataFormat(Schema schema) {
++    setSchema(schema);
++  }
++
+   /**
+    * {@inheritDoc}
+    */
+@@ -166,7 +172,7 @@ public void setSchema(Schema schema) {
+    */
+   @Override
+   public Object[] getObjectData() {
+-    if (schema.isEmpty()) {
++    if (schema == null || schema.isEmpty()) {
+       throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
+     }
+ 
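
Besides the new constructors, the hunk above makes getObjectData() fail fast with INTERMEDIATE_DATA_FORMAT_0006 when no schema was ever attached, instead of dying on a NullPointerException at schema.isEmpty(). From the caller's side the hardened behaviour looks like this (a sketch; the try/catch is illustrative):

    CSVIntermediateDataFormat dataFormat = new CSVIntermediateDataFormat();
    dataFormat.setTextData("10,34");
    try {
      // Schema was never set: now a well-defined SqoopException
      // (INTERMEDIATE_DATA_FORMAT_0006) rather than an NPE.
      dataFormat.getObjectData();
    } catch (org.apache.sqoop.common.SqoopException e) {
      // expected
    }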
+diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+index 72e95ed..fcf6c3c 100644
+--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
++++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+@@ -41,11 +41,11 @@
+ 
+   private final String BYTE_FIELD_ENCODING = "ISO-8859-1";
+ 
+-  private IntermediateDataFormat<?> data;
++  private IntermediateDataFormat<?> dataFormat;
+ 
+   @Before
+   public void setUp() {
+-    data = new CSVIntermediateDataFormat();
++    dataFormat = new CSVIntermediateDataFormat();
+   }
+ 
+   private String getByteFieldString(byte[] byteFieldData) {
+@@ -61,8 +61,8 @@ private String getByteFieldString(byte[] byteFieldData) {
+   public void testStringInStringOut() {
+     String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+       + ",'" + String.valueOf(0x0A) + "'";
+-    data.setTextData(testData);
+-    assertEquals(testData, data.getTextData());
++    dataFormat.setTextData(testData);
++    assertEquals(testData, dataFormat.getTextData());
+   }
+ 
+   @Test
+@@ -74,10 +74,10 @@ public void testNullStringInObjectOut() {
+         .addColumn(new Text("4"))
+         .addColumn(new Binary("5"))
+         .addColumn(new Text("6"));
+-    data.setSchema(schema);
+-    data.setTextData(null);
++    dataFormat.setSchema(schema);
++    dataFormat.setTextData(null);
+ 
+-    Object[] out = data.getObjectData();
++    Object[] out = dataFormat.getObjectData();
+ 
+     assertNull(out);
+   }
+@@ -91,10 +91,10 @@ public void testEmptyStringInObjectOut() {
+         .addColumn(new Text("4"))
+         .addColumn(new Binary("5"))
+         .addColumn(new Text("6"));
+-    data.setSchema(schema);
+-    data.setTextData("");
++    dataFormat.setSchema(schema);
++    dataFormat.setTextData("");
+ 
+-    data.getObjectData();
++    dataFormat.getObjectData();
+   }
+ 
+   @Test
+@@ -111,10 +111,10 @@ public void testStringInObjectOut() {
+         .addColumn(new Binary("5"))
+         .addColumn(new Text("6"));
+ 
+-    data.setSchema(schema);
+-    data.setTextData(testData);
++    dataFormat.setSchema(schema);
++    dataFormat.setTextData(testData);
+ 
+-    Object[] out = data.getObjectData();
++    Object[] out = dataFormat.getObjectData();
+ 
+     assertEquals(new Long(10),out[0]);
+     assertEquals(new Long(34),out[1]);
+@@ -134,7 +134,7 @@ public void testObjectInStringOut() {
+         .addColumn(new Text("4"))
+         .addColumn(new Binary("5"))
+         .addColumn(new Text("6"));
+-    data.setSchema(schema);
++    dataFormat.setSchema(schema);
+ 
+     byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
+     Object[] in = new Object[6];
+@@ -145,12 +145,12 @@ public void testObjectInStringOut() {
+     in[4] = byteFieldData;
+     in[5] = new String(new char[] { 0x0A });
+ 
+-    data.setObjectData(in);
++    dataFormat.setObjectData(in);
+ 
+     //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
+     String testData = "10,34,'54','random data'," +
+         getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
+-    assertEquals(testData, data.getTextData());
++    assertEquals(testData, dataFormat.getTextData());
+   }
+ 
+   @Test
+@@ -164,7 +164,7 @@ public void testObjectInObjectOut() {
+         .addColumn(new Text("4"))
+         .addColumn(new Binary("5"))
+         .addColumn(new Text("6"));
+-    data.setSchema(schema);
++    dataFormat.setSchema(schema);
+ 
+     Object[] in = new Object[6];
+     in[0] = new Long(10);
+@@ -177,9 +177,9 @@ public void testObjectInObjectOut() {
+     System.arraycopy(in,0,inCopy,0,in.length);
+ 
+     // Modifies the input array, so we use the copy to confirm
+-    data.setObjectData(in);
++    dataFormat.setObjectData(in);
+ 
+-    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
++    assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
+   }
+ 
+   @Test
+@@ -191,7 +191,7 @@ public void testObjectWithNullInStringOut() {
+         .addColumn(new Text("4"))
+         .addColumn(new Binary("5"))
+         .addColumn(new Text("6"));
+-    data.setSchema(schema);
++    dataFormat.setSchema(schema);
+ 
+     byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
+     Object[] in = new Object[6];
+@@ -202,12 +202,12 @@ public void testObjectWithNullInStringOut() {
+     in[4] = byteFieldData;
+     in[5] = new String(new char[] { 0x0A });
+ 
+-    data.setObjectData(in);
++    dataFormat.setObjectData(in);
+ 
+     //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
+     String testData = "10,34,NULL,'random data'," +
+         getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
+-    assertEquals(testData, data.getTextData());
++    assertEquals(testData, dataFormat.getTextData());
+   }
+ 
+   @Test
+@@ -215,7 +215,7 @@ public void testStringFullRangeOfCharacters() {
+     Schema schema = new Schema("test");
+     schema.addColumn(new Text("1"));
+ 
+-    data.setSchema(schema);
++    dataFormat.setSchema(schema);
+ 
+     char[] allCharArr = new char[256];
+     for(int i = 0; i < allCharArr.length; ++i) {
+@@ -228,17 +228,17 @@ public void testStringFullRangeOfCharacters() {
+     System.arraycopy(in, 0, inCopy, 0, in.length);
+ 
+     // Modifies the input array, so we use the copy to confirm
+-    data.setObjectData(in);
++    dataFormat.setObjectData(in);
+ 
+-    assertEquals(strData, data.getObjectData()[0]);
+-    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
++    assertEquals(strData, dataFormat.getObjectData()[0]);
++    assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
+   }
+ 
+   @Test
+   public void testByteArrayFullRangeOfCharacters() {
+     Schema schema = new Schema("test");
+     schema.addColumn(new Binary("1"));
+-    data.setSchema(schema);
++    dataFormat.setSchema(schema);
+ 
+     byte[] allCharByteArr = new byte[256];
+     for (int i = 0; i < allCharByteArr.length; ++i) {
+@@ -250,32 +250,32 @@ public void testByteArrayFullRangeOfCharacters() {
+     System.arraycopy(in, 0, inCopy, 0, in.length);
+ 
+     // Modifies the input array, so we use the copy to confirm
+-    data.setObjectData(in);
+-    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
++    dataFormat.setObjectData(in);
++    assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
+   }
+ 
+   @Test
+   public void testDate() {
+     Schema schema = new Schema("test");
+     schema.addColumn(new Date("1"));
+-    data.setSchema(schema);
++    dataFormat.setSchema(schema);
+ 
+-    data.setTextData("2014-10-01");
+-    assertEquals("2014-10-01", data.getObjectData()[0].toString());
++    dataFormat.setTextData("2014-10-01");
++    assertEquals("2014-10-01", dataFormat.getObjectData()[0].toString());
+   }
+ 
+   @Test
+   public void testDateTime() {
+     Schema schema = new Schema("test");
+     schema.addColumn(new DateTime("1"));
+-    data.setSchema(schema);
++    dataFormat.setSchema(schema);
+ 
+     for (String dateTime : new String[]{
+         "2014-10-01T12:00:00",
+         "2014-10-01T12:00:00.000"
+     }) {
+-      data.setTextData(dateTime);
+-      assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString());
++      dataFormat.setTextData(dateTime);
++      assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString());
+     }
+   }
+ 
+@@ -289,14 +289,14 @@ public void testDateTime() {
+   public void testDateTimeISO8601Alternative() {
+     Schema schema = new Schema("test");
+     schema.addColumn(new DateTime("1"));
+-    data.setSchema(schema);
++    dataFormat.setSchema(schema);
+ 
+     for (String dateTime : new String[]{
+         "2014-10-01 12:00:00",
+         "2014-10-01 12:00:00.000"
+     }) {
+-      data.setTextData(dateTime);
+-      assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString());
++      dataFormat.setTextData(dateTime);
++      assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString());
+     }
+   }
+ 
+@@ -304,20 +304,20 @@ public void testDateTimeISO8601Alternative() {
+   public void testBit() {
+     Schema schema = new Schema("test");
+     schema.addColumn(new Bit("1"));
+-    data.setSchema(schema);
++    dataFormat.setSchema(schema);
+ 
+     for (String trueBit : new String[]{
+         "true", "TRUE", "1"
+     }) {
+-      data.setTextData(trueBit);
+-      assertTrue((Boolean) data.getObjectData()[0]);
++      dataFormat.setTextData(trueBit);
++      assertTrue((Boolean) dataFormat.getObjectData()[0]);
+     }
+ 
+     for (String falseBit : new String[]{
+         "false", "FALSE", "0"
+     }) {
+-      data.setTextData(falseBit);
+-      assertFalse((Boolean) data.getObjectData()[0]);
++      dataFormat.setTextData(falseBit);
++      assertFalse((Boolean) dataFormat.getObjectData()[0]);
+     }
+   }
+ 
+@@ -326,9 +326,23 @@ public void testEmptySchema() {
+     String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
+         + ",'\\n'";
+     Schema schema = new Schema("Test");
+-    data.setSchema(schema);
+-    data.setTextData(testData);
++    dataFormat.setSchema(schema);
++    dataFormat.setTextData(testData);
+ 
+-    Object[] out = data.getObjectData();
++    @SuppressWarnings("unused")
++    Object[] out = dataFormat.getObjectData();
++  }
++
++  @Test(expected = SqoopException.class)
++  public void testNullSchema() {
++    dataFormat.setSchema(null);
++    @SuppressWarnings("unused")
++    Object[] out = dataFormat.getObjectData();
++  }
++
++  @Test(expected = SqoopException.class)
++  public void testNotSettingSchema() {
++    @SuppressWarnings("unused")
++    Object[] out = dataFormat.getObjectData();
+   }
+ }
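
The two new negative tests rely on JUnit 4's expected-exception idiom, which testEmptySchema() already used: the method passes only if getObjectData() throws SqoopException. An equivalent form that pins the failure to the exact call would be (a sketch in the same JUnit 4 style; fail() from org.junit.Assert):

    @Test
    public void testNullSchemaAlternative() {
      dataFormat.setSchema(null);
      try {
        dataFormat.getObjectData();
        fail("expected SqoopException for a null schema");
      } catch (SqoopException e) {
        // expected: getObjectData() must reject a missing schema
      }
    }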
+diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
+deleted file mode 100644
+index 139883e..0000000
+--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
++++ /dev/null
+@@ -1,529 +0,0 @@
+-/**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements.  See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership.  The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License.  You may obtain a copy of the License at
+- *
+- *     http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.sqoop.job.io;
+-
+-import java.io.DataInput;
+-import java.io.DataOutput;
+-import java.io.IOException;
+-import java.nio.charset.Charset;
+-import java.util.ArrayList;
+-import java.util.Arrays;
+-import java.util.regex.Matcher;
+-
+-import org.apache.hadoop.io.WritableComparable;
+-import org.apache.hadoop.io.WritableComparator;
+-import org.apache.hadoop.io.WritableUtils;
+-import org.apache.sqoop.common.SqoopException;
+-import org.apache.sqoop.job.MRExecutionError;
+-
+-public class Data implements WritableComparable<Data> {
+-
+-  // The content is an Object to accommodate different kinds of data.
+-  // For example, it can be:
+-  // - Object[] for an array of object record
+-  // - String for a text of CSV record
+-  private volatile Object content = null;
+-
+-  public static final int EMPTY_DATA = 0;
+-  public static final int CSV_RECORD = 1;
+-  public static final int ARRAY_RECORD = 2;
+-  private int type = EMPTY_DATA;
+-
+-  public static final String CHARSET_NAME = "UTF-8";
+-
+-  public static final char DEFAULT_RECORD_DELIMITER = '\n';
+-  public static final char DEFAULT_FIELD_DELIMITER = ',';
+-  public static final char DEFAULT_STRING_DELIMITER = '\'';
+-  public static final char DEFAULT_STRING_ESCAPE = '\\';
+-  private char fieldDelimiter = DEFAULT_FIELD_DELIMITER;
+-  private char stringDelimiter = DEFAULT_STRING_DELIMITER;
+-  private char stringEscape = DEFAULT_STRING_ESCAPE;
+-  private String escapedStringDelimiter = String.valueOf(new char[] {
+-      stringEscape, stringDelimiter
+-  });
+-
+-  private int[] fieldTypes = null;
+-
+-  public void setFieldDelimiter(char fieldDelimiter) {
+-    this.fieldDelimiter = fieldDelimiter;
+-  }
+-
+-  public void setFieldTypes(int[] fieldTypes) {
+-    this.fieldTypes = fieldTypes;
+-  }
+-
+-  public void setContent(Object content, int type) {
+-    switch (type) {
+-    case EMPTY_DATA:
+-    case CSV_RECORD:
+-    case ARRAY_RECORD:
+-      this.type = type;
+-      this.content = content;
+-      break;
+-    default:
+-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+-    }
+-  }
+-
+-  public Object getContent(int targetType) {
+-    switch (targetType) {
+-    case CSV_RECORD:
+-      return format();
+-    case ARRAY_RECORD:
+-      return parse();
+-    default:
+-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(targetType));
+-    }
+-  }
+-
+-  public int getType() {
+-    return type;
+-  }
+-
+-  public boolean isEmpty() {
+-    return (type == EMPTY_DATA);
+-  }
+-
+-  @Override
+-  public String toString() {
+-    return (String)getContent(CSV_RECORD);
+-  }
+-
+-  @Override
+-  public int compareTo(Data other) {
+-    byte[] myBytes = toString().getBytes(Charset.forName(CHARSET_NAME));
+-    byte[] otherBytes = other.toString().getBytes(
+-        Charset.forName(CHARSET_NAME));
+-    return WritableComparator.compareBytes(
+-        myBytes, 0, myBytes.length, otherBytes, 0, otherBytes.length);
+-  }
+-
+-  @Override
+-  public boolean equals(Object other) {
+-    if (!(other instanceof Data)) {
+-      return false;
+-    }
+-
+-    Data data = (Data)other;
+-    if (type != data.getType()) {
+-      return false;
+-    }
+-
+-    return toString().equals(data.toString());
+-  }
+-
+-  @Override
+-  public int hashCode() {
+-    int result = super.hashCode();
+-    switch (type) {
+-    case CSV_RECORD:
+-      result += 31 * content.hashCode();
+-      return result;
+-    case ARRAY_RECORD:
+-      Object[] array = (Object[])content;
+-      for (int i = 0; i < array.length; i++) {
+-        result += 31 * array[i].hashCode();
+-      }
+-      return result;
+-    default:
+-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+-    }
+-  }
+-
+-  @Override
+-  public void readFields(DataInput in) throws IOException {
+-    type = readType(in);
+-    switch (type) {
+-    case CSV_RECORD:
+-      readCsv(in);
+-      break;
+-    case ARRAY_RECORD:
+-      readArray(in);
+-      break;
+-    default:
+-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+-    }
+-  }
+-
+-  @Override
+-  public void write(DataOutput out) throws IOException {
+-    writeType(out, type);
+-    switch (type) {
+-    case CSV_RECORD:
+-      writeCsv(out);
+-      break;
+-    case ARRAY_RECORD:
+-      writeArray(out);
+-      break;
+-    default:
+-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+-    }
+-  }
+-
+-  private int readType(DataInput in) throws IOException {
+-    return WritableUtils.readVInt(in);
+-  }
+-
+-  private void writeType(DataOutput out, int type) throws IOException {
+-    WritableUtils.writeVInt(out, type);
+-  }
+-
+-  private void readCsv(DataInput in) throws IOException {
+-    content = in.readUTF();
+-  }
+-
+-  private void writeCsv(DataOutput out) throws IOException {
+-    out.writeUTF((String)content);
+-  }
+-
+-  private void readArray(DataInput in) throws IOException {
+-    // read number of columns
+-    int columns = in.readInt();
+-    content = new Object[columns];
+-    Object[] array = (Object[])content;
+-    // read each column
+-    for (int i = 0; i < array.length; i++) {
+-      int type = readType(in);
+-      switch (type) {
+-      case FieldTypes.UTF:
+-        array[i] = in.readUTF();
+-        break;
+-
+-      case FieldTypes.BIN:
+-        int length = in.readInt();
+-        byte[] bytes = new byte[length];
+-        in.readFully(bytes);
+-        array[i] = bytes;
+-        break;
+-
+-      case FieldTypes.DOUBLE:
+-        array[i] = in.readDouble();
+-        break;
+-
+-      case FieldTypes.FLOAT:
+-        array[i] = in.readFloat();
+-        break;
+-
+-      case FieldTypes.LONG:
+-        array[i] = in.readLong();
+-        break;
+-
+-      case FieldTypes.INT:
+-        array[i] = in.readInt();
+-        break;
+-
+-      case FieldTypes.SHORT:
+-        array[i] = in.readShort();
+-        break;
+-
+-      case FieldTypes.CHAR:
+-        array[i] = in.readChar();
+-        break;
+-
+-      case FieldTypes.BYTE:
+-        array[i] = in.readByte();
+-        break;
+-
+-      case FieldTypes.BOOLEAN:
+-        array[i] = in.readBoolean();
+-        break;
+-
+-      case FieldTypes.NULL:
+-        array[i] = null;
+-        break;
+-
+-      default:
+-        throw new IOException(
+-          new SqoopException(MRExecutionError.MAPRED_EXEC_0012, Integer.toString(type))
+-        );
+-      }
+-    }
+-  }
+-
+-  private void writeArray(DataOutput out) throws IOException {
+-    Object[] array = (Object[])content;
+-    // write number of columns
+-    out.writeInt(array.length);
+-    // write each column
+-    for (int i = 0; i < array.length; i++) {
+-      if (array[i] instanceof String) {
+-        writeType(out, FieldTypes.UTF);
+-        out.writeUTF((String)array[i]);
+-
+-      } else if (array[i] instanceof byte[]) {
+-        writeType(out, FieldTypes.BIN);
+-        out.writeInt(((byte[])array[i]).length);
+-        out.write((byte[])array[i]);
+-
+-      } else if (array[i] instanceof Double) {
+-        writeType(out, FieldTypes.DOUBLE);
+-        out.writeDouble((Double)array[i]);
+-
+-      } else if (array[i] instanceof Float) {
+-        writeType(out, FieldTypes.FLOAT);
+-        out.writeFloat((Float)array[i]);
+-
+-      } else if (array[i] instanceof Long) {
+-        writeType(out, FieldTypes.LONG);
+-        out.writeLong((Long)array[i]);
+-
+-      } else if (array[i] instanceof Integer) {
+-        writeType(out, FieldTypes.INT);
+-        out.writeInt((Integer)array[i]);
+-
+-      } else if (array[i] instanceof Short) {
+-        writeType(out, FieldTypes.SHORT);
+-        out.writeShort((Short)array[i]);
+-
+-      } else if (array[i] instanceof Character) {
+-        writeType(out, FieldTypes.CHAR);
+-        out.writeChar((Character)array[i]);
+-
+-      } else if (array[i] instanceof Byte) {
+-        writeType(out, FieldTypes.BYTE);
+-        out.writeByte((Byte)array[i]);
+-
+-      } else if (array[i] instanceof Boolean) {
+-        writeType(out, FieldTypes.BOOLEAN);
+-        out.writeBoolean((Boolean)array[i]);
+-
+-      } else if (array[i] == null) {
+-        writeType(out, FieldTypes.NULL);
+-
+-      } else {
+-        throw new IOException(
+-          new SqoopException(MRExecutionError.MAPRED_EXEC_0012,
+-            array[i].getClass().getName()
+-          )
+-        );
+-      }
+-    }
+-  }
+-
+-  private String format() {
+-    switch (type) {
+-    case EMPTY_DATA:
+-      return null;
+-
+-    case CSV_RECORD:
+-      if (fieldDelimiter == DEFAULT_FIELD_DELIMITER) {
+-        return (String)content;
+-      } else {
+-        // TODO: need to exclude the case where comma is part of a string.
+-        return ((String)content).replaceAll(
+-            String.valueOf(DEFAULT_FIELD_DELIMITER),
+-            String.valueOf(fieldDelimiter));
+-      }
+-
+-    case ARRAY_RECORD:
+-      StringBuilder sb = new StringBuilder();
+-      Object[] array = (Object[])content;
+-      for (int i = 0; i < array.length; i++) {
+-        if (i != 0) {
+-          sb.append(fieldDelimiter);
+-        }
+-
+-        if (array[i] instanceof String) {
+-          sb.append(stringDelimiter);
+-          sb.append(escape((String)array[i]));
+-          sb.append(stringDelimiter);
+-        } else if (array[i] instanceof byte[]) {
+-          sb.append(Arrays.toString((byte[])array[i]));
+-        } else {
+-          sb.append(String.valueOf(array[i]));
+-        }
+-      }
+-      return sb.toString();
+-
+-    default:
+-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+-    }
+-  }
+-
+-  private Object[] parse() {
+-    switch (type) {
+-    case EMPTY_DATA:
+-      return null;
+-
+-    case CSV_RECORD:
+-      ArrayList<Object> list = new ArrayList<Object>();
+-      char[] record = ((String)content).toCharArray();
+-      int start = 0;
+-      int position = start;
+-      boolean stringDelimited = false;
+-      boolean arrayDelimited = false;
+-      int index = 0;
+-      while (position < record.length) {
+-        if (record[position] == fieldDelimiter) {
+-          if (!stringDelimited && !arrayDelimited) {
+-            index = parseField(list, record, start, position, index);
+-            start = position + 1;
+-          }
+-        } else if (record[position] == stringDelimiter) {
+-          if (!stringDelimited) {
+-            stringDelimited = true;
+-          }
+-          else if (position > 0 && record[position-1] != stringEscape) {
+-            stringDelimited = false;
+-          }
+-        } else if (record[position] == '[') {
+-          if (!stringDelimited) {
+-            arrayDelimited = true;
+-          }
+-        } else if (record[position] == ']') {
+-          if (!stringDelimited) {
+-            arrayDelimited = false;
+-          }
+-        }
+-        position++;
+-      }
+-      parseField(list, record, start, position, index);
+-      return list.toArray();
+-
+-    case ARRAY_RECORD:
+-      return (Object[])content;
+-
+-    default:
+-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(type));
+-    }
+-  }
+-
+-  private int parseField(ArrayList<Object> list, char[] record,
+-      int start, int end, int index) {
+-    String field = String.valueOf(record, start, end-start).trim();
+-
+-    int fieldType;
+-    if (fieldTypes == null) {
+-      fieldType = guessType(field);
+-    } else {
+-      fieldType = fieldTypes[index];
+-    }
+-
+-    switch (fieldType) {
+-    case FieldTypes.UTF:
+-      if (field.charAt(0) != stringDelimiter ||
+-          field.charAt(field.length()-1) != stringDelimiter) {
+-        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
+-      }
+-      list.add(index, unescape(field.substring(1, field.length()-1)));
+-      break;
+-
+-    case FieldTypes.BIN:
+-      if (field.charAt(0) != '[' ||
+-          field.charAt(field.length()-1) != ']') {
+-        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0022);
+-      }
+-      String[] splits =
+-          field.substring(1, field.length()-1).split(String.valueOf(','));
+-      byte[] bytes = new byte[splits.length];
+-      for (int i=0; i<bytes.length; i++) {
+-        bytes[i] = Byte.parseByte(splits[i].trim());
+-      }
+-      list.add(index, bytes);
+-      break;
+-
+-    case FieldTypes.DOUBLE:
+-      list.add(index, Double.parseDouble(field));
+-      break;
+-
+-    case FieldTypes.FLOAT:
+-      list.add(index, Float.parseFloat(field));
+-      break;
+-
+-    case FieldTypes.LONG:
+-      list.add(index, Long.parseLong(field));
+-      break;
+-
+-    case FieldTypes.INT:
+-      list.add(index, Integer.parseInt(field));
+-      break;
+-
+-    case FieldTypes.SHORT:
+-      list.add(index, Short.parseShort(field));
+-      break;
+-
+-    case FieldTypes.CHAR:
+-      list.add(index, Character.valueOf(field.charAt(0)));
+-      break;
+-
+-    case FieldTypes.BYTE:
+-      list.add(index, Byte.parseByte(field));
+-      break;
+-
+-    case FieldTypes.BOOLEAN:
+-      list.add(index, Boolean.parseBoolean(field));
+-      break;
+-
+-    case FieldTypes.NULL:
+-      list.add(index, null);
+-      break;
+-
+-    default:
+-      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0012, String.valueOf(fieldType));
+-    }
+-
+-    return ++index;
+-  }
+-
+-  private int guessType(String field) {
+-    char[] value = field.toCharArray();
+-
+-    if (value[0] == stringDelimiter) {
+-      return FieldTypes.UTF;
+-    }
+-
+-    switch (value[0]) {
+-    case 'n':
+-    case 'N':
+-      return FieldTypes.NULL;
+-    case '[':
+-      return FieldTypes.BIN;
+-    case 't':
+-    case 'f':
+-    case 'T':
+-    case 'F':
+-      return FieldTypes.BOOLEAN;
+-    }
+-
+-    int position = 1;
+-    while (position < value.length) {
+-      switch (value[position++]) {
+-      case '.':
+-        return FieldTypes.DOUBLE;
+-      }
+-    }
+-
+-    return FieldTypes.LONG;
+-  }
+-
+-  private String escape(String string) {
+-    // TODO: Also need to escape those special characters as documented in:
+-    // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
+-    String regex = String.valueOf(stringDelimiter);
+-    String replacement = Matcher.quoteReplacement(escapedStringDelimiter);
+-    return string.replaceAll(regex, replacement);
+-  }
+-
+-  private String unescape(String string) {
+-    // TODO: Also need to unescape those special characters as documented in:
+-    // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
+-    String regex = Matcher.quoteReplacement(escapedStringDelimiter);
+-    String replacement = String.valueOf(stringDelimiter);
+-    return string.replaceAll(regex, replacement);
+-  }
+-}
+\ No newline at end of file
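
The deleted Data class above mixed three concerns: hand-rolled CSV parsing and formatting (parse()/format()), Hadoop serialization (readFields()/write()), and per-field type sniffing (guessType()). After this patch those roles are split: CSVIntermediateDataFormat owns the CSV encoding, driven by an explicit Schema instead of guessType() heuristics, and SqoopWritable carries the IDF's text form through MapReduce. The hand-off between the two is roughly (a sketch using the accessors visible elsewhere in this patch):

    SqoopWritable key = new SqoopWritable();
    key.setString(dataFormat.getTextData()); // IDF text form into the Writable
    // ... serialized through the MapReduce shuffle as UTF-8 text ...
    dataFormat.setTextData(key.getString()); // and back into an IDF downstream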
+diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
+deleted file mode 100644
+index dafdeb4..0000000
+--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
++++ /dev/null
+@@ -1,93 +0,0 @@
+-/**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements.  See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership.  The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License.  You may obtain a copy of the License at
+- *
+- *     http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.sqoop.job;
+-
+-import java.io.IOException;
+-
+-import org.apache.hadoop.conf.Configuration;
+-import org.apache.hadoop.io.NullWritable;
+-import org.apache.hadoop.mapreduce.InputFormat;
+-import org.apache.hadoop.mapreduce.Job;
+-import org.apache.hadoop.mapreduce.JobContext;
+-import org.apache.hadoop.mapreduce.Mapper;
+-import org.apache.hadoop.mapreduce.OutputCommitter;
+-import org.apache.hadoop.mapreduce.OutputFormat;
+-import org.apache.sqoop.job.io.SqoopWritable;
+-import org.apache.sqoop.job.mr.SqoopSplit;
+-import org.apache.sqoop.utils.ClassUtils;
+-
+-import static org.mockito.Mockito.mock;
+-import static org.mockito.Mockito.when;
+-
+-public class JobUtils {
+-
+-  public static boolean runJob(Configuration conf,
+-    Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
+-    Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper,
+-    Class<? extends OutputFormat<SqoopWritable, NullWritable>> output)
+-    throws IOException, InterruptedException, ClassNotFoundException {
+-    Job job = new Job(conf);
+-    job.setInputFormatClass(input);
+-    job.setMapperClass(mapper);
+-    job.setMapOutputKeyClass(SqoopWritable.class);
+-    job.setMapOutputValueClass(NullWritable.class);
+-    job.setOutputFormatClass(output);
+-    job.setOutputKeyClass(SqoopWritable.class);
+-    job.setOutputValueClass(NullWritable.class);
+-
+-    boolean ret = job.waitForCompletion(true);
+-
+-    // Hadoop 1.0 (and 0.20) have a nasty bug where the job committer is not called in LocalJobRunner
+-    if (isHadoop1()) {
+-      callOutputCommitter(job, output);
+-    }
+-
+-    return ret;
+-  }
+-
+-  /**
+-   * Call output format on given job manually.
+-   */
+-  private static void callOutputCommitter(Job job, Class<? extends OutputFormat<SqoopWritable, NullWritable>> outputFormat) throws IOException, InterruptedException {
+-    OutputCommitter committer = ((OutputFormat)ClassUtils.instantiate(outputFormat)).getOutputCommitter(null);
+-
+-    JobContext jobContext = mock(JobContext.class);
+-    when(jobContext.getConfiguration()).thenReturn(job.getConfiguration());
+-
+-    committer.commitJob(jobContext);
+-  }
+-
+-  /**
+-   * Detect Hadoop 1.0 installation
+-   *
+-   * @return True if and only if this is Hadoop 1 and below
+-   */
+-  public static boolean isHadoop1() {
+-    String version = org.apache.hadoop.util.VersionInfo.getVersion();
+-    if (version.matches("\\b0\\.20\\..+\\b")
+-      || version.matches("\\b1\\.\\d\\.\\d")) {
+-      return true;
+-    }
+-    return false;
+-  }
+-
+-  private JobUtils() {
+-    // Disable explicit object creation
+-  }
+-
+-}
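
JobUtils disappears here; per the diffstat, its job-driver logic moves to the new org.apache.sqoop.job.util.MRJobTestUtil (+114 lines), which the updated tests call as MRJobTestUtil.runJob(...), MRJobTestUtil.getTestSchema() and MRJobTestUtil.getTestIDF(). The new class itself is not shown in this message; judging from the call sites and the schema the old tests built inline, the two new helpers plausibly look like this (a hypothetical reconstruction, not the committed code):

    // Assumed shape of two MRJobTestUtil helpers; the real file is not in this mail.
    public static Schema getTestSchema() {
      Schema schema = new Schema("Test");
      schema.addColumn(new FixedPoint("1"))
            .addColumn(new FloatingPoint("2"))
            .addColumn(new Text("3"));
      return schema;
    }

    public static IntermediateDataFormat<?> getTestIDF() {
      return new CSVIntermediateDataFormat(getTestSchema());
    }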
+diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+index 78ae4ec..bbac7d2 100644
+--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
++++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+@@ -37,6 +37,7 @@
+ import org.apache.sqoop.common.Direction;
+ import org.apache.sqoop.connector.common.EmptyConfiguration;
+ import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
++import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+ import org.apache.sqoop.job.etl.Destroyer;
+ import org.apache.sqoop.job.etl.DestroyerContext;
+ import org.apache.sqoop.job.etl.Extractor;
+@@ -46,17 +47,13 @@
+ import org.apache.sqoop.job.etl.Partition;
+ import org.apache.sqoop.job.etl.Partitioner;
+ import org.apache.sqoop.job.etl.PartitionerContext;
+-import org.apache.sqoop.job.io.Data;
+ import org.apache.sqoop.job.io.SqoopWritable;
+ import org.apache.sqoop.job.mr.MRConfigurationUtils;
+ import org.apache.sqoop.job.mr.SqoopInputFormat;
+ import org.apache.sqoop.job.mr.SqoopMapper;
+ import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
+ import org.apache.sqoop.job.mr.SqoopSplit;
+-import org.apache.sqoop.schema.Schema;
+-import org.apache.sqoop.schema.type.FixedPoint;
+-import org.apache.sqoop.schema.type.FloatingPoint;
+-import org.apache.sqoop.schema.type.Text;
++import org.apache.sqoop.job.util.MRJobTestUtil;
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+@@ -67,11 +64,10 @@
+   private static final int NUMBER_OF_ROWS_PER_PARTITION = 10;
+ 
+   @Test
+-  public void testInputFormat() throws Exception {
++  public void testSqoopInputFormat() throws Exception {
+     Configuration conf = new Configuration();
+     conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+-    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
+-      CSVIntermediateDataFormat.class.getName());
++    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+     Job job = new Job(conf);
+ 
+     SqoopInputFormat inputformat = new SqoopInputFormat();
+@@ -79,51 +75,47 @@ public void testInputFormat() throws Exception {
+     assertEquals(9, splits.size());
+ 
+     for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
+-      SqoopSplit split = (SqoopSplit)splits.get(id-1);
+-      DummyPartition partition = (DummyPartition)split.getPartition();
++      SqoopSplit split = (SqoopSplit) splits.get(id - 1);
++      DummyPartition partition = (DummyPartition) split.getPartition();
+       assertEquals(id, partition.getId());
+     }
+   }
+ 
+   @Test
+-  public void testMapper() throws Exception {
++  public void testSqoopMapper() throws Exception {
+     Configuration conf = new Configuration();
+     conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+     conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+-    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
+-      CSVIntermediateDataFormat.class.getName());
+-    Schema schema = new Schema("Test");
+-    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+-      .addColumn(new org.apache.sqoop.schema.type.Text("3"));
+-
++    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+     Job job = new Job(conf);
+-    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
+-    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
+-    boolean success = JobUtils.runJob(job.getConfiguration(),
+-        SqoopInputFormat.class, SqoopMapper.class, DummyOutputFormat.class);
++    // from and to have the same schema in this test case
++    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema());
++    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, MRJobTestUtil.getTestSchema());
++    boolean success = MRJobTestUtil.runJob(job.getConfiguration(),
++                                      SqoopInputFormat.class,
++                                      SqoopMapper.class,
++                                      DummyOutputFormat.class);
+     Assert.assertEquals("Job failed!", true, success);
+   }
+ 
+   @Test
+-  public void testOutputFormat() throws Exception {
++  public void testNullOutputFormat() throws Exception {
+     Configuration conf = new Configuration();
+     conf.set(MRJobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
+     conf.set(MRJobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
+     conf.set(MRJobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
+     conf.set(MRJobConstants.JOB_ETL_FROM_DESTROYER, DummyFromDestroyer.class.getName());
+     conf.set(MRJobConstants.JOB_ETL_TO_DESTROYER, DummyToDestroyer.class.getName());
+-    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT,
+-      CSVIntermediateDataFormat.class.getName());
+-    Schema schema = new Schema("Test");
+-    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
+-      .addColumn(new Text("3"));
++    conf.set(MRJobConstants.INTERMEDIATE_DATA_FORMAT, CSVIntermediateDataFormat.class.getName());
+ 
+     Job job = new Job(conf);
+-    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, schema);
+-    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, schema);
+-    boolean success = JobUtils.runJob(job.getConfiguration(),
+-        SqoopInputFormat.class, SqoopMapper.class,
+-        SqoopNullOutputFormat.class);
++    // from and to have the same schema in this test case
++    MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, MRJobTestUtil.getTestSchema());
++    MRConfigurationUtils.setConnectorSchema(Direction.TO, job, MRJobTestUtil.getTestSchema());
++    boolean success = MRJobTestUtil.runJob(job.getConfiguration(),
++                                     SqoopInputFormat.class,
++                                     SqoopMapper.class,
++                                     SqoopNullOutputFormat.class);
+     Assert.assertEquals("Job failed!", true, success);
+ 
+     // Make sure both destroyers get called.
+@@ -171,15 +163,17 @@ public String toString() {
+     }
+   }
+ 
+-  public static class DummyExtractor extends Extractor<EmptyConfiguration, EmptyConfiguration, DummyPartition> {
++  public static class DummyExtractor extends
++      Extractor<EmptyConfiguration, EmptyConfiguration, DummyPartition> {
+     @Override
+-    public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj, DummyPartition partition) {
+-      int id = ((DummyPartition)partition).getId();
++    public void extract(ExtractorContext context, EmptyConfiguration oc, EmptyConfiguration oj,
++        DummyPartition partition) {
++      int id = ((DummyPartition) partition).getId();
+       for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
+-        context.getDataWriter().writeArrayRecord(new Object[] {
+-            id * NUMBER_OF_ROWS_PER_PARTITION + row,
+-            (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
+-            String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
++        context.getDataWriter().writeArrayRecord(
++            new Object[] { id * NUMBER_OF_ROWS_PER_PARTITION + row,
++                (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
++                String.valueOf(id * NUMBER_OF_ROWS_PER_PARTITION + row) });
+       }
+     }
+ 
+@@ -189,16 +183,14 @@ public long getRowsRead() {
+     }
+   }
+ 
+-  public static class DummyOutputFormat
+-      extends OutputFormat<SqoopWritable, NullWritable> {
++  public static class DummyOutputFormat extends OutputFormat<SqoopWritable, NullWritable> {
+     @Override
+     public void checkOutputSpecs(JobContext context) {
+       // do nothing
+     }
+ 
+     @Override
+-    public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(
+-        TaskAttemptContext context) {
++    public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(TaskAttemptContext context) {
+       return new DummyRecordWriter();
+     }
+ 
+@@ -207,22 +199,17 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+       return new DummyOutputCommitter();
+     }
+ 
+-    public static class DummyRecordWriter
+-        extends RecordWriter<SqoopWritable, NullWritable> {
+-      private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
+-      private Data data = new Data();
++    public static class DummyRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {
++      private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION;
++      // should I use a dummy IDF for testing?
++      private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
+ 
+       @Override
+       public void write(SqoopWritable key, NullWritable value) {
+-
+-        data.setContent(new Object[] {
+-          index,
+-          (double) index,
+-          String.valueOf(index)},
+-          Data.ARRAY_RECORD);
++        String testData = "" + index + "," +  (double) index + ",'" + String.valueOf(index) + "'";
++        dataFormat.setTextData(testData);
+         index++;
+-
+-        assertEquals(data.toString(), key.toString());
++        assertEquals(dataFormat.getTextData().toString(), key.toString());
+       }
+ 
+       @Override
+@@ -233,16 +220,20 @@ public void close(TaskAttemptContext context) {
+ 
+     public static class DummyOutputCommitter extends OutputCommitter {
+       @Override
+-      public void setupJob(JobContext jobContext) { }
++      public void setupJob(JobContext jobContext) {
++      }
+ 
+       @Override
+-      public void setupTask(TaskAttemptContext taskContext) { }
++      public void setupTask(TaskAttemptContext taskContext) {
++      }
+ 
+       @Override
+-      public void commitTask(TaskAttemptContext taskContext) { }
++      public void commitTask(TaskAttemptContext taskContext) {
++      }
+ 
+       @Override
+-      public void abortTask(TaskAttemptContext taskContext) { }
++      public void abortTask(TaskAttemptContext taskContext) {
++      }
+ 
+       @Override
+       public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+@@ -251,39 +242,34 @@ public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+     }
+   }
+ 
++  // it is writing to the target.
+   public static class DummyLoader extends Loader<EmptyConfiguration, EmptyConfiguration> {
+-    private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
+-    private Data expected = new Data();
++    private int index = START_PARTITION * NUMBER_OF_ROWS_PER_PARTITION;
++    private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
+ 
+     @Override
+-    public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj) throws Exception{
++    public void load(LoaderContext context, EmptyConfiguration oc, EmptyConfiguration oj)
++        throws Exception {
+       String data;
+       while ((data = context.getDataReader().readTextRecord()) != null) {
+-        expected.setContent(new Object[] {
+-          index,
+-          (double) index,
+-          String.valueOf(index)},
+-          Data.ARRAY_RECORD);
++        String testData = "" + index + "," +  (double) index + ",'" + String.valueOf(index) + "'";
++        dataFormat.setTextData(testData);
+         index++;
+-        assertEquals(expected.toString(), data);
++        assertEquals(dataFormat.getTextData().toString(), data);
+       }
+     }
+   }
+ 
+   public static class DummyFromDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> {
+-
+     public static int count = 0;
+-
+     @Override
+     public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) {
+       count++;
+     }
+   }
+ 
+-  public static class DummyToDestroyer extends Destroyer<EmptyConfiguration,EmptyConfiguration> {
+-
++  public static class DummyToDestroyer extends Destroyer<EmptyConfiguration, EmptyConfiguration> {
+     public static int count = 0;
+-
+     @Override
+     public void destroy(DestroyerContext context, EmptyConfiguration o, EmptyConfiguration o2) {
+       count++;
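
Both DummyRecordWriter and DummyLoader now build the expected record as CSV text and compare it against what comes out of the IDF, rather than going through Data.setContent(..., Data.ARRAY_RECORD). The concatenation expression yields the IDF's CSV encoding directly, for example:

    String testData = "" + index + "," + (double) index + ",'" + String.valueOf(index) + "'";
    // index = 10  ->  "10,10.0,'10'"  (strings single-quoted, double via Double.toString)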
+diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
+index 04fb692..a64a4a6 100644
+--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
++++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMatching.java
+@@ -38,16 +38,17 @@
+ import org.apache.sqoop.common.Direction;
+ import org.apache.sqoop.connector.common.EmptyConfiguration;
+ import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
++import org.apache.sqoop.connector.idf.IntermediateDataFormat;
+ import org.apache.sqoop.job.etl.Extractor;
+ import org.apache.sqoop.job.etl.ExtractorContext;
+ import org.apache.sqoop.job.etl.Partition;
+ import org.apache.sqoop.job.etl.Partitioner;
+ import org.apache.sqoop.job.etl.PartitionerContext;
+-import org.apache.sqoop.job.io.Data;
+ import org.apache.sqoop.job.io.SqoopWritable;
+ import org.apache.sqoop.job.mr.MRConfigurationUtils;
+ import org.apache.sqoop.job.mr.SqoopInputFormat;
+ import org.apache.sqoop.job.mr.SqoopMapper;
++import org.apache.sqoop.job.util.MRJobTestUtil;
+ import org.apache.sqoop.schema.Schema;
+ import org.apache.sqoop.schema.type.FixedPoint;
+ import org.apache.sqoop.schema.type.FloatingPoint;
+@@ -121,6 +122,7 @@ public TestMatching(Schema from,
+     return parameters;
+   }
+ 
++  @SuppressWarnings("deprecation")
+   @Test
+   public void testSchemaMatching() throws Exception {
+     Configuration conf = new Configuration();
+@@ -132,9 +134,9 @@ public void testSchemaMatching() throws Exception {
+     Job job = new Job(conf);
+     MRConfigurationUtils.setConnectorSchema(Direction.FROM, job, from);
+     MRConfigurationUtils.setConnectorSchema(Direction.TO, job, to);
+-    JobUtils.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
++    MRJobTestUtil.runJob(job.getConfiguration(), SqoopInputFormat.class, SqoopMapper.class,
+         DummyOutputFormat.class);
+-    boolean success = JobUtils.runJob(job.getConfiguration(),
++    boolean success = MRJobTestUtil.runJob(job.getConfiguration(),
+         SqoopInputFormat.class, SqoopMapper.class,
+         DummyOutputFormat.class);
+     if (from.getName().split("-")[1].equals("EMPTY")) {
+@@ -233,19 +235,14 @@ public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+     public static class DummyRecordWriter
+         extends RecordWriter<SqoopWritable, NullWritable> {
+       private int index = START_PARTITION*NUMBER_OF_ROWS_PER_PARTITION;
+-      private Data data = new Data();
++      private IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
+ 
+       @Override
+       public void write(SqoopWritable key, NullWritable value) {
+-
+-        data.setContent(new Object[] {
+-                index,
+-                (double) index,
+-                String.valueOf(index)},
+-            Data.ARRAY_RECORD);
++        String testData = "" + index + "," +  (double) index + ",'" + String.valueOf(index) + "'";
++        dataFormat.setTextData(testData);
+         index++;
+-
+-        assertEquals(data.toString(), key.toString());
++        assertEquals(dataFormat.getTextData().toString(), key.toString());
+       }
+ 
+       @Override
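
The @SuppressWarnings("deprecation") added to testSchemaMatching() is presumably for new Job(conf), whose constructor is deprecated in Hadoop 2 (an inference; the patch does not say which warning it silences). The non-deprecated equivalent would be:

    Job job = Job.getInstance(conf);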
+diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
+deleted file mode 100644
+index 68ce5ed..0000000
+--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/SqoopWritableTest.java
++++ /dev/null
+@@ -1,95 +0,0 @@
+-/*
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements.  See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership.  The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License.  You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing,
+- * software distributed under the License is distributed on an
+- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+- * KIND, either express or implied.  See the License for the
+- * specific language governing permissions and limitations
+- * under the License.
+- */
+-package org.apache.sqoop.job.io;
+-
+-import com.google.common.base.Charsets;
+-
+-import java.io.ByteArrayInputStream;
+-import java.io.ByteArrayOutputStream;
+-import java.io.DataInput;
+-import java.io.DataInputStream;
+-import java.io.DataOutput;
+-import java.io.DataOutputStream;
+-import java.io.IOException;
+-import java.io.InputStream;
+-
+-import org.apache.hadoop.conf.Configuration;
+-import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
+-import org.apache.sqoop.job.MRJobConstants;
+-import org.junit.Assert;
+-import org.junit.Test;
+-
+-public class SqoopWritableTest {
+-
+-  private final SqoopWritable writable = new SqoopWritable();
+-
+-  @Test
+-  public void testStringInStringOut() {
+-    String testData = "Live Long and prosper";
+-    writable.setString(testData);
+-    Assert.assertEquals(testData,writable.getString());
+-  }
+-
+-  @Test
+-  public void testDataWritten() throws IOException {
+-    String testData = "One ring to rule them all";
+-    byte[] testDataBytes = testData.getBytes(Charsets.UTF_8);
+-    writable.setString(testData);
+-    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
+-    DataOutput out = new DataOutputStream(ostream);
+-    writable.write(out);
+-    byte[] written = ostream.toByteArray();
+-    InputStream instream = new ByteArrayInputStream(written);
+-    DataInput in = new DataInputStream(instream);
+-    String readData = in.readUTF();
+-    Assert.assertEquals(testData, readData);
+-  }
+-
+-  @Test
+-  public void testDataRead() throws IOException {
+-    String testData = "Brandywine Bridge - 20 miles!";
+-    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
+-    DataOutput out = new DataOutputStream(ostream);
+-    out.writeUTF(testData);
+-    InputStream instream = new ByteArrayInputStream(ostream.toByteArray());
+-    DataInput in = new DataInputStream(instream);
+-    writable.readFields(in);
+-    Assert.assertEquals(testData, writable.getString());
+-  }
+-
+-  @Test
+-  public void testWriteReadUsingStream() throws IOException {
+-    String testData = "You shall not pass";
+-    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
+-    DataOutput out = new DataOutputStream(ostream);
+-    writable.setString(testData);
+-    writable.write(out);
+-    byte[] written = ostream.toByteArray();
+-
+-    //Don't test what the data is, test that SqoopWritable can read it.
+-    InputStream instream = new ByteArrayInputStream(written);
+-    SqoopWritable newWritable = new SqoopWritable();
+-    DataInput in = new DataInputStream(instream);
+-    newWritable.readFields(in);
+-    Assert.assertEquals(testData, newWritable.getString());
+-    ostream.close();
+-    instream.close();
+-  }
+-
+-}
+diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
+deleted file mode 100644
+index 4e23bcb..0000000
+--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestData.java
++++ /dev/null
+@@ -1,117 +0,0 @@
+-/**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements.  See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership.  The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License.  You may obtain a copy of the License at
+- *
+- *     http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.sqoop.job.io;
+-
+-import java.util.Arrays;
+-
+-import org.junit.Assert;
+-import org.junit.Test;
+-
+-public class TestData {
+-
+-  private static final double TEST_NUMBER = Math.PI + 100;
+-  @Test
+-  public void testArrayToCsv() throws Exception {
+-    Data data = new Data();
+-    String expected;
+-    String actual;
+-
+-    // with special characters:
+-    expected =
+-        Long.valueOf((long)TEST_NUMBER) + "," +
+-        Double.valueOf(TEST_NUMBER) + "," +
+-        "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
+-        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
+-    data.setContent(new Object[] {
+-        Long.valueOf((long)TEST_NUMBER),
+-        Double.valueOf(TEST_NUMBER),
+-        String.valueOf(TEST_NUMBER) + "',s",
+-        new byte[] {1, 2, 3, 4, 5} },
+-        Data.ARRAY_RECORD);
+-    actual = (String)data.getContent(Data.CSV_RECORD);
+-    assertEquals(expected, actual);
+-
+-    // with null characters:
+-    expected =
+-        Long.valueOf((long)TEST_NUMBER) + "," +
+-        Double.valueOf(TEST_NUMBER) + "," +
+-        "null" + "," +
+-        Arrays.toString(new byte[] {1, 2, 3, 4, 5});
+-    data.setContent(new Object[] {
+-        Long.valueOf((long)TEST_NUMBER),
+-        Double.valueOf(TEST_NUMBER),
+-        null,
+-        new byte[] {1, 2, 3, 4, 5} },
+-        Data.ARRAY_RECORD);
+-    actual = (String)data.getContent(Data.CSV_RECORD);
+-    assertEquals(expected, actual);
+-  }
+-
+-  @Test
+-  public void testCsvToArray() throws Exception {
+-    Data data = new Data();
+-    Object[] expected;
+-    Object[] actual;
+-
+-    // with special characters:
+-    expected = new Object[] {
+-        Long.valueOf((long)TEST_NUMBER),
+-        Double.valueOf(TEST_NUMBER),
+-        String.valueOf(TEST_NUMBER) + "',s",
+-        new byte[] {1, 2, 3, 4, 5} };
+-    data.setContent(
+-        Long.valueOf((long)TEST_NUMBER) + "," +
+-        Double.valueOf(TEST_NUMBER) + "," +
+-        "'" + String.valueOf(TEST_NUMBER) + "\\',s'" + "," +
+-        Arrays.toString(new byte[] {1, 2, 3, 4, 5}),
+-        Data.CSV_RECORD);
+-    actual = (Object[])data.getContent(Data.ARRAY_RECORD);
+-    assertEquals(expected.length, actual.length);
+-    for (int c=0; c<expected.length; c++) {
+-      assertEquals(expected[c], actual[c]);
+-    }
+-
+-    // with null characters:
+-    expected = new Object[] {
+-        Long.valueOf((long)TEST_NUMBER),
+-        Double.valueOf(TEST_NUMBER),
+-        null,
+-        new byte[] {1, 2, 3, 4, 5} };
+-    data.setContent(
+-        Long.valueOf((long)TEST_NUMBER) + "," +
+-        Double.valueOf(TEST_NUMBER) + "," +
+-        "null" + "," +
+-        Arrays.toString(new byte[] {1, 2, 3, 4, 5}),
+-        Data.CSV_RECORD);
+-    actual = (Object[])data.getContent(Data.ARRAY_RECORD);
+-    assertEquals(expected.length, actual.length);
+-    for (int c=0; c<expected.length; c++) {
+-      assertEquals(expected[c], actual[c]);
+-    }
+-  }
+-
+-  public static void assertEquals(Object expected, Object actual) {
+-    if (expected instanceof byte[]) {
+-      Assert.assertEquals(Arrays.toString((byte[])expected),
+-          Arrays.toString((byte[])actual));
+-    } else {
+-      Assert.assertEquals(expected, actual);
+-    }
+-  }
+-
+-}
+diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
+new file mode 100644
+index 0000000..3207e53
+--- /dev/null
++++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/io/TestSqoopWritable.java
+@@ -0,0 +1,89 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package org.apache.sqoop.job.io;
++
++import java.io.ByteArrayInputStream;
++import java.io.ByteArrayOutputStream;
++import java.io.DataInput;
++import java.io.DataInputStream;
++import java.io.DataOutput;
++import java.io.DataOutputStream;
++import java.io.IOException;
++import java.io.InputStream;
++
++import org.junit.Assert;
++import org.junit.Test;
++
++public class TestSqoopWritable {
++
++  private final SqoopWritable writable = new SqoopWritable();
++
++  @Test
++  public void testStringInStringOut() {
++    String testData = "Live Long and prosper";
++    writable.setString(testData);
++    Assert.assertEquals(testData,writable.getString());
++  }
++
++  @Test
++  public void testDataWritten() throws IOException {
++    String testData = "One ring to rule them all";
++    writable.setString(testData);
++    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
++    DataOutput out = new DataOutputStream(ostream);
++    writable.write(out);
++    byte[] written = ostream.toByteArray();
++    InputStream instream = new ByteArrayInputStream(written);
++    DataInput in = new DataInputStream(instream);
++    String readData = in.readUTF();
++    Assert.assertEquals(testData, readData);
++  }
++
++  @Test
++  public void testDataRead() throws IOException {
++    String testData = "Brandywine Bridge - 20 miles!";
++    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
++    DataOutput out = new DataOutputStream(ostream);
++    out.writeUTF(testData);
++    InputStream instream = new ByteArrayInputStream(ostream.toByteArray());
++    DataInput in = new DataInputStream(instream);
++    writable.readFields(in);
++    Assert.assertEquals(testData, writable.getString());
++  }
++
++  @Test
++  public void testWriteReadUsingStream() throws IOException {
++    String testData = "You shall not pass";
++    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
++    DataOutput out = new DataOutputStream(ostream);
++    writable.setString(testData);
++    writable.write(out);
++    byte[] written = ostream.toByteArray();
++
++    //Don't test what the data is, test that SqoopWritable can read it.
++    InputStream instream = new ByteArrayInputStream(written);
++    SqoopWritable newWritable = new SqoopWritable();
++    DataInput in = new DataInputStream(instream);
++    newWritable.readFields(in);
++    Assert.assertEquals(testData, newWritable.getString());
++    ostream.close();
++    instream.close();
++  }
++
++}
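
The tests above pin down SqoopWritable's wire format: write() pushes the payload through DataOutput.writeUTF() and readFields() pulls it back with readUTF(). A minimal round-trip sketch under that assumption (the class name below is illustrative, not part of the commit):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.sqoop.job.io.SqoopWritable;

public class SqoopWritableRoundTripSketch {
  public static void main(String[] args) throws IOException {
    SqoopWritable source = new SqoopWritable();
    source.setString("a,b,c");

    // write() serializes the payload as a single UTF string.
    ByteArrayOutputStream ostream = new ByteArrayOutputStream();
    source.write(new DataOutputStream(ostream));

    // readFields() consumes the same UTF string on the other side.
    SqoopWritable target = new SqoopWritable();
    target.readFields(new DataInputStream(
        new ByteArrayInputStream(ostream.toByteArray())));

    System.out.println("a,b,c".equals(target.getString()));  // true
  }
}
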
+diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+index 5bd11f0..67e965d 100644
+--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
++++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+@@ -18,6 +18,10 @@
+  */
+ package org.apache.sqoop.job.mr;
+ 
++import java.util.ConcurrentModificationException;
++import java.util.concurrent.BrokenBarrierException;
++import java.util.concurrent.TimeUnit;
++
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.io.NullWritable;
+ import org.apache.hadoop.mapreduce.RecordWriter;
+@@ -28,14 +32,11 @@
+ import org.apache.sqoop.job.etl.Loader;
+ import org.apache.sqoop.job.etl.LoaderContext;
+ import org.apache.sqoop.job.io.SqoopWritable;
++import org.apache.sqoop.job.util.MRJobTestUtil;
+ import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.Test;
+ 
+-import java.util.ConcurrentModificationException;
+-import java.util.concurrent.BrokenBarrierException;
+-import java.util.concurrent.TimeUnit;
+-
+ public class TestSqoopOutputFormatLoadExecutor {
+ 
+   private Configuration conf;
+@@ -130,12 +131,12 @@ public void testWhenLoaderThrows() throws Throwable {
+     SqoopOutputFormatLoadExecutor executor = new
+         SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName());
+     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+-    IntermediateDataFormat data = new CSVIntermediateDataFormat();
++    IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
+     SqoopWritable writable = new SqoopWritable();
+     try {
+       for (int count = 0; count < 100; count++) {
+-        data.setTextData(String.valueOf(count));
+-        writable.setString(data.getTextData());
++        dataFormat.setTextData(String.valueOf(count));
++        writable.setString(dataFormat.getTextData());
+         writer.write(writable, null);
+       }
+     } catch (SqoopException ex) {
+@@ -149,7 +150,7 @@ public void testSuccessfulContinuousLoader() throws Throwable {
+     SqoopOutputFormatLoadExecutor executor = new
+         SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName());
+     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+-    IntermediateDataFormat data = new CSVIntermediateDataFormat();
++    IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
+     SqoopWritable writable = new SqoopWritable();
+     for (int i = 0; i < 10; i++) {
+       StringBuilder builder = new StringBuilder();
+@@ -159,8 +160,8 @@ public void testSuccessfulContinuousLoader() throws Throwable {
+           builder.append(",");
+         }
+       }
+-      data.setTextData(builder.toString());
+-      writable.setString(data.getTextData());
++      dataFormat.setTextData(builder.toString());
++      writable.setString(dataFormat.getTextData());
+       writer.write(writable, null);
+     }
+     writer.close(null);
+@@ -171,7 +172,7 @@ public void testSuccessfulLoader() throws Throwable {
+     SqoopOutputFormatLoadExecutor executor = new
+         SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName());
+     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+-    IntermediateDataFormat data = new CSVIntermediateDataFormat();
++    IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
+     SqoopWritable writable = new SqoopWritable();
+     StringBuilder builder = new StringBuilder();
+     for (int count = 0; count < 100; count++) {
+@@ -180,8 +181,8 @@ public void testSuccessfulLoader() throws Throwable {
+         builder.append(",");
+       }
+     }
+-    data.setTextData(builder.toString());
+-    writable.setString(data.getTextData());
++    dataFormat.setTextData(builder.toString());
++    writable.setString(dataFormat.getTextData());
+     writer.write(writable, null);
+ 
+     //Allow writer to complete.
+@@ -196,7 +197,7 @@ public void testThrowingContinuousLoader() throws Throwable {
+     SqoopOutputFormatLoadExecutor executor = new
+         SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName());
+     RecordWriter<SqoopWritable, NullWritable> writer = executor.getRecordWriter();
+-    IntermediateDataFormat data = new CSVIntermediateDataFormat();
++    IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();
+     SqoopWritable writable = new SqoopWritable();
+     try {
+       for (int i = 0; i < 10; i++) {
+@@ -207,8 +208,8 @@ public void testThrowingContinuousLoader() throws Throwable {
+             builder.append(",");
+           }
+         }
+-        data.setTextData(builder.toString());
+-        writable.setString(data.getTextData());
++        dataFormat.setTextData(builder.toString());
++        writable.setString(dataFormat.getTextData());
+         writer.write(writable, null);
+       }
+       writer.close(null);
+diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java
+new file mode 100644
+index 0000000..5d5359e
+--- /dev/null
++++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/util/MRJobTestUtil.java
+@@ -0,0 +1,114 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.sqoop.job.util;
++
++import java.io.IOException;
++
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.io.NullWritable;
++import org.apache.hadoop.mapreduce.InputFormat;
++import org.apache.hadoop.mapreduce.Job;
++import org.apache.hadoop.mapreduce.JobContext;
++import org.apache.hadoop.mapreduce.Mapper;
++import org.apache.hadoop.mapreduce.OutputCommitter;
++import org.apache.hadoop.mapreduce.OutputFormat;
++import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
++import org.apache.sqoop.connector.idf.IntermediateDataFormat;
++import org.apache.sqoop.job.io.SqoopWritable;
++import org.apache.sqoop.job.mr.SqoopSplit;
++import org.apache.sqoop.schema.Schema;
++import org.apache.sqoop.schema.type.FixedPoint;
++import org.apache.sqoop.schema.type.FloatingPoint;
++import org.apache.sqoop.schema.type.Text;
++import org.apache.sqoop.utils.ClassUtils;
++
++import static org.mockito.Mockito.mock;
++import static org.mockito.Mockito.when;
++
++public class MRJobTestUtil {
++
++  @SuppressWarnings("deprecation")
++  public static boolean runJob(Configuration conf,
++      Class<? extends InputFormat<SqoopSplit, NullWritable>> input,
++      Class<? extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable>> mapper,
++      Class<? extends OutputFormat<SqoopWritable, NullWritable>> output) throws IOException,
++      InterruptedException, ClassNotFoundException {
++    Job job = new Job(conf);
++    job.setInputFormatClass(input);
++    job.setMapperClass(mapper);
++    job.setMapOutputKeyClass(SqoopWritable.class);
++    job.setMapOutputValueClass(NullWritable.class);
++    job.setOutputFormatClass(output);
++    job.setOutputKeyClass(SqoopWritable.class);
++    job.setOutputValueClass(NullWritable.class);
++
++    boolean ret = job.waitForCompletion(true);
++
++    // Hadoop 1.0 (and 0.20) has a nasty bug where the job committer is not
++    // called by LocalJobRunner, so call it manually.
++    if (isHadoop1()) {
++      callOutputCommitter(job, output);
++    }
++
++    return ret;
++  }
++
++  public static Schema getTestSchema() {
++    Schema schema = new Schema("Test");
++    schema.addColumn(new FixedPoint("1")).addColumn(new FloatingPoint("2"))
++        .addColumn(new Text("3"));
++    return schema;
++  }
++
++  public static IntermediateDataFormat<?> getTestIDF() {
++    return new CSVIntermediateDataFormat(getTestSchema());
++  }
++
++  /**
++   * Manually call the output committer of the given output format for the job.
++   */
++  private static void callOutputCommitter(Job job,
++      Class<? extends OutputFormat<SqoopWritable, NullWritable>> outputFormat) throws IOException,
++      InterruptedException {
++    OutputCommitter committer = ((OutputFormat<?,?>) ClassUtils.instantiate(outputFormat))
++        .getOutputCommitter(null);
++
++    JobContext jobContext = mock(JobContext.class);
++    when(jobContext.getConfiguration()).thenReturn(job.getConfiguration());
++
++    committer.commitJob(jobContext);
++  }
++
++  /**
++   * Detect a Hadoop 1.x (or 0.20.x) installation.
++   *
++   * @return true if and only if this is Hadoop 1.x or earlier
++   */
++  public static boolean isHadoop1() {
++    String version = org.apache.hadoop.util.VersionInfo.getVersion();
++    if (version.matches("\\b0\\.20\\..+\\b") || version.matches("\\b1\\.\\d\\.\\d")) {
++      return true;
++    }
++    return false;
++  }
++
++  private MRJobTestUtil() {
++    // Private constructor: utility class, not meant to be instantiated
++  }
++
++}
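
MRJobTestUtil consolidates the helpers the reworked tests share. A sketch of the intended call pattern, mirroring the loader tests in this commit (the CSV literal below is an illustrative row for the FixedPoint/FloatingPoint/Text test schema):

import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.util.MRJobTestUtil;

public class MRJobTestUtilSketch {
  public static void main(String[] args) {
    // IDF pre-bound to the three-column test schema via the new constructor.
    IntermediateDataFormat<?> dataFormat = MRJobTestUtil.getTestIDF();

    // The loader tests route every record through the IDF before wrapping
    // it in the writable that the MR plumbing actually moves around.
    dataFormat.setTextData("1,1.0,'one'");
    SqoopWritable writable = new SqoopWritable();
    writable.setString(dataFormat.getTextData());
    System.out.println(writable.getString());   // 1,1.0,'one'
  }
}
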

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/common/src/main/java/org/apache/sqoop/schema/Schema.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/schema/Schema.java b/common/src/main/java/org/apache/sqoop/schema/Schema.java
index 40c362c..3aa3aea 100644
--- a/common/src/main/java/org/apache/sqoop/schema/Schema.java
+++ b/common/src/main/java/org/apache/sqoop/schema/Schema.java
@@ -122,12 +122,7 @@ public class Schema {
   }
 
   public boolean isEmpty() {
-    if (columns.size()==0) {
-      return true;
-    } else {
-      return false;
-    }
-
+    return columns.size() == 0;
   }
 
   public String toString() {
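
The isEmpty() rewrite above is purely cosmetic; its contract is unchanged. A quick sketch, assuming the addColumn() API used by the tests in this commit (the test class itself is illustrative):

import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Text;
import org.junit.Assert;
import org.junit.Test;

public class SchemaIsEmptySketch {
  @Test
  public void testIsEmptyTracksColumns() {
    Schema schema = new Schema("test");
    Assert.assertTrue(schema.isEmpty());    // no columns registered yet
    schema.addColumn(new Text("1"));
    Assert.assertFalse(schema.isEmpty());   // columns.size() != 0 now
  }
}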

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
index e0e4061..e65edd9 100644
--- a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/CSVIntermediateDataFormat.java
@@ -67,9 +67,15 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
 
   private final List<Integer> stringFieldIndices = new ArrayList<Integer>();
   private final List<Integer> byteFieldIndices = new ArrayList<Integer>();
-
   private Schema schema;
 
+  public CSVIntermediateDataFormat() {
+  }
+
+  public CSVIntermediateDataFormat(Schema schema) {
+    setSchema(schema);
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -166,7 +172,7 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
    */
   @Override
   public Object[] getObjectData() {
-    if (schema.isEmpty()) {
+    if (schema == null || schema.isEmpty()) {
       throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0006);
     }
 

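Taken together, the new constructor and the null guard above change how callers obtain a usable CSVIntermediateDataFormat. A sketch, assuming SqoopException lives in org.apache.sqoop.common as elsewhere in the project (the column layout is illustrative):

import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Text;

public class CSVIDFConstructionSketch {
  public static void main(String[] args) {
    Schema schema = new Schema("test");
    schema.addColumn(new Text("1"));

    // Preferred after this commit: bind the schema at construction time.
    IntermediateDataFormat<?> bound = new CSVIntermediateDataFormat(schema);
    bound.setTextData("'hello'");
    Object[] fields = bound.getObjectData();   // decodes against the schema
    System.out.println(fields[0]);

    // Without a schema, getObjectData() now fails fast with a SqoopException
    // (INTERMEDIATE_DATA_FORMAT_0006) instead of a NullPointerException.
    IntermediateDataFormat<?> unbound = new CSVIntermediateDataFormat();
    try {
      unbound.getObjectData();
    } catch (SqoopException expected) {
      System.out.println("schema required: " + expected.getMessage());
    }
  }
}
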
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ace22237/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index 72e95ed..fcf6c3c 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -41,11 +41,11 @@ public class TestCSVIntermediateDataFormat {
 
   private final String BYTE_FIELD_ENCODING = "ISO-8859-1";
 
-  private IntermediateDataFormat<?> data;
+  private IntermediateDataFormat<?> dataFormat;
 
   @Before
   public void setUp() {
-    data = new CSVIntermediateDataFormat();
+    dataFormat = new CSVIntermediateDataFormat();
   }
 
   private String getByteFieldString(byte[] byteFieldData) {
@@ -61,8 +61,8 @@ public class TestCSVIntermediateDataFormat {
   public void testStringInStringOut() {
     String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
       + ",'" + String.valueOf(0x0A) + "'";
-    data.setTextData(testData);
-    assertEquals(testData, data.getTextData());
+    dataFormat.setTextData(testData);
+    assertEquals(testData, dataFormat.getTextData());
   }
 
   @Test
@@ -74,10 +74,10 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("4"))
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
-    data.setSchema(schema);
-    data.setTextData(null);
+    dataFormat.setSchema(schema);
+    dataFormat.setTextData(null);
 
-    Object[] out = data.getObjectData();
+    Object[] out = dataFormat.getObjectData();
 
     assertNull(out);
   }
@@ -91,10 +91,10 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("4"))
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
-    data.setSchema(schema);
-    data.setTextData("");
+    dataFormat.setSchema(schema);
+    dataFormat.setTextData("");
 
-    data.getObjectData();
+    dataFormat.getObjectData();
   }
 
   @Test
@@ -111,10 +111,10 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
 
-    data.setSchema(schema);
-    data.setTextData(testData);
+    dataFormat.setSchema(schema);
+    dataFormat.setTextData(testData);
 
-    Object[] out = data.getObjectData();
+    Object[] out = dataFormat.getObjectData();
 
     assertEquals(new Long(10),out[0]);
     assertEquals(new Long(34),out[1]);
@@ -134,7 +134,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("4"))
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
     Object[] in = new Object[6];
@@ -145,12 +145,12 @@ public class TestCSVIntermediateDataFormat {
     in[4] = byteFieldData;
     in[5] = new String(new char[] { 0x0A });
 
-    data.setObjectData(in);
+    dataFormat.setObjectData(in);
 
     //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
     String testData = "10,34,'54','random data'," +
         getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
-    assertEquals(testData, data.getTextData());
+    assertEquals(testData, dataFormat.getTextData());
   }
 
   @Test
@@ -164,7 +164,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("4"))
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     Object[] in = new Object[6];
     in[0] = new Long(10);
@@ -177,9 +177,9 @@ public class TestCSVIntermediateDataFormat {
     System.arraycopy(in,0,inCopy,0,in.length);
 
     // Modifies the input array, so we use the copy to confirm
-    data.setObjectData(in);
+    dataFormat.setObjectData(in);
 
-    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+    assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
   }
 
   @Test
@@ -191,7 +191,7 @@ public class TestCSVIntermediateDataFormat {
         .addColumn(new Text("4"))
         .addColumn(new Binary("5"))
         .addColumn(new Text("6"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     byte[] byteFieldData = new byte[] { (byte) 0x0D, (byte) -112, (byte) 54};
     Object[] in = new Object[6];
@@ -202,12 +202,12 @@ public class TestCSVIntermediateDataFormat {
     in[4] = byteFieldData;
     in[5] = new String(new char[] { 0x0A });
 
-    data.setObjectData(in);
+    dataFormat.setObjectData(in);
 
     //byte[0] = \r byte[1] = -112, byte[1] = 54 - 2's complements
     String testData = "10,34,NULL,'random data'," +
         getByteFieldString(byteFieldData).replaceAll("\r", "\\\\r") + ",'\\n'";
-    assertEquals(testData, data.getTextData());
+    assertEquals(testData, dataFormat.getTextData());
   }
 
   @Test
@@ -215,7 +215,7 @@ public class TestCSVIntermediateDataFormat {
     Schema schema = new Schema("test");
     schema.addColumn(new Text("1"));
 
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     char[] allCharArr = new char[256];
     for(int i = 0; i < allCharArr.length; ++i) {
@@ -228,17 +228,17 @@ public class TestCSVIntermediateDataFormat {
     System.arraycopy(in, 0, inCopy, 0, in.length);
 
     // Modifies the input array, so we use the copy to confirm
-    data.setObjectData(in);
+    dataFormat.setObjectData(in);
 
-    assertEquals(strData, data.getObjectData()[0]);
-    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+    assertEquals(strData, dataFormat.getObjectData()[0]);
+    assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
   }
 
   @Test
   public void testByteArrayFullRangeOfCharacters() {
     Schema schema = new Schema("test");
     schema.addColumn(new Binary("1"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     byte[] allCharByteArr = new byte[256];
     for (int i = 0; i < allCharByteArr.length; ++i) {
@@ -250,32 +250,32 @@ public class TestCSVIntermediateDataFormat {
     System.arraycopy(in, 0, inCopy, 0, in.length);
 
     // Modifies the input array, so we use the copy to confirm
-    data.setObjectData(in);
-    assertTrue(Arrays.deepEquals(inCopy, data.getObjectData()));
+    dataFormat.setObjectData(in);
+    assertTrue(Arrays.deepEquals(inCopy, dataFormat.getObjectData()));
   }
 
   @Test
   public void testDate() {
     Schema schema = new Schema("test");
     schema.addColumn(new Date("1"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
-    data.setTextData("2014-10-01");
-    assertEquals("2014-10-01", data.getObjectData()[0].toString());
+    dataFormat.setTextData("2014-10-01");
+    assertEquals("2014-10-01", dataFormat.getObjectData()[0].toString());
   }
 
   @Test
   public void testDateTime() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     for (String dateTime : new String[]{
         "2014-10-01T12:00:00",
         "2014-10-01T12:00:00.000"
     }) {
-      data.setTextData(dateTime);
-      assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString());
+      dataFormat.setTextData(dateTime);
+      assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString());
     }
   }
 
@@ -289,14 +289,14 @@ public class TestCSVIntermediateDataFormat {
   public void testDateTimeISO8601Alternative() {
     Schema schema = new Schema("test");
     schema.addColumn(new DateTime("1"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     for (String dateTime : new String[]{
         "2014-10-01 12:00:00",
         "2014-10-01 12:00:00.000"
     }) {
-      data.setTextData(dateTime);
-      assertEquals("2014-10-01T12:00:00.000", data.getObjectData()[0].toString());
+      dataFormat.setTextData(dateTime);
+      assertEquals("2014-10-01T12:00:00.000", dataFormat.getObjectData()[0].toString());
     }
   }
 
@@ -304,20 +304,20 @@ public class TestCSVIntermediateDataFormat {
   public void testBit() {
     Schema schema = new Schema("test");
     schema.addColumn(new Bit("1"));
-    data.setSchema(schema);
+    dataFormat.setSchema(schema);
 
     for (String trueBit : new String[]{
         "true", "TRUE", "1"
     }) {
-      data.setTextData(trueBit);
-      assertTrue((Boolean) data.getObjectData()[0]);
+      dataFormat.setTextData(trueBit);
+      assertTrue((Boolean) dataFormat.getObjectData()[0]);
     }
 
     for (String falseBit : new String[]{
         "false", "FALSE", "0"
     }) {
-      data.setTextData(falseBit);
-      assertFalse((Boolean) data.getObjectData()[0]);
+      dataFormat.setTextData(falseBit);
+      assertFalse((Boolean) dataFormat.getObjectData()[0]);
     }
   }
 
@@ -326,9 +326,23 @@ public class TestCSVIntermediateDataFormat {
     String testData = "10,34,'54','random data'," + getByteFieldString(new byte[] { (byte) -112, (byte) 54})
         + ",'\\n'";
     Schema schema = new Schema("Test");
-    data.setSchema(schema);
-    data.setTextData(testData);
+    dataFormat.setSchema(schema);
+    dataFormat.setTextData(testData);
 
-    Object[] out = data.getObjectData();
+    @SuppressWarnings("unused")
+    Object[] out = dataFormat.getObjectData();
+  }
+
+  @Test(expected = SqoopException.class)
+  public void testNullSchema() {
+    dataFormat.setSchema(null);
+    @SuppressWarnings("unused")
+    Object[] out = dataFormat.getObjectData();
+  }
+
+  @Test(expected = SqoopException.class)
+  public void testNotSettingSchema() {
+    @SuppressWarnings("unused")
+    Object[] out = dataFormat.getObjectData();
   }
 }
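
Beyond the data -> dataFormat rename, the assertions above also document the encoding rules: numbers are unquoted, text is single-quoted, and a null field serializes as the bare literal NULL. A small sketch of that behavior (column layout illustrative):

import org.apache.sqoop.connector.idf.CSVIntermediateDataFormat;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.FixedPoint;
import org.apache.sqoop.schema.type.Text;

public class CSVEncodingSketch {
  public static void main(String[] args) {
    Schema schema = new Schema("test");
    schema.addColumn(new FixedPoint("1")).addColumn(new Text("2"));

    IntermediateDataFormat<?> dataFormat = new CSVIntermediateDataFormat(schema);

    // setObjectData() may modify the array it is handed (the tests copy
    // their input for exactly that reason), so don't reuse it afterwards.
    dataFormat.setObjectData(new Object[] { Long.valueOf(10), null });

    System.out.println(dataFormat.getTextData());   // expected: 10,NULL
  }
}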

