drill-dev mailing list archives

From jacq...@apache.org
Subject [22/38] Integrate new SQL changes with Hive storage engine, moving to automatic file detection. Rename storage engine to storage plugin. Separate storage plugins from format plugins, updating Parquet and JSON to format engines. Refactor distribution log
Date Tue, 04 Mar 2014 08:07:49 GMT
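
A note on the shape of this change: throughout the diff, scans stop resolving a monolithic ParquetStorageEngine through a StorageEngineRegistry and instead carry a (storage config, format config) pair that the StoragePluginRegistry turns back into a format plugin. Below is a minimal, self-contained sketch of that lookup pattern; the names ToyRegistry, FileSystemConfig, ParquetConfig and PluginLookupSketch are illustrative stand-ins, not Drill's actual types.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative stand-ins for Drill's StoragePluginConfig, FormatPluginConfig and FormatPlugin.
    interface StorageConfig {}
    interface FormatConfig {}
    interface FormatPlugin { String describe(); }

    class FileSystemConfig implements StorageConfig {
      final String connection;
      FileSystemConfig(String connection) { this.connection = connection; }
    }

    class ParquetConfig implements FormatConfig {}

    class ParquetPlugin implements FormatPlugin {
      public String describe() { return "parquet format hosted by a dfs storage plugin"; }
    }

    // A registry resolves a format plugin from a (storage config, format config) pair,
    // mirroring registry.getFormatPlugin(storageConfig, formatConfig) in the scans below.
    class ToyRegistry {
      private final Map<Class<?>, FormatPlugin> byFormatType = new HashMap<Class<?>, FormatPlugin>();

      void register(Class<? extends FormatConfig> formatType, FormatPlugin plugin) {
        byFormatType.put(formatType, plugin);
      }

      FormatPlugin getFormatPlugin(StorageConfig storage, FormatConfig format) {
        // the real registry also consults the storage config; this sketch keys on the format type only
        return byFormatType.get(format.getClass());
      }
    }

    public class PluginLookupSketch {
      public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        registry.register(ParquetConfig.class, new ParquetPlugin());
        FormatPlugin plugin = registry.getFormatPlugin(new FileSystemConfig("hdfs://namenode"), new ParquetConfig());
        System.out.println(plugin.describe()); // parquet format hosted by a dfs storage plugin
      }
    }
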
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
new file mode 100644
index 0000000..6d02046
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.QueryOptimizerRule;
+import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.dfs.MagicString;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.mock.MockStorageEngine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.ParquetFileWriter;
+
+import com.google.common.collect.Lists;
+
+public class ParquetFormatPlugin implements FormatPlugin{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetFormatPlugin.class);
+
+  private final DrillbitContext context;
+  static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+  private CodecFactoryExposer codecFactoryExposer;
+  private final DrillFileSystem fs;
+  private final ParquetFormatMatcher formatMatcher;
+  private final ParquetFormatConfig config;
+  private final StoragePluginConfig storageConfig;
+  private final String name;
+  
+  public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig){
+    this(name, context, fs, storageConfig, new ParquetFormatConfig());
+  }
+  
+  public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
+    this.context = context;
+    this.codecFactoryExposer = new CodecFactoryExposer(fs.getUnderlying().getConf());
+    this.config = formatConfig;
+    this.formatMatcher = new ParquetFormatMatcher(this, fs);
+    this.storageConfig = storageConfig;
+    this.fs = fs;
+    this.name = name == null ? "parquet" : name;
+  }
+
+  Configuration getHadoopConfig() {
+    return fs.getUnderlying().getConf();
+  }
+
+  public DrillFileSystem getFileSystem() {
+    return fs;
+  }
+
+  @Override
+  public ParquetFormatConfig getConfig() {
+    return config;
+  }
+
+  public DrillbitContext getContext() {
+    return this.context;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+  
+  @Override
+  public List<QueryOptimizerRule> getOptimizerRules() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public ParquetGroupScan getGroupScan(FieldReference outputRef, FileSelection selection) throws IOException {
+    return new ParquetGroupScan( selection.getFileStatusList(fs), this, outputRef);
+  }
+
+  @Override
+  public StoragePluginConfig getStorageConfig() {
+    return storageConfig;
+  }
+
+  public CodecFactoryExposer getCodecFactoryExposer() {
+    return codecFactoryExposer;
+  }
+
+  public String getName(){
+    return name;
+  }
+  
+  @Override
+  public boolean supportsWrite() {
+    return false;
+  }
+
+
+
+  @Override
+  public FormatMatcher getMatcher() {
+    return formatMatcher;
+  }
+
+  private static class ParquetFormatMatcher extends BasicFormatMatcher{
+    
+    private final DrillFileSystem fs;
+    
+    public ParquetFormatMatcher(ParquetFormatPlugin plugin, DrillFileSystem fs) {
+      super(plugin, fs, //
+          Lists.newArrayList( //
+              Pattern.compile(".*\\.parquet$"), //
+              Pattern.compile(".*/" + ParquetFileWriter.PARQUET_METADATA_FILE) //
+              //
+              ),
+          Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC))
+                    
+          );
+      this.fs = fs;
+      
+    }
+    
+    @Override
+    public boolean supportDirectoryReads() {
+      return true;
+    }
+
+    @Override
+    public FormatSelection isReadable(FileSelection file) throws IOException {
+      // TODO: we only check the first file for directory reading.  This is because 
+      if(file.containsDirectories(fs)){
+        if(isDirReadable(file.getFirstPath(fs))){
+          return new FormatSelection(plugin.getConfig(), file);
+        }
+      }
+      return super.isReadable(file);
+    }
+    
+    boolean isDirReadable(FileStatus dir) {
+      Path p = new Path(dir.getPath(), "/" + ParquetFileWriter.PARQUET_METADATA_FILE);
+      try {
+        return fs.getUnderlying().exists(p);
+      } catch (IOException e) {
+        logger.info("Failure while attempting to check for Parquet metadata file.", e);
+        return false;
+      }
+    }
+    
+    
+    
+  }
+  
+}
\ No newline at end of file

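ParquetFormatMatcher above is what gives Parquet the automatic file detection mentioned in the subject: a selection is treated as Parquet when a file name matches .*\.parquet$, when a directory contains the Parquet _metadata file, or when a file's leading bytes match the Parquet magic (ParquetFileWriter.MAGIC). A standalone sketch of the extension and magic-byte checks follows; it assumes the well-known 4-byte magic "PAR1" and keeps none of Drill's DrillFileSystem plumbing.

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.regex.Pattern;

    public class ParquetSniffSketch {
      // extension pattern taken from ParquetFormatMatcher above
      private static final Pattern NAME = Pattern.compile(".*\\.parquet$");
      // Parquet files start (and end) with the 4-byte magic "PAR1"
      private static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

      static boolean looksLikeParquet(String path) {
        if (NAME.matcher(path).matches()) {
          return true;                       // fast path: extension match
        }
        byte[] head = new byte[MAGIC.length];
        try (DataInputStream in = new DataInputStream(new FileInputStream(path))) {
          in.readFully(head);                // read the first 4 bytes
        } catch (IOException e) {
          return false;                      // unreadable, or shorter than 4 bytes
        }
        return Arrays.equals(head, MAGIC);   // magic-byte match, as BasicFormatMatcher does
      }

      public static void main(String[] args) {
        for (String path : args) {
          System.out.println(path + " -> " + looksLikeParquet(path));
        }
      }
    }
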
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index aa01115..f76e59a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -18,37 +18,35 @@
 package org.apache.drill.exec.store.parquet;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
 import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntryFromHDFS;
-import org.apache.drill.exec.physical.ReadEntryWithPath;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.store.AffinityCalculator;
-import org.apache.drill.exec.store.StorageEngineRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.BlockMapBuilder;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -59,16 +57,17 @@ import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.org.codehaus.jackson.annotate.JsonCreator;
 
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-
 
 @JsonTypeName("parquet-scan")
 public class ParquetGroupScan extends AbstractGroupScan {
@@ -78,11 +77,18 @@ public class ParquetGroupScan extends AbstractGroupScan {
   static final String ENDPOINT_BYTES_TIMER = MetricRegistry.name(ParquetGroupScan.class, "endpointBytes");
   static final String ASSIGNMENT_TIMER = MetricRegistry.name(ParquetGroupScan.class, "applyAssignments");
   static final String ASSIGNMENT_AFFINITY_HIST = MetricRegistry.name(ParquetGroupScan.class, "assignmentAffinity");
+  
   final Histogram assignmentAffinityStats = metrics.histogram(ASSIGNMENT_AFFINITY_HIST);
 
-  private ArrayListMultimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> mappings;
+  private ListMultimap<Integer, RowGroupInfo> mappings;
   private List<RowGroupInfo> rowGroupInfos;
-  private Stopwatch watch = new Stopwatch();
+  private final List<ReadEntryWithPath> entries;
+  private final Stopwatch watch = new Stopwatch();
+  private final ParquetFormatPlugin formatPlugin;
+  private final ParquetFormatConfig formatConfig;
+  private final FileSystem fs;
+  private final FieldReference ref;
+  private List<EndpointAffinity> endpointAffinities;
 
   private List<SchemaPath> columns;
 
@@ -91,79 +97,87 @@ public class ParquetGroupScan extends AbstractGroupScan {
   }
 
   @JsonProperty("storageengine")
-  public ParquetStorageEngineConfig getEngineConfig() {
-    return this.engineConfig;
+  public ParquetFormatConfig getEngineConfig() {
+    return this.formatConfig;
   }
 
-  private List<ReadEntryWithPath> entries;
-  private long totalBytes;
-  private Collection<DrillbitEndpoint> availableEndpoints;
-  private ParquetStorageEngine storageEngine;
-  private ParquetStorageEngineConfig engineConfig;
-  private FileSystem fs;
-  private final FieldReference ref;
-  private List<EndpointAffinity> endpointAffinities;
-
   @JsonCreator
-  public ParquetGroupScan(@JsonProperty("entries") List<ReadEntryWithPath> entries,
-                          @JsonProperty("storageengine") ParquetStorageEngineConfig storageEngineConfig,
-                          @JacksonInject StorageEngineRegistry engineRegistry,
-                          @JsonProperty("ref") FieldReference ref,
-                          @JsonProperty("columns") List<SchemaPath> columns
-                           )throws IOException, ExecutionSetupException {
+  public ParquetGroupScan( //
+      @JsonProperty("entries") List<ReadEntryWithPath> entries, //
+      @JsonProperty("storage") StoragePluginConfig storageConfig, //
+      @JsonProperty("format") FormatPluginConfig formatConfig, //
+      @JacksonInject StoragePluginRegistry engineRegistry, // 
+      @JsonProperty("ref") FieldReference ref, //
+      @JsonProperty("columns") List<SchemaPath> columns //
+      ) throws IOException, ExecutionSetupException {
     engineRegistry.init(DrillConfig.create());
     this.columns = columns;
-    this.storageEngine = (ParquetStorageEngine) engineRegistry.getEngine(storageEngineConfig);
-    this.availableEndpoints = storageEngine.getContext().getBits();
-    this.fs = storageEngine.getFileSystem();
-    this.engineConfig = storageEngineConfig;
+    if(formatConfig == null) formatConfig = new ParquetFormatConfig();
+    Preconditions.checkNotNull(storageConfig);
+    Preconditions.checkNotNull(formatConfig);
+    this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
+    Preconditions.checkNotNull(formatPlugin);
+    this.fs = formatPlugin.getFileSystem().getUnderlying();
+    this.formatConfig = formatPlugin.getConfig();
     this.entries = entries;
     this.ref = ref;
-    readFooter();
-    calculateEndpointBytes();
+    this.readFooterFromEntries();
+
   }
 
-  public ParquetGroupScan(List<ReadEntryWithPath> entries, //
-                          ParquetStorageEngine storageEngine, // 
-                          FieldReference ref, //
-                          List<SchemaPath> columns) throws IOException {
-    this.storageEngine = storageEngine;
-    this.columns = columns;
-    this.engineConfig = storageEngine.getEngineConfig();
-    this.availableEndpoints = storageEngine.getContext().getBits();
-    this.fs = storageEngine.getFileSystem();
-    this.entries = entries;
+  public ParquetGroupScan(List<FileStatus> files, //
+      ParquetFormatPlugin formatPlugin, //
+      FieldReference ref) //
+      throws IOException {
+    this.formatPlugin = formatPlugin;
+    this.columns = null;
+    this.formatConfig = formatPlugin.getConfig();
+    this.fs = formatPlugin.getFileSystem().getUnderlying();
+    
+    this.entries = Lists.newArrayList();
+    for(FileStatus file : files){
+      entries.add(new ReadEntryWithPath(file.getPath().toString()));
+    }
+    
     this.ref = ref;
-    readFooter();
-    calculateEndpointBytes();
+    readFooter(files);
   }
 
-  private void readFooter() throws IOException {
+  private void readFooterFromEntries()  throws IOException {
+    List<FileStatus> files = Lists.newArrayList();
+    for(ReadEntryWithPath e : entries){
+      files.add(fs.getFileStatus(new Path(e.getPath())));
+    }
+    readFooter(files);
+  }
+  
+  private void readFooter(List<FileStatus> statuses) throws IOException {
     watch.reset();
     watch.start();
     Timer.Context tContext = metrics.timer(READ_FOOTER_TIMER).time();
-    rowGroupInfos = new ArrayList();
+    
+    
+    rowGroupInfos = Lists.newArrayList();
     long start = 0, length = 0;
     ColumnChunkMetaData columnChunkMetaData;
-    for (ReadEntryWithPath readEntryWithPath : entries){
-      Path path = new Path(readEntryWithPath.getPath());
-      List<Footer> footers = ParquetFileReader.readFooters(this.storageEngine.getHadoopConfig(), path);
+    for (FileStatus status : statuses) {
+      List<Footer> footers = ParquetFileReader.readFooters(formatPlugin.getHadoopConfig(), status);
       if (footers.size() == 0) {
-        logger.warn("No footers found");
+        throw new IOException(String.format("Unable to find footer for file %s", status.getPath().getName()));
       }
-//      readEntryWithPath.getPath();
 
       for (Footer footer : footers) {
         int index = 0;
         ParquetMetadata metadata = footer.getParquetMetadata();
-        for (BlockMetaData rowGroup : metadata.getBlocks()){
+        for (BlockMetaData rowGroup : metadata.getBlocks()) {
           // need to grab block information from HDFS
           columnChunkMetaData = rowGroup.getColumns().iterator().next();
           start = columnChunkMetaData.getFirstDataPageOffset();
-          // this field is not being populated correctly, but the column chunks know their sizes, just summing them for now
-          //end = start + rowGroup.getTotalByteSize();
+          // this field is not being populated correctly, but the column chunks know their sizes, just summing them for
+          // now
+          // end = start + rowGroup.getTotalByteSize();
           length = 0;
-          for (ColumnChunkMetaData col : rowGroup.getColumns()){
+          for (ColumnChunkMetaData col : rowGroup.getColumns()) {
             length += col.getTotalSize();
           }
           String filePath = footer.getFile().toUri().getPath();
@@ -179,203 +193,109 @@ public class ParquetGroupScan extends AbstractGroupScan {
     logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS));
   }
 
-  private void calculateEndpointBytes() {
-    Timer.Context tContext = metrics.timer(ENDPOINT_BYTES_TIMER).time();
-    watch.reset();
-    watch.start();
-    AffinityCalculator ac = new AffinityCalculator(fs, availableEndpoints);
-    for (RowGroupInfo e : rowGroupInfos) {
-      ac.setEndpointBytes(e);
-      totalBytes += e.getLength();
-    }
-    watch.stop();
-    tContext.stop();
-    logger.debug("Took {} ms to calculate EndpointBytes", watch.elapsed(TimeUnit.MILLISECONDS));
-  }
-
   @JsonIgnore
   public FileSystem getFileSystem() {
     return this.fs;
   }
 
-  public static class RowGroupInfo extends ReadEntryFromHDFS {
+  public static class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, FileWork {
 
-    private HashMap<DrillbitEndpoint,Long> endpointBytes;
-    private long maxBytes;
+    private EndpointByteMap byteMap;
     private int rowGroupIndex;
 
     @JsonCreator
     public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") long start,
-                        @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) {
+        @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) {
       super(path, start, length);
       this.rowGroupIndex = rowGroupIndex;
     }
 
-    @Override
-    public OperatorCost getCost() {
-      return new OperatorCost(1, 2, 1, 1);
+    public RowGroupReadEntry getRowGroupReadEntry() {
+      return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
     }
 
-    @Override
-    public Size getSize() {
-      // TODO - these values are wrong, I cannot know these until after I read a file
-      return new Size(10, 10);
-    }
-
-    public HashMap<DrillbitEndpoint,Long> getEndpointBytes() {
-      return endpointBytes;
-    }
-
-    public void setEndpointBytes(HashMap<DrillbitEndpoint,Long> endpointBytes) {
-      this.endpointBytes = endpointBytes;
-    }
-
-    public void setMaxBytes(long bytes) {
-      this.maxBytes = bytes;
+    public int getRowGroupIndex() {
+      return this.rowGroupIndex;
     }
 
-    public long getMaxBytes() {
-      return maxBytes;
+    @Override
+    public int compareTo(CompleteWork o) {
+      return Long.compare(getTotalBytes(), o.getTotalBytes());
     }
 
-    public ParquetRowGroupScan.RowGroupReadEntry getRowGroupReadEntry() {
-      return new ParquetRowGroupScan.RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
+    @Override
+    public long getTotalBytes() {
+      return this.getLength();
     }
 
-    public int getRowGroupIndex() {
-      return this.rowGroupIndex;
+    @Override
+    public EndpointByteMap getByteMap() {
+      return byteMap;
     }
-  }
 
-  private class ParquetReadEntryComparator implements Comparator<RowGroupInfo> {
-    public int compare(RowGroupInfo e1, RowGroupInfo e2) {
-      if (e1.getMaxBytes() == e2.getMaxBytes()) return 0;
-      return (e1.getMaxBytes() > e2.getMaxBytes()) ? 1 : -1;
+    public void setEndpointByteMap(EndpointByteMap byteMap) {
+      this.byteMap = byteMap;
     }
   }
 
   /**
-   *Calculates the affinity each endpoint has for this scan, by adding up the affinity each endpoint has for each
+   * Calculates the affinity each endpoint has for this scan, by adding up the affinity each endpoint has for each
    * rowGroup
+   * 
    * @return a list of EndpointAffinity objects
    */
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
-    watch.reset();
-    watch.start();
+    
     if (this.endpointAffinities == null) {
-      HashMap<DrillbitEndpoint, Float> affinities = new HashMap<>();
-      for (RowGroupInfo entry : rowGroupInfos) {
-        for (DrillbitEndpoint d : entry.getEndpointBytes().keySet()) {
-          long bytes = entry.getEndpointBytes().get(d);
-          float affinity = (float)bytes / (float)totalBytes;
-          logger.debug("RowGroup: {} Endpoint: {} Bytes: {}", entry.getRowGroupIndex(), d.getAddress(), bytes);
-          if (affinities.keySet().contains(d)) {
-            affinities.put(d, affinities.get(d) + affinity);
-          } else {
-            affinities.put(d, affinity);
-          }
+      BlockMapBuilder bmb = new BlockMapBuilder(fs, formatPlugin.getContext().getBits());
+      try{
+        for (RowGroupInfo rgi : rowGroupInfos) {
+          EndpointByteMap ebm = bmb.getEndpointByteMap(rgi);
+          rgi.setEndpointByteMap(ebm);
         }
+      } catch (IOException e) {
+        logger.warn("Failure while determining operator affinity.", e);
+        return Collections.emptyList();
       }
-      List<EndpointAffinity> affinityList = new LinkedList<>();
-      for (DrillbitEndpoint d : affinities.keySet()) {
-        logger.debug("Endpoint {} has affinity {}", d.getAddress(), affinities.get(d).floatValue());
-        affinityList.add(new EndpointAffinity(d,affinities.get(d).floatValue()));
-      }
-      this.endpointAffinities = affinityList;
+
+      this.endpointAffinities = AffinityCreator.getAffinityMap(rowGroupInfos);
     }
-    watch.stop();
-    logger.debug("Took {} ms to get operator affinity", watch.elapsed(TimeUnit.MILLISECONDS));
     return this.endpointAffinities;
   }
 
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) throws PhysicalOperatorSetupException {
 
-  static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.00};
+    this.mappings = AssignmentCreator.getMappings(incomingEndpoints, rowGroupInfos);
 
-  /**
-   *
-   * @param incomingEndpoints
-   */
-  @Override
-  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
-    watch.reset();
-    watch.start();
-    final Timer.Context tcontext = metrics.timer(ASSIGNMENT_TIMER).time();
-    Preconditions.checkArgument(incomingEndpoints.size() <= rowGroupInfos.size(), String.format("Incoming endpoints %d " +
-            "is greater than number of row groups %d", incomingEndpoints.size(), rowGroupInfos.size()));
-    mappings = ArrayListMultimap.create();
-    ArrayList rowGroupList = new ArrayList(rowGroupInfos);
-    List<DrillbitEndpoint> endpointLinkedlist = Lists.newLinkedList(incomingEndpoints);
-    for(double cutoff : ASSIGNMENT_CUTOFFS ){
-      scanAndAssign(mappings, endpointLinkedlist, rowGroupList, cutoff, false);
-    }
-    scanAndAssign(mappings, endpointLinkedlist, rowGroupList, 0.0, true);
-    tcontext.stop();
-    watch.stop();
-    logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
-    Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
-    Preconditions.checkState(!rowGroupInfos.isEmpty());
   }
 
-  public int fragmentPointer = 0;
+  @Override
+  public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
+    assert minorFragmentId < mappings.size() : String.format(
+        "Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(),
+        minorFragmentId);
 
-  /**
-   *
-   * @param endpointAssignments the mapping between fragment/endpoint and rowGroup
-   * @param endpoints the list of drillbits, ordered by the corresponding fragment
-   * @param rowGroups the list of rowGroups to assign
-   * @param requiredPercentage the percentage of max bytes required to make an assignment
-   * @param assignAll if true, will assign even if no affinity
-   */
-  private void scanAndAssign (Multimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments, List<DrillbitEndpoint> endpoints,
-                              List<RowGroupInfo> rowGroups, double requiredPercentage, boolean assignAll) {
-    Collections.sort(rowGroups, new ParquetReadEntryComparator());
-    final boolean requireAffinity = requiredPercentage > 0;
-    int maxAssignments = (int) (rowGroups.size() / endpoints.size());
-
-    if (maxAssignments < 1) maxAssignments = 1;
-
-    for(Iterator<RowGroupInfo> iter = rowGroups.iterator(); iter.hasNext();){
-      RowGroupInfo rowGroupInfo = iter.next();
-      for (int i = 0; i < endpoints.size(); i++) {
-        int minorFragmentId = (fragmentPointer + i) % endpoints.size();
-        DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
-        Map<DrillbitEndpoint, Long> bytesPerEndpoint = rowGroupInfo.getEndpointBytes();
-        boolean haveAffinity = bytesPerEndpoint.containsKey(currentEndpoint) ;
-
-        if (assignAll ||
-                (!bytesPerEndpoint.isEmpty() &&
-                        (!requireAffinity || haveAffinity) &&
-                        (!endpointAssignments.containsKey(minorFragmentId) || endpointAssignments.get(minorFragmentId).size() < maxAssignments) &&
-                        (!requireAffinity || bytesPerEndpoint.get(currentEndpoint) >= rowGroupInfo.getMaxBytes() * requiredPercentage))) {
-
-          endpointAssignments.put(minorFragmentId, rowGroupInfo.getRowGroupReadEntry());
-          logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(), minorFragmentId, endpoints.get(minorFragmentId).getAddress());
-          if (bytesPerEndpoint.get(currentEndpoint) != null) {
-            assignmentAffinityStats.update(bytesPerEndpoint.get(currentEndpoint) / rowGroupInfo.getLength());
-          } else {
-            assignmentAffinityStats.update(0);
-          }
-          iter.remove();
-          fragmentPointer = (minorFragmentId + 1) % endpoints.size();
-          break;
-        }
-      }
+    List<RowGroupInfo> rowGroupsForMinor = mappings.get(minorFragmentId);
 
-    }
+    Preconditions.checkArgument(!rowGroupsForMinor.isEmpty(),
+        String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
+
+    return new ParquetRowGroupScan(formatPlugin, convertToReadEntries(rowGroupsForMinor), ref, columns);
   }
 
-  @Override
-  public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
-    assert minorFragmentId < mappings.size() : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(), minorFragmentId);
-    for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings.get(minorFragmentId)) {
-      logger.debug("minorFragmentId: {} Path: {} RowGroupIndex: {}",minorFragmentId, rg.getPath(),rg.getRowGroupIndex());
+  
+  
+  private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups){
+    List<RowGroupReadEntry> entries = Lists.newArrayList();
+    for (RowGroupInfo rgi : rowGroups) {
+      RowGroupReadEntry rgre = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(),
+          rgi.getRowGroupIndex());
+      entries.add(rgre);
     }
-    Preconditions.checkArgument(!mappings.get(minorFragmentId).isEmpty(), String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
-    return new ParquetRowGroupScan(storageEngine, engineConfig, mappings.get(minorFragmentId), ref,
-            columns);
+    return entries;
   }
-
   
   public FieldReference getRef() {
     return ref;
@@ -392,21 +312,21 @@ public class ParquetGroupScan extends AbstractGroupScan {
 
   @Override
   public OperatorCost getCost() {
-    //TODO Figure out how to properly calculate cost
-    return new OperatorCost(1,rowGroupInfos.size(),1,1);
+    // TODO Figure out how to properly calculate cost
+    return new OperatorCost(1, rowGroupInfos.size(), 1, 1);
   }
 
   @Override
   public Size getSize() {
     // TODO - this is wrong, need to populate correctly
-    return new Size(10,10);
+    return new Size(10, 10);
   }
 
   @Override
   @JsonIgnore
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    //TODO return copy of self
+    // TODO return copy of self
     return this;
   }
 

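getOperatorAffinity in the rewritten ParquetGroupScan now delegates the heavy lifting: BlockMapBuilder reports how many bytes of each row group live on each Drillbit endpoint, and AffinityCreator turns those byte counts into a per-endpoint fraction of the scan's total bytes (its new source appears further down in this message). A small worked example of that fraction, using made-up endpoint names and byte counts:

    import java.util.HashMap;
    import java.util.Map;

    public class AffinitySketch {
      public static void main(String[] args) {
        // hypothetical byte counts: bytes of the scan's row groups hosted on each endpoint
        Map<String, Long> bytesPerEndpoint = new HashMap<String, Long>();
        bytesPerEndpoint.put("drillbit-a", 192L * 1024 * 1024);   // 192 MB local
        bytesPerEndpoint.put("drillbit-b", 64L * 1024 * 1024);    //  64 MB local

        long totalBytes = 0;
        for (long bytes : bytesPerEndpoint.values()) {
          totalBytes += bytes;
        }

        // affinity = endpoint-local bytes / total bytes, the same ratio AffinityCreator accumulates
        for (Map.Entry<String, Long> e : bytesPerEndpoint.entrySet()) {
          float affinity = (float) e.getValue() / (float) totalBytes;
          System.out.printf("%s affinity %.2f%n", e.getKey(), affinity);   // 0.75 and 0.25
        }
      }
    }
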
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 1e6c31a..9e1cc66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -49,7 +49,7 @@ import parquet.schema.PrimitiveType;
 
 import com.google.common.base.Joiner;
 
-public class ParquetRecordReader implements RecordReader {
+class ParquetRecordReader implements RecordReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
 
   // this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index b3ce9b4..0e672d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -25,15 +25,16 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntryFromHDFS;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.store.StorageEngineRegistry;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -48,43 +49,53 @@ import com.google.common.collect.Iterators;
 public class ParquetRowGroupScan extends AbstractBase implements SubScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRowGroupScan.class);
 
-  public final StorageEngineConfig engineConfig;
-  private final ParquetStorageEngine parquetStorageEngine;
+  public final ParquetFormatConfig formatConfig;
+  private final ParquetFormatPlugin formatPlugin;
   private final List<RowGroupReadEntry> rowGroupReadEntries;
   private final FieldReference ref;
   private final List<SchemaPath> columns;
 
   @JsonCreator
-  public ParquetRowGroupScan(@JacksonInject StorageEngineRegistry registry,
-                             @JsonProperty("engineConfig") StorageEngineConfig engineConfig,
-                             @JsonProperty("rowGroupReadEntries") LinkedList<RowGroupReadEntry> rowGroupReadEntries,
-                             @JsonProperty("ref") FieldReference ref,
-                             @JsonProperty("columns") List<SchemaPath> columns
-                             ) throws ExecutionSetupException {
-    parquetStorageEngine = (ParquetStorageEngine) registry.getEngine(engineConfig);
+  public ParquetRowGroupScan( //
+      @JacksonInject StoragePluginRegistry registry, //
+      @JsonProperty("storage") StoragePluginConfig storageConfig, //
+      @JsonProperty("format") FormatPluginConfig formatConfig, //
+      @JsonProperty("entries") LinkedList<RowGroupReadEntry> rowGroupReadEntries, //
+      @JsonProperty("ref") FieldReference ref, //
+      @JsonProperty("columns") List<SchemaPath> columns //
+  ) throws ExecutionSetupException {
+
+    if(formatConfig == null) formatConfig = new ParquetFormatConfig();
+    Preconditions.checkNotNull(storageConfig);
+    Preconditions.checkNotNull(formatConfig);
+    this.formatPlugin = (ParquetFormatPlugin) registry.getFormatPlugin(storageConfig, formatConfig);
+    Preconditions.checkNotNull(formatPlugin);
     this.rowGroupReadEntries = rowGroupReadEntries;
-    this.engineConfig = engineConfig;
+    this.formatConfig = formatPlugin.getConfig();
     this.ref = ref;
     this.columns = columns;
   }
 
-  public ParquetRowGroupScan(ParquetStorageEngine engine, ParquetStorageEngineConfig config,
-                              List<RowGroupReadEntry> rowGroupReadEntries, FieldReference ref,
-                              List<SchemaPath> columns
-                              ) {
-    parquetStorageEngine = engine;
-    engineConfig = config;
+  public ParquetRowGroupScan( //
+      ParquetFormatPlugin formatPlugin, //
+      List<RowGroupReadEntry> rowGroupReadEntries, //
+      FieldReference ref, //
+      List<SchemaPath> columns) {
+    this.formatPlugin = formatPlugin;
+    this.formatConfig = formatPlugin.getConfig();
     this.rowGroupReadEntries = rowGroupReadEntries;
     this.ref = ref;
     this.columns = columns;
   }
 
+  @JsonProperty("entries")
   public List<RowGroupReadEntry> getRowGroupReadEntries() {
     return rowGroupReadEntries;
   }
 
-  public StorageEngineConfig getEngineConfig() {
-    return engineConfig;
+  @JsonProperty("storage")
+  public StoragePluginConfig getEngineConfig() {
+    return formatPlugin.getStorageConfig();
   }
 
   @Override
@@ -92,7 +103,6 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
     return null;
   }
 
-  
   public FieldReference getRef() {
     return ref;
   }
@@ -108,8 +118,8 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
   }
 
   @JsonIgnore
-  public ParquetStorageEngine getStorageEngine(){
-    return parquetStorageEngine;
+  public ParquetFormatPlugin getStorageEngine() {
+    return formatPlugin;
   }
 
   @Override
@@ -120,8 +130,7 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    return new ParquetRowGroupScan(parquetStorageEngine, (ParquetStorageEngineConfig) engineConfig, rowGroupReadEntries,
-            ref, columns);
+    return new ParquetRowGroupScan(formatPlugin, rowGroupReadEntries, ref, columns);
   }
 
   @Override
@@ -133,36 +142,4 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
     return columns;
   }
 
-  public static class RowGroupReadEntry extends ReadEntryFromHDFS {
-
-    private int rowGroupIndex;
-
-    @parquet.org.codehaus.jackson.annotate.JsonCreator
-    public RowGroupReadEntry(@JsonProperty("path") String path, @JsonProperty("start") long start,
-                             @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) {
-      super(path, start, length);
-      this.rowGroupIndex = rowGroupIndex;
-    }
-
-    @Override
-    public OperatorCost getCost() {
-      return new OperatorCost(1, 2, 1, 1);
-    }
-
-    @Override
-    public Size getSize() {
-      // TODO - these values are wrong, I cannot know these until after I read a file
-      return new Size(10, 10);
-    }
-
-    @JsonIgnore
-    public RowGroupReadEntry getRowGroupReadEntry() {
-      return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
-    }
-
-    public int getRowGroupIndex(){
-      return rowGroupIndex;
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 966a16b..17e7da2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -26,7 +26,10 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Stopwatch;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -38,8 +41,10 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.metadata.ParquetMetadata;
 
@@ -50,9 +55,12 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
   public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
     List<RecordReader> readers = Lists.newArrayList();
+    
+    FileSystem fs = rowGroupScan.getStorageEngine().getFileSystem().getUnderlying();
+    
     // keep footers in a map to avoid re-reading them
     Map<String, ParquetMetadata> footers = new HashMap<String, ParquetMetadata>();
-    for(ParquetRowGroupScan.RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
+    for(RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
       /*
       Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
       TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine)
@@ -63,11 +71,11 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
       try {
         if ( ! footers.containsKey(e.getPath())){
           footers.put(e.getPath(),
-              ParquetFileReader.readFooter( rowGroupScan.getStorageEngine().getFileSystem().getConf(), new Path(e.getPath())));
+              ParquetFileReader.readFooter( fs.getConf(), new Path(e.getPath())));
         }
         readers.add(
             new ParquetRecordReader(
-                context, e.getPath(), e.getRowGroupIndex(), rowGroupScan.getStorageEngine().getFileSystem(),
+                context, e.getPath(), e.getRowGroupIndex(), fs,
                 rowGroupScan.getStorageEngine().getCodecFactoryExposer(),
                 footers.get(e.getPath()),
                 rowGroupScan.getRef(),

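ParquetScanBatchCreator keeps the footer for each file in a map keyed by path, so several row groups from the same file trigger only one footer read. The same memoization pattern reduced to plain Java is sketched below; loadFooter is a hypothetical stand-in for the ParquetFileReader.readFooter call.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FooterCacheSketch {
      // stand-in for ParquetFileReader.readFooter(conf, path); pretend this is expensive
      static String loadFooter(String path) {
        System.out.println("reading footer for " + path);
        return "footer(" + path + ")";
      }

      public static void main(String[] args) {
        // two row groups from a.parquet, one from b.parquet
        List<String> rowGroupPaths = Arrays.asList("/data/a.parquet", "/data/a.parquet", "/data/b.parquet");
        Map<String, String> footers = new HashMap<String, String>();
        for (String path : rowGroupPaths) {
          if (!footers.containsKey(path)) {         // same check the batch creator performs
            footers.put(path, loadFooter(path));    // the footer read happens once per file
          }
          System.out.println(path + " -> " + footers.get(path));
        }
      }
    }
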
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
deleted file mode 100644
index c17a9e3..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import java.io.IOException;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.physical.ReadEntryWithPath;
-import org.apache.drill.exec.store.ClassPathFileSystem;
-import org.apache.drill.exec.store.SchemaProvider;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-
-import com.beust.jcommander.internal.Lists;
-
-public class ParquetSchemaProvider implements SchemaProvider{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetSchemaProvider.class);
-
-  public static final String HADOOP_DEFAULT_NAME = "fs.default.name";
-  final ParquetStorageEngineConfig configuration;
-  final FileSystem fs;
-  final Configuration conf;
-
-  public ParquetSchemaProvider(ParquetStorageEngineConfig configuration, DrillConfig config){
-    this.configuration = configuration;
-    try {
-      this.conf = new Configuration();
-      this.conf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
-      this.conf.set(HADOOP_DEFAULT_NAME, configuration.getDfsName());
-      logger.debug("{}: {}",HADOOP_DEFAULT_NAME, configuration.getDfsName());
-      this.fs = FileSystem.get(conf);
-    } catch (IOException ie) {
-      throw new RuntimeException("Error setting up filesystem", ie);
-    }
-  }
-
-  @Override
-  public Object getSelectionBaseOnName(String tableName) {
-    try{
-      if(!fs.exists(new Path(tableName))) return null;
-      ReadEntryWithPath re = new ReadEntryWithPath(tableName);
-      return Lists.newArrayList(re);
-    }catch(Exception e){
-      logger.warn(String.format("Failure while checking table name %s.", tableName), e);
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
deleted file mode 100644
index ad9756e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-
-import com.beust.jcommander.internal.Lists;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.StorageEngineConfig;
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.physical.ReadEntryWithPath;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.AbstractStorageEngine;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.SchemaProvider;
-import org.apache.drill.exec.store.mock.MockStorageEngine;
-
-import com.google.common.collect.ListMultimap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactoryExposer;
-import parquet.hadoop.ParquetFileReader;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-
-public class ParquetStorageEngine extends AbstractStorageEngine{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
-
-  private final DrillbitContext context;
-  static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
-  private CodecFactoryExposer codecFactoryExposer;
-  private final ParquetSchemaProvider schemaProvider;
-  private final ParquetStorageEngineConfig engineConfig;
-
-  public ParquetStorageEngine(ParquetStorageEngineConfig configuration, DrillbitContext context){
-    this.context = context;
-    this.schemaProvider = new ParquetSchemaProvider(configuration, context.getConfig());
-    codecFactoryExposer = new CodecFactoryExposer(schemaProvider.conf);
-    this.engineConfig = configuration;
-  }
-
-  public Configuration getHadoopConfig() {
-    double y = 5;
-    int x = (int) y;
-    return schemaProvider.conf;
-  }
-
-  public FileSystem getFileSystem() {
-    return schemaProvider.fs;
-  }
-
-  public ParquetStorageEngineConfig getEngineConfig() {
-    return engineConfig;
-  }
-
-  public DrillbitContext getContext() {
-    return this.context;
-  }
-
-  @Override
-  public boolean supportsRead() {
-    return true;
-  }
-
-  @Override
-  public ParquetGroupScan getPhysicalScan(Scan scan) throws IOException {
-
-    ArrayList<ReadEntryWithPath> readEntries = scan.getSelection().getListWith(new ObjectMapper(),
-        new TypeReference<ArrayList<ReadEntryWithPath>>() {});
-
-    return new ParquetGroupScan( readEntries, this, scan.getOutputReference(), null);
-  }
-
-  @Override
-  public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries) {
-    return null;
-  }
-
-  @Override
-  public RecordReader getReader(FragmentContext context, ReadEntry readEntry) throws IOException {
-    return null;
-  }
-
-
-  public CodecFactoryExposer getCodecFactoryExposer() {
-    return codecFactoryExposer;
-  }
-
-  @Override
-  public ParquetSchemaProvider getSchemaProvider() {
-    return schemaProvider;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngineConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngineConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngineConfig.java
deleted file mode 100644
index f2d6124..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngineConfig.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import java.util.HashMap;
-
-import org.apache.drill.common.logical.StorageEngineConfigBase;
-import org.apache.drill.exec.store.DistributedStorageEngine;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("parquet")
-public class ParquetStorageEngineConfig extends StorageEngineConfigBase implements DistributedStorageEngine{
-
-  public String getDfsName() {
-    return dfsName;
-  }
-
-  // information needed to identify an HDFS instance
-  private String dfsName;
-  private HashMap<String,String> map;
-
-  @JsonCreator
-  public ParquetStorageEngineConfig(@JsonProperty("dfsName") String dfsName) {
-    this.dfsName = dfsName;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    ParquetStorageEngineConfig that = (ParquetStorageEngineConfig) o;
-
-    if (dfsName != null ? !dfsName.equals(that.dfsName) : that.dfsName != null) return false;
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    return dfsName != null ? dfsName.hashCode() : 0;
-  }
-  public void set(String key, String value) {
-    map.put(key, value);
-  }
-
-  public String get(String key) {
-    return map.get(key);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
new file mode 100644
index 0000000..986328e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class RowGroupReadEntry extends ReadEntryFromHDFS {
+
+  private int rowGroupIndex;
+
+  @parquet.org.codehaus.jackson.annotate.JsonCreator
+  public RowGroupReadEntry(@JsonProperty("path") String path, @JsonProperty("start") long start,
+                           @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) {
+    super(path, start, length);
+    this.rowGroupIndex = rowGroupIndex;
+  }
+
+  @JsonIgnore
+  public RowGroupReadEntry getRowGroupReadEntry() {
+    return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
+  }
+
+  public int getRowGroupIndex(){
+    return rowGroupIndex;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
index 0321838..d9e498e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
@@ -103,6 +103,10 @@ public class VarLenBinaryReader {
     do {
       lengthVarFieldsInCurrentRecord = 0;
       for (ColumnReader columnReader : columns) {
+        if (recordsReadInCurrentPass == columnReader.valueVecHolder.getValueVector().getValueCapacity()){
+          rowGroupFinished = true;
+          break;
+        }
         if (columnReader.pageReadStatus.currentPage == null
             || columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
           columnReader.totalValuesRead += columnReader.pageReadStatus.valuesRead;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java
new file mode 100644
index 0000000..d25abad
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.beust.jcommander.internal.Lists;
+import com.carrotsearch.hppc.ObjectFloatOpenHashMap;
+import com.carrotsearch.hppc.cursors.ObjectFloatCursor;
+import com.carrotsearch.hppc.cursors.ObjectLongCursor;
+import com.google.common.base.Stopwatch;
+
+public class AffinityCreator {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AffinityCreator.class);
+  
+  public static <T extends CompleteWork> List<EndpointAffinity> getAffinityMap(List<T> work){
+    Stopwatch watch = new Stopwatch().start();
+    
+    long totalBytes = 0;
+    for (CompleteWork entry : work) {
+      totalBytes += entry.getTotalBytes();
+    }
+    
+    ObjectFloatOpenHashMap<DrillbitEndpoint> affinities = new ObjectFloatOpenHashMap<DrillbitEndpoint>();
+    for (CompleteWork entry : work) {
+      for (ObjectLongCursor<DrillbitEndpoint> cursor : entry.getByteMap()) {
+        long bytes = cursor.value;
+        float affinity = (float)bytes / (float)totalBytes;
+        logger.debug("Work: {} Endpoint: {} Bytes: {}", work, cursor.key.getAddress(), bytes);
+        affinities.putOrAdd(cursor.key, affinity, affinity);
+      }
+    }
+    
+    List<EndpointAffinity> affinityList = Lists.newLinkedList();
+    for (ObjectFloatCursor<DrillbitEndpoint> d : affinities) {
+      logger.debug("Endpoint {} has affinity {}", d.key.getAddress(), d.value);
+      affinityList.add(new EndpointAffinity(d.key, d.value));
+    }
+
+    logger.debug("Took {} ms to get operator affinity", watch.elapsed(TimeUnit.MILLISECONDS));
+    return affinityList;
+  }
+}
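For context, a minimal usage sketch (not part of this patch; the endpoint addresses, file paths,
and the wrapper class are assumptions for illustration): two single-block files, each fully local
to a different Drillbit, should give each endpoint an affinity of 0.5, since an endpoint's
affinity is its local bytes divided by the total bytes across all work units.

  import java.util.List;

  import org.apache.drill.exec.physical.EndpointAffinity;
  import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
  import org.apache.drill.exec.store.schedule.AffinityCreator;
  import org.apache.drill.exec.store.schedule.CompleteFileWork;
  import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;

  import com.google.common.collect.Lists;

  public class AffinityCreatorSketch {
    public static void main(String[] args) {
      DrillbitEndpoint node1 = DrillbitEndpoint.newBuilder().setAddress("node1").build();
      DrillbitEndpoint node2 = DrillbitEndpoint.newBuilder().setAddress("node2").build();

      // A 256 MB file whose single block is local to node1.
      EndpointByteMapImpl bytes1 = new EndpointByteMapImpl();
      bytes1.add(node1, 256L << 20);
      CompleteFileWork work1 = new CompleteFileWork(bytes1, 0, 256L << 20, "/data/a.parquet");

      // A 256 MB file whose single block is local to node2.
      EndpointByteMapImpl bytes2 = new EndpointByteMapImpl();
      bytes2.add(node2, 256L << 20);
      CompleteFileWork work2 = new CompleteFileWork(bytes2, 0, 256L << 20, "/data/b.parquet");

      // Each endpoint holds half of the 512 MB total, so each should get affinity 0.5.
      List<EndpointAffinity> affinities = AffinityCreator.getAffinityMap(Lists.newArrayList(work1, work2));
      System.out.println(affinities);
    }
  }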

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
new file mode 100644
index 0000000..eaa4f17
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+
+/**
+ * The AssignmentCreator is responsible for assigning a set of work units to the available slices.
+ */
+public class AssignmentCreator<T extends CompleteWork> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssignmentCreator.class);
+
+  static final double[] ASSIGNMENT_CUTOFFS = { 0.99, 0.50, 0.25, 0.00 };
+  private final ArrayListMultimap<Integer, T> mappings;
+  private final List<DrillbitEndpoint> endpoints;
+
+  
+
+  /**
+   * Given a set of endpoints to assign work to, attempt to evenly assign work based on affinity of work units to
+   * Drillbits.
+   * 
+   * @param incomingEndpoints
+   *          The set of nodes to assign work to. Note that nodes can be listed multiple times if we want to have
+   *          multiple slices on a node working on the task simultaneously.
+   * @param units
+   *          The work units to assign.
+   * @return a ListMultimap from minor fragment id (the position of an endpoint in the incoming list) to the work units assigned to that fragment.
+   */
+  public static <T extends CompleteWork> ListMultimap<Integer, T> getMappings(List<DrillbitEndpoint> incomingEndpoints,
+      List<T> units) {
+    AssignmentCreator<T> creator = new AssignmentCreator<T>(incomingEndpoints, units);
+    return creator.mappings;
+  }
+
+  private AssignmentCreator(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
+    Stopwatch watch = new Stopwatch().start();
+    
+    Preconditions.checkArgument(incomingEndpoints.size() <= units.size(), String.format("The number of incoming "
+        + "endpoints (%d) exceeds the number of work units (%d).", incomingEndpoints.size(), units.size()));
+    this.mappings = ArrayListMultimap.create();
+    this.endpoints = Lists.newLinkedList(incomingEndpoints);
+
+    ArrayList<T> rowGroupList = new ArrayList<>(units);
+    for (double cutoff : ASSIGNMENT_CUTOFFS) {
+      scanAndAssign(rowGroupList, cutoff, false);
+    }
+    scanAndAssign(rowGroupList, 0.0, true);
+    
+    logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
+    Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
+    Preconditions.checkState(!units.isEmpty());
+
+  }
+
+  /**
+   * Scan the remaining work units and assign each one to a minor fragment, rotating through the
+   * endpoints so the assignments stay balanced.
+   *
+   * @param workunits
+   *          the list of work units (e.g. row groups) still waiting to be assigned
+   * @param requiredPercentage
+   *          the fraction of a unit's maximum per-endpoint bytes that an endpoint must hold
+   *          locally before it can receive the assignment
+   * @param assignAll
+   *          if true, assign every remaining unit even when there is no data affinity
+   */
+  private void scanAndAssign(List<T> workunits, double requiredPercentage, boolean assignAll) {
+    Collections.sort(workunits);
+    int fragmentPointer = 0;
+    final boolean requireAffinity = requiredPercentage > 0;
+    int maxAssignments = (int) (workunits.size() / endpoints.size());
+
+    if (maxAssignments < 1)
+      maxAssignments = 1;
+
+    for (Iterator<T> iter = workunits.iterator(); iter.hasNext();) {
+      T unit = iter.next();
+      for (int i = 0; i < endpoints.size(); i++) {
+        int minorFragmentId = (fragmentPointer + i) % endpoints.size();
+        DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
+        EndpointByteMap endpointByteMap = unit.getByteMap();
+        boolean haveAffinity = endpointByteMap.isSet(currentEndpoint);
+
+        if (assignAll
+            || (!endpointByteMap.isEmpty() && (!requireAffinity || haveAffinity)
+                && (!mappings.containsKey(minorFragmentId) || mappings.get(minorFragmentId).size() < maxAssignments) && (!requireAffinity || endpointByteMap
+                .get(currentEndpoint) >= endpointByteMap.getMaxBytes() * requiredPercentage))) {
+
+          mappings.put(minorFragmentId, unit);
+          // logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(),
+          // minorFragmentId, endpoints.get(minorFragmentId).getAddress());
+          // if (bytesPerEndpoint.get(currentEndpoint) != null) {
+          // // assignmentAffinityStats.update(bytesPerEndpoint.get(currentEndpoint) / rowGroupInfo.getLength());
+          // } else {
+          // // assignmentAffinityStats.update(0);
+          // }
+          iter.remove();
+          fragmentPointer = (minorFragmentId + 1) % endpoints.size();
+          break;
+        }
+      }
+
+    }
+  }
+
+}
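A minimal sketch of driving the new AssignmentCreator (illustrative only; the wrapper class and
method name are assumptions): the returned multimap is keyed by minor fragment id, i.e. the
position of each endpoint in the incoming list, so listing an endpoint twice lets it back two
slices of the scan.

  import java.util.List;

  import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
  import org.apache.drill.exec.store.schedule.AssignmentCreator;
  import org.apache.drill.exec.store.schedule.CompleteFileWork;

  import com.google.common.collect.ListMultimap;

  public class AssignmentSketch {
    /** Assumes endpoints.size() <= units.size(), as AssignmentCreator requires. */
    public static ListMultimap<Integer, CompleteFileWork> assign(List<DrillbitEndpoint> endpoints,
        List<CompleteFileWork> units) {
      ListMultimap<Integer, CompleteFileWork> mappings = AssignmentCreator.getMappings(endpoints, units);
      // Minor fragment i reads mappings.get(i); endpoints.get(i) is the Drillbit slice that runs it.
      return mappings;
    }
  }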

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
new file mode 100644
index 0000000..432c1d7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.beust.jcommander.internal.Lists;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableRangeMap;
+import com.google.common.collect.Range;
+
+public class BlockMapBuilder {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BlockMapBuilder.class);
+  static final MetricRegistry metrics = DrillMetrics.getInstance();
+  static final String BLOCK_MAP_BUILDER_TIMER = MetricRegistry.name(BlockMapBuilder.class, "blockMapBuilderTimer");
+
+  private HashMap<Path,ImmutableRangeMap<Long,BlockLocation>> blockMapMap = new HashMap<>();
+  private Collection<DrillbitEndpoint> endpoints;
+  private FileSystem fs;
+  private HashMap<String,DrillbitEndpoint> endPointMap;
+
+  public BlockMapBuilder(FileSystem fs, Collection<DrillbitEndpoint> endpoints) {
+    this.fs = fs;
+    this.endpoints = endpoints;
+    buildEndpointMap();
+  }
+
+  
+  public List<CompleteFileWork> generateFileWork(List<FileStatus> files, boolean blockify) throws IOException{
+    List<CompleteFileWork> work = Lists.newArrayList();
+    for(FileStatus f : files){
+      ImmutableRangeMap<Long,BlockLocation> rangeMap = getBlockMap(f);
+      if(!blockify){
+        work.add(new CompleteFileWork(this.getEndpointByteMap(new FileStatusWork(f)), 0, f.getLen(), f.getPath().toString()));
+        continue;
+      }
+      
+      for(Entry<Range<Long>, BlockLocation> l : rangeMap.asMapOfRanges().entrySet()){
+        work.add(new CompleteFileWork(this.getEndpointByteMap(new FileStatusWork(f)), l.getValue().getOffset(), l.getValue().getLength(), f.getPath().toString()));
+      }
+    }
+    return work;
+  }
+  
+  private class FileStatusWork implements FileWork{
+    private FileStatus status;
+
+    public FileStatusWork(FileStatus status) {
+      if(status.isDir()) throw new IllegalStateException("FileStatus work only works with files, not directories.");
+      this.status = status;
+    }
+
+    @Override
+    public String getPath() {
+      return status.getPath().toString();
+    }
+
+    @Override
+    public long getStart() {
+      return 0;
+    }
+
+    @Override
+    public long getLength() {
+      return status.getLen();
+    }
+    
+    
+    
+  }
+  
+  private ImmutableRangeMap<Long,BlockLocation> buildBlockMap(Path path) throws IOException {
+    FileStatus status = fs.getFileStatus(path);
+    return buildBlockMap(status);
+  }
+  
+  /**
+   * Builds a mapping from byte ranges within the file to their HDFS block locations.
+   */
+  private ImmutableRangeMap<Long,BlockLocation> buildBlockMap(FileStatus status) throws IOException {
+    final Timer.Context context = metrics.timer(BLOCK_MAP_BUILDER_TIMER).time();
+    BlockLocation[] blocks;
+    ImmutableRangeMap<Long,BlockLocation> blockMap;
+    blocks = fs.getFileBlockLocations(status, 0 , status.getLen());
+    ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new ImmutableRangeMap.Builder<Long,BlockLocation>();
+    for (BlockLocation block : blocks) {
+      long start = block.getOffset();
+      long end = start + block.getLength();
+      Range<Long> range = Range.closedOpen(start, end);
+      blockMapBuilder = blockMapBuilder.put(range, block);
+    }
+    blockMap = blockMapBuilder.build();
+    blockMapMap.put(status.getPath(), blockMap);
+    context.stop();
+    return blockMap;
+  }
+  
+  private ImmutableRangeMap<Long,BlockLocation> getBlockMap(Path path) throws IOException{
+    ImmutableRangeMap<Long,BlockLocation> blockMap  = blockMapMap.get(path);
+    if(blockMap == null){
+      blockMap = buildBlockMap(path);
+    }
+    return blockMap;
+  }
+  
+  private ImmutableRangeMap<Long,BlockLocation> getBlockMap(FileStatus status) throws IOException{
+    ImmutableRangeMap<Long,BlockLocation> blockMap  = blockMapMap.get(status.getPath());
+    if(blockMap == null){
+      blockMap = buildBlockMap(status);
+    }
+    return blockMap;
+  }
+
+  
+  /**
+   * For a given FileWork, calculate how many of its bytes are stored on each Drillbit endpoint.
+   *
+   * @param work the FileWork to calculate endpoint bytes for
+   * @throws IOException
+   */
+  public EndpointByteMap getEndpointByteMap(FileWork work) throws IOException {
+    Stopwatch watch = new Stopwatch();
+    watch.start();
+    Path fileName = new Path(work.getPath());
+    
+    
+    ImmutableRangeMap<Long,BlockLocation> blockMap = getBlockMap(fileName);
+    EndpointByteMapImpl endpointByteMap = new EndpointByteMapImpl();
+    long start = work.getStart();
+    long end = start + work.getLength();
+    Range<Long> rowGroupRange = Range.closedOpen(start, end);
+
+    // Find submap of ranges that intersect with the rowGroup
+    ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(rowGroupRange);
+
+    // Iterate through each block in this submap and get the host for the block location
+    for (Map.Entry<Range<Long>,BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) {
+      String[] hosts;
+      Range<Long> blockRange = block.getKey();
+      try {
+        hosts = block.getValue().getHosts();
+      } catch (IOException ioe) {
+        throw new RuntimeException("Failed to get hosts for block location", ioe);
+      }
+      Range<Long> intersection = rowGroupRange.intersection(blockRange);
+      long bytes = intersection.upperEndpoint() - intersection.lowerEndpoint();
+
+      // For each host in the current block location, add the intersecting bytes to the corresponding endpoint
+      for (String host : hosts) {
+        DrillbitEndpoint endpoint = getDrillBitEndpoint(host);
+        if(endpoint != null){
+          endpointByteMap.add(endpoint, bytes);
+        }else{
+          logger.debug("Failure finding Drillbit running on host {}.  Skipping affinity to that host.", host);
+        }
+      }
+    }
+
+    logger.debug("FileWork group ({},{}) max bytes {}", work.getPath(), work.getStart(), endpointByteMap.getMaxBytes());
+    
+    logger.debug("Took {} ms to set endpoint bytes", watch.stop().elapsed(TimeUnit.MILLISECONDS));
+    return endpointByteMap;
+  }
+
+  private DrillbitEndpoint getDrillBitEndpoint(String hostName) {
+    return endPointMap.get(hostName);
+  }
+
+  /**
+   * Builds a mapping of Drillbit endpoints to hostnames
+   */
+  private void buildEndpointMap() {
+    Stopwatch watch = new Stopwatch();
+    watch.start();
+    endPointMap = new HashMap<String, DrillbitEndpoint>();
+    for (DrillbitEndpoint d : endpoints) {
+      String hostName = d.getAddress();
+      endPointMap.put(hostName, d);
+    }
+    watch.stop();
+    logger.debug("Took {} ms to build endpoint map", watch.elapsed(TimeUnit.MILLISECONDS));
+  }
+}
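A minimal sketch of how a caller might feed BlockMapBuilder (not part of this patch; the input
directory and wrapper class are assumptions): list the files to scan, then let generateFileWork
split them along HDFS block boundaries when blockify is true.

  import java.io.IOException;
  import java.util.Arrays;
  import java.util.Collection;
  import java.util.List;

  import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
  import org.apache.drill.exec.store.schedule.BlockMapBuilder;
  import org.apache.drill.exec.store.schedule.CompleteFileWork;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class BlockMapSketch {
    public static List<CompleteFileWork> split(Collection<DrillbitEndpoint> activeBits) throws IOException {
      FileSystem fs = FileSystem.get(new Configuration());
      // Hypothetical input directory.
      FileStatus[] files = fs.listStatus(new Path("/data/parquet"));

      BlockMapBuilder builder = new BlockMapBuilder(fs, activeBits);
      // blockify = true: one CompleteFileWork per HDFS block rather than one per file.
      return builder.generateFileWork(Arrays.asList(files), true);
    }
  }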

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
new file mode 100644
index 0000000..30b08f6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class CompleteFileWork implements FileWork, CompleteWork{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompleteFileWork.class);
+
+  private long start;
+  private long length;
+  private String path;
+  private EndpointByteMap byteMap;
+  
+  public CompleteFileWork(EndpointByteMap byteMap, long start, long length, String path) {
+    super();
+    this.start = start;
+    this.length = length;
+    this.path = path;
+    this.byteMap = byteMap;
+  }
+
+  @Override
+  public int compareTo(CompleteWork o) {
+    return Long.compare(getTotalBytes(), o.getTotalBytes());
+  }
+
+  @Override
+  public long getTotalBytes() {
+    return length;
+  }
+
+  @Override
+  public EndpointByteMap getByteMap() {
+    return byteMap;
+  }
+
+  @Override
+  public String getPath() {
+    return path;
+  }
+
+  @Override
+  public long getStart() {
+    return start;
+  }
+
+  @Override
+  public long getLength() {
+    return length;
+  }
+  
+  public FileWorkImpl getAsFileWork(){
+    return new FileWorkImpl(start, length, path);
+  }
+  
+  public static class FileWorkImpl implements FileWork{
+
+    @JsonCreator
+    public FileWorkImpl(@JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("path") String path) {
+      super();
+      this.start = start;
+      this.length = length;
+      this.path = path;
+    }
+
+    public long start;
+    public long length;
+    public String path;
+    
+    @Override
+    public String getPath() {
+      return path;
+    }
+
+    @Override
+    public long getStart() {
+      return start;
+    }
+
+    @Override
+    public long getLength() {
+      return length;
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteWork.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteWork.java
new file mode 100644
index 0000000..44e27d4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteWork.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+
+/**
+ * Container that holds a complete work unit.  Can contain one or more partial units.
+ */
+public interface CompleteWork extends Comparable<CompleteWork>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompleteWork.class);
+  
+  public long getTotalBytes();
+  public EndpointByteMap getByteMap();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/EndpointByteMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/EndpointByteMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/EndpointByteMap.java
new file mode 100644
index 0000000..f543d75
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/EndpointByteMap.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.carrotsearch.hppc.cursors.ObjectLongCursor;
+
+/**
+ * Describes, for a single work unit, how many of its bytes reside on each DrillbitEndpoint.
+ */
+public interface EndpointByteMap extends Iterable<ObjectLongCursor<DrillbitEndpoint>>{
+
+  public boolean isSet(DrillbitEndpoint endpoint);
+  public long get(DrillbitEndpoint endpoint);
+  public boolean isEmpty();
+  public long getMaxBytes();
+  public void add(DrillbitEndpoint endpoint, long bytes);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/EndpointByteMapImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/EndpointByteMapImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/EndpointByteMapImpl.java
new file mode 100644
index 0000000..64e52eb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/EndpointByteMapImpl.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.carrotsearch.hppc.ObjectLongOpenHashMap;
+import com.carrotsearch.hppc.cursors.ObjectLongCursor;
+
+public class EndpointByteMapImpl implements EndpointByteMap{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EndpointByteMapImpl.class);
+  
+  private final ObjectLongOpenHashMap<DrillbitEndpoint> map = new ObjectLongOpenHashMap<>();
+  
+  private long maxBytes;
+  
+  public boolean isSet(DrillbitEndpoint endpoint){
+    return map.containsKey(endpoint);
+  }
+  
+  public long get(DrillbitEndpoint endpoint){
+    return map.get(endpoint);
+  }
+ 
+  public boolean isEmpty(){
+    return map.isEmpty();
+  }
+  
+  public void add(DrillbitEndpoint endpoint, long bytes){
+    assert endpoint != null;
+    maxBytes = Math.max(maxBytes, map.putOrAdd(endpoint, bytes, bytes)+1);
+  }
+
+  public long getMaxBytes() {
+    return maxBytes;
+  }
+
+  @Override
+  public Iterator<ObjectLongCursor<DrillbitEndpoint>> iterator() {
+    return map.iterator();
+  }
+  
+  
+}
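A small hedged illustration of the accumulation semantics (the wrapper class and endpoint
address are assumptions): repeated add() calls for the same endpoint sum their byte counts
through putOrAdd rather than replacing them.

  import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
  import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;

  public class ByteMapSketch {
    public static void main(String[] args) {
      DrillbitEndpoint node = DrillbitEndpoint.newBuilder().setAddress("node1").build();
      EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
      byteMap.add(node, 64L << 20);  // 64 MB from one block
      byteMap.add(node, 64L << 20);  // another 64 MB block on the same host
      // The two calls accumulate into a single 128 MB total for node1.
      System.out.println(byteMap.get(node) == (128L << 20));  // prints true
    }
  }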

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/PartialWork.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/PartialWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/PartialWork.java
new file mode 100644
index 0000000..bb8d950
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/PartialWork.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.schedule;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+public class PartialWork {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartialWork.class);
+  
+  private final long length;
+  private final DrillbitEndpoint[] locations;
+  
+  public PartialWork(long length, DrillbitEndpoint[] locations) {
+    super();
+    this.length = length;
+    this.locations = locations;
+  }
+  
+  public long getLength() {
+    return length;
+  }
+  public DrillbitEndpoint[] getLocations() {
+    return locations;
+  }
+  
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 329815d..baecc3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -346,7 +346,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
 
   private void runSQL(String sql) {
     try{
-      DrillSqlWorker sqlWorker = new DrillSqlWorker(context.getSchemaFactory(), context.getFunctionRegistry());
+      DrillSqlWorker sqlWorker = new DrillSqlWorker(context.getFactory(), context.getFunctionRegistry());
       LogicalPlan plan = sqlWorker.getPlan(sql);
       
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/resources/storage-engines.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/storage-engines.json b/exec/java-exec/src/main/resources/storage-engines.json
deleted file mode 100644
index d1d0413..0000000
--- a/exec/java-exec/src/main/resources/storage-engines.json
+++ /dev/null
@@ -1,29 +0,0 @@
-{
-  "storage":{
-    "parquet-local" :
-      {
-        "type":"parquet",
-        "dfsName" : "file:///"
-      },
-    "parquet-cp" :
-      {
-        "type":"parquet",
-        "dfsName" : "classpath:///"
-      },
-    "jsonl" :
-      {
-        "type":"json",
-        "dfsName" : "file:///"
-      },
-    "json-cp" :
-      {
-        "type":"json",
-        "dfsName" : "classpath:///"
-      },
-    "parquet" :
-      {
-        "type":"parquet",
-        "dfsName" : "file:///"
-      }
-  }
-}
\ No newline at end of file

