drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ar...@apache.org
Subject [02/11] drill git commit: DRILL-5337: OpenTSDB storage plugin
Date Mon, 13 Nov 2017 12:07:16 GMT
DRILL-5337: OpenTSDB storage plugin

closes #774


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/496c97d1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/496c97d1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/496c97d1

Branch: refs/heads/master
Commit: 496c97d14eb428a5aff74e82d662a0da6930e94f
Parents: 29e0547
Author: Vlad Storona <vstorona@cybervisiontech.com>
Authored: Fri Nov 25 20:28:02 2016 +0200
Committer: Arina Ielchiieva <arina.yelchiyeva@gmail.com>
Committed: Mon Nov 13 11:04:54 2017 +0200

----------------------------------------------------------------------
 contrib/pom.xml                                 |   1 +
 contrib/storage-opentsdb/README.md              |  69 +++++
 contrib/storage-opentsdb/pom.xml                |  80 ++++++
 .../drill/exec/store/openTSDB/Constants.java    |  32 +++
 .../exec/store/openTSDB/DrillOpenTSDBTable.java |  81 ++++++
 .../store/openTSDB/OpenTSDBBatchCreator.java    |  53 ++++
 .../exec/store/openTSDB/OpenTSDBGroupScan.java  | 169 ++++++++++++
 .../store/openTSDB/OpenTSDBRecordReader.java    | 258 +++++++++++++++++++
 .../exec/store/openTSDB/OpenTSDBScanSpec.java   |  42 +++
 .../store/openTSDB/OpenTSDBStoragePlugin.java   |  77 ++++++
 .../openTSDB/OpenTSDBStoragePluginConfig.java   |  77 ++++++
 .../exec/store/openTSDB/OpenTSDBSubScan.java    | 132 ++++++++++
 .../apache/drill/exec/store/openTSDB/Util.java  |  66 +++++
 .../exec/store/openTSDB/client/OpenTSDB.java    |  50 ++++
 .../store/openTSDB/client/OpenTSDBTypes.java    |  28 ++
 .../exec/store/openTSDB/client/Schema.java      | 124 +++++++++
 .../exec/store/openTSDB/client/Service.java     |  55 ++++
 .../store/openTSDB/client/query/DBQuery.java    | 148 +++++++++++
 .../exec/store/openTSDB/client/query/Query.java | 187 ++++++++++++++
 .../openTSDB/client/services/ServiceImpl.java   | 174 +++++++++++++
 .../exec/store/openTSDB/dto/ColumnDTO.java      |  63 +++++
 .../exec/store/openTSDB/dto/MetricDTO.java      |  77 ++++++
 .../openTSDB/schema/OpenTSDBSchemaFactory.java  |  77 ++++++
 .../resources/bootstrap-storage-plugins.json    |   9 +
 .../src/main/resources/drill-module.conf        |  21 ++
 .../drill/store/openTSDB/TestDataHolder.java    | 247 ++++++++++++++++++
 .../store/openTSDB/TestOpenTSDBPlugin.java      | 189 ++++++++++++++
 distribution/pom.xml                            |   5 +
 28 files changed, 2591 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 2014923..d4ad434 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -38,6 +38,7 @@
     <module>storage-mongo</module>
     <module>storage-jdbc</module>
     <module>storage-kudu</module>
+    <module>storage-opentsdb</module>
     <module>sqlline</module>
     <module>data</module>
     <module>gis</module>

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/README.md
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/README.md b/contrib/storage-opentsdb/README.md
new file mode 100644
index 0000000..0c616b5
--- /dev/null
+++ b/contrib/storage-opentsdb/README.md
@@ -0,0 +1,69 @@
+# drill-storage-openTSDB
+
+Implementation of TSDB storage plugin. Plugin uses REST API to work with TSDB. 
+
+For more information about openTSDB follow this link <http://opentsdb.net>
+
+There is list of required params:
+
+* metric     - The name of a metric stored in the db.
+
+* start      - The start time for the query. This can be a relative or absolute timestamp.
+
+* aggregator - The name of an aggregation function to use.
+
+optional param is: 
+
+* downsample - An optional downsampling function to reduce the amount of data returned.
+
+* end - An end time for the query. If not supplied, the TSD will assume the local system time on the server. 
+This may be a relative or absolute timestamp. This param is optional, and if it isn't specified we will send null
+to the db in this field, but in this case db will assume the local system time on the server.
+
+List of supported aggregators
+
+<http://opentsdb.net/docs/build/html/user_guide/query/aggregators.html>
+
+List of supported time 
+
+<http://opentsdb.net/docs/build/html/user_guide/query/dates.html>
+
+Params must be specified in FROM clause of the query separated by commas. For example
+
+`openTSDB.(metric=metric_name, start=4d-ago, aggregator=sum)`
+
+Supported queries for now are listed below:
+
+```
+USE openTSDB
+```
+
+```
+SHOW tables
+```
+Will print available metrics. Max number of the printed results is a Integer.MAX value
+
+```
+SELECT * FROM openTSDB. `(metric=warp.speed.test, start=47y-ago, aggregator=sum)` 
+```
+Return aggregated elements from `warp.speed.test` table since 47y-ago 
+
+```
+SELECT * FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago)`
+```
+Return aggregated elements from `warp.speed.test` table
+
+```
+SELECT `timestamp`, sum(`aggregated value`) FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago)` GROUP BY `timestamp`
+```
+Return aggregated and grouped value by standard drill functions from `warp.speed.test table`, but with the custom aggregator
+
+```
+SELECT * FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago, downsample=5m-avg)`
+```
+Return aggregated data limited by downsample
+
+```
+SELECT * FROM openTSDB.`(metric=warp.speed.test, aggregator=avg, start=47y-ago, end=1407165403000)`
+```
+Return aggregated data limited by end time
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/pom.xml b/contrib/storage-opentsdb/pom.xml
new file mode 100644
index 0000000..aff1bfa
--- /dev/null
+++ b/contrib/storage-opentsdb/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>drill-contrib-parent</artifactId>
+        <groupId>org.apache.drill.contrib</groupId>
+        <version>1.12.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>drill-opentsdb-storage</artifactId>
+
+    <name>contrib/opentsdb-storage-plugin</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.drill.exec</groupId>
+            <artifactId>drill-java-exec</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.apache.drill.exec</groupId>
+            <artifactId>drill-java-exec</artifactId>
+            <classifier>tests</classifier>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.drill</groupId>
+            <artifactId>drill-common</artifactId>
+            <classifier>tests</classifier>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.tomakehurst</groupId>
+            <artifactId>wiremock-standalone</artifactId>
+            <version>2.5.1</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.squareup.retrofit2</groupId>
+            <artifactId>retrofit</artifactId>
+            <version>2.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.retrofit2</groupId>
+            <artifactId>converter-jackson</artifactId>
+            <version>2.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.madhukaraphatak</groupId>
+            <artifactId>java-sizeof_2.11</artifactId>
+            <version>0.1</version>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java
new file mode 100644
index 0000000..c812ff5
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Constants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+public interface Constants {
+  /**
+   * openTSDB required constants for API call
+   */
+  public static final String DEFAULT_TIME = "47y-ago";
+  public static final String SUM_AGGREGATOR = "sum";
+
+  public static final String TIME_PARAM = "start";
+  public static final String END_TIME_PARAM = "end";
+  public static final String METRIC_PARAM = "metric";
+  public static final String AGGREGATOR_PARAM = "aggregator";
+  public static final String DOWNSAMPLE_PARAM = "downsample";
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/DrillOpenTSDBTable.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/DrillOpenTSDBTable.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/DrillOpenTSDBTable.java
new file mode 100644
index 0000000..bdbb670
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/DrillOpenTSDBTable.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes;
+import org.apache.drill.exec.store.openTSDB.client.Schema;
+import org.apache.drill.exec.store.openTSDB.dto.ColumnDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes.DOUBLE;
+import static org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes.STRING;
+import static org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes.TIMESTAMP;
+
+public class DrillOpenTSDBTable extends DynamicDrillTable {
+
+  private static final Logger log =
+          LoggerFactory.getLogger(DrillOpenTSDBTable.class);
+
+  private final Schema schema;
+
+  public DrillOpenTSDBTable(String storageEngineName, OpenTSDBStoragePlugin plugin, Schema schema, OpenTSDBScanSpec scanSpec) {
+    super(plugin, storageEngineName, scanSpec);
+    this.schema = schema;
+  }
+
+  @Override
+  public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
+    List<String> names = Lists.newArrayList();
+    List<RelDataType> types = Lists.newArrayList();
+    convertToRelDataType(typeFactory, names, types);
+    return typeFactory.createStructType(types, names);
+  }
+
+  private void convertToRelDataType(RelDataTypeFactory typeFactory, List<String> names, List<RelDataType> types) {
+    for (ColumnDTO column : schema.getColumns()) {
+      names.add(column.getColumnName());
+      RelDataType type = getSqlTypeFromOpenTSDBType(typeFactory, column.getColumnType());
+      type = typeFactory.createTypeWithNullability(type, column.isNullable());
+      types.add(type);
+    }
+  }
+
+  private RelDataType getSqlTypeFromOpenTSDBType(RelDataTypeFactory typeFactory, OpenTSDBTypes type) {
+    switch (type) {
+      case STRING:
+        return typeFactory.createSqlType(SqlTypeName.VARCHAR, Integer.MAX_VALUE);
+      case DOUBLE:
+        return typeFactory.createSqlType(SqlTypeName.DOUBLE);
+      case TIMESTAMP:
+        return typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+      default:
+        throw UserException.unsupportedError()
+                .message(String.format("%s is unsupported now. Currently supported types is %s, %s, %s", type, STRING, DOUBLE, TIMESTAMP))
+                .build(log);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
new file mode 100644
index 0000000..935aaa5
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBBatchCreator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+
+import java.util.List;
+
+public class OpenTSDBBatchCreator implements BatchCreator<OpenTSDBSubScan> {
+
+  @Override
+  public CloseableRecordBatch getBatch(FragmentContext context, OpenTSDBSubScan subScan,
+                                       List<RecordBatch> children) throws ExecutionSetupException {
+    List<RecordReader> readers = Lists.newArrayList();
+    List<SchemaPath> columns;
+
+    for (OpenTSDBSubScan.OpenTSDBSubScanSpec scanSpec : subScan.getTabletScanSpecList()) {
+      try {
+        if ((columns = subScan.getColumns()) == null) {
+          columns = GroupScan.ALL_COLUMNS;
+        }
+        readers.add(new OpenTSDBRecordReader(subScan.getStorageEngine().getClient(), scanSpec, columns));
+      } catch (Exception e) {
+        throw new ExecutionSetupException(e);
+      }
+    }
+    return new ScanBatch(subScan, context, readers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java
new file mode 100644
index 0000000..47c805a
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBGroupScan.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.madhukaraphatak.sizeof.SizeEstimator;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.openTSDB.OpenTSDBSubScan.OpenTSDBSubScanSpec;
+import org.apache.drill.exec.store.openTSDB.client.services.ServiceImpl;
+import org.apache.drill.exec.store.openTSDB.dto.MetricDTO;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.drill.exec.store.openTSDB.Util.fromRowData;
+
+@JsonTypeName("openTSDB-scan")
+public class OpenTSDBGroupScan extends AbstractGroupScan {
+
+  private OpenTSDBStoragePluginConfig storagePluginConfig;
+  private OpenTSDBScanSpec openTSDBScanSpec;
+  private OpenTSDBStoragePlugin storagePlugin;
+
+  private List<SchemaPath> columns;
+
+  @JsonCreator
+  public OpenTSDBGroupScan(@JsonProperty("openTSDBScanSpec") OpenTSDBScanSpec openTSDBScanSpec,
+                           @JsonProperty("storage") OpenTSDBStoragePluginConfig openTSDBStoragePluginConfig,
+                           @JsonProperty("columns") List<SchemaPath> columns,
+                           @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
+    this((OpenTSDBStoragePlugin) pluginRegistry.getPlugin(openTSDBStoragePluginConfig), openTSDBScanSpec, columns);
+  }
+
+  public OpenTSDBGroupScan(OpenTSDBStoragePlugin storagePlugin,
+                           OpenTSDBScanSpec scanSpec, List<SchemaPath> columns) {
+    super((String) null);
+    this.storagePlugin = storagePlugin;
+    this.storagePluginConfig = storagePlugin.getConfig();
+    this.openTSDBScanSpec = scanSpec;
+    this.columns = columns == null || columns.size() == 0 ? ALL_COLUMNS : columns;
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   *
+   * @param that The OpenTSDBGroupScan to clone
+   */
+  private OpenTSDBGroupScan(OpenTSDBGroupScan that) {
+    super((String) null);
+    this.columns = that.columns;
+    this.openTSDBScanSpec = that.openTSDBScanSpec;
+    this.storagePlugin = that.storagePlugin;
+    this.storagePluginConfig = that.storagePluginConfig;
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return 1;
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
+  }
+
+  @Override
+  public OpenTSDBSubScan getSpecificScan(int minorFragmentId) {
+    List<OpenTSDBSubScanSpec> scanSpecList = Lists.newArrayList();
+    scanSpecList.add(new OpenTSDBSubScanSpec(getTableName()));
+    return new OpenTSDBSubScan(storagePlugin, storagePluginConfig, scanSpecList, this.columns);
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    ServiceImpl client = storagePlugin.getClient();
+    Map<String, String> params = fromRowData(openTSDBScanSpec.getTableName());
+    Set<MetricDTO> allMetrics = client.getAllMetrics(params);
+    long numMetrics = allMetrics.size();
+    float approxDiskCost = 0;
+    if (numMetrics != 0) {
+      MetricDTO metricDTO  = allMetrics.iterator().next();
+      // This method estimates the sizes of Java objects (number of bytes of memory they occupy).
+      // more detailed information about how this estimation method work you can find in this article
+      // http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html
+      approxDiskCost = SizeEstimator.estimate(metricDTO) * numMetrics;
+    }
+    return new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, numMetrics, 1, approxDiskCost);
+  }
+
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new OpenTSDBGroupScan(this);
+  }
+
+  @Override
+  public String getDigest() {
+    return toString();
+  }
+
+  @Override
+  @JsonIgnore
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+    return true;
+  }
+
+  @JsonIgnore
+  public String getTableName() {
+    return getOpenTSDBScanSpec().getTableName();
+  }
+
+  @JsonProperty
+  public OpenTSDBScanSpec getOpenTSDBScanSpec() {
+    return openTSDBScanSpec;
+  }
+
+  @JsonProperty("storage")
+  public OpenTSDBStoragePluginConfig getStoragePluginConfig() {
+    return storagePluginConfig;
+  }
+
+  @JsonProperty
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    OpenTSDBGroupScan newScan = new OpenTSDBGroupScan(this);
+    newScan.columns = columns;
+    return newScan;
+  }
+
+  @Override
+  public String toString() {
+    return "OpenTSDBGroupScan [OpenTSDBScanSpec=" + openTSDBScanSpec + ", columns=" + columns
+            + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBRecordReader.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBRecordReader.java
new file mode 100644
index 0000000..044c232
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBRecordReader.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes;
+import org.apache.drill.exec.store.openTSDB.client.Schema;
+import org.apache.drill.exec.store.openTSDB.client.Service;
+import org.apache.drill.exec.store.openTSDB.dto.ColumnDTO;
+import org.apache.drill.exec.store.openTSDB.dto.MetricDTO;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.drill.exec.store.openTSDB.Constants.METRIC_PARAM;
+import static org.apache.drill.exec.store.openTSDB.Util.fromRowData;
+
+public class OpenTSDBRecordReader extends AbstractRecordReader {
+
+  private static final Logger log = LoggerFactory.getLogger(OpenTSDBRecordReader.class);
+
+  // batch size should not exceed max allowed record count
+  private static final int TARGET_RECORD_COUNT = 4000;
+
+  private static final Map<OpenTSDBTypes, MinorType> TYPES;
+
+  private Service db;
+
+  private Iterator<MetricDTO> tableIterator;
+  private OutputMutator output;
+  private ImmutableList<ProjectedColumnInfo> projectedCols;
+
+  private Map<String, String> params;
+
+  public OpenTSDBRecordReader(Service client, OpenTSDBSubScan.OpenTSDBSubScanSpec subScanSpec,
+                       List<SchemaPath> projectedColumns) throws IOException {
+    setColumns(projectedColumns);
+    this.db = client;
+    this.params =
+            fromRowData(subScanSpec.getTableName());
+    log.debug("Scan spec: {}", subScanSpec);
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    this.output = output;
+    Set<MetricDTO> metrics =
+            db.getAllMetrics(params);
+    if (metrics == null) {
+      throw UserException.validationError()
+              .message(String.format("Table '%s' not found", params.get(METRIC_PARAM)))
+              .build(log);
+    }
+    this.tableIterator = metrics.iterator();
+  }
+
+  @Override
+  public int next() {
+    try {
+      return processOpenTSDBTablesData();
+    } catch (SchemaChangeException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+  }
+
+  static {
+    TYPES = ImmutableMap.<OpenTSDBTypes, MinorType>builder()
+        .put(OpenTSDBTypes.STRING, MinorType.VARCHAR)
+        .put(OpenTSDBTypes.DOUBLE, MinorType.FLOAT8)
+        .put(OpenTSDBTypes.TIMESTAMP, MinorType.TIMESTAMP)
+        .build();
+  }
+
+  private static class ProjectedColumnInfo {
+    ValueVector vv;
+    ColumnDTO openTSDBColumn;
+  }
+
+  private int processOpenTSDBTablesData() throws SchemaChangeException {
+    int rowCounter = 0;
+    while (tableIterator.hasNext() && rowCounter < TARGET_RECORD_COUNT) {
+      MetricDTO metricDTO = tableIterator.next();
+      rowCounter = addRowResult(metricDTO, rowCounter);
+    }
+    return rowCounter;
+  }
+
+  private int addRowResult(MetricDTO table, int rowCounter) throws SchemaChangeException {
+    setupProjectedColsIfItNull();
+    for (String time : table.getDps().keySet()) {
+      String value = table.getDps().get(time);
+      setupDataToDrillTable(table, time, value, table.getTags(), rowCounter);
+      rowCounter++;
+    }
+    return rowCounter;
+  }
+
+  private void setupProjectedColsIfItNull() throws SchemaChangeException {
+    if (projectedCols == null) {
+      initCols(new Schema(db, params.get(METRIC_PARAM)));
+    }
+  }
+
+  private void setupDataToDrillTable(MetricDTO table, String timestamp, String value, Map<String, String> tags, int rowCount) {
+    for (ProjectedColumnInfo pci : projectedCols) {
+      switch (pci.openTSDBColumn.getColumnName()) {
+        case "metric":
+          setStringColumnValue(table.getMetric(), pci, rowCount);
+          break;
+        case "aggregate tags":
+          setStringColumnValue(table.getAggregateTags().toString(), pci, rowCount);
+          break;
+        case "timestamp":
+          setTimestampColumnValue(timestamp, pci, rowCount);
+          break;
+        case "aggregated value":
+          setDoubleColumnValue(value, pci, rowCount);
+          break;
+        default:
+          setStringColumnValue(tags.get(pci.openTSDBColumn.getColumnName()), pci, rowCount);
+      }
+    }
+  }
+
+  private void setTimestampColumnValue(String timestamp, ProjectedColumnInfo pci, int rowCount) {
+    setTimestampColumnValue(timestamp != null ? Long.parseLong(timestamp) : Long.parseLong("0"), pci, rowCount);
+  }
+
+  private void setDoubleColumnValue(String value, ProjectedColumnInfo pci, int rowCount) {
+    setDoubleColumnValue(value != null ? Double.parseDouble(value) : 0.0, pci, rowCount);
+  }
+
+  private void setStringColumnValue(String data, ProjectedColumnInfo pci, int rowCount) {
+    if (data == null) {
+      data = "null";
+    }
+    ByteBuffer value = ByteBuffer.wrap(data.getBytes(UTF_8));
+    ((NullableVarCharVector.Mutator) pci.vv.getMutator())
+        .setSafe(rowCount, value, 0, value.remaining());
+  }
+
+  private void setTimestampColumnValue(Long data, ProjectedColumnInfo pci, int rowCount) {
+    ((NullableTimeStampVector.Mutator) pci.vv.getMutator())
+        .setSafe(rowCount, data * 1000);
+  }
+
+  private void setDoubleColumnValue(Double data, ProjectedColumnInfo pci, int rowCount) {
+    ((NullableFloat8Vector.Mutator) pci.vv.getMutator())
+        .setSafe(rowCount, data);
+  }
+
+  private void initCols(Schema schema) throws SchemaChangeException {
+    ImmutableList.Builder<ProjectedColumnInfo> pciBuilder = ImmutableList.builder();
+
+    for (int i = 0; i < schema.getColumnCount(); i++) {
+
+      ColumnDTO column = schema.getColumnByIndex(i);
+      final String name = column.getColumnName();
+      final OpenTSDBTypes type = column.getColumnType();
+      TypeProtos.MinorType minorType = TYPES.get(type);
+
+      if (isMinorTypeNull(minorType)) {
+        String message = String.format(
+                "A column you queried has a data type that is not currently supported by the OpenTSDB storage plugin. "
+                        + "The column's name was %s and its OpenTSDB data type was %s. ", name, type.toString());
+        throw UserException.unsupportedError()
+                .message(message)
+                .build(log);
+      }
+
+      ProjectedColumnInfo pci = getProjectedColumnInfo(column, name, minorType);
+      pciBuilder.add(pci);
+    }
+    projectedCols = pciBuilder.build();
+  }
+
+  private boolean isMinorTypeNull(MinorType minorType) {
+    return minorType == null;
+  }
+
+  private ProjectedColumnInfo getProjectedColumnInfo(ColumnDTO column, String name, MinorType minorType) throws SchemaChangeException {
+    MajorType majorType = getMajorType(minorType);
+
+    MaterializedField field =
+        MaterializedField.create(name, majorType);
+
+    ValueVector vector =
+        getValueVector(minorType, majorType, field);
+
+    return getProjectedColumnInfo(column, vector);
+  }
+
+  private MajorType getMajorType(MinorType minorType) {
+    MajorType majorType;
+    majorType = Types.optional(minorType);
+    return majorType;
+  }
+
+  private ValueVector getValueVector(MinorType minorType, MajorType majorType, MaterializedField field) throws SchemaChangeException {
+    final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass(
+        minorType, majorType.getMode());
+    ValueVector vector = output.addField(field, clazz);
+    vector.allocateNew();
+    return vector;
+  }
+
+  private ProjectedColumnInfo getProjectedColumnInfo(ColumnDTO column, ValueVector vector) {
+    ProjectedColumnInfo pci = new ProjectedColumnInfo();
+    pci.vv = vector;
+    pci.openTSDBColumn = column;
+    return pci;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBScanSpec.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBScanSpec.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBScanSpec.java
new file mode 100644
index 0000000..f93758d
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBScanSpec.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class OpenTSDBScanSpec {
+
+  private final String tableName;
+
+  @JsonCreator
+  public OpenTSDBScanSpec(@JsonProperty("tableName") String tableName) {
+    this.tableName = tableName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  @Override
+  public String toString() {
+    return "OpenTSDBScanSpec{" +
+            "tableName='" + tableName + '\'' +
+            '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java
new file mode 100644
index 0000000..176dff0
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePlugin.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.openTSDB.client.services.ServiceImpl;
+import org.apache.drill.exec.store.openTSDB.schema.OpenTSDBSchemaFactory;
+
+import java.io.IOException;
+
+public class OpenTSDBStoragePlugin extends AbstractStoragePlugin {
+
+  private final DrillbitContext context;
+
+  private final OpenTSDBStoragePluginConfig engineConfig;
+  private final OpenTSDBSchemaFactory schemaFactory;
+
+  private final ServiceImpl db;
+
+  public OpenTSDBStoragePlugin(OpenTSDBStoragePluginConfig configuration, DrillbitContext context, String name) throws IOException {
+    this.context = context;
+    this.schemaFactory = new OpenTSDBSchemaFactory(this, name);
+    this.engineConfig = configuration;
+    this.db = new ServiceImpl(configuration.getConnection());
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public OpenTSDBStoragePluginConfig getConfig() {
+    return engineConfig;
+  }
+
+  @Override
+  public OpenTSDBGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
+    OpenTSDBScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<OpenTSDBScanSpec>() {
+    });
+    return new OpenTSDBGroupScan(this, scanSpec, null);
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+    schemaFactory.registerSchemas(schemaConfig, parent);
+  }
+
+  public ServiceImpl getClient() {
+    return db;
+  }
+
+  DrillbitContext getContext() {
+    return this.context;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePluginConfig.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePluginConfig.java
new file mode 100644
index 0000000..1b67c1d
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBStoragePluginConfig.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+
+@JsonTypeName(OpenTSDBStoragePluginConfig.NAME)
+public class OpenTSDBStoragePluginConfig extends StoragePluginConfigBase {
+
+  private static final Logger log = LoggerFactory.getLogger(OpenTSDBStoragePluginConfig.class);
+
+  public static final String NAME = "openTSDB";
+
+  private final String connection;
+
+  @JsonCreator
+  public OpenTSDBStoragePluginConfig(@JsonProperty("connection") String connection) throws IOException {
+    if (connection == null || connection.isEmpty()) {
+      throw UserException.validationError()
+              .message("Connection property must not be null. Check plugin configuration.")
+              .build(log);
+    }
+    this.connection = connection;
+  }
+
+  public String getConnection() {
+    return connection;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    OpenTSDBStoragePluginConfig that = (OpenTSDBStoragePluginConfig) o;
+    return Objects.equals(connection, that.connection);
+  }
+
+  @Override
+  public int hashCode() {
+    return connection != null ? connection.hashCode() : 0;
+  }
+
+  @Override
+  public String toString() {
+    return "OpenTSDBStoragePluginConfig{" +
+            "connection='" + connection + '\'' +
+            '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java
new file mode 100644
index 0000000..4e93804
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/OpenTSDBSubScan.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+@JsonTypeName("openTSDB-sub-scan")
+public class OpenTSDBSubScan extends AbstractBase implements SubScan {
+
+  private static final Logger log =
+      LoggerFactory.getLogger(OpenTSDBSubScan.class);
+
+  public final OpenTSDBStoragePluginConfig storage;
+
+  private final List<SchemaPath> columns;
+  private final OpenTSDBStoragePlugin openTSDBStoragePlugin;
+  private final List<OpenTSDBSubScanSpec> tabletScanSpecList;
+
+  @JsonCreator
+  public OpenTSDBSubScan(@JacksonInject StoragePluginRegistry registry,
+                         @JsonProperty("storage") OpenTSDBStoragePluginConfig storage,
+                         @JsonProperty("tabletScanSpecList") LinkedList<OpenTSDBSubScanSpec> tabletScanSpecList,
+                         @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
+    super((String) null);
+    openTSDBStoragePlugin = (OpenTSDBStoragePlugin) registry.getPlugin(storage);
+    this.tabletScanSpecList = tabletScanSpecList;
+    this.storage = storage;
+    this.columns = columns;
+  }
+
+  public OpenTSDBSubScan(OpenTSDBStoragePlugin plugin, OpenTSDBStoragePluginConfig config,
+                         List<OpenTSDBSubScanSpec> tabletInfoList, List<SchemaPath> columns) {
+    super((String) null);
+    openTSDBStoragePlugin = plugin;
+    storage = config;
+    this.tabletScanSpecList = tabletInfoList;
+    this.columns = columns;
+  }
+
+  @Override
+  public int getOperatorType() {
+    return 0;
+  }
+
+  @Override
+  public boolean isExecutable() {
+    return false;
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    return new OpenTSDBSubScan(openTSDBStoragePlugin, storage, tabletScanSpecList, columns);
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Collections.emptyIterator();
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  public List<SchemaPath> getColumns() {
+    return columns;
+  }
+
+  public List<OpenTSDBSubScanSpec> getTabletScanSpecList() {
+    return tabletScanSpecList;
+  }
+
+  @JsonIgnore
+  public OpenTSDBStoragePlugin getStorageEngine() {
+    return openTSDBStoragePlugin;
+  }
+
+  @JsonProperty("storage")
+  public OpenTSDBStoragePluginConfig getStorageConfig() {
+    return storage;
+  }
+
+  public static class OpenTSDBSubScanSpec {
+
+    private final String tableName;
+
+    @JsonCreator
+    public OpenTSDBSubScanSpec(@JsonProperty("tableName") String tableName) {
+      this.tableName = tableName;
+    }
+
+    public String getTableName() {
+      return tableName;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Util.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Util.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Util.java
new file mode 100644
index 0000000..6e0ef05
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/Util.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB;
+
+import com.google.common.base.Splitter;
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class Util {
+
+  private static final Logger log = LoggerFactory.getLogger(Util.class);
+
+  /**
+   * Parse FROM parameters to Map representation
+   *
+   * @param rowData with this syntax (metric=warp.speed.test)
+   * @return Map with params key: metric, value: warp.speed.test
+   */
+  public static Map<String, String> fromRowData(String rowData) {
+    try {
+      String fromRowData = rowData.replaceAll("[()]", "");
+      return Splitter.on(",").trimResults().omitEmptyStrings().withKeyValueSeparator("=").split(fromRowData);
+    } catch (IllegalArgumentException e) {
+      throw UserException.validationError()
+              .message(String.format("Syntax error in the query %s", rowData))
+              .build(log);
+    }
+  }
+
+  /**
+   * @param name Metric name
+   * @return Valid metric name
+   */
+  public static String getValidTableName(String name) {
+    if (!isTableNameValid(name)) {
+      name = fromRowData(name).get("metric");
+    }
+    return name;
+  }
+
+  /**
+   * @param name Metric name
+   * @return true if name is valid
+   */
+  public static boolean isTableNameValid(String name) {
+    return !name.contains("=");
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDB.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDB.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDB.java
new file mode 100644
index 0000000..1d561c2
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDB.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.client;
+
+import org.apache.drill.exec.store.openTSDB.client.query.DBQuery;
+import org.apache.drill.exec.store.openTSDB.dto.MetricDTO;
+import retrofit2.Call;
+import retrofit2.http.Body;
+import retrofit2.http.GET;
+import retrofit2.http.POST;
+
+import java.util.Set;
+
+/**
+ * Client for API requests to openTSDB
+ */
+public interface OpenTSDB {
+
+  /**
+   * Used for getting all metrics names from openTSDB
+   *
+   * @return Set<String> with all tables names
+   */
+  @GET("api/suggest?type=metrics&max=" + Integer.MAX_VALUE)
+  Call<Set<String>> getAllTablesName();
+
+  /**
+   * Overloaded getTables for POST request to DB
+   *
+   * @param query Query for for selecting data
+   * @return Set<Table> with metrics from openTSDB
+   */
+  @POST("api/query")
+  Call<Set<MetricDTO>> getTables(@Body DBQuery query);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDBTypes.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDBTypes.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDBTypes.java
new file mode 100644
index 0000000..2a6b802
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/OpenTSDBTypes.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.client;
+
+/**
+ * Types in openTSDB records,
+ * used for converting openTSDB data to Sql representation
+ */
+public enum OpenTSDBTypes {
+  STRING,
+  DOUBLE,
+  TIMESTAMP
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Schema.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Schema.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Schema.java
new file mode 100644
index 0000000..2c8dc9f
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Schema.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.client;
+
+import org.apache.drill.exec.store.openTSDB.dto.ColumnDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.drill.exec.store.openTSDB.Constants.AGGREGATOR_PARAM;
+import static org.apache.drill.exec.store.openTSDB.Constants.DEFAULT_TIME;
+import static org.apache.drill.exec.store.openTSDB.Constants.METRIC_PARAM;
+import static org.apache.drill.exec.store.openTSDB.Constants.SUM_AGGREGATOR;
+import static org.apache.drill.exec.store.openTSDB.Constants.TIME_PARAM;
+import static org.apache.drill.exec.store.openTSDB.Util.getValidTableName;
+import static org.apache.drill.exec.store.openTSDB.client.Schema.DefaultColumns.AGGREGATED_VALUE;
+import static org.apache.drill.exec.store.openTSDB.client.Schema.DefaultColumns.AGGREGATE_TAGS;
+import static org.apache.drill.exec.store.openTSDB.client.Schema.DefaultColumns.METRIC;
+import static org.apache.drill.exec.store.openTSDB.client.Schema.DefaultColumns.TIMESTAMP;
+
+/**
+ * Abstraction for representing structure of openTSDB table
+ */
+public class Schema {
+
+  private static final Logger log =
+      LoggerFactory.getLogger(Schema.class);
+
+  private final List<ColumnDTO> columns = new ArrayList<>();
+  private final Service db;
+  private final String name;
+
+  public Schema(Service db, String name) {
+    this.db = db;
+    this.name = name;
+    setupStructure();
+  }
+
+  private void setupStructure() {
+    columns.add(new ColumnDTO(METRIC.toString(), OpenTSDBTypes.STRING));
+    columns.add(new ColumnDTO(AGGREGATE_TAGS.toString(), OpenTSDBTypes.STRING));
+    columns.add(new ColumnDTO(TIMESTAMP.toString(), OpenTSDBTypes.TIMESTAMP));
+    columns.add(new ColumnDTO(AGGREGATED_VALUE.toString(), OpenTSDBTypes.DOUBLE));
+    columns.addAll(db.getUnfixedColumns(getParamsForQuery()));
+  }
+
+  /**
+   * Return list with all columns names and its types
+   *
+   * @return List<ColumnDTO>
+   */
+  public List<ColumnDTO> getColumns() {
+    return Collections.unmodifiableList(columns);
+  }
+
+  /**
+   * Number of columns in table
+   *
+   * @return number of table columns
+   */
+  public int getColumnCount() {
+    return columns.size();
+  }
+
+  /**
+   * @param columnIndex index of required column in table
+   * @return ColumnDTO
+   */
+  public ColumnDTO getColumnByIndex(int columnIndex) {
+    return columns.get(columnIndex);
+  }
+
+  // Create map with required params, for querying metrics.
+  // Without this params, we cannot make API request to db.
+  private Map<String, String> getParamsForQuery() {
+    HashMap<String, String> params = new HashMap<>();
+    params.put(METRIC_PARAM, getValidTableName(name));
+    params.put(AGGREGATOR_PARAM, SUM_AGGREGATOR);
+    params.put(TIME_PARAM, DEFAULT_TIME);
+    return params;
+  }
+
+  /**
+   * Structure with constant openTSDB columns
+   */
+  enum DefaultColumns {
+
+    METRIC("metric"),
+    TIMESTAMP("timestamp"),
+    AGGREGATE_TAGS("aggregate tags"),
+    AGGREGATED_VALUE("aggregated value");
+
+    private String columnName;
+
+    DefaultColumns(String name) {
+      this.columnName = name;
+    }
+
+    @Override
+    public String toString() {
+      return columnName;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Service.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Service.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Service.java
new file mode 100644
index 0000000..0be7394
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/Service.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.client;
+
+import org.apache.drill.exec.store.openTSDB.dto.ColumnDTO;
+import org.apache.drill.exec.store.openTSDB.dto.MetricDTO;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public interface Service {
+  /**
+   *
+   * Used for getting all Metrics from openTSDB.
+   * Must be present required params: metric, start, aggregator
+   *
+   * @param queryParam parameters for the API request
+   * @return Set<MetricDTO> all metrics
+   */
+  Set<MetricDTO> getAllMetrics(Map<String, String> queryParam);
+
+  /**
+   *
+   * Used for getting all metrics names from openTSDB
+   *
+   * @return Set<String> metric names
+   */
+  Set<String> getAllMetricNames();
+
+  /**
+   *
+   * Used for getting all non fixed columns based on tags from openTSDB
+   * Must be present required params: metric, start, aggregator
+   *
+   * @param queryParam parameters for the API request
+   * @return List<ColumnDTO> columns based on tags
+   */
+  List<ColumnDTO> getUnfixedColumns(Map<String, String> queryParam);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/DBQuery.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/DBQuery.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/DBQuery.java
new file mode 100644
index 0000000..e79d0ce
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/DBQuery.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.client.query;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * DBQuery is an abstraction of an openTSDB query,
+ * that used for extracting data from the storage system by POST request to DB.
+ * <p>
+ * An OpenTSDB query requires at least one sub query,
+ * a means of selecting which time series should be included in the result set.
+ */
+public class DBQuery {
+
+  private static final Logger log =
+          LoggerFactory.getLogger(DBQuery.class);
+  /**
+   * The start time for the query. This can be a relative or absolute timestamp.
+   */
+  private String start;
+  /**
+   * An end time for the query. If not supplied, the TSD will assume the local system time on the server.
+   * This may be a relative or absolute timestamp. This param is optional, and if it isn't specified we will send null
+   * to the db in this field, but in this case db will assume the local system time on the server.
+   */
+  private String end;
+  /**
+   * One or more sub subQueries used to select the time series to return.
+   */
+  private Set<Query> queries;
+
+  private DBQuery(Builder builder) {
+    this.start = builder.start;
+    this.end = builder.end;
+    this.queries = builder.queries;
+  }
+
+  public String getStart() {
+    return start;
+  }
+
+  public String getEnd() {
+    return end;
+  }
+
+  public Set<Query> getQueries() {
+    return queries;
+  }
+
+  public static class Builder {
+
+    private String start;
+    private String end;
+    private Set<Query> queries = new HashSet<>();
+
+    public Builder() {
+    }
+
+    public Builder setStartTime(String startTime) {
+      if (startTime == null) {
+        throw UserException.validationError()
+                .message("start param must be specified")
+                .build(log);
+      }
+      this.start = startTime;
+      return this;
+    }
+
+    public Builder setEndTime(String endTime) {
+      this.end = endTime;
+      return this;
+    }
+
+    public Builder setQueries(Set<Query> queries) {
+      if (queries.isEmpty()) {
+        throw UserException.validationError()
+                .message("Required params such as metric, aggregator weren't specified. " +
+                        "Add these params to the query")
+                .build(log);
+      }
+      this.queries = queries;
+      return this;
+    }
+
+    public DBQuery build() {
+      return new DBQuery(this);
+    }
+
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    DBQuery dbQuery = (DBQuery) o;
+
+    if (!start.equals(dbQuery.start)) {
+      return false;
+    }
+    if (!end.equals(dbQuery.end)) {
+      return false;
+    }
+    return queries.equals(dbQuery.queries);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = start.hashCode();
+    result = 31 * result + end.hashCode();
+    result = 31 * result + queries.hashCode();
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "DBQuery{" +
+            "start='" + start + '\'' +
+            ", end='" + end + '\'' +
+            ", queries=" + queries +
+            '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/Query.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/Query.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/Query.java
new file mode 100644
index 0000000..bdcd1c4
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/query/Query.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.client.query;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Query is an abstraction of openTSDB subQuery
+ * and it is integral part of DBQuery
+ * <p>
+ * Each sub query can retrieve individual or groups of timeseries data,
+ * performing aggregation on each set.
+ */
+public class Query {
+  private static final Logger log =
+          LoggerFactory.getLogger(Query.class);
+  /**
+   * The name of an aggregation function to use.
+   */
+  private String aggregator;
+  /**
+   * The name of a metric stored in the system
+   */
+  private String metric;
+  /**
+   * Whether or not the data should be converted into deltas before returning.
+   * This is useful if the metric is a continuously incrementing counter
+   * and you want to view the rate of change between data points.
+   */
+  private String rate;
+  /**
+   * An optional downsampling function to reduce the amount of data returned.
+   */
+  private String downsample;
+  /**
+   * To drill down to specific timeseries or group results by tag,
+   * supply one or more map values in the same format as the query string.
+   */
+  private Map<String, String> tags;
+
+  private Query(Builder builder) {
+    this.aggregator = builder.aggregator;
+    this.metric = builder.metric;
+    this.rate = builder.rate;
+    this.downsample = builder.downsample;
+    this.tags = builder.tags;
+  }
+
+  public String getAggregator() {
+    return aggregator;
+  }
+
+  public String getMetric() {
+    return metric;
+  }
+
+  public String getRate() {
+    return rate;
+  }
+
+  public String getDownsample() {
+    return downsample;
+  }
+
+  public Map<String, String> getTags() {
+    return tags;
+  }
+
+  public static class Builder {
+
+    private String aggregator;
+    private String metric;
+    private String rate;
+    private String downsample;
+    private Map<String, String> tags = new HashMap<>();
+
+    public Builder(String metric) {
+      this.metric = metric;
+    }
+
+    public Builder setAggregator(String aggregator) {
+      if (aggregator == null) {
+        throw UserException.validationError()
+                .message("aggregator param must be specified")
+                .build(log);
+      }
+      this.aggregator = aggregator;
+      return this;
+    }
+
+    public Builder setMetric(String metric) {
+      if (metric == null) {
+        throw UserException.validationError()
+                .message("metric param must be specified")
+                .build(log);
+      }
+      this.metric = metric;
+      return this;
+    }
+
+    public Builder setRate(String rate) {
+      this.rate = rate;
+      return this;
+    }
+
+    public Builder setDownsample(String downsample) {
+      this.downsample = downsample;
+      return this;
+    }
+
+    public Builder setTags(Map<String, String> tags) {
+      this.tags = tags;
+      return this;
+    }
+
+    public Query build() {
+      return new Query(this);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    Query subQuery = (Query) o;
+
+    if (aggregator != null ? !aggregator.equals(subQuery.aggregator) : subQuery.aggregator != null) {
+      return false;
+    }
+    if (metric != null ? !metric.equals(subQuery.metric) : subQuery.metric != null) {
+      return false;
+    }
+    if (rate != null ? !rate.equals(subQuery.rate) : subQuery.rate != null) {
+      return false;
+    }
+    if (downsample != null ? !downsample.equals(subQuery.downsample) : subQuery.downsample != null) {
+      return false;
+    }
+    return tags != null ? tags.equals(subQuery.tags) : subQuery.tags == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = aggregator != null ? aggregator.hashCode() : 0;
+    result = 31 * result + (metric != null ? metric.hashCode() : 0);
+    result = 31 * result + (rate != null ? rate.hashCode() : 0);
+    result = 31 * result + (downsample != null ? downsample.hashCode() : 0);
+    result = 31 * result + (tags != null ? tags.hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "SubQuery{" +
+        "aggregator='" + aggregator + '\'' +
+        ", metric='" + metric + '\'' +
+        ", rate='" + rate + '\'' +
+        ", downsample='" + downsample + '\'' +
+        ", tags=" + tags +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java
new file mode 100644
index 0000000..41730bd
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/client/services/ServiceImpl.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.client.services;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.openTSDB.client.OpenTSDB;
+import org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes;
+import org.apache.drill.exec.store.openTSDB.client.Service;
+import org.apache.drill.exec.store.openTSDB.client.query.DBQuery;
+import org.apache.drill.exec.store.openTSDB.client.query.Query;
+import org.apache.drill.exec.store.openTSDB.dto.ColumnDTO;
+import org.apache.drill.exec.store.openTSDB.dto.MetricDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import retrofit2.Retrofit;
+import retrofit2.converter.jackson.JacksonConverterFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.drill.exec.store.openTSDB.Constants.AGGREGATOR_PARAM;
+import static org.apache.drill.exec.store.openTSDB.Constants.DOWNSAMPLE_PARAM;
+import static org.apache.drill.exec.store.openTSDB.Constants.END_TIME_PARAM;
+import static org.apache.drill.exec.store.openTSDB.Constants.METRIC_PARAM;
+import static org.apache.drill.exec.store.openTSDB.Constants.TIME_PARAM;
+
+public class ServiceImpl implements Service {
+
+  private static final Logger log =
+      LoggerFactory.getLogger(ServiceImpl.class);
+
+  private final OpenTSDB client;
+
+  public ServiceImpl(String connectionURL) {
+    this.client = new Retrofit.Builder()
+        .baseUrl(connectionURL)
+        .addConverterFactory(JacksonConverterFactory.create())
+        .build()
+        .create(OpenTSDB.class);
+  }
+
+  @Override
+  public Set<MetricDTO> getAllMetrics(Map<String, String> queryParams) {
+    return getAllMetricsByTags(queryParams);
+  }
+
+  @Override
+  public Set<String> getAllMetricNames() {
+    return getTableNames();
+  }
+
+  @Override
+  public List<ColumnDTO> getUnfixedColumns(Map<String, String> queryParam) {
+    Set<MetricDTO> metrics = getAllMetricsByTags(queryParam);
+    List<ColumnDTO> unfixedColumns = new ArrayList<>();
+
+    for (MetricDTO metric : metrics) {
+      for (String tag : metric.getTags().keySet()) {
+        ColumnDTO tmp = new ColumnDTO(tag, OpenTSDBTypes.STRING);
+        if (!unfixedColumns.contains(tmp)) {
+          unfixedColumns.add(tmp);
+        }
+      }
+    }
+    return unfixedColumns;
+  }
+
+  private Set<MetricDTO> getAllMetricsByTags(Map<String, String> queryParams) {
+    try {
+      return getAllMetricsFromDBByTags(queryParams);
+    } catch (IOException e) {
+      throw UserException.connectionError(e)
+              .message("Cannot connect to the db. " +
+                      "Maybe you have incorrect connection params or db unavailable now")
+              .build(log);
+    }
+  }
+
+  private Set<String> getTableNames() {
+    try {
+      return client.getAllTablesName().execute().body();
+    } catch (IOException e) {
+      throw UserException.connectionError(e)
+              .message("Cannot connect to the db. " +
+                      "Maybe you have incorrect connection params or db unavailable now")
+              .build(log);
+    }
+  }
+
+  private Set<MetricDTO> getMetricsByTags(DBQuery base) throws IOException {
+    return client.getTables(base).execute().body();
+  }
+
+  private Set<MetricDTO> getAllMetricsFromDBByTags(Map<String, String> queryParams) throws IOException {
+    Map<String, String> tags = new HashMap<>();
+    DBQuery baseQuery = getConfiguredDbQuery(tags, queryParams);
+
+    Set<MetricDTO> metrics = getBaseMetric(baseQuery);
+    if (metrics == null || metrics.isEmpty()) {
+      throw UserException.validationError()
+              .message(String.format("Table '%s' not found. Please check your query and params", queryParams.get(METRIC_PARAM)))
+              .build(log);
+    }
+    Set<String> extractedTags = getTagsFromMetrics(metrics);
+
+    return getMetricsByTags(extractedTags, queryParams);
+  }
+
+  private Set<MetricDTO> getMetricsByTags(Set<String> extractedTags, Map<String, String> queryParams) throws IOException {
+    Set<MetricDTO> metrics = new HashSet<>();
+    for (String value : extractedTags) {
+      metrics.addAll(getMetricsByTags(getConfiguredDbQuery(getTransformedTag(value), queryParams)));
+    }
+    return metrics;
+  }
+
+  private DBQuery getConfiguredDbQuery(Map<String, String> tags, Map<String, String> queryParams) {
+    Query subQuery = new Query.Builder(queryParams.get(METRIC_PARAM))
+        .setAggregator(queryParams.get(AGGREGATOR_PARAM))
+        .setDownsample(queryParams.get(DOWNSAMPLE_PARAM))
+        .setTags(tags).build();
+
+    Set<Query> queries = new HashSet<>();
+    queries.add(subQuery);
+
+    return new DBQuery.Builder()
+        .setStartTime(queryParams.get(TIME_PARAM))
+        .setEndTime(queryParams.get(END_TIME_PARAM))
+        .setQueries(queries)
+        .build();
+  }
+
+  private Set<MetricDTO> getBaseMetric(DBQuery base) throws IOException {
+    return getMetricsByTags(base);
+  }
+
+  private Set<String> getTagsFromMetrics(Set<MetricDTO> metrics) {
+    Set<String> extractedTags = new HashSet<>();
+
+    for (MetricDTO table : metrics) {
+      extractedTags.addAll(table.getAggregateTags());
+      extractedTags.addAll(table.getTags().keySet());
+    }
+
+    return extractedTags;
+  }
+
+  private Map<String, String> getTransformedTag(String tag) {
+    Map<String, String> tags = new HashMap<>();
+    tags.put(tag, "*");
+    return tags;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/ColumnDTO.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/ColumnDTO.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/ColumnDTO.java
new file mode 100644
index 0000000..03c5952
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/ColumnDTO.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.dto;
+
+import org.apache.drill.exec.store.openTSDB.client.OpenTSDBTypes;
+
+import java.util.Objects;
+
+public class ColumnDTO {
+
+  private final String columnName;
+  private final OpenTSDBTypes columnType;
+
+  public ColumnDTO(String columnName, OpenTSDBTypes columnType) {
+    this.columnName = columnName;
+    this.columnType = columnType;
+  }
+
+  public String getColumnName() {
+    return columnName;
+  }
+
+  public OpenTSDBTypes getColumnType() {
+    return columnType;
+  }
+
+  public boolean isNullable() {
+    return true;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ColumnDTO columnDTO = (ColumnDTO) o;
+    return Objects.equals(columnName, columnDTO.columnName) &&
+        columnType == columnDTO.columnType;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(columnName, columnType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/496c97d1/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/MetricDTO.java
----------------------------------------------------------------------
diff --git a/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/MetricDTO.java b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/MetricDTO.java
new file mode 100644
index 0000000..7e6285f
--- /dev/null
+++ b/contrib/storage-opentsdb/src/main/java/org/apache/drill/exec/store/openTSDB/dto/MetricDTO.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.openTSDB.dto;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class MetricDTO {
+
+  private String metric;
+  private Map<String, String> tags;
+  private List<String> aggregateTags;
+  private Map<String, String> dps;
+
+  public String getMetric() {
+    return metric;
+  }
+
+  public Map<String, String> getTags() {
+    return tags;
+  }
+
+  public List<String> getAggregateTags() {
+    return aggregateTags;
+  }
+
+  public Map<String, String> getDps() {
+    return dps;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    MetricDTO metricDTO = (MetricDTO) o;
+    return Objects.equals(metric, metricDTO.metric) &&
+        Objects.equals(tags, metricDTO.tags) &&
+        Objects.equals(aggregateTags, metricDTO.aggregateTags) &&
+        Objects.equals(dps, metricDTO.dps);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(metric, tags, aggregateTags, dps);
+  }
+
+  @Override
+  public String toString() {
+    return "Table{" +
+        "metric='" + metric + '\'' +
+        ", tags=" + tags +
+        ", aggregateTags=" + aggregateTags +
+        ", dps=" + dps +
+        '}';
+  }
+}


Mime
View raw message