carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [06/15] carbondata git commit: [CARBONDATA-1572][Streaming] Support streaming ingest and query
Date Fri, 10 Nov 2017 05:16:27 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index f4f8b75..4580f22 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -26,27 +26,30 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.execution.command.{TableModel, TableNewProcessor}
 import org.apache.spark.sql.execution.strategy.CarbonLateDecodeStrategy
+import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.sources._
+import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, CarbonUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.streaming.{CarbonStreamException, StreamSinkFactory}
 
 /**
  * Carbon relation provider compliant to data source api.
  * Creates carbon relations
  */
 class CarbonSource extends CreatableRelationProvider with RelationProvider
-  with SchemaRelationProvider with DataSourceRegister {
+  with SchemaRelationProvider with StreamSinkProvider with DataSourceRegister {
 
   override def shortName(): String = "carbondata"
 
@@ -208,6 +211,35 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
     }
   }
 
+  /**
+   * Produce a streaming `Sink` for the specified format.
+   * Currently it creates the default sink (CarbonAppendableStreamSink) for the row format.
+   */
+  override def createSink(sqlContext: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
+
+    // check "tablePath" option
+    val tablePathOption = parameters.get("tablePath")
+    if (tablePathOption.isDefined) {
+      val sparkSession = sqlContext.sparkSession
+      val identifier: AbsoluteTableIdentifier =
+        AbsoluteTableIdentifier.fromTablePath(tablePathOption.get)
+      val carbonTable =
+        CarbonEnv.getInstance(sparkSession).carbonMetastore.
+          createCarbonRelation(parameters, identifier, sparkSession).tableMeta.carbonTable
+
+      // create sink
+      StreamSinkFactory.createStreamTableSink(
+        sqlContext.sparkSession,
+        carbonTable,
+        parameters)
+    } else {
+      throw new CarbonStreamException("Require tablePath option for the write stream")
+    }
+  }
+
 }
 
 object CarbonSource {

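For context, a minimal sketch of how a Structured Streaming query might target the sink registered above (shortName "carbondata"). The "tablePath" option is mandatory, since createSink throws CarbonStreamException without it; the source, paths and port below are illustrative only and not part of this patch.

    import org.apache.spark.sql.SparkSession

    object CarbonStreamSinkSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("carbon-stream-sketch").getOrCreate()

        // any streaming source works; the socket source keeps the sketch small, and its
        // single "value" string column matches the default CSV row parser
        val lines = spark.readStream
          .format("socket")
          .option("host", "localhost")
          .option("port", 9099)
          .load()

        // "tablePath" is required by CarbonSource.createSink; the table path and the
        // checkpoint location here are hypothetical
        val query = lines.writeStream
          .format("carbondata")
          .option("tablePath", "/tmp/carbonstore/default/stream_table")
          .option("checkpointLocation", "/tmp/carbon_checkpoint")
          .start()

        query.awaitTermination()
      }
    }
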
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 39dc565..8b1212d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,7 +107,7 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <snappy.version>1.1.2.6</snappy.version>
-    <hadoop.version>2.2.0</hadoop.version>
+    <hadoop.version>2.7.2</hadoop.version>
     <hadoop.deps.scope>compile</hadoop.deps.scope>
     <spark.deps.scope>compile</spark.deps.scope>
     <scala.deps.scope>compile</scala.deps.scope>
@@ -438,6 +438,7 @@
         <module>integration/hive</module>
         <module>integration/presto</module>
         <module>examples/flink</module>
+        <module>streaming</module>
       </modules>
       <build>
         <plugins>
@@ -452,13 +453,6 @@
       </build>
     </profile>
     <profile>
-      <id>hadoop-2.2.0</id>
-      <!-- default -->
-      <properties>
-        <hadoop.version>2.2.0</hadoop.version>
-      </properties>
-    </profile>
-    <profile>
       <id>hadoop-2.7.2</id>
       <properties>
         <hadoop.version>2.7.2</hadoop.version>
@@ -478,6 +472,7 @@
         <module>integration/spark2</module>
         <module>integration/hive</module>
         <module>integration/presto</module>
+        <module>streaming</module>
         <module>examples/spark2</module>
       </modules>
       <build>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
index 61646ec..9a37421 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
@@ -64,7 +64,7 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
   public static final String QUOTE_DEFAULT = "\"";
   public static final String ESCAPE = "carbon.csvinputformat.escape";
   public static final String ESCAPE_DEFAULT = "\\";
-  public static final String HEADER_PRESENT = "caron.csvinputformat.header.present";
+  public static final String HEADER_PRESENT = "carbon.csvinputformat.header.present";
   public static final boolean HEADER_PRESENT_DEFAULT = false;
   public static final String READ_BUFFER_SIZE = "carbon.csvinputformat.read.buffer.size";
   public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
@@ -171,6 +171,24 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
     configuration.set(NUMBER_OF_COLUMNS, numberOfColumns);
   }
 
+  public static CsvParserSettings extractCsvParserSettings(Configuration job) {
+    CsvParserSettings parserSettings = new CsvParserSettings();
+    parserSettings.getFormat().setDelimiter(job.get(DELIMITER, DELIMITER_DEFAULT).charAt(0));
+    parserSettings.getFormat().setComment(job.get(COMMENT, COMMENT_DEFAULT).charAt(0));
+    parserSettings.setLineSeparatorDetectionEnabled(true);
+    parserSettings.setNullValue("");
+    parserSettings.setEmptyValue("");
+    parserSettings.setIgnoreLeadingWhitespaces(false);
+    parserSettings.setIgnoreTrailingWhitespaces(false);
+    parserSettings.setSkipEmptyLines(false);
+    parserSettings.setMaxCharsPerColumn(MAX_CHARS_PER_COLUMN_DEFAULT);
+    String maxColumns = job.get(MAX_COLUMNS, "" + DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING);
+    parserSettings.setMaxColumns(Integer.parseInt(maxColumns));
+    parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0));
+    parserSettings.getFormat().setQuoteEscape(job.get(ESCAPE, ESCAPE_DEFAULT).charAt(0));
+    return parserSettings;
+  }
+
   /**
    * Treats value as line in file. Key is null.
    */
@@ -232,30 +250,13 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
       }
       reader = new InputStreamReader(inputStream,
           Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
-      csvParser = new CsvParser(extractCsvParserSettings(job));
-      csvParser.beginParsing(reader);
-    }
-
-    private CsvParserSettings extractCsvParserSettings(Configuration job) {
-      CsvParserSettings parserSettings = new CsvParserSettings();
-      parserSettings.getFormat().setDelimiter(job.get(DELIMITER, DELIMITER_DEFAULT).charAt(0));
-      parserSettings.getFormat().setComment(job.get(COMMENT, COMMENT_DEFAULT).charAt(0));
-      parserSettings.setLineSeparatorDetectionEnabled(true);
-      parserSettings.setNullValue("");
-      parserSettings.setEmptyValue("");
-      parserSettings.setIgnoreLeadingWhitespaces(false);
-      parserSettings.setIgnoreTrailingWhitespaces(false);
-      parserSettings.setSkipEmptyLines(false);
-      parserSettings.setMaxCharsPerColumn(MAX_CHARS_PER_COLUMN_DEFAULT);
-      String maxColumns = job.get(MAX_COLUMNS);
-      parserSettings.setMaxColumns(Integer.parseInt(maxColumns));
-      parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0));
-      parserSettings.getFormat().setQuoteEscape(job.get(ESCAPE, ESCAPE_DEFAULT).charAt(0));
+      CsvParserSettings settings = extractCsvParserSettings(job);
       if (start == 0) {
-        parserSettings.setHeaderExtractionEnabled(job.getBoolean(HEADER_PRESENT,
+        settings.setHeaderExtractionEnabled(job.getBoolean(HEADER_PRESENT,
             HEADER_PRESENT_DEFAULT));
       }
-      return parserSettings;
+      csvParser = new CsvParser(settings);
+      csvParser.beginParsing(reader);
     }
 
     @Override

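Since extractCsvParserSettings is now a public static helper, callers outside the record reader (such as the streaming CSV parser added later in this patch) can build a univocity parser straight from a Hadoop Configuration. A hedged spark-shell style sketch, assuming DELIMITER is a public key constant like ESCAPE and QUOTE above; the delimiter and sample line are illustrative.

    import org.apache.hadoop.conf.Configuration
    import com.univocity.parsers.csv.CsvParser
    import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat

    val conf = new Configuration()
    // assumed public key constant, analogous to ESCAPE/QUOTE shown above
    conf.set(CSVInputFormat.DELIMITER, "|")
    // build parser settings from the configuration, then parse one line
    val settings = CSVInputFormat.extractCsvParserSettings(conf)
    val parser = new CsvParser(settings)
    val fields: Array[String] = parser.parseLine("1|bob|2017-11-10")
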
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 972e414..1b6ba72 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -392,7 +392,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   protected abstract void fillBlockIndexInfoDetails(long numberOfRows, String carbonDataFileName,
       long currentPosition);
 
-  protected List<org.apache.carbondata.format.ColumnSchema> getColumnSchemaListAndCardinality(
+  public static List<org.apache.carbondata.format.ColumnSchema> getColumnSchemaListAndCardinality(
       List<Integer> cardinality, int[] dictionaryColumnCardinality,
       List<ColumnSchema> wrapperColumnSchemaList) {
     List<org.apache.carbondata.format.ColumnSchema> columnSchemaList =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 1c7f9e7..761867c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -42,7 +42,9 @@ import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -259,6 +261,23 @@ public final class CarbonDataProcessorUtil {
         .toPrimitive(noDictionaryMapping.toArray(new Boolean[noDictionaryMapping.size()]));
   }
 
+  public static boolean[] getNoDictionaryMapping(CarbonColumn[] carbonColumns) {
+    List<Boolean> noDictionaryMapping = new ArrayList<Boolean>();
+    for (CarbonColumn column : carbonColumns) {
+      // for complex type columns, break out of the loop
+      if (column.isComplex()) {
+        break;
+      }
+      if (!column.hasEncoding(Encoding.DICTIONARY) && column.isDimension()) {
+        noDictionaryMapping.add(true);
+      } else if (column.isDimension()) {
+        noDictionaryMapping.add(false);
+      }
+    }
+    return ArrayUtils
+        .toPrimitive(noDictionaryMapping.toArray(new Boolean[noDictionaryMapping.size()]));
+  }
+
   /**
    * Preparing the boolean [] to map whether the dimension use inverted index or not.
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 0b88684..3cf851f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -186,7 +186,7 @@ public final class CarbonLoaderUtil {
     }
   }
 
-  private static void deleteStorePath(String path) {
+  public static void deleteStorePath(String path) {
     try {
       FileType fileType = FileFactory.getFileType(path);
       if (FileFactory.isFileExist(path, fileType)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/streaming/pom.xml b/streaming/pom.xml
new file mode 100644
index 0000000..d9dac75
--- /dev/null
+++ b/streaming/pom.xml
@@ -0,0 +1,127 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>carbondata-parent</artifactId>
+    <groupId>org.apache.carbondata</groupId>
+    <version>1.3.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>carbondata-streaming</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache CarbonData :: Streaming</name>
+  <url>http://maven.apache.org</url>
+
+  <properties>
+    <dev.path>${basedir}/../dev</dev.path>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-spark-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>3.8.1</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <testSourceDirectory>src/test/scala</testSourceDirectory>
+    <resources>
+      <resource>
+        <directory>src/resources</directory>
+      </resource>
+      <resource>
+        <directory>.</directory>
+        <includes>
+          <include>CARBON_STREAMING_INTERFACELogResource.properties</include>
+        </includes>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+          <execution>
+            <id>testCompile</id>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+            <phase>test</phase>
+          </execution>
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.18</version>
+        <!-- Note config is repeated in scalatest config -->
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
+          <systemProperties>
+            <java.awt.headless>true</java.awt.headless>
+          </systemProperties>
+          <failIfNoTests>false</failIfNoTests>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>1.0</version>
+        <!-- Note config is repeated in surefire config -->
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <filereports>CarbonTestSuite.txt</filereports>
+          <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+          </argLine>
+          <stderr />
+          <environmentVariables>
+          </environmentVariables>
+          <systemProperties>
+            <java.awt.headless>true</java.awt.headless>
+          </systemProperties>
+        </configuration>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamException.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamException.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamException.java
new file mode 100644
index 0000000..602cef7
--- /dev/null
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming;
+
+/**
+ * Stream exception
+ */
+public class CarbonStreamException extends Exception {
+
+  public CarbonStreamException(String message) {
+    super(message);
+  }
+
+  public CarbonStreamException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java
new file mode 100644
index 0000000..eed3fd5
--- /dev/null
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming.parser;
+
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+
+import com.univocity.parsers.csv.CsvParser;
+import com.univocity.parsers.csv.CsvParserSettings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+/**
+ * CSV stream parser; it is also the default parser.
+ */
+public class CSVStreamParserImp implements CarbonStreamParser {
+
+  private CsvParser csvParser;
+
+  @Override public void initialize(Configuration configuration) {
+    CsvParserSettings settings = CSVInputFormat.extractCsvParserSettings(configuration);
+    csvParser = new CsvParser(settings);
+  }
+
+  @Override public Object[] parserRow(InternalRow row) {
+    return csvParser.parseLine(row.getString(0));
+  }
+
+  @Override public void close() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
new file mode 100644
index 0000000..a3b5592
--- /dev/null
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming.parser;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+/**
+ * Stream parser interface
+ */
+public interface CarbonStreamParser {
+
+  String CARBON_STREAM_PARSER = "carbon.stream.parser";
+
+  String CARBON_STREAM_PARSER_DEFAULT = "org.apache.carbondata.streaming.parser.CSVStreamParserImp";
+
+  void initialize(Configuration configuration);
+
+  Object[] parserRow(InternalRow value);
+
+  void close();
+
+}

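The CARBON_STREAM_PARSER option makes the row parser pluggable; CSVStreamParserImp above is only the default. A hedged sketch of a custom parser, written in Scala for brevity; the class name and the pipe delimiter are illustrative and not part of this patch.

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.carbondata.streaming.parser.CarbonStreamParser

    class PipeDelimitedStreamParser extends CarbonStreamParser {

      override def initialize(configuration: Configuration): Unit = {
        // nothing to configure for this simple parser
      }

      // split the single string column of each incoming row on '|'
      override def parserRow(row: InternalRow): Array[Object] = {
        row.getString(0).split('|').toArray[Object]
      }

      override def close(): Unit = {}
    }

To plug it in, the class name would be passed as a writeStream option, e.g. .option(CarbonStreamParser.CARBON_STREAM_PARSER, classOf[PipeDelimitedStreamParser].getName); CarbonAppendableStreamSink copies all options into the hadoop configuration and writeDataFileTask instantiates the configured class by reflection.
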
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
new file mode 100644
index 0000000..32ba332
--- /dev/null
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming.segment;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
+import org.apache.carbondata.format.BlockIndex;
+import org.apache.carbondata.format.BlockletIndex;
+import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat;
+import org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter;
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * streaming segment manager
+ */
+public class StreamSegment {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(StreamSegment.class.getName());
+
+  public static final long STREAM_SEGMENT_MAX_SIZE = 1024L * 1024 * 1024;
+
+  /**
+   * get the current stream segment, or create a new one if none exists
+   */
+  public static String open(CarbonTable table) throws IOException {
+    CarbonTablePath tablePath =
+        CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier());
+    SegmentStatusManager segmentStatusManager =
+        new SegmentStatusManager(table.getAbsoluteTableIdentifier());
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+    try {
+      if (carbonLock.lockWithRetries()) {
+        LOGGER.info(
+            "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+                + " for stream table get or create segment");
+
+        LoadMetadataDetails[] details =
+            SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
+        LoadMetadataDetails streamSegment = null;
+        for (LoadMetadataDetails detail : details) {
+          if (FileFormat.rowformat == detail.getFileFormat()) {
+            if (CarbonCommonConstants.STORE_LOADSTATUS_STREAMING.equals(detail.getLoadStatus())) {
+              streamSegment = detail;
+              break;
+            }
+          }
+        }
+        if (null == streamSegment) {
+          int segmentId = SegmentStatusManager.createNewSegmentId(details);
+          LoadMetadataDetails newDetail = new LoadMetadataDetails();
+          newDetail.setPartitionCount("0");
+          newDetail.setLoadName("" + segmentId);
+          newDetail.setFileFormat(FileFormat.rowformat);
+          newDetail.setLoadStartTime(System.currentTimeMillis());
+          newDetail.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_STREAMING);
+
+          LoadMetadataDetails[] newDetails = new LoadMetadataDetails[details.length + 1];
+          int i = 0;
+          for (; i < details.length; i++) {
+            newDetails[i] = details[i];
+          }
+          newDetails[i] = newDetail;
+          SegmentStatusManager
+              .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), newDetails);
+          return newDetail.getLoadName();
+        } else {
+          return streamSegment.getLoadName();
+        }
+      } else {
+        LOGGER.error(
+            "Not able to acquire the lock for stream table get or create segment for table " + table
+                .getDatabaseName() + "." + table.getFactTableName());
+        throw new IOException("Failed to get stream segment");
+      }
+    } finally {
+      if (carbonLock.unlock()) {
+        LOGGER.info("Table unlocked successfully after stream table get or create segment" + table
+            .getDatabaseName() + "." + table.getFactTableName());
+      } else {
+        LOGGER.error(
+            "Unable to unlock table lock for stream table" + table.getDatabaseName() + "." + table
+                .getFactTableName() + " during stream table get or create segment");
+      }
+    }
+  }
+
+  /**
+   * mark the old stream segment as finished and create a new stream segment
+   */
+  public static String close(CarbonTable table, String segmentId)
+      throws IOException {
+    CarbonTablePath tablePath =
+        CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier());
+    SegmentStatusManager segmentStatusManager =
+        new SegmentStatusManager(table.getAbsoluteTableIdentifier());
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+    try {
+      if (carbonLock.lockWithRetries()) {
+        LOGGER.info(
+            "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+                + " for stream table finish segment");
+
+        LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(tablePath.getPath());
+        for (LoadMetadataDetails detail : details) {
+          if (segmentId.equals(detail.getLoadName())) {
+            detail.setLoadEndTime(System.currentTimeMillis());
+            detail.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_STREAMING_FINISH);
+            break;
+          }
+        }
+
+        int newSegmentId = SegmentStatusManager.createNewSegmentId(details);
+        LoadMetadataDetails newDetail = new LoadMetadataDetails();
+        newDetail.setPartitionCount("0");
+        newDetail.setLoadName("" + newSegmentId);
+        newDetail.setFileFormat(FileFormat.rowformat);
+        newDetail.setLoadStartTime(System.currentTimeMillis());
+        newDetail.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_STREAMING);
+
+        LoadMetadataDetails[] newDetails = new LoadMetadataDetails[details.length + 1];
+        int i = 0;
+        for (; i < details.length; i++) {
+          newDetails[i] = details[i];
+        }
+        newDetails[i] = newDetail;
+        SegmentStatusManager
+            .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), newDetails);
+        return newDetail.getLoadName();
+      } else {
+        LOGGER.error(
+            "Not able to acquire the lock for stream table status updation for table " + table
+                .getDatabaseName() + "." + table.getFactTableName());
+        throw new IOException("Failed to get stream segment");
+      }
+    } finally {
+      if (carbonLock.unlock()) {
+        LOGGER.info(
+            "Table unlocked successfully after table status updation" + table.getDatabaseName()
+                + "." + table.getFactTableName());
+      } else {
+        LOGGER.error("Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
+            .getFactTableName() + " during table status updation");
+      }
+    }
+  }
+
+  /**
+   * invoke CarbonStreamOutputFormat to append batch data to existing carbondata file
+   */
+  public static void appendBatchData(CarbonIterator<Object[]> inputIterators,
+      TaskAttemptContext job) throws Exception {
+    CarbonStreamRecordWriter writer = null;
+    try {
+      writer = (CarbonStreamRecordWriter)new CarbonStreamOutputFormat().getRecordWriter(job);
+      // at the beginning of each task, recover the file if necessary;
+      // some information from the record writer can be reused here
+      recoverFileIfRequired(
+          writer.getSegmentDir(),
+          writer.getFileName(),
+          CarbonTablePath.getCarbonStreamIndexFileName());
+
+      while (inputIterators.hasNext()) {
+        writer.write(null, inputIterators.next());
+      }
+      inputIterators.close();
+    } finally {
+      if (writer != null) {
+        writer.close(job);
+      }
+    }
+  }
+
+  /**
+   * check the health of the stream segment and try to recover the segment from a job fault.
+   * this method will be invoked in the following scenarios:
+   * 1. at the beginning of streaming (StreamSinkFactory.getStreamSegmentId)
+   * 2. after a job failure (CarbonAppendableStreamSink.writeDataFileJob)
+   */
+  public static void recoverSegmentIfRequired(String segmentDir) throws IOException {
+    FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
+    if (FileFactory.isFileExist(segmentDir, fileType)) {
+      String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
+      String indexPath = segmentDir + File.separator + indexName;
+      CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
+      CarbonFile[] files = listDataFiles(segmentDir, fileType);
+      // TODO better to check backup index at first
+      // index file exists
+      if (index.exists()) {
+        // data file exists
+        if (files.length > 0) {
+          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+          try {
+            // map block index
+            indexReader.openThriftReader(indexPath);
+            Map<String, Long> tableSizeMap = new HashMap<>();
+            while (indexReader.hasNext()) {
+              BlockIndex blockIndex = indexReader.readBlockIndexInfo();
+              tableSizeMap.put(blockIndex.getFile_name(), blockIndex.getFile_size());
+            }
+            // recover each file
+            for (CarbonFile file : files) {
+              Long size = tableSizeMap.get(file.getName());
+              if (null == size || size == 0) {
+                file.delete();
+              } else if (size < file.getSize()) {
+                FileFactory.truncateFile(file.getCanonicalPath(), fileType, size);
+              }
+            }
+          } finally {
+            indexReader.closeThriftReader();
+          }
+        }
+      } else {
+        if (files.length > 0) {
+          for (CarbonFile file : files) {
+            file.delete();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * check the health of the stream data file and try to recover the data file from a task fault.
+   * this method will be invoked in the following scenario:
+   * 1. at the beginning of the data-file writing task
+   */
+  public static void recoverFileIfRequired(
+      String segmentDir,
+      String fileName,
+      String indexName) throws IOException {
+
+    FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
+    String filePath = segmentDir + File.separator + fileName;
+    CarbonFile file = FileFactory.getCarbonFile(filePath, fileType);
+    String indexPath = segmentDir + File.separator + indexName;
+    CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
+    if (file.exists() && index.exists()) {
+      CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+      try {
+        indexReader.openThriftReader(indexPath);
+        while (indexReader.hasNext()) {
+          BlockIndex blockIndex = indexReader.readBlockIndexInfo();
+          if (blockIndex.getFile_name().equals(fileName)) {
+            if (blockIndex.getFile_size() == 0) {
+              file.delete();
+            } else if (blockIndex.getFile_size() < file.getSize()) {
+              FileFactory.truncateFile(filePath, fileType, blockIndex.getFile_size());
+            }
+          }
+        }
+      } finally {
+        indexReader.closeThriftReader();
+      }
+    }
+  }
+
+
+  /**
+   * list all carbondata files of a segment
+   */
+  public static CarbonFile[] listDataFiles(String segmentDir, FileFactory.FileType fileType) {
+    CarbonFile carbonDir = FileFactory.getCarbonFile(segmentDir, fileType);
+    if (carbonDir.exists()) {
+      return carbonDir.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
+          return CarbonTablePath.isCarbonDataFile(file.getName());
+        }
+      });
+    } else {
+      return new CarbonFile[0];
+    }
+  }
+
+  /**
+   * update the carbonindex file after a stream batch.
+   */
+  public static void updateIndexFile(String segmentDir) throws IOException {
+    FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
+    String filePath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir);
+    String tempFilePath = filePath + CarbonCommonConstants.TEMPWRITEFILEEXTENSION;
+    CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
+    try {
+      writer.openThriftWriter(tempFilePath);
+      CarbonFile[] files = listDataFiles(segmentDir, fileType);
+      BlockIndex blockIndex;
+      for (CarbonFile file : files) {
+        blockIndex = new BlockIndex();
+        blockIndex.setFile_name(file.getName());
+        blockIndex.setFile_size(file.getSize());
+        // TODO need to collect these information
+        blockIndex.setNum_rows(-1);
+        blockIndex.setOffset(-1);
+        blockIndex.setBlock_index(new BlockletIndex());
+        writer.writeThrift(blockIndex);
+      }
+      writer.close();
+      CarbonFile tempFile = FileFactory.getCarbonFile(tempFilePath, fileType);
+      if (!tempFile.renameForce(filePath)) {
+        throw new IOException(
+            "temporary file renaming failed, src=" + tempFilePath + ", dest=" + filePath);
+      }
+    } catch (IOException ex) {
+      try {
+        writer.close();
+      } catch (IOException t) {
+        LOGGER.error(t);
+      }
+      throw ex;
+    }
+  }
+
+  /**
+   * calculate the size of the segment by the accumulation of data sizes in index file
+   */
+  public static long size(String segmentDir) throws IOException {
+    long size = 0;
+    FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
+    if (FileFactory.isFileExist(segmentDir, fileType)) {
+      String indexPath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir);
+      CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
+      if (index.exists()) {
+        CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+        try {
+          indexReader.openThriftReader(indexPath);
+          while (indexReader.hasNext()) {
+            BlockIndex blockIndex = indexReader.readBlockIndexInfo();
+            size += blockIndex.getFile_size();
+          }
+        } finally {
+          indexReader.closeThriftReader();
+        }
+      }
+    }
+    return size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
new file mode 100644
index 0000000..3ac19d9
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.util.DataLoadingUtil
+import org.apache.carbondata.streaming.segment.StreamSegment
+
+/**
+ * Stream sink factory
+ */
+object StreamSinkFactory {
+
+  def createStreamTableSink(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      parameters: Map[String, String]): Sink = {
+    validateParameters(parameters)
+
+    // prepare the stream segment
+    val segmentId = getStreamSegmentId(carbonTable)
+    // build load model
+    val carbonLoadModel = buildCarbonLoadModelForStream(
+      sparkSession,
+      carbonTable,
+      parameters,
+      segmentId)
+    // start server if necessary
+    val server = startDictionaryServer(
+      sparkSession,
+      carbonTable,
+      carbonLoadModel.getDictionaryServerPort)
+    if (server.isDefined) {
+      carbonLoadModel.setUseOnePass(true)
+      carbonLoadModel.setDictionaryServerPort(server.get.getPort)
+    } else {
+      carbonLoadModel.setUseOnePass(false)
+    }
+    // default is carbon appended stream sink
+    new CarbonAppendableStreamSink(
+      sparkSession,
+      carbonTable,
+      segmentId,
+      parameters,
+      carbonLoadModel,
+      server)
+  }
+
+  private def validateParameters(parameters: Map[String, String]): Unit = {
+    // TODO require to validate parameters
+  }
+
+  /**
+   * get current stream segment id
+   * @return
+   */
+  private def getStreamSegmentId(carbonTable: CarbonTable): String = {
+    val segmentId = StreamSegment.open(carbonTable)
+    val carbonTablePath = CarbonStorePath
+      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+    val segmentDir = carbonTablePath.getSegmentDir("0", segmentId)
+    val fileType = FileFactory.getFileType(segmentDir)
+    if (FileFactory.isFileExist(segmentDir, fileType)) {
+      // recover fault
+      StreamSegment.recoverSegmentIfRequired(segmentDir)
+    } else {
+      FileFactory.mkdirs(segmentDir, fileType)
+    }
+    segmentId
+  }
+
+  def startDictionaryServer(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      port: Int): Option[DictionaryServer] = {
+    // start the dictionary server when one-pass load is used and a dimension with DICTIONARY
+    // encoding is present.
+    val allDimensions = carbonTable.getAllDimensions.asScala.toList
+    val createDictionary = allDimensions.exists {
+      carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+                         !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
+    }
+    val server: Option[DictionaryServer] = if (createDictionary) {
+      val dictionaryServer = DictionaryServer.getInstance(port, carbonTable)
+      sparkSession.sparkContext.addSparkListener(new SparkListener() {
+        override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+          dictionaryServer.shutdown()
+        }
+      })
+      Some(dictionaryServer)
+    } else {
+      None
+    }
+    server
+  }
+
+  private def buildCarbonLoadModelForStream(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      parameters: Map[String, String],
+      segmentId: String): CarbonLoadModel = {
+    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    carbonProperty.addProperty("zookeeper.enable.lock", "false")
+    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, parameters)
+    optionsFinal.put("sort_scope", "no_sort")
+    if (parameters.get("fileheader").isEmpty) {
+      optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getFactTableName)
+        .asScala.map(_.getColName).mkString(","))
+    }
+    val carbonLoadModel = new CarbonLoadModel()
+    DataLoadingUtil.buildCarbonLoadModel(
+      carbonTable,
+      carbonProperty,
+      parameters,
+      optionsFinal,
+      carbonLoadModel
+    )
+    carbonLoadModel.setSegmentId(segmentId)
+    // stream should use one pass
+    val dictionaryServerPort = parameters.getOrElse(
+      CarbonCommonConstants.DICTIONARY_SERVER_PORT,
+      carbonProperty.getProperty(
+        CarbonCommonConstants.DICTIONARY_SERVER_PORT,
+        CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT))
+    val sparkDriverHost = sparkSession.sqlContext.sparkContext.
+      getConf.get("spark.driver.host")
+    carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
+    carbonLoadModel.setDictionaryServerPort(dictionaryServerPort.toInt)
+    carbonLoadModel
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
new file mode 100644
index 0000000..844423a
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.Date
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.{SparkHadoopWriter, TaskContext}
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+
+import org.apache.carbondata.common.CarbonIterator
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.streaming.CarbonStreamException
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+import org.apache.carbondata.streaming.segment.StreamSegment
+
+/**
+ * An implementation of the stream sink; it persists each batch to disk by appending the
+ * batch data to data files.
+ */
+class CarbonAppendableStreamSink(
+    sparkSession: SparkSession,
+    val carbonTable: CarbonTable,
+    var currentSegmentId: String,
+    parameters: Map[String, String],
+    carbonLoadModel: CarbonLoadModel,
+    server: Option[DictionaryServer]) extends Sink {
+
+  private val carbonTablePath = CarbonStorePath
+    .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+  private val fileLogPath = carbonTablePath.getStreamingLogDir
+  private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)
+  // prepare configuration
+  private val hadoopConf = {
+    val conf = sparkSession.sessionState.newHadoopConf()
+    CarbonStreamOutputFormat.setCarbonLoadModel(conf, carbonLoadModel)
+    // put all parameters into hadoopConf
+    parameters.foreach { entry =>
+      conf.set(entry._1, entry._2)
+    }
+    conf
+  }
+
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
+      CarbonAppendableStreamSink.LOGGER.info(s"Skipping already committed batch $batchId")
+    } else {
+      checkOrHandOffSegment()
+
+      // the committer will record how this spark job commits its output
+      val committer = FileCommitProtocol.instantiate(
+        className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
+        jobId = batchId.toString,
+        outputPath = fileLogPath,
+        isAppend = false)
+
+      committer match {
+        case manifestCommitter: ManifestFileCommitProtocol =>
+          manifestCommitter.setupManifestOptions(fileLog, batchId)
+        case _ => // Do nothing
+      }
+
+      CarbonAppendableStreamSink.writeDataFileJob(
+        sparkSession,
+        carbonTable,
+        parameters,
+        batchId,
+        currentSegmentId,
+        data.queryExecution,
+        committer,
+        hadoopConf,
+        server)
+    }
+  }
+
+  /**
+   * if the directory size of the current segment is beyond the threshold, hand off to a new segment
+   */
+  private def checkOrHandOffSegment(): Unit = {
+    val segmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId)
+    val fileType = FileFactory.getFileType(segmentDir)
+    if (StreamSegment.STREAM_SEGMENT_MAX_SIZE <=
+        StreamSegment.size(segmentDir)) {
+      val newSegmentId =
+        StreamSegment.close(carbonTable, currentSegmentId)
+      currentSegmentId = newSegmentId
+      val newSegmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId)
+      FileFactory.mkdirs(newSegmentDir, fileType)
+    }
+
+    // TODO trigger hand off operation
+  }
+}
+
+object CarbonAppendableStreamSink {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * packages the hadoop configuration so that it can be passed from the driver side to the executor side
+   */
+  case class WriteDataFileJobDescription(
+      serializableHadoopConf: SerializableConfiguration,
+      batchId: Long,
+      segmentId: String)
+
+  /**
+   * Run a spark job to append the newly arrived data to the existing row format
+   * file directly.
+   * If a task fails, spark will retry the task and carbon will recover by truncating
+   * the HDFS file. (see StreamSegment.recoverFileIfRequired)
+   * If the job fails, every file in the stream segment will be truncated
+   * if necessary. (see StreamSegment.recoverSegmentIfRequired)
+   */
+  def writeDataFileJob(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      parameters: Map[String, String],
+      batchId: Long,
+      segmentId: String,
+      queryExecution: QueryExecution,
+      committer: FileCommitProtocol,
+      hadoopConf: Configuration,
+      server: Option[DictionaryServer]): Unit = {
+
+    // create job
+    val job = Job.getInstance(hadoopConf)
+    job.setOutputKeyClass(classOf[Void])
+    job.setOutputValueClass(classOf[InternalRow])
+
+    val description = WriteDataFileJobDescription(
+      serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
+      batchId,
+      segmentId
+    )
+
+    // run write data file job
+    SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
+      var result: Array[TaskCommitMessage] = null
+      try {
+        committer.setupJob(job)
+        // initialize dictionary server
+        if (server.isDefined) {
+          server.get.initializeDictionaryGenerator(carbonTable)
+        }
+
+        // write data file
+        result = sparkSession.sparkContext.runJob(queryExecution.toRdd,
+          (taskContext: TaskContext, iterator: Iterator[InternalRow]) => {
+            writeDataFileTask(
+              description,
+              sparkStageId = taskContext.stageId(),
+              sparkPartitionId = taskContext.partitionId(),
+              sparkAttemptNumber = taskContext.attemptNumber(),
+              committer,
+              iterator
+            )
+          })
+
+        // write dictionary
+        if (server.isDefined) {
+          try {
+            server.get.writeTableDictionary(carbonTable.getCarbonTableIdentifier.getTableId)
+          } catch {
+            case _: Exception =>
+              LOGGER.error(
+                s"Error while writing dictionary file for ${carbonTable.getTableUniqueName}")
+              throw new Exception(
+                "Streaming ingest failed due to error while writing dictionary file")
+          }
+        }
+
+        // update data file info in index file
+        val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+        StreamSegment.updateIndexFile(tablePath.getSegmentDir("0", segmentId))
+
+      } catch {
+        // catch fault of executor side
+        case t: Throwable =>
+          val tablePath =
+            CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+          val segmentDir = tablePath.getSegmentDir("0", segmentId)
+          StreamSegment.recoverSegmentIfRequired(segmentDir)
+          LOGGER.error(t, s"Aborting job ${ job.getJobID }.")
+          committer.abortJob(job)
+          throw new CarbonStreamException("Job failed to write data file", t)
+      }
+      committer.commitJob(job, result)
+      LOGGER.info(s"Job ${ job.getJobID } committed.")
+    }
+  }
+
+  /**
+   * execute a task for each partition to write a data file
+   */
+  def writeDataFileTask(
+      description: WriteDataFileJobDescription,
+      sparkStageId: Int,
+      sparkPartitionId: Int,
+      sparkAttemptNumber: Int,
+      committer: FileCommitProtocol,
+      iterator: Iterator[InternalRow]
+  ): TaskCommitMessage = {
+
+    val jobId = SparkHadoopWriter.createJobID(new Date, sparkStageId)
+    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
+    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
+
+    // Set up the attempt context required to use in the output committer.
+    val taskAttemptContext: TaskAttemptContext = {
+      // Set up the configuration object
+      val hadoopConf = description.serializableHadoopConf.value
+      hadoopConf.set("mapred.job.id", jobId.toString)
+      hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
+      hadoopConf.set("mapred.task.id", taskAttemptId.toString)
+      hadoopConf.setBoolean("mapred.task.is.map", true)
+      hadoopConf.setInt("mapred.task.partition", 0)
+      new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
+    }
+
+    committer.setupTask(taskAttemptContext)
+
+    try {
+      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+
+        val parserName = taskAttemptContext.getConfiguration.get(
+          CarbonStreamParser.CARBON_STREAM_PARSER,
+          CarbonStreamParser.CARBON_STREAM_PARSER_DEFAULT)
+
+        val streamParser =
+          Class.forName(parserName).newInstance.asInstanceOf[CarbonStreamParser]
+        streamParser.initialize(taskAttemptContext.getConfiguration)
+
+        StreamSegment.appendBatchData(new InputIterator(iterator, streamParser),
+          taskAttemptContext)
+      })(catchBlock = {
+        committer.abortTask(taskAttemptContext)
+        LOGGER.error(s"Job $jobId aborted.")
+      })
+      committer.commitTask(taskAttemptContext)
+    } catch {
+      case t: Throwable =>
+        throw new CarbonStreamException("Task failed while writing rows", t)
+    }
+  }
+
+  /**
+   * convert the spark iterator to a carbon iterator, so that the java module can use it.
+   */
+  class InputIterator(rddIter: Iterator[InternalRow], streamParser: CarbonStreamParser)
+    extends CarbonIterator[Array[Object]] {
+
+    override def hasNext: Boolean = rddIter.hasNext
+
+    override def next: Array[Object] = {
+      streamParser.parserRow(rddIter.next())
+    }
+
+    override def close(): Unit = {
+      streamParser.close()
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7393da9/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
new file mode 100644
index 0000000..80936d1
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util
+import java.util.UUID
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.streaming.StreamingQueryListener
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
+
+class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryListener {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  private val cache = new util.HashMap[UUID, ICarbonLock]()
+
+  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
+    val qry = spark.streams.get(event.id).asInstanceOf[StreamExecution]
+    if (qry.sink.isInstanceOf[CarbonAppendableStreamSink]) {
+      LOGGER.info("Carbon streaming query started: " + event.id)
+      val sink = qry.sink.asInstanceOf[CarbonAppendableStreamSink]
+      val carbonTable = sink.carbonTable
+      val lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getCarbonTableIdentifier,
+        LockUsage.STREAMING_LOCK)
+      if (lock.lockWithRetries()) {
+        LOGGER.info("Acquired the lock for stream table: " + carbonTable.getDatabaseName + "." +
+                    carbonTable.getFactTableName)
+        cache.put(event.id, lock)
+      } else {
+        LOGGER.error("Not able to acquire the lock for stream table:" +
+                     carbonTable.getDatabaseName + "." + carbonTable.getFactTableName)
+        throw new InterruptedException(
+          "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." +
+          carbonTable.getFactTableName)
+      }
+    }
+  }
+
+  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
+  }
+
+  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
+    val lock = cache.remove(event.id)
+    if (null != lock) {
+      LOGGER.info("Carbon streaming query: " + event.id)
+      lock.unlock()
+    }
+  }
+}

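For completeness, a sketch of how the listener might be attached so that the streaming lock is held for the lifetime of each query. Whether a Carbon session registers this automatically is not shown in this patch, so the manual registration below is illustrative only.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.streaming.CarbonStreamingQueryListener

    val spark = SparkSession.builder().appName("carbon-listener-sketch").getOrCreate()
    // take the STREAMING_LOCK on query start, release it on query termination
    spark.streams.addListener(new CarbonStreamingQueryListener(spark))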
