drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [02/38] git commit: DRILL-357: Hive Storage Engine phase 2 - hive record reader
Date Tue, 04 Mar 2014 08:07:29 GMT
DRILL-357: Hive Storage Engine phase 2 - hive record reader


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/cdf46fd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/cdf46fd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/cdf46fd3

Branch: refs/heads/master
Commit: cdf46fd36fdfc2e3029a6b2e077330c665e43c2e
Parents: a9a7ea8
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Thu Jan 23 18:32:03 2014 -0800
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Mon Mar 3 23:21:50 2014 -0800

----------------------------------------------------------------------
 .../apache/drill/common/config/DrillConfig.java |   2 +-
 distribution/src/assemble/bin.xml               |   1 +
 exec/java-exec/pom.xml                          |  11 +
 .../templates/VariableLengthVectors.java        |  38 +-
 .../exec/compile/TemplateClassDefinition.java   |   2 +-
 .../drill/exec/planner/fragment/Wrapper.java    |   3 +
 .../drill/exec/store/hive/HiveInputReader.java  | 173 +++++++
 .../drill/exec/store/hive/HiveReadEntry.java    |  47 +-
 .../drill/exec/store/hive/HiveReadEntryOld.java |  50 ++
 .../drill/exec/store/hive/HiveRecordReader.java | 473 +++++++++++++++++++
 .../apache/drill/exec/store/hive/HiveScan.java  | 247 ++++++++++
 .../exec/store/hive/HiveScanBatchCreator.java   |  71 +++
 .../exec/store/hive/HiveSchemaProvider.java     |  39 --
 .../exec/store/hive/HiveStorageEngine.java      | 194 ++++++++
 .../store/hive/HiveStorageEngineConfig.java     |   6 +-
 .../drill/exec/store/hive/HiveSubScan.java      | 124 +++++
 .../apache/drill/exec/store/hive/HiveTable.java | 325 +++++++++++++
 .../exec/store/hive/HiveTextRecordReader.java   | 172 +++++++
 .../store/parquet/ParquetSchemaProvider.java    |   1 -
 .../apache/drill/exec/util/BatchPrinter.java    |  93 ++++
 .../src/main/resources/drill-module.conf        |   3 +-
 .../java/org/apache/drill/exec/TestPlan.java    |  48 ++
 .../drill/exec/store/hive/TestHiveScan.java     |  67 +++
 .../apache/drill/exec/util/BatchPrinter.java    |  93 ----
 .../java-exec/src/test/resources/hive/test.json |  75 +++
 pom.xml                                         |  51 ++
 .../org/apache/drill/jdbc/DrillHandler.java     |   2 +-
 .../drill/sql/client/full/BatchLoaderMap.java   |   2 +-
 .../sql/client/full/HiveDatabaseSchema.java     | 115 +++--
 .../drill/sql/client/full/HiveSchema.java       |   5 +-
 .../apache/drill/jdbc/test/FullEngineTest.java  |   1 +
 31 files changed, 2349 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index d81ef79..a0e261f 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -64,7 +64,7 @@ public final class DrillConfig extends NestedConfig{
     mapper.registerModule(deserModule);
     mapper.enable(SerializationFeature.INDENT_OUTPUT);
     mapper.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
-    mapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, false);
+    mapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, true);
     mapper.configure(Feature.ALLOW_COMMENTS, true);
     mapper.registerSubtypes(LogicalOperatorBase.getSubTypes(this));
     mapper.registerSubtypes(StorageEngineConfigBase.getSubTypes(this));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index 5538d40..c50df09 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -22,6 +22,7 @@
   <id>binary-release</id>
   <formats>
     <format>tar.gz</format>
+    <format>dir</format>
   </formats>
   <includeBaseDirectory>true</includeBaseDirectory>
   <moduleSets>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 2cc3fb4..9ea8304 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -237,6 +237,17 @@
           <artifactId>maprfs</artifactId>
           <version>1.0.3-mapr-3.0.0</version>
         </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase</artifactId>
+          <version>0.94.13-mapr-1401-m7-3.0.2</version>
+          <exclusions>
+            <exclusion>
+              <groupId>org.slf4j</groupId>
+              <artifactId>slf4j-log4j12</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
       </dependencies>
     </profile>
     <profile>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 57927a7..b059d89 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -260,7 +260,43 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       assert index >= 0;
       int currentOffset = offsetVector.getAccessor().get(index);
       offsetVector.getMutator().set(index + 1, currentOffset + bytes.length);
-      data.setBytes(currentOffset, bytes);
+      data.setBytes(currentOffset, bytes, 0, bytes.length);
+    }
+
+    public boolean setSafe(int index, byte[] bytes) {
+      assert index >= 0;
+      int currentOffset = offsetVector.getAccessor().get(index);
+      if (data.capacity() < currentOffset + bytes.length) return false;
+      offsetVector.getMutator().set(index + 1, currentOffset + bytes.length);
+      data.setBytes(currentOffset, bytes, 0, bytes.length);
+      return true;
+    }
+
+    /**
+     * Set the variable length element at the specified index to a range of the supplied byte array.
+     *
+     * @param index   position of the element to set
+     * @param bytes   array of bytes to write
+     * @param start   start index of bytes to write
+     * @param length  length of bytes to write
+     */
+    public void set(int index, byte[] bytes, int start, int length) {
+      assert index >= 0;
+      int currentOffset = offsetVector.getAccessor().get(index);
+      offsetVector.getMutator().set(index + 1, currentOffset + length);
+      data.setBytes(currentOffset, bytes, start, length);
+    }
+
+    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+      assert index >= 0;
+
+      int currentOffset = offsetVector.getAccessor().get(index);
+
+      if (data.capacity() < currentOffset + length) return false;
+
+      offsetVector.getMutator().set(index + 1, currentOffset + length);
+      data.setBytes(currentOffset, bytes, start, length);
+      return true;
     }
 
    

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
index e9946df..688e680 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
@@ -28,7 +28,7 @@ public class TemplateClassDefinition<T>{
   private final Class<T> iface;
   private final Class<?> template;
   private final SignatureHolder signature;
-  private final AtomicLong classNumber = new AtomicLong(0);
+  private static final AtomicLong classNumber = new AtomicLong(0);
   
   public <X extends T> TemplateClassDefinition(Class<T> iface, Class<X> template) {
     super();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
index 8c1487c..94fcac5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -72,6 +72,7 @@ public class Wrapper {
   
   public void addEndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
     Preconditions.checkState(!endpointsAssigned);
+    Preconditions.checkNotNull(endpoint);
     EndpointAffinity ea = endpointAffinity.get(endpoint);
     if (ea == null) {
       ea = new EndpointAffinity(endpoint);
@@ -149,6 +150,7 @@ public class Wrapper {
       int start = ThreadLocalRandom.current().nextInt(div);
       // round robin with random start.
       for (int i = start; i < start + width; i++) {
+        Preconditions.checkNotNull(all.get(i % div));
         endpoints.add(all.get(i % div));
       }
     } else {
@@ -156,6 +158,7 @@ public class Wrapper {
       Collections.sort(values);
       values = Lists.reverse(values);
       for (int i = 0; i < width; i++) {
+        Preconditions.checkNotNull(values.get(i%values.size()).getEndpoint());
         endpoints.add(values.get(i%values.size()).getEndpoint());
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java
new file mode 100644
index 0000000..8809706
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.*;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.util.*;
+
+public class HiveInputReader {
+  public static void main(String args[]) throws Exception {
+/*
+    String[] columnNames = {"n_nationkey", "n_name", "n_regionkey",   "n_comment"};
+    String[] columnTypes = {"bigint", "string", "bigint", "string"};
+
+    List<FieldSchema> cols = Lists.newArrayList();
+
+    for (int i = 0; i < columnNames.length; i++) {
+      cols.add(new FieldSchema(columnNames[i], columnTypes[i], null));
+    }
+    String location = "file:///tmp/nation_s";
+    String inputFormat = TextInputFormat.class.getCanonicalName();
+    String serdeLib = LazySimpleSerDe.class.getCanonicalName();
+//    String inputFormat = HiveHBaseTableInputFormat.class.getCanonicalName();
+//    String serdeLib = HBaseSerDe.class.getCanonicalName();
+    Map<String, String> serdeParams = new HashMap();
+//    serdeParams.put("serialization.format", "1");
+//    serdeParams.put("hbase.columns.mapping", ":key,f:name,f:regionkey,f:comment");
+    serdeParams.put("serialization.format", "|");
+    serdeParams.put("field.delim", "|");
+
+
+    Map<String, String> tableParams = new HashMap();
+    tableParams.put("hbase.table.name", "nation");
+    SerDeInfo serDeInfo = new SerDeInfo(null, serdeLib, serdeParams);
+    StorageDescriptor storageDescriptor = new StorageDescriptor(cols, location, inputFormat, null, false, -1, serDeInfo, null, null, null);
+    Table table = new Table("table", "default", "sphillips", 0, 0, 0, storageDescriptor, new ArrayList<FieldSchema>(), tableParams, null, null, "MANAGED_TABLE");
+    Properties properties = MetaStoreUtils.getTableMetadata(table);
+    */
+
+    HiveConf conf = new HiveConf();
+    conf.set("hive.metastore.uris", "thrift://10.10.31.51:9083");
+    HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
+    Table table = client.getTable("default", "nation");
+    Properties properties = MetaStoreUtils.getTableMetadata(table);
+
+    Path path = new Path(table.getSd().getLocation());
+    JobConf job = new JobConf();
+    for (Object obj : properties.keySet()) {
+      job.set((String) obj, (String) properties.get(obj));
+    }
+//    job.set("hbase.zookeeper.quorum", "10.10.31.51");
+//    job.set("hbase.zookeeper.property.clientPort", "5181");
+    InputFormat f = (InputFormat) Class.forName(table.getSd().getInputFormat()).getConstructor().newInstance();
+    job.setInputFormat(f.getClass());
+    FileInputFormat.addInputPath(job, path);
+    InputFormat format = job.getInputFormat();
+    SerDe serde = (SerDe) Class.forName(table.getSd().getSerdeInfo().getSerializationLib()).getConstructor().newInstance();
+    serde.initialize(job, properties);
+    ObjectInspector inspector = serde.getObjectInspector();
+    ObjectInspector.Category cat = inspector.getCategory();
+    TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(inspector);
+    List<String> columns = null;
+    List<TypeInfo> colTypes = null;
+    List<ObjectInspector> fieldObjectInspectors = Lists.newArrayList();
+
+    switch(typeInfo.getCategory()) {
+      case STRUCT:
+        columns = ((StructTypeInfo) typeInfo).getAllStructFieldNames();
+        colTypes = ((StructTypeInfo) typeInfo).getAllStructFieldTypeInfos();
+        for (int i = 0; i < columns.size(); i++) {
+          System.out.print(columns.get(i));
+          System.out.print(" ");
+          System.out.print(colTypes.get(i));
+        }
+        System.out.println("");
+        for (StructField field : ((StructObjectInspector)inspector).getAllStructFieldRefs()) {
+          fieldObjectInspectors.add(field.getFieldObjectInspector());
+        }
+    }
+
+    for (InputSplit split : format.getSplits(job, 1)) {
+      String encoded = serializeInputSplit(split);
+      System.out.println(encoded);
+      InputSplit newSplit = deserializeInputSplit(encoded, split.getClass().getCanonicalName());
+      System.out.print("Length: " + newSplit.getLength() + " ");
+      System.out.print("Locations: ");
+      for (String loc : newSplit.getLocations()) System.out.print(loc + " " );
+      System.out.println();
+    }
+
+    for (InputSplit split : format.getSplits(job, 1)) {
+      RecordReader reader = format.getRecordReader(split, job, Reporter.NULL);
+      Object key = reader.createKey();
+      Object value = reader.createValue();
+      int count = 0;
+      while (reader.next(key, value)) {
+        List<Object> values = ((StructObjectInspector) inspector).getStructFieldsDataAsList(serde.deserialize((Writable) value));
+        StructObjectInspector sInsp = (StructObjectInspector) inspector;
+        Object obj = sInsp.getStructFieldData(serde.deserialize((Writable) value) , sInsp.getStructFieldRef("n_name"));
+        System.out.println(obj);
+        /*
+        for (Object obj : values) {
+          PrimitiveObjectInspector.PrimitiveCategory pCat = ((PrimitiveObjectInspector)fieldObjectInspectors.get(count)).getPrimitiveCategory();
+          Object pObj = ((PrimitiveObjectInspector)fieldObjectInspectors.get(count)).getPrimitiveJavaObject(obj);
+          System.out.print(pObj + " ");
+        }
+        */
+        System.out.println("");
+      }
+    }
+  }
+
+  public static String serializeInputSplit(InputSplit split) throws IOException {
+    ByteArrayDataOutput byteArrayOutputStream =  ByteStreams.newDataOutput();
+    split.write(byteArrayOutputStream);
+    return Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
+  }
+
+  public static InputSplit deserializeInputSplit(String base64, String className) throws Exception {
+    InputSplit split;
+    if (Class.forName(className) == FileSplit.class) {
+      split = new FileSplit((Path) null, 0, 0, (String[])null);
+    } else {
+      split = (InputSplit) Class.forName(className).getConstructor().newInstance();
+    }
+    ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(Base64.decodeBase64(base64));
+    split.readFields(byteArrayDataInput);
+    return split;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
index 41a4d3d..6211e21 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
@@ -17,19 +17,47 @@
  */
 package org.apache.drill.exec.store.hive;
 
+import com.beust.jcommander.internal.Lists;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.exec.physical.OperatorCost;
 import org.apache.drill.exec.physical.ReadEntry;
 import org.apache.drill.exec.physical.base.Size;
-import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.util.List;
 
 public class HiveReadEntry implements ReadEntry {
-  private final HiveConf conf;
-  private final String table;
-  private Size size;
 
-  public HiveReadEntry(HiveConf conf, String table) {
-    this.conf = conf;
+  @JsonProperty("table")
+  public HiveTable table;
+  @JsonProperty("partitions")
+  public List<HiveTable.HivePartition> partitions;
+
+  @JsonIgnore
+  private List<Partition> partitionsUnwrapped = Lists.newArrayList();
+
+  @JsonCreator
+  public HiveReadEntry(@JsonProperty("table") HiveTable table, @JsonProperty("partitions") List<HiveTable.HivePartition> partitions) {
     this.table = table;
+    this.partitions = partitions;
+    if (partitions != null) {
+      for(HiveTable.HivePartition part : partitions) {
+        partitionsUnwrapped.add(part.getPartition());
+      }
+    }
+  }
+
+  @JsonIgnore
+  public Table getTable() {
+    return table.getTable();
+  }
+
+  @JsonIgnore
+  public List<Partition> getPartitions() {
+    return partitionsUnwrapped;
   }
 
   @Override
@@ -40,11 +68,10 @@ public class HiveReadEntry implements ReadEntry {
 
   @Override
   public Size getSize() {
-    if (size != null) {
-      // TODO: contact the metastore and find the size of the data in table
-      size = new Size(1, 1);
-    }
+    // TODO: contact the metastore and find the size of the data in table
+    Size size = new Size(1, 1);
 
     return size;
   }
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java
new file mode 100644
index 0000000..ef7266c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+public class HiveReadEntryOld implements ReadEntry {
+  private final HiveConf conf;
+  private final String table;
+  private Size size;
+
+  public HiveReadEntryOld(HiveConf conf, String table) {
+    this.conf = conf;
+    this.table = table;
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    // TODO: need to come up with way to calculate the cost for Hive tables
+    return new OperatorCost(1, 1, 2, 2);
+  }
+
+  @Override
+  public Size getSize() {
+    if (size != null) {
+      // TODO: contact the metastore and find the size of the data in table
+      size = new Size(1, 1);
+    }
+
+    return size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
new file mode 100644
index 0000000..0a31a12
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -0,0 +1,473 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class HiveRecordReader implements RecordReader {
+
+  // Constructor arguments: the Hive table, the partition being read (null is treated as "not
+  // partitioned"), the split to read, the fragment context and the projected columns (null is
+  // treated as "all columns").
+  protected Table table;
+  protected Partition partition;
+  protected InputSplit inputSplit;
+  protected FragmentContext context;
+  protected List<FieldReference> columns;
+  // Names of the table (non-partition) columns that are actually read.
+  protected List<String> columnNames;
+  // Names of the table's partition keys; only filled when a partition is given.
+  protected List<String> partitionNames = Lists.newArrayList();
+  // Partition keys that are part of the output, and the Hive type strings collected for them.
+  protected List<String> selectedPartitionNames = Lists.newArrayList();
+  protected List<String> selectedPartitionTypes = Lists.newArrayList();
+  // All field names reported by the SerDe's struct type.
+  protected List<String> tableColumns;
+  protected SerDe serde;
+  protected StructObjectInspector sInspector;
+  // Per-column inspector and primitive category, indexed like columnNames.
+  protected List<PrimitiveObjectInspector> fieldInspectors = Lists.newArrayList();
+  protected List<PrimitiveCategory> primitiveCategories = Lists.newArrayList();
+  // Key/value holders obtained from reader.createKey()/createValue() and passed to reader.next().
+  protected Object key, value;
+  protected org.apache.hadoop.mapred.RecordReader reader;
+  // Output vectors for the table columns (indexed like columnNames) and for the partition columns.
+  protected List<ValueVector> vectors = Lists.newArrayList();
+  protected List<ValueVector> pVectors = Lists.newArrayList();
+  // Record that could not be written during the previous next() call; it is retried first on the
+  // following call.
+  protected Object redoRecord;
+  // Converted partition values; read by index together with pVectors in populatePartitionVectors().
+  List<Object> partitionValues = Lists.newArrayList();
+
+  // Upper bound on the number of records produced by a single next() call.
+  protected static final int TARGET_RECORD_COUNT = 4000;
+
+  /**
+   * Creates a reader for one input split and immediately initializes it (SerDe, projection,
+   * partition values and the underlying Hadoop record reader).
+   *
+   * @param table      Hive table being read
+   * @param partition  partition the split belongs to, or null for an unpartitioned read
+   * @param inputSplit split to read
+   * @param columns    projected columns, or null to read every column
+   * @param context    fragment context used to obtain the vector allocator
+   * @throws ExecutionSetupException if initialization fails
+   */
+  public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<FieldReference> columns,
+                          FragmentContext context) throws ExecutionSetupException {
+    this.context = context;
+    this.columns = columns;
+    this.inputSplit = inputSplit;
+    this.partition = partition;
+    this.table = table;
+    init();
+  }
+
+  /**
+   * Prepares the reader: builds the job configuration from the table/partition metadata,
+   * instantiates the InputFormat and SerDe, resolves the projected table and partition columns,
+   * converts the selected partition values and opens the Hadoop record reader for the split.
+   *
+   * @throws ExecutionSetupException if a class cannot be instantiated, the SerDe fails, a projected
+   *                                 column does not exist, or the record reader cannot be created
+   * @throws UnsupportedOperationException if the row is not a struct or a column is not primitive
+   */
+  private void init() throws ExecutionSetupException {
+    Properties properties;
+    JobConf job = new JobConf();
+    if (partition != null) {
+      properties = MetaStoreUtils.getPartitionMetadata(partition, table);
+    } else {
+      properties = MetaStoreUtils.getTableMetadata(table);
+    }
+    // Copy the metastore properties into the job configuration.
+    for (Object obj : properties.keySet()) {
+      job.set((String) obj, (String) properties.get(obj));
+    }
+    InputFormat format;
+    String sLib = (partition == null) ? table.getSd().getSerdeInfo().getSerializationLib() : partition.getSd().getSerdeInfo().getSerializationLib();
+    String inputFormatName = (partition == null) ? table.getSd().getInputFormat() : partition.getSd().getInputFormat();
+    try {
+      format = (InputFormat) Class.forName(inputFormatName).getConstructor().newInstance();
+      Class c = Class.forName(sLib);
+      serde = (SerDe) c.getConstructor().newInstance();
+      serde.initialize(job, properties);
+    } catch (ReflectiveOperationException | SerDeException e) {
+      throw new ExecutionSetupException("Unable to instantiate InputFormat", e);
+    }
+    job.setInputFormat(format.getClass());
+
+    if (partition != null) {
+      List<FieldSchema> partitionKeys = table.getPartitionKeys();
+      for (FieldSchema field : partitionKeys) {
+        partitionNames.add(field.getName());
+      }
+    }
+
+    try {
+      ObjectInspector oi = serde.getObjectInspector();
+      if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
+        throw new UnsupportedOperationException(String.format("%s category not supported", oi.getCategory()));
+      }
+      sInspector = (StructObjectInspector) oi;
+      StructTypeInfo sTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(sInspector);
+      if (columns == null) {
+        // No projection: read every table column.
+        columnNames = sTypeInfo.getAllStructFieldNames();
+        tableColumns = columnNames;
+      } else {
+        tableColumns = sTypeInfo.getAllStructFieldNames();
+        List<Integer> columnIds = Lists.newArrayList();
+        columnNames = Lists.newArrayList();
+        for (FieldReference field : columns) {
+          String columnName = field.getPath().toString();
+          if (!tableColumns.contains(columnName)) {
+            // Not a table column: it must then be one of the partition keys.
+            if (partition != null && partitionNames.contains(columnName)) {
+              selectedPartitionNames.add(columnName);
+            } else {
+              throw new ExecutionSetupException(String.format("Column %s does not exist", columnName));
+            }
+          } else {
+            columnIds.add(tableColumns.indexOf(columnName));
+            columnNames.add(columnName);
+          }
+        }
+        ColumnProjectionUtils.appendReadColumnIDs(job, columnIds);
+        ColumnProjectionUtils.appendReadColumnNames(job, columnNames);
+      }
+      for (String columnName : columnNames) {
+        ObjectInspector poi = sInspector.getStructFieldRef(columnName).getFieldObjectInspector();
+        if(poi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
+          throw new UnsupportedOperationException(String.format("%s type not supported", poi.getCategory()));
+        }
+        PrimitiveObjectInspector pInspector = (PrimitiveObjectInspector) poi;
+        fieldInspectors.add(pInspector);
+        primitiveCategories.add(pInspector.getPrimitiveCategory());
+      }
+
+      if (columns == null) {
+        selectedPartitionNames = partitionNames;
+      }
+
+      if (partition != null) {
+        // Resolve types and values in the order of selectedPartitionNames, because setup() and
+        // populatePartitionVectors() pair names, types and values by index. Walking the table's
+        // partition keys instead would mix them up whenever the projection lists partition
+        // columns in a different order than the table declares them.
+        List<FieldSchema> partitionKeys = table.getPartitionKeys();
+        for (String partitionName : selectedPartitionNames) {
+          // partitionNames was filled from partitionKeys above, so the index is valid for both.
+          int keyIndex = partitionNames.indexOf(partitionName);
+          FieldSchema field = partitionKeys.get(keyIndex);
+          selectedPartitionTypes.add(field.getType());
+          partitionValues.add(convertPartitionType(field.getType(), partition.getValues().get(keyIndex)));
+        }
+      }
+    } catch (SerDeException e) {
+      throw new ExecutionSetupException(e);
+    }
+    try {
+      reader = format.getRecordReader(inputSplit, job, Reporter.NULL);
+    } catch (IOException e) {
+      throw new ExecutionSetupException("Failed to get Recordreader", e);
+    }
+    key = reader.createKey();
+    value = reader.createValue();
+  }
+
+  /**
+   * Replaces the fields of the output with one value vector per projected table column followed
+   * by one per selected partition column, then publishes the new schema.
+   *
+   * @param output mutator of the batch the vectors are added to
+   * @throws ExecutionSetupException if the output rejects the schema change
+   */
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+    output.removeAllFields();
+    try {
+      // Table columns: the Drill type is derived from the Hive primitive category.
+      for (int col = 0; col < columnNames.size(); col++) {
+        TypeProtos.MajorType columnType = getMajorType(primitiveCategories.get(col));
+        SchemaPath columnPath = new SchemaPath(columnNames.get(col), ExpressionPosition.UNKNOWN);
+        ValueVector columnVector =
+            TypeHelper.getNewVector(MaterializedField.create(columnPath, columnType), context.getAllocator());
+        vectors.add(columnVector);
+        output.addField(columnVector);
+      }
+      // Partition columns: the Drill type is derived from the Hive type string.
+      for (int part = 0; part < selectedPartitionNames.size(); part++) {
+        TypeProtos.MajorType partitionType = getMajorType(selectedPartitionTypes.get(part));
+        SchemaPath partitionPath = new SchemaPath(selectedPartitionNames.get(part), ExpressionPosition.UNKNOWN);
+        ValueVector partitionVector =
+            TypeHelper.getNewVector(MaterializedField.create(partitionPath, partitionType), context.getAllocator());
+        pVectors.add(partitionVector);
+        output.addField(partitionVector);
+      }
+      output.setNewSchema();
+    } catch (SchemaChangeException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  protected void populatePartitionVectors(int recordCount) {
+    for (int i = 0; i < pVectors.size(); i++) {
+      int size = 50;
+      ValueVector vector = pVectors.get(i);
+      Object val = partitionValues.get(i);
+      if (selectedPartitionTypes.get(i).equals("string") || selectedPartitionTypes.get(i).equals("binary")) {
+        size = ((byte[]) partitionValues.get(i)).length;
+      }
+      VectorAllocator.getAllocator(vector, size).alloc(recordCount);
+      switch(selectedPartitionTypes.get(i)) {
+        case "boolean": {
+          BitVector v = (BitVector) vector;
+          Boolean value = (Boolean) val;
+          for (int j = 0; j < recordCount; j++) {
+            v.getMutator().set(j, value ? 1 : 0);
+          }
+          break;
+        }
+        case "tinyint": {
+          TinyIntVector v = (TinyIntVector) vector;
+          byte value = (byte) val;
+          for (int j = 0; j < recordCount; j++) {
+            v.getMutator().set(j, value);
+          }
+          break;
+        }
+        case "double": {
+          Float8Vector v = (Float8Vector) vector;
+          double value = (double) val;
+          for (int j = 0; j < recordCount; j++) {
+            v.getMutator().set(j, value);
+          }
+          break;
+        }
+        case "float": {
+          Float4Vector v = (Float4Vector) vector;
+          float value = (float) val;
+          for (int j = 0; j < recordCount; j++) {
+            v.getMutator().set(j, value);
+          }
+          break;
+        }
+        case "int": {
+          IntVector v = (IntVector) vector;
+          int value = (int) val;
+          for (int j = 0; j < recordCount; j++) {
+            v.getMutator().set(j, value);
+          }
+          break;
+        }
+        case "bigint": {
+          BigIntVector v = (BigIntVector) vector;
+          long value = (long) val;
+          for (int j = 0; j < recordCount; j++) {
+            v.getMutator().set(j, value);
+          }
+          break;
+        }
+        case "smallint": {
+          SmallIntVector v = (SmallIntVector) vector;
+          short value = (short) val;
+          for (int j = 0; j < recordCount; j++) {
+            v.getMutator().set(j, value);
+          }
+          break;
+        }
+        case "string": {
+          VarCharVector v = (VarCharVector) vector;
+          byte[] value = (byte[]) val;
+          for (int j = 0; j < recordCount; j++) {
+            v.getMutator().set(j, value);
+          }
+          break;
+        }
+        default:
+          throw new UnsupportedOperationException("Could not determine type: " + selectedPartitionTypes.get(i));
+      }
+      vector.getMutator().setValueCount(recordCount);
+    }
+  }
+
+  /**
+   * Converts the textual partition value stored in the metastore into the Java object that
+   * {@code populatePartitionVectors} expects for the given Hive type.
+   *
+   * @param type  Hive type name of the partition key (for example {@code "int"} or {@code "string"})
+   * @param value partition value as a string
+   * @return the boxed primitive for numeric/boolean types, or the UTF-8 bytes for {@code "string"}
+   * @throws UnsupportedOperationException if the type is not handled
+   * @throws NumberFormatException if a numeric value cannot be parsed
+   */
+  private Object convertPartitionType(String type, String value) {
+    switch (type) {
+      case "boolean":
+        return Boolean.parseBoolean(value);
+      case "tinyint":
+        return Byte.parseByte(value);
+      case "double":
+        return Double.parseDouble(value);
+      case "float":
+        return Float.parseFloat(value);
+      case "int":
+        return Integer.parseInt(value);
+      case "bigint":
+        return Long.parseLong(value);
+      case "smallint":
+        return Short.parseShort(value);
+      case "string":
+        // Encode explicitly as UTF-8: the no-argument getBytes() uses the platform default
+        // charset, which would make the stored bytes depend on the JVM the reader runs in.
+        return value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+      default:
+        throw new UnsupportedOperationException("Could not determine type: " + type);
+    }
+  }
+
+  /**
+   * Maps a Hive type name to the corresponding required (non-nullable) Drill major type.
+   *
+   * @param type Hive type name, for example {@code "bigint"}
+   * @return the required Drill type for {@code type}
+   * @throws UnsupportedOperationException if the type has no mapping ({@code "varchar"} included)
+   */
+  public static TypeProtos.MajorType getMajorType(String type) {
+    final TypeProtos.MinorType minorType;
+    switch (type) {
+      case "binary":
+        minorType = TypeProtos.MinorType.VARBINARY;
+        break;
+      case "boolean":
+        minorType = TypeProtos.MinorType.BIT;
+        break;
+      case "tinyint":
+        minorType = TypeProtos.MinorType.TINYINT;
+        break;
+      case "decimal":
+        minorType = TypeProtos.MinorType.DECIMAL16;
+        break;
+      case "double":
+        minorType = TypeProtos.MinorType.FLOAT8;
+        break;
+      case "float":
+        minorType = TypeProtos.MinorType.FLOAT4;
+        break;
+      case "int":
+        minorType = TypeProtos.MinorType.INT;
+        break;
+      case "bigint":
+        minorType = TypeProtos.MinorType.BIGINT;
+        break;
+      case "smallint":
+        minorType = TypeProtos.MinorType.SMALLINT;
+        break;
+      case "string":
+        minorType = TypeProtos.MinorType.VARCHAR;
+        break;
+      case "varchar":
+        // No mapping yet: handled exactly like an unknown type.
+      default:
+        throw new UnsupportedOperationException("Could not determine type: " + type);
+    }
+    return Types.required(minorType);
+  }
+
+  /**
+   * Maps a Hive primitive category to the corresponding required (non-nullable) Drill major type.
+   *
+   * @param pCat Hive primitive category of a column
+   * @return the required Drill type for {@code pCat}
+   * @throws UnsupportedOperationException if the category has no mapping ({@code TIMESTAMP} included)
+   */
+  public static TypeProtos.MajorType getMajorType(PrimitiveCategory pCat) {
+    final TypeProtos.MinorType minorType;
+    switch (pCat) {
+      case BINARY:
+        minorType = TypeProtos.MinorType.VARBINARY;
+        break;
+      case BOOLEAN:
+        minorType = TypeProtos.MinorType.BIT;
+        break;
+      case BYTE:
+        minorType = TypeProtos.MinorType.TINYINT;
+        break;
+      case DECIMAL:
+        minorType = TypeProtos.MinorType.DECIMAL16;
+        break;
+      case DOUBLE:
+        minorType = TypeProtos.MinorType.FLOAT8;
+        break;
+      case FLOAT:
+        minorType = TypeProtos.MinorType.FLOAT4;
+        break;
+      case INT:
+        minorType = TypeProtos.MinorType.INT;
+        break;
+      case LONG:
+        minorType = TypeProtos.MinorType.BIGINT;
+        break;
+      case SHORT:
+        minorType = TypeProtos.MinorType.SMALLINT;
+        break;
+      case STRING:
+        minorType = TypeProtos.MinorType.VARCHAR;
+        break;
+      case TIMESTAMP:
+        // No mapping yet: handled exactly like an unknown category.
+      default:
+        throw new UnsupportedOperationException("Could not determine type");
+    }
+    return Types.required(minorType);
+  }
+
+  /**
+   * Writes one field value into the given vector at {@code index}, casting the vector and the
+   * value according to the Hive primitive category.
+   *
+   * @param pCat       Hive primitive category of the column
+   * @param vv         destination vector; must be the vector type matching {@code pCat}
+   * @param index      position in the vector to write to
+   * @param fieldValue value to write ({@code Text} for STRING, {@code byte[]} for BINARY, the boxed
+   *                   primitive otherwise)
+   * @return the result of the vector's {@code setSafe} call; {@code next()} treats {@code false}
+   *         as "the value could not be written"
+   * @throws UnsupportedOperationException for DECIMAL, TIMESTAMP and unknown categories
+   */
+  public boolean setValue(PrimitiveCategory pCat, ValueVector vv, int index, Object fieldValue) {
+    switch(pCat) {
+      case BINARY:
+        // Must return here: without it execution falls through into the BOOLEAN case, which
+        // casts the byte[] to boolean and fails with a ClassCastException.
+        return ((VarBinaryVector) vv).getMutator().setSafe(index, (byte[]) fieldValue);
+      case BOOLEAN:
+        boolean isSet = (boolean) fieldValue;
+        return ((BitVector) vv).getMutator().setSafe(index, isSet ? 1 : 0 );
+      case BYTE:
+        return ((TinyIntVector) vv).getMutator().setSafe(index, (byte) fieldValue);
+      case DECIMAL:
+        throw new UnsupportedOperationException();
+      case DOUBLE:
+        return ((Float8Vector) vv).getMutator().setSafe(index, (double) fieldValue);
+      case FLOAT:
+        return ((Float4Vector) vv).getMutator().setSafe(index, (float) fieldValue);
+      case INT:
+        return ((IntVector) vv).getMutator().setSafe(index, (int) fieldValue);
+      case LONG:
+        return ((BigIntVector) vv).getMutator().setSafe(index, (long) fieldValue);
+      case SHORT:
+        return ((SmallIntVector) vv).getMutator().setSafe(index, (short) fieldValue);
+      case STRING:
+        // Text may expose a backing array longer than its content, so pass the valid length.
+        int len = ((Text) fieldValue).getLength();
+        byte[] bytes = ((Text) fieldValue).getBytes();
+        return ((VarCharVector) vv).getMutator().setSafe(index, bytes, 0, len);
+      case TIMESTAMP:
+        throw new UnsupportedOperationException();
+
+      default:
+        throw new UnsupportedOperationException("Could not determine type");
+    }
+  }
+
+  /**
+   * Reads up to {@link #TARGET_RECORD_COUNT} records from the split into the output vectors.
+   * <p>
+   * If a value cannot be written (a vector's {@code setSafe} returns false), the batch ends
+   * early and the record is remembered in {@code redoRecord}; it is written first on the next
+   * call. Partition vectors are populated exactly once per batch, after the record count is known.
+   *
+   * @return number of records written in this batch
+   * @throws DrillRuntimeException if reading or deserialization fails, or if a remembered record
+   *                               still cannot be written into freshly allocated vectors
+   */
+  @Override
+  public int next() {
+    for (ValueVector vv : vectors) {
+      VectorAllocator.getAllocator(vv, 50).alloc(TARGET_RECORD_COUNT);
+    }
+    try {
+      int recordCount = 0;
+      if (redoRecord != null) {
+        int failedColumn = writeRecord((Writable) redoRecord, recordCount);
+        if (failedColumn >= 0) {
+          throw new DrillRuntimeException(String.format("Failed to write value for column %s", columnNames.get(failedColumn)));
+        }
+        redoRecord = null;
+        recordCount++;
+      }
+      while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value)) {
+        if (writeRecord((Writable) value, recordCount) >= 0) {
+          // Keep the record for the next batch and close this batch with what was written so far.
+          redoRecord = value;
+          break;
+        }
+        recordCount++;
+      }
+      for (ValueVector v : vectors) {
+        v.getMutator().setValueCount(recordCount);
+      }
+      // Populate once per batch; doing it twice would allocate the partition vectors twice.
+      if (partition != null) populatePartitionVectors(recordCount);
+      return recordCount;
+    } catch (IOException | SerDeException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  /**
+   * Deserializes one record and writes each projected column into its vector at {@code index}.
+   *
+   * @param record serialized record as produced by the Hadoop record reader
+   * @param index  position in the output vectors to write to
+   * @return -1 if every column was written, otherwise the index of the first column whose value
+   *         could not be written
+   * @throws SerDeException if the record cannot be deserialized
+   */
+  private int writeRecord(Writable record, int index) throws SerDeException {
+    Object deSerializedValue = serde.deserialize(record);
+    for (int i = 0; i < columnNames.size(); i++) {
+      Object fieldData = sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnNames.get(i)));
+      Object obj;
+      if (primitiveCategories.get(i) == PrimitiveCategory.STRING) {
+        // Strings are taken in their writable (Text) form, which is what setValue() expects.
+        obj = fieldInspectors.get(i).getPrimitiveWritableObject(fieldData);
+      } else {
+        obj = fieldInspectors.get(i).getPrimitiveJavaObject(fieldData);
+      }
+      if (!setValue(primitiveCategories.get(i), vectors.get(i), index, obj)) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Closes the underlying Hadoop record reader so that the resources it holds for the split are
+   * released. Calling this method again after the reader was closed does nothing.
+   *
+   * @throws DrillRuntimeException if closing the reader fails
+   */
+  @Override
+  public void cleanup() {
+    if (reader == null) {
+      return;
+    }
+    try {
+      reader.close();
+    } catch (IOException e) {
+      throw new DrillRuntimeException("Failed to close Hive record reader", e);
+    } finally {
+      // Drop the reference so a repeated cleanup() does not close the reader twice.
+      reader = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
new file mode 100644
index 0000000..bc2a16b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.fasterxml.jackson.annotation.*;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StorageEngineRegistry;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.*;
+
+@JsonTypeName("hive-scan")
+public class HiveScan extends AbstractGroupScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScan.class);
+
+  // Serialized description of the table (and partitions) to scan.
+  @JsonProperty("hive-table")
+  public HiveReadEntry hiveReadEntry;
+  // Table taken from hiveReadEntry; not serialized separately.
+  @JsonIgnore
+  private Table table;
+  // Every input split of the table/partitions, computed by getSplits().
+  @JsonIgnore
+  private List<InputSplit> inputSplits = Lists.newArrayList();
+  @JsonIgnore
+  public HiveStorageEngine storageEngine;
+  @JsonProperty("storageengine")
+  public HiveStorageEngineConfig engineConfig;
+
+  // Partitions taken from hiveReadEntry; null or empty is treated as an unpartitioned scan.
+  @JsonIgnore
+  public List<Partition> partitions;
+  // Drillbit endpoints obtained from the storage engine's context.
+  @JsonIgnore
+  private Collection<DrillbitEndpoint> endpoints;
+
+  // Projected columns.
+  @JsonProperty("columns")
+  public List<FieldReference> columns;
+
+  // Splits assigned to each minor fragment, indexed by minor fragment id; set by applyAssignments().
+  @JsonIgnore
+  List<List<InputSplit>> mappings;
+
+  // Partition each split belongs to; the value is null for splits of an unpartitioned table.
+  @JsonIgnore
+  Map<InputSplit, Partition> partitionMap = new HashMap<>();
+
+  /**
+   * Deserialization constructor: looks the storage engine up in the injected registry and
+   * computes the input splits for the table or its partitions.
+   *
+   * @param hiveReadEntry  table and partitions to scan
+   * @param config         configuration of the Hive storage engine
+   * @param columns        projected columns
+   * @param engineRegistry registry used to resolve {@code config} to a storage engine
+   * @throws ExecutionSetupException if the engine cannot be resolved or the splits cannot be computed
+   */
+  @JsonCreator
+  public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry,
+                  @JsonProperty("storageengine") HiveStorageEngineConfig config,
+                  @JsonProperty("columns") List<FieldReference> columns,
+                  @JacksonInject StorageEngineRegistry engineRegistry) throws ExecutionSetupException {
+    this.hiveReadEntry = hiveReadEntry;
+    this.table = hiveReadEntry.getTable();
+    this.engineConfig = config;
+    this.storageEngine = (HiveStorageEngine) engineRegistry.getEngine(config);
+    this.columns = columns;
+    this.partitions = hiveReadEntry.getPartitions();
+    // Needs table and partitions, so it has to run after the assignments above.
+    getSplits();
+    this.endpoints = this.storageEngine.getContext().getBits();
+  }
+
+  /**
+   * Creates a scan directly from a storage engine instance and computes the input splits for
+   * the table or its partitions.
+   *
+   * @param hiveReadEntry table and partitions to scan
+   * @param storageEngine Hive storage engine providing the configuration and the drillbit endpoints
+   * @param columns       projected columns
+   * @throws ExecutionSetupException if the splits cannot be computed
+   */
+  public HiveScan(HiveReadEntry hiveReadEntry, HiveStorageEngine storageEngine, List<FieldReference> columns) throws ExecutionSetupException {
+    this.table = hiveReadEntry.getTable();
+    this.hiveReadEntry = hiveReadEntry;
+    // Keep the engine: getNewWithChildren() passes this field on to the copy it creates, and a
+    // null engine makes that copy fail with a NullPointerException.
+    this.storageEngine = storageEngine;
+    this.columns = columns;
+    this.partitions = hiveReadEntry.getPartitions();
+    getSplits();
+    endpoints = storageEngine.getContext().getBits();
+    this.engineConfig = storageEngine.getConfig();
+  }
+
+  /**
+   * @return the projected columns this scan was created with
+   */
+  public List<FieldReference> getColumns() {
+    return columns;
+  }
+
+  /**
+   * Computes the input splits of the table (when there are no partitions) or of every partition,
+   * recording each split in {@code inputSplits} and its partition in {@code partitionMap}.
+   *
+   * @throws ExecutionSetupException if an InputFormat cannot be instantiated or asked for splits
+   */
+  private void getSplits() throws ExecutionSetupException {
+    try {
+      if (partitions == null || partitions.isEmpty()) {
+        // Unpartitioned: splits are registered with a null partition.
+        collectSplits(MetaStoreUtils.getTableMetadata(table), table.getSd().getInputFormat(),
+            table.getSd().getLocation(), null);
+      } else {
+        for (Partition part : partitions) {
+          collectSplits(MetaStoreUtils.getPartitionMetadata(part, table), part.getSd().getInputFormat(),
+              part.getSd().getLocation(), part);
+        }
+      }
+    } catch (ReflectiveOperationException | IOException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  /**
+   * Builds a job from the given metadata, asks the InputFormat for the splits under
+   * {@code location} and registers them for {@code owner}.
+   *
+   * @param metadata        table or partition properties copied into the job configuration
+   * @param inputFormatName class name of the InputFormat to instantiate
+   * @param location        path whose data is split
+   * @param owner           partition the splits belong to, or null for an unpartitioned table
+   */
+  private void collectSplits(Properties metadata, String inputFormatName, String location, Partition owner)
+      throws ReflectiveOperationException, IOException {
+    JobConf job = new JobConf();
+    for (Object propertyKey : metadata.keySet()) {
+      job.set((String) propertyKey, (String) metadata.get(propertyKey));
+    }
+    InputFormat format = (InputFormat) Class.forName(inputFormatName).getConstructor().newInstance();
+    job.setInputFormat(format.getClass());
+    FileInputFormat.addInputPath(job, new Path(location));
+    // Use the instance created by the job so that it is set up with the job's configuration.
+    format = job.getInputFormat();
+    for (InputSplit split : format.getSplits(job, 1)) {
+      inputSplits.add(split);
+      partitionMap.put(split, owner);
+    }
+  }
+
+  /**
+   * Distributes the input splits round-robin over the given endpoints; the list stored for
+   * position {@code i} holds the splits of the i-th endpoint.
+   *
+   * @param endpoints endpoints the scan was assigned to
+   */
+  @Override
+  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
+    final int endpointCount = endpoints.size();
+    mappings = Lists.newArrayList();
+    for (int slot = 0; slot < endpointCount; slot++) {
+      mappings.add(new ArrayList<InputSplit>());
+    }
+    int splitIndex = 0;
+    for (InputSplit split : inputSplits) {
+      mappings.get(splitIndex % endpointCount).add(split);
+      splitIndex++;
+    }
+  }
+
+  public static String serializeInputSplit(InputSplit split) throws IOException {
+    ByteArrayDataOutput byteArrayOutputStream =  ByteStreams.newDataOutput();
+    split.write(byteArrayOutputStream);
+    String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
+    logger.debug("Encoded split string for split {} : {}", split, encoded);
+    return encoded;
+  }
+
+  /**
+   * Builds the sub-scan for one minor fragment from the splits assigned to it: the splits are
+   * passed on in encoded form together with their class names and partitions.
+   *
+   * @param minorFragmentId index into the assignments made by {@code applyAssignments}
+   * @return sub-scan covering the fragment's splits
+   * @throws ExecutionSetupException if a split cannot be serialized or the sub-scan cannot be created
+   */
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+    try {
+      final List<InputSplit> assigned = mappings.get(minorFragmentId);
+      final List<Partition> splitPartitions = Lists.newArrayList();
+      final List<String> encodedSplits = Lists.newArrayList();
+      final List<String> splitClassNames = Lists.newArrayList();
+      for (InputSplit split : assigned) {
+        splitPartitions.add(partitionMap.get(split));
+        encodedSplits.add(serializeInputSplit(split));
+        splitClassNames.add(split.getClass().getCanonicalName());
+      }
+      // A null entry means the split has no partition; in that case no partition list is passed.
+      final List<Partition> partitionsOrNull = splitPartitions.contains(null) ? null : splitPartitions;
+      return new HiveSubScan(table, partitionsOrNull, encodedSplits, splitClassNames, columns);
+    } catch (IOException | ReflectiveOperationException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  /**
+   * @return the number of input splits, i.e. at most one minor fragment per split
+   */
+  @Override
+  public int getMaxParallelizationWidth() {
+    return inputSplits.size();
+  }
+
+  /**
+   * Computes, for every drillbit that is a location of at least one split, the share of the
+   * total split length found on that drillbit. Split locations are matched against the
+   * endpoint addresses; locations without a matching endpoint are ignored.
+   *
+   * @return one affinity entry per endpoint that holds data of this scan
+   * @throws DrillRuntimeException if a split's length or locations cannot be read
+   */
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
+    for (DrillbitEndpoint endpoint : endpoints) {
+      endpointMap.put(endpoint.getAddress(), endpoint);
+      logger.debug("endpoint address: {}", endpoint.getAddress());
+    }
+    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
+    try {
+      // Every split counts with at least length 1 so that empty splits do not yield a zero total.
+      long totalSize = 0;
+      for (InputSplit split : inputSplits) {
+        totalSize += Math.max(1, split.getLength());
+      }
+      for (InputSplit split : inputSplits) {
+        float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
+        for (String loc : split.getLocations()) {
+          logger.debug("split location: {}", loc);
+          DrillbitEndpoint endpoint = endpointMap.get(loc);
+          if (endpoint != null) {
+            if (affinityMap.containsKey(endpoint)) {
+              affinityMap.get(endpoint).addAffinity(affinity);
+            } else {
+              affinityMap.put(endpoint, new EndpointAffinity(endpoint, affinity));
+            }
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e);
+    }
+    // Sanity checks: neither a key nor the endpoint of any affinity may be null.
+    for (DrillbitEndpoint ep : affinityMap.keySet()) {
+      Preconditions.checkNotNull(ep);
+    }
+    for (EndpointAffinity a : affinityMap.values()) {
+      Preconditions.checkNotNull(a.getEndpoint());
+    }
+    return Lists.newArrayList(affinityMap.values());
+  }
+
+  /**
+   * @return a fixed cost; it is not derived from the table or its splits
+   */
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(1, 2, 1, 1);
+  }
+
+  /**
+   * @return a fixed placeholder size; it is not derived from the table or its splits
+   */
+  @Override
+  public Size getSize() {
+    // TODO - this is wrong, need to populate correctly
+    return new Size(10,10);
+  }
+
+  /**
+   * Creates a new scan from this scan's read entry, storage engine and columns.
+   *
+   * @param children ignored; the new scan is built without them
+   * @return a new {@code HiveScan}, which recomputes its input splits on construction
+   * @throws ExecutionSetupException if the new scan cannot be set up
+   */
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    return new HiveScan(hiveReadEntry, storageEngine, columns);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
new file mode 100644
index 0000000..b155661
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.beust.jcommander.internal.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import java.util.List;
+
+/**
+ * Creates the {@link ScanBatch} for a {@link HiveSubScan} by building one {@link RecordReader}
+ * per input split of the sub-scan.
+ */
+public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
+
+  /**
+   * Builds one reader per split and wraps all readers in a {@link ScanBatch}.
+   *
+   * <p>{@link HiveTextRecordReader} is used when the table's input format is
+   * {@link TextInputFormat}, its serialization library is {@link LazySimpleSerDe} and the
+   * sub-scan has a non-null column list; every other case uses {@link HiveRecordReader}.
+   * When the sub-scan carries partitions, the i-th split is paired with the i-th partition;
+   * otherwise each reader receives a {@code null} partition.
+   *
+   * @param context  fragment context passed to every reader and to the batch
+   * @param config   sub-scan supplying the table, partitions, splits and columns
+   * @param children not consulted
+   * @return a scan batch iterating over the created readers
+   * @throws ExecutionSetupException if the readers or the batch cannot be set up
+   */
+  @Override
+  public RecordBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children) throws ExecutionSetupException {
+    List<RecordReader> readers = Lists.newArrayList();
+    Table table = config.table;
+    List<InputSplit> splits = config.getInputSplits();
+    List<Partition> partitions = config.partitions;
+
+    boolean partitioned = !(partitions == null || partitions.size() == 0);
+    boolean useTextReader =
+            table.getSd().getInputFormat().equals(TextInputFormat.class.getCanonicalName()) &&
+            table.getSd().getSerdeInfo().getSerializationLib().equals(LazySimpleSerDe.class.getCanonicalName()) &&
+            config.getColumns() != null;
+
+    // Splits and partitions are consumed in lock-step by position when partitions are present.
+    int partitionIndex = 0;
+    for (InputSplit split : splits) {
+      Partition partition = partitioned ? partitions.get(partitionIndex++) : null;
+      RecordReader reader;
+      if (useTextReader) {
+        reader = new HiveTextRecordReader(table, partition, split, config.getColumns(), context);
+      } else {
+        reader = new HiveRecordReader(table, partition, split, config.getColumns(), context);
+      }
+      readers.add(reader);
+    }
+    return new ScanBatch(context, readers.iterator());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSchemaProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSchemaProvider.java
deleted file mode 100644
index 38ec007..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSchemaProvider.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.store.SchemaProvider;
-
-import com.beust.jcommander.internal.Lists;
-
-public class HiveSchemaProvider implements SchemaProvider{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveSchemaProvider.class);
-
-  final HiveStorageEngineConfig configuration;
-
-  public HiveSchemaProvider(HiveStorageEngineConfig configuration, DrillConfig config){
-    this.configuration = configuration;
-  }
-
-  @Override
-  public Object getSelectionBaseOnName(String tableName) {
-    HiveReadEntry re = new HiveReadEntry(configuration.getHiveConf(), tableName);
-    return Lists.newArrayList(re);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java
new file mode 100644
index 0000000..0f6f3bc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStorageEngine;
+import org.apache.drill.exec.store.SchemaProvider;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Storage engine that resolves Hive tables and partitions through the Hive metastore and
+ * turns logical scans over them into {@link HiveScan} physical operators.
+ */
+public class HiveStorageEngine extends AbstractStorageEngine {
+
+  private final HiveStorageEngineConfig config;
+  private final HiveConf hiveConf;
+  private HiveSchemaProvider schemaProvider;
+  // Held per instance: as a static field it was overwritten by every newly constructed engine.
+  private final DrillbitContext context;
+
+  /**
+   * @param config  engine configuration; also the source of the {@link HiveConf}
+   * @param context drillbit context this engine was created for
+   * @throws ExecutionSetupException if the engine cannot be set up
+   */
+  public HiveStorageEngine(HiveStorageEngineConfig config, DrillbitContext context) throws ExecutionSetupException {
+    this.config = config;
+    this.context = context;
+    this.hiveConf = config.getHiveConf();
+  }
+
+  public HiveStorageEngineConfig getConfig() {
+    return config;
+  }
+
+  public DrillbitContext getContext() {
+    return context;
+  }
+
+  /**
+   * Converts a logical scan into a {@link HiveScan}.
+   *
+   * @param scan logical scan whose selection deserializes to a {@link HiveReadEntry}
+   * @return a scan over the read entry, bound to this engine, with no column list
+   * @throws IOException if the selection cannot be read
+   * @throws DrillRuntimeException wrapping any setup or metastore (Thrift) failure
+   */
+  @Override
+  public HiveScan getPhysicalScan(Scan scan) throws IOException {
+    HiveReadEntry hiveReadEntry = scan.getSelection().getListWith(new ObjectMapper(), new TypeReference<HiveReadEntry>(){});
+    try {
+      // The listing itself is not used; the call is retained so metastore failures for this
+      // table still surface here, before the scan is created.
+      getSchemaProvider().getPartitions(hiveReadEntry.getTable().getDbName(), hiveReadEntry.getTable().getTableName());
+      return new HiveScan(hiveReadEntry, this, null);
+    } catch (ExecutionSetupException | TException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns the schema provider, creating it on first use.
+   *
+   * @throws DrillRuntimeException if the provider cannot be created
+   */
+  @Override
+  public HiveSchemaProvider getSchemaProvider() {
+    try {
+      if (schemaProvider == null) {
+        schemaProvider = new HiveSchemaProvider(config, context.getConfig());
+      }
+      return schemaProvider;
+    } catch (ExecutionSetupException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  /**
+   * Lists the storage locations of a table's partitions.
+   *
+   * @return one location per partition, or {@code null} when the metastore returns no list
+   * @throws TException on metastore failure
+   */
+  List<String> getPartitions(String dbName, String tableName) throws TException {
+    // NOTE(review): the listing is capped at Short.MAX_VALUE entries - confirm that is intended.
+    List<Partition> partitions = getSchemaProvider().getMetaClient().listPartitions(dbName, tableName, Short.MAX_VALUE);
+    if (partitions == null) {
+      return null;
+    }
+    List<String> partitionLocations = Lists.newArrayList();
+    for (Partition part : partitions) {
+      partitionLocations.add(part.getSd().getLocation());
+    }
+    return partitionLocations;
+  }
+
+  /**
+   * Read entry that only carries a metastore {@link Table}; it reports neither cost nor size.
+   */
+  public static class HiveEntry implements ReadEntry {
+
+    private final Table table;
+
+    public HiveEntry(Table table) {
+      this.table = table;
+    }
+
+    public Table getTable() {
+      return table;
+    }
+
+    /** Always throws {@link UnsupportedOperationException}. */
+    @Override
+    public OperatorCost getCost() {
+      throw new UnsupportedOperationException(this.getClass().getCanonicalName() + " is only for extracting path data from " +
+              "selections inside a scan node from a logical plan, it cannot be used in an executing plan and has no cost.");
+    }
+
+    /** Always throws {@link UnsupportedOperationException}. */
+    @Override
+    public Size getSize() {
+      throw new UnsupportedOperationException(this.getClass().getCanonicalName() + " is only for extracting path data from " +
+              "selections inside a scan node from a logical plan, it cannot be used in an executing plan and has no size.");
+    }
+  }
+
+  /**
+   * Schema provider backed by a lazily created {@link HiveMetaStoreClient}.
+   */
+  public static class HiveSchemaProvider implements SchemaProvider {
+
+    private final HiveConf hiveConf;
+    private HiveMetaStoreClient metaClient;
+
+    /**
+     * @param config  source of the {@link HiveConf} used to reach the metastore
+     * @param dConfig not consulted
+     */
+    public HiveSchemaProvider(HiveStorageEngineConfig config, DrillConfig dConfig) throws ExecutionSetupException {
+      hiveConf = config.getHiveConf();
+    }
+
+    /** Returns the metastore client, creating it on first use. */
+    public HiveMetaStoreClient getMetaClient() throws MetaException {
+      if (metaClient == null) {
+        metaClient = new HiveMetaStoreClient(hiveConf);
+      }
+      return metaClient;
+    }
+
+    /**
+     * Fetches a table definition, reconnecting and retrying once on a Thrift failure.
+     *
+     * @throws RuntimeException wrapping {@link NoSuchObjectException} when the first lookup reports a missing object
+     * @throws TException if the retry fails as well
+     */
+    public Table getTable(String dbName, String tableName) throws TException {
+      HiveMetaStoreClient mClient = getMetaClient();
+      try {
+        return mClient.getTable(dbName, tableName);
+      } catch (NoSuchObjectException e) {
+        logger.error("Database: {} table: {} not found", dbName, tableName);
+        throw new RuntimeException(e);
+      } catch (TException e) {
+        mClient.reconnect();
+        return mClient.getTable(dbName, tableName);
+      }
+    }
+
+    /**
+     * Lists a table's partitions, reconnecting and retrying once on a Thrift failure.
+     *
+     * @throws TException if the retry fails as well
+     */
+    List<Partition> getPartitions(String dbName, String tableName) throws TException {
+      HiveMetaStoreClient mClient = getMetaClient();
+      try {
+        return mClient.listPartitions(dbName, tableName, Short.MAX_VALUE);
+      } catch (TException e) {
+        mClient.reconnect();
+        return mClient.listPartitions(dbName, tableName, Short.MAX_VALUE);
+      }
+    }
+
+    /**
+     * Resolves {@code "db.table"} (or a bare table name, looked up in {@code "default"}) to a read entry.
+     *
+     * @param name table name, optionally prefixed with a database name and a dot
+     * @return the read entry; its partition list is {@code null} when the table has no partitions
+     * @throws DrillRuntimeException wrapping any metastore (Thrift) failure
+     */
+    @Override
+    public HiveReadEntry getSelectionBaseOnName(String name) {
+      String[] dbNameTableName = name.split("\\.");
+      String dbName;
+      String t;
+      if (dbNameTableName.length > 1) {
+        dbName = dbNameTableName[0];
+        t = dbNameTableName[1];
+      } else {
+        dbName = "default";
+        t = name;
+      }
+
+      try {
+        Table table = getTable(dbName, t);
+        List<Partition> partitions = getPartitions(dbName, t);
+        // A missing or empty listing is passed on as null, i.e. "no partitions".
+        List<HiveTable.HivePartition> hivePartitions = null;
+        if (partitions != null && !partitions.isEmpty()) {
+          hivePartitions = Lists.newArrayList();
+          for (Partition part : partitions) {
+            hivePartitions.add(new HiveTable.HivePartition(part));
+          }
+        }
+        return new HiveReadEntry(new HiveTable(table), hivePartitions);
+      } catch (TException e) {
+        // Also covers NoSuchObjectException, which is a TException.
+        throw new DrillRuntimeException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java
index 0a2c5de..91fec3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.hive;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.common.logical.StorageEngineConfigBase;
@@ -27,9 +28,12 @@ import java.util.Map;
 
 @JsonTypeName("hive")
 public class HiveStorageEngineConfig extends StorageEngineConfigBase {
-  private Map<String, String> configProps;
+  @JsonProperty
+  public Map<String, String> configProps;
+  @JsonIgnore
   private HiveConf hiveConf;
 
+  @JsonIgnore
   public HiveConf getHiveConf() {
     if (hiveConf == null) {
       hiveConf = new HiveConf();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
new file mode 100644
index 0000000..8ff7c82
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.beust.jcommander.internal.Lists;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Iterators;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public class HiveSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty("splits")
+  public List<String> encodedSplits;
+  @JsonProperty("hive-table")
+  public Table table;
+  @JsonProperty("partitions")
+  public List<Partition> partitions;
+  @JsonIgnore
+  private List<InputSplit> inputSplits = Lists.newArrayList();
+  @JsonProperty("splitClass")
+  public List<String> splitClasses;
+
+  @JsonProperty("columns")
+  public List<FieldReference> columns;
+
+  @JsonCreator
+  public HiveSubScan(@JsonProperty("hive-table") Table table,
+                     @JsonProperty("partition") List<Partition> partitions,
+                     @JsonProperty("splits") List<String> encodedSplits,
+                     @JsonProperty("splitClasses") List<String> splitClasses,
+                     @JsonProperty("columns") List<FieldReference> columns) throws IOException, ReflectiveOperationException {
+    this.table = table;
+    this.partitions = partitions;
+    this.encodedSplits = encodedSplits;
+    this.splitClasses = splitClasses;
+    this.columns = columns;
+
+    for (int i = 0; i < encodedSplits.size(); i++) {
+      inputSplits.add(deserializeInputSplit(encodedSplits.get(i), splitClasses.get(i)));
+    }
+  }
+
+  public static InputSplit deserializeInputSplit(String base64, String className) throws IOException, ReflectiveOperationException{
+    InputSplit split;
+    if (Class.forName(className) == FileSplit.class) {
+      split = new FileSplit((Path) null, 0, 0, (String[])null);
+    } else {
+      split = (InputSplit) Class.forName(className).getConstructor().newInstance();
+    }
+    ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(Base64.decodeBase64(base64));
+    split.readFields(byteArrayDataInput);
+    return split;
+  }
+
+  public List<FieldReference> getColumns() {
+    return columns;
+  }
+
+  public List<InputSplit> getInputSplits() {
+    return inputSplits;
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(1, 2, 1, 1);
+  }
+
+  @Override
+  public Size getSize() {
+    // TODO - this is wrong, need to populate correctly
+    return new Size(10,10);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    try {
+      return new HiveSubScan(table, partitions, encodedSplits, splitClasses, columns);
+    } catch (IOException | ReflectiveOperationException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
new file mode 100644
index 0000000..3858804
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.beust.jcommander.internal.Lists;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.hadoop.hive.metastore.api.*;
+
+import java.util.List;
+import java.util.Map;
+
+@JsonTypeName("table")
+public class HiveTable {
+
+  @JsonIgnore
+  private Table table;
+
+  @JsonProperty
+  public String tableName;
+  @JsonProperty
+  public String dbName;
+  @JsonProperty
+  public String owner;
+  @JsonProperty
+  public int createTime;
+  @JsonProperty
+  public int lastAccessTime;
+  @JsonProperty
+  public int retention;
+  @JsonProperty
+  public StorageDescriptorWrapper sd;
+  @JsonProperty
+  public List<FieldSchemaWrapper> partitionKeys;
+  @JsonProperty
+  public Map<String,String> parameters;
+  @JsonProperty
+  public String viewOriginalText;
+  @JsonProperty
+  public String viewExpandedText;
+  @JsonProperty
+  public String tableType;
+
+  /**
+   * JSON constructor: stores the serialized properties and rebuilds the wrapped metastore
+   * {@link Table} from them, unwrapping the partition keys and the storage descriptor.
+   */
+  @JsonCreator
+  public HiveTable(@JsonProperty("tableName") String tableName,
+                   @JsonProperty("dbName") String dbName,
+                   @JsonProperty("owner") String owner,
+                   @JsonProperty("createTime") int createTime,
+                   @JsonProperty("lastAccessTime") int lastAccessTime,
+                   @JsonProperty("retention") int retention,
+                   @JsonProperty("sd") StorageDescriptorWrapper sd,
+                   @JsonProperty("partitionKeys") List<FieldSchemaWrapper> partitionKeys,
+                   @JsonProperty("parameters") Map<String, String> parameters,
+                   @JsonProperty("viewOriginalText") String viewOriginalText,
+                   @JsonProperty("viewExpandedText") String viewExpandedText,
+                   @JsonProperty("tableType") String tableType) {
+    this.tableName = tableName;
+    this.dbName = dbName;
+    this.owner = owner;
+    this.createTime = createTime;
+    this.lastAccessTime = lastAccessTime;
+    this.retention = retention;
+    this.sd = sd;
+    this.partitionKeys = partitionKeys;
+    this.parameters = parameters;
+    this.viewOriginalText = viewOriginalText;
+    this.viewExpandedText = viewExpandedText;
+    this.tableType = tableType;
+
+    // The metastore Table takes plain FieldSchema / StorageDescriptor objects, not the wrappers.
+    List<FieldSchema> plainKeys = Lists.newArrayList();
+    for (FieldSchemaWrapper keyWrapper : partitionKeys) {
+      plainKeys.add(keyWrapper.getFieldSchema());
+    }
+    StorageDescriptor plainSd = sd.getSd();
+    this.table = new Table(tableName, dbName, owner, createTime, lastAccessTime, retention, plainSd, plainKeys,
+            parameters, viewOriginalText, viewExpandedText, tableType);
+  }
+
+  /**
+   * Wraps an existing metastore {@link Table}, copying its properties into the serializable
+   * fields. A {@code null} table leaves every field at its default value.
+   */
+  public HiveTable(Table table) {
+    if (table == null) {
+      return;
+    }
+    this.table = table;
+    this.tableName = table.getTableName();
+    this.dbName = table.getDbName();
+    this.owner = table.getOwner();
+    this.createTime = table.getCreateTime();
+    this.lastAccessTime = table.getLastAccessTime();
+    this.retention = table.getRetention();
+    this.sd = new StorageDescriptorWrapper(table.getSd());
+    this.partitionKeys = Lists.newArrayList();
+    for (FieldSchema key : table.getPartitionKeys()) {
+      this.partitionKeys.add(new FieldSchemaWrapper(key));
+    }
+    this.parameters = table.getParameters();
+    this.viewOriginalText = table.getViewOriginalText();
+    this.viewExpandedText = table.getViewExpandedText();
+    this.tableType = table.getTableType();
+  }
+
+  @JsonIgnore
+  public Table getTable() { // the wrapped metastore Table; marked @JsonIgnore so it is not a JSON property
+    return table;
+  }
+
+  /**
+   * JSON-serializable mirror of a metastore {@link Partition}. The wrapped object is excluded
+   * from JSON and rebuilt from the mirrored properties by the JSON constructor.
+   */
+  public static class HivePartition {
+
+    @JsonIgnore
+    private Partition partition;
+
+    @JsonProperty
+    public List<String> values;
+    @JsonProperty
+    public String tableName;
+    @JsonProperty
+    public String dbName;
+    @JsonProperty
+    public int createTime;
+    @JsonProperty
+    public int lastAccessTime;
+    @JsonProperty
+    public StorageDescriptorWrapper sd;
+    @JsonProperty
+    public Map<String,String> parameters;
+
+    /**
+     * JSON constructor: stores the serialized properties and rebuilds the wrapped partition.
+     */
+    @JsonCreator
+    public HivePartition(@JsonProperty("values") List<String> values, @JsonProperty("tableName") String tableName, @JsonProperty("dbName") String dbName, @JsonProperty("createTime") int createTime,
+                     @JsonProperty("lastAccessTime") int lastAccessTime,  @JsonProperty("sd") StorageDescriptorWrapper sd,
+                     @JsonProperty("parameters") Map<String, String> parameters
+    ) {
+      this.values = values;
+      this.tableName = tableName;
+      this.dbName = dbName;
+      this.createTime = createTime;
+      this.lastAccessTime = lastAccessTime;
+      this.sd = sd;
+      this.parameters = parameters;
+
+      StorageDescriptor sdUnwrapped = sd.getSd();
+      // The metastore Partition constructor takes the database name BEFORE the table name;
+      // passing them the other way round swapped the two on every deserialized partition.
+      this.partition = new Partition(values, dbName, tableName, createTime, lastAccessTime, sdUnwrapped, parameters);
+    }
+
+    /**
+     * Wraps an existing metastore partition, copying its properties into the serializable
+     * fields. A {@code null} partition leaves every field at its default value.
+     */
+    public HivePartition(Partition partition) {
+      if (partition == null) {
+        return;
+      }
+      this.partition = partition;
+      this.values = partition.getValues();
+      this.tableName = partition.getTableName();
+      this.dbName = partition.getDbName();
+      this.createTime = partition.getCreateTime();
+      this.lastAccessTime = partition.getLastAccessTime();
+      this.sd = new StorageDescriptorWrapper(partition.getSd());
+      this.parameters = partition.getParameters();
+    }
+
+    /** Returns the wrapped metastore partition. */
+    @JsonIgnore
+    public Partition getPartition() {
+      return partition;
+    }
+  }
+
+  /**
+   * JSON-serializable mirror of a metastore {@link StorageDescriptor}. Bucket columns are not
+   * mirrored: there is no {@code bucketCols} property, and a descriptor rebuilt from JSON is
+   * given {@code null} in that position.
+   */
+  public static class StorageDescriptorWrapper {
+    @JsonIgnore
+    private StorageDescriptor sd;
+    @JsonProperty
+    public List<FieldSchemaWrapper> cols;
+    @JsonProperty
+    public String location;
+    @JsonProperty
+    public String inputFormat;
+    @JsonProperty
+    public String outputFormat;
+    @JsonProperty
+    public boolean compressed;
+    @JsonProperty
+    public int numBuckets;
+    @JsonProperty
+    public SerDeInfoWrapper serDeInfo;
+    // bucketCols is not exposed as a JSON property.
+    @JsonProperty
+    public List<OrderWrapper> sortCols;
+    @JsonProperty
+    public Map<String,String> parameters;
+
+    /**
+     * JSON constructor: stores the serialized properties and rebuilds the wrapped descriptor
+     * from the unwrapped columns, SerDe info and sort columns.
+     */
+    @JsonCreator
+    public StorageDescriptorWrapper(@JsonProperty("cols") List<FieldSchemaWrapper> cols,
+                                    @JsonProperty("location") String location,
+                                    @JsonProperty("inputFormat") String inputFormat,
+                                    @JsonProperty("outputFormat") String outputFormat,
+                                    @JsonProperty("compressed") boolean compressed,
+                                    @JsonProperty("numBuckets") int numBuckets,
+                                    @JsonProperty("serDeInfo") SerDeInfoWrapper serDeInfo,
+                                    @JsonProperty("sortCols") List<OrderWrapper> sortCols,
+                                    @JsonProperty("parameters") Map<String,String> parameters) {
+      this.cols = cols;
+      this.location = location;
+      this.inputFormat = inputFormat;
+      this.outputFormat = outputFormat;
+      this.compressed = compressed;
+      this.numBuckets = numBuckets;
+      this.serDeInfo = serDeInfo;
+      this.sortCols = sortCols;
+      this.parameters = parameters;
+
+      List<FieldSchema> plainCols = Lists.newArrayList();
+      for (FieldSchemaWrapper colWrapper : cols) {
+        plainCols.add(colWrapper.getFieldSchema());
+      }
+      SerDeInfo plainSerDeInfo = serDeInfo.getSerDeInfo();
+      List<Order> plainSortCols = Lists.newArrayList();
+      for (OrderWrapper orderWrapper : sortCols) {
+        plainSortCols.add(orderWrapper.getOrder());
+      }
+      // The null argument is the bucket-column list, which this wrapper does not carry.
+      this.sd = new StorageDescriptor(plainCols, location, inputFormat, outputFormat, compressed, numBuckets, plainSerDeInfo,
+              null, plainSortCols, parameters);
+    }
+
+    /**
+     * Wraps an existing descriptor, copying its properties (except bucket columns) into the
+     * serializable fields.
+     */
+    public StorageDescriptorWrapper(StorageDescriptor sd) {
+      this.sd = sd;
+      this.cols = Lists.newArrayList();
+      for (FieldSchema col : sd.getCols()) {
+        this.cols.add(new FieldSchemaWrapper(col));
+      }
+      this.location = sd.getLocation();
+      this.inputFormat = sd.getInputFormat();
+      this.outputFormat = sd.getOutputFormat();
+      this.compressed = sd.isCompressed();
+      this.numBuckets = sd.getNumBuckets();
+      this.serDeInfo = new SerDeInfoWrapper(sd.getSerdeInfo());
+      this.sortCols = Lists.newArrayList();
+      for (Order sortCol : sd.getSortCols()) {
+        this.sortCols.add(new OrderWrapper(sortCol));
+      }
+      this.parameters = sd.getParameters();
+    }
+
+    /** Returns the wrapped metastore descriptor. */
+    @JsonIgnore
+    public StorageDescriptor getSd() {
+      return sd;
+    }
+
+  }
+
+  public static class SerDeInfoWrapper {
+    @JsonIgnore
+    private SerDeInfo serDeInfo;
+    @JsonProperty
+    public String name;
+    @JsonProperty
+    public String serializationLib;
+    @JsonProperty
+    public Map<String,String> parameters;
+
+    @JsonCreator
+    public SerDeInfoWrapper(@JsonProperty("name") String name, @JsonProperty("serializationLib") String serializationLib, @JsonProperty("parameters") Map<String, String> parameters) {
+      this.name = name;
+      this.serializationLib = serializationLib;
+      this.parameters = parameters;
+      this.serDeInfo = new SerDeInfo(name, serializationLib, parameters);
+    }
+
+    public SerDeInfoWrapper(SerDeInfo serDeInfo) {
+      this.serDeInfo = serDeInfo;
+      this.name = serDeInfo.getName();
+      this.serializationLib = serDeInfo.getSerializationLib();
+      this.parameters = serDeInfo.getParameters();
+    }
+
+    @JsonIgnore
+    public SerDeInfo getSerDeInfo() {
+      return serDeInfo;
+    }
+  }
+
+  public static class FieldSchemaWrapper {
+    @JsonIgnore
+    private FieldSchema fieldSchema;
+    @JsonProperty
+    public String name;
+    @JsonProperty
+    public String type;
+    @JsonProperty
+    public String comment;
+
+    @JsonCreator
+    public FieldSchemaWrapper(@JsonProperty("name") String name, @JsonProperty("type") String type, @JsonProperty("comment") String comment) {
+      this.name = name;
+      this.type = type;
+      this.comment = comment;
+      this.fieldSchema = new FieldSchema(name, type, comment);
+    }
+
+    public FieldSchemaWrapper(FieldSchema fieldSchema) {
+      this.fieldSchema = fieldSchema;
+      this.name = fieldSchema.getName();
+      this.type = fieldSchema.getType();
+      this.comment = fieldSchema.getComment();
+    }
+
+    @JsonIgnore
+    public FieldSchema getFieldSchema() {
+      return fieldSchema;
+    }
+  }
+
+  /**
+   * JSON-serializable mirror of a metastore {@link Order}: a column name and an integer
+   * order value.
+   */
+  public static class OrderWrapper {
+    @JsonIgnore
+    private Order ord;
+    @JsonProperty
+    public String col;
+    @JsonProperty
+    public int order;
+
+    /** JSON constructor: rebuilds the wrapped {@link Order} from the serialized properties. */
+    @JsonCreator
+    public OrderWrapper(@JsonProperty("col") String col, @JsonProperty("order") int order) {
+      this.col = col;
+      this.order = order;
+      // Without this the wrapped object stayed null after deserialization, so getOrder()
+      // returned null to anything rebuilding a descriptor from this wrapper.
+      this.ord = new Order(col, order);
+    }
+
+    /** Wraps an existing {@link Order}, copying its properties into the serializable fields. */
+    public OrderWrapper(Order ord) {
+      this.ord = ord;
+      this.col = ord.getCol();
+      this.order = ord.getOrder();
+    }
+
+    /** Returns the wrapped metastore object. */
+    @JsonIgnore
+    public Order getOrder() {
+      return ord;
+    }
+  }
+}


Mime
View raw message