drill-dev mailing list archives

From jacq...@apache.org
Subject [24/38] Integrate new SQL changes with Hive storage engine, moving to automatic file detection. Rename storage engine to storage plugin. Separate storage plugins from format plugins, updating Parquet and JSON to format engines. Refactor distribution log
Date Tue, 04 Mar 2014 08:07:51 GMT
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
new file mode 100644
index 0000000..97124bc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class ReadEntryFromHDFS extends ReadEntryWithPath implements FileWork{
+
+  private long start;
+  private long length;
+
+  @JsonCreator
+  public ReadEntryFromHDFS(@JsonProperty("path") String path, @JsonProperty("start") long start, @JsonProperty("length") long length) {
+    this.path = path;
+    this.start = start;
+    this.length = length;
+  }
+
+  public long getStart() {
+    return start;
+  }
+
+  public long getLength() {
+    return length;
+  }
+}

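A note on the Jackson annotations above: the @JsonCreator constructor and the @JsonProperty names are what let a read entry round-trip through JSON when plans are serialized between nodes. A minimal sketch of that round trip with a plain ObjectMapper (illustration only, not part of this patch):

    import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ReadEntryRoundTrip {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        ReadEntryFromHDFS entry = new ReadEntryFromHDFS("/data/sample.json", 0, 1024);
        // Property names come from the getPath()/getStart()/getLength() getters.
        String json = mapper.writeValueAsString(entry);
        // e.g. {"path":"/data/sample.json","start":0,"length":1024}
        ReadEntryFromHDFS back = mapper.readValue(json, ReadEntryFromHDFS.class);
        assert back.getStart() == 0 && back.getLength() == 1024;
      }
    }
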
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java
new file mode 100644
index 0000000..bf1d762
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+
+public class ReadEntryWithPath {
+
+  protected String path;
+
+  
+  public ReadEntryWithPath(String path) {
+    super();
+    this.path = path;
+  }
+
+  public ReadEntryWithPath(){}
+  
+  public String getPath() {
+    return path;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
new file mode 100644
index 0000000..c69edb7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaHolder;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.beust.jcommander.internal.Lists;
+
+public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFactory<String, DrillTable> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkspaceSchemaFactory.class);
+
+  private ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<String, DrillTable>(this);
+  private final List<FormatMatcher> fileMatchers;
+  private final List<FormatMatcher> dirMatchers;
+
+  private final Path root;
+  private final DrillFileSystem fs;
+  private final String storageEngineName;
+  private final String schemaName;
+
+  public WorkspaceSchemaFactory(String schemaName, String storageEngineName, DrillFileSystem fileSystem, String path,
+      List<FormatMatcher> formatMatchers) throws ExecutionSetupException {
+    this.fs = fileSystem;
+    this.root = new Path(path);
+    this.fileMatchers = Lists.newArrayList();
+    this.dirMatchers = Lists.newArrayList();
+    for (FormatMatcher m : formatMatchers) {
+      if (m.supportDirectoryReads()) {
+        dirMatchers.add(m);
+      }
+      fileMatchers.add(m);
+    }
+    this.storageEngineName = storageEngineName;
+    this.schemaName = schemaName;
+  }
+
+  public WorkspaceSchema create(SchemaHolder holder) {
+    return new WorkspaceSchema(holder, schemaName);
+  }
+
+  @Override
+  public DrillTable create(String key) {
+    try {
+
+      FileSelection fileSelection = FileSelection.create(fs, root, key);
+      if(fileSelection == null) return null;
+      
+      if (fileSelection.containsDirectories(fs)) {
+        for (FormatMatcher m : dirMatchers) {
+          try {
+            Object selection = m.isReadable(fileSelection);
+            if (selection != null)
+              return new DynamicDrillTable(storageEngineName, selection, m.getFormatPlugin().getStorageConfig());
+          } catch (IOException e) {
+            logger.debug("File read failed.", e);
+          }
+        }
+        fileSelection = fileSelection.minusDirectorries(fs);
+      }
+
+      for (FormatMatcher m : fileMatchers) {
+        Object selection = m.isReadable(fileSelection);
+        if (selection != null)
+          return new DynamicDrillTable(storageEngineName, selection, m.getFormatPlugin().getStorageConfig());
+      }
+      return null;
+
+    } catch (IOException e) {
+      logger.debug("Failed to create DrillTable with root {} and name {}", root, key, e);
+    }
+
+    return null;
+  }
+
+  @Override
+  public void destroy(DrillTable value) {
+  }
+
+  public class WorkspaceSchema extends AbstractSchema {
+
+    public WorkspaceSchema(SchemaHolder parentSchema, String name) {
+      super(parentSchema, name);
+    }
+
+    @Override
+    public Set<String> getTableNames() {
+      return tables.keySet();
+    }
+
+    @Override
+    public DrillTable getTable(String name) {
+      return tables.get(name);
+    }
+
+  }
+
+}

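Worth noting about the factory above: nothing is scanned eagerly. WorkspaceSchema answers getTable() through the ExpandingConcurrentMap, which calls back into create(String) on the first lookup of a name, probing dirMatchers and then fileMatchers. A hedged sketch of that flow (the fs/matchers/holder wiring is assumed here; it comes from plugin setup code outside this file):

    // Hypothetical caller, illustration only.
    static DrillTable resolve(DrillFileSystem fs, List<FormatMatcher> matchers,
        SchemaHolder holder) throws ExecutionSetupException {
      WorkspaceSchemaFactory factory =
          new WorkspaceSchemaFactory("default", "dfs", fs, "/data", matchers);
      WorkspaceSchemaFactory.WorkspaceSchema schema = factory.create(holder);
      // The first lookup of "logs" triggers WorkspaceSchemaFactory.create("logs");
      // subsequent lookups hit the cached DrillTable.
      return schema.getTable("logs");
    }
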
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyBatchCreator.java
new file mode 100644
index 0000000..d79e542
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyBatchCreator.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+public class EasyBatchCreator implements BatchCreator<EasySubScan>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, EasySubScan config, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    assert children == null || children.isEmpty();
+    return config.getFormatPlugin().getBatch(context, config);
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
new file mode 100644
index 0000000..8a41575
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.QueryOptimizerRule;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+
+import com.beust.jcommander.internal.Lists;
+
+public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
+
+  private final BasicFormatMatcher matcher;
+  private final DrillbitContext context;
+  private final boolean readable;
+  private final boolean writable;
+  private final boolean blockSplittable;
+  private final DrillFileSystem fs;
+  private final StoragePluginConfig storageConfig;
+  private final FormatPluginConfig formatConfig;
+  private final String name;
+  
+  protected EasyFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable, boolean blockSplittable, String extension, String defaultName){
+    this.matcher = new BasicFormatMatcher(this, fs, extension);
+    this.readable = readable;
+    this.writable = writable;
+    this.context = context;
+    this.blockSplittable = blockSplittable;
+    this.fs = fs;
+    this.storageConfig = storageConfig;
+    this.formatConfig = formatConfig;
+    this.name = name == null ? defaultName : name; 
+  }
+  
+  @Override
+  public DrillFileSystem getFileSystem() {
+    return fs;
+  }
+
+  @Override
+  public DrillbitContext getContext() {
+    return context;
+  }
+  
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Whether the format supports splitting a single file into work units on block boundaries. If not, the easy
+   * format engine will only split work on whole-file boundaries.
+   * 
+   * @return True if splittable.
+   */
+  public boolean isBlockSplittable(){
+    return blockSplittable;
+  }
+
+  public abstract RecordReader getRecordReader(FragmentContext context, FileWork fileWork, FieldReference ref, List<SchemaPath> columns) throws ExecutionSetupException;
+
+  
+  RecordBatch getBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
+    List<RecordReader> readers = Lists.newArrayList();
+    for(FileWork work : scan.getWorkUnits()){
+      readers.add(getRecordReader(context, work, scan.getRef(), scan.getColumns())); 
+    }
+    
+    return new ScanBatch(context, readers.iterator());
+  }
+  
+  @Override
+  public AbstractGroupScan getGroupScan(FieldReference outputRef, FileSelection selection) throws IOException {
+    return new EasyGroupScan(selection, this, outputRef, null);
+  }
+
+  @Override
+  public FormatPluginConfig getConfig() {
+    return formatConfig;
+  }
+
+  @Override
+  public StoragePluginConfig getStorageConfig() {
+    return storageConfig;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return readable;
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    return writable;
+  }
+
+  @Override
+  public FormatMatcher getMatcher() {
+    return matcher;
+  }
+
+  @Override
+  public List<QueryOptimizerRule> getOptimizerRules() {
+    return Collections.emptyList();
+  }
+
+  
+}

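To make the blockSplittable flag concrete: subclasses opt in through the protected constructor, and FileWork's start/length then describe a sub-range of a file rather than a whole file. A hypothetical line-oriented format that allows block-boundary splits (contrast with the JSON plugin later in this patch, which passes false) might look like the sketch below; SplittableTextPlugin and TextConfig are placeholders, not classes from this commit:

    // Hypothetical plugin: readable, not writable, block-splittable, ".txt" files.
    public class SplittableTextPlugin extends EasyFormatPlugin<SplittableTextPlugin.TextConfig> {

      public SplittableTextPlugin(String name, DrillbitContext context, DrillFileSystem fs,
          StoragePluginConfig storageConfig, TextConfig formatConfig) {
        super(name, context, fs, storageConfig, formatConfig,
            true /* readable */, false /* writable */, true /* blockSplittable */,
            "txt", "txt");
      }

      @Override
      public RecordReader getRecordReader(FragmentContext context, FileWork fileWork,
          FieldReference ref, List<SchemaPath> columns) throws ExecutionSetupException {
        // A real plugin would return a reader scoped to fileWork.getStart()/getLength().
        throw new UnsupportedOperationException("illustration only");
      }

      public static class TextConfig implements FormatPluginConfig {
        // Marker config, mirroring JSONFormatConfig below.
      }
    }
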
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
new file mode 100644
index 0000000..5fcf7a5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.BlockMapBuilder;
+import org.apache.drill.exec.store.schedule.CompleteFileWork;
+import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;
+
+import com.beust.jcommander.internal.Lists;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+
+@JsonTypeName("fs-scan")
+public class EasyGroupScan extends AbstractGroupScan{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class);
+
+  private final FileSelection selection;
+  private final EasyFormatPlugin<?> formatPlugin;
+  private final FieldReference ref;
+  private final int maxWidth;
+  private final List<SchemaPath> columns;
+  
+  private ListMultimap<Integer, CompleteFileWork> mappings;
+  private List<CompleteFileWork> chunks;
+  private List<EndpointAffinity> endpointAffinities;
+
+  @JsonCreator
+  public EasyGroupScan(
+      @JsonProperty("files") List<String> files, //
+      @JsonProperty("storage") StoragePluginConfig storageConfig, //
+      @JsonProperty("format") FormatPluginConfig formatConfig, //
+      @JacksonInject StoragePluginRegistry engineRegistry, // 
+      @JsonProperty("ref") FieldReference ref, //
+      @JsonProperty("columns") List<SchemaPath> columns
+      ) throws IOException, ExecutionSetupException {
+    
+    this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
+    this.selection = new FileSelection(files, true);
+    this.maxWidth = selection.getFileStatusList(formatPlugin.getFileSystem()).size();
+    this.ref = ref;
+    this.columns = columns;
+  }
+  
+  public EasyGroupScan(
+      FileSelection selection, //
+      EasyFormatPlugin<?> formatPlugin, // 
+      FieldReference ref, //
+      List<SchemaPath> columns
+      ) throws IOException{
+    this.selection = selection;
+    this.maxWidth = selection.getFileStatusList(formatPlugin.getFileSystem()).size();
+    this.formatPlugin = formatPlugin;
+    this.ref = ref;
+    this.columns = columns;
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return maxWidth;
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(1,1,1,1);
+  }
+
+  @Override
+  public Size getSize() {
+    return new Size(1024,1024);
+  }
+
+  @JsonProperty("files")
+  public List<String> getFiles() {
+    return selection.getAsFiles();
+  }
+  
+  @JsonIgnore
+  public FileSelection getFileSelection(){
+    return selection;
+  }
+  
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    assert children == null || children.isEmpty();
+    return this;
+  }
+
+  @JsonProperty("columns")
+  public List<SchemaPath> getColumns(){
+    return columns;
+  }
+  
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    if (this.endpointAffinities == null) {
+      try {
+        BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits());
+        this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
+        this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+      } catch (IOException e) {
+        logger.warn("Failure determining endpoint affinity.", e);
+        this.endpointAffinities = Collections.emptyList();
+      }
+    }
+    return this.endpointAffinities;
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
+    this.mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks);
+  }
+
+  @Override
+  public EasySubScan getSpecificScan(int minorFragmentId) {
+    assert minorFragmentId < mappings.size() : String.format(
+        "Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(),
+        minorFragmentId);
+
+    List<CompleteFileWork> filesForMinor = mappings.get(minorFragmentId);
+
+    Preconditions.checkArgument(!filesForMinor.isEmpty(),
+        String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
+
+    return new EasySubScan(convert(filesForMinor), formatPlugin, ref, columns);
+  }
+  
+  private List<FileWorkImpl> convert(List<CompleteFileWork> list){
+    List<FileWorkImpl> newList = Lists.newArrayList();
+    for(CompleteFileWork f : list){
+      newList.add(f.getAsFileWork());
+    }
+    return newList;
+  }
+  
+  @JsonProperty("storage")
+  public StoragePluginConfig getStorageConfig(){
+    return formatPlugin.getStorageConfig();
+  }
+
+  @JsonProperty("format")
+  public FormatPluginConfig getFormatConfig(){
+    return formatPlugin.getConfig();
+  }
+}

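The parallelization lifecycle implied by the methods above runs in three steps: getOperatorAffinity() lazily builds the block map and endpoint affinities, applyAssignments() spreads the CompleteFileWork chunks across the incoming endpoints, and getSpecificScan(i) emits one EasySubScan per minor fragment. A hedged sketch of that call order (illustration only; the real driver lives in the parallelizer, which is not part of this patch):

    // Hypothetical planner-side driver, illustration only.
    // Assumes java.util.ArrayList/List are imported.
    static List<EasySubScan> parallelize(EasyGroupScan scan, List<DrillbitEndpoint> endpoints) {
      scan.getOperatorAffinity();        // 1. build block map and affinity list
      scan.applyAssignments(endpoints);  // 2. map endpoints to file work units
      List<EasySubScan> subScans = new ArrayList<EasySubScan>();
      int width = Math.min(scan.getMaxParallelizationWidth(), endpoints.size());
      for (int minorFragmentId = 0; minorFragmentId < width; minorFragmentId++) {
        subScans.add(scan.getSpecificScan(minorFragmentId)); // 3. one sub-scan each
      }
      return subScans;
    }
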
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
new file mode 100644
index 0000000..72d1fe6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractSubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("fs-sub-scan")
+public class EasySubScan extends AbstractSubScan{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasySubScan.class);
+
+  private final List<FileWorkImpl> files;
+  private final EasyFormatPlugin<?> formatPlugin;
+  private final FieldReference ref;
+  private final List<SchemaPath> columns;
+  
+  @JsonCreator
+  public EasySubScan(
+      @JsonProperty("files") List<FileWorkImpl> files, //
+      @JsonProperty("storage") StoragePluginConfig storageConfig, //
+      @JsonProperty("format") FormatPluginConfig formatConfig, //
+      @JacksonInject StoragePluginRegistry engineRegistry, // 
+      @JsonProperty("ref") FieldReference ref, //
+      @JsonProperty("columns") List<SchemaPath> columns //
+      ) throws IOException, ExecutionSetupException {
+    
+    this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
+    Preconditions.checkNotNull(this.formatPlugin);
+    this.files = files;
+    this.ref = ref;
+    this.columns = columns;
+  }
+  
+  public EasySubScan(List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, FieldReference ref, List<SchemaPath> columns){
+    this.formatPlugin = plugin;
+    this.files = files;
+    this.ref = ref;
+    this.columns = columns;
+  }
+  
+  @JsonIgnore
+  public EasyFormatPlugin<?> getFormatPlugin(){
+    return formatPlugin;
+  }
+
+  @JsonProperty("files")
+  public List<FileWorkImpl> getWorkUnits() {
+    return files;
+  }
+
+  @JsonProperty("storage")
+  public StoragePluginConfig getStorageConfig(){
+    return formatPlugin.getStorageConfig();
+  }
+
+  @JsonProperty("format")
+  public FormatPluginConfig getFormatConfig(){
+    return formatPlugin.getConfig();
+  }
+  
+  @JsonProperty("ref")
+  public FieldReference getRef() {
+    return ref;
+  }
+  
+  @JsonProperty("columns")
+  public List<SchemaPath> getColumns(){
+    return columns;
+  }
+
+   
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java
new file mode 100644
index 0000000..170d339
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+
+public interface FileWork {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileWork.class);
+  
+  public String getPath();  
+  public long getStart();
+  public long getLength();  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java
new file mode 100644
index 0000000..8af6aaf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillFileSystem.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.shim;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Wraps the underlying filesystem to provide advanced file system features. Delegates to the underlying file system
+ * where those features are already exposed.
+ */
+public abstract class DrillFileSystem implements AutoCloseable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFileSystem.class);
+
+  public abstract FileSystem getUnderlying();
+  
+  public abstract BlockLocation[] getBlockLocations(FileStatus status, long start, long length) throws IOException;
+  public abstract List<FileStatus> list(boolean recursive, Path... paths) throws IOException;
+  public abstract FileStatus getFileStatus(Path p) throws IOException;
+  public abstract DrillOutputStream create(Path p) throws IOException;
+  public abstract DrillInputStream open(Path p) throws IOException;
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillInputStream.java
new file mode 100644
index 0000000..82764a3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillInputStream.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.shim;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+public abstract class DrillInputStream implements AutoCloseable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillInputStream.class);
+
+//  public abstract AccountingByteBuf readNow(long start, long length) throws IOException;
+//  public abstract void readNow(AccountingByteBuf b, long start, long length) throws IOException;
+//  public abstract AccountingByteBuf readNow() throws IOException;
+  
+  public abstract FSDataInputStream getInputStream();
+//  public abstract CheckedFuture<Long, IOException> readFuture(AccountingByteBuf b, long start, long length) throws IOException;
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillOutputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillOutputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillOutputStream.java
new file mode 100644
index 0000000..c2446e9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/DrillOutputStream.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.shim;
+
+import java.io.OutputStream;
+
+
+public abstract class DrillOutputStream implements AutoCloseable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillOutputStream.class);
+
+  public abstract OutputStream getOutputStream();
+//  public abstract CheckedFuture<Long, IOException> writeFuture(AccountingByteBuf b);
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/FileSystemCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/FileSystemCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/FileSystemCreator.java
new file mode 100644
index 0000000..eaab5ef
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/FileSystemCreator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.shim;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.store.dfs.shim.fallback.FallbackFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+public class FileSystemCreator {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemCreator.class);
+  
+  public static DrillFileSystem getFileSystem(DrillConfig config, Configuration fsConf) throws IOException{
+    FileSystem fs = FileSystem.get(fsConf);
+    return new FallbackFileSystem(config, fs);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
new file mode 100644
index 0000000..5743ca1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/shim/fallback/FallbackFileSystem.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.shim.fallback;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.shim.DrillInputStream;
+import org.apache.drill.exec.store.dfs.shim.DrillOutputStream;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+
+public class FallbackFileSystem extends DrillFileSystem {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FallbackFileSystem.class);
+
+  final FileSystem fs;
+
+  public FallbackFileSystem(DrillConfig config, FileSystem fs) {
+    this.fs = fs;
+  }
+
+  @Override
+  public FileSystem getUnderlying() {
+    return fs;
+  }
+
+  @Override
+  public List<FileStatus> list(boolean recursive, Path... paths) throws IOException {
+    if (recursive) {
+      List<FileStatus> statuses = Lists.newArrayList();
+      for (Path p : paths) {
+        addRecursiveStatus(fs.getFileStatus(p), statuses);
+      }
+      return statuses;
+
+    } else {
+      return Lists.newArrayList(fs.listStatus(paths));
+    }
+  }
+
+  
+  private void addRecursiveStatus(FileStatus parent, List<FileStatus> listToFill) throws IOException {
+    if (parent.isDir()) {
+      Path pattern = new Path(parent.getPath(), "/*");
+      FileStatus[] sub = fs.globStatus(pattern);
+      for(FileStatus s : sub){
+        listToFill.add(s);
+      }
+    } else {
+      listToFill.add(parent);
+    }
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path p) throws IOException {
+    return fs.getFileStatus(p);
+  }
+
+  @Override
+  public DrillOutputStream create(Path p) throws IOException {
+    return new Out(fs.create(p));
+  }
+
+  @Override
+  public DrillInputStream open(Path p) throws IOException {
+    return new In(fs.open(p));
+  }
+
+  @Override
+  public void close() throws Exception {
+    fs.close();
+  }
+
+  @Override
+  public BlockLocation[] getBlockLocations(FileStatus status, long start, long len) throws IOException {
+    return fs.getFileBlockLocations(status, start, len);
+  }
+
+  private class Out extends DrillOutputStream {
+
+    private final FSDataOutputStream out;
+    
+    public Out(FSDataOutputStream out) {
+      super();
+      this.out = out;
+    }
+
+    @Override
+    public void close() throws Exception {
+      out.close();
+    }
+
+    @Override
+    public FSDataOutputStream getOutputStream() {
+      return out;
+    }
+
+  }
+
+  private class In extends DrillInputStream {
+
+    private final FSDataInputStream in;
+    
+    public In(FSDataInputStream in) {
+      super();
+      this.in = in;
+    }
+
+    @Override
+    public FSDataInputStream getInputStream() {
+      return in;
+    }
+
+    @Override
+    public void close() throws Exception {
+      in.close();
+    }
+
+  }
+
+
+
+}

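Putting the shim pieces together: FileSystemCreator hands back a FallbackFileSystem over whatever Hadoop FileSystem the Configuration resolves to. A minimal usage sketch against the local file system (DrillConfig.create() and the fs.default.name key are assumptions about the surrounding setup, illustration only):

    import org.apache.drill.common.config.DrillConfig;
    import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
    import org.apache.drill.exec.store.dfs.shim.FileSystemCreator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;

    public class ShimExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "file:///"); // local FS; assumed key for this Hadoop era
        try (DrillFileSystem fs = FileSystemCreator.getFileSystem(DrillConfig.create(), conf)) {
          // Recursive listing via the shim's list(boolean, Path...) method.
          for (FileStatus status : fs.list(true, new Path("/tmp"))) {
            System.out.println(status.getPath() + " " + status.getLen());
          }
        }
      }
    }
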
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
new file mode 100644
index 0000000..82bf3bf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
+
+  public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
+    this(name, context, fs, storageConfig, new JSONFormatConfig());
+  }
+  
+  public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, JSONFormatConfig formatPluginConfig) {
+    super(name, context, fs, config, formatPluginConfig, true, false, false, "json", "json");
+  }
+  
+  @Override
+  public RecordReader getRecordReader(FragmentContext context, FileWork fileWork, FieldReference ref,
+      List<SchemaPath> columns) throws ExecutionSetupException {
+    return new JSONRecordReader(context, fileWork.getPath(), this.getFileSystem().getUnderlying(), ref, columns);
+  }
+
+  @JsonTypeName("json")
+  public static class JSONFormatConfig implements FormatPluginConfig {
+
+    @Override
+    public int hashCode() {
+      return 31;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() == obj.getClass())
+        return true;
+      return false;
+    }
+    
+  }
+}

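One detail on JSONFormatConfig above: it carries no state, so equality is purely by class, which is presumably how StoragePluginRegistry.getFormatPlugin(storageConfig, formatConfig) matches a deserialized "json" config back to a running plugin. For example (illustration only):

    JSONFormatPlugin.JSONFormatConfig a = new JSONFormatPlugin.JSONFormatConfig();
    JSONFormatPlugin.JSONFormatConfig b = new JSONFormatPlugin.JSONFormatConfig();
    assert a.equals(b);                    // same class => equal
    assert a.hashCode() == b.hashCode();   // constant 31 for all instances
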
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
new file mode 100644
index 0000000..67e8b3f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -0,0 +1,541 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json;
+
+import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
+import static com.fasterxml.jackson.core.JsonToken.END_OBJECT;
+import static com.fasterxml.jackson.core.JsonToken.FIELD_NAME;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.schema.DiffSchema;
+import org.apache.drill.exec.schema.Field;
+import org.apache.drill.exec.schema.NamedField;
+import org.apache.drill.exec.schema.ObjectSchema;
+import org.apache.drill.exec.schema.RecordSchema;
+import org.apache.drill.exec.schema.SchemaIdGenerator;
+import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.VectorHolder;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.RepeatedBitVector;
+import org.apache.drill.exec.vector.RepeatedFloat4Vector;
+import org.apache.drill.exec.vector.RepeatedIntVector;
+import org.apache.drill.exec.vector.RepeatedVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class JSONRecordReader implements RecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
+  private static final int DEFAULT_LENGTH = 256 * 1024; // 256kb
+  public static final Charset UTF_8 = Charset.forName("UTF-8");
+
+  private final Map<String, VectorHolder> valueVectorMap;
+  private final FileSystem fileSystem;
+  private final Path hadoopPath;
+
+  private JsonParser parser;
+  private SchemaIdGenerator generator;
+  private DiffSchema diffSchema;
+  private RecordSchema currentSchema;
+  private List<Field> removedFields;
+  private OutputMutator outputMutator;
+  private BufferAllocator allocator;
+  private int batchSize;
+  private final FieldReference ref;
+  private final List<SchemaPath> columns;
+
+  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, int batchSize,
+                          FieldReference ref, List<SchemaPath> columns) {
+    this.hadoopPath = new Path(inputPath);
+    this.fileSystem = fileSystem;
+    this.allocator = fragmentContext.getAllocator();
+    this.batchSize = batchSize;
+    valueVectorMap = Maps.newHashMap();
+    this.ref = ref;
+    this.columns = columns;
+  }
+
+  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, FieldReference ref,
+                          List<SchemaPath> columns) {
+    this(fragmentContext, inputPath, fileSystem, DEFAULT_LENGTH, ref, columns);
+  }
+
+  private JsonParser getParser() {
+    return parser;
+  }
+
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+    outputMutator = output;
+    currentSchema = new ObjectSchema();
+    diffSchema = new DiffSchema();
+    removedFields = Lists.newArrayList();
+
+    try {
+      JsonFactory factory = new JsonFactory();
+      parser = factory.createJsonParser(fileSystem.open(hadoopPath));
+      parser.nextToken(); // Read to the first START_OBJECT token
+      generator = new SchemaIdGenerator();
+    } catch (IOException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  @Override
+  public int next() {
+    if (parser.isClosed() || !parser.hasCurrentToken()) {
+      return 0;
+    }
+
+    resetBatch();
+
+    int nextRowIndex = 0;
+
+    try {
+      while (ReadType.OBJECT.readRecord(this, null, nextRowIndex++, 0)) {
+        parser.nextToken(); // Read to START_OBJECT token
+
+        if (!parser.hasCurrentToken()) {
+          parser.close();
+          break;
+        }
+      }
+
+      parser.nextToken();
+
+      if (!parser.hasCurrentToken()) {
+        parser.close();
+      }
+
+      // Garbage collect fields never referenced in this batch
+      for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
+        diffSchema.addRemovedField(field);
+        outputMutator.removeField(field.getAsMaterializedField(ref));
+      }
+
+      if (diffSchema.isChanged()) {
+        outputMutator.setNewSchema();
+      }
+
+
+    } catch (IOException | SchemaChangeException e) {
+      logger.error("Error reading next in Json reader", e);
+    }
+
+    for (VectorHolder holder : valueVectorMap.values()) {
+      holder.populateVectorLength();
+    }
+
+    return nextRowIndex;
+  }
+
+  private void resetBatch() {
+    for (VectorHolder value : valueVectorMap.values()) {
+      value.reset();
+    }
+
+    currentSchema.resetMarkedFields();
+    diffSchema.reset();
+    removedFields.clear();
+  }
+
+  @Override
+  public void cleanup() {
+    try {
+      parser.close();
+    } catch (IOException e) {
+      logger.warn("Error closing Json parser", e);
+    }
+  }
+
+
+  private RecordSchema getCurrentSchema() {
+    return currentSchema;
+  }
+
+  private void setCurrentSchema(RecordSchema schema) {
+    currentSchema = schema;
+  }
+
+  private List<Field> getRemovedFields() {
+    return removedFields;
+  }
+
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
+
+  private boolean fieldSelected(String field) {
+    SchemaPath sp = new SchemaPath(field, ExpressionPosition.UNKNOWN);
+    if (this.columns != null && this.columns.size() > 0) {
+      for (SchemaPath expr : this.columns) {
+        if (sp.equals(expr)) {
+          return true;
+        }
+      }
+      return false;
+    }
+    return true;
+  }
+
+  public enum ReadType {
+    ARRAY(END_ARRAY) {
+      @Override
+      public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
+        return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
+      }
+
+      @Override
+      public RecordSchema createSchema() throws IOException {
+        return new ObjectSchema();
+      }
+    },
+    OBJECT(END_OBJECT) {
+      @Override
+      public Field createField(RecordSchema parentSchema,
+                               String prefixFieldName,
+                               String fieldName,
+                               MajorType fieldType,
+                               int index) {
+        return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
+      }
+
+      @Override
+      public RecordSchema createSchema() throws IOException {
+        return new ObjectSchema();
+      }
+    };
+
+    private final JsonToken endObject;
+
+    ReadType(JsonToken endObject) {
+      this.endObject = endObject;
+    }
+
+    public JsonToken getEndObject() {
+      return endObject;
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    public boolean readRecord(JSONRecordReader reader,
+                              String prefixFieldName,
+                              int rowIndex,
+                              int groupCount) throws IOException, SchemaChangeException {
+      JsonParser parser = reader.getParser();
+      JsonToken token = parser.nextToken();
+      JsonToken endObject = getEndObject();
+      int colIndex = 0;
+      boolean isFull = false;
+      while (token != endObject) {
+        if (token == FIELD_NAME) {
+          token = parser.nextToken();
+          continue;
+        }
+
+        String fieldName = parser.getCurrentName();
+        if ( fieldName != null && ! reader.fieldSelected(fieldName)){
+          // this field was not requested in the query
+          token = parser.nextToken();
+          colIndex += 1;
+          continue;
+        }
+        MajorType fieldType = JacksonHelper.getFieldType(token, this == ReadType.ARRAY);
+        ReadType readType = null;
+        switch (token) {
+          case START_ARRAY:
+            readType = ReadType.ARRAY;
+            groupCount++;
+            break;
+          case START_OBJECT:
+            readType = ReadType.OBJECT;
+            groupCount = 0;
+            break;
+        }
+
+        if (fieldType != null) { // a type is reported even for JSON nulls
+          boolean currentFieldFull = !recordData(
+              readType,
+              reader,
+              fieldType,
+              prefixFieldName,
+              fieldName,
+              rowIndex,
+              colIndex,
+              groupCount);
+
+          isFull = isFull || currentFieldFull;
+        }
+        token = parser.nextToken();
+        colIndex += 1;
+      }
+      return !isFull;
+    }
+
+    private void removeChildFields(List<Field> removedFields, Field field) {
+      RecordSchema schema = field.getAssignedSchema();
+      if (schema == null) {
+        return;
+      }
+      for (Field childField : schema.getFields()) {
+        removedFields.add(childField);
+        if (childField.hasSchema()) {
+          removeChildFields(removedFields, childField);
+        }
+      }
+    }
+
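+    // Reconciles one field occurrence with the current schema (creating,
+    // retyping, or replacing the Field as needed), then either recurses into
+    // a nested container or writes the scalar into the field's vector.
+    // Returns false when the target vector has run out of space.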
+    private boolean recordData(JSONRecordReader.ReadType readType,
+                               JSONRecordReader reader,
+                               MajorType fieldType,
+                               String prefixFieldName,
+                               String fieldName,
+                               int rowIndex,
+                               int colIndex,
+                               int groupCount) throws IOException, SchemaChangeException {
+      RecordSchema currentSchema = reader.getCurrentSchema();
+      Field field = currentSchema.getField(fieldName == null ? prefixFieldName : fieldName, colIndex);
+      boolean isFieldFound = field != null;
+      List<Field> removedFields = reader.getRemovedFields();
+      boolean newFieldLateBound = fieldType.getMinorType().equals(MinorType.LATE);
+
+      if (isFieldFound && !field.getFieldType().equals(fieldType)) {
+        boolean existingFieldLateBound = field.getFieldType().getMinorType().equals(MinorType.LATE);
+
+        if (newFieldLateBound && !existingFieldLateBound) {
+          fieldType = Types.overrideMinorType(fieldType, field.getFieldType().getMinorType());
+        } else if (!newFieldLateBound && existingFieldLateBound) {
+          field.setFieldType(Types.overrideMinorType(field.getFieldType(), fieldType.getMinorType()));
+        } else if (!newFieldLateBound && !existingFieldLateBound) {
+          if (field.hasSchema()) {
+            removeChildFields(removedFields, field);
+          }
+          removedFields.add(field);
+          currentSchema.removeField(field, colIndex);
+
+          isFieldFound = false;
+        }
+      }
+
+      if (!isFieldFound) {
+        field = createField(
+            currentSchema,
+            prefixFieldName,
+            fieldName,
+            fieldType,
+            colIndex
+        );
+
+        reader.recordNewField(field);
+        currentSchema.addField(field);
+      }
+
+      field.setRead(true);
+
+      VectorHolder holder = getOrCreateVectorHolder(reader, field);
+      if (readType != null) {
+        RecordSchema fieldSchema = field.getAssignedSchema();
+        RecordSchema newSchema = readType.createSchema();
+
+        if (readType != ReadType.ARRAY) {
+          // Descend into the nested object's schema, creating one on first sight.
+          reader.setCurrentSchema(fieldSchema == null ? newSchema : fieldSchema);
+        }
+        readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
+
+        reader.setCurrentSchema(currentSchema);
+
+      } else if (holder != null && !newFieldLateBound && fieldType.getMinorType() != MinorType.LATE) {
+        return addValueToVector(
+            rowIndex,
+            holder,
+            JacksonHelper.getValueFromFieldType(
+                reader.getParser(),
+                fieldType.getMinorType()
+            ),
+            fieldType.getMinorType(),
+            groupCount
+        );
+      }
+
+      return true;
+    }
+
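+    // Writes a single scalar into the holder's vector, choosing the nullable
+    // or repeated variant based on groupCount. Returns true while the vector
+    // still has room for more values.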
+    private static <T> boolean addValueToVector(int index, VectorHolder holder, T val, MinorType minorType, int groupCount) {
+      switch (minorType) {
+        case INT: {
+          holder.incAndCheckLength(NullableIntHolder.WIDTH * 8 + 1);
+          if (groupCount == 0) {
+            if (val != null) {
+              NullableIntVector int4 = (NullableIntVector) holder.getValueVector();
+              NullableIntVector.Mutator m = int4.getMutator();
+              m.set(index, (Integer) val);
+            }
+          } else {
+            if (val == null) {
+              throw new UnsupportedOperationException("Nullable repeated int is not supported.");
+            }
+
+            RepeatedIntVector repeatedInt4 = (RepeatedIntVector) holder.getValueVector();
+            RepeatedIntVector.Mutator m = repeatedInt4.getMutator();
+            holder.setGroupCount(index);
+            m.add(index, (Integer) val);
+          }
+
+          return holder.hasEnoughSpace(NullableIntHolder.WIDTH * 8 + 1);
+        }
+        case FLOAT4: {
+          holder.incAndCheckLength(NullableFloat4Holder.WIDTH * 8 + 1);
+          if (groupCount == 0) {
+            if (val != null) {
+              NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector();
+              NullableFloat4Vector.Mutator m = float4.getMutator();
+              m.set(index, (Float) val);
+            }
+          } else {
+            if (val == null) {
+              throw new UnsupportedOperationException("Nullable repeated float is not supported.");
+            }
+
+            RepeatedFloat4Vector repeatedFloat4 = (RepeatedFloat4Vector) holder.getValueVector();
+            RepeatedFloat4Vector.Mutator m = repeatedFloat4.getMutator();
+            holder.setGroupCount(index);
+            m.add(index, (Float) val);
+          }
+          return holder.hasEnoughSpace(NullableFloat4Holder.WIDTH * 8 + 1);
+        }
+        case VARCHAR: {
+          if (val == null) {
+            // Nothing to write; just check that the 4-byte offset entry still fits.
+            return (index + 1) * 4 <= holder.getLength();
+          } else {
+            byte[] bytes = ((String) val).getBytes(UTF_8);
+            int length = bytes.length;
+            holder.incAndCheckLength(length);
+            if (groupCount == 0) {
+              NullableVarCharVector varLen4 = (NullableVarCharVector) holder.getValueVector();
+              NullableVarCharVector.Mutator m = varLen4.getMutator();
+              m.set(index, bytes);
+            } else {
+              RepeatedVarCharVector repeatedVarLen4 = (RepeatedVarCharVector) holder.getValueVector();
+              RepeatedVarCharVector.Mutator m = repeatedVarLen4.getMutator();
+              holder.setGroupCount(index);
+              m.add(index, bytes);
+            }
+            return holder.hasEnoughSpace(length + 4 + 1);
+          }
+        }
+        case BIT: {
+          holder.incAndCheckLength(NullableBitHolder.WIDTH + 1);
+          if (groupCount == 0) {
+            if (val != null) {
+              NullableBitVector bit = (NullableBitVector) holder.getValueVector();
+              NullableBitVector.Mutator m = bit.getMutator();
+              m.set(index, (Boolean) val ? 1 : 0);
+            }
+          } else {
+            if (val == null) {
+              throw new UnsupportedOperationException("Nullable repeated boolean is not supported.");
+            }
+
+            RepeatedBitVector repeatedBit = (RepeatedBitVector) holder.getValueVector();
+            RepeatedBitVector.Mutator m = repeatedBit.getMutator();
+            holder.setGroupCount(index);
+            m.add(index, (Boolean) val ? 1 : 0);
+          }
+          return holder.hasEnoughSpace(NullableBitHolder.WIDTH + 1);
+        }
+        default:
+          throw new DrillRuntimeException("Type not supported for adding a value: " + minorType);
+      }
+    }
+
+    private VectorHolder getOrCreateVectorHolder(JSONRecordReader reader, Field field) throws SchemaChangeException {
+      return reader.getOrCreateVectorHolder(field);
+    }
+
+    public abstract RecordSchema createSchema() throws IOException;
+
+    public abstract Field createField(RecordSchema parentSchema,
+                                      String prefixFieldName,
+                                      String fieldName,
+                                      MajorType fieldType,
+                                      int index);
+  }
+
+  private void recordNewField(Field field) {
+    diffSchema.recordNewField(field);
+  }
+
+  private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException {
+    String fullFieldName = ref != null ? ref.getPath() + "." + field.getFullFieldName() : field.getFullFieldName();
+    VectorHolder holder = valueVectorMap.get(fullFieldName);
+    if (holder != null) {
+      return holder;
+    }
+
+    MajorType type = field.getFieldType();
+    MinorType minorType = type.getMinorType();
+
+    // Maps and late-bound fields are not backed by a value vector of their own.
+    if (minorType.equals(MinorType.MAP) || minorType.equals(MinorType.LATE)) {
+      return null;
+    }
+
+    MaterializedField f = MaterializedField.create(new SchemaPath(fullFieldName, ExpressionPosition.UNKNOWN), type);
+    ValueVector v = TypeHelper.getNewVector(f, allocator);
+    AllocationHelper.allocate(v, batchSize, 50);
+    holder = new VectorHolder(v);
+    valueVectorMap.put(fullFieldName, holder);
+    outputMutator.addField(v);
+    return holder;
+  }
+}

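For readers following along: JSONRecordReader is built on Jackson's streaming
API, and ReadType.readRecord() above is a token-walk loop: skip FIELD_NAME
tokens, recurse on START_OBJECT and START_ARRAY, and stop at the matching end
token. A minimal self-contained sketch of that loop, using plain Jackson with
none of the Drill schema and vector plumbing (class and method names here are
illustrative only, not part of the patch):

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

import java.io.IOException;

public class TokenWalkSketch {
  public static void main(String[] args) throws IOException {
    String json = "{\"a\": 1, \"b\": [true, false], \"c\": {\"d\": \"x\"}}";
    JsonParser parser = new JsonFactory().createParser(json);
    parser.nextToken(); // position on START_OBJECT, as next() does above
    walk(parser, JsonToken.END_OBJECT, 0);
    parser.close();
  }

  // Consume tokens until the matching end token, recursing into containers.
  static void walk(JsonParser parser, JsonToken end, int depth) throws IOException {
    JsonToken token = parser.nextToken();
    while (token != end) {
      if (token == JsonToken.FIELD_NAME) {
        token = parser.nextToken();
        continue;
      }
      if (token == JsonToken.START_OBJECT) {
        walk(parser, JsonToken.END_OBJECT, depth + 1);
      } else if (token == JsonToken.START_ARRAY) {
        walk(parser, JsonToken.END_ARRAY, depth + 1);
      } else {
        System.out.println(depth + ": " + parser.getText());
      }
      token = parser.nextToken();
    }
  }
}

The real reader does the same walk but additionally maps each value token to a
MinorType and writes it into a value vector, ending the batch when any vector
fills up.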
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
index 6211e21..f330a1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
@@ -17,19 +17,19 @@
  */
 package org.apache.drill.exec.store.hive;
 
-import com.beust.jcommander.internal.Lists;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
 import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntry;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 
-import java.util.List;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
 
-public class HiveReadEntry implements ReadEntry {
+public class HiveReadEntry {
 
   @JsonProperty("table")
   public HiveTable table;
@@ -60,18 +60,5 @@ public class HiveReadEntry implements ReadEntry {
     return partitionsUnwrapped;
   }
 
-  @Override
-  public OperatorCost getCost() {
-    // TODO: need to come up with way to calculate the cost for Hive tables
-    return new OperatorCost(1, 1, 2, 2);
-  }
-
-  @Override
-  public Size getSize() {
-    // TODO: contact the metastore and find the size of the data in table
-    Size size = new Size(1, 1);
-
-    return size;
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java
deleted file mode 100644
index ef7266c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive;
-
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.physical.base.Size;
-import org.apache.hadoop.hive.conf.HiveConf;
-
-public class HiveReadEntryOld implements ReadEntry {
-  private final HiveConf conf;
-  private final String table;
-  private Size size;
-
-  public HiveReadEntryOld(HiveConf conf, String table) {
-    this.conf = conf;
-    this.table = table;
-  }
-
-  @Override
-  public OperatorCost getCost() {
-    // TODO: need to come up with way to calculate the cost for Hive tables
-    return new OperatorCost(1, 1, 2, 2);
-  }
-
-  @Override
-  public Size getSize() {
-    if (size != null) {
-      // TODO: contact the metastore and find the size of the data in table
-      size = new Size(1, 1);
-    }
-
-    return size;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index bc2a16b..a1e8f1d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -34,7 +34,7 @@ import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.store.StorageEngineRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -58,9 +58,9 @@ public class HiveScan extends AbstractGroupScan {
   @JsonIgnore
   private List<InputSplit> inputSplits = Lists.newArrayList();
   @JsonIgnore
-  public HiveStorageEngine storageEngine;
-  @JsonProperty("storageengine")
-  public HiveStorageEngineConfig engineConfig;
+  public HiveStoragePlugin storagePlugin;
+  @JsonProperty("storage-plugin")
+  public String storagePluginName;
 
   @JsonIgnore
   public List<Partition> partitions;
@@ -77,27 +77,27 @@ public class HiveScan extends AbstractGroupScan {
   Map<InputSplit, Partition> partitionMap = new HashMap();
 
   @JsonCreator
-  public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry, @JsonProperty("storageengine") HiveStorageEngineConfig config,
+  public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry, @JsonProperty("storage-plugin") String storagePluginName,
                   @JsonProperty("columns") List<FieldReference> columns,
-                  @JacksonInject StorageEngineRegistry engineRegistry) throws ExecutionSetupException {
+                  @JacksonInject StoragePluginRegistry engineRegistry) throws ExecutionSetupException {
     this.hiveReadEntry = hiveReadEntry;
     this.table = hiveReadEntry.getTable();
-    this.engineConfig = config;
-    this.storageEngine = (HiveStorageEngine) engineRegistry.getEngine(config);
+    this.storagePluginName = storagePluginName;
+    this.storagePlugin = (HiveStoragePlugin) engineRegistry.getEngine(storagePluginName);
     this.columns = columns;
     this.partitions = hiveReadEntry.getPartitions();
     getSplits();
-    endpoints = storageEngine.getContext().getBits();
+    endpoints = storagePlugin.getContext().getBits();
   }
 
-  public HiveScan(HiveReadEntry hiveReadEntry, HiveStorageEngine storageEngine, List<FieldReference> columns) throws ExecutionSetupException {
+  public HiveScan(HiveReadEntry hiveReadEntry, HiveStoragePlugin storageEngine, List<FieldReference> columns) throws ExecutionSetupException {
     this.table = hiveReadEntry.getTable();
     this.hiveReadEntry = hiveReadEntry;
     this.columns = columns;
     this.partitions = hiveReadEntry.getPartitions();
     getSplits();
     endpoints = storageEngine.getContext().getBits();
-    this.engineConfig = storageEngine.getConfig();
+    this.storagePluginName = storageEngine.getName();
   }
 
   public List<FieldReference> getColumns() {
@@ -112,7 +112,7 @@ public class HiveScan extends AbstractGroupScan {
         for (Object obj : properties.keySet()) {
           job.set((String) obj, (String) properties.get(obj));
         }
-        InputFormat format = (InputFormat) Class.forName(table.getSd().getInputFormat()).getConstructor().newInstance();
+        InputFormat<?, ?> format = (InputFormat<?, ?>) Class.forName(table.getSd().getInputFormat()).getConstructor().newInstance();
         job.setInputFormat(format.getClass());
         Path path = new Path(table.getSd().getLocation());
         FileInputFormat.addInputPath(job, path);
@@ -130,7 +130,7 @@ public class HiveScan extends AbstractGroupScan {
           for (Object obj : properties.keySet()) {
             job.set((String) obj, (String) properties.get(obj));
           }
-          InputFormat format = (InputFormat) Class.forName(partition.getSd().getInputFormat()).getConstructor().newInstance();
+          InputFormat<?, ?> format = (InputFormat<?, ?>) Class.forName(partition.getSd().getInputFormat()).getConstructor().newInstance();
           job.setInputFormat(format.getClass());
           FileInputFormat.addInputPath(job, new Path(partition.getSd().getLocation()));
           format = job.getInputFormat();
@@ -192,12 +192,12 @@ public class HiveScan extends AbstractGroupScan {
 
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
-    Map<String, DrillbitEndpoint> endpointMap = new HashMap();
+    Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
     for (DrillbitEndpoint endpoint : endpoints) {
       endpointMap.put(endpoint.getAddress(), endpoint);
       logger.debug("endpoing address: {}", endpoint.getAddress());
     }
-    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap();
+    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
     try {
       long totalSize = 0;
       for (InputSplit split : inputSplits) {
@@ -242,6 +242,6 @@ public class HiveScan extends AbstractGroupScan {
 
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
-    return new HiveScan(hiveReadEntry, storageEngine, columns);
+    return new HiveScan(hiveReadEntry, storagePlugin, columns);
   }
 }

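Worth noting on the constructor change above: the serialized physical plan now
carries only a plugin name under "storage-plugin", and the live registry is
supplied through Jackson's injectable-values mechanism when the plan is read
back. A self-contained sketch of that mechanism (Registry and Scan below are
simplified stand-ins for StoragePluginRegistry and HiveScan, not the real
classes):

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

public class InjectSketch {

  // Stand-in for StoragePluginRegistry: resolves a plugin name to an instance.
  static class Registry {
    private final Map<String, String> plugins = new HashMap<>();
    Registry() { plugins.put("hive", "hive-plugin-instance"); }
    String getEngine(String name) { return plugins.get(name); }
  }

  // Stand-in for HiveScan: JSON carries only the plugin name; the live plugin
  // object is looked up at deserialization time via the injected registry.
  static class Scan {
    final String storagePluginName;
    final String storagePlugin;

    @JsonCreator
    public Scan(@JsonProperty("storage-plugin") String storagePluginName,
                @JacksonInject Registry registry) {
      this.storagePluginName = storagePluginName;
      this.storagePlugin = registry.getEngine(storagePluginName);
    }
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    mapper.setInjectableValues(
        new InjectableValues.Std().addValue(Registry.class, new Registry()));
    Scan scan = mapper.readValue("{\"storage-plugin\": \"hive\"}", Scan.class);
    System.out.println(scan.storagePluginName + " -> " + scan.storagePlugin);
  }
}

Referencing plugins by name rather than embedding the full config also means a
plan no longer duplicates engine configuration in every scan node, which is
presumably part of the motivation for the rename in this patch.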
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java
deleted file mode 100644
index 0f6f3bc..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.physical.base.Size;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.AbstractStorageEngine;
-import org.apache.drill.exec.store.SchemaProvider;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.thrift.TException;
-
-import java.io.IOException;
-import java.util.List;
-
-public class HiveStorageEngine extends AbstractStorageEngine {
-
-  private HiveStorageEngineConfig config;
-  private HiveConf hiveConf;
-  private HiveSchemaProvider schemaProvider;
-  static private DrillbitContext context;
-
-  public HiveStorageEngine(HiveStorageEngineConfig config, DrillbitContext context) throws ExecutionSetupException {
-    this.config = config;
-    this.context = context;
-    this.hiveConf = config.getHiveConf();
-  }
-
-  public HiveStorageEngineConfig getConfig() {
-    return config;
-  }
-
-  public DrillbitContext getContext() {
-    return context;
-  }
-
-  @Override
-  public HiveScan getPhysicalScan(Scan scan) throws IOException {
-    HiveReadEntry hiveReadEntry = scan.getSelection().getListWith(new ObjectMapper(), new TypeReference<HiveReadEntry>(){});
-    try {
-      List<Partition> partitions = getSchemaProvider().getPartitions(hiveReadEntry.getTable().getDbName(), hiveReadEntry.getTable().getTableName());
-      return new HiveScan(hiveReadEntry, this, null);
-    } catch (ExecutionSetupException | TException e) {
-      throw new DrillRuntimeException(e);
-    }
-  }
-
-  @Override
-  public HiveSchemaProvider getSchemaProvider() {
-    try {
-    if (schemaProvider == null) {
-      schemaProvider = new HiveSchemaProvider(config, context.getConfig());
-    }
-    return schemaProvider;
-    } catch (ExecutionSetupException e) {
-      throw new DrillRuntimeException(e);
-    }
-  }
-
-  List<String> getPartitions(String dbName, String tableName) throws TException {
-    List<Partition> partitions = getSchemaProvider().getMetaClient().listPartitions(dbName, tableName, Short.MAX_VALUE);
-    List<String> partitionLocations = Lists.newArrayList();
-    if (partitions == null) return null;
-    for (Partition part : partitions) {
-      partitionLocations.add(part.getSd().getLocation());
-    }
-    return partitionLocations;
-  }
-
-  public static class HiveEntry implements ReadEntry {
-
-    private Table table;
-
-    public HiveEntry(Table table) {
-      this.table = table;
-    }
-
-    public Table getTable() {
-      return table;
-    }
-
-    @Override
-    public OperatorCost getCost() {
-      throw new UnsupportedOperationException(this.getClass().getCanonicalName() + " is only for extracting path data from " +
-              "selections inside a scan node from a logical plan, it cannot be used in an executing plan and has no cost.");
-    }
-
-    @Override
-    public Size getSize() {
-      throw new UnsupportedOperationException(this.getClass().getCanonicalName() + " is only for extracting path data from " +
-              "selections inside a scan node from a logical plan, it cannot be used in an executing plan and has no cost.");
-    }
-  }
-
-  public static class HiveSchemaProvider implements SchemaProvider {
-
-    private HiveConf hiveConf;
-    private HiveMetaStoreClient metaClient;
-
-    public HiveSchemaProvider(HiveStorageEngineConfig config, DrillConfig dConfig) throws ExecutionSetupException {
-      hiveConf = config.getHiveConf();
-    }
-
-    public HiveMetaStoreClient getMetaClient() throws MetaException {
-      if (metaClient == null) {
-        metaClient = new HiveMetaStoreClient(hiveConf);
-      }
-      return metaClient;
-    }
-
-    public Table getTable(String dbName, String tableName) throws TException {
-      HiveMetaStoreClient mClient = getMetaClient();
-      try {
-        return  mClient.getTable(dbName, tableName);
-      }catch (NoSuchObjectException e) {
-        logger.error("Database: {} table: {} not found", dbName, tableName);
-        throw new RuntimeException(e);
-      } catch (TException e) {
-        mClient.reconnect();
-        return  mClient.getTable(dbName, tableName);
-      }
-    }
-
-    List<Partition> getPartitions(String dbName, String tableName) throws TException {
-      HiveMetaStoreClient mClient = getMetaClient();
-      List<Partition> partitions;
-      try {
-        partitions = getMetaClient().listPartitions(dbName, tableName, Short.MAX_VALUE);
-      } catch (TException e) {
-        mClient.reconnect();
-        partitions = getMetaClient().listPartitions(dbName, tableName, Short.MAX_VALUE);
-      }
-      return partitions;
-    }
-
-    @Override
-    public HiveReadEntry getSelectionBaseOnName(String name) {
-      String[] dbNameTableName = name.split("\\.");
-      String dbName;
-      String t;
-      if (dbNameTableName.length > 1) {
-        dbName = dbNameTableName[0];
-        t = dbNameTableName[1];
-      } else {
-        dbName = "default";
-        t = name;
-      }
-
-      try {
-        Table table = getTable(dbName, t);
-        List<Partition> partitions = getPartitions(dbName, t);
-        List<HiveTable.HivePartition> hivePartitions = Lists.newArrayList();
-        for(Partition part : partitions) {
-          hivePartitions.add(new HiveTable.HivePartition(part));
-        }
-        if (hivePartitions.size() == 0) hivePartitions = null;
-        return new HiveReadEntry(new HiveTable(table), hivePartitions);
-      } catch (NoSuchObjectException e) {
-        throw new DrillRuntimeException(e);
-      } catch (TException e) {
-        throw new DrillRuntimeException(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java
deleted file mode 100644
index 91fec3b..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.logical.StorageEngineConfigBase;
-import org.apache.hadoop.hive.conf.HiveConf;
-
-import java.util.Map;
-
-@JsonTypeName("hive")
-public class HiveStorageEngineConfig extends StorageEngineConfigBase {
-  @JsonProperty
-  public Map<String, String> configProps;
-  @JsonIgnore
-  private HiveConf hiveConf;
-
-  @JsonIgnore
-  public HiveConf getHiveConf() {
-    if (hiveConf == null) {
-      hiveConf = new HiveConf();
-      if (configProps != null) {
-        for (Map.Entry<String, String> entry : configProps.entrySet()) {
-          hiveConf.set(entry.getKey(), entry.getValue());
-        }
-      }
-    }
-
-    return hiveConf;
-  }
-
-  @JsonCreator
-  public HiveStorageEngineConfig(@JsonProperty("config") Map<String, String> props) {
-    this.configProps = props;
-  }
-
-  @Override
-  public int hashCode() {
-    return configProps != null ? configProps.hashCode() : 0;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    HiveStorageEngineConfig that = (HiveStorageEngineConfig) o;
-
-    if (configProps != null ? !configProps.equals(that.configProps) : that.configProps != null) return false;
-
-    return true;
-  }
-}

