carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [18/25] carbondata git commit: [CARBONDATA-2055][Streaming] Support integrating Stream table with Spark Streaming
Date Sat, 03 Mar 2018 12:44:05 GMT
[CARBONDATA-2055][Streaming] Support integrating Stream table with Spark Streaming

This closes #1867


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6bb5a2b0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6bb5a2b0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6bb5a2b0

Branch: refs/heads/branch-1.3
Commit: 6bb5a2b0a0a8177f14d90177b44e74d38eb69feb
Parents: cf2390a
Author: Zhang Zhichao <441586683@qq.com>
Authored: Sat Jan 27 00:03:19 2018 +0800
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Sat Mar 3 18:04:28 2018 +0530

----------------------------------------------------------------------
 .../CarbonBatchSparkStreamingExample.scala      |   6 +-
 .../CarbonStreamSparkStreamingExample.scala     | 218 +++++++++++++++++++
 ...CarbonStructuredStreamingWithRowParser.scala |   2 +-
 integration/spark2/pom.xml                      |   6 +
 .../spark/sql/CarbonSparkStreamingFactory.scala |  60 +++++
 .../TestStreamingTableWithRowParser.scala       |   2 +-
 streaming/pom.xml                               |   6 +
 .../streaming/parser/CarbonStreamParser.java    |   3 +
 .../CarbonSparkStreamingListener.scala          |  31 +++
 .../streaming/CarbonStreamSparkStreaming.scala  | 187 ++++++++++++++++
 10 files changed, 514 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
index 6ae87b9..ef4dbce 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
@@ -167,15 +167,11 @@ object CarbonBatchSparkStreamingExample {
         .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
 
       batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
-        val df = SparkSession.builder().getOrCreate()
-          .createDataFrame(rdd).toDF("id", "name", "city", "salary")
+        val df = spark.createDataFrame(rdd).toDF("id", "name", "city", "salary")
         println("at time: " + time.toString() + " the count of received data: " + df.count())
         df.write
           .format("carbondata")
           .option("tableName", tableName)
-          .option("tempCSV", "false")
-          .option("compress", "true")
-          .option("single_pass", "true")
           .mode(SaveMode.Append)
           .save()
       }}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
new file mode 100644
index 0000000..f59a610
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.CarbonSparkStreamingFactory
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.streaming.CarbonSparkStreamingListener
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+/**
+ * This example introduces how to use Spark Streaming to write data
+ * to CarbonData stream table.
+ *
+ * NOTE: Current integration with Spark Streaming is an alpha feature.
+ */
+// scalastyle:off println
+object CarbonStreamSparkStreamingExample {
+
+  def main(args: Array[String]): Unit = {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val checkpointPath =
+      s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
+      System.currentTimeMillis().toString()
+    val streamTableName = s"dstream_stream_table"
+
+    val spark = ExampleUtils.createCarbonSession("CarbonStreamSparkStreamingExample", 4)
+
+    val requireCreateTable = true
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      spark.sql(
+        s"""
+           | CREATE TABLE ${ streamTableName }(
+           | id INT,
+           | name STRING,
+           | city STRING,
+           | salary FLOAT
+           | )
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES(
+           | 'streaming'='true',
+           | 'sort_columns'='name',
+           | 'dictionary_include'='city')
+           | """.stripMargin)
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = writeSocket(serverSocket)
+      val thread2 = showTableCount(spark, streamTableName)
+      val ssc = startStreaming(spark, streamTableName, tablePath, checkpointPath)
+      // add a Spark Streaming Listener to remove all lock for stream tables when stop app
+      ssc.sparkContext.addSparkListener(new CarbonSparkStreamingListener())
+      // wait for stop signal to stop Spark Streaming App
+      waitForStopSignal(ssc)
+      // it need to start Spark Streaming App in main thread
+      // otherwise it will encounter an not-serializable exception.
+      ssc.start()
+      ssc.awaitTermination()
+      thread1.interrupt()
+      thread2.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+    spark.sql(s"select * from ${ streamTableName } order by id desc").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    // not filter
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    // show segments
+    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        for (_ <- 0 to 1000) {
+          println(System.currentTimeMillis())
+          spark.sql(s"select count(*) from $tableName").show(truncate = false)
+          spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
+          Thread.sleep(1000 * 5)
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def waitForStopSignal(ssc: StreamingContext): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
+        new ServerSocket(7072).accept()
+        // don't stop SparkContext here
+        ssc.stop(false, true)
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def startStreaming(spark: SparkSession, tableName: String,
+      tablePath: CarbonTablePath, checkpointPath: String): StreamingContext = {
+    var ssc: StreamingContext = null
+    try {
+      // recommend: the batch interval must set larger, such as 30s, 1min.
+      ssc = new StreamingContext(spark.sparkContext, Seconds(30))
+      ssc.checkpoint(checkpointPath)
+
+      val readSocketDF = ssc.socketTextStream("localhost", 7071)
+
+      val batchData = readSocketDF
+        .map(_.split(","))
+        .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
+
+      println("init carbon table info")
+      batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
+        val df = spark.createDataFrame(rdd).toDF()
+        println(System.currentTimeMillis().toString() +
+          " at batch time: " + time.toString() +
+          " the count of received data: " + df.count())
+        CarbonSparkStreamingFactory.getStreamSparkStreamingWriter(spark, "default", tableName)
+          .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+            CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
+          .mode(SaveMode.Append)
+          .writeStreamData(df, time)
+      }}
+    } catch {
+      case ex: Exception =>
+        ex.printStackTrace()
+        println("Done reading and writing streaming data")
+    }
+    ssc
+  }
+
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for client to connection request and accept
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to 1000) {
+          // write 5 records per iteration
+          for (_ <- 0 to 100) {
+            index = index + 1
+            socketWriter.println(index.toString + ",name_" + index
+                                 + ",city_" + index + "," + (index * 10000.00).toString +
+                                 ",school_" + index + ":school_" + index + index + "$" + index)
+          }
+          socketWriter.flush()
+          Thread.sleep(2000)
+        }
+        socketWriter.close()
+        System.out.println("Socket closed")
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
index f134a8d..cce833b 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
@@ -171,7 +171,7 @@ object CarbonStructuredStreamingWithRowParser {
             .option("dbName", "default")
             .option("tableName", "stream_table_with_row_parser")
             .option(CarbonStreamParser.CARBON_STREAM_PARSER,
-              "org.apache.carbondata.streaming.parser.RowStreamParserImp")
+              CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
             .start()
 
           qry.awaitTermination()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 9ac240b..90a5891 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -48,6 +48,12 @@
       <artifactId>spark-repl_${scala.binary.version}</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>${spark.deps.scope}</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala
new file mode 100644
index 0000000..15b038b
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.carbondata.streaming.CarbonStreamException
+import org.apache.carbondata.streaming.CarbonStreamSparkStreaming
+import org.apache.carbondata.streaming.CarbonStreamSparkStreamingWriter
+
+/**
+ * Create [[CarbonStreamSparkStreamingWriter]] for stream table
+ * when integrate with Spark Streaming.
+ *
+ * NOTE: Current integration with Spark Streaming is an alpha feature.
+ */
+object CarbonSparkStreamingFactory {
+
+  def getStreamSparkStreamingWriter(spark: SparkSession,
+    dbNameStr: String,
+    tableName: String): CarbonStreamSparkStreamingWriter =
+    synchronized {
+    val dbName = if (StringUtils.isEmpty(dbNameStr)) "default" else dbNameStr
+    val key = dbName + "." + tableName
+    if (CarbonStreamSparkStreaming.getTableMap.containsKey(key)) {
+      CarbonStreamSparkStreaming.getTableMap.get(key)
+    } else {
+      if (StringUtils.isEmpty(tableName) || tableName.contains(" ")) {
+        throw new CarbonStreamException("Table creation failed. " +
+                                        "Table name must not be blank or " +
+                                        "cannot contain blank space")
+      }
+      val carbonTable = CarbonEnv.getCarbonTable(Some(dbName),
+        tableName)(spark)
+      if (!carbonTable.isStreamingTable) {
+        throw new CarbonStreamException(s"Table ${carbonTable.getDatabaseName}." +
+                                        s"${carbonTable.getTableName} is not a streaming table")
+      }
+      val streamWriter = new CarbonStreamSparkStreamingWriter(spark,
+        carbonTable, spark.sessionState.newHadoopConf())
+      CarbonStreamSparkStreaming.getTableMap.put(key, streamWriter)
+      streamWriter
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index a3df2be..3e3b2c5 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -784,7 +784,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
             .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
             .option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff)
             .option(CarbonStreamParser.CARBON_STREAM_PARSER,
-              "org.apache.carbondata.streaming.parser.RowStreamParserImp")
+              CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
             .start()
           qry.awaitTermination()
         } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 40e3d33..1d4dc7f 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -24,6 +24,12 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>${spark.deps.scope}</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
index 643758c..e335626 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
@@ -31,6 +31,9 @@ public interface CarbonStreamParser {
   String CARBON_STREAM_PARSER_DEFAULT =
       "org.apache.carbondata.streaming.parser.CSVStreamParserImp";
 
+  String CARBON_STREAM_PARSER_ROW_PARSER =
+      "org.apache.carbondata.streaming.parser.RowStreamParserImp";
+
   void initialize(Configuration configuration, StructType structType);
 
   Object[] parserRow(InternalRow value);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
new file mode 100644
index 0000000..6d1fa45
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.scheduler.SparkListenerApplicationEnd
+
+class CarbonSparkStreamingListener extends SparkListener {
+
+  /**
+   * When Spark Streaming App stops, remove all locks for stream table.
+   */
+  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+    CarbonStreamSparkStreaming.cleanAllLockAfterStop()
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
new file mode 100644
index 0000000..4aa1517
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.Time
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+/**
+ * Interface used to write stream data to stream table
+ * when integrate with Spark Streaming.
+ *
+ * NOTE: Current integration with Spark Streaming is an alpha feature.
+ */
+class CarbonStreamSparkStreamingWriter(val sparkSession: SparkSession,
+    val carbonTable: CarbonTable,
+    val configuration: Configuration) {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  private var isInitialize: Boolean = false
+
+  private var lock: ICarbonLock = null
+  private var carbonAppendableStreamSink: Sink = null
+
+  /**
+   * Acquired the lock for stream table
+   */
+  def lockStreamTable(): Unit = {
+    lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+      LockUsage.STREAMING_LOCK)
+    if (lock.lockWithRetries()) {
+      LOGGER.info("Acquired the lock for stream table: " +
+                  carbonTable.getDatabaseName + "." +
+                  carbonTable.getTableName)
+    } else {
+      LOGGER.error("Not able to acquire the lock for stream table:" +
+                   carbonTable.getDatabaseName + "." + carbonTable.getTableName)
+      throw new InterruptedException(
+        "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." +
+        carbonTable.getTableName)
+    }
+  }
+
+  /**
+   * unlock for stream table
+   */
+  def unLockStreamTable(): Unit = {
+    if (null != lock) {
+      lock.unlock()
+      LOGGER.info("unlock for stream table: " +
+                  carbonTable.getDatabaseName + "." +
+                  carbonTable.getTableName)
+    }
+  }
+
+  def initialize(): Unit = {
+    carbonAppendableStreamSink = StreamSinkFactory.createStreamTableSink(
+      sparkSession,
+      configuration,
+      carbonTable,
+      extraOptions.toMap).asInstanceOf[CarbonAppendableStreamSink]
+
+    lockStreamTable()
+
+    isInitialize = true
+  }
+
+  def writeStreamData(dataFrame: DataFrame, time: Time): Unit = {
+    if (!isInitialize) {
+      initialize()
+    }
+    carbonAppendableStreamSink.addBatch(time.milliseconds, dataFrame)
+  }
+
+  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private var mode: SaveMode = SaveMode.ErrorIfExists
+
+  this.option("dbName", carbonTable.getDatabaseName)
+  this.option("tableName", carbonTable.getTableName)
+
+  /**
+   * Specifies the behavior when data or table already exists. Options include:
+   *   - `SaveMode.Overwrite`: overwrite the existing data.
+   *   - `SaveMode.Append`: append the data.
+   *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
+   *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
+   */
+  def mode(saveMode: SaveMode): CarbonStreamSparkStreamingWriter = {
+    if (mode == SaveMode.ErrorIfExists) {
+      mode = saveMode
+    }
+    this
+  }
+
+  /**
+   * Specifies the behavior when data or table already exists. Options include:
+   *   - `overwrite`: overwrite the existing data.
+   *   - `append`: append the data.
+   *   - `ignore`: ignore the operation (i.e. no-op).
+   *   - `error or default`: default option, throw an exception at runtime.
+   */
+  def mode(saveMode: String): CarbonStreamSparkStreamingWriter = {
+    if (mode == SaveMode.ErrorIfExists) {
+      mode = saveMode.toLowerCase(util.Locale.ROOT) match {
+        case "overwrite" => SaveMode.Overwrite
+        case "append" => SaveMode.Append
+        case "ignore" => SaveMode.Ignore
+        case "error" | "default" => SaveMode.ErrorIfExists
+        case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
+          "Accepted save modes are 'overwrite', 'append', 'ignore', 'error'.")
+      }
+    }
+    this
+  }
+
+  /**
+   * Adds an output option
+   */
+  def option(key: String, value: String): CarbonStreamSparkStreamingWriter = {
+    if (!extraOptions.contains(key)) {
+      extraOptions += (key -> value)
+    }
+    this
+  }
+
+  /**
+   * Adds an output option
+   */
+  def option(key: String, value: Boolean): CarbonStreamSparkStreamingWriter =
+    option(key, value.toString)
+
+  /**
+   * Adds an output option
+   */
+  def option(key: String, value: Long): CarbonStreamSparkStreamingWriter =
+    option(key, value.toString)
+
+  /**
+   * Adds an output option
+   */
+  def option(key: String, value: Double): CarbonStreamSparkStreamingWriter =
+    option(key, value.toString)
+}
+
+object CarbonStreamSparkStreaming {
+
+  @transient private val tableMap =
+    new util.HashMap[String, CarbonStreamSparkStreamingWriter]()
+
+  def getTableMap: util.Map[String, CarbonStreamSparkStreamingWriter] = tableMap
+
+  /**
+   * remove all stream lock.
+   */
+  def cleanAllLockAfterStop(): Unit = {
+    tableMap.asScala.values.foreach { writer => writer.unLockStreamTable() }
+    tableMap.clear()
+  }
+}


Mime
View raw message