carbondata-commits mailing list archives

From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-2736][CARBONSTORE] Kafka integration with Carbon StreamSQL
Date Wed, 18 Jul 2018 01:39:47 GMT
Repository: carbondata
Updated Branches:
  refs/heads/carbonstore 96fe233a2 -> 4b96ed8ca


[CARBONDATA-2736][CARBONSTORE] Kafka integration with Carbon StreamSQL

Modifications in this PR:
1. Pass the source table properties to streamReader.load()
2. Do not pass a schema when calling sparkSession.readStream on a kafka source
3. Skip the querySchema validation against the sink for kafka sources, since a
   dataframe built from a kafka source does not carry the actual data schema
   (the payload is written into its fixed "value" column)
4. Extract the dataframe holding the actual data schema from the kafka source
   at writeStream time (see the sketch below this list)
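
For reference, a minimal sketch (not part of this commit) of the read/extract
pattern these changes implement. It assumes a SparkSession named spark is in
scope; the kafka option values are placeholders:

    // No .schema() call: Spark's kafka source has a fixed schema and cannot
    // be given a custom one.
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder
      .option("subscribe", "events")                        // placeholder
      .load()
    // The payload lives in the binary "value" column; cast it out so the
    // stream written to the carbon sink carries the actual data.
    val data = raw.selectExpr("CAST(value AS STRING)")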

This closes #2495


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4b96ed8c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4b96ed8c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4b96ed8c

Branch: refs/heads/carbonstore
Commit: 4b96ed8ca2b99a55c51a8a81f0c606e13b06add7
Parents: 96fe233
Author: Ajith <ajith2489@gmail.com>
Authored: Thu Jul 12 09:17:22 2018 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Wed Jul 18 09:36:58 2018 +0800

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java |  9 ++++
 .../command/carbonTableSchemaCommon.scala       |  3 --
 .../carbondata/stream/StreamJobManager.scala    | 32 +++++++++----
 .../stream/CarbonCreateStreamCommand.scala      | 48 ++++++++++++--------
 4 files changed, 59 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b96ed8c/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index c302b2b..f16d1bb 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -1123,4 +1123,13 @@ public class CarbonTable implements Serializable {
       table.setLocalDictionaryEnabled(Boolean.parseBoolean("false"));
     }
   }
+
+  /**
+   * Return the format value defined in table properties
+   * @return String as per table properties, null if not defined
+   */
+  public String getFormat() {
+    return getTableInfo().getFactTable().getTableProperties()
+            .get("format");
+  }
 }
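
A note on the accessor above: getFormat() returns null when the 'format' table
property is absent, so Scala callers wrap it. A sketch of the call-site pattern
used in StreamJobManager below:

    // Null-safe check for a kafka-format source table
    val isKafka = Option(sourceTable.getFormat).exists(_ == "kafka")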

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b96ed8c/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index a641329..6cb0dcf 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -738,9 +738,6 @@ class TableNewProcessor(cm: TableModel) {
     tableInfo.setFactTable(tableSchema)
     val format = cm.tableProperties.get(CarbonCommonConstants.FORMAT)
     if (format.isDefined) {
-      if (!format.get.equalsIgnoreCase("csv")) {
-        CarbonException.analysisException(s"Currently we only support csv as external file format")
-      }
       tableInfo.setFormat(format.get)
       val formatProperties = cm.tableProperties.filter(pair =>
         pair._1.startsWith(s"${format.get.toLowerCase}.")).asJava

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b96ed8c/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
index 59e924d..470d89a 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
@@ -52,19 +52,23 @@ object StreamJobManager {
     }
   }
 
-  private def validateSinkTable(querySchema: StructType, sink: CarbonTable): Unit = {
+  private def validateSinkTable(validateQuerySchema: Boolean,
+                                querySchema: StructType, sink: CarbonTable): Unit = {
     if (!sink.isStreamingSink) {
       throw new MalformedCarbonCommandException(s"Table ${sink.getTableName} is not " +
                                                 "streaming sink table " +
                                                 "('streaming' tblproperty is not 'sink' or
'true')")
     }
-    val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
-      StructField(column.getColName,
-        CarbonScalaUtil.convertCarbonToSparkDataType(column.getDataType))
-    }
-    if (!querySchema.equals(StructType(fields))) {
-      throw new MalformedCarbonCommandException(s"Schema of table ${sink.getTableName} " +
-                                                s"does not match query output")
+    // TODO: validate query schema against sink in kafka (we cannot get schema directly)
+    if (validateQuerySchema) {
+      val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column =>
+        StructField(column.getColName,
+          CarbonScalaUtil.convertCarbonToSparkDataType(column.getDataType))
+      }
+      if (!querySchema.equals(StructType(fields))) {
+        throw new MalformedCarbonCommandException(s"Schema of table ${sink.getTableName} " +
+          s"does not match query output")
+      }
     }
   }
 
@@ -102,14 +106,22 @@ object StreamJobManager {
     }
 
     validateSourceTable(sourceTable)
-    validateSinkTable(streamDf.schema, sinkTable)
+
+    // kafka source always have fixed schema, need to get actual schema
+    val isKafka = Option(sourceTable.getFormat).exists(_ == "kafka")
+    val dataFrame = if (isKafka) {
+      streamDf.selectExpr("CAST(value as STRING)")
+    } else {
+      streamDf
+    }
+    validateSinkTable(!isKafka, dataFrame.schema, sinkTable)
 
     // start a new thread to run the streaming ingest job, the job will be running
     // until user stops it by STOP STREAM JOB
     val thread = new Thread(new Runnable {
       override def run(): Unit = {
         try {
-          job = streamDf.writeStream
+          job = dataFrame.writeStream
             .format("carbondata")
             .trigger(options.trigger)
             .option("checkpointLocation", options.checkpointLocation(sinkTable.getTablePath))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b96ed8c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
index d3b178c..c413a62 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.DataCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.types.{StringType, StructType}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -53,20 +52,21 @@ case class CarbonCreateStreamCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val df = sparkSession.sql(query)
     var sourceTable: CarbonTable = null
+    var dataFrame: Option[DataFrame] = None
 
-    // find the streaming source table in the query
-    // and replace it with StreamingRelation
-    val streamLp = df.logicalPlan transform {
+    // Prepare the dataframe from the stream source table
+    df.logicalPlan transform {
       case r: LogicalRelation
         if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
           r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
-        val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
+        val (source, resolvedFrame) = prepareDataFrame(sparkSession, r)
        if (sourceTable != null && sourceTable.getTableName != source.getTableName) {
           throw new MalformedCarbonCommandException(
             "Stream query on more than one stream source table is not supported")
         }
         sourceTable = source
-        streamingRelation
+        dataFrame = Option(resolvedFrame)
+        r
       case plan: LogicalPlan => plan
     }
 
@@ -82,24 +82,37 @@ case class CarbonCreateStreamCommand(
       sourceTable = sourceTable,
       sinkTable = CarbonEnv.getCarbonTable(sinkDbName, sinkTableName)(sparkSession),
       query = query,
-      streamDf = Dataset.ofRows(sparkSession, streamLp),
+      streamDf = dataFrame.getOrElse(Dataset.ofRows(sparkSession, df.logicalPlan)),
       options = new StreamingOption(optionMap)
     )
     Seq(Row(streamName, jobId, "RUNNING"))
   }
 
-  private def prepareStreamingRelation(
+  /**
+   * Create a dataframe from source table of logicalRelation
+   * @param sparkSession
+   * @param logicalRelation
+   * @return sourceTable and its stream dataFrame
+   */
+  private def prepareDataFrame(
       sparkSession: SparkSession,
-      r: LogicalRelation): (CarbonTable, StreamingRelation) = {
-    val sourceTable = r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+      logicalRelation: LogicalRelation): (CarbonTable, DataFrame) = {
+    val sourceTable = logicalRelation.relation
+      .asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
     val tblProperty = sourceTable.getTableInfo.getFactTable.getTableProperties
     val format = tblProperty.get("format")
     if (format == null) {
       throw new MalformedCarbonCommandException("Streaming from carbon file is not supported")
     }
-    val streamReader = sparkSession.readStream
-      .schema(getSparkSchema(sourceTable))
-      .format(format)
+    val streamReader = if (format != "kafka") {
+      sparkSession.readStream
+        .schema(getSparkSchema(sourceTable))
+        .format(format)
+    } else {
+      // kafka source fixed schema, it cannot be set to a custom schema
+      sparkSession.readStream
+        .format(format)
+    }
     val dataFrame = format match {
       case "csv" | "text" | "json" | "parquet" =>
         if (!tblProperty.containsKey("path")) {
@@ -108,16 +121,11 @@ case class CarbonCreateStreamCommand(
         }
         streamReader.load(tblProperty.get("path"))
       case "kafka" | "socket" =>
-        streamReader.load()
+        streamReader.options(tblProperty).load()
       case other =>
         throw new MalformedCarbonCommandException(s"Streaming from $format is not supported")
     }
-    val streamRelation = dataFrame.logicalPlan.asInstanceOf[StreamingRelation]
-
-    // Since SparkSQL analyzer will match the UUID in attribute,
-    // create a new StreamRelation and re-use the same attribute from LogicalRelation
-    (sourceTable,
-      StreamingRelation(streamRelation.dataSource, streamRelation.sourceName, r.output))
+    (sourceTable, dataFrame)
   }
 
   private def getSparkSchema(sourceTable: CarbonTable): StructType = {
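
Finally, a sketch of what change #1 (streamReader.options(tblProperty).load())
enables: every source table property set at CREATE TABLE time now reaches the
kafka reader as an option. Property names and values below are illustrative,
and spark denotes a SparkSession assumed to be in scope:

    import java.util.{HashMap => JHashMap}

    // Stand-in for sourceTable.getTableInfo.getFactTable.getTableProperties
    val tblProperty = new JHashMap[String, String]()
    tblProperty.put("kafka.bootstrap.servers", "localhost:9092")
    tblProperty.put("subscribe", "events")

    // DataStreamReader.options accepts a java.util.Map directly
    val df = spark.readStream.format("kafka").options(tblProperty).load()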

