drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sor...@apache.org
Subject [drill] 01/03: DRILL-6965: Implement schema table function parameter
Date Sat, 04 May 2019 02:20:55 GMT
This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 772f7918700b89a0db545ce610e23bc32a5f55b2
Author: Arina Ielchiieva <arina.yelchiyeva@gmail.com>
AuthorDate: Tue Mar 19 19:43:48 2019 +0200

    DRILL-6965: Implement schema table function parameter
    
    1. Added common schema table function parameter which can be used as a single unit or with format plugin table function parameters.
    2. Allowed creating schema without columns, in case the user needs only to indicate table properties.
    3. Added unit tests.
    closes #1777
---
 .../record/metadata/schema/parser/SchemaParser.g4  |   4 +-
 .../src/main/codegen/includes/parserImpls.ftl      |   4 +-
 .../base/FileSystemMetadataProviderManager.java    |  12 +-
 .../physical/base/MetadataProviderManager.java     |  10 +-
 .../base/SimpleFileTableMetadataProvider.java      |   5 +-
 .../drill/exec/planner/logical/DrillTable.java     |   8 +-
 .../drill/exec/planner/sql/SqlConverter.java       |   2 +-
 .../exec/planner/sql/handlers/SchemaHandler.java   |  49 ++--
 .../drill/exec/planner/sql/parser/SqlSchema.java   |  22 +-
 .../record/metadata/AbstractColumnMetadata.java    |   6 +-
 .../drill/exec/record/metadata/TupleSchema.java    |   4 +-
 .../metadata/schema/FsMetastoreSchemaProvider.java |   2 +-
 .../metadata/schema/InlineSchemaProvider.java      |   9 +-
 .../record/metadata/schema/PathSchemaProvider.java |   2 +-
 .../record/metadata/schema/SchemaContainer.java    |   8 +-
 .../metadata/schema/SchemaProviderFactory.java     |  87 +++++++
 .../metadata/schema/parser/SchemaExprParser.java   |  23 +-
 .../metadata/schema/parser/SchemaVisitor.java      |  45 ++--
 .../apache/drill/exec/store/AbstractSchema.java    |  98 +++++++-
 .../store/dfs/FormatPluginOptionExtractor.java     |  35 +--
 .../store/dfs/FormatPluginOptionsDescriptor.java   |  61 ++---
 .../exec/store/dfs/WorkspaceSchemaFactory.java     | 212 ++++--------------
 .../exec/store/table/function/TableParamDef.java   | 105 +++++++++
 .../exec/store/table/function/TableSignature.java  |  98 ++++++++
 .../table/function/WithOptionsTableMacro.java      |  91 ++++++++
 .../java/org/apache/drill/TestSchemaCommands.java  |  64 +++++-
 .../apache/drill/TestSchemaWithTableFunction.java  | 249 +++++++++++++++++++++
 .../org/apache/drill/TestSelectWithOption.java     |  54 ++---
 .../record/metadata/schema/TestSchemaProvider.java |  43 +++-
 .../schema/parser/TestParserErrorHandling.java     |  74 +++---
 .../metadata/schema/parser/TestSchemaParser.java   |  83 +++++--
 31 files changed, 1145 insertions(+), 424 deletions(-)

diff --git a/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaParser.g4 b/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaParser.g4
index a4854be..384daf8 100644
--- a/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaParser.g4
+++ b/exec/java-exec/src/main/antlr4/org/apache/drill/exec/record/metadata/schema/parser/SchemaParser.g4
@@ -25,9 +25,9 @@ options {
  */
 }
 
-schema: (columns | LEFT_PAREN columns RIGHT_PAREN) EOF;
+schema: (columns | LEFT_PAREN columns? RIGHT_PAREN) property_values? EOF;
 
-columns: column_def  (COMMA column_def)*;
+columns: column_def (COMMA column_def)*;
 
 column_def: column property_values?;
 
diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
index f90859a..be30318 100644
--- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
+++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
@@ -377,9 +377,9 @@ void addProperty(SqlNodeList properties) :
   | < NUM: <DIGIT> (" " | "\t" | "\n" | "\r")* >
     // once schema is found, switch back to initial lexical state
     // must be enclosed in the parentheses
-    // inside may have left parenthesis only if number precededs (covers cases with varchar(10)),
+    // inside may have left parenthesis only if number precedes (covers cases with varchar(10)),
     // if left parenthesis is present in column name, it must be escaped with backslash
-  | < PAREN_STRING: <LPAREN> ((~[")"]) | (<NUM> ")") | ("\\)"))+ <RPAREN> > { popState(); }
+  | < PAREN_STRING: <LPAREN> ((~[")"]) | (<NUM> ")") | ("\\)"))* <RPAREN> > { popState(); }
 }
 
 /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileSystemMetadataProviderManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileSystemMetadataProviderManager.java
index 11bfbfe..cfc6ef3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileSystemMetadataProviderManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FileSystemMetadataProviderManager.java
@@ -32,7 +32,7 @@ public class FileSystemMetadataProviderManager implements MetadataProviderManage
 
   private TableMetadataProvider tableMetadataProvider;
 
-  public static MetadataProviderManager getMetadataProviderManager() {
+  public static MetadataProviderManager init() {
     return new FileSystemMetadataProviderManager();
   }
 
@@ -42,6 +42,11 @@ public class FileSystemMetadataProviderManager implements MetadataProviderManage
   }
 
   @Override
+  public SchemaProvider getSchemaProvider() {
+    return schemaProvider;
+  }
+
+  @Override
   public void setStatsProvider(DrillStatsTable statsProvider) {
     this.statsProvider = statsProvider;
   }
@@ -52,11 +57,6 @@ public class FileSystemMetadataProviderManager implements MetadataProviderManage
   }
 
   @Override
-  public SchemaProvider getSchemaProvider() {
-    return schemaProvider;
-  }
-
-  @Override
   public void setTableMetadataProvider(TableMetadataProvider tableMetadataProvider) {
     this.tableMetadataProvider = tableMetadataProvider;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/MetadataProviderManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/MetadataProviderManager.java
index 65739d4..8faab9f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/MetadataProviderManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/MetadataProviderManager.java
@@ -28,18 +28,18 @@ import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
  */
 public interface MetadataProviderManager {
 
-  DrillStatsTable getStatsProvider();
-
-  void setStatsProvider(DrillStatsTable statsProvider);
+  void setSchemaProvider(SchemaProvider schemaProvider);
 
   SchemaProvider getSchemaProvider();
 
-  void setSchemaProvider(SchemaProvider schemaProvider);
+  void setStatsProvider(DrillStatsTable statsProvider);
 
-  TableMetadataProvider getTableMetadataProvider();
+  DrillStatsTable getStatsProvider();
 
   void setTableMetadataProvider(TableMetadataProvider tableMetadataProvider);
 
+  TableMetadataProvider getTableMetadataProvider();
+
   /**
    * Returns builder responsible for constructing required {@link TableMetadataProvider} instances
    * based on specified providers.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SimpleFileTableMetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SimpleFileTableMetadataProvider.java
index 1efad36..ecf45af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SimpleFileTableMetadataProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SimpleFileTableMetadataProvider.java
@@ -152,8 +152,9 @@ public class SimpleFileTableMetadataProvider implements TableMetadataProvider {
           } else {
             schema = schemaProvider != null ? schemaProvider.read().getSchema() : null;
           }
-        } catch (IOException e) {
-          logger.debug("Unable to deserialize schema from schema file for table: " + (tableName != null ? tableName : location), e);
+        } catch (IOException | IllegalArgumentException e) {
+          logger.debug("Unable to read schema from schema provider [{}]: {}", (tableName != null ? tableName : location), e.getMessage());
+          logger.trace("Error when reading the schema", e);
         }
         TableMetadata tableMetadata = new FileTableMetadata(tableName,
             location, schema,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index c522ad0..c53899e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -105,8 +105,8 @@ public abstract class DrillTable implements Table {
     this.scan = scan;
   }
 
-  public void setTableMetadataProviderBuilder(MetadataProviderManager metadataProviderBuilder) {
-    this.metadataProviderManager = metadataProviderBuilder;
+  public void setTableMetadataProviderManager(MetadataProviderManager metadataProviderManager) {
+    this.metadataProviderManager = metadataProviderManager;
   }
 
   public GroupScan getGroupScan() throws IOException {
@@ -121,7 +121,7 @@ public abstract class DrillTable implements Table {
   }
 
   /**
-   * Returns builder for {@link TableMetadataProvider} which may provide null for the case when scan wasn't created.
+   * Returns manager for {@link TableMetadataProvider} which may provide null for the case when scan wasn't created.
    * This method should be used only for the case when it is possible to obtain {@link TableMetadataProvider} when supplier returns null
    * or {@link TableMetadataProvider} usage may be omitted.
    *
@@ -130,7 +130,7 @@ public abstract class DrillTable implements Table {
   public MetadataProviderManager getMetadataProviderManager() {
     if (metadataProviderManager == null) {
       // for the case when scan wasn't initialized, return null to avoid reading data which may be pruned in future
-      metadataProviderManager = FileSystemMetadataProviderManager.getMetadataProviderManager();
+      metadataProviderManager = FileSystemMetadataProviderManager.init();
       if (scan != null) {
         metadataProviderManager.setTableMetadataProvider(scan.getMetadataProvider());
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
index 17b0490..59ac5ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
@@ -731,7 +731,7 @@ public class SqlConverter {
       if (table != null && (drillTable = table.unwrap(DrillTable.class)) != null) {
         drillTable.setOptions(session.getOptions());
 
-        drillTable.setTableMetadataProviderBuilder(tableCache.getUnchecked(
+        drillTable.setTableMetadataProviderManager(tableCache.getUnchecked(
             new DrillTableKey(SchemaPath.getCompoundPath(names.toArray(new String[0])), drillTable)));
       }
       return table;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SchemaHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SchemaHandler.java
index b8739e1..cbc311d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SchemaHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SchemaHandler.java
@@ -31,11 +31,10 @@ import org.apache.drill.exec.planner.sql.parser.SqlCreateType;
 import org.apache.drill.exec.planner.sql.parser.SqlSchema;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.record.metadata.schema.FsMetastoreSchemaProvider;
 import org.apache.drill.exec.record.metadata.schema.PathSchemaProvider;
 import org.apache.drill.exec.record.metadata.schema.SchemaContainer;
 import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
-import org.apache.drill.exec.record.metadata.schema.parser.SchemaParsingException;
+import org.apache.drill.exec.record.metadata.schema.SchemaProviderFactory;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
@@ -63,7 +62,7 @@ public abstract class SchemaHandler extends DefaultSqlHandler {
     super(config);
   }
 
-  WorkspaceSchemaFactory.WorkspaceSchema getWorkspaceSchema(List<String> tableSchema, String tableName) {
+  public WorkspaceSchemaFactory.WorkspaceSchema getWorkspaceSchema(List<String> tableSchema, String tableName) {
     SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
     AbstractSchema temporarySchema = SchemaUtilites.resolveToTemporarySchema(tableSchema, defaultSchema, context.getConfig());
 
@@ -112,15 +111,7 @@ public abstract class SchemaHandler extends DefaultSqlHandler {
       String schemaSource = sqlCall.hasTable() ? sqlCall.getTable().toString() : sqlCall.getPath();
       try {
 
-        SchemaProvider schemaProvider;
-        if (sqlCall.hasTable()) {
-          String tableName = sqlCall.getTableName();
-          WorkspaceSchemaFactory.WorkspaceSchema wsSchema = getWorkspaceSchema(sqlCall.getSchemaPath(), tableName);
-          schemaProvider = new FsMetastoreSchemaProvider(wsSchema, tableName);
-        } else {
-          schemaProvider = new PathSchemaProvider(new Path(sqlCall.getPath()));
-        }
-
+        SchemaProvider schemaProvider = SchemaProviderFactory.create(sqlCall, this);
         if (schemaProvider.exists()) {
           if (SqlCreateType.OR_REPLACE == sqlCall.getSqlCreateType()) {
             schemaProvider.delete();
@@ -134,11 +125,6 @@ public abstract class SchemaHandler extends DefaultSqlHandler {
           ExecConstants.PERSISTENT_TABLE_UMASK).string_val, false);
         schemaProvider.store(schemaString, sqlCall.getProperties(), storageStrategy);
         return DirectPlan.createDirectPlan(context, true, String.format("Created schema for [%s]", schemaSource));
-      } catch (SchemaParsingException e) {
-        throw UserException.parseError(e)
-          .message(e.getMessage())
-          .addContext("Schema: " + schemaString)
-          .build(logger);
       } catch (IOException e) {
         throw UserException.resourceError(e)
           .message(e.getMessage())
@@ -200,12 +186,8 @@ public abstract class SchemaHandler extends DefaultSqlHandler {
     public PhysicalPlan getPlan(SqlNode sqlNode) {
       SqlSchema.Drop sqlCall = ((SqlSchema.Drop) sqlNode);
 
-      String tableName = sqlCall.getTableName();
-      WorkspaceSchemaFactory.WorkspaceSchema wsSchema = getWorkspaceSchema(sqlCall.getSchemaPath(), tableName);
-
       try {
-
-        SchemaProvider schemaProvider = new FsMetastoreSchemaProvider(wsSchema, tableName);
+        SchemaProvider schemaProvider = SchemaProviderFactory.create(sqlCall, this);
 
         if (!schemaProvider.exists()) {
           return produceErrorResult(String.format("Schema [%s] does not exist in table [%s] root directory",
@@ -239,15 +221,10 @@ public abstract class SchemaHandler extends DefaultSqlHandler {
     public PhysicalPlan getPlan(SqlNode sqlNode) {
       SqlSchema.Describe sqlCall = ((SqlSchema.Describe) sqlNode);
 
-      String tableName = sqlCall.getTableName();
-      WorkspaceSchemaFactory.WorkspaceSchema wsSchema = getWorkspaceSchema(sqlCall.getSchemaPath(), tableName);
-
       try {
-
-        SchemaProvider schemaProvider = new FsMetastoreSchemaProvider(wsSchema, tableName);
+        SchemaProvider schemaProvider = SchemaProviderFactory.create(sqlCall, this);
 
         if (schemaProvider.exists()) {
-
           SchemaContainer schemaContainer = schemaProvider.read();
 
           String schema;
@@ -258,13 +235,19 @@ public abstract class SchemaHandler extends DefaultSqlHandler {
             case STATEMENT:
               TupleMetadata metadata = schemaContainer.getSchema();
               StringBuilder builder = new StringBuilder("CREATE OR REPLACE SCHEMA \n");
-              builder.append("(\n");
 
-              builder.append(metadata.toMetadataList().stream()
-              .map(ColumnMetadata::columnString)
-              .collect(Collectors.joining(", \n")));
+              List<ColumnMetadata> columnsMetadata = metadata.toMetadataList();
+              if (columnsMetadata.isEmpty()) {
+                builder.append("() \n");
+              } else {
+                builder.append("(\n");
 
-              builder.append("\n) \n");
+                builder.append(columnsMetadata.stream()
+                  .map(ColumnMetadata::columnString)
+                  .collect(Collectors.joining(", \n")));
+
+                builder.append("\n) \n");
+              }
 
               builder.append("FOR TABLE ").append(schemaContainer.getTable()).append(" \n");
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java
index bfbf06f..81e8910 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlSchema.java
@@ -42,15 +42,17 @@ import java.util.Map;
 
 /**
  * Parent class for CREATE, DROP, DESCRIBE SCHEMA commands.
- * Holds logic common command property: table.
+ * Holds logic common command property: table, path.
  */
 public abstract class SqlSchema extends DrillSqlCall {
 
   protected final SqlIdentifier table;
+  protected final SqlNode path;
 
-  protected SqlSchema(SqlParserPos pos, SqlIdentifier table) {
+  protected SqlSchema(SqlParserPos pos, SqlIdentifier table, SqlNode path) {
     super(pos);
     this.table = table;
+    this.path = path;
   }
 
   @Override
@@ -84,6 +86,10 @@ public abstract class SqlSchema extends DrillSqlCall {
     return null;
   }
 
+  public String getPath() {
+    return path == null ? null : path.accept(LiteralVisitor.INSTANCE);
+  }
+
   /**
    * Visits literal and returns bare value (i.e. single quotes).
    */
@@ -105,7 +111,6 @@ public abstract class SqlSchema extends DrillSqlCall {
 
     private final SqlCharStringLiteral schema;
     private final SqlNode load;
-    private final SqlNode path;
     private final SqlNodeList properties;
     private final SqlLiteral createType;
 
@@ -124,10 +129,9 @@ public abstract class SqlSchema extends DrillSqlCall {
                   SqlNode path,
                   SqlNodeList properties,
                   SqlLiteral createType) {
-      super(pos, table);
+      super(pos, table, path);
       this.schema = schema;
       this.load = load;
-      this.path = path;
       this.properties = properties;
       this.createType = createType;
     }
@@ -200,10 +204,6 @@ public abstract class SqlSchema extends DrillSqlCall {
       return load == null ? null : load.accept(LiteralVisitor.INSTANCE);
     }
 
-    public String getPath() {
-      return path == null ? null : path.accept(LiteralVisitor.INSTANCE);
-    }
-
     public Map<String, String> getProperties() {
       if (properties == null) {
         return null;
@@ -239,7 +239,7 @@ public abstract class SqlSchema extends DrillSqlCall {
     };
 
     public Drop(SqlParserPos pos, SqlIdentifier table, SqlLiteral existenceCheck) {
-      super(pos, table);
+      super(pos, table, null);
       this.existenceCheck = existenceCheck;
     }
 
@@ -292,7 +292,7 @@ public abstract class SqlSchema extends DrillSqlCall {
     };
 
     public Describe(SqlParserPos pos, SqlIdentifier table, SqlLiteral format) {
-      super(pos, table);
+      super(pos, table, null);
       this.format = format;
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
index afab274..b46f6b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/AbstractColumnMetadata.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.schema.parser.SchemaExprParser;
 import org.joda.time.format.DateTimeFormatter;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -69,7 +70,7 @@ public abstract class AbstractColumnMetadata extends AbstractPropertied implemen
   public static AbstractColumnMetadata createColumnMetadata(@JsonProperty("name") String name,
                                                             @JsonProperty("type") String type,
                                                             @JsonProperty("mode") DataMode mode,
-                                                            @JsonProperty("properties") Map<String, String> properties) {
+                                                            @JsonProperty("properties") Map<String, String> properties) throws IOException {
     ColumnMetadata columnMetadata = SchemaExprParser.parseColumn(name, type, mode);
     columnMetadata.setProperties(properties);
     return (AbstractColumnMetadata) columnMetadata;
@@ -314,8 +315,7 @@ public abstract class AbstractColumnMetadata extends AbstractPropertied implemen
         builder.append(" DEFAULT '").append(defaultValue()).append("'");
       }
 
-      Map<String,String> copy = new HashMap<>();
-      copy.putAll(properties());
+      Map<String, String> copy = new HashMap<>(properties());
       copy.remove(FORMAT_PROP);
       copy.remove(DEFAULT_VALUE_PROP);
       if (! copy.isEmpty()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
index 548aa30..38baaee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/TupleSchema.java
@@ -56,7 +56,9 @@ public class TupleSchema extends AbstractPropertied implements TupleMetadata {
   @JsonCreator
   public TupleSchema(@JsonProperty("columns") List<AbstractColumnMetadata> columns,
                      @JsonProperty("properties") Map<String, String> properties) {
-    columns.forEach(this::addColumn);
+    if (columns != null) {
+      columns.forEach(this::addColumn);
+    }
     setProperties(properties);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/FsMetastoreSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/FsMetastoreSchemaProvider.java
index ff3d1a7..e60a800 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/FsMetastoreSchemaProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/FsMetastoreSchemaProvider.java
@@ -45,7 +45,7 @@ public class FsMetastoreSchemaProvider extends PathSchemaProvider {
   }
 
   @Override
-  protected SchemaContainer createTableSchema(String schema, Map<String, String> properties) {
+  protected SchemaContainer createTableSchema(String schema, Map<String, String> properties) throws IOException {
     return new SchemaContainer(tableName, schema, properties);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/InlineSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/InlineSchemaProvider.java
index 8b76618..a24f5a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/InlineSchemaProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/InlineSchemaProvider.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.record.metadata.schema;
 
 import org.apache.drill.exec.store.StorageStrategy;
 
+import java.io.IOException;
 import java.util.Map;
 
 /**
@@ -27,11 +28,9 @@ import java.util.Map;
 public class InlineSchemaProvider implements SchemaProvider {
 
   private final String schema;
-  private final Map<String, String> properties;
 
-  public InlineSchemaProvider(String schema, Map<String, String> properties) {
+  public InlineSchemaProvider(String schema) {
     this.schema = schema;
-    this.properties = properties;
   }
 
   @Override
@@ -45,8 +44,8 @@ public class InlineSchemaProvider implements SchemaProvider {
   }
 
   @Override
-  public SchemaContainer read() {
-    return new SchemaContainer(null, schema, properties);
+  public SchemaContainer read() throws IOException {
+    return new SchemaContainer(null, schema, null);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/PathSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/PathSchemaProvider.java
index d73e247..8a8933a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/PathSchemaProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/PathSchemaProvider.java
@@ -124,7 +124,7 @@ public class PathSchemaProvider implements SchemaProvider {
     return fs.exists(path);
   }
 
-  protected SchemaContainer createTableSchema(String schema, Map<String, String> properties) {
+  protected SchemaContainer createTableSchema(String schema, Map<String, String> properties) throws IOException {
     return new SchemaContainer(null, schema, properties);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaContainer.java
index 8db8f8e..cdd56ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaContainer.java
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.metadata.TupleSchema;
 import org.apache.drill.exec.record.metadata.schema.parser.SchemaExprParser;
-
+import java.io.IOException;
 import java.util.Map;
 
 /**
@@ -46,11 +46,11 @@ public class SchemaContainer {
     this.version = new Version(version);
   }
 
-  public SchemaContainer(String table, String schema, Map<String, String> properties) {
+  public SchemaContainer(String table, String schema, Map<String, String> properties) throws IOException {
     this(table, schema, properties, Version.VERSION_1); //current default version
   }
 
-  public SchemaContainer(String table, String schema, Map<String, String> properties, Integer version) {
+  public SchemaContainer(String table, String schema, Map<String, String> properties, Integer version) throws IOException {
     this.table = table;
     this.schema = schema == null ? null : convert(schema, properties);
     this.version = new Version(version);
@@ -76,7 +76,7 @@ public class SchemaContainer {
     return version;
   }
 
-  private TupleMetadata convert(String schemaString, Map<String, String> properties) {
+  private TupleMetadata convert(String schemaString, Map<String, String> properties) throws IOException {
     TupleMetadata schema = SchemaExprParser.parseSchema(schemaString);
     if (properties != null) {
       schema.setProperties(properties);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaProviderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaProviderFactory.java
new file mode 100644
index 0000000..fa895a9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/SchemaProviderFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record.metadata.schema;
+
+import org.apache.drill.exec.planner.sql.handlers.SchemaHandler;
+import org.apache.drill.exec.planner.sql.parser.SqlSchema;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * Factory class responsible for creating different instances of schema provider based on given parameters.
+ */
+public class SchemaProviderFactory {
+
+  /**
+   * Creates schema provider for sql schema commands.
+   *
+   * @param sqlSchema sql schema call
+   * @param schemaHandler schema handler
+   * @return schema provider instance
+   * @throws IOException if unable to init schema provider
+   */
+  public static SchemaProvider create(SqlSchema sqlSchema, SchemaHandler schemaHandler) throws IOException {
+    if (sqlSchema.hasTable()) {
+      String tableName = sqlSchema.getTableName();
+      WorkspaceSchemaFactory.WorkspaceSchema wsSchema = schemaHandler.getWorkspaceSchema(sqlSchema.getSchemaPath(), tableName);
+      return new FsMetastoreSchemaProvider(wsSchema, tableName);
+    } else {
+      return new PathSchemaProvider(new Path(sqlSchema.getPath()));
+    }
+  }
+
+  /**
+   * Creates schema provider based on table function schema parameter.
+   *
+   * @param parameterValue schema parameter value
+   * @return schema provider instance
+   * @throws IOException if unable to init schema provider
+   */
+  public static SchemaProvider create(String parameterValue) throws IOException {
+    String[] split = parameterValue.split("=", 2);
+    if (split.length < 2) {
+      throw new IOException("Incorrect parameter value format: " + parameterValue);
+    }
+    ProviderType providerType = ProviderType.valueOf(split[0].trim().toUpperCase());
+    String value = split[1].trim();
+    switch (providerType) {
+      case INLINE:
+        return new InlineSchemaProvider(value);
+      case PATH:
+        char c = value.charAt(0);
+        // if path starts with any type of quotes, strip them
+        if (c == '\'' || c == '"' || c == '`') {
+          value = value.substring(1, value.length() - 1);
+        }
+        return new PathSchemaProvider(new Path(value));
+      default:
+        throw new IOException("Unexpected provider type: " + providerType);
+    }
+  }
+
+  /**
+   * Indicates provider type will be used to provide schema.
+   */
+  private enum ProviderType {
+    INLINE,
+    PATH
+  }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaExprParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaExprParser.java
index ea5071e..c0636af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaExprParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaExprParser.java
@@ -27,6 +27,8 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 
+import java.io.IOException;
+
 public class SchemaExprParser {
 
   /**
@@ -35,10 +37,15 @@ public class SchemaExprParser {
    *
    * @param schema schema definition
    * @return metadata description of the schema
+   * @throws IOException when unable to parse the schema
    */
-  public static TupleMetadata parseSchema(String schema) {
+  public static TupleMetadata parseSchema(String schema) throws IOException {
     SchemaVisitor visitor = new SchemaVisitor();
-    return visitor.visit(initParser(schema).schema());
+    try {
+      return visitor.visit(initParser(schema).schema());
+    } catch (SchemaParsingException e) {
+      throw new IOException(String.format("Unable to parse schema [%s]: %s", schema, e.getMessage()), e);
+    }
   }
 
   /**
@@ -48,8 +55,9 @@ public class SchemaExprParser {
    * @param type column type
    * @param mode column mode
    * @return column metadata
+   * @throws IOException when unable to parse the column
    */
-  public static ColumnMetadata parseColumn(String name, String type, TypeProtos.DataMode mode) {
+  public static ColumnMetadata parseColumn(String name, String type, TypeProtos.DataMode mode) throws IOException {
     return parseColumn(String.format("`%s` %s %s",
       name.replaceAll("(\\\\)|(`)", "\\\\$0"),
       type,
@@ -62,10 +70,15 @@ public class SchemaExprParser {
    *
    * @param column column definition
    * @return metadata description of the column
+   * @throws IOException when unable to parse the column
    */
-  public static ColumnMetadata parseColumn(String column) {
+  public static ColumnMetadata parseColumn(String column) throws IOException {
     SchemaVisitor.ColumnVisitor visitor = new SchemaVisitor.ColumnVisitor();
-    return visitor.visit(initParser(column).column());
+    try {
+      return visitor.visit(initParser(column).column());
+    } catch (SchemaParsingException e) {
+      throw new IOException(String.format("Unable to parse column [%s]: %s", column, e.getMessage()), e);
+    }
   }
 
   private static SchemaParser initParser(String value) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaVisitor.java
index 62bcbf5..274b6ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/metadata/schema/parser/SchemaVisitor.java
@@ -41,7 +41,12 @@ public class SchemaVisitor extends SchemaParserBaseVisitor<TupleMetadata> {
 
   @Override
   public TupleMetadata visitSchema(SchemaParser.SchemaContext ctx) {
-    return visitColumns(ctx.columns());
+    TupleMetadata schema = ctx.columns() == null ? new TupleSchema() : visitColumns(ctx.columns());
+    if (ctx.property_values() != null) {
+      PropertiesVisitor propertiesVisitor = new PropertiesVisitor();
+      schema.setProperties(ctx.property_values().accept(propertiesVisitor));
+    }
+    return schema;
   }
 
   @Override
@@ -64,18 +69,8 @@ public class SchemaVisitor extends SchemaParserBaseVisitor<TupleMetadata> {
       ColumnVisitor columnVisitor = new ColumnVisitor();
       ColumnMetadata columnMetadata = ctx.column().accept(columnVisitor);
       if (ctx.property_values() != null) {
-        StringValueVisitor stringValueVisitor = new StringValueVisitor();
-        Map<String, String> columnProperties = new LinkedHashMap<>();
-        ctx.property_values().property_pair().forEach(
-          pair -> {
-            List<String> pairValues = pair.string_value().stream()
-              .map(stringValueVisitor::visit)
-              .collect(Collectors.toList());
-            Preconditions.checkState(pairValues.size() == 2);
-            columnProperties.put(pairValues.get(0), pairValues.get(1));
-          }
-        );
-        columnMetadata.setProperties(columnProperties);
+        PropertiesVisitor propertiesVisitor = new PropertiesVisitor();
+        columnMetadata.setProperties(ctx.property_values().accept(propertiesVisitor));
       }
       return columnMetadata;
     }
@@ -322,4 +317,28 @@ public class SchemaVisitor extends SchemaParserBaseVisitor<TupleMetadata> {
     }
   }
 
+  /**
+   * Visits schema or column properties.
+   * Properties must be identified as key value pairs separated by equals sign.
+   * Properties pairs must be separated by comma.
+   * Property name and value must be enclosed into backticks, single quotes or double quotes.
+   */
+  public static class PropertiesVisitor extends SchemaParserBaseVisitor<Map<String, String>> {
+
+    @Override
+    public Map<String, String> visitProperty_values(SchemaParser.Property_valuesContext ctx) {
+      StringValueVisitor stringValueVisitor = new StringValueVisitor();
+      Map<String, String> properties = new LinkedHashMap<>();
+      ctx.property_pair().forEach(
+        pair -> {
+          List<String> pairValues = pair.string_value().stream()
+            .map(stringValueVisitor::visit)
+            .collect(Collectors.toList());
+          Preconditions.checkState(pairValues.size() == 2);
+          properties.put(pairValues.get(0), pairValues.get(1));
+        }
+      );
+      return properties;
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
index aabcbb6..5b303f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -26,7 +26,16 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
-
+import java.util.stream.IntStream;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.record.metadata.schema.SchemaProviderFactory;
+import org.apache.drill.exec.store.table.function.TableParamDef;
+import org.apache.drill.exec.store.table.function.TableSignature;
+import org.apache.drill.exec.store.table.function.WithOptionsTableMacro;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 import org.apache.calcite.linq4j.tree.DefaultExpression;
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.rel.type.RelProtoDataType;
@@ -40,14 +49,41 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.dotdrill.View;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 
 public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer, AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSchema.class);
 
+  private static final Expression EXPRESSION = new DefaultExpression(Object.class);
+
+  private static final String SCHEMA_PARAMETER_NAME = "schema";
+
+  /**
+   * Schema parameter for table function which creates schema provider based on given parameter value.
+   */
+  private static final TableParamDef SCHEMA_PARAMETER = TableParamDef.optional(
+    SCHEMA_PARAMETER_NAME, String.class, (drillTable, value) -> {
+      if (value == null) {
+        return;
+      }
+
+      SchemaProvider schemaProvider;
+      try {
+        schemaProvider = SchemaProviderFactory.create(String.valueOf(value));
+        // since we pass schema here as table parameter
+        // read schema to ensure that schema is valid before query execution
+        schemaProvider.read();
+      } catch (IOException | IllegalArgumentException e) {
+        throw UserException.validationError(e)
+          .message(e.getMessage())
+          .addContext("Schema parameter value [%s]", value)
+          .build(logger);
+      }
+
+      drillTable.getMetadataProviderManager().setSchemaProvider(schemaProvider);
+    });
+
   protected final List<String> schemaPath;
   protected final String name;
-  private static final Expression EXPRESSION = new DefaultExpression(Object.class);
 
   public AbstractSchema(List<String> parentSchemaPath, String name) {
     name = name == null ? null : name.toLowerCase();
@@ -150,8 +186,8 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
 
   /**
    * Create stats table entry for given <i>tableName</i>.
-   * @param tableName
-   * @return
+   * @param tableName table name
+   * @return instance of create table entry
    */
   public CreateTableEntry createStatsTable(String tableName) {
     throw UserException.unsupportedError()
@@ -162,8 +198,8 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
   /**
    * Create an append statistics table entry for given <i>tableName</i>. If there is not existing
    * statistics table, a new one is created.
-   * @param tableName
-   * @return
+   * @param tableName table name
+   * @return instance of create table entry
    */
   public CreateTableEntry appendToStatsTable(String tableName) {
     throw UserException.unsupportedError()
@@ -173,8 +209,8 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
 
   /**
    * Get the statistics table for given <i>tableName</i>
-   * @param tableName
-   * @return
+   * @param tableName table name
+   * @return instance of statistics table
    */
   public Table getStatsTable(String tableName) {
     throw UserException.unsupportedError()
@@ -195,9 +231,51 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
     return true;
   }
 
+  /**
+   * For the given table name returns a list of acceptable table functions
+   * which are common for all Drill schemas. When overriding this method,
+   * parent functions must be included first to be evaluated first.
+   * If not included, parent functions won't be taken into account when creating table instance.
+   *
+   * @param name table name
+   * @return list of table functions
+   */
   @Override
   public Collection<Function> getFunctions(String name) {
-    return Collections.emptyList();
+    List<TableParamDef> parameters = getFunctionParameters();
+    TableSignature signature = TableSignature.of(name, parameters);
+    WithOptionsTableMacro function = new WithOptionsTableMacro(signature, arguments -> {
+      Table table = getTable(name);
+      if (table instanceof DrillTable) {
+        return applyFunctionParameters((DrillTable) table, parameters, arguments);
+      }
+      throw new DrillRuntimeException(String.format("Table [%s] is not of Drill table instance. " +
+        "Given instance is of [%s].", name, table.getClass().getName()));
+    });
+    return Collections.singletonList(function);
+  }
+
+  /**
+   * Returns a list of common table function parameters that can be used by all Drill schema implementations.
+   *
+   * @return list of table function parameters
+   */
+  public List<TableParamDef> getFunctionParameters() {
+    return Collections.singletonList(SCHEMA_PARAMETER);
+  }
+
+  /**
+   * For the given list of parameters definitions executes action for the corresponding value.
+   *
+   * @param drillTable Drill table instance
+   * @param paramDefs parameter definitions
+   * @param values parameter values
+   * @return updated Drill table instance
+   */
+  public DrillTable applyFunctionParameters(DrillTable drillTable, List<TableParamDef> paramDefs, List<Object> values) {
+    IntStream.range(0, paramDefs.size())
+      .forEach(i -> paramDefs.get(i).apply(drillTable, values.get(i)));
+    return drillTable;
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionExtractor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionExtractor.java
index 22aad73..447403f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionExtractor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionExtractor.java
@@ -32,13 +32,14 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.FormatPluginConfigBase;
 import org.apache.drill.common.scanner.persistence.ScanResult;
 import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.TableInstance;
-import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.TableSignature;
+import org.apache.drill.exec.store.table.function.TableParamDef;
+import org.apache.drill.exec.store.table.function.TableSignature;
 import org.slf4j.Logger;
 
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 
 /**
- * manages format plugins options to define table macros
+ * Manages format plugins options to define table macros.
  */
 final class FormatPluginOptionExtractor {
   private static final Logger logger = org.slf4j.LoggerFactory.getLogger(FormatPluginOptionExtractor.class);
@@ -46,8 +47,9 @@ final class FormatPluginOptionExtractor {
   private final Map<String, FormatPluginOptionsDescriptor> optionsByTypeName;
 
   /**
-   * extracts the format plugin options based on the scanned implementations of {@link FormatPluginConfig}
-   * @param scanResult
+   * Extracts the format plugin options based on the scanned implementations of {@link FormatPluginConfig}.
+   *
+   * @param scanResult scan result of the classpath
    */
   FormatPluginOptionExtractor(ScanResult scanResult) {
     Map<String, FormatPluginOptionsDescriptor> result = new HashMap<>();
@@ -68,37 +70,40 @@ final class FormatPluginOptionExtractor {
   }
 
   /**
-   * give a table name, returns function signatures to configure the FormatPlugin
+   * Given a table name, returns function signatures to configure the FormatPlugin.
+   *
    * @param tableName the name of the table (or table function in this context)
+   * @param tableParameters common table parameters to be included
    * @return the available signatures
    */
-  List<TableSignature> getTableSignatures(String tableName) {
+  List<TableSignature> getTableSignatures(String tableName, List<TableParamDef> tableParameters) {
     List<TableSignature> result = new ArrayList<>();
     for (FormatPluginOptionsDescriptor optionsDescriptor : optionsByTypeName.values()) {
-      TableSignature sig = optionsDescriptor.getTableSignature(tableName);
+      TableSignature sig = optionsDescriptor.getTableSignature(tableName, tableParameters);
       result.add(sig);
     }
     return unmodifiableList(result);
   }
 
   /**
-   * given a table function signature and the corresponding parameters
-   * return the corresponding formatPlugin configuration
-   * @param t the signature and parameters (it should be one of the signatures returned by {@link FormatPluginOptionExtractor#getTableSignatures(String)})
+   * Given a table function signature and the corresponding parameters
+   * return the corresponding formatPlugin configuration.
+   *
+   * @param t the signature and parameters (it should be one of the signatures returned by {@link FormatPluginOptionExtractor#getTableSignatures(String, List)})
    * @return the config
    */
   FormatPluginConfig createConfigForTable(TableInstance t) {
-    if (!t.sig.params.get(0).name.equals("type")) {
+    if (!t.sig.getSpecificParams().get(0).getName().equals("type")) {
       throw UserException.parseError()
         .message("unknown first param for %s", t.sig)
-        .addContext("table", t.sig.name)
+        .addContext("table", t.sig.getName())
         .build(logger);
     }
-    String type = (String)t.params.get(0);
+    String type = (String) t.params.get(0);
     if (type == null) {
       throw UserException.parseError()
           .message("type param must be present but was missing")
-          .addContext("table", t.sig.name)
+          .addContext("table", t.sig.getName())
           .build(logger);
     }
     FormatPluginOptionsDescriptor optionsDescriptor = optionsByTypeName.get(type.toLowerCase());
@@ -107,7 +112,7 @@ final class FormatPluginOptionExtractor {
           .message(
               "unknown type %s, expected one of %s",
               type, optionsByTypeName.keySet())
-          .addContext("table", t.sig.name)
+          .addContext("table", t.sig.getName())
           .build(logger);
     }
     return optionsDescriptor.createConfigForTable(t);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java
index b140fce..dbfcca4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPluginOptionsDescriptor.java
@@ -30,8 +30,8 @@ import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.TableInstance;
-import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.TableParamDef;
-import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.TableSignature;
+import org.apache.drill.exec.store.table.function.TableParamDef;
+import org.apache.drill.exec.store.table.function.TableSignature;
 import org.slf4j.Logger;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -61,7 +61,7 @@ final class FormatPluginOptionsDescriptor {
     JsonTypeName annotation = pluginConfigClass.getAnnotation(JsonTypeName.class);
     this.typeName = annotation != null ? annotation.value() : null;
     if (this.typeName != null) {
-      paramsByName.put("type", new TableParamDef("type", String.class));
+      paramsByName.put("type", TableParamDef.required("type", String.class, null));
     }
     for (Field field : fields) {
       if (Modifier.isStatic(field.getModifiers())
@@ -74,18 +74,20 @@ final class FormatPluginOptionsDescriptor {
         // calcite does not like char type. Just use String and enforce later that length == 1
         fieldType = String.class;
       }
-      paramsByName.put(field.getName(), new TableParamDef(field.getName(), fieldType).optional());
+      paramsByName.put(field.getName(), TableParamDef.optional(field.getName(), fieldType, null));
     }
     this.functionParamsByName = unmodifiableMap(paramsByName);
   }
 
   /**
-   * returns the table function signature for this format plugin config class
+   * Returns the table function signature for this format plugin config class.
+   *
    * @param tableName the table for which we want a table function signature
+   * @param tableParameters common table parameters to be included
    * @return the signature
    */
-  TableSignature getTableSignature(String tableName) {
-    return new TableSignature(tableName, params());
+  TableSignature getTableSignature(String tableName, List<TableParamDef> tableParameters) {
+    return TableSignature.of(tableName, tableParameters, params());
   }
 
   /**
@@ -106,23 +108,28 @@ final class FormatPluginOptionsDescriptor {
       if (i != 0) {
         sb.append(", ");
       }
-      sb.append(paramDef.name).append(": ").append(paramDef.type.getSimpleName());
+      sb.append(paramDef.getName()).append(": ").append(paramDef.getType().getSimpleName());
     }
     sb.append(")");
     return sb.toString();
   }
 
   /**
-   * creates an instance of the FormatPluginConfig based on the passed parameters
+   * Creates an instance of the FormatPluginConfig based on the passed parameters.
+   *
    * @param t the signature and the parameters passed to the table function
    * @return the corresponding config
    */
   FormatPluginConfig createConfigForTable(TableInstance t) {
+    List<TableParamDef> formatParams = t.sig.getSpecificParams();
+    // Exclude common params values, leave only format related params
+    List<Object> formatParamsValues = t.params.subList(0, t.params.size() - t.sig.getCommonParams().size());
+
     // Per the constructor, the first param is always "type"
-    TableParamDef typeParamDef = t.sig.params.get(0);
-    Object typeParam = t.params.get(0);
-    if (!typeParamDef.name.equals("type")
-        || typeParamDef.type != String.class
+    TableParamDef typeParamDef = formatParams.get(0);
+    Object typeParam = formatParamsValues.get(0);
+    if (!typeParamDef.getName().equals("type")
+        || typeParamDef.getType() != String.class
         || !(typeParam instanceof String)
         || !typeName.equalsIgnoreCase((String)typeParam)) {
       // if we reach here, there's a bug as all signatures generated start with a type parameter
@@ -131,7 +138,7 @@ final class FormatPluginOptionsDescriptor {
               "This function signature is not supported: %s\n"
               + "expecting %s",
               t.presentParams(), this.presentParams())
-          .addContext("table", t.sig.name)
+          .addContext("table", t.sig.getName())
           .build(logger);
     }
     FormatPluginConfig config;
@@ -142,11 +149,11 @@ final class FormatPluginOptionsDescriptor {
           .message(
               "configuration for format of type %s can not be created (class: %s)",
               this.typeName, pluginConfigClass.getName())
-          .addContext("table", t.sig.name)
+          .addContext("table", t.sig.getName())
           .build(logger);
     }
-    for (int i = 1; i < t.params.size(); i++) {
-      Object param = t.params.get(i);
+    for (int i = 1; i < formatParamsValues.size(); i++) {
+      Object param = formatParamsValues.get(i);
       if (param == null) {
         // when null is passed, we leave the default defined in the config class
         continue;
@@ -155,27 +162,27 @@ final class FormatPluginOptionsDescriptor {
         // normalize Java literals, ex: \t, \n, \r
         param = StringEscapeUtils.unescapeJava((String) param);
       }
-      TableParamDef paramDef = t.sig.params.get(i);
-      TableParamDef expectedParamDef = this.functionParamsByName.get(paramDef.name);
-      if (expectedParamDef == null || expectedParamDef.type != paramDef.type) {
+      TableParamDef paramDef = formatParams.get(i);
+      TableParamDef expectedParamDef = this.functionParamsByName.get(paramDef.getName());
+      if (expectedParamDef == null || expectedParamDef.getType() != paramDef.getType()) {
         throw UserException.parseError()
         .message(
             "The parameters provided are not applicable to the type specified:\n"
                 + "provided: %s\nexpected: %s",
             t.presentParams(), this.presentParams())
-        .addContext("table", t.sig.name)
+        .addContext("table", t.sig.getName())
         .build(logger);
       }
       try {
-        Field field = pluginConfigClass.getField(paramDef.name);
+        Field field = pluginConfigClass.getField(paramDef.getName());
         field.setAccessible(true);
         if (field.getType() == char.class && param instanceof String) {
           String stringParam = (String) param;
           if (stringParam.length() != 1) {
             throw UserException.parseError()
               .message("Expected single character but was String: %s", stringParam)
-              .addContext("table", t.sig.name)
-              .addContext("parameter", paramDef.name)
+              .addContext("table", t.sig.getName())
+              .addContext("parameter", paramDef.getName())
               .build(logger);
           }
           param = stringParam.charAt(0);
@@ -183,9 +190,9 @@ final class FormatPluginOptionsDescriptor {
         field.set(config, param);
       } catch (IllegalAccessException | NoSuchFieldException | SecurityException e) {
         throw UserException.parseError(e)
-            .message("can not set value %s to parameter %s: %s", param, paramDef.name, paramDef.type)
-            .addContext("table", t.sig.name)
-            .addContext("parameter", paramDef.name)
+            .message("Can not set value %s to parameter %s: %s", param, paramDef.getName(), paramDef.getType())
+            .addContext("table", t.sig.getName())
+            .addContext("parameter", paramDef.getName())
             .build(logger);
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index a966e24..d11fb70 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -36,13 +36,8 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.Function;
-import org.apache.calcite.schema.FunctionParameter;
 import org.apache.calcite.schema.Table;
-import org.apache.calcite.schema.TableMacro;
-import org.apache.calcite.schema.TranslatableTable;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.config.LogicalPlanPersistence;
@@ -61,7 +56,6 @@ import org.apache.drill.exec.physical.base.MetadataProviderManager;
 import org.apache.drill.exec.planner.common.DrillStatsTable;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.planner.logical.DrillTable;
-import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
 import org.apache.drill.exec.planner.logical.DrillViewTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.planner.logical.FileSystemCreateTableEntry;
@@ -70,6 +64,9 @@ import org.apache.drill.exec.record.metadata.schema.FsMetastoreSchemaProvider;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
 import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.table.function.TableParamDef;
+import org.apache.drill.exec.store.table.function.TableSignature;
+import org.apache.drill.exec.store.table.function.WithOptionsTableMacro;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.store.StorageStrategy;
 import org.apache.drill.exec.store.easy.json.JSONFormatPlugin;
@@ -220,81 +217,21 @@ public class WorkspaceSchemaFactory {
     }
   }
 
-  /**
-   * Implementation of a table macro that generates a table based on parameters
-   */
-  static final class WithOptionsTableMacro implements TableMacro {
-
-    private final TableSignature sig;
-    private final WorkspaceSchema schema;
-
-    WithOptionsTableMacro(TableSignature sig, WorkspaceSchema schema) {
-      super();
-      this.sig = sig;
-      this.schema = schema;
-    }
-
-    @Override
-    public List<FunctionParameter> getParameters() {
-      List<FunctionParameter> result = new ArrayList<>();
-      for (int i = 0; i < sig.params.size(); i++) {
-        final TableParamDef p = sig.params.get(i);
-        final int ordinal = i;
-        result.add(new FunctionParameter() {
-          @Override
-          public int getOrdinal() {
-            return ordinal;
-          }
-
-          @Override
-          public String getName() {
-            return p.name;
-          }
-
-          @Override
-          public RelDataType getType(RelDataTypeFactory typeFactory) {
-            return typeFactory.createJavaType(p.type);
-          }
-
-          @Override
-          public boolean isOptional() {
-            return p.optional;
-          }
-        });
-      }
-      return result;
-    }
-
-    @Override
-    public TranslatableTable apply(List<Object> arguments) {
-      DrillTable drillTable = schema.getDrillTable(new TableInstance(sig, arguments));
-      if (drillTable == null) {
-        throw UserException
-            .validationError()
-            .message("Unable to find table [%s] in schema [%s]", sig.name, schema.getFullSchemaName())
-            .build(logger);
-    }
-      return new DrillTranslatableTable(drillTable);
-    }
-
-  }
-
   private static Object[] array(Object... objects) {
     return objects;
   }
 
-  static final class TableInstance {
+  public static final class TableInstance {
     final TableSignature sig;
     final List<Object> params;
 
-    TableInstance(TableSignature sig, List<Object> params) {
-      super();
-      if (params.size() != sig.params.size()) {
+    public TableInstance(TableSignature sig, List<Object> params) {
+      if (params.size() != sig.getParams().size()) {
         throw UserException.parseError()
             .message(
                 "should have as many params (%d) as signature (%d)",
-                params.size(), sig.params.size())
-            .addContext("table", sig.name)
+                params.size(), sig.getParams().size())
+            .addContext("table", sig.getName())
             .build(logger);
       }
       this.sig = sig;
@@ -312,8 +249,8 @@ public class WorkspaceSchemaFactory {
           } else {
             sb.append(", ");
           }
-          TableParamDef paramDef = sig.params.get(i);
-          sb.append(paramDef.name).append(": ").append(paramDef.type.getSimpleName()).append(" => ").append(param);
+          TableParamDef paramDef = sig.getParams().get(i);
+          sb.append(paramDef.getName()).append(": ").append(paramDef.getType().getSimpleName()).append(" => ").append(param);
         }
       }
       sb.append(")");
@@ -339,86 +276,7 @@ public class WorkspaceSchemaFactory {
 
     @Override
     public String toString() {
-      return sig.name + (params.size() == 0 ? "" : presentParams());
-    }
-  }
-
-  static final class TableParamDef {
-    final String name;
-    final Class<?> type;
-    final boolean optional;
-
-    TableParamDef(String name, Class<?> type) {
-      this(name, type, false);
-    }
-
-    TableParamDef(String name, Class<?> type, boolean optional) {
-      this.name = name;
-      this.type = type;
-      this.optional = optional;
-    }
-
-    TableParamDef optional() {
-      return new TableParamDef(name, type, true);
-    }
-
-    private Object[] toArray() {
-      return array(name, type, optional);
-    }
-
-    @Override
-    public int hashCode() {
-      return Arrays.hashCode(toArray());
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj instanceof TableParamDef) {
-        return Arrays.equals(this.toArray(), ((TableParamDef)obj).toArray());
-      }
-      return false;
-    }
-
-    @Override
-    public String toString() {
-      String p = name + ": " + type;
-      return optional ? "[" + p + "]" : p;
-    }
-  }
-
-  static final class TableSignature {
-    final String name;
-    final List<TableParamDef> params;
-
-    TableSignature(String name, TableParamDef... params) {
-      this(name, Arrays.asList(params));
-    }
-
-    TableSignature(String name, List<TableParamDef> params) {
-      this.name = name;
-      this.params = unmodifiableList(params);
-    }
-
-    private Object[] toArray() {
-      return array(name, params);
-    }
-
-    @Override
-    public int hashCode() {
-      return Arrays.hashCode(toArray());
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj instanceof TableSignature) {
-        return Arrays.equals(this.toArray(), ((TableSignature)obj).toArray());
-      }
-      return false;
-    }
-
-    @Override
-    public String toString() {
-      return name + params;
+      return sig.getName() + (params.size() == 0 ? "" : presentParams());
     }
   }
 
@@ -500,7 +358,7 @@ public class WorkspaceSchemaFactory {
 
     private Set<String> rawTableNames() {
       return tables.keySet().stream()
-          .map(input -> input.sig.name)
+          .map(input -> input.sig.getName())
           .collect(Collectors.toSet());
     }
 
@@ -516,10 +374,16 @@ public class WorkspaceSchemaFactory {
 
     @Override
     public List<Function> getFunctions(String name) {
-      List<TableSignature> sigs = optionExtractor.getTableSignatures(name);
-      return sigs.stream()
-          .map(input -> new WithOptionsTableMacro(input, WorkspaceSchema.this))
-          .collect(Collectors.toList());
+      // add parent functions first
+      List<Function> functions = new ArrayList<>(super.getFunctions(name));
+
+      List<TableParamDef> tableParameters = getFunctionParameters();
+      List<TableSignature> signatures = optionExtractor.getTableSignatures(name, tableParameters);
+      signatures.stream()
+        .map(signature -> new WithOptionsTableMacro(signature, params -> getDrillTable(new TableInstance(signature, params))))
+        .forEach(functions::add);
+
+      return functions;
     }
 
     private View getView(DotDrillFile f) throws IOException {
@@ -529,7 +393,7 @@ public class WorkspaceSchemaFactory {
 
     @Override
     public Table getTable(String tableName) {
-      TableInstance tableKey = new TableInstance(new TableSignature(tableName), ImmutableList.of());
+      TableInstance tableKey = new TableInstance(TableSignature.of(tableName), ImmutableList.of());
       // first check existing tables.
       if (tables.alreadyContainsKey(tableKey)) {
         return tables.get(tableKey);
@@ -549,7 +413,7 @@ public class WorkspaceSchemaFactory {
               .build(logger);
           }
         } catch (IOException e) {
-          logger.warn("Failure while trying to list view tables in workspace [{}]", tableName, getFullSchemaName(), e);
+          logger.warn("Failure while trying to list view tables in workspace [{}]", getFullSchemaName(), e);
         }
 
         for (DotDrillFile f : files) {
@@ -574,11 +438,11 @@ public class WorkspaceSchemaFactory {
       }
       final DrillTable table = tables.get(tableKey);
       if (table != null) {
-        MetadataProviderManager providerManager = FileSystemMetadataProviderManager.getMetadataProviderManager();
+        MetadataProviderManager providerManager = FileSystemMetadataProviderManager.init();
 
         setMetadataTable(providerManager, table, tableName);
         setSchema(providerManager, tableName);
-        table.setTableMetadataProviderBuilder(providerManager);
+        table.setTableMetadataProviderManager(providerManager);
       }
       return table;
     }
@@ -589,7 +453,7 @@ public class WorkspaceSchemaFactory {
           FsMetastoreSchemaProvider schemaProvider = new FsMetastoreSchemaProvider(this, tableName);
           providerManager.setSchemaProvider(schemaProvider);
         } catch (IOException e) {
-          logger.debug("Unable to deserialize schema from schema file for table: " + tableName, e);
+          logger.debug("Unable to init schema provider for table [{}]", tableName, e);
         }
       }
     }
@@ -684,7 +548,7 @@ public class WorkspaceSchemaFactory {
       ensureNotStatsTable(tableName);
       final String statsTableName = getStatsTableName(tableName);
       FormatPlugin formatPlugin = plugin.getFormatPlugin(JSONFormatPlugin.DEFAULT_NAME);
-      return createOrAppendToTable(statsTableName, formatPlugin, ImmutableList.<String>of(),
+      return createOrAppendToTable(statsTableName, formatPlugin, Collections.emptyList(),
           StorageStrategy.DEFAULT);
     }
 
@@ -693,7 +557,7 @@ public class WorkspaceSchemaFactory {
       ensureNotStatsTable(tableName);
       final String statsTableName = getStatsTableName(tableName);
       FormatPlugin formatPlugin = plugin.getFormatPlugin(JSONFormatPlugin.DEFAULT_NAME);
-      return createOrAppendToTable(statsTableName, formatPlugin, ImmutableList.<String>of(),
+      return createOrAppendToTable(statsTableName, formatPlugin, Collections.emptyList(),
           StorageStrategy.DEFAULT);
     }
 
@@ -726,14 +590,14 @@ public class WorkspaceSchemaFactory {
     @Override
     public DrillTable create(TableInstance key) {
       try {
-        final FileSelection fileSelection = FileSelection.create(getFS(), config.getLocation(), key.sig.name, config.allowAccessOutsideWorkspace());
+        final FileSelection fileSelection = FileSelection.create(getFS(), config.getLocation(), key.sig.getName(), config.allowAccessOutsideWorkspace());
         if (fileSelection == null) {
           return null;
         }
 
         boolean hasDirectories = fileSelection.containsDirectories(getFS());
 
-        if (key.sig.params.size() > 0) {
+        if (key.sig.getParams().size() > 0) {
           FileSelection newSelection = detectEmptySelection(fileSelection, hasDirectories);
 
           if (newSelection.isEmptyDirectory()) {
@@ -742,7 +606,15 @@ public class WorkspaceSchemaFactory {
 
           FormatPluginConfig formatConfig = optionExtractor.createConfigForTable(key);
           FormatSelection selection = new FormatSelection(formatConfig, newSelection);
-          return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), selection);
+          DrillTable drillTable = new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), selection);
+
+          List<TableParamDef> commonParams = key.sig.getCommonParams();
+          if (commonParams.isEmpty()) {
+            return drillTable;
+          }
+          // extract only the values that correspond to the common parameters
+          List<Object> paramValues = key.params.subList(key.params.size() - commonParams.size(), key.params.size());
+          return applyFunctionParameters(drillTable, commonParams, paramValues);
         }
 
         if (hasDirectories) {
@@ -811,7 +683,7 @@ public class WorkspaceSchemaFactory {
           }
         }
       } catch (IOException e) {
-        logger.debug("Failed to find format matcher for file: %s", file, e);
+        logger.debug("Failed to find format matcher for file: {}", file, e);
       }
       return null;
     }
@@ -925,7 +797,7 @@ public class WorkspaceSchemaFactory {
     @Override
     public List<Map.Entry<String, TableType>> getTableNamesAndTypes() {
       return Stream.concat(
-          tables.entrySet().stream().map(kv -> Pair.of(kv.getKey().sig.name, kv.getValue().getJdbcTableType())),
+          tables.entrySet().stream().map(kv -> Pair.of(kv.getKey().sig.getName(), kv.getValue().getJdbcTableType())),
           getViews().stream().map(viewName -> Pair.of(viewName, TableType.VIEW))
       ).collect(Collectors.toList());
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/table/function/TableParamDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/table/function/TableParamDef.java
new file mode 100644
index 0000000..b0167cc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/table/function/TableParamDef.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.table.function;
+
+import org.apache.drill.exec.planner.logical.DrillTable;
+
+import java.util.Objects;
+import java.util.function.BiConsumer;
+
+/**
+ * Definition of a table parameter: contains the parameter name, class type, and status (optional / required).
+ * May also contain an action this parameter can perform for the given value and table.
+ */
+public final class TableParamDef {
+
+  private final String name;
+  private final Class<?> type;
+  private final boolean optional;
+  private final BiConsumer<DrillTable, Object> action;
+
+  public static TableParamDef required(String name, Class<?> type, BiConsumer<DrillTable, Object> action) {
+    return new TableParamDef(name, type, false, action);
+  }
+
+  public static TableParamDef optional(String name, Class<?> type, BiConsumer<DrillTable, Object> action) {
+    return new TableParamDef(name, type, true, action);
+  }
+
+  private TableParamDef(String name, Class<?> type, boolean optional, BiConsumer<DrillTable, Object> action) {
+    this.name = name;
+    this.type = type;
+    this.optional = optional;
+    this.action = action;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public Class<?> getType() {
+    return type;
+  }
+
+  public boolean isOptional() {
+    return optional;
+  }
+
+  public void apply(DrillTable drillTable, Object value) {
+    if (action == null) {
+      return;
+    }
+    action.accept(drillTable, value);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, type, optional, action);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TableParamDef that = (TableParamDef) o;
+    return optional == that.optional
+      && Objects.equals(name, that.name)
+      && Objects.equals(type, that.type)
+      && Objects.equals(action, that.action);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    if (optional) {
+      builder.append("[");
+    }
+    builder.append(name).append(": ").append(type);
+    if (action != null) {
+      builder.append(" (with action)");
+    }
+    if (optional) {
+      builder.append("]");
+    }
+    return builder.toString();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/table/function/TableSignature.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/table/function/TableSignature.java
new file mode 100644
index 0000000..24de439
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/table/function/TableSignature.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.table.function;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Describes a table and the parameters that can be used during table initialization and usage.
+ * Common parameters are those that are common to all tables (ex: schema).
+ * Specific parameters are those that are specific to the schema the table belongs to.
+ */
+public final class TableSignature {
+
+  private final String name;
+  private final List<TableParamDef> commonParams;
+  private final List<TableParamDef> specificParams;
+  private final List<TableParamDef> params;
+
+  public static TableSignature of(String name) {
+    return new TableSignature(name, Collections.emptyList(), Collections.emptyList());
+  }
+
+  public static TableSignature of(String name, List<TableParamDef> commonParams) {
+    return new TableSignature(name, commonParams, Collections.emptyList());
+  }
+
+  public static TableSignature of(String name, List<TableParamDef> commonParams, List<TableParamDef> specificParams) {
+    return new TableSignature(name, commonParams, specificParams);
+  }
+
+  private TableSignature(String name, List<TableParamDef> commonParams, List<TableParamDef> specificParams) {
+    this.name = name;
+    this.commonParams = Collections.unmodifiableList(commonParams);
+    this.specificParams = Collections.unmodifiableList(specificParams);
+    this.params = Stream.of(specificParams, commonParams)
+      .flatMap(Collection::stream)
+      .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public List<TableParamDef> getParams() {
+    return params;
+  }
+
+  public List<TableParamDef> getCommonParams() {
+    return commonParams;
+  }
+
+  public List<TableParamDef> getSpecificParams() { return specificParams; }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, params);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TableSignature that = (TableSignature) o;
+    return name.equals(that.name) && params.equals(that.params);
+  }
+
+  @Override
+  public String toString() {
+    return "TableSignature{" +
+      "name='" + name + '\'' +
+      ", commonParams=" + commonParams +
+      ", specificParams=" + specificParams + '}';
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/table/function/WithOptionsTableMacro.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/table/function/WithOptionsTableMacro.java
new file mode 100644
index 0000000..620dad5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/table/function/WithOptionsTableMacro.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.table.function;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.FunctionParameter;
+import org.apache.calcite.schema.TableMacro;
+import org.apache.calcite.schema.TranslatableTable;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Implementation of a table macro that generates a table based on parameters.
+ */
+public class WithOptionsTableMacro implements TableMacro {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WithOptionsTableMacro.class);
+
+  private final TableSignature sig;
+  private final Function<List<Object>, DrillTable> function;
+
+  public WithOptionsTableMacro(TableSignature sig, Function<List<Object>, DrillTable> function) {
+    this.sig = sig;
+    this.function = function;
+  }
+
+  @Override
+  public TranslatableTable apply(List<Object> arguments) {
+    DrillTable drillTable = function.apply(arguments);
+    if (drillTable == null) {
+      throw UserException
+        .validationError()
+        .message("Unable to find table [%s]", sig.getName())
+        .build(logger);
+    }
+    return new DrillTranslatableTable(drillTable);
+  }
+
+  @Override
+  public List<FunctionParameter> getParameters() {
+    List<FunctionParameter> result = new ArrayList<>();
+    for (int i = 0; i < sig.getParams().size(); i++) {
+      final TableParamDef p = sig.getParams().get(i);
+      final int ordinal = i;
+      FunctionParameter functionParameter = new FunctionParameter() {
+        @Override
+        public int getOrdinal() {
+          return ordinal;
+        }
+
+        @Override
+        public String getName() {
+          return p.getName();
+        }
+
+        @Override
+        public RelDataType getType(RelDataTypeFactory typeFactory) {
+          return typeFactory.createJavaType(p.getType());
+        }
+
+        @Override
+        public boolean isOptional() {
+          return p.isOptional();
+        }
+      };
+      result.add(functionParameter);
+    }
+    return result;
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java b/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java
index 84acd1a..3258e1c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSchemaCommands.java
@@ -344,7 +344,7 @@ public class TestSchemaCommands extends ClusterTest {
   @Test
   public void testCreateWithVariousColumnProperties() throws Exception {
     File tmpDir = dirTestWatcher.getTmpDir();
-    File schemaFile = new File(tmpDir, "schema_for_create_with__various_column_properties.schema");
+    File schemaFile = new File(tmpDir, "schema_for_create_with_various_column_properties.schema");
     assertFalse(schemaFile.exists());
     try {
       testBuilder()
@@ -396,6 +396,40 @@ public class TestSchemaCommands extends ClusterTest {
   }
 
   @Test
+  public void testCreateWithoutColumns() throws Exception {
+    File tmpDir = dirTestWatcher.getTmpDir();
+    File schemaFile = new File(tmpDir, "schema_for_create_without_columns.schema");
+    assertFalse(schemaFile.exists());
+    try {
+      testBuilder()
+        .sqlQuery("create schema () " +
+            "path '%s' " +
+            "properties ('prop' = 'val')",
+          schemaFile.getPath())
+        .unOrdered()
+        .baselineColumns("ok", "summary")
+        .baselineValues(true, String.format("Created schema for [%s]", schemaFile.getPath()))
+        .go();
+
+      SchemaProvider schemaProvider = new PathSchemaProvider(new Path(schemaFile.getPath()));
+      assertTrue(schemaProvider.exists());
+
+      SchemaContainer schemaContainer = schemaProvider.read();
+
+      assertNull(schemaContainer.getTable());
+      TupleMetadata schema = schemaContainer.getSchema();
+      assertNotNull(schema);
+
+      assertTrue(schema.isEmpty());
+      assertEquals("val", schema.property("prop"));
+    } finally {
+      if (schemaFile.exists()) {
+        assertTrue(schemaFile.delete());
+      }
+    }
+  }
+
+  @Test
   public void testCreateUsingLoadFromMissingFile() throws Exception {
     thrown.expect(UserException.class);
     thrown.expectMessage("RESOURCE ERROR: File with raw schema [path/to/file] does not exist");
@@ -665,4 +699,32 @@ public class TestSchemaCommands extends ClusterTest {
     }
   }
 
+  @Test
+  public void testDescribeWithoutColumns() throws Exception {
+    String tableName = "table_describe_statement_without_columns";
+    String table = String.format("dfs.tmp.%s", tableName);
+    try {
+      run("create table %s as select 'a' as c from (values(1))", table);
+
+      String statement = "CREATE OR REPLACE SCHEMA \n"
+        + "() \n"
+        + "FOR TABLE dfs.tmp.`table_describe_statement_without_columns` \n"
+        + "PROPERTIES (\n"
+        + "'drill.strict' = 'false', \n"
+        + "'some_schema_prop' = 'some_schema_val'\n"
+        + ")";
+
+      run(statement);
+
+      testBuilder()
+        .sqlQuery("describe schema for table %s as statement", table)
+        .unOrdered()
+        .baselineColumns("schema")
+        .baselineValues(statement)
+        .go();
+
+    } finally {
+      run("drop table if exists %s", table);
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSchemaWithTableFunction.java b/exec/java-exec/src/test/java/org/apache/drill/TestSchemaWithTableFunction.java
new file mode 100644
index 0000000..6f89b9d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSchemaWithTableFunction.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.util.StoragePluginTestUtils;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestSchemaWithTableFunction extends ClusterTest {
+
+  private static final String DATA_PATH = "store/text/data";
+  private static final String TABLE_PLACEHOLDER = "[TABLE]";
+  private static final String TABLE_NAME = String.format("%s.`%s/%s`", StoragePluginTestUtils.DFS_PLUGIN_NAME, DATA_PATH, TABLE_PLACEHOLDER);
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    dirTestWatcher.copyResourceToRoot(Paths.get(DATA_PATH));
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    startCluster(builder);
+    client.alterSession(ExecConstants.ENABLE_V3_TEXT_READER_KEY, true);
+  }
+
+  @AfterClass
+  public static void cleanUp() {
+    client.resetSession(ExecConstants.ENABLE_V3_TEXT_READER_KEY);
+  }
+
+  @Test
+  public void testSchemaInline() throws Exception {
+    String table = TABLE_NAME.replace(TABLE_PLACEHOLDER, "cars.csvh");
+    String query = "select Year from table(%s(schema=>'inline=(`Year` int)')) where Make = 'Ford'";
+
+    testBuilder()
+      .sqlQuery(query, table)
+      .unOrdered()
+      .baselineColumns("Year")
+      .baselineValues(1997)
+      .go();
+
+    String plan = queryBuilder().sql(query, table).explainText();
+    assertTrue(plan.contains("schema=[TupleSchema [PrimitiveColumnMetadata [`Year` (INT(0, 0):OPTIONAL)]]]"));
+  }
+
+  @Test
+  public void testSchemaInlineWithProperties() throws Exception {
+    String table = TABLE_NAME.replace(TABLE_PLACEHOLDER, "cars.csvh");
+    String query = "select * from table(%s(schema=>'inline=(`Year` int, `Make` varchar) " +
+      "properties {`drill.strict` = `true`}')) where Make = 'Ford'";
+
+    testBuilder()
+      .sqlQuery(query, table)
+      .unOrdered()
+      .baselineColumns("Year", "Make")
+      .baselineValues(1997, "Ford")
+      .go();
+
+    String plan = queryBuilder().sql(query, table).explainText();
+    assertFalse(plan.contains("schema=null"));
+  }
+
+  @Test
+  public void testSchemaInlineWithoutColumns() throws Exception {
+    String table = TABLE_NAME.replace(TABLE_PLACEHOLDER, "cars.csvh");
+    String query = "select * from table(%s(schema=>'inline=() " +
+      "properties {`drill.strict` = `true`}')) where Make = 'Ford'";
+
+    String plan = queryBuilder().sql(query, table).explainText();
+    assertFalse(plan.contains("schema=null"));
+  }
+
+  @Test
+  public void testSchemaInlineWithTableProperties() throws Exception {
+    String table = TABLE_NAME.replace(TABLE_PLACEHOLDER, "cars.csvh-test");
+    String query = "select Year from table(%s(type=>'text', fieldDelimiter=>',', extractHeader=>true, " +
+      "schema=>'inline=(`Year` int)')) where Make = 'Ford'";
+
+    testBuilder()
+      .sqlQuery(query, table)
+      .unOrdered()
+      .baselineColumns("Year")
+      .baselineValues(1997)
+      .go();
+
+    String plan = queryBuilder().sql(query, table).explainText();
+    assertFalse(plan.contains("schema=null"));
+  }
+
+  @Test
+  public void testSchemaInlineWithPropertiesInDifferentOrder() throws Exception {
+    String table = TABLE_NAME.replace(TABLE_PLACEHOLDER, "cars.csvh-test");
+
+    String sqlQuery = "select Year from table(%s(schema=>'inline=(`Year` int)', fieldDelimiter=>',', extractHeader=>true, " +
+      "type=>'text'))";
+
+    String sqlBaselineQuery = "select Year from table(%s(type=>'text', fieldDelimiter=>',', schema=>'inline=(`Year` int)', " +
+      "extractHeader=>true))";
+
+    testBuilder()
+      .sqlQuery(sqlQuery, table)
+      .unOrdered()
+      .sqlBaselineQuery(sqlBaselineQuery, table)
+      .go();
+
+    String plan = queryBuilder().sql(sqlQuery, table).explainText();
+    assertFalse(plan.contains("schema=null"));
+  }
+
+  @Test
+  public void testSchemaInlineForFolder() throws Exception {
+    run("use dfs.tmp");
+
+    String table = "text_table";
+    String sourceTable = TABLE_NAME.replace(TABLE_PLACEHOLDER, "regions.csv");
+    try {
+      client.alterSession(ExecConstants.OUTPUT_FORMAT_OPTION, "csv");
+      run("create table %s as select columns[0] as id, columns[1] as name from %s", table, sourceTable);
+
+      String query = "select * from table(%s(type=>'text', fieldDelimiter=>',', extractHeader=>true " +
+        ",schema=>'inline=(`id` int)')) where id = 1";
+
+      testBuilder()
+        .sqlQuery(query, table)
+        .unOrdered()
+        .baselineColumns("id", "name")
+        .baselineValues(1, "AMERICA")
+        .go();
+
+      String plan = queryBuilder().sql(query, table).explainText();
+      assertTrue(plan.contains("schema=[TupleSchema [PrimitiveColumnMetadata [`id` (INT(0, 0):OPTIONAL)]]]"));
+    } finally {
+      client.resetSession(ExecConstants.OUTPUT_FORMAT_OPTION);
+      run("drop table if exists %s", table);
+    }
+  }
+
+  @Test
+  public void testInvalidSchemaParameter() throws Exception {
+    String table = TABLE_NAME.replace(TABLE_PLACEHOLDER, "cars.csvh");
+
+    thrown.expect(UserRemoteException.class);
+    thrown.expectMessage("VALIDATION ERROR");
+
+    run("select Year from table(%s(schema=>'(`Year` int)'))", table);
+  }
+
+  @Test
+  public void testInvalidSchemaProviderType() throws Exception {
+    String table = TABLE_NAME.replace(TABLE_PLACEHOLDER, "cars.csvh");
+
+    thrown.expect(UserRemoteException.class);
+    thrown.expectMessage("VALIDATION ERROR");
+
+    run("select Year from table(%s(schema=>'line=(`Year` int)'))", table);
+  }
+
+  @Test
+  public void testSchemaInlineInvalidSchemaSyntax() throws Exception {
+    String table = TABLE_NAME.replace(TABLE_PLACEHOLDER, "cars.csvh");
+
+    thrown.expect(UserRemoteException.class);
+    thrown.expectMessage("VALIDATION ERROR");
+
+    run("select Year from table(%s(schema=>'inline=(int)')) where Make = 'Ford'", table);
+  }
+
+  @Test
+  public void testSchemaPath() throws Exception {
+    File tmpDir = dirTestWatcher.getTmpDir();
+    File schemaFile = new File(tmpDir, "schema_for_path.schema");
+    assertFalse(schemaFile.exists());
+
+    try {
+      String path = schemaFile.getPath();
+      testBuilder()
+        .sqlQuery("create schema (`Year` int) path '%s'", path)
+        .unOrdered()
+        .baselineColumns("ok", "summary")
+        .baselineValues(true, String.format("Created schema for [%s]", path))
+        .go();
+
+      String table = TABLE_NAME.replace(TABLE_PLACEHOLDER, "cars.csvh");
+
+      List<String> queries = Arrays.asList(
+        "select Year from table(%s(schema=>'path=%s')) where Make = 'Ford'",
+        "select Year from table(%s(schema=>'path=`%s`')) where Make = 'Ford'");
+
+      for (String query : queries) {
+        testBuilder()
+          .sqlQuery(query, table, path)
+          .unOrdered()
+          .baselineColumns("Year")
+          .baselineValues(1997)
+          .go();
+
+        String plan = queryBuilder().sql(query, table, path).explainText();
+        assertFalse(plan.contains("schema=null"));
+      }
+
+    } finally {
+      if (schemaFile.exists()) {
+        assertTrue(schemaFile.delete());
+      }
+    }
+  }
+
+  @Test
+  public void testSchemaPathInvalid() throws Exception {
+    String table = TABLE_NAME.replace(TABLE_PLACEHOLDER, "cars.csvh");
+
+    thrown.expect(UserRemoteException.class);
+    thrown.expectMessage("VALIDATION ERROR");
+
+    run("select Year from table(%s(schema=>'path=(int)')) where Make = 'Ford'", table);
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
index ec23194..7bc65fc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestSelectWithOption.java
@@ -71,18 +71,17 @@ public class TestSelectWithOption extends BaseTestQuery {
         "\"b\"|\"1\"",
         "\"b\"|\"2\"");
 
-    String queryTemplate =
-        "select columns from table(%s (type => 'TeXT', fieldDelimiter => '%s'))";
+    String queryTemplate = "select columns from table(%s (type => 'TeXT', fieldDelimiter => '%s'))";
+
     testWithResult(format(queryTemplate, tableName, ","),
         listOf("b\"|\"0"),
         listOf("b\"|\"1"),
-        listOf("b\"|\"2")
-      );
+        listOf("b\"|\"2"));
+
     testWithResult(format(queryTemplate, tableName, "|"),
         listOf("b", "0"),
         listOf("b", "1"),
-        listOf("b", "2")
-      );
+        listOf("b", "2"));
   }
 
   @Test
@@ -163,8 +162,7 @@ public class TestSelectWithOption extends BaseTestQuery {
     testWithResult(format("select columns from table(%s(type => 'TeXT', fieldDelimiter => '|', quote => '@'))", tableName),
         listOf("\"b\"", "\"0\""),
         listOf("\"b\"", "\"1\""),
-        listOf("\"b\"", "\"2\"")
-        );
+        listOf("\"b\"", "\"2\""));
 
     String quoteTableName = genCSVTable("testTextQuote2",
         "@b@|@0@",
@@ -172,8 +170,7 @@ public class TestSelectWithOption extends BaseTestQuery {
     // It seems that a parameter can not be called "escape"
     testWithResult(format("select columns from table(%s(`escape` => '$', type => 'TeXT', fieldDelimiter => '|', quote => '@'))", quoteTableName),
         listOf("b", "0"),
-        listOf("b$@c", "1") // shouldn't $ be removed here?
-        );
+        listOf("b$@c", "1")); // shouldn't $ be removed here?
   }
 
   @Test
@@ -184,8 +181,7 @@ public class TestSelectWithOption extends BaseTestQuery {
           "b|1");
       testWithResult(format("select columns from table(%s(type => 'TeXT', fieldDelimiter => '|', comment => '@'))", commentTableName),
           listOf("b", "0"),
-          listOf("b", "1")
-          );
+          listOf("b", "1"));
   }
 
   @Test
@@ -196,8 +192,7 @@ public class TestSelectWithOption extends BaseTestQuery {
         "b|1");
     testWithResult(format("select columns from table(%s(type => 'TeXT', fieldDelimiter => '|', skipFirstLine => true))", headerTableName),
         listOf("b", "0"),
-        listOf("b", "1")
-        );
+        listOf("b", "1"));
 
     testBuilder()
         .sqlQuery(format("select a, b from table(%s(type => 'TeXT', fieldDelimiter => '|', extractHeader => true))", headerTableName))
@@ -205,7 +200,7 @@ public class TestSelectWithOption extends BaseTestQuery {
         .baselineColumns("b", "a")
         .baselineValues("b", "0")
         .baselineValues("b", "1")
-        .build().run();
+        .go();
   }
 
   @Test
@@ -214,18 +209,9 @@ public class TestSelectWithOption extends BaseTestQuery {
         "a,b",
         "c|d");
    // Using the defaults in TextFormatConfig (the field delimiter is neither "," nor "|")
-    String[] csvQueries = {
-//        format("select columns from %s ('TeXT')", csvTableName),
-//        format("select columns from %s('TeXT')", csvTableName),
-        format("select columns from table(%s ('TeXT'))", csvTableName),
-        format("select columns from table(%s (type => 'TeXT'))", csvTableName),
-//        format("select columns from %s (type => 'TeXT')", csvTableName)
-    };
-    for (String csvQuery : csvQueries) {
-      testWithResult(csvQuery,
-          listOf("a,b"),
-          listOf("c|d"));
-    }
+    testWithResult(format("select columns from table(%s (type => 'TeXT'))", csvTableName),
+      listOf("a,b"),
+      listOf("c|d"));
     // the drill config file binds .csv to "," delimited
     testWithResult(format("select columns from %s", csvTableName),
           listOf("a", "b"),
@@ -248,7 +234,6 @@ public class TestSelectWithOption extends BaseTestQuery {
         listOf("{\"columns\": [\"f\"", "g\"]}\n")
         );
     String[] jsonQueries = {
-        format("select columns from table(%s ('JSON'))", jsonTableName),
         format("select columns from table(%s(type => 'JSON'))", jsonTableName),
         // we can use named format plugin configurations too!
         format("select columns from table(%s(type => 'Named', name => 'json'))", jsonTableName),
@@ -266,15 +251,8 @@ public class TestSelectWithOption extends BaseTestQuery {
     // the extension is actually csv
     test("use dfs");
     try {
-      String[] jsonQueries = {
-          format("select columns from table(%s ('JSON'))", jsonTableName),
-          format("select columns from table(%s(type => 'JSON'))", jsonTableName),
-      };
-      for (String jsonQuery : jsonQueries) {
-        testWithResult(jsonQuery, listOf("f","g"));
-      }
-
-      testWithResult(format("select length(columns[0]) as columns from table(%s ('JSON'))", jsonTableName), 1L);
+      testWithResult(format("select columns from table(%s(type => 'JSON'))", jsonTableName), listOf("f","g"));
+      testWithResult(format("select length(columns[0]) as columns from table(%s (type => 'JSON'))", jsonTableName), 1L);
     } finally {
       test("use sys");
     }
@@ -287,7 +265,7 @@ public class TestSelectWithOption extends BaseTestQuery {
     try {
       test("select * from table(`%s`.`%s`(type=>'parquet'))", schema, tableName);
     } catch (UserRemoteException e) {
-      assertThat(e.getMessage(), containsString(String.format("Unable to find table [%s] in schema [%s]", tableName, schema)));
+      assertThat(e.getMessage(), containsString(String.format("Unable to find table [%s]", tableName)));
       throw e;
     }
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java
index 5622e76..ec5d48f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/TestSchemaProvider.java
@@ -52,13 +52,13 @@ public class TestSchemaProvider {
 
   @Test
   public void testInlineProviderExists() throws Exception {
-    SchemaProvider provider = new InlineSchemaProvider("(i int)", null);
+    SchemaProvider provider = new InlineSchemaProvider("(i int)");
     assertTrue(provider.exists());
   }
 
   @Test
   public void testInlineProviderDelete() throws Exception {
-    SchemaProvider provider = new InlineSchemaProvider("(i int)", null);
+    SchemaProvider provider = new InlineSchemaProvider("(i int)");
     thrown.expect(UnsupportedOperationException.class);
     thrown.expectMessage("Schema deletion is not supported");
     provider.delete();
@@ -66,7 +66,7 @@ public class TestSchemaProvider {
 
   @Test
   public void testInlineProviderStore() throws Exception {
-    SchemaProvider provider = new InlineSchemaProvider("(i int)", null);
+    SchemaProvider provider = new InlineSchemaProvider("(i int)");
     thrown.expect(UnsupportedOperationException.class);
     thrown.expectMessage("Schema storage is not supported");
     provider.store("i int", null, StorageStrategy.DEFAULT);
@@ -74,9 +74,7 @@ public class TestSchemaProvider {
 
   @Test
   public void testInlineProviderRead() throws Exception {
-    Map<String, String> properties = new LinkedHashMap<>();
-    properties.put("k1", "v1");
-    SchemaProvider provider = new InlineSchemaProvider("(i int)", properties);
+    SchemaProvider provider = new InlineSchemaProvider("(i int) properties { 'k1' = 'v1' }");
 
     SchemaContainer schemaContainer = provider.read();
     assertNotNull(schemaContainer);
@@ -87,6 +85,8 @@ public class TestSchemaProvider {
     assertEquals(1, metadata.size());
     assertEquals(TypeProtos.MinorType.INT, metadata.metadata("i").type());
 
+    Map<String, String> properties = new LinkedHashMap<>();
+    properties.put("k1", "v1");
     assertEquals(properties, metadata.properties());
 
     SchemaContainer.Version version = schemaContainer.getVersion();
@@ -95,6 +95,16 @@ public class TestSchemaProvider {
   }
 
   @Test
+  public void testInlineProviderWithoutColumns() throws Exception {
+    SchemaProvider provider = new InlineSchemaProvider("() properties { 'k1' = 'v1' }");
+    SchemaContainer schemaContainer = provider.read();
+    assertNotNull(schemaContainer);
+    TupleMetadata metadata = schemaContainer.getSchema();
+    assertTrue(metadata.isEmpty());
+    assertEquals("v1", metadata.property("k1"));
+  }
+
+  @Test
   public void testPathProviderExists() throws Exception {
     File schema = new File(folder.getRoot(), "schema");
     SchemaProvider provider = new PathSchemaProvider(new org.apache.hadoop.fs.Path(schema.getPath()));
@@ -288,4 +298,25 @@ public class TestSchemaProvider {
     assertTrue(provider.exists());
     assertNotNull(provider.read());
   }
+
+  @Test
+  public void testPathProviderWithoutColumns() throws Exception {
+    Path schemaPath = folder.newFile("schema").toPath();
+    String schema = "{\n"
+      + "  \"table\" : \"tbl\",\n"
+      + "  \"schema\" : {\n"
+      + "    \"properties\" : {\n"
+      + "      \"prop\" : \"val\"\n"
+      + "    }\n"
+      + "  }\n"
+      + "}";
+    Files.write(schemaPath, Collections.singletonList(schema));
+    SchemaProvider provider = new PathSchemaProvider(new org.apache.hadoop.fs.Path(schemaPath.toUri().getPath()));
+    SchemaContainer schemaContainer = provider.read();
+    assertNotNull(schemaContainer);
+    TupleMetadata tableMetadata = schemaContainer.getSchema();
+    assertTrue(tableMetadata.isEmpty());
+    assertEquals("val", tableMetadata.property("prop"));
+  }
+
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestParserErrorHandling.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestParserErrorHandling.java
index 320e146..6bd730b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestParserErrorHandling.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestParserErrorHandling.java
@@ -21,131 +21,133 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.io.IOException;
+
 public class TestParserErrorHandling {
 
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
   @Test
-  public void testUnsupportedType() {
+  public void testUnsupportedType() throws Exception {
     String schema = "col unk_type";
-    thrown.expect(SchemaParsingException.class);
+    thrown.expect(IOException.class);
     SchemaExprParser.parseSchema(schema);
   }
 
   @Test
-  public void testVarcharWithScale() {
+  public void testVarcharWithScale() throws Exception {
     String schema = "col varchar(1, 2)";
-    thrown.expect(SchemaParsingException.class);
+    thrown.expect(IOException.class);
     thrown.expectMessage("missing ')' at ','");
     SchemaExprParser.parseSchema(schema);
   }
 
   @Test
-  public void testUnquotedKeyword() {
+  public void testUnquotedKeyword() throws Exception {
     String schema = "int varchar";
-    thrown.expect(SchemaParsingException.class);
-    thrown.expectMessage("mismatched input 'int' expecting {'(', ID, QUOTED_ID}");
+    thrown.expect(IOException.class);
+    thrown.expectMessage("mismatched input 'int'");
     SchemaExprParser.parseSchema(schema);
   }
 
   @Test
-  public void testUnquotedId() {
+  public void testUnquotedId() throws Exception {
     String schema = "id with space varchar";
-    thrown.expect(SchemaParsingException.class);
+    thrown.expect(IOException.class);
     SchemaExprParser.parseSchema(schema);
   }
 
   @Test
-  public void testUnescapedBackTick() {
+  public void testUnescapedBackTick() throws Exception {
     String schema = "`c`o`l` varchar";
-    thrown.expect(SchemaParsingException.class);
+    thrown.expect(IOException.class);
     SchemaExprParser.parseSchema(schema);
   }
 
   @Test
-  public void testUnescapedBackSlash() {
+  public void testUnescapedBackSlash() throws Exception {
     String schema = "`c\\o\\l` varchar";
-    thrown.expect(SchemaParsingException.class);
-    thrown.expectMessage("extraneous input '`' expecting {'(', ID, QUOTED_ID}");
+    thrown.expect(IOException.class);
+    thrown.expectMessage("extraneous input '`'");
     SchemaExprParser.parseSchema(schema);
   }
 
   @Test
-  public void testMissingType() {
+  public void testMissingType() throws Exception {
     String schema = "col not null";
-    thrown.expect(SchemaParsingException.class);
+    thrown.expect(IOException.class);
     SchemaExprParser.parseSchema(schema);
   }
 
   @Test
-  public void testIncorrectEOF() {
+  public void testIncorrectEOF() throws Exception {
     String schema = "col int not null footer";
-    thrown.expect(SchemaParsingException.class);
-    thrown.expectMessage("extraneous input 'footer' expecting <EOF>");
+    thrown.expect(IOException.class);
+    thrown.expectMessage("extraneous input 'footer'");
     SchemaExprParser.parseSchema(schema);
   }
 
   @Test
-  public void testSchemaWithOneParen() {
+  public void testSchemaWithOneParen() throws Exception {
     String schema = "(col int not null";
-    thrown.expect(SchemaParsingException.class);
+    thrown.expect(IOException.class);
     thrown.expectMessage("missing ')' at '<EOF>'");
     SchemaExprParser.parseSchema(schema);
   }
 
   @Test
-  public void testMissingAngleBracket() {
+  public void testMissingAngleBracket() throws Exception {
     String schema = "col array<int not null";
-    thrown.expect(SchemaParsingException.class);
+    thrown.expect(IOException.class);
     thrown.expectMessage("missing '>' at 'not'");
     SchemaExprParser.parseSchema(schema);
   }
 
   @Test
-  public void testUnclosedAngleBracket() {
+  public void testUnclosedAngleBracket() throws Exception {
     String schema = "col struct<m array<int> not null";
-    thrown.expect(SchemaParsingException.class);
+    thrown.expect(IOException.class);
     thrown.expectMessage("missing '>' at '<EOF>'");
     SchemaExprParser.parseSchema(schema);
   }
 
   @Test
-  public void testMissingColumnNameForStruct() {
+  public void testMissingColumnNameForStruct() throws Exception {
     String schema = "col struct<int> not null";
-    thrown.expect(SchemaParsingException.class);
+    thrown.expect(IOException.class);
     thrown.expectMessage("mismatched input 'int' expecting {ID, QUOTED_ID}");
     SchemaExprParser.parseSchema(schema);
   }
 
   @Test
-  public void testMissingNotBeforeNull() {
+  public void testMissingNotBeforeNull() throws Exception {
     String schema = "col int null";
-    thrown.expect(SchemaParsingException.class);
-    thrown.expectMessage("extraneous input 'null' expecting <EOF>");
+    thrown.expect(IOException.class);
+    thrown.expectMessage("extraneous input 'null'");
     SchemaExprParser.parseSchema(schema);
   }
 
   @Test
-  public void testExtraComma() {
+  public void testExtraComma() throws Exception {
     String schema = "id int,, name varchar";
-    thrown.expect(SchemaParsingException.class);
+    thrown.expect(IOException.class);
     thrown.expectMessage("extraneous input ',' expecting {ID, QUOTED_ID}");
     SchemaExprParser.parseSchema(schema);
   }
 
   @Test
-  public void testExtraCommaEOF() {
+  public void testExtraCommaEOF() throws Exception {
     String schema = "id int, name varchar,";
-    thrown.expect(SchemaParsingException.class);
+    thrown.expect(IOException.class);
     thrown.expectMessage("mismatched input '<EOF>' expecting {ID, QUOTED_ID}");
     SchemaExprParser.parseSchema(schema);
   }
 
   @Test
-  public void incorrectNumber() {
+  public void incorrectNumber() throws Exception {
     String schema = "id decimal(5, 02)";
-    thrown.expect(SchemaParsingException.class);
+    thrown.expect(IOException.class);
     thrown.expectMessage("extraneous input '2' expecting ')'");
     SchemaExprParser.parseSchema(schema);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java
index 01b5c9c..2d0d1be 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/metadata/schema/parser/TestSchemaParser.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.joda.time.LocalDate;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -37,7 +38,7 @@ import static org.junit.Assert.assertTrue;
 public class TestSchemaParser {
 
   @Test
-  public void checkQuotedIdWithEscapes() {
+  public void checkQuotedIdWithEscapes() throws Exception {
     String schemaWithEscapes = "`a\\\\b\\`c` INT";
     assertEquals(schemaWithEscapes, SchemaExprParser.parseSchema(schemaWithEscapes).metadata(0).columnString());
 
@@ -46,7 +47,7 @@ public class TestSchemaParser {
   }
 
   @Test
-  public void testSchemaWithParen() {
+  public void testSchemaWithParen() throws Exception {
     String schemaWithParen = "(`a` INT NOT NULL, `b` VARCHAR(10))";
     TupleMetadata schema = SchemaExprParser.parseSchema(schemaWithParen);
     assertEquals(2, schema.size());
@@ -55,7 +56,7 @@ public class TestSchemaParser {
   }
 
   @Test
-  public void testSkip() {
+  public void testSkip() throws Exception {
     String schemaString = "id\n/*comment*/int\r,//comment\r\nname\nvarchar\t\t\t";
     TupleMetadata schema = SchemaExprParser.parseSchema(schemaString);
     assertEquals(2, schema.size());
@@ -64,19 +65,19 @@ public class TestSchemaParser {
   }
 
   @Test
-  public void testCaseInsensitivity() {
+  public void testCaseInsensitivity() throws Exception {
     String schema = "`Id` InTeGeR NoT NuLl";
     assertEquals("`Id` INT NOT NULL", SchemaExprParser.parseSchema(schema).metadata(0).columnString());
   }
 
   @Test
-  public void testParseColumn() {
+  public void testParseColumn() throws Exception {
     ColumnMetadata column = SchemaExprParser.parseColumn("col int not null");
     assertEquals("`col` INT NOT NULL", column.columnString());
   }
 
   @Test
-  public void testNumericTypes() {
+  public void testNumericTypes() throws Exception {
     TupleMetadata schema = new SchemaBuilder()
       .addNullable("int_col", TypeProtos.MinorType.INT)
       .add("integer_col", TypeProtos.MinorType.INT)
@@ -104,12 +105,18 @@ public class TestSchemaParser {
     );
 
     schemas.forEach(
-      s -> checkSchema(s, schema)
+      s -> {
+        try {
+          checkSchema(s, schema);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
     );
   }
 
   @Test
-  public void testBooleanType() {
+  public void testBooleanType() throws Exception {
     TupleMetadata schema = new SchemaBuilder()
       .addNullable("col", TypeProtos.MinorType.BIT)
       .buildSchema();
@@ -136,12 +143,16 @@ public class TestSchemaParser {
         .add("col_p", value, 50)
         .buildSchema();
 
-      checkSchema(String.format(schemaPattern, key), schema);
+      try {
+        checkSchema(String.format(schemaPattern, key), schema);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     });
   }
 
   @Test
-  public void testTimeTypes() {
+  public void testTimeTypes() throws Exception {
     TupleMetadata schema = new SchemaBuilder()
       .addNullable("time_col", TypeProtos.MinorType.TIME)
       .addNullable("time_prec_col", TypeProtos.MinorType.TIME, 3)
@@ -155,7 +166,7 @@ public class TestSchemaParser {
   }
 
   @Test
-  public void testInterval() {
+  public void testInterval() throws Exception {
     TupleMetadata schema = new SchemaBuilder()
       .addNullable("interval_year_col", TypeProtos.MinorType.INTERVALYEAR)
       .addNullable("interval_month_col", TypeProtos.MinorType.INTERVALYEAR)
@@ -172,7 +183,7 @@ public class TestSchemaParser {
   }
 
   @Test
-  public void testArray() {
+  public void testArray() throws Exception {
     TupleMetadata schema = new SchemaBuilder()
       .addArray("simple_array", TypeProtos.MinorType.INT)
       .addRepeatedList("nested_array")
@@ -199,7 +210,7 @@ public class TestSchemaParser {
   }
 
   @Test
-  public void testStruct() {
+  public void testStruct() throws Exception {
     TupleMetadata schema = new SchemaBuilder()
       .addMap("struct_col")
         .addNullable("int_col", TypeProtos.MinorType.INT)
@@ -215,14 +226,14 @@ public class TestSchemaParser {
   }
 
   @Test
-  public void testModeForSimpleType() {
+  public void testModeForSimpleType() throws Exception {
     TupleMetadata schema = SchemaExprParser.parseSchema("id int not null, name varchar");
     assertFalse(schema.metadata("id").isNullable());
     assertTrue(schema.metadata("name").isNullable());
   }
 
   @Test
-  public void testModeForStructType() {
+  public void testModeForStructType() throws Exception {
     TupleMetadata schema  = SchemaExprParser.parseSchema("m struct<m1 int not null, m2 varchar>");
     ColumnMetadata map = schema.metadata("m");
     assertTrue(map.isMap());
@@ -234,7 +245,7 @@ public class TestSchemaParser {
   }
 
   @Test
-  public void testModeForRepeatedType() {
+  public void testModeForRepeatedType() throws Exception {
     TupleMetadata schema = SchemaExprParser.parseSchema(
       "a array<int>, aa array<array<int>>, ma array<struct<m1 int not null, m2 varchar>>");
 
@@ -253,7 +264,7 @@ public class TestSchemaParser {
   }
 
   @Test
-  public void testFormat() {
+  public void testFormat() throws Exception {
     String value = "`a` DATE NOT NULL FORMAT 'yyyy-MM-dd'";
     TupleMetadata schema = SchemaExprParser.parseSchema(value);
     ColumnMetadata columnMetadata = schema.metadata("a");
@@ -262,7 +273,7 @@ public class TestSchemaParser {
   }
 
   @Test
-  public void testDefault() {
+  public void testDefault() throws Exception {
     String value = "`a` INT NOT NULL DEFAULT '12'";
     TupleMetadata schema = SchemaExprParser.parseSchema(value);
     ColumnMetadata columnMetadata = schema.metadata("a");
@@ -273,7 +284,7 @@ public class TestSchemaParser {
   }
 
   @Test
-  public void testFormatAndDefault() {
+  public void testFormatAndDefault() throws Exception {
     String value = "`a` DATE NOT NULL FORMAT 'yyyy-MM-dd' DEFAULT '2018-12-31'";
     TupleMetadata schema = SchemaExprParser.parseSchema(value);
     ColumnMetadata columnMetadata = schema.metadata("a");
@@ -284,7 +295,7 @@ public class TestSchemaParser {
   }
 
   @Test
-  public void testColumnProperties() {
+  public void testColumnProperties() throws Exception {
     String value = "`a` INT NOT NULL PROPERTIES { 'k1' = 'v1', 'k2' = 'v2' }";
     TupleMetadata schema = SchemaExprParser.parseSchema(value);
 
@@ -298,7 +309,36 @@ public class TestSchemaParser {
     assertEquals(value, columnMetadata.columnString());
   }
 
-  private void checkSchema(String schemaString, TupleMetadata expectedSchema) {
+  @Test
+  public void testEmptySchema() throws Exception {
+    String value = "()";
+    TupleMetadata schema = SchemaExprParser.parseSchema(value);
+    assertEquals(0, schema.size());
+  }
+
+  @Test
+  public void testEmptySchemaWithProperties() throws Exception {
+    String value = "() properties { `drill.strict` = `false` }";
+    TupleMetadata schema = SchemaExprParser.parseSchema(value);
+    assertTrue(schema.isEmpty());
+    assertEquals("false", schema.property("drill.strict"));
+  }
+
+  @Test
+  public void testSchemaWithProperties() throws Exception {
+    String value = "(col int properties { `drill.blank-as` = `0` } ) " +
+      "properties { `drill.strict` = `false`, `drill.my-prop` = `abc` }";
+    TupleMetadata schema = SchemaExprParser.parseSchema(value);
+    assertEquals(1, schema.size());
+
+    ColumnMetadata column = schema.metadata("col");
+    assertEquals("0", column.property("drill.blank-as"));
+
+    assertEquals("false", schema.property("drill.strict"));
+    assertEquals("abc", schema.property("drill.my-prop"));
+  }
+
+  private void checkSchema(String schemaString, TupleMetadata expectedSchema) throws IOException {
     TupleMetadata actualSchema = SchemaExprParser.parseSchema(schemaString);
 
     assertEquals(expectedSchema.size(), actualSchema.size());
@@ -310,7 +350,6 @@ public class TestSchemaParser {
         assertEquals(expectedMetadata.columnString(), actualMetadata.columnString());
       }
     );
-
   }
 
 }


Mime
View raw message