sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject sqoop git commit: SQOOP-1647: Sqoop2: Read data from HDFS in KiteConnector
Date Thu, 08 Jan 2015 05:38:35 GMT
Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 2b59860da -> f073cf693


SQOOP-1647: Sqoop2: Read data from HDFS in KiteConnector

(Qian Xu via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/f073cf69
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/f073cf69
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/f073cf69

Branch: refs/heads/sqoop2
Commit: f073cf69388f8e07e452a8d9d072e8ada42152f7
Parents: 2b59860
Author: Abraham Elmahrek <abe@apache.org>
Authored: Wed Jan 7 21:11:33 2015 -0800
Committer: Abraham Elmahrek <abe@apache.org>
Committed: Wed Jan 7 21:11:33 2015 -0800

----------------------------------------------------------------------
 .../sqoop/connector/kite/KiteConnector.java     | 21 ++---
 .../connector/kite/KiteConnectorError.java      |  8 +-
 .../connector/kite/KiteDatasetExecutor.java     | 52 ++++++----
 .../connector/kite/KiteDatasetPartition.java    | 61 ++++++++++++
 .../connector/kite/KiteDatasetPartitioner.java  | 51 ++++++++++
 .../sqoop/connector/kite/KiteExtractor.java     | 75 +++++++++++++++
 .../sqoop/connector/kite/KiteFromDestroyer.java | 36 +++++++
 .../connector/kite/KiteFromInitializer.java     | 68 ++++++++++++++
 .../apache/sqoop/connector/kite/KiteLoader.java | 10 +-
 .../sqoop/connector/kite/KiteToDestroyer.java   | 11 ++-
 .../sqoop/connector/kite/KiteToInitializer.java |  3 +-
 .../kite/configuration/FromJobConfig.java       | 31 ++++++
 .../configuration/FromJobConfiguration.java     | 32 +++++++
 .../resources/kite-connector-config.properties  | 13 ++-
 .../sqoop/connector/kite/TestKiteExecutor.java  | 91 ++++++++++++++----
 .../sqoop/connector/kite/TestKiteExtractor.java | 97 +++++++++++++++++++
 .../connector/kite/TestKiteFromInitializer.java | 70 ++++++++++++++
 .../connector/kite/TestKiteToDestroyer.java     |  8 +-
 .../connector/kite/TestKiteToInitializer.java   | 12 +--
 .../connector/common/AvroDataTypeUtil.java      | 99 ++++++++++++++++++++
 20 files changed, 783 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java
index 982d6dd..5f58a90 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnector.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.connector.kite;
 
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.VersionInfo;
+import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
 import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
@@ -27,8 +28,6 @@ import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.job.etl.From;
 import org.apache.sqoop.job.etl.To;
 
-import java.util.Arrays;
-import java.util.List;
 import java.util.Locale;
 import java.util.ResourceBundle;
 
@@ -44,6 +43,12 @@ public class KiteConnector extends SqoopConnector {
           KiteLoader.class,
           KiteToDestroyer.class);
 
+  private static final From FROM = new From(
+      KiteFromInitializer.class,
+      KiteDatasetPartitioner.class,
+      KiteExtractor.class,
+      KiteFromDestroyer.class);
+
   @Override
   public String getVersion() {
     return VersionInfo.getBuildVersion();
@@ -64,8 +69,7 @@ public class KiteConnector extends SqoopConnector {
   public Class getJobConfigurationClass(Direction jobType) {
     switch (jobType) {
       case FROM:
-        // TODO: SQOOP-1647
-        return null;
+        return FromJobConfiguration.class;
       case TO:
         return ToJobConfiguration.class;
       default:
@@ -74,15 +78,8 @@ public class KiteConnector extends SqoopConnector {
   }
 
   @Override
-  public List<Direction> getSupportedDirections() {
-    // TODO: No need to override, when SQOOP-1647 is done
-    return Arrays.asList(Direction.TO);
-  }
-
-  @Override
   public From getFrom() {
-    // TODO: SQOOP-1647
-    return null;
+    return FROM;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorError.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorError.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorError.java
index d67c8de..5775fcf 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorError.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorError.java
@@ -24,9 +24,15 @@ public enum KiteConnectorError implements ErrorCode {
   /** Unsupported dataset URI scheme */
   GENERIC_KITE_CONNECTOR_0000("Unsupported dataset URI scheme"),
 
-  /** Destination is not empty */
+  /** Target dataset is not empty */
   GENERIC_KITE_CONNECTOR_0001("Dataset is not empty"),
 
+  /** Dataset does not exist */
+  GENERIC_KITE_CONNECTOR_0002("Dataset does not exist"),
+
+  /** Error occurred while creating partitions */
+  GENERIC_KITE_CONNECTOR_0003("Error occurred while creating partitions"),
+
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetExecutor.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetExecutor.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetExecutor.java
index 9432e4b..e4514b5 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetExecutor.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetExecutor.java
@@ -23,9 +23,11 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.common.FileFormat;
+import org.apache.sqoop.connector.common.AvroDataTypeUtil;
 import org.apache.sqoop.connector.kite.util.KiteDataTypeUtil;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.DatasetReader;
 import org.kitesdk.data.DatasetWriter;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.Format;
@@ -47,10 +49,12 @@ public class KiteDatasetExecutor {
 
   private DatasetWriter<GenericRecord> writer;
 
+  private DatasetReader<GenericRecord> reader;
+
   /**
    * Creates a new dataset.
    */
-  public KiteDatasetExecutor(String uri, org.apache.sqoop.schema.Schema schema,
+  public static Dataset<GenericRecord> createDataset(String uri, org.apache.sqoop.schema.Schema schema,
       FileFormat format) {
     Schema datasetSchema = KiteDataTypeUtil.createAvroSchema(schema);
     Format datasetFormat = KiteDataTypeUtil.toFormat(format);
@@ -59,11 +63,10 @@ public class KiteDatasetExecutor {
         .schema(datasetSchema)
         .format(datasetFormat)
         .build();
-    dataset = Datasets.create(uri, descriptor);
+    return Datasets.create(uri, descriptor);
   }
 
-  @VisibleForTesting
-  protected KiteDatasetExecutor(Dataset<GenericRecord> dataset) {
+  public KiteDatasetExecutor(Dataset<GenericRecord> dataset) {
     this.dataset = dataset;
   }
 
@@ -87,7 +90,7 @@ public class KiteDatasetExecutor {
   }
 
   @VisibleForTesting
-  protected boolean isWriterClosed() {
+  boolean isWriterClosed() {
     return writer == null || !writer.isOpen();
   }
 
@@ -101,25 +104,38 @@ public class KiteDatasetExecutor {
     }
   }
 
-  /**
-   * Checks the existence by a specified dataset URI.
-   */
-  public static boolean datasetExists(String uri) {
-    return Datasets.exists(uri);
+  public Object[] readRecord() {
+    if (getOrNewReader().hasNext()) {
+      GenericRecord record = getOrNewReader().next();
+      return AvroDataTypeUtil.extractGenericRecord(record);
+    }
+    return null;
   }
 
-  /**
-   * Deletes current dataset physically.
-   */
-  public void deleteDataset() {
-    deleteDataset(dataset.getUri().toString());
+  private DatasetReader<GenericRecord> getOrNewReader() {
+    if (reader == null) {
+      reader = dataset.newReader();
+    }
+    return reader;
+  }
+
+  @VisibleForTesting
+  boolean isReaderClosed() {
+    return reader == null || !reader.isOpen();
+  }
+
+  public void closeReader() {
+    if (reader != null) {
+      Closeables.closeQuietly(reader);
+      reader = null;
+    }
   }
 
   /**
-   * Deletes particular dataset physically.
+   * Deletes current dataset physically.
    */
-  public static boolean deleteDataset(String uri) {
-    return Datasets.delete(uri);
+  public void deleteDataset() {
+    Datasets.delete(dataset.getUri().toString());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetPartition.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetPartition.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetPartition.java
new file mode 100644
index 0000000..9399a91
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetPartition.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.kite;
+
+import org.apache.sqoop.job.etl.Partition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A part of the input data partitioned by the Partitioner.
+ */
+public class KiteDatasetPartition extends Partition {
+
+  /** The uri to the dataset */
+  private String uri;
+
+  public KiteDatasetPartition() {
+  }
+
+  public String getUri() {
+    return uri;
+  }
+
+  public void setUri(String uri) {
+    this.uri = uri;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    uri = in.readUTF();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(uri);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("{uri=%s}", uri);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetPartitioner.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetPartitioner.java
new file mode 100644
index 0000000..41fe7e1
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteDatasetPartitioner.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.kite;
+
+import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This allows connector to define how input data from the FROM source can be
+ * partitioned. The number of data partitions also determines the degree of
+ * parallelism.
+ */
+public class KiteDatasetPartitioner extends Partitioner<LinkConfiguration,
+    FromJobConfiguration> {
+
+  @Override
+  public List<Partition> getPartitions(PartitionerContext context,
+      LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfig) {
+    // There is no way to create partitions of an un-partitioned dataset.
+    // TODO: SQOOP-1942 will create partitions, if dataset is partitioned.
+    KiteDatasetPartition partition = new KiteDatasetPartition();
+    partition.setUri(fromJobConfig.fromJobConfig.uri);
+
+    List<Partition> partitions = new LinkedList<Partition>();
+    partitions.add(partition);
+    return partitions;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java
new file mode 100644
index 0000000..d4a8a77
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteExtractor.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kite;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
+import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.Datasets;
+
+/**
+ * This allows connector to extract data from a source system based on each
+ * partition.
+ */
+public class KiteExtractor extends Extractor<LinkConfiguration,
+    FromJobConfiguration, KiteDatasetPartition> {
+
+  private static final Logger LOG = Logger.getLogger(KiteExtractor.class);
+
+  private long rowsRead = 0L;
+
+  @VisibleForTesting
+  KiteDatasetExecutor getExecutor(String uri) {
+    Dataset<GenericRecord> dataset = Datasets.load(uri);
+    return new KiteDatasetExecutor(dataset);
+  }
+
+  @Override
+  public void extract(ExtractorContext context, LinkConfiguration linkConfig,
+      FromJobConfiguration fromJobConfig, KiteDatasetPartition partition) {
+    String uri = partition.getUri();
+    LOG.info("Loading data from " + uri);
+
+    KiteDatasetExecutor executor = getExecutor(uri);
+    DataWriter writer = context.getDataWriter();
+    Object[] array;
+    rowsRead = 0L;
+
+    try {
+      while ((array = executor.readRecord()) != null) {
+        // TODO: SQOOP-1616 will cover more column data types. Use schema and do data type conversion (e.g. datatime).
+        writer.writeArrayRecord(array);
+        rowsRead++;
+      }
+    } finally {
+      executor.closeReader();
+    }
+  }
+
+  @Override
+  public long getRowsRead() {
+    return rowsRead;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromDestroyer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromDestroyer.java
new file mode 100644
index 0000000..8d0a495
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromDestroyer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kite;
+
+import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
+
+/**
+ * This classes allows connector to define work to complete execution.
+ */
+public class KiteFromDestroyer extends Destroyer<LinkConfiguration,
+    FromJobConfiguration> {
+
+  @Override
+  public void destroy(DestroyerContext context,
+      LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
new file mode 100644
index 0000000..2f82eaa
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteFromInitializer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kite;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
+import org.apache.sqoop.connector.common.AvroDataTypeUtil;
+import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.utils.ClassUtils;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.Datasets;
+
+import java.util.Set;
+
+/**
+ * This class allows connector to define initialization work for execution.
+ */
+public class KiteFromInitializer extends Initializer<LinkConfiguration,
+    FromJobConfiguration> {
+
+  private static final Logger LOG = Logger.getLogger(KiteFromInitializer.class);
+
+  @Override
+  public void initialize(InitializerContext context,
+      LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
+    if (!Datasets.exists(fromJobConfig.fromJobConfig.uri)) {
+      LOG.error("Dataset does not exist");
+      throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0002);
+    }
+  }
+  @Override
+  public Set<String> getJars(InitializerContext context,
+      LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
+    Set<String> jars = super.getJars(context, linkConfig, fromJobConfig);
+    jars.add(ClassUtils.jarForClass("org.kitesdk.data.Datasets"));
+    jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.databind.JsonNode"));
+    jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.core.TreeNode"));
+    return jars;
+  }
+
+  @Override
+  public Schema getSchema(InitializerContext context,
+      LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
+    Dataset dataset = Datasets.load(fromJobConfig.fromJobConfig.uri);
+    org.apache.avro.Schema avroSchema = dataset.getDescriptor().getSchema();
+    return AvroDataTypeUtil.createSqoopSchema(avroSchema);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
index b115242..1710969 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
@@ -18,6 +18,7 @@
 package org.apache.sqoop.connector.kite;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.connector.common.FileFormat;
 import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
@@ -27,6 +28,7 @@ import org.apache.sqoop.etl.io.DataReader;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.schema.Schema;
+import org.kitesdk.data.Dataset;
 
 /**
  * This class allows Kite connector to load data into a target system.
@@ -36,8 +38,9 @@ public class KiteLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
   private static final Logger LOG = Logger.getLogger(KiteLoader.class);
 
   private long rowsWritten = 0;
+
   @VisibleForTesting
-  protected KiteDatasetExecutor getExecutor(String uri, Schema schema,
+  KiteDatasetExecutor getExecutor(String uri, Schema schema,
       FileFormat format) {
     // Note that instead of creating a dataset at destination, we create a
     // temporary dataset by every KiteLoader instance. They will be merged when
@@ -45,8 +48,9 @@ public class KiteLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
     // not able to pass the temporary dataset uri to KiteToDestroyer. So we
     // delegate KiteDatasetExecutor to manage name convention for datasets.
     uri = KiteDatasetExecutor.suggestTemporaryDatasetUri(uri);
-
-    return new KiteDatasetExecutor(uri, schema, format);
+    Dataset<GenericRecord> dataset =
+        KiteDatasetExecutor.createDataset(uri, schema, format);
+    return new KiteDatasetExecutor(dataset);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
index 3b36f1d..704c8e9 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
@@ -18,6 +18,7 @@
 package org.apache.sqoop.connector.kite;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.connector.common.FileFormat;
 import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
@@ -26,6 +27,8 @@ import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
 import org.apache.sqoop.schema.Schema;
+import org.kitesdk.data.Dataset;
+import org.kitesdk.data.Datasets;
 
 /**
  * This classes allows connector to define work to complete execution.
@@ -54,7 +57,7 @@ public class KiteToDestroyer extends Destroyer<LinkConfiguration,
       }
     } else {
       for (String tempUri : tempUris) {
-        KiteDatasetExecutor.deleteDataset(tempUri);
+        Datasets.delete(tempUri);
         LOG.warn(String.format("Failed to import. " +
             "Temporary dataset %s has been deleted", tempUri));
       }
@@ -62,9 +65,11 @@ public class KiteToDestroyer extends Destroyer<LinkConfiguration,
   }
 
   @VisibleForTesting
-  protected KiteDatasetExecutor getExecutor(String uri, Schema schema,
+  KiteDatasetExecutor getExecutor(String uri, Schema schema,
       FileFormat format) {
-    return new KiteDatasetExecutor(uri, schema, format);
+    Dataset<GenericRecord> dataset =
+        KiteDatasetExecutor.createDataset(uri, schema, format);
+    return new KiteDatasetExecutor(dataset);
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
index ad5898f..ef94d48 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
@@ -28,6 +28,7 @@ import org.apache.sqoop.job.etl.InitializerContext;
 import org.apache.sqoop.schema.NullSchema;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.utils.ClassUtils;
+import org.kitesdk.data.Datasets;
 
 import java.util.Set;
 
@@ -46,7 +47,7 @@ public class KiteToInitializer extends Initializer<LinkConfiguration,
       LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
     String uri = ConfigUtil.buildDatasetUri(
         linkConfig.linkConfig, toJobConfig.toJobConfig);
-    if (KiteDatasetExecutor.datasetExists(uri)) {
+    if (Datasets.exists(uri)) {
       LOG.error("Overwrite an existing dataset is not expected in new create mode.");
       throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0001);
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/FromJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/FromJobConfig.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/FromJobConfig.java
new file mode 100644
index 0000000..68a1d7a
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/FromJobConfig.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kite.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.validators.DatasetURIValidator;
+
+@ConfigClass
+public class FromJobConfig {
+
+  @Input(size = 255, validators = {@Validator(DatasetURIValidator.class)})
+  public String uri;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/FromJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/FromJobConfiguration.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/FromJobConfiguration.java
new file mode 100644
index 0000000..e4e297f
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/FromJobConfiguration.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kite.configuration;
+
+import org.apache.sqoop.model.Config;
+import org.apache.sqoop.model.ConfigurationClass;
+
+@ConfigurationClass
+public class FromJobConfiguration {
+
+  @Config public FromJobConfig fromJobConfig;
+
+  public FromJobConfiguration() {
+    fromJobConfig = new FromJobConfig();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/main/resources/kite-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/resources/kite-connector-config.properties b/connector/connector-kite/src/main/resources/kite-connector-config.properties
index 65541c5..23d0e28 100644
--- a/connector/connector-kite/src/main/resources/kite-connector-config.properties
+++ b/connector/connector-kite/src/main/resources/kite-connector-config.properties
@@ -37,4 +37,15 @@ toJobConfig.uri.help = Location to store dataset (i.e. \
   "dataset:hive://<namespace>/<dataset>")
 
 toJobConfig.fileFormat.label = File format
-toJobConfig.fileFormat.help = Specify storage format to create a dataset and cannot be changed.
\ No newline at end of file
+toJobConfig.fileFormat.help = Specify storage format to create a dataset and cannot be changed.
+
+# From Job Config
+#
+fromJobConfig.label = From Kite Dataset Configuration
+fromJobConfig.help = You must supply the information requested in order to \
+  get information where you want to store your data.
+
+fromJobConfig.uri.label = Dataset URI
+fromJobConfig.uri.help = Location to load dataset (i.e. \
+  "dataset:hdfs://<host>[:port]/<path>/<namespace>/<dataset>", \
+  "dataset:hive://<namespace>/<dataset>")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExecutor.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExecutor.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExecutor.java
index 5e4edc5..43736cc 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExecutor.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExecutor.java
@@ -19,39 +19,47 @@ package org.apache.sqoop.connector.kite;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
+import org.kitesdk.data.DatasetReader;
 import org.kitesdk.data.DatasetWriter;
 
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertNull;
 import static junit.framework.TestCase.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.Mock;
 import static org.mockito.MockitoAnnotations.initMocks;
 
 public class TestKiteExecutor {
 
-  @Mock
+  @org.mockito.Mock
   private Dataset<GenericRecord> datasetMock;
 
-  @Mock
+  @org.mockito.Mock
   private DatasetDescriptor descriptorMock;
 
-  @Mock
+  @org.mockito.Mock
   private DatasetWriter<GenericRecord> writerMock;
 
+  @org.mockito.Mock
+  private DatasetReader<GenericRecord> readerMock;
+
   private KiteDatasetExecutor executor;
 
   @Before
   public void setUp() {
     initMocks(this);
     when(datasetMock.newWriter()).thenReturn(writerMock);
+    when(datasetMock.newReader()).thenReturn(readerMock);
     when(datasetMock.getDescriptor()).thenReturn(descriptorMock);
     when(descriptorMock.getSchema()).thenReturn(
         new Schema.Parser().parse("{\"name\":\"test\",\"type\":\"record\"," +
@@ -63,25 +71,16 @@ public class TestKiteExecutor {
   @After
   public void tearDown() {
     executor.closeWriter();
+    executor.closeReader();
     assertTrue(executor.isWriterClosed());
+    assertTrue(executor.isReaderClosed());
   }
 
   @Test
   public void testWriteRecord() {
-    // setup
+    // setup & exercise
     final int NUMBER_OF_ROWS = 10;
-    when(descriptorMock.getSchema()).thenReturn(
-        new Schema.Parser().parse("{" +
-            "\"name\":\"test\",\"type\":\"record\"," +
-            "\"fields\":[" +
-            "{\"name\":\"f1\",\"type\":\"int\"}," +
-            "{\"name\":\"f2\",\"type\":\"string\"}" +
-            "]}"));
-
-    // exercise
-    for (int i = 0; i < NUMBER_OF_ROWS; i++) {
-      executor.writeRecord(new Object[]{42, "foo"});
-    }
+    createDatasetWithRecords(NUMBER_OF_ROWS);
 
     // verify
     verify(writerMock, times(NUMBER_OF_ROWS)).write(any(GenericRecord.class));
@@ -103,4 +102,62 @@ public class TestKiteExecutor {
     assertTrue(executor.isWriterClosed());
   }
 
+  @Test
+  public void testReaderRecord() {
+    // setup
+    final int NUMBER_OF_ROWS = 10;
+    createDatasetWithRecords(NUMBER_OF_ROWS);
+    when(readerMock.next()).thenReturn(
+        new GenericRecordBuilder(createTwoFieldSchema())
+            .set("f1", 1)
+            .set("f2", "foo")
+            .build());
+    when(readerMock.hasNext()).thenReturn(true);
+
+    // exercise & verify
+    for (int i = 0; i < NUMBER_OF_ROWS; i++) {
+      Object[] actual = executor.readRecord();
+      assertNotNull(actual);
+      assertEquals(2, actual.length);
+      assertEquals(1, actual[0]);
+      assertEquals("foo", actual[1]);
+    }
+    when(readerMock.hasNext()).thenReturn(false);
+    Object[] actual = executor.readRecord();
+    assertNull(actual);
+  }
+
+  @Test
+  public void testCloseReader() {
+    // setup
+    when(readerMock.isOpen()).thenReturn(true);
+    executor.readRecord();
+    assertTrue(!executor.isReaderClosed());
+
+    // exercise
+    executor.closeReader();
+
+    // verify
+    verify(readerMock, times(1)).close();
+    assertTrue(executor.isReaderClosed());
+  }
+
+  private static Schema createTwoFieldSchema() {
+    return new Schema.Parser().parse("{" +
+        "\"name\":\"test\",\"type\":\"record\"," +
+        "\"fields\":[" +
+        "{\"name\":\"f1\",\"type\":\"int\"}," +
+        "{\"name\":\"f2\",\"type\":\"string\"}" +
+        "]}");
+  }
+
+  private void createDatasetWithRecords(int numberOfRecords) {
+    when(descriptorMock.getSchema()).thenReturn(createTwoFieldSchema());
+
+    // exercise
+    for (int i = 0; i < numberOfRecords; i++) {
+      executor.writeRecord(new Object[]{i, "foo" + i});
+    }
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
new file mode 100644
index 0000000..0e2c865
--- /dev/null
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.kite;
+
+import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
+import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Text;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.OngoingStubbing;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class TestKiteExtractor {
+
+  private KiteExtractor extractor;
+
+  @org.mockito.Mock
+  private KiteDatasetExecutor executorMock;
+
+  @org.mockito.Mock
+  private DataWriter writerMock = new DataWriter() {
+    @Override
+    public void writeArrayRecord(Object[] array) {
+    }
+
+    @Override
+    public void writeStringRecord(String text) {
+    }
+
+    @Override
+    public void writeRecord(Object obj) {
+    }
+  };
+
+  @Before
+  public void setUp() {
+    initMocks(this);
+
+    extractor = new KiteExtractor() {
+      @Override
+      protected KiteDatasetExecutor getExecutor(String uri) {
+        return executorMock;
+      }
+    };
+  }
+
+  @Test
+  public void testExtractor() throws Exception {
+    // setup
+    Schema schema = new Schema("testExtractor");
+    schema.addColumn(new Text("TextCol"));
+    ExtractorContext context = new ExtractorContext(null, writerMock, schema);
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
+    KiteDatasetPartition partition = new KiteDatasetPartition();
+    partition.setUri("dataset:hdfs:/path/to/dataset");
+    OngoingStubbing<Object[]> readRecordMethodStub = when(executorMock.readRecord());
+    final int NUMBER_OF_ROWS = 1000;
+    for (int i = 0; i < NUMBER_OF_ROWS; i++) {
+      // TODO: SQOOP-1616 will cover more column data types
+      readRecordMethodStub = readRecordMethodStub.thenReturn(new Object[]{});
+    }
+    readRecordMethodStub.thenReturn(null);
+
+    // exercise
+    extractor.extract(context, linkConfig, jobConfig, partition);
+
+    // verify
+    verify(writerMock, times(NUMBER_OF_ROWS)).writeArrayRecord(
+        any(Object[].class));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java
new file mode 100644
index 0000000..557a3c2
--- /dev/null
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteFromInitializer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.connector.kite;
+
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.kite.configuration.FromJobConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.kitesdk.data.Datasets;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Datasets.class)
+public class TestKiteFromInitializer {
+
+  private KiteFromInitializer initializer;
+
+  @Before
+  public void setUp() {
+    initMocks(this);
+    mockStatic(Datasets.class);
+
+    initializer = new KiteFromInitializer();
+  }
+
+  @Test
+  public void testInitializePassed() {
+    // setup
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
+    jobConfig.fromJobConfig.uri = "dataset:file:/ds/exist";
+    when(Datasets.exists(jobConfig.fromJobConfig.uri)).thenReturn(true);
+
+    // exercise
+    initializer.initialize(null, null, jobConfig);
+  }
+
+  @Test(expected=SqoopException.class)
+  public void testInitializeFailed() {
+    // setup
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
+    jobConfig.fromJobConfig.uri = "dataset:file:/ds/not/exist";
+    when(Datasets.exists(jobConfig.fromJobConfig.uri)).thenReturn(false);
+
+    // exercise
+    initializer.initialize(null, null, jobConfig);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
index 87ed906..92424c4 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
@@ -26,6 +26,7 @@ import org.apache.sqoop.schema.Schema;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.kitesdk.data.Datasets;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -37,7 +38,7 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.powermock.api.mockito.PowerMockito.verifyStatic;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(KiteDatasetExecutor.class)
+@PrepareForTest({KiteDatasetExecutor.class, Datasets.class})
 public class TestKiteToDestroyer {
 
   private KiteToDestroyer destroyer;
@@ -55,6 +56,7 @@ public class TestKiteToDestroyer {
   public void setUp() {
     initMocks(this);
     mockStatic(KiteDatasetExecutor.class);
+    mockStatic(Datasets.class);
 
     destroyer = new KiteToDestroyer() {
       @Override
@@ -93,7 +95,7 @@ public class TestKiteToDestroyer {
     when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri))
         .thenReturn(expectedUris);
     for (String uri : expectedUris) {
-      when(KiteDatasetExecutor.deleteDataset(uri)).thenReturn(true);
+      when(Datasets.delete(uri)).thenReturn(true);
     }
 
     // exercise
@@ -102,7 +104,7 @@ public class TestKiteToDestroyer {
     // verify
     for (String uri : expectedUris) {
       verifyStatic(times(1));
-      KiteDatasetExecutor.deleteDataset(uri);
+      Datasets.delete(uri);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
index fab31f9..50c3064 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
@@ -25,6 +25,7 @@ import org.apache.sqoop.schema.Schema;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.kitesdk.data.Datasets;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -35,18 +36,15 @@ import static org.mockito.MockitoAnnotations.initMocks;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(KiteDatasetExecutor.class)
+@PrepareForTest(Datasets.class)
 public class TestKiteToInitializer {
 
   private KiteToInitializer initializer;
 
-  @org.mockito.Mock
-  private KiteDatasetExecutor executorMock;
-
   @Before
   public void setUp() {
     initMocks(this);
-    mockStatic(KiteDatasetExecutor.class);
+    mockStatic(Datasets.class);
 
     initializer = new KiteToInitializer();
   }
@@ -57,7 +55,7 @@ public class TestKiteToInitializer {
     LinkConfiguration linkConfig = new LinkConfiguration();
     ToJobConfiguration toJobConfig = new ToJobConfiguration();
     toJobConfig.toJobConfig.uri = "dataset:file:/ds/not/exist";
-    when(KiteDatasetExecutor.datasetExists(toJobConfig.toJobConfig.uri))
+    when(Datasets.exists(toJobConfig.toJobConfig.uri))
         .thenReturn(false);
 
     // exercise
@@ -70,7 +68,7 @@ public class TestKiteToInitializer {
     LinkConfiguration linkConfig = new LinkConfiguration();
     ToJobConfiguration toJobConfig = new ToJobConfiguration();
     toJobConfig.toJobConfig.uri = "dataset:file:/ds/exist";
-    when(KiteDatasetExecutor.datasetExists(toJobConfig.toJobConfig.uri))
+    when(Datasets.exists(toJobConfig.toJobConfig.uri))
         .thenReturn(true);
 
     // exercise

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f073cf69/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/AvroDataTypeUtil.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/AvroDataTypeUtil.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/AvroDataTypeUtil.java
new file mode 100644
index 0000000..a71385b
--- /dev/null
+++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/AvroDataTypeUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.common;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.sqoop.schema.type.Binary;
+import org.apache.sqoop.schema.type.Bit;
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.apache.sqoop.schema.type.Unknown;
+
+import java.util.List;
+
+/**
+ * The helper class provides methods to convert Sqoop data types to Avro
+ * supported data types.
+ */
+public class AvroDataTypeUtil {
+
+  public static org.apache.sqoop.schema.Schema createSqoopSchema(
+      Schema avroSchema) {
+    org.apache.sqoop.schema.Schema schema =
+        new org.apache.sqoop.schema.Schema(avroSchema.getName());
+    schema.setNote(avroSchema.getDoc());
+    for (Schema.Field field : avroSchema.getFields()) {
+      Column column = avroTypeToSchemaType(field);
+      schema.addColumn(column);
+    }
+    return schema;
+  }
+
+  private static Column avroTypeToSchemaType(Schema.Field field) {
+    Schema.Type schemaType = field.schema().getType();
+    if (schemaType == Schema.Type.UNION) {
+      List<Schema> unionSchema = field.schema().getTypes();
+      if (unionSchema.size() == 2) {
+        Schema.Type first = unionSchema.get(0).getType();
+        Schema.Type second = unionSchema.get(1).getType();
+        if ((first == Schema.Type.NULL && second != Schema.Type.NULL) ||
+            (first != Schema.Type.NULL && second == Schema.Type.NULL)) {
+          return avroPrimitiveTypeToSchemaType(field.name(),
+              first != Schema.Type.NULL ? first : second);
+        }
+      }
+      // This is an unsupported complex data type
+      return new Unknown(field.name());
+    }
+
+    return avroPrimitiveTypeToSchemaType(field.name(), schemaType);
+  }
+
+  private static Column avroPrimitiveTypeToSchemaType(String name,
+      Schema.Type type) {
+    assert type != Schema.Type.UNION;
+    switch (type) {
+      case INT:
+      case LONG:
+        return new FixedPoint(name);
+      case STRING:
+        return new Text(name);
+      case DOUBLE:
+        return new FloatingPoint(name);
+      case BOOLEAN:
+        return new Bit(name);
+      case BYTES:
+        return new Binary(name);
+      default:
+        return new Unknown(name);
+    }
+  }
+
+  public static Object[] extractGenericRecord(GenericRecord data) {
+    List<Schema.Field> fields = data.getSchema().getFields();
+    Object[] record = new Object[fields.size()];
+    for (int i = 0; i < fields.size(); i++) {
+      record[i] = data.get(i);
+    }
+    return record;
+  }
+
+}
\ No newline at end of file


Mime
View raw message