carbondata-commits mailing list archives

From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-2776][CarbonStore] Support ingesting data from Kafka service
Date Tue, 31 Jul 2018 17:00:21 GMT
Repository: carbondata
Updated Branches:
  refs/heads/carbonstore 2d4628868 -> a6027ae11


[CARBONDATA-2776][CarbonStore] Support ingesting data from Kafka service

This closes #2544


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a6027ae1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a6027ae1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a6027ae1

Branch: refs/heads/carbonstore
Commit: a6027ae116b7d7a5ee65a3d286aa68cd4b357bd4
Parents: 2d46288
Author: QiangCai <qiangcai@qq.com>
Authored: Tue Jul 24 11:18:59 2018 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Wed Aug 1 00:59:53 2018 +0800

----------------------------------------------------------------------
 .../carbondata/stream/StreamJobManager.scala    |   4 +-
 .../stream/CarbonCreateStreamCommand.scala      |  16 +-
 .../table/CarbonCreateTableCommand.scala        |   1 -
 store/sql/pom.xml                               |  21 ++-
 .../apache/carbondata/dis/SensorProducer.java   | 151 +++++++++++++++++++
 .../horizon/rest/controller/SqlHorizon.java     |  78 +++++-----
 .../rest/controller/SqlHorizonController.java   |   7 +
 .../carbondata/horizon/rest/util/Upload.java    |  11 ++
 .../apache/carbondata/dis/SensorConsumer.scala  | 142 +++++++++++++++++
 streaming/pom.xml                               |  54 +++++++
 10 files changed, 438 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
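
For reference, the end-to-end usage this commit enables, condensed from the new SensorConsumer.scala further down in this diff (a minimal sketch; the stream name, table names, and the assumption of an already-created CarbonSession `spark` are illustrative):

    // declare a streaming source table backed by a DIS (or Kafka) stream
    spark.sql(
      """
        |CREATE TABLE sensor(
        |  event_id bigint, building string, device_id int,
        |  record_time timestamp, temperature double)
        |STORED AS carbondata
        |TBLPROPERTIES ('streaming'='source', 'format'='dis', 'streamname'='sensor_stream')
      """.stripMargin)

    // declare the sink table that stores the ingested rows
    spark.sql(
      """
        |CREATE TABLE sensor_table(
        |  event_id bigint, building string, device_id int,
        |  record_time timestamp, temperature double)
        |STORED AS carbondata
        |TBLPROPERTIES ('streaming'='sink')
      """.stripMargin)

    // wire source to sink with a continuous ingest job
    spark.sql(
      """
        |CREATE STREAM ingest_job ON TABLE sensor_table
        |STMPROPERTIES(
        |  'trigger'='ProcessingTime',
        |  'interval'='5 seconds',
        |  'carbon.stream.parser'='org.apache.carbondata.streaming.parser.CSVStreamParserImp')
        |AS SELECT * FROM sensor
      """.stripMargin)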


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6027ae1/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
index 470d89a..478e90d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
@@ -108,7 +108,9 @@ object StreamJobManager {
     validateSourceTable(sourceTable)
 
     // kafka source always have fixed schema, need to get actual schema
-    val isKafka = Option(sourceTable.getFormat).exists(_ == "kafka")
+    val isKafka = Option(sourceTable.getFormat).exists { x =>
+      x.equalsIgnoreCase("kafka") || x.equalsIgnoreCase("dis")
+    }
     val dataFrame = if (isKafka) {
       streamDf.selectExpr("CAST(value as STRING)")
     } else {
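
The source-format check above now also accepts "dis". For both kafka and dis the reader exposes Kafka's fixed schema (key, value, topic, partition, offset, ...), so only the payload is taken as a string and parsed downstream. A minimal sketch of the equivalent plain-Spark reader (broker and topic values are placeholders, not part of this commit):

    // fixed-schema kafka source; only the CSV payload in `value` is needed
    val streamDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "sensor_topic")
      .load()
    val payload = streamDf.selectExpr("CAST(value AS STRING)")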

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6027ae1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
index c413a62..e3e92b6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
@@ -104,13 +104,15 @@ case class CarbonCreateStreamCommand(
     if (format == null) {
       throw new MalformedCarbonCommandException("Streaming from carbon file is not supported")
     }
-    val streamReader = if (format != "kafka") {
+    val streamReader = if (format.equalsIgnoreCase("kafka") ||
+                           format.equalsIgnoreCase("dis")) {
+      // kafka source fixed schema, it cannot be set to a custom schema
       sparkSession.readStream
-        .schema(getSparkSchema(sourceTable))
         .format(format)
+
     } else {
-      // kafka source fixed schema, it cannot be set to a custom schema
       sparkSession.readStream
+        .schema(getSparkSchema(sourceTable))
         .format(format)
     }
     val dataFrame = format match {
@@ -120,7 +122,13 @@ case class CarbonCreateStreamCommand(
             s"'path' tblproperty should be provided for '$format' format")
         }
         streamReader.load(tblProperty.get("path"))
-      case "kafka" | "socket" =>
+      case "kafka" | "socket" | "dis" =>
+        // get source information from SparkSession
+        sparkSession.conf.getAll.foreach { entry =>
+          if (entry._1.toLowerCase.startsWith("carbon.source.")) {
+            streamReader.option(entry._1.substring("carbon.source.".length), entry._2)
+          }
+        }
         streamReader.options(tblProperty).load()
       case other =>
         throw new MalformedCarbonCommandException(s"Streaming from $format is not supported")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6027ae1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index c403d52..1b1da3f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -143,7 +143,6 @@ case class CarbonCreateTableCommand(
                  |  tableName "$tableName",
                  |  dbName "$dbName",
                  |  tablePath "$tablePath",
-                 |  path "$tablePath",
                  |  isExternal "$isExternal",
                  |  isTransactional "$isTransactionalTable",
                  |  isVisible "$isVisible"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6027ae1/store/sql/pom.xml
----------------------------------------------------------------------
diff --git a/store/sql/pom.xml b/store/sql/pom.xml
index 683e1af..d7e1f88 100644
--- a/store/sql/pom.xml
+++ b/store/sql/pom.xml
@@ -133,7 +133,7 @@
             <configuration>
               <keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope>
               <createDependencyReducedPom>true</createDependencyReducedPom>
-              <outputFile>${project.build.directory}/horizon-sql-shade.jar</outputFile>
+              <outputFile>${project.build.directory}/carbondata-streamsql.jar</outputFile>
               <artifactSet>
                 <includes>
                   <include>*:*</include>
@@ -156,7 +156,24 @@
                   <exclude>org.apache.hadoop:hadoop-yarn-client</exclude>
                   <exclude>org.apache.hadoop:hadoop-yarn-common</exclude>
                   <exclude>org.apache.hadoop:hadoop-yarn-server-common</exclude>
-                  <exclude>org.apache.spark:*</exclude>
+
+                  <exclude>org.apache.spark:spark-catalyst_2.11</exclude>
+                  <exclude>org.apache.spark:spark-core_2.11</exclude>
+                  <exclude>org.apache.spark:spark-graphx_2.11</exclude>
+                  <exclude>org.apache.spark:spark-hive_2.11</exclude>
+                  <exclude>org.apache.spark:spark-hive-thriftserver_2.11</exclude>
+                  <exclude>org.apache.spark:spark-launcher_2.11</exclude>
+                  <exclude>org.apache.spark:spark-network-common_2.11</exclude>
+                  <exclude>org.apache.spark:spark-network-shuffle_2.11</exclude>
+                  <exclude>org.apache.spark:spark-mllib_2.11</exclude>
+                  <exclude>org.apache.spark:spark-mllib-local_2.11</exclude>
+                  <exclude>org.apache.spark:spark-repl_2.11</exclude>
+                  <exclude>org.apache.spark:spark-sketch_2.11</exclude>
+                  <exclude>org.apache.spark:spark-sql_2.11</exclude>
+                  <exclude>org.apache.spark:spark-streaming_2.11</exclude>
+                  <exclude>org.apache.spark:spark-unsafe_2.11</exclude>
+
+                  <exclude>org.spark-project.hive:*</exclude>
                   <exclude>org.apache.zookeeper:*</exclude>
                   <exclude>org.apache.avro:*</exclude>
                   <exclude>com.google.guava:guava</exclude>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6027ae1/store/sql/src/main/java/org/apache/carbondata/dis/SensorProducer.java
----------------------------------------------------------------------
diff --git a/store/sql/src/main/java/org/apache/carbondata/dis/SensorProducer.java b/store/sql/src/main/java/org/apache/carbondata/dis/SensorProducer.java
new file mode 100644
index 0000000..2a2335d
--- /dev/null
+++ b/store/sql/src/main/java/org/apache/carbondata/dis/SensorProducer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.dis;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.huaweicloud.dis.DIS;
+import com.huaweicloud.dis.DISClientBuilder;
+import com.huaweicloud.dis.exception.DISClientException;
+import com.huaweicloud.dis.http.exception.ResourceAccessException;
+import com.huaweicloud.dis.iface.data.request.PutRecordsRequest;
+import com.huaweicloud.dis.iface.data.request.PutRecordsRequestEntry;
+import com.huaweicloud.dis.iface.data.response.PutRecordsResult;
+import com.huaweicloud.dis.iface.data.response.PutRecordsResultEntry;
+
+public class SensorProducer {
+
+  private static AtomicLong eventId = new AtomicLong(10000);
+
+  public static void main(String[] args) {
+    if (args.length < 6) {
+      System.err.println(
+          "Usage: SensorProducer <stream name> <endpoint> <region> <ak>
<sk> <project id> ");
+      return;
+    }
+
+    DIS dic = DISClientBuilder.standard().withEndpoint(args[1]).withAk(args[3]).withSk(args[4])
+        .withProjectId(args[5]).withRegion(args[2]).build();
+
+    Sensor sensor = new Sensor(dic, args[0]);
+    Timer timer = new Timer();
+    timer.schedule(sensor, 0, 5000);
+
+  }
+
+  static class Sensor extends TimerTask {
+    private DIS dic;
+
+    private String streamName;
+
+    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    private Random random = new Random();
+
+    private int i = 0;
+    private int flag = 1;
+
+    Sensor(DIS dic, String streamName) {
+      this.dic = dic;
+      this.streamName = streamName;
+    }
+
+    @Override public void run() {
+      uploadData();
+      // recordSensor();
+    }
+
+    private void uploadData() {
+      PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
+      putRecordsRequest.setStreamName(streamName);
+      List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
+      PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
+      putRecordsRequestEntry.setData(ByteBuffer.wrap(recordSensor()));
+      putRecordsRequestEntry
+          .setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000)));
+      putRecordsRequestEntryList.add(putRecordsRequestEntry);
+      putRecordsRequest.setRecords(putRecordsRequestEntryList);
+
+      System.out.println("========== BEGIN PUT ============");
+
+      PutRecordsResult putRecordsResult = null;
+      try {
+        putRecordsResult = dic.putRecords(putRecordsRequest);
+      } catch (DISClientException e) {
+        e.printStackTrace(System.err);
+        System.err.println(
+            "Failed to get a normal response, please check params and retry." + e.getMessage());
+      } catch (ResourceAccessException e) {
+        e.printStackTrace(System.err);
+        System.err.println("Failed to access endpoint. " + e.getMessage());
+      } catch (Exception e) {
+        e.printStackTrace(System.err);
+        System.err.println(e.getMessage());
+      }
+
+      if (putRecordsResult != null) {
+        System.out.println(String.format("Put [%d] records [%d successful / %d failed]",
+            putRecordsResult.getRecords().size(),
+            putRecordsResult.getRecords().size() - putRecordsResult.getFailedRecordCount().get(),
+            putRecordsResult.getFailedRecordCount().get()));
+        for (int j = 0; j < putRecordsResult.getRecords().size(); j++) {
+          PutRecordsResultEntry putRecordsRequestEntry1 = putRecordsResult.getRecords().get(j);
+          if (putRecordsRequestEntry1.getErrorCode() != null) {
+            System.out.println(String.format("[%s] put failed, errorCode [%s], errorMessage
[%s]",
+                new String(putRecordsRequestEntryList.get(j).getData().array(),
+                    Charset.defaultCharset()),
+                putRecordsRequestEntry1.getErrorCode(),
+                putRecordsRequestEntry1.getErrorMessage()));
+          } else {
+            System.out.println(String.format(
+                "[%s] put success, partitionId [%s], partitionKey [%s], sequenceNumber [%s]",
+                new String(putRecordsRequestEntryList.get(j).getData().array(),
+                    Charset.defaultCharset()),
+                putRecordsRequestEntry1.getPartitionId(),
+                putRecordsRequestEntryList.get(j).getPartitionKey(),
+                putRecordsRequestEntry1.getSequenceNumber()));
+          }
+        }
+      }
+      System.out.println("========== END PUT ============");
+    }
+
+    private byte[] recordSensor() {
+      StringBuilder builder = new StringBuilder();
+      // event_id,building,device_id,record_time,temperature
+      builder.append(eventId.getAndIncrement()).append(",G1,1,").append(format.format(new Date()));
+      i = i + 1 * flag;
+      if (i >= 5 || i <= 0) {
+        flag = 0 - flag;
+      }
+      builder.append(",").append(25 + i).append(".").append(random.nextInt(10));
+      System.out.println(builder.toString());
+      return builder.toString().getBytes(Charset.defaultCharset());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6027ae1/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java
----------------------------------------------------------------------
diff --git a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java
b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java
index 9bf8e3a..7749094 100644
--- a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java
+++ b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizon.java
@@ -19,16 +19,14 @@ package org.apache.carbondata.horizon.rest.controller;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.Iterator;
-import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.store.api.conf.StoreConf;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.CarbonSessionBuilder;
 import org.apache.spark.sql.SparkSession;
 
@@ -38,50 +36,19 @@ public class SqlHorizon extends Horizon {
       LogServiceFactory.getLogService(SqlHorizon.class.getCanonicalName());
 
   private static SparkSession session;
-  private static Configuration configuration;
   private static String storeLocation;
 
-  private static void createSession(String[] args) throws IOException {
-    CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
-        .addProperty(CarbonCommonConstants.CARBON_TASK_LOCALITY, "false")
-        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss")
-        .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd")
-        .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
-        .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "");
-
-    SparkSession.Builder baseBuilder = SparkSession.builder()
-        .appName("Horizon-SQL")
-        .config("spark.ui.port", 9876)
-        .config("spark.sql.crossJoin.enabled", "true");
-
-    Iterator<Map.Entry<String, String>> iterator = configuration.iterator();
-    while (iterator.hasNext()) {
-      Map.Entry<String, String> entry = iterator.next();
-      baseBuilder.config(entry.getKey(), entry.getValue());
-    }
-
-    session = new CarbonSessionBuilder(baseBuilder).build(storeLocation, null, true);
-  }
-
-  static SparkSession getSession() {
-    return session;
-  }
-
   public static void main(String[] args) {
-    if (args.length < 5) {
+    if (args.length < 11) {
       LOGGER.error("Usage: SqlHorizon <store location> <fs.s3a.endpoint> <fs.s3a.access.key>"
-          + " <fs.s3a.secret.key> <fs.s3a.impl>");
+          + " <fs.s3a.secret.key> <fs.s3a.impl> <dis.endpoint> <dis.region>
<dis.projectid>"
+          + " <hive metastore> <mrs hdfs url> <hive warehouse>");
       return;
     }
 
     try {
       storeLocation = args[0];
-      configuration = new Configuration();
-      configuration.set("fs.s3a.endpoint", args[1]);
-      configuration.set("fs.s3a.access.key", args[2]);
-      configuration.set("fs.s3a.secret.key", args[3]);
-      configuration.set("fs.s3a.impl", args[4]);
+      FileFactory.getConfiguration().set("fs.defaultFS", args[9]);
 
       String ip = InetAddress.getLocalHost().getHostAddress();
       LOGGER.audit("Driver IP: " + ip);
@@ -97,10 +64,43 @@ public class SqlHorizon extends Horizon {
     try {
       createSession(args);
       Thread.sleep(Long.MAX_VALUE);
-    } catch (IOException | InterruptedException e) {
+    } catch (InterruptedException e) {
       LOGGER.error(e);
       throw new RuntimeException(e);
     }
   }
 
+  private static void createSession(String[] args) {
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
+        .addProperty(CarbonCommonConstants.CARBON_TASK_LOCALITY, "false")
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss")
+        .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
+        .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
+        .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "");
+
+    SparkSession.Builder baseBuilder = SparkSession.builder()
+        .appName("Horizon-SQL")
+        .config("spark.ui.port", 9876)
+        .config("spark.sql.crossJoin.enabled", "true")
+        .config("carbon.source.endpoint", args[5])
+        .config("carbon.source.region", args[6])
+        .config("carbon.source.ak", args[2])
+        .config("carbon.source.sk", args[3])
+        .config("carbon.source.projectid", args[7])
+        .config("spark.sql.warehouse.dir", args[10])
+        .config("hive.metastore.uris", args[8])
+        .config("spark.hadoop.fs.s3a.endpoint", args[1])
+        .config("spark.hadoop.fs.s3a.access.key", args[2])
+        .config("spark.hadoop.fs.s3a.secret.key", args[3])
+        .config("spark.hadoop.fs.s3a.impl", args[4])
+        .config("spark.hadoop.fs.defaultFS", args[9]);
+
+    session = new CarbonSessionBuilder(baseBuilder).build(storeLocation, null, false);
+  }
+
+  static SparkSession getSession() {
+    return session;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6027ae1/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
----------------------------------------------------------------------
diff --git a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
index da9df52..4c3b996 100644
--- a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
+++ b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
@@ -20,6 +20,8 @@ package org.apache.carbondata.horizon.rest.controller;
 import java.util.List;
 import java.util.stream.IntStream;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.horizon.rest.model.validate.RequestValidator;
 import org.apache.carbondata.horizon.rest.model.view.SqlRequest;
 import org.apache.carbondata.horizon.rest.model.view.SqlResponse;
@@ -40,6 +42,9 @@ import org.springframework.web.bind.annotation.RestController;
 @RestController
 public class SqlHorizonController {
 
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SqlHorizonController.class.getName());
+
   @RequestMapping(value = "/table/sql", produces = MediaType.APPLICATION_JSON_VALUE)
  public ResponseEntity<SqlResponse> sql(@RequestBody SqlRequest request) throws StoreException {
     RequestValidator.validateSql(request);
@@ -50,8 +55,10 @@ public class SqlHorizonController {
               request.getSqlStatement());
       rows = sqlDataFrame.collectAsList();
     } catch (AnalysisException e) {
+      LOGGER.error(e);
       throw new StoreException(e.getSimpleMessage());
     } catch (Exception e) {
+      LOGGER.error(e);
       throw new StoreException(e.getMessage());
     }
     final String[] fieldNames = sqlDataFrame.schema().fieldNames();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6027ae1/store/sql/src/main/java/org/apache/carbondata/horizon/rest/util/Upload.java
----------------------------------------------------------------------
diff --git a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/util/Upload.java b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/util/Upload.java
index 2069a30..e86e152 100644
--- a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/util/Upload.java
+++ b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/util/Upload.java
@@ -28,6 +28,17 @@ import org.apache.hadoop.io.IOUtils;
 
 /**
  * a util to upload a local file to s3
+ * How to run it?
+ * VM Options:
+ * -Dhadoop.fs.s3a.endpoint=obs.cn-north-1.myhwclouds.com
+ * -Dhadoop.fs.s3a.access.key=XXXX
+ * -Dhadoop.fs.s3a.secret.key=XXXX
+ * -Dhadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
+ *
+ * Program arguments:
+ * local file : xxxx/store/sql/target/carbondata-streamsql.jar
+ * s3 path : s3a://xxxx/carbondata-streamsql.jar
+ * overwrite : true
  */
 
 public class Upload {
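
The javadoc above documents how the utility is run (s3a VM options plus local file, s3 path, and overwrite flag as program arguments); the class body is not shown in this hunk. As a hedged illustration only, an upload equivalent to what the javadoc describes can be sketched with the Hadoop FileSystem API (all paths and credentials are placeholders, and this is not the actual Upload implementation):

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()
    conf.set("fs.s3a.endpoint", "obs.cn-north-1.myhwclouds.com")
    conf.set("fs.s3a.access.key", "XXXX")
    conf.set("fs.s3a.secret.key", "XXXX")
    conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    val dst = new Path("s3a://bucket/carbondata-streamsql.jar")
    val fs = FileSystem.get(new URI(dst.toString), conf)
    // args: delSrc = false, overwrite = true, local source, s3a destination
    fs.copyFromLocalFile(false, true, new Path("store/sql/target/carbondata-streamsql.jar"), dst)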

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6027ae1/store/sql/src/main/scala/org/apache/carbondata/dis/SensorConsumer.scala
----------------------------------------------------------------------
diff --git a/store/sql/src/main/scala/org/apache/carbondata/dis/SensorConsumer.scala b/store/sql/src/main/scala/org/apache/carbondata/dis/SensorConsumer.scala
new file mode 100644
index 0000000..95d2b09
--- /dev/null
+++ b/store/sql/src/main/scala/org/apache/carbondata/dis/SensorConsumer.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.dis
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+object SensorConsumer {
+
+  def main(args: Array[String]): Unit = {
+
+    if (args.length < 6) {
+      // scalastyle:off println
+      System.err.println(
+          "Usage: SensorConsumer <stream name> <endpoint> <region> <ak>
<sk> <project id>")
+      // scalastyle:on println
+      return
+    }
+
+    val streamName = args(0)
+    val spark = createCarbonSession("DisConsumer")
+    spark.conf.set("carbon.source.endpoint", args(1))
+    spark.conf.set("carbon.source.region", args(2))
+    spark.conf.set("carbon.source.ak", args(3))
+    spark.conf.set("carbon.source.sk", args(4))
+    spark.conf.set("carbon.source.projectid", args(5))
+
+    spark.sql("drop table if exists sensor")
+    spark.sql("drop table if exists sensor_table")
+
+    spark.sql(
+      s"""
+         |create table sensor(
+         |  event_id bigint,
+         |  building string,
+         |  device_id int,
+         |  record_time timestamp,
+         |  temperature double
+         |)
+         |STORED AS carbondata
+         |TBLPROPERTIES (
+         |  'streaming'='source',
+         |  'format'='dis',
+         |  'streamname'='$streamName'
+         |)
+      """.stripMargin)
+
+    spark.sql(
+      s"""
+         |CREATE TABLE sensor_table(
+         | event_id bigint,
+         | building string,
+         | device_id int,
+         | record_time timestamp,
+         | temperature double
+         | )
+         |STORED AS carbondata
+         |TBLPROPERTIES(
+         |  'streaming'='sink',
+         |  'sort_columns'='device_id, record_time'
+         |)
+      """.stripMargin)
+
+    spark.sql(
+      """
+        |CREATE STREAM stream123 ON TABLE sensor_table
+        |STMPROPERTIES(
+        |  'trigger'='ProcessingTime',
+        |  'interval'='5 seconds',
+        |  'carbon.stream.parser'='org.apache.carbondata.streaming.parser.CSVStreamParserImp')
+        |AS
+        |  SELECT *
+        |  FROM sensor
+      """.stripMargin).show(false)
+
+
+    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+    (0 to 100).foreach { x =>
+      val time = format.format(new Date())
+      Thread.sleep(5000)
+      spark.sql("select * from sensor_table " +
+                "where record_time > current_timestamp() - INTERVAL 1 MINUTE")
+        .show(100, false)
+    }
+  }
+
+  def createCarbonSession(appName: String, workThreadNum: Int = 1): SparkSession = {
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/store/sql/target/store"
+    val warehouse = s"$rootPath/store/sql/target/warehouse"
+    val metastoredb = s"$rootPath/store/sql/target"
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss")
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
+      .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
+      .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "")
+
+    val masterUrl = if (workThreadNum <= 1) {
+      "local"
+    } else {
+      "local[" + workThreadNum.toString() + "]"
+    }
+
+    import org.apache.spark.sql.CarbonSession._
+
+    val spark = SparkSession
+      .builder()
+      .master(masterUrl)
+      .appName(appName)
+      .config("spark.sql.warehouse.dir", warehouse)
+      .config("spark.driver.host", "localhost")
+      .config("spark.sql.crossJoin.enabled", "true")
+      .getOrCreateCarbonSession(storeLocation, metastoredb)
+
+    spark.sparkContext.setLogLevel("ERROR")
+    spark
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6027ae1/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 0883f70..1a2d3d7 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -139,4 +139,58 @@
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>horizon</id>
+      <dependencies>
+        <!-- kafka dependencies-->
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
+          <version>2.2.1</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka-clients</artifactId>
+          <version>0.10.0.1</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.kafka</groupId>
+          <artifactId>kafka_2.11</artifactId>
+          <version>0.10.0.1</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
+          <version>2.2.1</version>
+        </dependency>
+        <!--DIS dependencies-->
+        <dependency>
+          <groupId>com.huawei.apigateway</groupId>
+          <artifactId>java-sdk-core</artifactId>
+          <version>2.0.1</version>
+        </dependency>
+        <dependency>
+          <groupId>com.huaweicloud.dis</groupId>
+          <artifactId>huaweicloud-sdk-java-dis</artifactId>
+          <version>1.3.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+          <groupId>com.huaweicloud.dis</groupId>
+          <artifactId>huaweicloud-sdk-java-dis-iface</artifactId>
+          <version>1.3.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+          <groupId>com.bigdata.dis</groupId>
+          <artifactId>dis-kafka-adapter</artifactId>
+          <version>1.0.4</version>
+        </dependency>
+        <dependency>
+          <groupId>com.bigdata.dis</groupId>
+          <artifactId>dis-structured-streaming_2.11</artifactId>
+          <version>1.0.4</version>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
 </project>

