drill-dev mailing list archives

From jacq...@apache.org
Subject [23/38] Integrate new SQL changes with Hive storage engine, moving to automatic file detection. Rename storage engine to storage plugin. Separate storage plugins from format plugins, updating Parquet and JSON to format engines. Refactor distribution log
Date Tue, 04 Mar 2014 08:07:50 GMT
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
new file mode 100644
index 0000000..a1d575c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import java.io.IOException;
+
+import net.hydromatic.optiq.Schema;
+import net.hydromatic.optiq.SchemaPlus;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class HiveStoragePlugin extends AbstractStoragePlugin {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveStoragePlugin.class);
+  
+  private final HiveStoragePluginConfig config;
+  private final HiveConf hiveConf;
+  private final HiveSchemaFactory schemaFactory;
+  private final DrillbitContext context;
+  private final String name;
+
+  public HiveStoragePlugin(HiveStoragePluginConfig config, DrillbitContext context, String name) throws ExecutionSetupException {
+    this.config = config;
+    this.context = context;
+    this.schemaFactory = new HiveSchemaFactory(config, name, config.getHiveConf());
+    this.hiveConf = config.getHiveConf();
+    this.name = name;
+  }
+
+  public HiveStoragePluginConfig getConfig() {
+    return config;
+  }
+
+  public String getName(){
+    return name;
+  }
+  
+  public DrillbitContext getContext() {
+    return context;
+  }
+
+  @Override
+  public HiveScan getPhysicalScan(Scan scan) throws IOException {
+    HiveReadEntry hiveReadEntry = scan.getSelection().getListWith(new ObjectMapper(), new TypeReference<HiveReadEntry>(){});
+    try {
+      return new HiveScan(hiveReadEntry, this, null);
+    } catch (ExecutionSetupException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public Schema createAndAddSchema(SchemaPlus parent) {
+    return schemaFactory.add(parent);
+  }
+
+
+}
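
The getPhysicalScan override above leans on Jackson's TypeReference to turn the scan selection into a strongly typed HiveReadEntry. A minimal, self-contained sketch of that pattern follows; the JSON payload and target type are illustrative only, not Drill's actual selection format:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

public class TypeReferenceSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    String json = "{\"db\":\"default\",\"table\":\"nation\"}";
    // A TypeReference subclass captures the full generic type, which a plain
    // Class literal cannot express for parameterized targets.
    Map<String, String> selection =
        mapper.readValue(json, new TypeReference<Map<String, String>>() {});
    System.out.println(selection.get("table")); // prints "nation"
  }
}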

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
new file mode 100644
index 0000000..c9d76e5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.util.Map;
+
+@JsonTypeName("hive")
+public class HiveStoragePluginConfig extends StoragePluginConfigBase {
+  @JsonProperty
+  public Map<String, String> configProps;
+  @JsonIgnore
+  private HiveConf hiveConf;
+
+  @JsonIgnore
+  public HiveConf getHiveConf() {
+    if (hiveConf == null) {
+      hiveConf = new HiveConf();
+      if (configProps != null) {
+        for (Map.Entry<String, String> entry : configProps.entrySet()) {
+          hiveConf.set(entry.getKey(), entry.getValue());
+        }
+      }
+    }
+
+    return hiveConf;
+  }
+
+  @JsonCreator
+  public HiveStoragePluginConfig(@JsonProperty("config") Map<String, String> props) {
+    this.configProps = props;
+  }
+
+  @Override
+  public int hashCode() {
+    return configProps != null ? configProps.hashCode() : 0;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    HiveStoragePluginConfig that = (HiveStoragePluginConfig) o;
+
+    if (configProps != null ? !configProps.equals(that.configProps) : that.configProps != null) return false;
+
+    return true;
+  }
+}
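
Since the config class is a plain Jackson bean registered under @JsonTypeName("hive"), it can also be built programmatically. A minimal sketch, assuming the Hive client libraries are on the classpath; the metastore URI value is illustrative only:

import java.util.HashMap;
import java.util.Map;

import org.apache.drill.exec.store.hive.HiveStoragePluginConfig;
import org.apache.hadoop.hive.conf.HiveConf;

public class HiveConfigSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("hive.metastore.uris", "thrift://localhost:9083"); // illustrative endpoint
    HiveStoragePluginConfig config = new HiveStoragePluginConfig(props);
    // getHiveConf() lazily creates a HiveConf and copies every configured
    // property into it; later calls return the same cached instance.
    HiveConf hiveConf = config.getHiveConf();
    System.out.println(hiveConf.get("hive.metastore.uris"));
  }
}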

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
new file mode 100644
index 0000000..abec2c5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/schema/DrillHiveTable.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.schema;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.hive.HiveReadEntry;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.sql.SqlCollation;
+import org.eigenbase.sql.type.SqlTypeName;
+
+public class DrillHiveTable extends DrillTable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillHiveTable.class);
+  
+  private final Table hiveTable;
+  
+  public DrillHiveTable(String storageEngineName, HiveReadEntry readEntry, StoragePluginConfig storageEngineConfig) {
+    super(storageEngineName, readEntry, storageEngineConfig);
+    this.hiveTable = new org.apache.hadoop.hive.ql.metadata.Table(readEntry.getTable());
+  }
+
+  @Override
+  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+    ArrayList<RelDataType> typeList = new ArrayList<>();
+    ArrayList<String> fieldNameList = new ArrayList<>();
+
+    ArrayList<StructField> hiveFields = hiveTable.getFields();
+    for(StructField hiveField : hiveFields) {
+      fieldNameList.add(hiveField.getFieldName());
+      typeList.add(getRelDataTypeFromHiveType(typeFactory, hiveField.getFieldObjectInspector()));
+    }
+
+    for (FieldSchema field : hiveTable.getPartitionKeys()) {
+      fieldNameList.add(field.getName());
+      typeList.add(getRelDataTypeFromHiveTypeString(typeFactory, field.getType()));
+    }
+
+    final RelDataType rowType = typeFactory.createStructType(typeList, fieldNameList);
+    return rowType;
+  }
+
+  private RelDataType getRelDataTypeFromHiveTypeString(RelDataTypeFactory typeFactory, String type) {
+    switch(type) {
+      case "boolean":
+        return typeFactory.createSqlType(SqlTypeName.BOOLEAN);
+
+      case "tinyint":
+        return typeFactory.createSqlType(SqlTypeName.TINYINT);
+
+      case "smallint":
+        return typeFactory.createSqlType(SqlTypeName.SMALLINT);
+
+      case "int":
+        return typeFactory.createSqlType(SqlTypeName.INTEGER);
+
+      case "bigint":
+        return typeFactory.createSqlType(SqlTypeName.BIGINT);
+
+      case "float":
+        return typeFactory.createSqlType(SqlTypeName.FLOAT);
+
+      case "double":
+        return typeFactory.createSqlType(SqlTypeName.DOUBLE);
+
+      case "date":
+        return typeFactory.createSqlType(SqlTypeName.DATE);
+
+      case "timestamp":
+        return typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+
+      case "binary":
+        return typeFactory.createSqlType(SqlTypeName.BINARY);
+
+      case "decimal":
+        return typeFactory.createSqlType(SqlTypeName.DECIMAL);
+
+      case "string":
+      case "varchar": {
+        return typeFactory.createTypeWithCharsetAndCollation(
+                typeFactory.createSqlType(SqlTypeName.VARCHAR), /*input type*/
+                Charset.forName("ISO-8859-1"), /*unicode char set*/
+                SqlCollation.IMPLICIT /* TODO: need to decide if implicit is the correct one */
+        );
+      }
+
+      default:
+        throw new RuntimeException("Unknown or unsupported hive type: " + type);
+    }
+  }
+
+  private RelDataType getRelDataTypeFromHivePrimitiveType(RelDataTypeFactory typeFactory, PrimitiveObjectInspector poi) {
+    switch(poi.getPrimitiveCategory()) {
+      case BOOLEAN:
+        return typeFactory.createSqlType(SqlTypeName.BOOLEAN);
+      case BYTE:
+        return typeFactory.createSqlType(SqlTypeName.TINYINT);
+
+      case SHORT:
+        return typeFactory.createSqlType(SqlTypeName.SMALLINT);
+
+      case INT:
+        return typeFactory.createSqlType(SqlTypeName.INTEGER);
+
+      case LONG:
+        return typeFactory.createSqlType(SqlTypeName.BIGINT);
+
+      case FLOAT:
+        return typeFactory.createSqlType(SqlTypeName.FLOAT);
+
+      case DOUBLE:
+        return typeFactory.createSqlType(SqlTypeName.DOUBLE);
+
+      case DATE:
+        return typeFactory.createSqlType(SqlTypeName.DATE);
+
+      case TIMESTAMP:
+        return typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+
+      case BINARY:
+        return typeFactory.createSqlType(SqlTypeName.BINARY);
+
+      case DECIMAL:
+        return typeFactory.createSqlType(SqlTypeName.DECIMAL);
+
+      case STRING:
+      case VARCHAR: {
+        return typeFactory.createTypeWithCharsetAndCollation(
+          typeFactory.createSqlType(SqlTypeName.VARCHAR), /*input type*/
+          Charset.forName("ISO-8859-1"), /*unicode char set*/
+          SqlCollation.IMPLICIT /* TODO: need to decide if implicit is the correct one */
+        );
+      }
+
+      case UNKNOWN:
+      case VOID:
+      default:
+        throw new RuntimeException("Unknown or unsupported hive type");
+    }
+  }
+
+  private RelDataType getRelDataTypeFromHiveType(RelDataTypeFactory typeFactory, ObjectInspector oi) {
+    switch(oi.getCategory()) {
+      case PRIMITIVE:
+        return getRelDataTypeFromHivePrimitiveType(typeFactory, ((PrimitiveObjectInspector) oi));
+      case LIST:
+      case MAP:
+      case STRUCT:
+      case UNION:
+      default:
+        throw new RuntimeException("Unknown or unsupported hive type");
+    }
+  }
+}
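
getRowType above builds the row type by switching on Hive's type-name strings and primitive categories. A standalone analogue of that dispatch is sketched below, mapping Hive type names to java.sql.Types constants instead of Optiq's SqlTypeName so it runs without a planner on the classpath; the mapping is a simplified subset, not the exact Drill behavior:

import java.sql.Types;

public class HiveTypeMappingSketch {
  // Simplified analogue of getRelDataTypeFromHiveTypeString above.
  static int toJdbcType(String hiveType) {
    switch (hiveType) {
      case "boolean":   return Types.BOOLEAN;
      case "tinyint":   return Types.TINYINT;
      case "smallint":  return Types.SMALLINT;
      case "int":       return Types.INTEGER;
      case "bigint":    return Types.BIGINT;
      case "float":     return Types.FLOAT;
      case "double":    return Types.DOUBLE;
      case "date":      return Types.DATE;
      case "timestamp": return Types.TIMESTAMP;
      case "binary":    return Types.BINARY;
      case "decimal":   return Types.DECIMAL;
      case "string":
      case "varchar":   return Types.VARCHAR;
      default:
        throw new IllegalArgumentException("Unknown or unsupported hive type: " + hiveType);
    }
  }

  public static void main(String[] args) {
    System.out.println(toJdbcType("bigint") == Types.BIGINT); // prints "true"
  }
}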

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java
new file mode 100644
index 0000000..e4ff372
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/schema/HiveDatabaseSchema.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.schema;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaHolder;
+import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory.HiveSchema;
+
+import com.google.common.collect.Sets;
+
+public class HiveDatabaseSchema extends AbstractSchema{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveDatabaseSchema.class);
+
+  private final HiveSchema hiveSchema;
+  private final Set<String> tables;
+  
+  public HiveDatabaseSchema( //
+      List<String> tableList, //
+      HiveSchema hiveSchema, //
+      SchemaHolder parentSchema, //
+      String name) {
+    super(parentSchema, name);
+    this.hiveSchema = hiveSchema;
+    this.tables = Sets.newHashSet(tableList);
+  }
+
+  @Override
+  public DrillTable getTable(String tableName) {
+    return hiveSchema.getDrillTable(this.name, tableName);
+  }
+
+  @Override
+  public Set<String> getTableNames() {
+    return tables;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
new file mode 100644
index 0000000..091381f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.schema;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import net.hydromatic.optiq.Schema;
+import net.hydromatic.optiq.SchemaPlus;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.SchemaFactory;
+import org.apache.drill.exec.store.SchemaHolder;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
+import org.apache.drill.exec.store.hive.HiveReadEntry;
+import org.apache.drill.exec.store.hive.HiveStoragePluginConfig;
+import org.apache.drill.exec.store.hive.HiveTable;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.thrift.TException;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class HiveSchemaFactory implements SchemaFactory {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveSchemaFactory.class);
+
+  private static final String DATABASES = "databases";
+
+  private final HiveMetaStoreClient mClient;
+  private LoadingCache<String, List<String>> databases;
+  private LoadingCache<String, List<String>> tableNameLoader;
+  private LoadingCache<String, LoadingCache<String, HiveReadEntry>> tableLoaders;
+  private HiveStoragePluginConfig pluginConfig;
+  private final String schemaName;
+
+  public HiveSchemaFactory(HiveStoragePluginConfig pluginConfig, String name, HiveConf hiveConf) throws ExecutionSetupException {
+    this.schemaName = name;
+    this.pluginConfig = pluginConfig;
+    
+    try {
+      this.mClient = new HiveMetaStoreClient(hiveConf);
+    } catch (MetaException e) {
+      throw new ExecutionSetupException("Failure setting up Hive metastore client.", e);
+    }
+
+    databases = CacheBuilder //
+        .newBuilder() //
+        .expireAfterAccess(1, TimeUnit.MINUTES) //
+        .build(new DatabaseLoader());
+
+    tableNameLoader = CacheBuilder //
+        .newBuilder() //
+        .expireAfterAccess(1, TimeUnit.MINUTES) //
+        .build(new TableNameLoader());
+
+    tableLoaders = CacheBuilder //
+        .newBuilder() //
+        .expireAfterAccess(4, TimeUnit.HOURS) //
+        .maximumSize(20) //
+        .build(new TableLoaderLoader());
+  }
+
+  private class TableNameLoader extends CacheLoader<String, List<String>> {
+
+    @Override
+    public List<String> load(String dbName) throws Exception {
+      try {
+        return mClient.getAllTables(dbName);
+      } catch (TException e) {
+        logger.warn("Failure while attempting to get hive tables", e);
+        mClient.reconnect();
+        return mClient.getAllTables(dbName);
+      }
+    }
+
+  }
+
+  private class DatabaseLoader extends CacheLoader<String, List<String>> {
+
+    @Override
+    public List<String> load(String key) throws Exception {
+      if (!DATABASES.equals(key))
+        throw new UnsupportedOperationException();
+      try {
+        return mClient.getAllDatabases();
+      } catch (TException e) {
+        logger.warn("Failure while attempting to get hive tables", e);
+        mClient.reconnect();
+        return mClient.getAllDatabases();
+      }
+    }
+  }
+
+  private class TableLoaderLoader extends CacheLoader<String, LoadingCache<String, HiveReadEntry>> {
+
+    @Override
+    public LoadingCache<String, HiveReadEntry> load(String key) throws Exception {
+      return CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.MINUTES).build(new TableLoader(key));
+    }
+
+  }
+
+  private class TableLoader extends CacheLoader<String, HiveReadEntry> {
+
+    private final String dbName;
+
+    public TableLoader(String dbName) {
+      super();
+      this.dbName = dbName;
+    }
+
+    @Override
+    public HiveReadEntry load(String key) throws Exception {
+      Table t = null;
+      try {
+        t = mClient.getTable(dbName, key);
+      } catch (TException e) {
+        mClient.reconnect();
+        t = mClient.getTable(dbName, key);
+      }
+
+      if (t == null)
+        throw new UnknownTableException(String.format("Unable to find table '%s'.", key));
+
+      List<Partition> partitions = null;
+      try {
+        partitions = mClient.listPartitions(dbName, key, Short.MAX_VALUE);
+      } catch (TException e) {
+        mClient.reconnect();
+        partitions = mClient.listPartitions(dbName, key, Short.MAX_VALUE);
+      }
+
+      List<HiveTable.HivePartition> hivePartitions = Lists.newArrayList();
+      for (Partition part : partitions) {
+        hivePartitions.add(new HiveTable.HivePartition(part));
+      }
+
+      if (hivePartitions.size() == 0)
+        hivePartitions = null;
+      return new HiveReadEntry(new HiveTable(t), hivePartitions);
+
+    }
+
+  }
+
+  @Override
+  public Schema add(SchemaPlus parent) {
+    HiveSchema schema = new HiveSchema(new SchemaHolder(parent), schemaName);
+    SchemaPlus hPlus = parent.add(schema);
+    schema.setHolder(hPlus);
+    return schema;
+  }
+
+  class HiveSchema extends AbstractSchema {
+
+    private final SchemaHolder holder = new SchemaHolder();
+
+    private HiveDatabaseSchema defaultSchema;
+    
+    public HiveSchema(SchemaHolder parentSchema, String name) {
+      super(parentSchema, name);
+      getSubSchema("default");
+    }
+    
+    @Override
+    public Schema getSubSchema(String name) {
+      List<String> tables;
+      try {
+        tables = tableNameLoader.get(name);
+        HiveDatabaseSchema schema = new HiveDatabaseSchema(tables, this, holder, name);
+        if(name.equals("default")){
+          this.defaultSchema = schema;
+        }
+        return schema;
+      } catch (ExecutionException e) {
+        logger.warn("Failure while attempting to access HiveDatabase '{}'.", name, e.getCause());
+        return null;
+      }
+      
+    }
+    
+
+    void setHolder(SchemaPlus plusOfThis){
+      holder.setSchema(plusOfThis);
+      for(String s : getSubSchemaNames()){
+        plusOfThis.add(getSubSchema(s));
+      }
+    }
+    
+
+    @Override
+    public Set<String> getSubSchemaNames() {
+      try{
+        List<String> dbs = databases.get(DATABASES);
+        return Sets.newHashSet(dbs);
+      }catch(ExecutionException e){
+        logger.warn("Failure while getting Hive database list.", e);
+      }
+      return super.getSubSchemaNames();
+    }
+
+    @Override
+    public DrillTable getTable(String name) {
+      if(defaultSchema == null){ 
+        return super.getTable(name);
+      }
+      return defaultSchema.getTable(name);
+    }
+
+    @Override
+    public Set<String> getTableNames() {
+      if(defaultSchema == null){
+        return super.getTableNames();
+      }
+      return defaultSchema.getTableNames();
+    }
+
+    List<String> getTableNames(String dbName){
+      try{
+        return tableNameLoader.get(dbName);
+      }catch(ExecutionException e){
+        logger.warn("Failure while loading table names for database '{}'.", dbName, e.getCause());
+        return Collections.emptyList();
+      }
+    }
+    
+    DrillTable getDrillTable(String dbName, String t){
+      HiveReadEntry entry = getSelectionBaseOnName(dbName, t);
+      if(entry == null) return null;
+      return new DrillHiveTable(schemaName, entry, pluginConfig);
+    }
+    
+    HiveReadEntry getSelectionBaseOnName(String dbName, String t) {
+      if(dbName == null) dbName = "default";
+      try{
+        return tableLoaders.get(dbName).get(t);
+      }catch(ExecutionException e){
+        logger.warn("Exception occurred while trying to read table. {}.{}", dbName, t, e.getCause());
+        return null;
+      }
+    }
+    
+  }
+
+
+}
\ No newline at end of file
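
HiveSchemaFactory caches metastore lookups with Guava's LoadingCache so that slow Thrift calls are not repeated on every query; database and table-name entries expire a minute after last access. A minimal sketch of that CacheBuilder/CacheLoader pattern with a stand-in loader (the loader body is illustrative only):

import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class LoadingCacheSketch {
  public static void main(String[] args) throws Exception {
    LoadingCache<String, String> tableNames = CacheBuilder.newBuilder()
        .expireAfterAccess(1, TimeUnit.MINUTES) // same expiry the metastore caches above use
        .maximumSize(20)
        .build(new CacheLoader<String, String>() {
          @Override
          public String load(String dbName) {
            // A real loader would call the Hive metastore client here.
            return "tables-for-" + dbName;
          }
        });
    // The first get() invokes load(); later calls within the expiry window hit the cache.
    System.out.println(tableNames.get("default"));
    System.out.println(tableNames.get("default"));
  }
}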

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
deleted file mode 100644
index 4782d82..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONGroupScan.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.json;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.EndpointAffinity;
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.Size;
-import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.store.StorageEngineRegistry;
-
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("json-scan")
-public class JSONGroupScan extends AbstractGroupScan {
-  private static int ESTIMATED_RECORD_SIZE = 1024; // 1kb
-  private final JSONStorageEngine engine;
-
-  private LinkedList<JSONGroupScan.ScanEntry>[] mappings;
-  private final List<JSONGroupScan.ScanEntry> readEntries;
-  private final OperatorCost cost;
-  private final Size size;
-  private final FieldReference ref;
-  private final List<SchemaPath> columns;
-
-  @JsonCreator
-  public JSONGroupScan(@JsonProperty("entries") List<ScanEntry> entries,
-                       @JsonProperty("storageengine") JSONStorageEngineConfig storageEngineConfig,
-                       @JacksonInject StorageEngineRegistry engineRegistry, @JsonProperty("ref") FieldReference ref,
-                       @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
-    this(entries, (JSONStorageEngine) engineRegistry.getEngine(storageEngineConfig), ref, columns);
-  }
-
-  public JSONGroupScan(List<ScanEntry> entries, JSONStorageEngine engine, FieldReference ref,
-                       List<SchemaPath> columns) {
-    this.engine = engine;
-    this.readEntries = entries;
-    OperatorCost cost = new OperatorCost(0, 0, 0, 0);
-    Size size = new Size(0, 0);
-    for (JSONGroupScan.ScanEntry r : readEntries) {
-    cost = cost.add(r.getCost());
-    size = size.add(r.getSize());
-    }
-    this.cost = cost;
-    this.size = size;
-    this.ref = ref;
-    this.columns = columns;
-  }
-  
-  @SuppressWarnings("unchecked")
-  @Override
-  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
-    checkArgument(endpoints.size() <= readEntries.size());
-
-    mappings = new LinkedList[endpoints.size()];
-
-    int i = 0;
-    for (ScanEntry e : readEntries) {
-      if (i == endpoints.size()) i = 0;
-      LinkedList entries = mappings[i];
-      if (entries == null) {
-        entries = new LinkedList<>();
-        mappings[i] = entries;
-      }
-      entries.add(e);
-      i++;
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException{
-    checkArgument(minorFragmentId < mappings.length, "Mappings length [%s] should be longer than minor fragment id [%s] but it isn't.", mappings.length, minorFragmentId);
-    return new JSONSubScan(mappings[minorFragmentId], engine, ref, columns);
-  }
-
-  @Override
-  public List<EndpointAffinity> getOperatorAffinity() {
-    return Collections.emptyList();
-  }
-
-  @Override
-  @JsonIgnore
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-    return new JSONGroupScan(readEntries, engine, ref, columns);
-  }
-
-  public static class ScanEntry implements ReadEntry {
-    private final String path;
-    private Size size;
-
-    @JsonCreator
-    public ScanEntry(@JsonProperty("path") String path) {
-      this.path = path;
-      size = new Size(ESTIMATED_RECORD_SIZE, ESTIMATED_RECORD_SIZE);
-    }
-
-    @Override
-    public OperatorCost getCost() {
-      return new OperatorCost(1, 1, 2, 2);
-    }
-
-    @Override
-    public Size getSize() {
-      return size;
-    }
-
-    public String getPath() {
-      return path;
-    }
-  }
-
-  @Override
-  public int getMaxParallelizationWidth() {
-    return readEntries.size();
-  }
-
-  public List<SchemaPath> getColumns() {
-    return columns;
-  }
-
-  @Override
-  public OperatorCost getCost() {
-    return cost;
-  }
-
-  @Override
-  public Size getSize() {
-    return size;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
deleted file mode 100644
index 11b972c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONRecordReader.java
+++ /dev/null
@@ -1,541 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.json;
-
-import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
-import static com.fasterxml.jackson.core.JsonToken.END_OBJECT;
-import static com.fasterxml.jackson.core.JsonToken.FIELD_NAME;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.expr.holders.NullableBitHolder;
-import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
-import org.apache.drill.exec.expr.holders.NullableIntHolder;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.schema.DiffSchema;
-import org.apache.drill.exec.schema.Field;
-import org.apache.drill.exec.schema.NamedField;
-import org.apache.drill.exec.schema.ObjectSchema;
-import org.apache.drill.exec.schema.RecordSchema;
-import org.apache.drill.exec.schema.SchemaIdGenerator;
-import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.VectorHolder;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.NullableBitVector;
-import org.apache.drill.exec.vector.NullableFloat4Vector;
-import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.RepeatedBitVector;
-import org.apache.drill.exec.vector.RepeatedFloat4Vector;
-import org.apache.drill.exec.vector.RepeatedIntVector;
-import org.apache.drill.exec.vector.RepeatedVarCharVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class JSONRecordReader implements RecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
-  private static final int DEFAULT_LENGTH = 256 * 1024; // 256kb
-  public static final Charset UTF_8 = Charset.forName("UTF-8");
-
-  private final Map<String, VectorHolder> valueVectorMap;
-  private final FileSystem fileSystem;
-  private final Path hadoopPath;
-
-  private JsonParser parser;
-  private SchemaIdGenerator generator;
-  private DiffSchema diffSchema;
-  private RecordSchema currentSchema;
-  private List<Field> removedFields;
-  private OutputMutator outputMutator;
-  private BufferAllocator allocator;
-  private int batchSize;
-  private final FieldReference ref;
-  private final List<SchemaPath> columns;
-
-  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, int batchSize,
-                          FieldReference ref, List<SchemaPath> columns) {
-    this.hadoopPath = new Path(inputPath);
-    this.fileSystem = fileSystem;
-    this.allocator = fragmentContext.getAllocator();
-    this.batchSize = batchSize;
-    valueVectorMap = Maps.newHashMap();
-    this.ref = ref;
-    this.columns = columns;
-  }
-
-  public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, FieldReference ref,
-                          List<SchemaPath> columns) {
-    this(fragmentContext, inputPath, fileSystem, DEFAULT_LENGTH, ref, columns);
-  }
-
-  private JsonParser getParser() {
-    return parser;
-  }
-
-  @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
-    outputMutator = output;
-    currentSchema = new ObjectSchema();
-    diffSchema = new DiffSchema();
-    removedFields = Lists.newArrayList();
-
-    try {
-      JsonFactory factory = new JsonFactory();
-      parser = factory.createJsonParser(fileSystem.open(hadoopPath));
-      parser.nextToken(); // Read to the first START_OBJECT token
-      generator = new SchemaIdGenerator();
-    } catch (IOException e) {
-      throw new ExecutionSetupException(e);
-    }
-  }
-
-  @Override
-  public int next() {
-    if (parser.isClosed() || !parser.hasCurrentToken()) {
-      return 0;
-    }
-
-    resetBatch();
-
-    int nextRowIndex = 0;
-
-    try {
-      while (ReadType.OBJECT.readRecord(this, null, nextRowIndex++, 0)) {
-        parser.nextToken(); // Read to START_OBJECT token
-
-        if (!parser.hasCurrentToken()) {
-          parser.close();
-          break;
-        }
-      }
-
-      parser.nextToken();
-
-      if (!parser.hasCurrentToken()) {
-        parser.close();
-      }
-
-      // Garbage collect fields never referenced in this batch
-      for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
-        diffSchema.addRemovedField(field);
-        outputMutator.removeField(field.getAsMaterializedField(ref));
-      }
-
-      if (diffSchema.isChanged()) {
-        outputMutator.setNewSchema();
-      }
-
-
-    } catch (IOException | SchemaChangeException e) {
-      logger.error("Error reading next in Json reader", e);
-    }
-
-    for (VectorHolder holder : valueVectorMap.values()) {
-      holder.populateVectorLength();
-    }
-
-    return nextRowIndex;
-  }
-
-  private void resetBatch() {
-    for (VectorHolder value : valueVectorMap.values()) {
-      value.reset();
-    }
-
-    currentSchema.resetMarkedFields();
-    diffSchema.reset();
-    removedFields.clear();
-  }
-
-  @Override
-  public void cleanup() {
-    try {
-      parser.close();
-    } catch (IOException e) {
-      logger.warn("Error closing Json parser", e);
-    }
-  }
-
-
-  private RecordSchema getCurrentSchema() {
-    return currentSchema;
-  }
-
-  private void setCurrentSchema(RecordSchema schema) {
-    currentSchema = schema;
-  }
-
-  private List<Field> getRemovedFields() {
-    return removedFields;
-  }
-
-  public BufferAllocator getAllocator() {
-    return allocator;
-  }
-
-  private boolean fieldSelected(String field){
-    SchemaPath sp = new SchemaPath(field, ExpressionPosition.UNKNOWN);
-    if (this.columns != null && this.columns.size() > 1){
-      for (SchemaPath expr : this.columns){
-        if ( sp.equals(expr)){
-          return true;
-        }
-      }
-      return false;
-    }
-    return true;
-  }
-
-  public static enum ReadType {
-    ARRAY(END_ARRAY) {
-      @Override
-      public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
-        return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
-      }
-
-      @Override
-      public RecordSchema createSchema() throws IOException {
-        return new ObjectSchema();
-      }
-    },
-    OBJECT(END_OBJECT) {
-      @Override
-      public Field createField(RecordSchema parentSchema,
-                               String prefixFieldName,
-                               String fieldName,
-                               MajorType fieldType,
-                               int index) {
-        return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
-      }
-
-      @Override
-      public RecordSchema createSchema() throws IOException {
-        return new ObjectSchema();
-      }
-    };
-
-    private final JsonToken endObject;
-
-    ReadType(JsonToken endObject) {
-      this.endObject = endObject;
-    }
-
-    public JsonToken getEndObject() {
-      return endObject;
-    }
-
-    @SuppressWarnings("ConstantConditions")
-    public boolean readRecord(JSONRecordReader reader,
-                              String prefixFieldName,
-                              int rowIndex,
-                              int groupCount) throws IOException, SchemaChangeException {
-      JsonParser parser = reader.getParser();
-      JsonToken token = parser.nextToken();
-      JsonToken endObject = getEndObject();
-      int colIndex = 0;
-      boolean isFull = false;
-      while (token != endObject) {
-        if (token == FIELD_NAME) {
-          token = parser.nextToken();
-          continue;
-        }
-
-        String fieldName = parser.getCurrentName();
-        if ( fieldName != null && ! reader.fieldSelected(fieldName)){
-          // this field was not requested in the query
-          token = parser.nextToken();
-          colIndex += 1;
-          continue;
-        }
-        MajorType fieldType = JacksonHelper.getFieldType(token, this == ReadType.ARRAY);
-        ReadType readType = null;
-        switch (token) {
-          case START_ARRAY:
-            readType = ReadType.ARRAY;
-            groupCount++;
-            break;
-          case START_OBJECT:
-            readType = ReadType.OBJECT;
-            groupCount = 0;
-            break;
-        }
-
-        if (fieldType != null) { // Including nulls
-          boolean currentFieldFull = !recordData(
-              readType,
-              reader,
-              fieldType,
-              prefixFieldName,
-              fieldName,
-              rowIndex,
-              colIndex,
-              groupCount);
-
-          isFull = isFull || currentFieldFull;
-        }
-        token = parser.nextToken();
-        colIndex += 1;
-      }
-      return !isFull;
-    }
-
-    private void removeChildFields(List<Field> removedFields, Field field) {
-      RecordSchema schema = field.getAssignedSchema();
-      if (schema == null) {
-        return;
-      }
-      for (Field childField : schema.getFields()) {
-        removedFields.add(childField);
-        if (childField.hasSchema()) {
-          removeChildFields(removedFields, childField);
-        }
-      }
-    }
-
-    private boolean recordData(JSONRecordReader.ReadType readType,
-                               JSONRecordReader reader,
-                               MajorType fieldType,
-                               String prefixFieldName,
-                               String fieldName,
-                               int rowIndex,
-                               int colIndex,
-                               int groupCount) throws IOException, SchemaChangeException {
-      RecordSchema currentSchema = reader.getCurrentSchema();
-      Field field = currentSchema.getField(fieldName == null ? prefixFieldName : fieldName, colIndex);
-      boolean isFieldFound = field != null;
-      List<Field> removedFields = reader.getRemovedFields();
-      boolean newFieldLateBound = fieldType.getMinorType().equals(MinorType.LATE);
-
-      if (isFieldFound && !field.getFieldType().equals(fieldType)) {
-        boolean existingFieldLateBound = field.getFieldType().getMinorType().equals(MinorType.LATE);
-
-        if (newFieldLateBound && !existingFieldLateBound) {
-          fieldType = Types.overrideMinorType(fieldType, field.getFieldType().getMinorType());
-        } else if (!newFieldLateBound && existingFieldLateBound) {
-          field.setFieldType(Types.overrideMinorType(field.getFieldType(), fieldType.getMinorType()));
-        } else if (!newFieldLateBound && !existingFieldLateBound) {
-          if (field.hasSchema()) {
-            removeChildFields(removedFields, field);
-          }
-          removedFields.add(field);
-          currentSchema.removeField(field, colIndex);
-
-          isFieldFound = false;
-        }
-      }
-
-      if (!isFieldFound) {
-        field = createField(
-            currentSchema,
-            prefixFieldName,
-            fieldName,
-            fieldType,
-            colIndex
-        );
-
-        reader.recordNewField(field);
-        currentSchema.addField(field);
-      }
-
-      field.setRead(true);
-
-      VectorHolder holder = getOrCreateVectorHolder(reader, field);
-      if (readType != null) {
-        RecordSchema fieldSchema = field.getAssignedSchema();
-        RecordSchema newSchema = readType.createSchema();
-
-        if (readType != ReadType.ARRAY) {
-          reader.setCurrentSchema(fieldSchema);
-          if (fieldSchema == null) reader.setCurrentSchema(newSchema);
-          readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
-        } else {
-          readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
-        }
-
-        reader.setCurrentSchema(currentSchema);
-
-      } else if (holder != null && !newFieldLateBound && fieldType.getMinorType() != MinorType.LATE) {
-        return addValueToVector(
-            rowIndex,
-            holder,
-            JacksonHelper.getValueFromFieldType(
-                reader.getParser(),
-                fieldType.getMinorType()
-            ),
-            fieldType.getMinorType(),
-            groupCount
-        );
-      }
-
-      return true;
-    }
-
-    private static <T> boolean addValueToVector(int index, VectorHolder holder, T val, MinorType minorType, int groupCount) {
-      switch (minorType) {
-        case INT: {
-          holder.incAndCheckLength(NullableIntHolder.WIDTH * 8 + 1);
-          if (groupCount == 0) {
-            if (val != null) {
-              NullableIntVector int4 = (NullableIntVector) holder.getValueVector();
-              NullableIntVector.Mutator m = int4.getMutator();
-              m.set(index, (Integer) val);
-            }
-          } else {
-            if (val == null) {
-              throw new UnsupportedOperationException("Nullable repeated int is not supported.");
-            }
-
-            RepeatedIntVector repeatedInt4 = (RepeatedIntVector) holder.getValueVector();
-            RepeatedIntVector.Mutator m = repeatedInt4.getMutator();
-            holder.setGroupCount(index);
-            m.add(index, (Integer) val);
-          }
-
-          return holder.hasEnoughSpace(NullableIntHolder.WIDTH * 8 + 1);
-        }
-        case FLOAT4: {
-          holder.incAndCheckLength(NullableFloat4Holder.WIDTH * 8 + 1);
-          if (groupCount == 0) {
-            if (val != null) {
-              NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector();
-              NullableFloat4Vector.Mutator m = float4.getMutator();
-              m.set(index, (Float) val);
-            }
-          } else {
-            if (val == null) {
-              throw new UnsupportedOperationException("Nullable repeated float is not supported.");
-            }
-
-            RepeatedFloat4Vector repeatedFloat4 = (RepeatedFloat4Vector) holder.getValueVector();
-            RepeatedFloat4Vector.Mutator m = repeatedFloat4.getMutator();
-            holder.setGroupCount(index);
-            m.add(index, (Float) val);
-          }
-          return holder.hasEnoughSpace(NullableFloat4Holder.WIDTH * 8 + 1);
-        }
-        case VARCHAR: {
-          if (val == null) {
-            return (index + 1) * 4 <= holder.getLength();
-          } else {
-            byte[] bytes = ((String) val).getBytes(UTF_8);
-            int length = bytes.length;
-            holder.incAndCheckLength(length);
-            if (groupCount == 0) {
-              NullableVarCharVector varLen4 = (NullableVarCharVector) holder.getValueVector();
-              NullableVarCharVector.Mutator m = varLen4.getMutator();
-              m.set(index, bytes);
-            } else {
-              RepeatedVarCharVector repeatedVarLen4 = (RepeatedVarCharVector) holder.getValueVector();
-              RepeatedVarCharVector.Mutator m = repeatedVarLen4.getMutator();
-              holder.setGroupCount(index);
-              m.add(index, bytes);
-            }
-            return holder.hasEnoughSpace(length + 4 + 1);
-          }
-        }
-        case BIT: {
-          holder.incAndCheckLength(NullableBitHolder.WIDTH + 1);
-          if (groupCount == 0) {
-            if (val != null) {
-              NullableBitVector bit = (NullableBitVector) holder.getValueVector();
-              NullableBitVector.Mutator m = bit.getMutator();
-              m.set(index, (Boolean) val ? 1 : 0);
-            }
-          } else {
-            if (val == null) {
-              throw new UnsupportedOperationException("Nullable repeated boolean is not supported.");
-            }
-
-            RepeatedBitVector repeatedBit = (RepeatedBitVector) holder.getValueVector();
-            RepeatedBitVector.Mutator m = repeatedBit.getMutator();
-            holder.setGroupCount(index);
-            m.add(index, (Boolean) val ? 1 : 0);
-          }
-          return holder.hasEnoughSpace(NullableBitHolder.WIDTH + 1);
-        }
-        default:
-          throw new DrillRuntimeException("Type not supported to add value. Type: " + minorType);
-      }
-    }
-
-    private VectorHolder getOrCreateVectorHolder(JSONRecordReader reader, Field field) throws SchemaChangeException {
-      return reader.getOrCreateVectorHolder(field);
-    }
-
-    public abstract RecordSchema createSchema() throws IOException;
-
-    public abstract Field createField(RecordSchema parentSchema,
-                                      String prefixFieldName,
-                                      String fieldName,
-                                      MajorType fieldType,
-                                      int index);
-  }
-
-  private void recordNewField(Field field) {
-    diffSchema.recordNewField(field);
-  }
-
-  private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException {
-    String fullFieldName = ref != null ? ref.getPath() + "." + field.getFullFieldName() : field.getFullFieldName();
-    VectorHolder holder = valueVectorMap.get(fullFieldName);
-
-    if (holder == null) {
-      MajorType type = field.getFieldType();
-      MinorType minorType = type.getMinorType();
-
-      if (minorType.equals(MinorType.MAP) || minorType.equals(MinorType.LATE)) {
-        return null;
-      }
-
-      MaterializedField f = MaterializedField.create(new SchemaPath(fullFieldName, ExpressionPosition.UNKNOWN), type);
-
-      ValueVector v = TypeHelper.getNewVector(f, allocator);
-      AllocationHelper.allocate(v, batchSize, 50);
-      holder = new VectorHolder(v);
-      valueVectorMap.put(fullFieldName, holder);
-      outputMutator.addField(v);
-      return holder;
-    }
-    return holder;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
deleted file mode 100644
index c40cb47..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONScanBatchCreator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.json;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-
-import java.util.List;
-
-public class JSONScanBatchCreator implements BatchCreator<JSONSubScan> {
-
-    @Override
-    public RecordBatch getBatch(FragmentContext context, JSONSubScan config, List<RecordBatch> children) throws ExecutionSetupException {
-        Preconditions.checkArgument(children.isEmpty());
-        List<JSONGroupScan.ScanEntry> entries = config.getReadEntries();
-        List<RecordReader> readers = Lists.newArrayList();
-        for (JSONGroupScan.ScanEntry e : entries) {
-            readers.add(new JSONRecordReader(context, e.getPath(), config.getStorageEngine().getFileSystem(), config.getRef(),
-                config.getColumns()));
-        }
-
-        return new ScanBatch(context, readers.iterator());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngine.java
deleted file mode 100644
index e4f2070..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngine.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.json;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.AbstractStorageEngine;
-import org.apache.drill.exec.store.ClassPathFileSystem;
-import org.apache.drill.exec.store.SchemaProvider;
-import org.apache.drill.exec.store.json.JSONGroupScan.ScanEntry;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-public class JSONStorageEngine extends AbstractStorageEngine {
-  private final JSONStorageEngineConfig config;
-  private final Configuration conf;
-  private FileSystem fileSystem;
-  public static final String HADOOP_DEFAULT_NAME = "fs.default.name";
-  private final JsonSchemaProvider schemaProvider;
- 
-  public JSONStorageEngine(JSONStorageEngineConfig config, DrillbitContext context) {
-    this.config = config;
-    this.schemaProvider = new JsonSchemaProvider(config, context.getConfig());
-    
-    try {
-      this.conf = new Configuration();
-      this.conf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
-      this.conf.set(HADOOP_DEFAULT_NAME, config.getDfsName());
-      this.fileSystem = FileSystem.get(conf);
-
-    } catch (IOException ie) {
-      throw new RuntimeException("Error setting up filesystem");
-    }
-  }
-
-  public FileSystem getFileSystem() {
-    return fileSystem;
-  }
-
-  public JSONStorageEngineConfig getConfig(){
-    return config;
-  }
-  
-  @Override
-  public JSONGroupScan getPhysicalScan(Scan scan) throws IOException {
-    ArrayList<ScanEntry> readEntries = scan.getSelection().getListWith(new ObjectMapper(), new TypeReference<ArrayList<ScanEntry>>() {});
-    return new JSONGroupScan(readEntries, this, scan.getOutputReference(), null);
-  }
-
-  @Override
-  public SchemaProvider getSchemaProvider() {
-    return schemaProvider;
-  }
-  
-  
-}
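
The deleted engine's constructor shows the pre-plugin filesystem wiring: it built a Hadoop Configuration, registered ClassPathFileSystem under the "classpath" scheme, and resolved the default FileSystem from the fs.default.name value in its config. A minimal, self-contained sketch of that Hadoop setup follows; the "file:///" URI is an assumed example rather than a Drill default.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FileSystemSetupSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // The engine read this value from its config; "file:///" is an assumed example.
        conf.set("fs.default.name", "file:///");
        FileSystem fs = FileSystem.get(conf);
        // Paths without a scheme now resolve against the configured default filesystem.
        System.out.println(fs.exists(new Path("/tmp")));
      }
    }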

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngineConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngineConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngineConfig.java
deleted file mode 100644
index 359e7ed..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONStorageEngineConfig.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.json;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.logical.StorageEngineConfigBase;
-
-@JsonTypeName("json")
-public class JSONStorageEngineConfig extends StorageEngineConfigBase {
-  private String dfsName;
-
-  public String getDfsName() {
-    return dfsName;
-  }
-
-  @JsonCreator
-  public JSONStorageEngineConfig(@JsonProperty("dfsName") String dfsName) {
-    this.dfsName = dfsName;
-  }
-
-  @Override
-  public int hashCode() {
-    return dfsName != null ? dfsName.hashCode() : 0;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    JSONStorageEngineConfig that = (JSONStorageEngineConfig) o;
-
-    if (dfsName != null ? !dfsName.equals(that.dfsName) : that.dfsName != null) return false;
-
-    return true;
-  }
-}
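
The @JsonTypeName("json") annotation on the deleted config is what let Jackson map a "json" engine entry in a logical plan onto this class. A minimal sketch of that polymorphic-deserialization pattern, using hypothetical BaseConfig and ExampleConfig classes rather than Drill's own types:

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.annotation.JsonTypeInfo;
    import com.fasterxml.jackson.annotation.JsonTypeName;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class TypeNameSketch {

      // The base type declares how the concrete subtype is named in JSON.
      @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
      public static abstract class BaseConfig {}

      @JsonTypeName("example")
      public static class ExampleConfig extends BaseConfig {
        private final String dfsName;

        @JsonCreator
        public ExampleConfig(@JsonProperty("dfsName") String dfsName) {
          this.dfsName = dfsName;
        }

        public String getDfsName() { return dfsName; }
      }

      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Subtypes are registered explicitly here; how the surrounding system discovers them is out of scope.
        mapper.registerSubtypes(ExampleConfig.class);
        BaseConfig config = mapper.readValue(
            "{\"type\":\"example\",\"dfsName\":\"file:///\"}", BaseConfig.class);
        System.out.println(((ExampleConfig) config).getDfsName());
      }
    }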

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
deleted file mode 100644
index 92f6c0a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JSONSubScan.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.json;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.StorageEngineConfig;
-import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.base.AbstractBase;
-import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.base.PhysicalVisitor;
-import org.apache.drill.exec.physical.base.Size;
-import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.store.StorageEngineRegistry;
-
-import com.fasterxml.jackson.annotation.JacksonInject;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.collect.Iterators;
-
-@JsonTypeName("json-sub-scan")
-public class JSONSubScan extends AbstractBase implements SubScan {
-
-  protected final List<JSONGroupScan.ScanEntry> readEntries;
-  private final OperatorCost cost;
-  private final Size size;
-  private final JSONStorageEngine storageEngine;
-  private final FieldReference ref;
-  private final List<SchemaPath> columns;
-
-  @JsonCreator
-  public JSONSubScan(@JacksonInject StorageEngineRegistry registry,
-                     @JsonProperty("engineConfig") StorageEngineConfig engineConfig,
-                     @JsonProperty("readEntries") List<JSONGroupScan.ScanEntry> readEntries,
-                     @JsonProperty("ref") FieldReference ref,
-                     @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
-    this(readEntries, (JSONStorageEngine) registry.getEngine(engineConfig), ref, columns);
-  }
-  
-  JSONSubScan(List<JSONGroupScan.ScanEntry> readEntries, JSONStorageEngine engine, FieldReference ref,
-              List<SchemaPath> columns){
-    this.readEntries = readEntries;
-    this.storageEngine = engine;
-    OperatorCost cost = new OperatorCost(0, 0, 0, 0);
-    Size size = new Size(0, 0);
-    for (JSONGroupScan.ScanEntry r : readEntries) {
-      cost = cost.add(r.getCost());
-      size = size.add(r.getSize());
-    }
-    this.cost = cost;
-    this.size = size;
-    this.ref = ref;
-    this.columns = columns;
-  }
-  
-  public FieldReference getRef() {
-    return ref;
-  }
-
-  public List<JSONGroupScan.ScanEntry> getReadEntries() {
-    return readEntries;
-  }
-
-  public StorageEngineConfig getEngineConfig() {
-    return storageEngine.getConfig();
-  }
-
-  @JsonIgnore
-  public JSONStorageEngine getStorageEngine() {
-    return storageEngine;
-  }
-
-  @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException{
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public OperatorCost getCost() {
-    return cost;
-  }
-
-  @Override
-  public Size getSize() {
-    return size;
-  }
-
-  @Override
-  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
-    return physicalVisitor.visitSubScan(this, value);
-  }
-
-  @Override
-  public Iterator<PhysicalOperator> iterator() {
-    return Iterators.emptyIterator();
-  }
-
-  public List<SchemaPath> getColumns() {
-    return columns;
-  }
-}
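
The @JacksonInject parameter in the deleted constructor is the interesting part: the storage engine registry is runtime state, not plan content, so Jackson injects it while deserializing the sub-scan instead of reading it from JSON. A small, self-contained sketch of that mechanism with purely illustrative Registry and InjectedScan classes:

    import com.fasterxml.jackson.annotation.JacksonInject;
    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.InjectableValues;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class InjectSketch {

      public static class Registry {
        public String lookup(String name) { return "engine:" + name; }
      }

      public static class InjectedScan {
        private final Registry registry;
        private final String engineName;

        @JsonCreator
        public InjectedScan(@JacksonInject Registry registry,
                            @JsonProperty("engineName") String engineName) {
          this.registry = registry;
          this.engineName = engineName;
        }

        public String resolve() { return registry.lookup(engineName); }
      }

      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // The registry comes from the runtime, not from the serialized plan.
        mapper.setInjectableValues(
            new InjectableValues.Std().addValue(Registry.class, new Registry()));
        InjectedScan scan = mapper.readValue("{\"engineName\":\"json\"}", InjectedScan.class);
        System.out.println(scan.resolve());
      }
    }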

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JsonSchemaProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JsonSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JsonSchemaProvider.java
deleted file mode 100644
index 66a4542..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/json/JsonSchemaProvider.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.json;
-
-import java.io.IOException;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.physical.ReadEntryWithPath;
-import org.apache.drill.exec.store.ClassPathFileSystem;
-import org.apache.drill.exec.store.SchemaProvider;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-
-import com.beust.jcommander.internal.Lists;
-
-public class JsonSchemaProvider implements SchemaProvider{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonSchemaProvider.class);
-
-  public static final String HADOOP_DEFAULT_NAME = "fs.default.name";
-  final JSONStorageEngineConfig configuration;
-  final FileSystem fs;
-  final Configuration conf;
-
-  public JsonSchemaProvider(JSONStorageEngineConfig configuration, DrillConfig config){
-    this.configuration = configuration;
-    try {
-      this.conf = new Configuration();
-      this.conf.set(HADOOP_DEFAULT_NAME, "file:///");
-      this.conf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
-      this.fs = FileSystem.get(conf);
-    } catch (IOException ie) {
-      throw new RuntimeException("Error setting up filesystem");
-    }
-  }
-
-  @Override
-  public Object getSelectionBaseOnName(String tableName) {
-    try{
-//      if(!fs.exists(new Path(tableName))) return null;
-      ReadEntryWithPath re = new ReadEntryWithPath(tableName);
-      return Lists.newArrayList(re);
-    }catch(Exception e){
-      logger.warn(String.format("Failure while checking table name %s.", tableName), e);
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
index cb9cbf6..b71784b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java
@@ -27,7 +27,6 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.ReadEntry;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Size;
@@ -75,7 +74,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
     return readEntries;
   }
   
-  public static class MockScanEntry implements ReadEntry {
+  public static class MockScanEntry{
 
     private final int records;
     private final MockColumn[] types;
@@ -93,7 +92,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
       this.recordSize = size;
     }
 
-    @Override
+    @JsonIgnore
     public OperatorCost getCost() {
       return new OperatorCost(1, 2, 1, 1);
     }
@@ -106,7 +105,7 @@ public class MockGroupScanPOP extends AbstractGroupScan {
       return types;
     }
 
-    @Override
+    @JsonIgnore
     public Size getSize() {
       return new Size(records, recordSize);
     }
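
Since MockScanEntry no longer implements ReadEntry, getCost() and getSize() stop being interface overrides; marking them @JsonIgnore keeps Jackson from emitting them as properties when the scan entry is serialized into a plan. A tiny sketch of that effect on an illustrative Widget class:

    import com.fasterxml.jackson.annotation.JsonIgnore;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JsonIgnoreSketch {

      public static class Widget {
        private final int records;

        public Widget(int records) { this.records = records; }

        public int getRecords() { return records; }

        // Derived value: without @JsonIgnore, Jackson would emit it as a "cost" property.
        @JsonIgnore
        public int getCost() { return records * 2; }
      }

      public static void main(String[] args) throws Exception {
        // Prints {"records":10}; the ignored getter stays out of the JSON.
        System.out.println(new ObjectMapper().writeValueAsString(new Widget(10)));
      }
    }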

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
index b58ab50..0578b06 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
@@ -20,20 +20,22 @@ package org.apache.drill.exec.store.mock;
 import java.io.IOException;
 import java.util.ArrayList;
 
+import net.hydromatic.optiq.Schema;
+import net.hydromatic.optiq.SchemaPlus;
+
 import org.apache.drill.common.logical.data.Scan;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.AbstractStorageEngine;
-import org.apache.drill.exec.store.SchemaProvider;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-public class MockStorageEngine extends AbstractStorageEngine {
+public class MockStorageEngine extends AbstractStoragePlugin {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
 
-  public MockStorageEngine(MockStorageEngineConfig configuration, DrillbitContext context) {
+  public MockStorageEngine(MockStorageEngineConfig configuration, DrillbitContext context, String name) {
 
   }
 
@@ -48,8 +50,9 @@ public class MockStorageEngine extends AbstractStorageEngine {
   }
 
   @Override
-  public SchemaProvider getSchemaProvider() {
-    throw new UnsupportedOperationException();
+  public Schema createAndAddSchema(SchemaPlus parent) {
+    return null;
   }
 
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java
index 786c0ec..8262544 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngineConfig.java
@@ -17,14 +17,14 @@
  */
 package org.apache.drill.exec.store.mock;
 
-import org.apache.drill.common.logical.StorageEngineConfigBase;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName("mock")
-public class MockStorageEngineConfig extends StorageEngineConfigBase{
+public class MockStorageEngineConfig extends StoragePluginConfigBase{
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngineConfig.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
index 800c7c5..0753be5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockSubScanPOP.java
@@ -41,21 +41,21 @@ public class MockSubScanPOP extends AbstractBase implements SubScan {
 
   private final String url;
   protected final List<MockGroupScanPOP.MockScanEntry> readEntries;
-  private final OperatorCost cost;
-  private final Size size;
+//  private final OperatorCost cost;
+//  private final Size size;
   private  LinkedList<MockGroupScanPOP.MockScanEntry>[] mappings;
 
   @JsonCreator
   public MockSubScanPOP(@JsonProperty("url") String url, @JsonProperty("entries") List<MockGroupScanPOP.MockScanEntry> readEntries) {
     this.readEntries = readEntries;
-    OperatorCost cost = new OperatorCost(0,0,0,0);
-    Size size = new Size(0,0);
-    for(MockGroupScanPOP.MockScanEntry r : readEntries){
-      cost = cost.add(r.getCost());
-      size = size.add(r.getSize());
-    }
-    this.cost = cost;
-    this.size = size;
+//    OperatorCost cost = new OperatorCost(0,0,0,0);
+//    Size size = new Size(0,0);
+//    for(MockGroupScanPOP.MockScanEntry r : readEntries){
+//      cost = cost.add(r.getCost());
+//      size = size.add(r.getSize());
+//    }
+//    this.cost = cost;
+//    this.size = size;
     this.url = url;
   }
 
@@ -73,12 +73,12 @@ public class MockSubScanPOP extends AbstractBase implements SubScan {
     return Iterators.emptyIterator();
   }
 
-  @Override
+  @Override @JsonIgnore
   public OperatorCost getCost() {
     throw new UnsupportedOperationException();
   }
 
-  @Override
+  @Override @JsonIgnore
   public Size getSize() {
     throw new UnsupportedOperationException();
   }
@@ -86,7 +86,7 @@ public class MockSubScanPOP extends AbstractBase implements SubScan {
   // will want to replace these two methods with an interface above for AbstractSubScan
   @Override
   public boolean isExecutable() {
-    return true;  //To change body of implemented methods use File | Settings | File Templates.
+    return true;  
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
index d0049c7..c489d5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.vector.ValueVector;
 import parquet.column.ColumnDescriptor;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 
-public final class BitReader extends ColumnReader {
+final class BitReader extends ColumnReader {
 
   private byte currentByte;
   private byte nextByte;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
index ec44747..a890f1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
@@ -28,7 +28,7 @@ import parquet.bytes.BytesInput;
 import parquet.format.PageHeader;
 import parquet.format.Util;
 
-public class ColumnDataReader {
+class ColumnDataReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class);
   
   private final long endPosition;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
index 94ccbfc..2cc126c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
@@ -29,7 +29,7 @@ import parquet.schema.PrimitiveType;
 
 import java.io.IOException;
 
-public abstract class ColumnReader {
+abstract class ColumnReader {
   
   final ParquetRecordReader parentReader;
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
index 3aae189..4f14f60 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.vector.ValueVector;
 import parquet.column.ColumnDescriptor;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 
-public class FixedByteAlignedReader extends ColumnReader {
+class FixedByteAlignedReader extends ColumnReader {
 
   private byte[] bytes;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
index 1dde7c7..4c060f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
@@ -35,7 +35,7 @@ import java.io.IOException;
  * because page/batch boundaries that do not land on byte boundaries require shifting of all of the values
  * in the next batch.
  */
-public final class NullableBitReader extends ColumnReader {
+final class NullableBitReader extends ColumnReader {
 
   NullableBitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                     boolean fixedLength, ValueVector v) throws ExecutionSetupException {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
index 5ac9bb1..4c33aeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
@@ -26,7 +26,7 @@ import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 import java.io.IOException;
 
-public abstract class NullableColumnReader extends ColumnReader{
+abstract class NullableColumnReader extends ColumnReader{
 
   int nullsFound;
   // used to skip nulls found

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
index bb81024..c2fc606 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.vector.ValueVector;
 import parquet.column.ColumnDescriptor;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 
-public class NullableFixedByteAlignedReader extends NullableColumnReader {
+class NullableFixedByteAlignedReader extends NullableColumnReader {
 
   private byte[] bytes;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
index c08dcf3..1aef7b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
@@ -31,7 +31,7 @@ import parquet.format.PageHeader;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 
 // class to keep track of the read position of variable length columns
-public final class PageReadStatus {
+final class PageReadStatus {
 
   private final ColumnReader parentColumnReader;
   private final ColumnDataReader dataReader;
@@ -90,9 +90,9 @@ public final class PageReadStatus {
         bytesIn,
         pageHeader.data_page_header.num_values,
         pageHeader.uncompressed_page_size,
-        ParquetStorageEngine.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
-        ParquetStorageEngine.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
-        ParquetStorageEngine.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
+        ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
+        ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
+        ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
     );
 
     byteLength = pageHeader.uncompressed_page_size;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8b90cae3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
new file mode 100644
index 0000000..33eb68a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("parquet")
+public class ParquetFormatConfig implements FormatPluginConfig{
+
+  @Override
+  public int hashCode() {
+    return 7;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj instanceof ParquetFormatConfig;
+  }
+  
+}
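
The new ParquetFormatConfig carries no fields, so its equality is purely by type: the constant hashCode and instanceof check make all instances interchangeable, which is what a parameterless format config needs when plans are serialized and plugins are looked up by config. A short check of that contract on an illustrative stand-in class (EmptyConfig is not the Drill class):

    public class EmptyConfigEqualitySketch {

      // Same shape as a field-less config: every instance is equal to every other.
      public static class EmptyConfig {
        @Override
        public int hashCode() { return 7; }

        @Override
        public boolean equals(Object obj) { return obj instanceof EmptyConfig; }
      }

      public static void main(String[] args) {
        EmptyConfig a = new EmptyConfig();
        EmptyConfig b = new EmptyConfig();
        // Distinct instances compare equal and hash alike, so either can key a map lookup.
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());  // true
      }
    }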

