DRILL-357: Hive Storage Engine phase 2 - hive record reader
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/cdf46fd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/cdf46fd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/cdf46fd3
Branch: refs/heads/master
Commit: cdf46fd36fdfc2e3029a6b2e077330c665e43c2e
Parents: a9a7ea8
Author: Steven Phillips <sphillips@maprtech.com>
Authored: Thu Jan 23 18:32:03 2014 -0800
Committer: Jacques Nadeau <jacques@apache.org>
Committed: Mon Mar 3 23:21:50 2014 -0800
----------------------------------------------------------------------
.../apache/drill/common/config/DrillConfig.java | 2 +-
distribution/src/assemble/bin.xml | 1 +
exec/java-exec/pom.xml | 11 +
.../templates/VariableLengthVectors.java | 38 +-
.../exec/compile/TemplateClassDefinition.java | 2 +-
.../drill/exec/planner/fragment/Wrapper.java | 3 +
.../drill/exec/store/hive/HiveInputReader.java | 173 +++++++
.../drill/exec/store/hive/HiveReadEntry.java | 47 +-
.../drill/exec/store/hive/HiveReadEntryOld.java | 50 ++
.../drill/exec/store/hive/HiveRecordReader.java | 473 +++++++++++++++++++
.../apache/drill/exec/store/hive/HiveScan.java | 247 ++++++++++
.../exec/store/hive/HiveScanBatchCreator.java | 71 +++
.../exec/store/hive/HiveSchemaProvider.java | 39 --
.../exec/store/hive/HiveStorageEngine.java | 194 ++++++++
.../store/hive/HiveStorageEngineConfig.java | 6 +-
.../drill/exec/store/hive/HiveSubScan.java | 124 +++++
.../apache/drill/exec/store/hive/HiveTable.java | 325 +++++++++++++
.../exec/store/hive/HiveTextRecordReader.java | 172 +++++++
.../store/parquet/ParquetSchemaProvider.java | 1 -
.../apache/drill/exec/util/BatchPrinter.java | 93 ++++
.../src/main/resources/drill-module.conf | 3 +-
.../java/org/apache/drill/exec/TestPlan.java | 48 ++
.../drill/exec/store/hive/TestHiveScan.java | 67 +++
.../apache/drill/exec/util/BatchPrinter.java | 93 ----
.../java-exec/src/test/resources/hive/test.json | 75 +++
pom.xml | 51 ++
.../org/apache/drill/jdbc/DrillHandler.java | 2 +-
.../drill/sql/client/full/BatchLoaderMap.java | 2 +-
.../sql/client/full/HiveDatabaseSchema.java | 115 +++--
.../drill/sql/client/full/HiveSchema.java | 5 +-
.../apache/drill/jdbc/test/FullEngineTest.java | 1 +
31 files changed, 2349 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index d81ef79..a0e261f 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -64,7 +64,7 @@ public final class DrillConfig extends NestedConfig{
mapper.registerModule(deserModule);
mapper.enable(SerializationFeature.INDENT_OUTPUT);
mapper.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
- mapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, false);
+ mapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, true);
mapper.configure(Feature.ALLOW_COMMENTS, true);
mapper.registerSubtypes(LogicalOperatorBase.getSubTypes(this));
mapper.registerSubtypes(StorageEngineConfigBase.getSubTypes(this));
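
Note: the ObjectMapper configured here is the one Drill uses to read and write plan and storage-engine JSON; flipping QUOTE_FIELD_NAMES to true means generated JSON carries quoted field names, while unquoted names are still accepted on input via ALLOW_UNQUOTED_FIELD_NAMES. A small standalone Jackson sketch of what the flag changes; the Endpoint POJO and its values are illustrative only, not Drill classes.

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;

public class QuoteFieldNamesDemo {
  // Illustrative POJO, not a Drill class.
  public static class Endpoint {
    public String address = "localhost";
    public int port = 31010;
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    // Old setting: field names are emitted without quotes, which strict JSON
    // parsers reject -> {address:"localhost",port:31010}
    mapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, false);
    System.out.println(mapper.writeValueAsString(new Endpoint()));

    // New setting (as in the change above): standard JSON
    // -> {"address":"localhost","port":31010}
    mapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, true);
    System.out.println(mapper.writeValueAsString(new Endpoint()));
  }
}
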
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index 5538d40..c50df09 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -22,6 +22,7 @@
<id>binary-release</id>
<formats>
<format>tar.gz</format>
+ <format>dir</format>
</formats>
<includeBaseDirectory>true</includeBaseDirectory>
<moduleSets>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 2cc3fb4..9ea8304 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -237,6 +237,17 @@
<artifactId>maprfs</artifactId>
<version>1.0.3-mapr-3.0.0</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <version>0.94.13-mapr-1401-m7-3.0.2</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</profile>
<profile>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 57927a7..b059d89 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -260,7 +260,43 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
assert index >= 0;
int currentOffset = offsetVector.getAccessor().get(index);
offsetVector.getMutator().set(index + 1, currentOffset + bytes.length);
- data.setBytes(currentOffset, bytes);
+ data.setBytes(currentOffset, bytes, 0, bytes.length);
+ }
+
+ public boolean setSafe(int index, byte[] bytes) {
+ assert index >= 0;
+ int currentOffset = offsetVector.getAccessor().get(index);
+ if (data.capacity() < currentOffset + bytes.length) return false;
+ offsetVector.getMutator().set(index + 1, currentOffset + bytes.length);
+ data.setBytes(currentOffset, bytes, 0, bytes.length);
+ return true;
+ }
+
+ /**
+ * Set the variable length element at the specified index to the supplied byte array.
+ *
+ * @param index position of the element to set
+ * @param bytes array of bytes to write
+ * @param start start offset within the byte array
+ * @param length number of bytes to write
+ */
+ public void set(int index, byte[] bytes, int start, int length) {
+ assert index >= 0;
+ int currentOffset = offsetVector.getAccessor().get(index);
+ offsetVector.getMutator().set(index + 1, currentOffset + length);
+ data.setBytes(currentOffset, bytes, start, length);
+ }
+
+ public boolean setSafe(int index, byte[] bytes, int start, int length) {
+ assert index >= 0;
+
+ int currentOffset = offsetVector.getAccessor().get(index);
+
+ if (data.capacity() < currentOffset + length) return false;
+
+ offsetVector.getMutator().set(index + 1, currentOffset + length);
+ data.setBytes(currentOffset, bytes, start, length);
+ return true;
}
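
Note: unlike set(), the setSafe() variants added above check the remaining capacity of the data buffer and return false rather than writing past it; the Hive record reader below relies on that return value to detect a full batch. A minimal, self-contained sketch of the contract using a toy stand-in class, not Drill's generated vectors.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class SetSafeSketch {
  // Toy stand-in for a variable-length vector: a fixed data buffer plus an offset per value.
  static class ToyVarCharVector {
    private final byte[] data;
    private final int[] offsets;

    ToyVarCharVector(int capacityBytes, int maxValues) {
      data = new byte[capacityBytes];
      offsets = new int[maxValues + 1];
    }

    // Mirrors setSafe(index, bytes, start, length): returns false instead of
    // writing past the end of the buffer.
    boolean setSafe(int index, byte[] bytes, int start, int length) {
      int currentOffset = offsets[index];
      if (data.length < currentOffset + length) return false;
      offsets[index + 1] = currentOffset + length;
      System.arraycopy(bytes, start, data, currentOffset, length);
      return true;
    }
  }

  public static void main(String[] args) {
    ToyVarCharVector vector = new ToyVarCharVector(16, 10);
    List<String> values = Arrays.asList("alpha", "beta", "gamma", "delta");
    int written = 0;
    for (String v : values) {
      byte[] bytes = v.getBytes(StandardCharsets.UTF_8);
      if (!vector.setSafe(written, bytes, 0, bytes.length)) {
        break; // batch is full; a reader would flush the batch and retry this value
      }
      written++;
    }
    System.out.println("values written before the buffer filled: " + written);
  }
}
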
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
index e9946df..688e680 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
@@ -28,7 +28,7 @@ public class TemplateClassDefinition<T>{
private final Class<T> iface;
private final Class<?> template;
private final SignatureHolder signature;
- private final AtomicLong classNumber = new AtomicLong(0);
+ private static final AtomicLong classNumber = new AtomicLong(0);
public <X extends T> TemplateClassDefinition(Class<T> iface, Class<X> template) {
super();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
index 8c1487c..94fcac5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -72,6 +72,7 @@ public class Wrapper {
public void addEndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
Preconditions.checkState(!endpointsAssigned);
+ Preconditions.checkNotNull(endpoint);
EndpointAffinity ea = endpointAffinity.get(endpoint);
if (ea == null) {
ea = new EndpointAffinity(endpoint);
@@ -149,6 +150,7 @@ public class Wrapper {
int start = ThreadLocalRandom.current().nextInt(div);
// round robin with random start.
for (int i = start; i < start + width; i++) {
+ Preconditions.checkNotNull(all.get(i % div));
endpoints.add(all.get(i % div));
}
} else {
@@ -156,6 +158,7 @@ public class Wrapper {
Collections.sort(values);
values = Lists.reverse(values);
for (int i = 0; i < width; i++) {
+ Preconditions.checkNotNull(values.get(i%values.size()).getEndpoint());
endpoints.add(values.get(i%values.size()).getEndpoint());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java
new file mode 100644
index 0000000..8809706
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.*;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.util.*;
+
+public class HiveInputReader {
+ public static void main(String args[]) throws Exception {
+/*
+ String[] columnNames = {"n_nationkey", "n_name", "n_regionkey", "n_comment"};
+ String[] columnTypes = {"bigint", "string", "bigint", "string"};
+
+ List<FieldSchema> cols = Lists.newArrayList();
+
+ for (int i = 0; i < columnNames.length; i++) {
+ cols.add(new FieldSchema(columnNames[i], columnTypes[i], null));
+ }
+ String location = "file:///tmp/nation_s";
+ String inputFormat = TextInputFormat.class.getCanonicalName();
+ String serdeLib = LazySimpleSerDe.class.getCanonicalName();
+// String inputFormat = HiveHBaseTableInputFormat.class.getCanonicalName();
+// String serdeLib = HBaseSerDe.class.getCanonicalName();
+ Map<String, String> serdeParams = new HashMap();
+// serdeParams.put("serialization.format", "1");
+// serdeParams.put("hbase.columns.mapping", ":key,f:name,f:regionkey,f:comment");
+ serdeParams.put("serialization.format", "|");
+ serdeParams.put("field.delim", "|");
+
+
+ Map<String, String> tableParams = new HashMap();
+ tableParams.put("hbase.table.name", "nation");
+ SerDeInfo serDeInfo = new SerDeInfo(null, serdeLib, serdeParams);
+ StorageDescriptor storageDescriptor = new StorageDescriptor(cols, location, inputFormat, null, false, -1, serDeInfo, null, null, null);
+ Table table = new Table("table", "default", "sphillips", 0, 0, 0, storageDescriptor, new ArrayList<FieldSchema>(), tableParams, null, null, "MANAGED_TABLE");
+ Properties properties = MetaStoreUtils.getTableMetadata(table);
+ */
+
+ HiveConf conf = new HiveConf();
+ conf.set("hive.metastore.uris", "thrift://10.10.31.51:9083");
+ HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
+ Table table = client.getTable("default", "nation");
+ Properties properties = MetaStoreUtils.getTableMetadata(table);
+
+ Path path = new Path(table.getSd().getLocation());
+ JobConf job = new JobConf();
+ for (Object obj : properties.keySet()) {
+ job.set((String) obj, (String) properties.get(obj));
+ }
+// job.set("hbase.zookeeper.quorum", "10.10.31.51");
+// job.set("hbase.zookeeper.property.clientPort", "5181");
+ InputFormat f = (InputFormat) Class.forName(table.getSd().getInputFormat()).getConstructor().newInstance();
+ job.setInputFormat(f.getClass());
+ FileInputFormat.addInputPath(job, path);
+ InputFormat format = job.getInputFormat();
+ SerDe serde = (SerDe) Class.forName(table.getSd().getSerdeInfo().getSerializationLib()).getConstructor().newInstance();
+ serde.initialize(job, properties);
+ ObjectInspector inspector = serde.getObjectInspector();
+ ObjectInspector.Category cat = inspector.getCategory();
+ TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(inspector);
+ List<String> columns = null;
+ List<TypeInfo> colTypes = null;
+ List<ObjectInspector> fieldObjectInspectors = Lists.newArrayList();
+
+ switch(typeInfo.getCategory()) {
+ case STRUCT:
+ columns = ((StructTypeInfo) typeInfo).getAllStructFieldNames();
+ colTypes = ((StructTypeInfo) typeInfo).getAllStructFieldTypeInfos();
+ for (int i = 0; i < columns.size(); i++) {
+ System.out.print(columns.get(i));
+ System.out.print(" ");
+ System.out.print(colTypes.get(i));
+ }
+ System.out.println("");
+ for (StructField field : ((StructObjectInspector)inspector).getAllStructFieldRefs()) {
+ fieldObjectInspectors.add(field.getFieldObjectInspector());
+ }
+ }
+
+ for (InputSplit split : format.getSplits(job, 1)) {
+ String encoded = serializeInputSplit(split);
+ System.out.println(encoded);
+ InputSplit newSplit = deserializeInputSplit(encoded, split.getClass().getCanonicalName());
+ System.out.print("Length: " + newSplit.getLength() + " ");
+ System.out.print("Locations: ");
+ for (String loc : newSplit.getLocations()) System.out.print(loc + " " );
+ System.out.println();
+ }
+
+ for (InputSplit split : format.getSplits(job, 1)) {
+ RecordReader reader = format.getRecordReader(split, job, Reporter.NULL);
+ Object key = reader.createKey();
+ Object value = reader.createValue();
+ int count = 0;
+ while (reader.next(key, value)) {
+ List<Object> values = ((StructObjectInspector) inspector).getStructFieldsDataAsList(serde.deserialize((Writable) value));
+ StructObjectInspector sInsp = (StructObjectInspector) inspector;
+ Object obj = sInsp.getStructFieldData(serde.deserialize((Writable) value) , sInsp.getStructFieldRef("n_name"));
+ System.out.println(obj);
+ /*
+ for (Object obj : values) {
+ PrimitiveObjectInspector.PrimitiveCategory pCat = ((PrimitiveObjectInspector)fieldObjectInspectors.get(count)).getPrimitiveCategory();
+ Object pObj = ((PrimitiveObjectInspector)fieldObjectInspectors.get(count)).getPrimitiveJavaObject(obj);
+ System.out.print(pObj + " ");
+ }
+ */
+ System.out.println("");
+ }
+ }
+ }
+
+ public static String serializeInputSplit(InputSplit split) throws IOException {
+ ByteArrayDataOutput byteArrayOutputStream = ByteStreams.newDataOutput();
+ split.write(byteArrayOutputStream);
+ return Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
+ }
+
+ public static InputSplit deserializeInputSplit(String base64, String className) throws Exception {
+ InputSplit split;
+ if (Class.forName(className) == FileSplit.class) {
+ split = new FileSplit((Path) null, 0, 0, (String[])null);
+ } else {
+ split = (InputSplit) Class.forName(className).getConstructor().newInstance();
+ }
+ ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(Base64.decodeBase64(base64));
+ split.readFields(byteArrayDataInput);
+ return split;
+ }
+}
+
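
Note: serializeInputSplit() and deserializeInputSplit() round-trip a Hadoop InputSplit through its Writable form and Base64 text so a split can be carried inside a JSON plan; FileSplit is special-cased, being built with placeholder arguments and then repopulated by readFields(). A small usage sketch of that round trip; the path, offsets and host are made up.

import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;

public class InputSplitRoundTrip {
  public static void main(String[] args) throws Exception {
    // Hypothetical split over a local file.
    FileSplit original = new FileSplit(new Path("file:///tmp/nation_s/nation.tbl"),
        0L, 1024L, new String[] {"node1.example.com"});

    // Serialize: Writable fields -> byte[] -> Base64 string, as in serializeInputSplit().
    ByteArrayDataOutput out = ByteStreams.newDataOutput();
    original.write(out);
    String encoded = Base64.encodeBase64String(out.toByteArray());

    // Deserialize: build a placeholder FileSplit, then let readFields() repopulate it,
    // as in deserializeInputSplit().
    FileSplit restored = new FileSplit((Path) null, 0, 0, (String[]) null);
    ByteArrayDataInput in = ByteStreams.newDataInput(Base64.decodeBase64(encoded));
    restored.readFields(in);

    System.out.println(restored.getPath() + " length=" + restored.getLength());
  }
}
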
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
index 41a4d3d..6211e21 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
@@ -17,19 +17,47 @@
*/
package org.apache.drill.exec.store.hive;
+import com.beust.jcommander.internal.Lists;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.ReadEntry;
import org.apache.drill.exec.physical.base.Size;
-import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.util.List;
public class HiveReadEntry implements ReadEntry {
- private final HiveConf conf;
- private final String table;
- private Size size;
- public HiveReadEntry(HiveConf conf, String table) {
- this.conf = conf;
+ @JsonProperty("table")
+ public HiveTable table;
+ @JsonProperty("partitions")
+ public List<HiveTable.HivePartition> partitions;
+
+ @JsonIgnore
+ private List<Partition> partitionsUnwrapped = Lists.newArrayList();
+
+ @JsonCreator
+ public HiveReadEntry(@JsonProperty("table") HiveTable table, @JsonProperty("partitions") List<HiveTable.HivePartition> partitions) {
this.table = table;
+ this.partitions = partitions;
+ if (partitions != null) {
+ for(HiveTable.HivePartition part : partitions) {
+ partitionsUnwrapped.add(part.getPartition());
+ }
+ }
+ }
+
+ @JsonIgnore
+ public Table getTable() {
+ return table.getTable();
+ }
+
+ @JsonIgnore
+ public List<Partition> getPartitions() {
+ return partitionsUnwrapped;
}
@Override
@@ -40,11 +68,10 @@ public class HiveReadEntry implements ReadEntry {
@Override
public Size getSize() {
- if (size != null) {
- // TODO: contact the metastore and find the size of the data in table
- size = new Size(1, 1);
- }
+ // TODO: contact the metastore and find the size of the data in table
+ Size size = new Size(1, 1);
return size;
}
}
+
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java
new file mode 100644
index 0000000..ef7266c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+public class HiveReadEntryOld implements ReadEntry {
+ private final HiveConf conf;
+ private final String table;
+ private Size size;
+
+ public HiveReadEntryOld(HiveConf conf, String table) {
+ this.conf = conf;
+ this.table = table;
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ // TODO: need to come up with way to calculate the cost for Hive tables
+ return new OperatorCost(1, 1, 2, 2);
+ }
+
+ @Override
+ public Size getSize() {
+ if (size == null) {
+ // TODO: contact the metastore and find the size of the data in table
+ size = new Size(1, 1);
+ }
+
+ return size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
new file mode 100644
index 0000000..0a31a12
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -0,0 +1,473 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class HiveRecordReader implements RecordReader {
+
+ protected Table table;
+ protected Partition partition;
+ protected InputSplit inputSplit;
+ protected FragmentContext context;
+ protected List<FieldReference> columns;
+ protected List<String> columnNames;
+ protected List<String> partitionNames = Lists.newArrayList();
+ protected List<String> selectedPartitionNames = Lists.newArrayList();
+ protected List<String> selectedPartitionTypes = Lists.newArrayList();
+ protected List<String> tableColumns;
+ protected SerDe serde;
+ protected StructObjectInspector sInspector;
+ protected List<PrimitiveObjectInspector> fieldInspectors = Lists.newArrayList();
+ protected List<PrimitiveCategory> primitiveCategories = Lists.newArrayList();
+ protected Object key, value;
+ protected org.apache.hadoop.mapred.RecordReader reader;
+ protected List<ValueVector> vectors = Lists.newArrayList();
+ protected List<ValueVector> pVectors = Lists.newArrayList();
+ protected Object redoRecord;
+ List<Object> partitionValues = Lists.newArrayList();
+
+ protected static final int TARGET_RECORD_COUNT = 4000;
+
+ public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<FieldReference> columns, FragmentContext context) throws ExecutionSetupException {
+ this.table = table;
+ this.partition = partition;
+ this.inputSplit = inputSplit;
+ this.context = context;
+ this.columns = columns;
+ init();
+ }
+
+ private void init() throws ExecutionSetupException {
+ Properties properties;
+ JobConf job = new JobConf();
+ if (partition != null) {
+ properties = MetaStoreUtils.getPartitionMetadata(partition, table);
+ } else {
+ properties = MetaStoreUtils.getTableMetadata(table);
+ }
+ for (Object obj : properties.keySet()) {
+ job.set((String) obj, (String) properties.get(obj));
+ }
+ InputFormat format;
+ String sLib = (partition == null) ? table.getSd().getSerdeInfo().getSerializationLib() : partition.getSd().getSerdeInfo().getSerializationLib();
+ String inputFormatName = (partition == null) ? table.getSd().getInputFormat() : partition.getSd().getInputFormat();
+ try {
+ format = (InputFormat) Class.forName(inputFormatName).getConstructor().newInstance();
+ Class c = Class.forName(sLib);
+ serde = (SerDe) c.getConstructor().newInstance();
+ serde.initialize(job, properties);
+ } catch (ReflectiveOperationException | SerDeException e) {
+ throw new ExecutionSetupException("Unable to instantiate InputFormat", e);
+ }
+ job.setInputFormat(format.getClass());
+
+ if (partition != null) {
+ List<FieldSchema> partitionKeys = table.getPartitionKeys();
+ for (FieldSchema field : partitionKeys) {
+ partitionNames.add(field.getName());
+ }
+ }
+
+ try {
+ ObjectInspector oi = serde.getObjectInspector();
+ if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
+ throw new UnsupportedOperationException(String.format("%s category not supported", oi.getCategory()));
+ }
+ sInspector = (StructObjectInspector) oi;
+ StructTypeInfo sTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(sInspector);
+ if (columns == null) {
+ columnNames = sTypeInfo.getAllStructFieldNames();
+ tableColumns = columnNames;
+ } else {
+ tableColumns = sTypeInfo.getAllStructFieldNames();
+ List<Integer> columnIds = Lists.newArrayList();
+ columnNames = Lists.newArrayList();
+ for (FieldReference field : columns) {
+ String columnName = field.getPath().toString();
+ if (!tableColumns.contains(columnName)) {
+ if (partition != null && partitionNames.contains(columnName)) {
+ selectedPartitionNames.add(columnName);
+ } else {
+ throw new ExecutionSetupException(String.format("Column %s does not exist", columnName));
+ }
+ } else {
+ columnIds.add(tableColumns.indexOf(columnName));
+ columnNames.add(columnName);
+ }
+ }
+ ColumnProjectionUtils.appendReadColumnIDs(job, columnIds);
+ ColumnProjectionUtils.appendReadColumnNames(job, columnNames);
+ }
+ for (String columnName : columnNames) {
+ ObjectInspector poi = sInspector.getStructFieldRef(columnName).getFieldObjectInspector();
+ if(poi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
+ throw new UnsupportedOperationException(String.format("%s type not supported", poi.getCategory()));
+ }
+ PrimitiveObjectInspector pInspector = (PrimitiveObjectInspector) poi;
+ fieldInspectors.add(pInspector);
+ primitiveCategories.add(pInspector.getPrimitiveCategory());
+ }
+
+ if (columns == null) {
+ selectedPartitionNames = partitionNames;
+ }
+
+ if (partition != null) {
+ for (int i = 0; i < table.getPartitionKeys().size(); i++) {
+ FieldSchema field = table.getPartitionKeys().get(i);
+ if (selectedPartitionNames.contains(field.getName())) {
+ selectedPartitionTypes.add(field.getType());
+ partitionValues.add(convertPartitionType(field.getType(), partition.getValues().get(i)));
+ }
+ }
+ }
+ } catch (SerDeException e) {
+ throw new ExecutionSetupException(e);
+ }
+ try {
+ reader = format.getRecordReader(inputSplit, job, Reporter.NULL);
+ } catch (IOException e) {
+ throw new ExecutionSetupException("Failed to get Recordreader", e);
+ }
+ key = reader.createKey();
+ value = reader.createValue();
+ }
+
+ @Override
+ public void setup(OutputMutator output) throws ExecutionSetupException {
+ output.removeAllFields();
+ try {
+ for (int i = 0; i < columnNames.size(); i++) {
+ PrimitiveCategory pCat = primitiveCategories.get(i);
+ MaterializedField field = MaterializedField.create(new SchemaPath(columnNames.get(i), ExpressionPosition.UNKNOWN), getMajorType(pCat));
+ ValueVector vv = TypeHelper.getNewVector(field, context.getAllocator());
+ vectors.add(vv);
+ output.addField(vv);
+ }
+ for (int i = 0; i < selectedPartitionNames.size(); i++) {
+ String type = selectedPartitionTypes.get(i);
+ MaterializedField field = MaterializedField.create(new SchemaPath(selectedPartitionNames.get(i), ExpressionPosition.UNKNOWN), getMajorType(type));
+ ValueVector vv = TypeHelper.getNewVector(field, context.getAllocator());
+ pVectors.add(vv);
+ output.addField(vv);
+ }
+ output.setNewSchema();
+ } catch(SchemaChangeException e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ protected void populatePartitionVectors(int recordCount) {
+ for (int i = 0; i < pVectors.size(); i++) {
+ int size = 50;
+ ValueVector vector = pVectors.get(i);
+ Object val = partitionValues.get(i);
+ if (selectedPartitionTypes.get(i).equals("string") || selectedPartitionTypes.get(i).equals("binary")) {
+ size = ((byte[]) partitionValues.get(i)).length;
+ }
+ VectorAllocator.getAllocator(vector, size).alloc(recordCount);
+ switch(selectedPartitionTypes.get(i)) {
+ case "boolean": {
+ BitVector v = (BitVector) vector;
+ Boolean value = (Boolean) val;
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, value ? 1 : 0);
+ }
+ break;
+ }
+ case "tinyint": {
+ TinyIntVector v = (TinyIntVector) vector;
+ byte value = (byte) val;
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, value);
+ }
+ break;
+ }
+ case "double": {
+ Float8Vector v = (Float8Vector) vector;
+ double value = (double) val;
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, value);
+ }
+ break;
+ }
+ case "float": {
+ Float4Vector v = (Float4Vector) vector;
+ float value = (float) val;
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, value);
+ }
+ break;
+ }
+ case "int": {
+ IntVector v = (IntVector) vector;
+ int value = (int) val;
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, value);
+ }
+ break;
+ }
+ case "bigint": {
+ BigIntVector v = (BigIntVector) vector;
+ long value = (long) val;
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, value);
+ }
+ break;
+ }
+ case "smallint": {
+ SmallIntVector v = (SmallIntVector) vector;
+ short value = (short) val;
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, value);
+ }
+ break;
+ }
+ case "string": {
+ VarCharVector v = (VarCharVector) vector;
+ byte[] value = (byte[]) val;
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, value);
+ }
+ break;
+ }
+ default:
+ throw new UnsupportedOperationException("Could not determine type: " + selectedPartitionTypes.get(i));
+ }
+ vector.getMutator().setValueCount(recordCount);
+ }
+ }
+
+ private Object convertPartitionType(String type, String value) {
+ switch (type) {
+ case "boolean":
+ return Boolean.parseBoolean(value);
+ case "tinyint":
+ return Byte.parseByte(value);
+ case "double":
+ return Double.parseDouble(value);
+ case "float":
+ return Float.parseFloat(value);
+ case "int":
+ return Integer.parseInt(value);
+ case "bigint":
+ return Long.parseLong(value);
+ case "smallint":
+ return Short.parseShort(value);
+ case "string":
+ return value.getBytes();
+ default:
+ throw new UnsupportedOperationException("Could not determine type: " + type);
+ }
+ }
+
+ public static TypeProtos.MajorType getMajorType(String type) {
+ switch(type) {
+ case "binary":
+ return Types.required(TypeProtos.MinorType.VARBINARY);
+ case "boolean":
+ return Types.required(TypeProtos.MinorType.BIT);
+ case "tinyint":
+ return Types.required(TypeProtos.MinorType.TINYINT);
+ case "decimal":
+ return Types.required(TypeProtos.MinorType.DECIMAL16);
+ case "double":
+ return Types.required(TypeProtos.MinorType.FLOAT8);
+ case "float":
+ return Types.required(TypeProtos.MinorType.FLOAT4);
+ case "int":
+ return Types.required(TypeProtos.MinorType.INT);
+ case "bigint":
+ return Types.required(TypeProtos.MinorType.BIGINT);
+ case "smallint":
+ return Types.required(TypeProtos.MinorType.SMALLINT);
+ case "string":
+ return Types.required(TypeProtos.MinorType.VARCHAR);
+ case "varchar":
+
+ default:
+ throw new UnsupportedOperationException("Could not determine type: " + type);
+ }
+ }
+
+ public static TypeProtos.MajorType getMajorType(PrimitiveCategory pCat) {
+ switch(pCat) {
+ case BINARY:
+ return Types.required(TypeProtos.MinorType.VARBINARY);
+ case BOOLEAN:
+ return Types.required(TypeProtos.MinorType.BIT);
+ case BYTE:
+ return Types.required(TypeProtos.MinorType.TINYINT);
+ case DECIMAL:
+ return Types.required(TypeProtos.MinorType.DECIMAL16);
+ case DOUBLE:
+ return Types.required(TypeProtos.MinorType.FLOAT8);
+ case FLOAT:
+ return Types.required(TypeProtos.MinorType.FLOAT4);
+ case INT:
+ return Types.required(TypeProtos.MinorType.INT);
+ case LONG:
+ return Types.required(TypeProtos.MinorType.BIGINT);
+ case SHORT:
+ return Types.required(TypeProtos.MinorType.SMALLINT);
+ case STRING:
+ return Types.required(TypeProtos.MinorType.VARCHAR);
+ case TIMESTAMP:
+
+ default:
+ throw new UnsupportedOperationException("Could not determine type");
+ }
+ }
+
+ public boolean setValue(PrimitiveCategory pCat, ValueVector vv, int index, Object fieldValue) {
+ switch(pCat) {
+ case BINARY:
+ return ((VarBinaryVector) vv).getMutator().setSafe(index, (byte[]) fieldValue);
+ case BOOLEAN:
+ boolean isSet = (boolean) fieldValue;
+ return ((BitVector) vv).getMutator().setSafe(index, isSet ? 1 : 0 );
+ case BYTE:
+ return ((TinyIntVector) vv).getMutator().setSafe(index, (byte) fieldValue);
+ case DECIMAL:
+ throw new UnsupportedOperationException();
+ case DOUBLE:
+ return ((Float8Vector) vv).getMutator().setSafe(index, (double) fieldValue);
+ case FLOAT:
+ return ((Float4Vector) vv).getMutator().setSafe(index, (float) fieldValue);
+ case INT:
+ return ((IntVector) vv).getMutator().setSafe(index, (int) fieldValue);
+ case LONG:
+ return ((BigIntVector) vv).getMutator().setSafe(index, (long) fieldValue);
+ case SHORT:
+ return ((SmallIntVector) vv).getMutator().setSafe(index, (short) fieldValue);
+ case STRING:
+ int len = ((Text) fieldValue).getLength();
+ byte[] bytes = ((Text) fieldValue).getBytes();
+ return ((VarCharVector) vv).getMutator().setSafe(index, bytes, 0, len);
+ case TIMESTAMP:
+ throw new UnsupportedOperationException();
+
+ default:
+ throw new UnsupportedOperationException("Could not determine type");
+ }
+ }
+
+ @Override
+ public int next() {
+ for (ValueVector vv : vectors) {
+ VectorAllocator.getAllocator(vv, 50).alloc(TARGET_RECORD_COUNT);
+ }
+ try {
+ int recordCount = 0;
+ if (redoRecord != null) {
+ Object deSerializedValue = serde.deserialize((Writable) redoRecord);
+ for (int i = 0; i < columnNames.size(); i++) {
+ Object obj;
+ String columnName = columnNames.get(i);
+ if (primitiveCategories.get(i) == PrimitiveCategory.STRING) {
+ obj = fieldInspectors.get(i).getPrimitiveWritableObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName)));
+ } else {
+ obj = fieldInspectors.get(i).getPrimitiveJavaObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName)));
+ }
+ boolean success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, obj);
+ if (!success) {
+ throw new DrillRuntimeException(String.format("Failed to write value for column %s", columnName));
+ }
+ }
+ redoRecord = null;
+ recordCount++;
+ }
+ while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value)) {
+ Object deSerializedValue = serde.deserialize((Writable) value);
+ for (int i = 0; i < columnNames.size(); i++) {
+ Object obj;
+ String columnName = columnNames.get(i);
+ if (primitiveCategories.get(i) == PrimitiveCategory.STRING) {
+ obj = fieldInspectors.get(i).getPrimitiveWritableObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName)));
+ } else {
+ obj = fieldInspectors.get(i).getPrimitiveJavaObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName)));
+ }
+ boolean success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, obj);
+ if (!success) {
+ redoRecord = value;
+ for (ValueVector v : vectors) {
+ v.getMutator().setValueCount(recordCount);
+ }
+ if (partition != null) populatePartitionVectors(recordCount);
+ return recordCount;
+ }
+ }
+ recordCount++;
+ }
+ for (ValueVector v : vectors) {
+ v.getMutator().setValueCount(recordCount);
+ }
+ if (partition != null) populatePartitionVectors(recordCount);
+ return recordCount;
+ } catch (IOException | SerDeException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ // no-op: the underlying RecordReader is not closed here yet.
+ }
+}
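
Note: next() above fills the value vectors until TARGET_RECORD_COUNT is reached or a setSafe() call reports the batch is full; in that case the current row is parked in redoRecord and replayed at the start of the next call, and partition columns are populated as constants once the batch is closed. A minimal sketch of the redoRecord carry-over pattern, independent of Hive and Drill types; the names and the toy capacity check are illustrative only.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RedoRecordSketch {
  static final int TARGET_RECORD_COUNT = 4;    // records per batch, as in the reader above
  static final int BATCH_CAPACITY_BYTES = 12;  // toy capacity; a fresh batch starts empty

  private final Iterator<String> source;
  private String redoRecord;                   // row that did not fit into the last batch

  RedoRecordSketch(List<String> rows) {
    this.source = rows.iterator();
  }

  // Stand-in for a setSafe() call: fails when the batch has no room left instead of growing it.
  private static boolean setSafe(StringBuilder batch, String value, int capacity) {
    if (batch.length() + value.length() + 1 > capacity) return false;
    batch.append(value).append('|');
    return true;
  }

  // Mirrors the shape of HiveRecordReader.next(): replay the parked row first, then read
  // until the batch is full or the target record count is reached.
  int next(StringBuilder batch) {
    int count = 0;
    if (redoRecord != null) {
      setSafe(batch, redoRecord, BATCH_CAPACITY_BYTES); // assumed to fit in a fresh batch
      redoRecord = null;
      count++;
    }
    while (count < TARGET_RECORD_COUNT && source.hasNext()) {
      String value = source.next();
      if (!setSafe(batch, value, BATCH_CAPACITY_BYTES)) {
        redoRecord = value;                             // park it for the next call
        return count;
      }
      count++;
    }
    return count;
  }

  public static void main(String[] args) {
    RedoRecordSketch reader =
        new RedoRecordSketch(Arrays.asList("ALGERIA", "ARGENTINA", "BRAZIL", "CANADA"));
    int n;
    do {
      StringBuilder batch = new StringBuilder();
      n = reader.next(batch);
      System.out.println(n + " record(s): " + batch);
    } while (n > 0);
  }
}
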
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
new file mode 100644
index 0000000..bc2a16b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.fasterxml.jackson.annotation.*;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StorageEngineRegistry;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.*;
+
+@JsonTypeName("hive-scan")
+public class HiveScan extends AbstractGroupScan {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScan.class);
+
+ @JsonProperty("hive-table")
+ public HiveReadEntry hiveReadEntry;
+ @JsonIgnore
+ private Table table;
+ @JsonIgnore
+ private List<InputSplit> inputSplits = Lists.newArrayList();
+ @JsonIgnore
+ public HiveStorageEngine storageEngine;
+ @JsonProperty("storageengine")
+ public HiveStorageEngineConfig engineConfig;
+
+ @JsonIgnore
+ public List<Partition> partitions;
+ @JsonIgnore
+ private Collection<DrillbitEndpoint> endpoints;
+
+ @JsonProperty("columns")
+ public List<FieldReference> columns;
+
+ @JsonIgnore
+ List<List<InputSplit>> mappings;
+
+ @JsonIgnore
+ Map<InputSplit, Partition> partitionMap = new HashMap();
+
+ @JsonCreator
+ public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry, @JsonProperty("storageengine") HiveStorageEngineConfig config,
+ @JsonProperty("columns") List<FieldReference> columns,
+ @JacksonInject StorageEngineRegistry engineRegistry) throws ExecutionSetupException {
+ this.hiveReadEntry = hiveReadEntry;
+ this.table = hiveReadEntry.getTable();
+ this.engineConfig = config;
+ this.storageEngine = (HiveStorageEngine) engineRegistry.getEngine(config);
+ this.columns = columns;
+ this.partitions = hiveReadEntry.getPartitions();
+ getSplits();
+ endpoints = storageEngine.getContext().getBits();
+ }
+
+ public HiveScan(HiveReadEntry hiveReadEntry, HiveStorageEngine storageEngine, List<FieldReference> columns) throws ExecutionSetupException {
+ this.table = hiveReadEntry.getTable();
+ this.hiveReadEntry = hiveReadEntry;
+ this.columns = columns;
+ this.partitions = hiveReadEntry.getPartitions();
+ getSplits();
+ endpoints = storageEngine.getContext().getBits();
+ this.engineConfig = storageEngine.getConfig();
+ }
+
+ public List<FieldReference> getColumns() {
+ return columns;
+ }
+
+ private void getSplits() throws ExecutionSetupException {
+ try {
+ if (partitions == null || partitions.size() == 0) {
+ Properties properties = MetaStoreUtils.getTableMetadata(table);
+ JobConf job = new JobConf();
+ for (Object obj : properties.keySet()) {
+ job.set((String) obj, (String) properties.get(obj));
+ }
+ InputFormat format = (InputFormat) Class.forName(table.getSd().getInputFormat()).getConstructor().newInstance();
+ job.setInputFormat(format.getClass());
+ Path path = new Path(table.getSd().getLocation());
+ FileInputFormat.addInputPath(job, path);
+ format = job.getInputFormat();
+ for (InputSplit split : format.getSplits(job, 1)) {
+ inputSplits.add(split);
+ }
+ for (InputSplit split : inputSplits) {
+ partitionMap.put(split, null);
+ }
+ } else {
+ for (Partition partition : partitions) {
+ Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
+ JobConf job = new JobConf();
+ for (Object obj : properties.keySet()) {
+ job.set((String) obj, (String) properties.get(obj));
+ }
+ InputFormat format = (InputFormat) Class.forName(partition.getSd().getInputFormat()).getConstructor().newInstance();
+ job.setInputFormat(format.getClass());
+ FileInputFormat.addInputPath(job, new Path(partition.getSd().getLocation()));
+ format = job.getInputFormat();
+ InputSplit[] splits = format.getSplits(job,1);
+ for (InputSplit split : splits) {
+ inputSplits.add(split);
+ partitionMap.put(split, partition);
+ }
+ }
+ }
+ } catch (ReflectiveOperationException | IOException e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ @Override
+ public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
+ mappings = Lists.newArrayList();
+ for (int i = 0; i < endpoints.size(); i++) {
+ mappings.add(new ArrayList<InputSplit>());
+ }
+ int count = endpoints.size();
+ for (int i = 0; i < inputSplits.size(); i++) {
+ mappings.get(i % count).add(inputSplits.get(i));
+ }
+ }
+
+ public static String serializeInputSplit(InputSplit split) throws IOException {
+ ByteArrayDataOutput byteArrayOutputStream = ByteStreams.newDataOutput();
+ split.write(byteArrayOutputStream);
+ String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
+ logger.debug("Encoded split string for split {} : {}", split, encoded);
+ return encoded;
+ }
+
+ @Override
+ public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+ try {
+ List<InputSplit> splits = mappings.get(minorFragmentId);
+ List<Partition> parts = Lists.newArrayList();
+ List<String> encodedInputSplits = Lists.newArrayList();
+ List<String> splitTypes = Lists.newArrayList();
+ for (InputSplit split : splits) {
+ parts.add(partitionMap.get(split));
+ encodedInputSplits.add(serializeInputSplit(split));
+ splitTypes.add(split.getClass().getCanonicalName());
+ }
+ if (parts.contains(null)) parts = null;
+ return new HiveSubScan(table, parts, encodedInputSplits, splitTypes, columns);
+ } catch (IOException | ReflectiveOperationException e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ @Override
+ public int getMaxParallelizationWidth() {
+ return inputSplits.size();
+ }
+
+ @Override
+ public List<EndpointAffinity> getOperatorAffinity() {
+ Map<String, DrillbitEndpoint> endpointMap = new HashMap();
+ for (DrillbitEndpoint endpoint : endpoints) {
+ endpointMap.put(endpoint.getAddress(), endpoint);
+ logger.debug("endpoing address: {}", endpoint.getAddress());
+ }
+ Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap();
+ try {
+ long totalSize = 0;
+ for (InputSplit split : inputSplits) {
+ totalSize += Math.max(1, split.getLength());
+ }
+ for (InputSplit split : inputSplits) {
+ float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
+ for (String loc : split.getLocations()) {
+ logger.debug("split location: {}", loc);
+ DrillbitEndpoint endpoint = endpointMap.get(loc);
+ if (endpoint != null) {
+ if (affinityMap.containsKey(endpoint)) {
+ affinityMap.get(endpoint).addAffinity(affinity);
+ } else {
+ affinityMap.put(endpoint, new EndpointAffinity(endpoint, affinity));
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
+ for (DrillbitEndpoint ep : affinityMap.keySet()) {
+ Preconditions.checkNotNull(ep);
+ }
+ for (EndpointAffinity a : affinityMap.values()) {
+ Preconditions.checkNotNull(a.getEndpoint());
+ }
+ return Lists.newArrayList(affinityMap.values());
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ return new OperatorCost(1, 2, 1, 1);
+ }
+
+ @Override
+ public Size getSize() {
+ // TODO - this is wrong, need to populate correctly
+ return new Size(10,10);
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ return new HiveScan(hiveReadEntry, storageEngine, columns);
+ }
+}
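
Note: applyAssignments() above deals the input splits out round-robin across the assigned minor fragments, and getOperatorAffinity() weights each Drillbit by the fraction of total split bytes that have a replica on that node. A standalone sketch of both calculations with made-up splits and hosts.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SplitAssignmentSketch {
  public static void main(String[] args) {
    // Made-up splits: lengths in bytes and the hosts holding each replica.
    String[] splits = {"split-0", "split-1", "split-2", "split-3", "split-4"};
    long[] lengths = {128, 64, 64, 256, 128};
    String[][] locations = {{"nodeA"}, {"nodeB"}, {"nodeA", "nodeB"}, {"nodeC"}, {"nodeB"}};
    List<String> endpoints = Arrays.asList("nodeA", "nodeB", "nodeC");

    // Round-robin assignment, as in applyAssignments(): split i goes to fragment i % count.
    List<List<String>> mappings = new ArrayList<>();
    for (int i = 0; i < endpoints.size(); i++) {
      mappings.add(new ArrayList<String>());
    }
    for (int i = 0; i < splits.length; i++) {
      mappings.get(i % endpoints.size()).add(splits[i]);
    }
    System.out.println("fragment assignments: " + mappings);

    // Affinity, as in getOperatorAffinity(): each split adds its share of the total bytes
    // to every endpoint that hosts one of its replicas.
    long totalSize = 0;
    for (long len : lengths) {
      totalSize += Math.max(1, len);
    }
    Map<String, Float> affinity = new HashMap<>();
    for (int i = 0; i < splits.length; i++) {
      float share = ((float) Math.max(1, lengths[i])) / totalSize;
      for (String host : locations[i]) {
        Float current = affinity.get(host);
        affinity.put(host, current == null ? share : current + share);
      }
    }
    System.out.println("endpoint affinities: " + affinity);
  }
}
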
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
new file mode 100644
index 0000000..b155661
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.beust.jcommander.internal.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import java.util.List;
+
+public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
+
+ @Override
+ public RecordBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children) throws ExecutionSetupException {
+ List<RecordReader> readers = Lists.newArrayList();
+ Table table = config.table;
+ List<InputSplit> splits = config.getInputSplits();
+ List<Partition> partitions = config.partitions;
+ if (partitions == null || partitions.size() == 0) {
+ if (table.getSd().getInputFormat().equals(TextInputFormat.class.getCanonicalName()) &&
+ table.getSd().getSerdeInfo().getSerializationLib().equals(LazySimpleSerDe.class.getCanonicalName()) &&
+ config.getColumns() != null) {
+ for (InputSplit split : splits) {
+ readers.add(new HiveTextRecordReader(table, null, split, config.getColumns(), context));
+ }
+ } else {
+ for (InputSplit split : splits) {
+ readers.add(new HiveRecordReader(table, null, split, config.getColumns(), context));
+ }
+ }
+ } else {
+ int i = 0;
+ if (table.getSd().getInputFormat().equals(TextInputFormat.class.getCanonicalName()) &&
+ table.getSd().getSerdeInfo().getSerializationLib().equals(LazySimpleSerDe.class.getCanonicalName()) &&
+ config.getColumns() != null) {
+ for (InputSplit split : splits) {
+ readers.add(new HiveTextRecordReader(table, partitions.get(i++), split, config.getColumns(), context));
+ }
+ } else {
+ for (InputSplit split : splits) {
+ readers.add(new HiveRecordReader(config.table, partitions.get(i++), split, config.getColumns(), context));
+ }
+ }
+ }
+ return new ScanBatch(context, readers.iterator());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSchemaProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSchemaProvider.java
deleted file mode 100644
index 38ec007..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSchemaProvider.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.store.SchemaProvider;
-
-import com.beust.jcommander.internal.Lists;
-
-public class HiveSchemaProvider implements SchemaProvider{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveSchemaProvider.class);
-
- final HiveStorageEngineConfig configuration;
-
- public HiveSchemaProvider(HiveStorageEngineConfig configuration, DrillConfig config){
- this.configuration = configuration;
- }
-
- @Override
- public Object getSelectionBaseOnName(String tableName) {
- HiveReadEntry re = new HiveReadEntry(configuration.getHiveConf(), tableName);
- return Lists.newArrayList(re);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java
new file mode 100644
index 0000000..0f6f3bc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStorageEngine;
+import org.apache.drill.exec.store.SchemaProvider;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.List;
+
+public class HiveStorageEngine extends AbstractStorageEngine {
+
+ private HiveStorageEngineConfig config;
+ private HiveConf hiveConf;
+ private HiveSchemaProvider schemaProvider;
+  private DrillbitContext context;
+
+ public HiveStorageEngine(HiveStorageEngineConfig config, DrillbitContext context) throws ExecutionSetupException {
+ this.config = config;
+ this.context = context;
+ this.hiveConf = config.getHiveConf();
+ }
+
+ public HiveStorageEngineConfig getConfig() {
+ return config;
+ }
+
+ public DrillbitContext getContext() {
+ return context;
+ }
+
+ @Override
+ public HiveScan getPhysicalScan(Scan scan) throws IOException {
+ HiveReadEntry hiveReadEntry = scan.getSelection().getListWith(new ObjectMapper(), new TypeReference<HiveReadEntry>(){});
+ try {
+ List<Partition> partitions = getSchemaProvider().getPartitions(hiveReadEntry.getTable().getDbName(), hiveReadEntry.getTable().getTableName());
+ return new HiveScan(hiveReadEntry, this, null);
+ } catch (ExecutionSetupException | TException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ @Override
+ public HiveSchemaProvider getSchemaProvider() {
+ try {
+ if (schemaProvider == null) {
+ schemaProvider = new HiveSchemaProvider(config, context.getConfig());
+ }
+ return schemaProvider;
+ } catch (ExecutionSetupException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ List<String> getPartitions(String dbName, String tableName) throws TException {
+ List<Partition> partitions = getSchemaProvider().getMetaClient().listPartitions(dbName, tableName, Short.MAX_VALUE);
+ List<String> partitionLocations = Lists.newArrayList();
+ if (partitions == null) return null;
+ for (Partition part : partitions) {
+ partitionLocations.add(part.getSd().getLocation());
+ }
+ return partitionLocations;
+ }
+
+ public static class HiveEntry implements ReadEntry {
+
+ private Table table;
+
+ public HiveEntry(Table table) {
+ this.table = table;
+ }
+
+ public Table getTable() {
+ return table;
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ throw new UnsupportedOperationException(this.getClass().getCanonicalName() + " is only for extracting path data from " +
+ "selections inside a scan node from a logical plan, it cannot be used in an executing plan and has no cost.");
+ }
+
+ @Override
+ public Size getSize() {
+ throw new UnsupportedOperationException(this.getClass().getCanonicalName() + " is only for extracting path data from " +
+        "selections inside a scan node from a logical plan, it cannot be used in an executing plan and has no size.");
+ }
+ }
+
+ public static class HiveSchemaProvider implements SchemaProvider {
+
+ private HiveConf hiveConf;
+ private HiveMetaStoreClient metaClient;
+
+ public HiveSchemaProvider(HiveStorageEngineConfig config, DrillConfig dConfig) throws ExecutionSetupException {
+ hiveConf = config.getHiveConf();
+ }
+
+ public HiveMetaStoreClient getMetaClient() throws MetaException {
+ if (metaClient == null) {
+ metaClient = new HiveMetaStoreClient(hiveConf);
+ }
+ return metaClient;
+ }
+
+ public Table getTable(String dbName, String tableName) throws TException {
+ HiveMetaStoreClient mClient = getMetaClient();
+ try {
+ return mClient.getTable(dbName, tableName);
+      } catch (NoSuchObjectException e) {
+ logger.error("Database: {} table: {} not found", dbName, tableName);
+ throw new RuntimeException(e);
+ } catch (TException e) {
+ mClient.reconnect();
+ return mClient.getTable(dbName, tableName);
+ }
+ }
+
+ List<Partition> getPartitions(String dbName, String tableName) throws TException {
+ HiveMetaStoreClient mClient = getMetaClient();
+ List<Partition> partitions;
+ try {
+ partitions = getMetaClient().listPartitions(dbName, tableName, Short.MAX_VALUE);
+ } catch (TException e) {
+ mClient.reconnect();
+ partitions = getMetaClient().listPartitions(dbName, tableName, Short.MAX_VALUE);
+ }
+ return partitions;
+ }
+
+ @Override
+ public HiveReadEntry getSelectionBaseOnName(String name) {
+ String[] dbNameTableName = name.split("\\.");
+ String dbName;
+ String t;
+ if (dbNameTableName.length > 1) {
+ dbName = dbNameTableName[0];
+ t = dbNameTableName[1];
+ } else {
+ dbName = "default";
+ t = name;
+ }
+
+ try {
+ Table table = getTable(dbName, t);
+ List<Partition> partitions = getPartitions(dbName, t);
+ List<HiveTable.HivePartition> hivePartitions = Lists.newArrayList();
+ for(Partition part : partitions) {
+ hivePartitions.add(new HiveTable.HivePartition(part));
+ }
+ if (hivePartitions.size() == 0) hivePartitions = null;
+ return new HiveReadEntry(new HiveTable(table), hivePartitions);
+ } catch (NoSuchObjectException e) {
+ throw new DrillRuntimeException(e);
+ } catch (TException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+ }
+}
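
For reference, getSelectionBaseOnName above resolves either a bare table name or a db-qualified "db.table" name, falling back to the "default" database when no database is given. A standalone sketch of just that resolution rule (the demo class and the sample table names are illustrative, not part of this commit):

    // Standalone illustration of the name-resolution rule in getSelectionBaseOnName:
    // "db.table" selects an explicit database, anything else falls back to "default".
    public class HiveNameResolutionDemo {

      static String[] resolve(String name) {
        String[] parts = name.split("\\.");
        return parts.length > 1 ? new String[]{parts[0], parts[1]}
                                : new String[]{"default", name};
      }

      public static void main(String[] args) {
        for (String n : new String[]{"kv", "web.logs"}) {
          String[] r = resolve(n);
          System.out.println(n + " -> db=" + r[0] + ", table=" + r[1]);
        }
      }
    }
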
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java
index 0a2c5de..91fec3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.hive;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.common.logical.StorageEngineConfigBase;
@@ -27,9 +28,12 @@ import java.util.Map;
@JsonTypeName("hive")
public class HiveStorageEngineConfig extends StorageEngineConfigBase {
- private Map<String, String> configProps;
+ @JsonProperty
+ public Map<String, String> configProps;
+ @JsonIgnore
private HiveConf hiveConf;
+ @JsonIgnore
public HiveConf getHiveConf() {
if (hiveConf == null) {
hiveConf = new HiveConf();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
new file mode 100644
index 0000000..8ff7c82
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.collect.Lists;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Iterators;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public class HiveSubScan extends AbstractBase implements SubScan {
+
+ @JsonProperty("splits")
+ public List<String> encodedSplits;
+ @JsonProperty("hive-table")
+ public Table table;
+ @JsonProperty("partitions")
+ public List<Partition> partitions;
+ @JsonIgnore
+ private List<InputSplit> inputSplits = Lists.newArrayList();
+  @JsonProperty("splitClasses")
+ public List<String> splitClasses;
+
+ @JsonProperty("columns")
+ public List<FieldReference> columns;
+
+ @JsonCreator
+ public HiveSubScan(@JsonProperty("hive-table") Table table,
+                     @JsonProperty("partitions") List<Partition> partitions,
+ @JsonProperty("splits") List<String> encodedSplits,
+ @JsonProperty("splitClasses") List<String> splitClasses,
+ @JsonProperty("columns") List<FieldReference> columns) throws IOException, ReflectiveOperationException {
+ this.table = table;
+ this.partitions = partitions;
+ this.encodedSplits = encodedSplits;
+ this.splitClasses = splitClasses;
+ this.columns = columns;
+
+ for (int i = 0; i < encodedSplits.size(); i++) {
+ inputSplits.add(deserializeInputSplit(encodedSplits.get(i), splitClasses.get(i)));
+ }
+ }
+
+ public static InputSplit deserializeInputSplit(String base64, String className) throws IOException, ReflectiveOperationException{
+ InputSplit split;
+ if (Class.forName(className) == FileSplit.class) {
+ split = new FileSplit((Path) null, 0, 0, (String[])null);
+ } else {
+ split = (InputSplit) Class.forName(className).getConstructor().newInstance();
+ }
+ ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(Base64.decodeBase64(base64));
+ split.readFields(byteArrayDataInput);
+ return split;
+ }
+
+ public List<FieldReference> getColumns() {
+ return columns;
+ }
+
+ public List<InputSplit> getInputSplits() {
+ return inputSplits;
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ return new OperatorCost(1, 2, 1, 1);
+ }
+
+ @Override
+ public Size getSize() {
+ // TODO - this is wrong, need to populate correctly
+ return new Size(10,10);
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitSubScan(this, value);
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ try {
+ return new HiveSubScan(table, partitions, encodedSplits, splitClasses, columns);
+ } catch (IOException | ReflectiveOperationException e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ @Override
+ public Iterator<PhysicalOperator> iterator() {
+ return Iterators.emptyIterator();
+ }
+}
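
The encodedSplits that HiveSubScan accepts are base64-wrapped Writable serializations of Hadoop InputSplits, paired index-for-index with splitClasses so deserializeInputSplit can reinstantiate the correct type. A minimal round-trip sketch, assuming the producing side (presumably in HiveScan, not shown in this hunk) uses the standard Writable write() call; SplitEncodingDemo and serializeInputSplit are illustrative names, not part of this commit:

    import com.google.common.io.ByteArrayDataOutput;
    import com.google.common.io.ByteStreams;
    import org.apache.commons.codec.binary.Base64;
    import org.apache.drill.exec.store.hive.HiveSubScan;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;

    import java.io.IOException;

    public class SplitEncodingDemo {

      // Hypothetical counterpart to HiveSubScan.deserializeInputSplit: the split's
      // Writable bytes are base64-wrapped so they can ride inside the JSON plan.
      static String serializeInputSplit(InputSplit split) throws IOException {
        ByteArrayDataOutput out = ByteStreams.newDataOutput();
        split.write(out);                      // mirror of readFields() on the read side
        return Base64.encodeBase64String(out.toByteArray());
      }

      public static void main(String[] args) throws Exception {
        InputSplit original = new FileSplit(new Path("/tmp/example"), 0, 128, new String[0]);
        String encoded = serializeInputSplit(original);
        InputSplit decoded = HiveSubScan.deserializeInputSplit(encoded, FileSplit.class.getName());
        System.out.println(((FileSplit) decoded).getPath());   // /tmp/example
      }
    }
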
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/cdf46fd3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
new file mode 100644
index 0000000..3858804
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.collect.Lists;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.hadoop.hive.metastore.api.*;
+
+import java.util.List;
+import java.util.Map;
+
+@JsonTypeName("table")
+public class HiveTable {
+
+ @JsonIgnore
+ private Table table;
+
+ @JsonProperty
+ public String tableName;
+ @JsonProperty
+ public String dbName;
+ @JsonProperty
+ public String owner;
+ @JsonProperty
+ public int createTime;
+ @JsonProperty
+ public int lastAccessTime;
+ @JsonProperty
+ public int retention;
+ @JsonProperty
+ public StorageDescriptorWrapper sd;
+ @JsonProperty
+ public List<FieldSchemaWrapper> partitionKeys;
+ @JsonProperty
+ public Map<String,String> parameters;
+ @JsonProperty
+ public String viewOriginalText;
+ @JsonProperty
+ public String viewExpandedText;
+ @JsonProperty
+ public String tableType;
+
+ @JsonCreator
+ public HiveTable(@JsonProperty("tableName") String tableName, @JsonProperty("dbName") String dbName, @JsonProperty("owner") String owner, @JsonProperty("createTime") int createTime,
+ @JsonProperty("lastAccessTime") int lastAccessTime, @JsonProperty("retention") int retention, @JsonProperty("sd") StorageDescriptorWrapper sd,
+ @JsonProperty("partitionKeys") List<FieldSchemaWrapper> partitionKeys, @JsonProperty("parameters") Map<String, String> parameters,
+ @JsonProperty("viewOriginalText") String viewOriginalText, @JsonProperty("viewExpandedText") String viewExpandedText, @JsonProperty("tableType") String tableType
+ ) {
+ this.tableName = tableName;
+ this.dbName = dbName;
+ this.owner = owner;
+ this.createTime = createTime;
+ this.lastAccessTime = lastAccessTime;
+ this.retention = retention;
+ this.sd = sd;
+ this.partitionKeys = partitionKeys;
+ this.parameters = parameters;
+ this.viewOriginalText = viewOriginalText;
+ this.viewExpandedText = viewExpandedText;
+ this.tableType = tableType;
+
+ List<FieldSchema> partitionKeysUnwrapped = Lists.newArrayList();
+ for (FieldSchemaWrapper w : partitionKeys) partitionKeysUnwrapped.add(w.getFieldSchema());
+ StorageDescriptor sdUnwrapped = sd.getSd();
+ this.table = new Table(tableName, dbName, owner, createTime, lastAccessTime, retention, sdUnwrapped, partitionKeysUnwrapped,
+ parameters, viewOriginalText, viewExpandedText, tableType);
+ }
+
+ public HiveTable(Table table) {
+ if (table == null) return;
+ this.table = table;
+ this.tableName = table.getTableName();
+ this.dbName = table.getDbName();
+ this.owner = table.getOwner();
+ this.createTime = table.getCreateTime();
+ this.lastAccessTime = table.getLastAccessTime();
+ this.retention = table.getRetention();
+ this.sd = new StorageDescriptorWrapper(table.getSd());
+ this.partitionKeys = Lists.newArrayList();
+ for (FieldSchema f : table.getPartitionKeys()) this.partitionKeys.add(new FieldSchemaWrapper(f));
+ this.parameters = table.getParameters();
+ this.viewOriginalText = table.getViewOriginalText();
+ this.viewExpandedText = table.getViewExpandedText();
+ this.tableType = table.getTableType();
+ }
+
+ @JsonIgnore
+ public Table getTable() {
+ return table;
+ }
+
+ public static class HivePartition {
+
+ @JsonIgnore
+ private Partition partition;
+
+ @JsonProperty
+ public List<String> values;
+ @JsonProperty
+ public String tableName;
+ @JsonProperty
+ public String dbName;
+ @JsonProperty
+ public int createTime;
+ @JsonProperty
+ public int lastAccessTime;
+ @JsonProperty
+ public StorageDescriptorWrapper sd;
+ @JsonProperty
+ public Map<String,String> parameters;
+
+ @JsonCreator
+ public HivePartition(@JsonProperty("values") List<String> values, @JsonProperty("tableName") String tableName, @JsonProperty("dbName") String dbName, @JsonProperty("createTime") int createTime,
+ @JsonProperty("lastAccessTime") int lastAccessTime, @JsonProperty("sd") StorageDescriptorWrapper sd,
+ @JsonProperty("parameters") Map<String, String> parameters
+ ) {
+ this.values = values;
+ this.tableName = tableName;
+ this.dbName = dbName;
+ this.createTime = createTime;
+ this.lastAccessTime = lastAccessTime;
+ this.sd = sd;
+ this.parameters = parameters;
+
+ StorageDescriptor sdUnwrapped = sd.getSd();
+ this.partition = new org.apache.hadoop.hive.metastore.api.Partition(values, tableName, dbName, createTime, lastAccessTime, sdUnwrapped, parameters);
+ }
+
+ public HivePartition(Partition partition) {
+ if (partition == null) return;
+ this.partition = partition;
+ this.values = partition.getValues();
+ this.tableName = partition.getTableName();
+ this.dbName = partition.getDbName();
+ this.createTime = partition.getCreateTime();
+ this.lastAccessTime = partition.getLastAccessTime();
+ this.sd = new StorageDescriptorWrapper(partition.getSd());
+ this.parameters = partition.getParameters();
+ }
+
+ @JsonIgnore
+ public Partition getPartition() {
+ return partition;
+ }
+ }
+
+ public static class StorageDescriptorWrapper {
+ @JsonIgnore
+ private StorageDescriptor sd;
+ @JsonProperty
+ public List<FieldSchemaWrapper> cols;
+ @JsonProperty
+ public String location;
+ @JsonProperty
+ public String inputFormat;
+ @JsonProperty
+ public String outputFormat;
+ @JsonProperty
+ public boolean compressed;
+ @JsonProperty
+ public int numBuckets;
+ @JsonProperty
+ public SerDeInfoWrapper serDeInfo;
+// @JsonProperty
+// public List<String> bucketCols;
+ @JsonProperty
+ public List<OrderWrapper> sortCols;
+ @JsonProperty
+ public Map<String,String> parameters;
+
+ @JsonCreator
+ public StorageDescriptorWrapper(@JsonProperty("cols") List<FieldSchemaWrapper> cols, @JsonProperty("location") String location, @JsonProperty("inputFormat") String inputFormat,
+ @JsonProperty("outputFormat") String outputFormat, @JsonProperty("compressed") boolean compressed, @JsonProperty("numBuckets") int numBuckets,
+ @JsonProperty("serDeInfo") SerDeInfoWrapper serDeInfo, @JsonProperty("sortCols") List<OrderWrapper> sortCols,
+ @JsonProperty("parameters") Map<String,String> parameters) {
+ this.cols = cols;
+ this.location = location;
+ this.inputFormat = inputFormat;
+ this.outputFormat = outputFormat;
+ this.compressed = compressed;
+ this.numBuckets = numBuckets;
+ this.serDeInfo = serDeInfo;
+// this.bucketCols = bucketCols;
+ this.sortCols = sortCols;
+ this.parameters = parameters;
+ List<FieldSchema> colsUnwrapped = Lists.newArrayList();
+ for (FieldSchemaWrapper w: cols) colsUnwrapped.add(w.getFieldSchema());
+ SerDeInfo serDeInfoUnwrapped = serDeInfo.getSerDeInfo();
+ List<Order> sortColsUnwrapped = Lists.newArrayList();
+ for (OrderWrapper w : sortCols) sortColsUnwrapped.add(w.getOrder());
+// this.sd = new StorageDescriptor(colsUnwrapped, location, inputFormat, outputFormat, compressed, numBuckets, serDeInfoUnwrapped,
+// bucketCols, sortColsUnwrapped, parameters);
+ this.sd = new StorageDescriptor(colsUnwrapped, location, inputFormat, outputFormat, compressed, numBuckets, serDeInfoUnwrapped,
+ null, sortColsUnwrapped, parameters);
+ }
+
+ public StorageDescriptorWrapper(StorageDescriptor sd) {
+ this.sd = sd;
+ this.cols = Lists.newArrayList();
+ for (FieldSchema f : sd.getCols()) this.cols.add(new FieldSchemaWrapper(f));
+ this.location = sd.getLocation();
+ this.inputFormat = sd.getInputFormat();
+ this.outputFormat = sd.getOutputFormat();
+ this.compressed = sd.isCompressed();
+ this.numBuckets = sd.getNumBuckets();
+ this.serDeInfo = new SerDeInfoWrapper(sd.getSerdeInfo());
+// this.bucketCols = sd.getBucketCols();
+ this.sortCols = Lists.newArrayList();
+ for (Order o : sd.getSortCols()) this.sortCols.add(new OrderWrapper(o));
+ this.parameters = sd.getParameters();
+ }
+
+ @JsonIgnore
+ public StorageDescriptor getSd() {
+ return sd;
+ }
+
+ }
+
+ public static class SerDeInfoWrapper {
+ @JsonIgnore
+ private SerDeInfo serDeInfo;
+ @JsonProperty
+ public String name;
+ @JsonProperty
+ public String serializationLib;
+ @JsonProperty
+ public Map<String,String> parameters;
+
+ @JsonCreator
+ public SerDeInfoWrapper(@JsonProperty("name") String name, @JsonProperty("serializationLib") String serializationLib, @JsonProperty("parameters") Map<String, String> parameters) {
+ this.name = name;
+ this.serializationLib = serializationLib;
+ this.parameters = parameters;
+ this.serDeInfo = new SerDeInfo(name, serializationLib, parameters);
+ }
+
+ public SerDeInfoWrapper(SerDeInfo serDeInfo) {
+ this.serDeInfo = serDeInfo;
+ this.name = serDeInfo.getName();
+ this.serializationLib = serDeInfo.getSerializationLib();
+ this.parameters = serDeInfo.getParameters();
+ }
+
+ @JsonIgnore
+ public SerDeInfo getSerDeInfo() {
+ return serDeInfo;
+ }
+ }
+
+ public static class FieldSchemaWrapper {
+ @JsonIgnore
+ private FieldSchema fieldSchema;
+ @JsonProperty
+ public String name;
+ @JsonProperty
+ public String type;
+ @JsonProperty
+ public String comment;
+
+ @JsonCreator
+ public FieldSchemaWrapper(@JsonProperty("name") String name, @JsonProperty("type") String type, @JsonProperty("comment") String comment) {
+ this.name = name;
+ this.type = type;
+ this.comment = comment;
+ this.fieldSchema = new FieldSchema(name, type, comment);
+ }
+
+ public FieldSchemaWrapper(FieldSchema fieldSchema) {
+ this.fieldSchema = fieldSchema;
+ this.name = fieldSchema.getName();
+ this.type = fieldSchema.getType();
+ this.comment = fieldSchema.getComment();
+ }
+
+ @JsonIgnore
+ public FieldSchema getFieldSchema() {
+ return fieldSchema;
+ }
+ }
+
+ public static class OrderWrapper {
+ @JsonIgnore
+ private Order ord;
+ @JsonProperty
+ public String col;
+ @JsonProperty
+ public int order;
+
+ @JsonCreator
+ public OrderWrapper(@JsonProperty("col") String col, @JsonProperty("order") int order) {
+      this.col = col;
+      this.order = order;
+      this.ord = new Order(col, order);
+ }
+
+ public OrderWrapper(Order ord) {
+ this.ord = ord;
+ this.col = ord.getCol();
+ this.order = ord.getOrder();
+ }
+
+ @JsonIgnore
+ public Order getOrder() {
+ return ord;
+ }
+ }
+}
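
The wrapper classes in HiveTable mirror the Thrift-generated metastore objects (Table, Partition, StorageDescriptor, and so on) so they can travel in the JSON-serialized plan as plain @JsonProperty fields, with each wrapper rebuilding the corresponding Thrift object on the deserialization path. A small round-trip sketch using the simplest wrapper; WrapperRoundTripDemo is an illustrative class, not part of this commit:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.drill.exec.store.hive.HiveTable;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;

    public class WrapperRoundTripDemo {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        // Wrap a Thrift FieldSchema, emit plain JSON (name/type/comment), then rebuild it.
        HiveTable.FieldSchemaWrapper wrapper =
            new HiveTable.FieldSchemaWrapper(new FieldSchema("key", "int", null));
        String json = mapper.writeValueAsString(wrapper);

        HiveTable.FieldSchemaWrapper roundTripped =
            mapper.readValue(json, HiveTable.FieldSchemaWrapper.class);
        System.out.println(roundTripped.getFieldSchema());  // prints the rebuilt Thrift object
      }
    }
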