drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jacq...@apache.org
Subject [25/38] Integrate new SQL changes with Hive storage engine, moving to automatic file detection. Rename storage engine to storage plugin. Separate storage plugins from format plugins, updating Parquet and JSON to format engines. Refactor distribution log
Date Tue, 04 Mar 2014 08:07:52 GMT
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
new file mode 100644
index 0000000..5b8ccff
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import net.hydromatic.linq4j.expressions.DefaultExpression;
+import net.hydromatic.linq4j.expressions.Expression;
+import net.hydromatic.optiq.Schema;
+import net.hydromatic.optiq.SchemaPlus;
+import net.hydromatic.optiq.TableFunction;
+
+import org.apache.drill.exec.planner.logical.DrillTable;
+
+public abstract class AbstractSchema implements Schema{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSchema.class);
+
+  private final SchemaHolder parentSchema;
+
+  protected final String name;
+  private static final Expression EXPRESSION = new DefaultExpression(Object.class);
+
+  public AbstractSchema(SchemaHolder parentSchema, String name) {
+    super();
+    this.parentSchema = parentSchema;
+    this.name = name;
+  }
+
+  
+  @Override
+  public SchemaPlus getParentSchema() {
+    return parentSchema.getSchema();
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public Collection<TableFunction> getTableFunctions(String name) {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public Set<String> getTableFunctionNames() {
+    return Collections.emptySet();
+  }
+  
+  
+
+  @Override
+  public Schema getSubSchema(String name) {
+    return null;
+  }
+
+  @Override
+  public Set<String> getSubSchemaNames() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Expression getExpression() {
+    return EXPRESSION;
+  }
+
+  @Override
+  public boolean isMutable() {
+    return false;
+  }
+
+  @Override
+  public DrillTable getTable(String name){
+    return null;
+  }
+
+  @Override
+  public Set<String> getTableNames() {
+    return Collections.emptySet();
+  }
+  
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
deleted file mode 100644
index 77fad2a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Multimap;
-
-public abstract class AbstractStorageEngine implements StorageEngine{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStorageEngine.class);
-
-  protected AbstractStorageEngine(){
-  }
-  
-  
-  @Override
-  public boolean supportsRead() {
-    return false;
-  }
-
-  @Override
-  public boolean supportsWrite() {
-    return false;
-  }
-
-  @Override
-  public List<QueryOptimizerRule> getOptimizerRules() {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public AbstractGroupScan getPhysicalScan(Scan scan) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public RecordReader getReader(FragmentContext context, ReadEntry readEntry) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public RecordRecorder getWriter(FragmentContext context, WriteEntry writeEntry) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Multimap<DrillbitEndpoint, ReadEntry> getEntryAssignments(List<DrillbitEndpoint> assignments,
-      Collection<ReadEntry> entries) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Multimap<DrillbitEndpoint, WriteEntry> getWriteAssignments(List<DrillbitEndpoint> assignments,
-      Collection<ReadEntry> entries) {
-    throw new UnsupportedOperationException();
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
new file mode 100644
index 0000000..8baa72a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+
+public abstract class AbstractStoragePlugin implements StoragePlugin{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStoragePlugin.class);
+
+  protected AbstractStoragePlugin(){
+  }
+  
+  @Override
+  public boolean supportsRead() {
+    return false;
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    return false;
+  }
+
+  @Override
+  public List<QueryOptimizerRule> getOptimizerRules() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(Scan scan) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
deleted file mode 100644
index 7061c58..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store;
-
-
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableRangeMap;
-import com.google.common.collect.Range;
-import com.codahale.metrics.*;
-import com.codahale.metrics.Timer;
-import org.apache.drill.exec.metrics.DrillMetrics;
-import org.apache.drill.exec.store.parquet.ParquetGroupScan;
-
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
-public class AffinityCalculator {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class);
-  static final MetricRegistry metrics = DrillMetrics.getInstance();
-  static final String BLOCK_MAP_BUILDER_TIMER = MetricRegistry.name(AffinityCalculator.class, "blockMapBuilderTimer");
-
-
-  HashMap<String,ImmutableRangeMap<Long,BlockLocation>> blockMapMap = new HashMap<>();
-  FileSystem fs;
-  String fileName;
-  Collection<DrillbitEndpoint> endpoints;
-  HashMap<String,DrillbitEndpoint> endPointMap;
-
-  public AffinityCalculator(FileSystem fs, Collection<DrillbitEndpoint> endpoints) {
-    this.fs = fs;
-    this.endpoints = endpoints;
-    buildEndpointMap();
-  }
-
-  /**
-   * Builds a mapping of block locations to file byte range
-   */
-  private void buildBlockMap(String fileName) {
-    final Timer.Context context = metrics.timer(BLOCK_MAP_BUILDER_TIMER).time();
-    BlockLocation[] blocks;
-    ImmutableRangeMap<Long,BlockLocation> blockMap;
-    try {
-      FileStatus file = fs.getFileStatus(new Path(fileName));
-      blocks = fs.getFileBlockLocations(file, 0 , file.getLen());
-    } catch (IOException ioe) { throw new RuntimeException(ioe); }
-    ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new ImmutableRangeMap.Builder<Long,BlockLocation>();
-    for (BlockLocation block : blocks) {
-      long start = block.getOffset();
-      long end = start + block.getLength();
-      Range<Long> range = Range.closedOpen(start, end);
-      blockMapBuilder = blockMapBuilder.put(range, block);
-    }
-    blockMap = blockMapBuilder.build();
-    blockMapMap.put(fileName, blockMap);
-    context.stop();
-  }
-  /**
-   * For a given RowGroup, calculate how many bytes are available on each on drillbit endpoint
-   *
-   * @param rowGroup the RowGroup to calculate endpoint bytes for
-   */
-  public void setEndpointBytes(ParquetGroupScan.RowGroupInfo rowGroup) {
-    Stopwatch watch = new Stopwatch();
-    watch.start();
-    String fileName = rowGroup.getPath();
-    if (!blockMapMap.containsKey(fileName)) {
-      buildBlockMap(fileName);
-    }
-
-    ImmutableRangeMap<Long,BlockLocation> blockMap = blockMapMap.get(fileName);
-    HashMap<String,Long> hostMap = new HashMap<>();
-    HashMap<DrillbitEndpoint,Long> endpointByteMap = new HashMap();
-    long start = rowGroup.getStart();
-    long end = start + rowGroup.getLength();
-    Range<Long> rowGroupRange = Range.closedOpen(start, end);
-
-    // Find submap of ranges that intersect with the rowGroup
-    ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(rowGroupRange);
-
-    // Iterate through each block in this submap and get the host for the block location
-    for (Map.Entry<Range<Long>,BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) {
-      String[] hosts;
-      Range<Long> blockRange = block.getKey();
-      try {
-        hosts = block.getValue().getHosts();
-      } catch (IOException ioe) {
-        throw new RuntimeException("Failed to get hosts for block location", ioe);
-      }
-      Range<Long> intersection = rowGroupRange.intersection(blockRange);
-      long bytes = intersection.upperEndpoint() - intersection.lowerEndpoint();
-
-      // For each host in the current block location, add the intersecting bytes to the corresponding endpoint
-      for (String host : hosts) {
-        DrillbitEndpoint endpoint = getDrillBitEndpoint(host);
-        if (endpointByteMap.containsKey(endpoint)) {
-          endpointByteMap.put(endpoint, endpointByteMap.get(endpoint) + bytes);
-        } else {
-          if (endpoint != null ) endpointByteMap.put(endpoint, bytes);
-        }
-      }
-    }
-
-    rowGroup.setEndpointBytes(endpointByteMap);
-    rowGroup.setMaxBytes(endpointByteMap.size() > 0 ? Collections.max(endpointByteMap.values()) : 0);
-    logger.debug("Row group ({},{}) max bytes {}", rowGroup.getPath(), rowGroup.getStart(), rowGroup.getMaxBytes());
-    watch.stop();
-    logger.debug("Took {} ms to set endpoint bytes", watch.elapsed(TimeUnit.MILLISECONDS));
-  }
-
-  private DrillbitEndpoint getDrillBitEndpoint(String hostName) {
-    return endPointMap.get(hostName);
-  }
-
-  /**
-   * Builds a mapping of drillbit endpoints to hostnames
-   */
-  private void buildEndpointMap() {
-    Stopwatch watch = new Stopwatch();
-    watch.start();
-    endPointMap = new HashMap<String, DrillbitEndpoint>();
-    for (DrillbitEndpoint d : endpoints) {
-      String hostName = d.getAddress();
-      endPointMap.put(hostName, d);
-    }
-    watch.stop();
-    logger.debug("Took {} ms to build endpoint map", watch.elapsed(TimeUnit.MILLISECONDS));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassPathFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassPathFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassPathFileSystem.java
index 5ca575b..e39e1a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassPathFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ClassPathFileSystem.java
@@ -61,13 +61,27 @@ public class ClassPathFileSystem extends FileSystem{
     throw new IOException(ERROR_MSG);
   }
 
+  private String getFileName(Path path){
+    String file = path.toString();
+    if(file.charAt(0) == '/'){
+      file = file.substring(1);
+    }
+    return file;
+  }
+  
   @Override
   public FileStatus getFileStatus(Path arg0) throws IOException {
-    URL url = Resources.getResource(arg0.toString());
+    String file = getFileName(arg0);
+
+    try{      
+    URL url = Resources.getResource(file);
     if(url == null){
       throw new IOException(String.format("Unable to find path %s.", arg0.toString()));
     }
-    return new FileStatus(-1, false, 1, 8096, System.currentTimeMillis(), arg0);
+    return new FileStatus(1, false, 1, 8096, System.currentTimeMillis(), arg0);
+    }catch(RuntimeException e){
+      throw new IOException(String.format("Failure trying to load file %s", arg0), e);
+    }
   }
 
   @Override
@@ -96,7 +110,8 @@ public class ClassPathFileSystem extends FileSystem{
 
   @Override
   public FSDataInputStream open(Path arg0, int arg1) throws IOException {
-    URL url = Resources.getResource(arg0.toString());
+    String file = getFileName(arg0);
+    URL url = Resources.getResource(file);
     if(url == null){
       throw new IOException(String.format("Unable to find path %s.", arg0.getName()));
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/NamedStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/NamedStoragePluginConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/NamedStoragePluginConfig.java
new file mode 100644
index 0000000..b67c019
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/NamedStoragePluginConfig.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("named")
+public class NamedStoragePluginConfig implements StoragePluginConfig{
+  public String name;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
new file mode 100644
index 0000000..a8d053c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import net.hydromatic.optiq.Schema;
+import net.hydromatic.optiq.SchemaPlus;
+
+public interface SchemaFactory {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaFactory.class);
+  
+  public Schema add(SchemaPlus parent);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaHolder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaHolder.java
new file mode 100644
index 0000000..faaca2a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaHolder.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import net.hydromatic.optiq.SchemaPlus;
+
+/**
+ * Helper class to provide parent schema after initialization given Optiq's backwards schema build model.
+ */
+public class SchemaHolder {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaHolder.class);
+  
+  private SchemaPlus schema;
+
+  public SchemaHolder(){}
+  
+  public SchemaHolder(SchemaPlus schema){
+    this.schema = schema;
+  }
+  
+  public SchemaPlus getSchema() {
+    return schema;
+  }
+
+  public void setSchema(SchemaPlus schema) {
+    this.schema = schema;
+  }
+   
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaProvider.java
deleted file mode 100644
index 7611461..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaProvider.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store;
-
-public interface SchemaProvider {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaProvider.class);
-  
-  public Object getSelectionBaseOnName(String tableName);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaProviderRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaProviderRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaProviderRegistry.java
deleted file mode 100644
index 3ae6943..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SchemaProviderRegistry.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.logical.StorageEngineConfig;
-import org.apache.drill.common.util.PathScanner;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.SetupException;
-
-public class SchemaProviderRegistry {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaProviderRegistry.class);
-  
-  private Map<Object, Constructor<? extends SchemaProvider>> allProviders = new HashMap<Object, Constructor<? extends SchemaProvider>>();
-  private Map<StorageEngineConfig, SchemaProvider> activeEngines = new HashMap<StorageEngineConfig, SchemaProvider>();
-
-  private final DrillConfig config;
-  
-  public SchemaProviderRegistry(DrillConfig config){
-    init(config);
-    this.config = config;
-  }
-  
-  @SuppressWarnings("unchecked")
-  public void init(DrillConfig config){
-    Collection<Class<? extends SchemaProvider>> providers = PathScanner.scanForImplementations(SchemaProvider.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
-    logger.debug("Loading schema providers {}", providers);
-    for(Class<? extends SchemaProvider> schema: providers){
-      int i =0;
-      for(Constructor<?> c : schema.getConstructors()){
-        Class<?>[] params = c.getParameterTypes();
-        if(params.length != 2 || params[1] != DrillConfig.class || !StorageEngineConfig.class.isAssignableFrom(params[0])){
-          logger.info("Skipping SchemaProvider constructor {} for provider class {} since it doesn't implement a [constructor(StorageEngineConfig, DrillConfig)]", c, schema);
-          continue;
-        }
-        allProviders.put(params[0], (Constructor<? extends SchemaProvider>) c);
-        i++;
-      }
-      if(i == 0){
-        logger.debug("Skipping registration of StorageSchemaProvider {} as it doesn't have a constructor with the parameters of (StorangeEngineConfig, Config)", schema.getName());
-      }
-    }
-  }
-  
-  public SchemaProvider getSchemaProvider(StorageEngineConfig engineConfig) throws SetupException{
-    SchemaProvider engine = activeEngines.get(engineConfig);
-    if(engine != null) return engine;
-    Constructor<? extends SchemaProvider> c = allProviders.get(engineConfig.getClass());
-    if(c == null) throw new SetupException(String.format("Failure finding StorageSchemaProvider constructor for config %s", engineConfig));
-    try {
-      return c.newInstance(engineConfig, config);
-    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
-      Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException)e).getTargetException() : e;
-      if(t instanceof SetupException) throw ((SetupException) t);
-      throw new SetupException(String.format("Failure setting up new storage engine configuration for config %s", engineConfig), t);
-    }
-  }
-  
-
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
deleted file mode 100644
index 00574e6..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngine.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.drill.common.logical.StorageEngineConfig;
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Multimap;
-
-public interface StorageEngine {
-  public boolean supportsRead();
-
-  public boolean supportsWrite();
-
-  public enum PartitionCapabilities {
-    NONE, HASH, RANGE;
-  }
-
-  public List<QueryOptimizerRule> getOptimizerRules();
-
-  /**
-   * Get the physical scan operator populated with a set of read entries required for the particular GroupScan (read) node.
-   * This is somewhat analogous to traditional MapReduce. The difference is, this is the most granular split paradigm.
-   * 
-   * @param scan
-   *          The configured scan entries.
-   * @return
-   * @throws IOException
-   */
-  public AbstractGroupScan getPhysicalScan(Scan scan) throws IOException;
-
-  public SchemaProvider getSchemaProvider();
-  
-  
-  /**
-   * Get the set of Drillbit endpoints that are available for each read entry. Note that it is possible for a read entry
-   * to have no Drillbit locations. In that case, the multimap will contain no values for that read entry.
-   * 
-   * @return Multimap of ReadEntry > List<DrillbitEndpoint> for ReadEntries with available locations.
-   */
-  public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries);
-
-  /**
-   * Apply read entry assignments based on the list of actually assigned Endpoints. A storage engine is allowed to
-   * update or modify the read entries based on the nature of the assignments. For example, if two blocks are initially
-   * considered separate read entries but then the storage engine realizes that the assignments for those two reads are
-   * on the same system, the storage engine may decide to collapse those entries into a single read entry that covers
-   * both original read entries.
-   * 
-   * @param assignments
-   * @param entries
-   * @return
-   */
-  public Multimap<DrillbitEndpoint, ReadEntry> getEntryAssignments(List<DrillbitEndpoint> assignments,
-      Collection<ReadEntry> entries);
-
-  /**
-   * Get a particular reader for a fragment context.
-   * 
-   * @param context
-   * @param readEntry
-   * @return
-   * @throws IOException
-   */
-  public RecordReader getReader(FragmentContext context, ReadEntry readEntry) throws IOException;
-
-  /**
-   * Apply write entry assignments based on the list of actually assigned endpoints. A storage engine is allowed to
-   * rewrite the WriteEntries if desired based on the nature of the assignments. For example, a storage engine could
-   * hold off actually determining the specific level of partitioning required until it finds out exactly the number of
-   * nodes associated with the operation.
-   * 
-   * @param assignments
-   * @param entries
-   * @return
-   */
-  public Multimap<DrillbitEndpoint, WriteEntry> getWriteAssignments(List<DrillbitEndpoint> assignments,
-      Collection<ReadEntry> entries);
-
-  /**
-   * 
-   * @param context
-   * @param writeEntry
-   * @return
-   * @throws IOException
-   */
-  public RecordRecorder getWriter(FragmentContext context, WriteEntry writeEntry) throws IOException;
-
-  public interface WriteEntry {
-  }
-
-  public static class Cost {
-    public long disk;
-    public long network;
-    public long memory;
-    public long cpu;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
deleted file mode 100644
index 4cc7346..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.logical.StorageEngineConfig;
-import org.apache.drill.common.util.PathScanner;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.server.DrillbitContext;
-
-public class StorageEngineRegistry {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StorageEngineRegistry.class);
-
-  private Map<Object, Constructor<? extends StorageEngine>> availableEngines = new HashMap<Object, Constructor<? extends StorageEngine>>();
-  private Map<StorageEngineConfig, StorageEngine> activeEngines = new HashMap<StorageEngineConfig, StorageEngine>();
-
-  private DrillbitContext context;
-  public StorageEngineRegistry(DrillbitContext context){
-    init(context.getConfig());
-    this.context = context;
-  }
-
-  @SuppressWarnings("unchecked")
-  public void init(DrillConfig config){
-    Collection<Class<? extends StorageEngine>> engines = PathScanner.scanForImplementations(StorageEngine.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
-    logger.debug("Loading storage engines {}", engines);
-    for(Class<? extends StorageEngine> engine: engines){
-      int i =0;
-      for(Constructor<?> c : engine.getConstructors()){
-        Class<?>[] params = c.getParameterTypes();
-        if(params.length != 2 || params[1] != DrillbitContext.class || !StorageEngineConfig.class.isAssignableFrom(params[0])){
-          logger.info("Skipping StorageEngine constructor {} for engine class {} since it doesn't implement a [constructor(StorageEngineConfig, DrillbitContext)]", c, engine);
-          continue;
-        }
-        availableEngines.put(params[0], (Constructor<? extends StorageEngine>) c);
-        i++;
-      }
-      if(i == 0){
-        logger.debug("Skipping registration of StorageEngine {} as it doesn't have a constructor with the parameters of (StorangeEngineConfig, Config)", engine.getCanonicalName());
-      }
-    }
-  }
-
-  public synchronized StorageEngine getEngine(StorageEngineConfig engineConfig) throws ExecutionSetupException{
-    StorageEngine engine = activeEngines.get(engineConfig);
-    if(engine != null) return engine;
-    Constructor<? extends StorageEngine> c = availableEngines.get(engineConfig.getClass());
-    if(c == null) throw new ExecutionSetupException(String.format("Failure finding StorageEngine constructor for config %s", engineConfig));
-    try {
-      engine = c.newInstance(engineConfig, context);
-      activeEngines.put(engineConfig, engine);
-      return engine;
-    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
-      Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException)e).getTargetException() : e;
-      if(t instanceof ExecutionSetupException) throw ((ExecutionSetupException) t);
-      throw new ExecutionSetupException(String.format("Failure setting up new storage engine configuration for config %s", engineConfig), t);
-    }
-  }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
new file mode 100644
index 0000000..2e54b0d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import java.io.IOException;
+import java.util.List;
+
+import net.hydromatic.optiq.Schema;
+import net.hydromatic.optiq.SchemaPlus;
+
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+
+/**
+ * Contract every storage plugin has to fulfil: capability flags, optimizer rules, translation of a logical
+ * scan into a physical group scan, and schema registration.
+ */
+public interface StoragePlugin {
+
+  /** @return the plugin's read-support flag (exact semantics are defined by the implementation). */
+  boolean supportsRead();
+
+  /** @return the plugin's write-support flag (exact semantics are defined by the implementation). */
+  boolean supportsWrite();
+
+  /** @return the optimizer rules contributed by this plugin. */
+  List<QueryOptimizerRule> getOptimizerRules();
+
+  /**
+   * Builds the physical scan operator for a given GroupScan (read) node.
+   *
+   * @param scan
+   *          the configured scan, carrying a selection specific to this storage engine
+   * @return the physical group scan for {@code scan}
+   * @throws IOException
+   *           declared for implementations that need to touch storage while building the scan
+   */
+  AbstractGroupScan getPhysicalScan(Scan scan) throws IOException;
+
+  /**
+   * Creates this plugin's schema and attaches it to the supplied parent schema.
+   *
+   * @param parent
+   *          the schema the new schema is added to
+   * @return the schema that was created
+   */
+  Schema createAndAddSchema(SchemaPlus parent);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
new file mode 100644
index 0000000..b182abe
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import net.hydromatic.linq4j.function.Function1;
+import net.hydromatic.optiq.Schema;
+import net.hydromatic.optiq.SchemaPlus;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.logical.StorageEngines;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Resources;
+
+/**
+ * Registry of the storage plugins available to a Drillbit.
+ * <p>
+ * Construction happens in two steps: {@link #init(DrillConfig)} scans the configured packages for
+ * {@link StoragePlugin} implementations and indexes their usable constructors by configuration class; then one
+ * plugin is instantiated for every named entry of the classpath resource {@code storage-engines.json}. The
+ * resulting name-to-plugin map is immutable for the lifetime of the registry.
+ */
+public class StoragePluginRegistry implements Iterable<Map.Entry<String, StoragePlugin>>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistry.class);
+
+  /** Plugin constructors keyed by the StoragePluginConfig class they accept as their first parameter. */
+  private final Map<Object, Constructor<? extends StoragePlugin>> availableEngines = new HashMap<Object, Constructor<? extends StoragePlugin>>();
+  /** Named plugins built from storage-engines.json; entries whose setup failed are not present. */
+  private final ImmutableMap<String, StoragePlugin> engines;
+
+  private DrillbitContext context;
+  private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory();
+
+  /**
+   * Discovers the available plugin implementations and instantiates every configured plugin.
+   *
+   * @param context
+   *          the Drillbit context; supplies the configuration and is handed to every plugin constructor
+   * @throws RuntimeException
+   *           wrapping any runtime failure raised while scanning or loading the plugin configuration
+   */
+  public StoragePluginRegistry(DrillbitContext context) {
+    try{
+      this.context = context;
+      init(context.getConfig());
+      this.engines = ImmutableMap.copyOf(createEngines());
+    }catch(RuntimeException e){
+      logger.error("Failure while loading storage engine registry.", e);
+      throw new RuntimeException("Failure while reading and loading storage plugin configuration.", e);
+    }
+  }
+
+  /**
+   * Scans the packages listed under {@link ExecConstants#STORAGE_ENGINE_SCAN_PACKAGES} for StoragePlugin
+   * implementations and records every public constructor of the shape
+   * (StoragePluginConfig subtype, DrillbitContext, String), keyed by the config parameter's class.
+   *
+   * @param config
+   *          the Drill configuration providing the list of packages to scan
+   */
+  @SuppressWarnings("unchecked")
+  public void init(DrillConfig config){
+    Collection<Class<? extends StoragePlugin>> pluginClasses = PathScanner.scanForImplementations(StoragePlugin.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+    logger.debug("Loading storage engines {}", pluginClasses);
+    for(Class<? extends StoragePlugin> pluginClass: pluginClasses){
+      int usable = 0;
+      for(Constructor<?> c : pluginClass.getConstructors()){
+        Class<?>[] params = c.getParameterTypes();
+        if(params.length != 3 || params[1] != DrillbitContext.class || !StoragePluginConfig.class.isAssignableFrom(params[0]) || params[2] != String.class){
+          logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a [constructor(StoragePluginConfig, DrillbitContext, String)]", c, pluginClass);
+          continue;
+        }
+        availableEngines.put(params[0], (Constructor<? extends StoragePlugin>) c);
+        usable++;
+      }
+      if(usable == 0){
+        logger.debug("Skipping registration of StoragePlugin {} as it doesn't have a constructor with the parameters of (StoragePluginConfig, DrillbitContext, String)", pluginClass.getCanonicalName());
+      }
+    }
+  }
+
+  /**
+   * Reads {@code storage-engines.json} from the classpath and creates one plugin per named configuration.
+   * A plugin that fails with an ExecutionSetupException is logged and left out; the others are still returned.
+   *
+   * @return mutable map from configured name to the plugin created for it
+   * @throws IllegalStateException
+   *           if the resource cannot be read or parsed
+   */
+  private Map<String, StoragePlugin> createEngines(){
+    StorageEngines configs = null;
+    Map<String, StoragePlugin> activeEngines = new HashMap<String, StoragePlugin>();
+    try{
+      String enginesData = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8);
+      configs = context.getConfig().getMapper().readValue(enginesData, StorageEngines.class);
+    }catch(IOException e){
+      throw new IllegalStateException("Failure while reading storage engines data.", e);
+    }
+
+    for(Map.Entry<String, StoragePluginConfig> config : configs){
+      try{
+        StoragePlugin plugin = create(config.getKey(), config.getValue());
+        activeEngines.put(config.getKey(), plugin);
+      }catch(ExecutionSetupException e){
+        logger.error("Failure while setting up StoragePlugin with name: '{}'.", config.getKey(), e);
+      }
+    }
+    return activeEngines;
+  }
+
+  /**
+   * Looks up a plugin by its registered name.
+   *
+   * @param registeredStorageEngineName
+   *          the name the plugin was configured under
+   * @return the plugin, or null when nothing is registered under that name
+   */
+  public StoragePlugin getEngine(String registeredStorageEngineName) throws ExecutionSetupException {
+    return engines.get(registeredStorageEngineName);
+  }
+
+  /**
+   * Resolves a plugin for a configuration. A NamedStoragePluginConfig is resolved through the registered
+   * name (and may yield null); any other configuration gets a new, unnamed plugin instance on every call.
+   *
+   * @param config
+   *          the plugin configuration to resolve
+   * @return the matching plugin, or null for an unknown named configuration
+   * @throws ExecutionSetupException
+   *           if a transient plugin cannot be created
+   */
+  public StoragePlugin getEngine(StoragePluginConfig config) throws ExecutionSetupException {
+    if(config instanceof NamedStoragePluginConfig){
+      return engines.get(((NamedStoragePluginConfig) config).name);
+    }else{
+      // TODO: for now, we'll throw away transient configs.  we really ought to clean these up.
+      return create(null, config);
+    }
+  }
+
+  /**
+   * Returns the format plugin that the file-system plugin behind {@code storageConfig} exposes for
+   * {@code formatConfig}.
+   *
+   * @throws ExecutionSetupException
+   *           if no plugin is found for {@code storageConfig} or the plugin is not a FileSystemPlugin
+   */
+  public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException{
+    StoragePlugin p = getEngine(storageConfig);
+    // getEngine returns null for an unknown named config; report that instead of failing on p.getClass().
+    if(p == null) throw new ExecutionSetupException(String.format("You tried to request a format plugin for a storage engine that could not be found.  The requested storage config was %s.", storageConfig));
+    if(!(p instanceof FileSystemPlugin)) throw new ExecutionSetupException(String.format("You tried to request a format plugin for a storage engine that wasn't of type FileSystemPlugin.  The actual type of plugin was %s.", p.getClass().getName()));
+    FileSystemPlugin storage = (FileSystemPlugin) p;
+    return storage.getFormatPlugin(formatConfig);
+  }
+
+  /**
+   * Instantiates a plugin through the constructor registered for the configuration's class.
+   *
+   * @param name
+   *          the name passed to the plugin constructor; null for transient plugins
+   * @param engineConfig
+   *          the configuration whose class selects the constructor
+   * @throws ExecutionSetupException
+   *           if no constructor is registered for the config class or instantiation fails; an
+   *           ExecutionSetupException thrown by the plugin constructor itself is rethrown unwrapped
+   */
+  private StoragePlugin create(String name, StoragePluginConfig engineConfig) throws ExecutionSetupException {
+    Constructor<? extends StoragePlugin> c = availableEngines.get(engineConfig.getClass());
+    if (c == null)
+      throw new ExecutionSetupException(String.format("Failure finding StorageEngine constructor for config %s",
+          engineConfig));
+    try {
+      return c.newInstance(engineConfig, context, name);
+    } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      // Unwrap reflection's wrapper so callers see the exception the constructor actually raised.
+      Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e;
+      if (t instanceof ExecutionSetupException)
+        throw ((ExecutionSetupException) t);
+      throw new ExecutionSetupException(String.format(
+          "Failure setting up new storage engine configuration for config %s", engineConfig), t);
+    }
+  }
+
+  /** Iterates over the (name, plugin) pairs of all successfully created named plugins. */
+  @Override
+  public Iterator<Entry<String, StoragePlugin>> iterator() {
+    return engines.entrySet().iterator();
+  }
+
+  /** @return the schema factory bound to this registry's plugins. */
+  public DrillSchemaFactory getSchemaFactory(){
+    return schemaFactory;
+  }
+
+  /**
+   * Function that lets every registered plugin add its schema to a parent schema.
+   */
+  public class DrillSchemaFactory implements Function1<SchemaPlus, Schema>{
+
+    /**
+     * Calls {@link StoragePlugin#createAndAddSchema(SchemaPlus)} on every registered plugin.
+     *
+     * @return the first non-null schema returned by a plugin, or null if there is none
+     */
+    @Override
+    public Schema apply(SchemaPlus parent) {
+      Schema defaultSchema = null;
+      for(StoragePlugin plugin : engines.values()){
+        Schema s = plugin.createAndAddSchema(parent);
+        if(defaultSchema == null) defaultSchema = s;
+      }
+      return defaultSchema;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
new file mode 100644
index 0000000..1c391de
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+
+import com.beust.jcommander.internal.Lists;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Range;
+
+public class BasicFormatMatcher extends FormatMatcher{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicFormatMatcher.class);
+
+  private final List<Pattern> patterns;
+  private final MagicStringMatcher matcher;
+  protected final DrillFileSystem fs;
+  protected final FormatPlugin plugin;
+  
+  public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<Pattern> patterns, List<MagicString> magicStrings) {
+    super();
+    this.patterns = ImmutableList.copyOf(patterns);
+    this.matcher = new MagicStringMatcher(magicStrings);
+    this.fs = fs;
+    this.plugin = plugin;
+  }
+  
+  public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, String extension){
+    this(plugin, fs, //
+        Lists.newArrayList(Pattern.compile(".*\\." + extension)), //
+        (List<MagicString>) Collections.EMPTY_LIST);
+  }
+  
+  @Override
+  public boolean supportDirectoryReads() {
+    return false;
+  }
+
+  @Override
+  public FormatSelection isReadable(FileSelection file) throws IOException {
+    if(isReadable(file.getFirstPath(fs))){
+      return new FormatSelection(plugin.getConfig(), file);
+    }
+    return null;
+  }
+
+  protected final boolean isReadable(FileStatus status) throws IOException {
+    for(Pattern p : patterns){
+      if(p.matcher(status.getPath().toString()).matches()){
+        return true;
+      }
+    }
+    
+    if(matcher.matches(status)) return true;
+    return false;
+  }
+  
+  
+  @Override
+  @JsonIgnore
+  public FormatPlugin getFormatPlugin() {
+    return plugin;
+  }
+
+
+  private class MagicStringMatcher{
+    
+    private List<RangeMagics> ranges;
+    
+    public MagicStringMatcher(List<MagicString> magicStrings){
+      ranges = Lists.newArrayList();
+      for(MagicString ms : magicStrings){
+        ranges.add(new RangeMagics(ms));
+      }
+    }
+    
+    public boolean matches(FileStatus status) throws IOException{
+      if(ranges.isEmpty()) return false;
+      final Range<Long> fileRange = Range.closedOpen( 0L, status.getLen());
+      
+      try(FSDataInputStream is = fs.open(status.getPath()).getInputStream()){
+        for(RangeMagics rMagic : ranges){
+          Range<Long> r = rMagic.range;
+          if(!fileRange.encloses(r)) continue;
+          int len = (int) (r.upperEndpoint() - r.lowerEndpoint());
+          byte[] bytes = new byte[len];
+          is.readFully(r.lowerEndpoint(), bytes);
+          for(byte[] magic : rMagic.magics){
+            if(Arrays.equals(magic, bytes)) return true;  
+          }
+          
+        }
+      }
+      return false;
+    }
+    
+    private class RangeMagics{
+      Range<Long> range;
+      byte[][] magics;
+      
+      public RangeMagics(MagicString ms){
+        this.range = Range.closedOpen( ms.getOffset(), (long) ms.getBytes().length);
+        this.magics = new byte[][]{ms.getBytes()};
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
new file mode 100644
index 0000000..5ab2c1a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.Lists;
+
+/**
+ * Jackson serializable description of a file selection. Maintains an internal set of file statuses. However, also
+ * serializes out as a list of Strings. All accessing methods first regenerate the FileStatus objects if they are not
+ * available.  This allows internal movement of FileStatus and the ability to serialize if need be.
+ */
+public class FileSelection {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSelection.class);
+
+  @JsonIgnore
+  private List<FileStatus> statuses;
+
+  public List<String> files;
+
+  public FileSelection() {
+  }
+  
+  
+  public FileSelection(List<String> files, boolean dummy){
+    this.files = files;
+  }
+
+  public FileSelection(List<FileStatus> statuses) {
+    this.statuses = statuses;
+    this.files = Lists.newArrayList();
+    for (FileStatus f : statuses) {
+      files.add(f.getPath().toString());
+    }
+  }
+
+  public boolean containsDirectories(DrillFileSystem fs) throws IOException {
+    init(fs);
+    for (FileStatus p : statuses) {
+      if (p.isDir()) return true;
+    }
+    return false;
+  }
+
+  public FileSelection minusDirectorries(DrillFileSystem fs) throws IOException {
+    init(fs);
+    List<FileStatus> newList = Lists.newArrayList();
+    for (FileStatus p : statuses) {
+      if (p.isDir()) {
+        List<FileStatus> statuses = fs.list(true, p.getPath());
+        for (FileStatus s : statuses) {
+          newList.add(s);
+        }
+
+      } else {
+        newList.add(p);
+      }
+    }
+    return new FileSelection(newList);
+  }
+
+  public FileStatus getFirstPath(DrillFileSystem fs) throws IOException {
+    init(fs);
+    return statuses.get(0);
+  }
+
+  public List<String> getAsFiles(){
+    if(!files.isEmpty()) return files;
+    if(statuses == null) return Collections.emptyList();
+    List<String> files = Lists.newArrayList();
+    for(FileStatus s : statuses){
+      files.add(s.getPath().toString());
+    }
+    return files;
+  }
+  
+  private void init(DrillFileSystem fs) throws IOException {
+    if (files != null && statuses == null) {
+      statuses = Lists.newArrayList();
+      for (String p : files) {
+        statuses.add(fs.getFileStatus(new Path(p)));
+      }
+    }
+  }
+
+  public List<FileStatus> getFileStatusList(DrillFileSystem fs) throws IOException {
+    init(fs);
+    return statuses;
+  }
+
+  public static FileSelection create(DrillFileSystem fs, Path parent, String path) throws IOException {
+    if ( !(path.contains("*") || path.contains("?")) ) {
+      Path p = new Path(parent, path);
+      FileStatus status = fs.getFileStatus(p);
+      return new FileSelection(Collections.singletonList(status));
+    } else {
+      FileStatus[] status = fs.getUnderlying().globStatus(new Path(parent, path));
+      if(status == null || status.length == 0) return null;
+      return new FileSelection(Lists.newArrayList(status));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
new file mode 100644
index 0000000..455c4b2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import java.util.Map;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Storage plugin configuration registered with Jackson under the type name "file". It is a plain holder of
+ * public fields and defines no behavior of its own.
+ */
+@JsonTypeName("file")
+public class FileSystemConfig implements StoragePluginConfig {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemConfig.class);
+
+  // NOTE(review): presumably the connection string/URI of the file system - confirm against FileSystemPlugin.
+  public String connection;
+
+  // NOTE(review): presumably workspace name -> location - confirm against FileSystemPlugin.
+  public Map<String, String> workspaces;
+
+  // Format plugin configurations keyed by a string (presumably the format's name - confirm).
+  public Map<String, FormatPluginConfig> formats;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemFormatConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemFormatConfig.java
new file mode 100644
index 0000000..be396a8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemFormatConfig.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+/**
+ * Storage plugin configuration parameterized by a format plugin configuration type.
+ *
+ * NOTE(review): this looks like a placeholder; the only accessor currently returns null
+ * unconditionally. Confirm the intended behaviour before relying on it.
+ *
+ * @param <T> the format plugin configuration type exposed by {@link #getFormatConfig()}
+ */
+public class FileSystemFormatConfig<T extends FormatPluginConfig> implements StoragePluginConfig{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemFormatConfig.class);
+  
+  /**
+   * @return always null in the current implementation
+   */
+  public T getFormatConfig(){
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
new file mode 100644
index 0000000..3762c1a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import net.hydromatic.optiq.Schema;
+import net.hydromatic.optiq.SchemaPlus;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.ClassPathFileSystem;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.shim.FileSystemCreator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.beust.jcommander.internal.Lists;
+import com.beust.jcommander.internal.Maps;
+
+/**
+ * A Storage engine associated with a Hadoop FileSystem Implementation. Examples include HDFS, MapRFS, QuantacastFileSystem,
+ * LocalFileSystem, as well Apache Drill specific CachedFileSystem, ClassPathFileSystem and LocalSyncableFileSystem.
+ * Tables are file names, directories and path patterns. This storage engine delegates to FSFormatEngines but shares
+ * references to the FileSystem configuration and path management.
+ */
+public class FileSystemPlugin extends AbstractStoragePlugin{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class);
+
+  private final FileSystemSchemaFactory schemaFactory;
+  private Map<String, FormatPlugin> formatsByName;
+  private Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig;
+  private FileSystemConfig config;
+  private DrillbitContext context;
+  private final DrillFileSystem fs;
+  
+  public FileSystemPlugin(FileSystemConfig config, DrillbitContext context, String name) throws ExecutionSetupException{
+    try{
+      this.config = config;
+      this.context = context;
+      
+      Configuration fsConf = new Configuration();
+      fsConf.set("fs.default.name", config.connection);
+      fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
+      this.fs = FileSystemCreator.getFileSystem(context.getConfig(), fsConf);
+      this.formatsByName = FormatCreator.getFormatPlugins(context, fs, config);
+      List<FormatMatcher> matchers = Lists.newArrayList();
+      formatPluginsByConfig = Maps.newHashMap();
+      for(FormatPlugin p : formatsByName.values()){
+        matchers.add(p.getMatcher());
+        formatPluginsByConfig.put(p.getConfig(), p);
+      }
+      
+      List<WorkspaceSchemaFactory> factories = null;
+      if(config.workspaces == null || config.workspaces.isEmpty()){
+        factories = Collections.singletonList(new WorkspaceSchemaFactory("default", name, fs, "/", matchers));
+      }else{
+        factories = Lists.newArrayList();
+        for(Map.Entry<String, String> space : config.workspaces.entrySet()){
+          factories.add(new WorkspaceSchemaFactory(space.getKey(), name, fs, space.getValue(), matchers));
+        }
+      }
+      this.schemaFactory = new FileSystemSchemaFactory(name, factories);
+    }catch(IOException e){
+      throw new ExecutionSetupException("Failure setting up file system plugin.", e);
+    }
+  }
+  
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+  
+  @Override
+  public AbstractGroupScan getPhysicalScan(Scan scan) throws IOException {
+    FormatSelection formatSelection = scan.getSelection().getWith(context.getConfig(), FormatSelection.class);
+    FormatPlugin plugin;
+    if(formatSelection.getFormat() instanceof NamedFormatPluginConfig){
+      plugin = formatsByName.get( ((NamedFormatPluginConfig) formatSelection.getFormat()).name);
+    }else{
+      plugin = formatPluginsByConfig.get(formatSelection.getFormat());
+    }
+    if(plugin == null) throw new IOException(String.format("Failure getting requested format plugin named '%s'.  It was not one of the format plugins registered.", formatSelection.getFormat()));
+    return plugin.getGroupScan(scan.getOutputReference(), formatSelection.getSelection());
+  }
+  
+  @Override
+  public Schema createAndAddSchema(SchemaPlus parent) {
+    return schemaFactory.add(parent);
+  }
+  
+  public FormatPlugin getFormatPlugin(String name){
+    return formatsByName.get(name);
+  }
+  
+  public FormatPlugin getFormatPlugin(FormatPluginConfig config){
+    if(config instanceof NamedFormatPluginConfig){
+      return formatsByName.get(((NamedFormatPluginConfig) config).name);
+    }else{
+      return formatPluginsByConfig.get(config);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
new file mode 100644
index 0000000..fbea81c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import net.hydromatic.optiq.Schema;
+import net.hydromatic.optiq.SchemaPlus;
+import net.hydromatic.optiq.TableFunction;
+
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaFactory;
+import org.apache.drill.exec.store.SchemaHolder;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
+
+import com.google.common.collect.Maps;
+
+
+/**
+ * Factory for the top level schema of a file system plugin. The schema it creates answers root level
+ * requests by delegating to the workspace named "default" and registers every workspace as a sub-schema.
+ */
+public class FileSystemSchemaFactory implements SchemaFactory{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemSchemaFactory.class);
+
+  private List<WorkspaceSchemaFactory> factories;
+  private String schemaName;
+  
+  /**
+   * @param schemaName name given to the top level schema
+   * @param factories  one factory per workspace exposed below the top level schema
+   */
+  public FileSystemSchemaFactory(String schemaName, List<WorkspaceSchemaFactory> factories) {
+    this.factories = factories;
+    this.schemaName = schemaName;
+  }
+
+  /**
+   * Creates the top level schema, adds it to the parent and then attaches the workspace schemas to it.
+   */
+  @Override
+  public Schema add(SchemaPlus parent) {
+    FileSystemSchema root = new FileSystemSchema(parent, schemaName);
+    SchemaPlus registered = parent.add(root);
+    root.setHolder(registered);
+    return root;
+  }
+
+  /**
+   * Top level schema of the plugin. Table, table function and sub-schema lookups are all forwarded to
+   * the workspace schema registered under the name "default".
+   *
+   * NOTE(review): if none of the factories produces a schema named "default", the delegate is null
+   * and every forwarding method below will throw a NullPointerException.
+   */
+  public class FileSystemSchema extends AbstractSchema{
+
+    private final WorkspaceSchema defaultSchema;
+    private final Map<String, WorkspaceSchema> schemaMap = Maps.newHashMap();
+    final SchemaHolder selfHolder = new SchemaHolder();
+    
+    public FileSystemSchema(SchemaPlus parentSchema, String name) {
+      super(new SchemaHolder(parentSchema), name);
+      // Workspaces are created against a holder that is only filled in later by setHolder.
+      for(WorkspaceSchemaFactory workspaceFactory : factories){
+        WorkspaceSchema workspace = workspaceFactory.create(selfHolder);
+        schemaMap.put(workspace.getName(), workspace);
+      }
+      
+      defaultSchema = schemaMap.get("default");
+    }
+
+    /**
+     * Records the SchemaPlus wrapper of this schema and registers each workspace beneath it.
+     */
+    void setHolder(SchemaPlus plusOfThis){
+      selfHolder.setSchema(plusOfThis);
+      for(WorkspaceSchema workspace : schemaMap.values()){
+        plusOfThis.add(workspace);
+      }
+    }
+    
+    @Override
+    public DrillTable getTable(String name) {
+      return defaultSchema.getTable(name);
+    }
+
+    @Override
+    public Set<String> getTableNames() {
+      return defaultSchema.getTableNames();
+    }
+
+    @Override
+    public Collection<TableFunction> getTableFunctions(String name) {
+      return defaultSchema.getTableFunctions(name);
+    }
+
+    @Override
+    public Set<String> getTableFunctionNames() {
+      return defaultSchema.getTableFunctionNames();
+    }
+
+    @Override
+    public Schema getSubSchema(String name) {
+      return defaultSchema.getSubSchema(name);
+    }
+
+    @Override
+    public Set<String> getSubSchemaNames() {
+      return defaultSchema.getSubSchemaNames();
+    }
+    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
new file mode 100644
index 0000000..7ce8c50
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.util.ConstructorChecker;
+import org.apache.drill.common.util.PathScanner;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+
+import com.google.common.collect.Maps;
+
+public class FormatCreator {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatCreator.class);
+  
+  
+  static final ConstructorChecker FORMAT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class, FormatPlugin.class);
+  static final ConstructorChecker DEFAULT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class);
+  
+  static Map<String, FormatPlugin> getFormatPlugins(DrillbitContext context, DrillFileSystem fileSystem, FileSystemConfig storageConfig){
+    final DrillConfig config = context.getConfig(); 
+    Map<String, FormatPlugin> plugins = Maps.newHashMap();
+
+    Collection<Class<? extends FormatPlugin>> pluginClasses = PathScanner.scanForImplementations(FormatPlugin.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+
+    
+    if(storageConfig.formats == null || storageConfig.formats.isEmpty()){
+      
+      for(Class<? extends FormatPlugin> pluginClass: pluginClasses){
+        for(Constructor<?> c : pluginClass.getConstructors()){
+          try{
+            
+            if(!DEFAULT_BASED.check(c)) continue;
+            FormatPlugin plugin = (FormatPlugin) c.newInstance(null, context, fileSystem, storageConfig);
+            plugins.put(plugin.getName(), plugin);
+          }catch(Exception e){
+            logger.warn(String.format("Failure while trying instantiate FormatPlugin %s.", pluginClass.getName()), e);
+          }
+        }
+      }
+      
+    }else{
+      
+      Map<Class<?>, Constructor<?>> constructors = Maps.newHashMap();
+      for(Class<? extends FormatPlugin> pluginClass: pluginClasses){
+        for(Constructor<?> c : pluginClass.getConstructors()){
+          try{
+            if(!FORMAT_BASED.check(c)) continue;
+            constructors.put(pluginClass, c);
+          }catch(Exception e){
+            logger.warn(String.format("Failure while trying instantiate FormatPlugin %s.", pluginClass.getName()), e);
+          }
+        }
+      }
+      
+      for(Map.Entry<String, FormatPluginConfig> e : storageConfig.formats.entrySet()){
+        Constructor<?> c = constructors.get(e.getValue().getClass());
+        if(c == null){
+          logger.warn("Unable to find constructor for storage config named '{}' of type '{}'.", e.getKey(), e.getValue().getClass().getName());
+          continue;
+        }
+        try{
+        plugins.put(e.getKey(), (FormatPlugin) c.newInstance(e.getKey(), context, fileSystem, storageConfig, e.getValue()));
+        } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e1) {
+          logger.warn("Failure initializing storage config named '{}' of type '{}'.", e.getKey(), e.getValue().getClass().getName(), e1);
+        }
+      }
+      
+      
+    }
+    
+    return plugins;
+  }
+  
+  
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
new file mode 100644
index 0000000..e8521e4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatMatcher.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import java.io.IOException;
+
+/**
+ * Decides whether a file selection can be read by a particular format plugin. FileSystemPlugin
+ * collects one matcher from each format plugin and hands the list to every WorkspaceSchemaFactory.
+ */
+public abstract class FormatMatcher {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatMatcher.class);
+
+  /** Presumably reports whether this matcher accepts a directory as a selection; confirm with implementations. */
+  public abstract boolean supportDirectoryReads();
+  /**
+   * Checks the given file selection against this format.
+   *
+   * @param file the selection to inspect
+   * @return a FormatSelection for the file; presumably null when the selection is not readable by
+   *         this format (TODO confirm with implementations)
+   * @throws IOException declared for implementations that need to touch the file system
+   */
+  public abstract FormatSelection isReadable(FileSelection file) throws IOException;
+  /** @return the format plugin associated with this matcher */
+  public abstract FormatPlugin getFormatPlugin();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
new file mode 100644
index 0000000..a37142e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.QueryOptimizerRule;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+
+/**
+ * Similar to a storage plugin but built specifically to work within a FileSystem context.
+ * FormatCreator instantiates implementations reflectively, so an implementation is expected to
+ * expose one of the public constructor shapes that FormatCreator checks for.
+ */
+public interface FormatPlugin {
+
+  public boolean supportsRead();
+
+  public boolean supportsWrite();
+  
+  /** @return the matcher FileSystemPlugin passes to its workspaces for this format */
+  public FormatMatcher getMatcher();
+  
+  /**
+   * Creates the group scan for a selection of files in this format.
+   *
+   * @param outputRef output reference taken from the logical scan
+   * @param selection files to scan
+   * @throws IOException declared for implementations that need to touch the file system
+   */
+  public AbstractGroupScan getGroupScan(FieldReference outputRef, FileSelection selection) throws IOException;
+
+  public List<QueryOptimizerRule> getOptimizerRules();
+  
+  /** @return the format configuration; FileSystemPlugin uses it as a lookup key for this plugin */
+  public FormatPluginConfig getConfig();
+  public StoragePluginConfig getStorageConfig();
+  public DrillFileSystem getFileSystem();
+  public DrillbitContext getContext();
+  /** @return the plugin name; FormatCreator registers default-constructed plugins under it */
+  public String getName();
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
new file mode 100644
index 0000000..5cf6ce3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import java.util.List;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * Pairs a format plugin configuration with the files it applies to. Serialized by Jackson as the
+ * two properties "format" and "files".
+ */
+public class FormatSelection {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatSelection.class);
+  
+  private FormatPluginConfig format;
+  private FileSelection selection;
+
+  /** Creates an empty selection; both the format and the file selection are left null. */
+  public FormatSelection(){}
+  
+  /**
+   * Deserialization entry point: wraps the listed files in a new FileSelection.
+   */
+  @JsonCreator
+  public FormatSelection(@JsonProperty("format") FormatPluginConfig format, @JsonProperty("files") List<String> files){
+    this(format, new FileSelection(files, true));
+  }
+  
+  /**
+   * @param format    format configuration the selection should be read with
+   * @param selection files covered by this selection
+   */
+  public FormatSelection(FormatPluginConfig format, FileSelection selection) {
+    this.selection = selection;
+    this.format = format;
+  }
+
+  @JsonProperty("format")
+  public FormatPluginConfig getFormat(){
+    return format;
+  }
+  
+  /** @return the file selection itself; excluded from serialization in favour of "files" */
+  @JsonIgnore
+  public FileSelection getSelection(){
+    return selection;
+  }
+  
+  /** @return the selection rendered as a list of file names, serialized as "files" */
+  @JsonProperty("files")
+  public List<String> getAsFiles(){
+    return selection.getAsFiles();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MagicString.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MagicString.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MagicString.java
new file mode 100644
index 0000000..f514388
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MagicString.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+/**
+ * Value holder for a byte sequence together with an offset. Presumably describes the magic bytes
+ * expected at a given position of a file; confirm against the format matchers that use it.
+ */
+public class MagicString {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MagicString.class);
+  
+  private final long offset;
+  private final byte[] bytes;
+  
+  /**
+   * @param offset offset associated with the byte sequence
+   * @param bytes  the byte sequence; stored as given, without copying
+   */
+  public MagicString(long offset, byte[] bytes) {
+    this.bytes = bytes;
+    this.offset = offset;
+  }
+
+  /** @return the byte sequence passed at construction (the same array instance) */
+  public byte[] getBytes() {
+    return bytes;
+  }
+
+  /** @return the offset passed at construction */
+  public long getOffset() {
+    return offset;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java
new file mode 100644
index 0000000..6b98ea2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+
+/**
+ * Format configuration that refers to a format plugin by name instead of carrying its settings.
+ * Registered with Jackson under the type name "named". FileSystemPlugin resolves it by looking the
+ * name up among the format plugins it registered by name.
+ */
+@JsonTypeName("named")
+public class NamedFormatPluginConfig implements FormatPluginConfig{
+  /** Name of the format plugin to use. */
+  public String name;
+}


Mime
View raw message