carbondata-commits mailing list archives

From: jack...@apache.org
Subject: carbondata git commit: [CARBONDATA-2674][Streaming]Streaming with merge index enabled does not consider the merge index file while pruning
Date: Tue, 03 Jul 2018 08:40:13 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master dac5d3ce3 -> 12c28c946


[CARBONDATA-2674][Streaming]Streaming with merge index enabled does not consider the merge index file while pruning

This closes #2429
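In short, the patch stops probing for the single hard-coded stream index file and instead lets SegmentIndexFileStore read every index file in the streaming segment, so block entries recorded in a merged index file are also fed to pruning. The following is a minimal sketch of that read path, not the committed code: the class name StreamSegmentIndexSketch and the printBlockPaths helper are illustrative only, imports are assumed to come from the carbondata-core module, and the split building and data-file validation done by the real CarbonTableInputFormat are omitted.

import java.io.File;
import java.io.IOException;
import java.util.Map;

import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.reader.CarbonIndexFileReader;
import org.apache.carbondata.format.BlockIndex;

public class StreamSegmentIndexSketch {

  /**
   * Reads all index files under the given streaming segment directory,
   * including entries packed into a merged index file, and prints the data
   * file path recorded in each block index entry.
   */
  public static void printBlockPaths(String segmentDir) throws IOException {
    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
    indexFileStore.readAllIIndexOfSegment(segmentDir);
    // file name -> serialized index content, one entry per index file found
    Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMap();
    CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    for (byte[] fileData : carbonIndexMap.values()) {
      indexReader.openThriftReader(fileData);
      try {
        // map each block index entry back to its data file in the segment
        while (indexReader.hasNext()) {
          BlockIndex blockIndex = indexReader.readBlockIndexInfo();
          String filePath = segmentDir + File.separator + blockIndex.getFile_name();
          System.out.println(filePath);
        }
      } finally {
        indexReader.closeThriftReader();
      }
    }
  }
}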


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/12c28c94
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/12c28c94
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/12c28c94

Branch: refs/heads/master
Commit: 12c28c9462550359b849e736323988b6be137a1d
Parents: dac5d3c
Author: mohammadshahidkhan <mohdshahidkhan1987@gmail.com>
Authored: Fri Jun 29 16:14:08 2018 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Tue Jul 3 16:39:50 2018 +0800

----------------------------------------------------------------------
 .../hadoop/api/CarbonTableInputFormat.java      | 20 ++++----
 .../TestStreamingTableWithRowParser.scala       | 53 ++++++++++++++++++++
 2 files changed, 62 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/12c28c94/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index b549b16..bd6b775 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -31,10 +31,10 @@ import java.util.Map;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.metadata.schema.SchemaReader;
@@ -341,20 +341,18 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
       long maxSize = getMaxSplitSize(job);
       for (Segment segment : streamSegments) {
-        String segmentDir = CarbonTablePath.getSegmentPath(
-            identifier.getTablePath(), segment.getSegmentNo());
+        String segmentDir =
+            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
         FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
         if (FileFactory.isFileExist(segmentDir, fileType)) {
-          String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
-          String indexPath = segmentDir + File.separator + indexName;
-          CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
-          // index file exists
-          if (index.exists()) {
-            // data file exists
-            CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+          SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
+          segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
+          Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
+          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
+          for (byte[] fileData : carbonIndexMap.values()) {
+            indexReader.openThriftReader(fileData);
             try {
               // map block index
-              indexReader.openThriftReader(indexPath);
               while (indexReader.hasNext()) {
                 BlockIndex blockIndex = indexReader.readBlockIndexInfo();
                 String filePath = segmentDir + File.separator + blockIndex.getFile_name();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/12c28c94/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index 39d63bf..410853b 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -32,9 +32,12 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
 
 case class FileElement(school: Array[String], age: Integer)
 case class StreamData(id: Integer, name: String, city: String, salary: java.lang.Float,
@@ -61,6 +64,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
     dropTable()
 
     createTable(tableName = "stream_table_filter", streaming = true, withBatchLoad = true)
+    createTable(tableName = "stream_table_with_mi", streaming = true, withBatchLoad = true)
 
     createTableWithComplexType(
       tableName = "stream_table_filter_complex", streaming = true, withBatchLoad = true)
@@ -74,6 +78,8 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
 
   def dropTable(): Unit = {
     sql("drop table if exists streaming1.stream_table_filter")
+    sql("drop table if exists streaming1.stream_table_with_mi")
+
     sql("drop table if exists streaming1.stream_table_filter_complex")
   }
 
@@ -405,6 +411,53 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
     }
 
   }
+  test("query on stream table with dictionary, sort_columns, with merge index applied") {
+    executeStreamingIngest(
+      tableName = "stream_table_with_mi",
+      batchNums = 2,
+      rowNumsEachBatch = 25,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      generateBadRecords = true,
+      badRecordAction = "force",
+      autoHandoff = false
+    )
+    val carbonTable: CarbonTable = CarbonMetadata.getInstance
+      .getCarbonTable("streaming1", "stream_table_with_mi")
+    new CarbonIndexFileMergeWriter(carbonTable)
+      .mergeCarbonIndexFilesOfSegment("1", carbonTable.getTablePath, false)
+    // non-filter
+    val result = sql("select * from streaming1.stream_table_with_mi order by id, name").collect()
+    assert(result != null)
+    assert(result.length == 55)
+    // check one row of streaming data
+    assert(result(1).isNullAt(0))
+    assert(result(1).getString(1) == "name_6")
+    // check one row of batch loading
+    assert(result(50).getInt(0) == 100000001)
+    assert(result(50).getString(1) == "batch_1")
+
+    // filter
+    checkAnswer(
+      sql("select * from stream_table_with_mi where id = 1"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_with_mi where id > 49 and id < 100000002"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_with_mi where id between 50 and 100000001"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))))
+
+    checkAnswer(
+      sql("select * from stream_table_with_mi where name in ('name_9','name_10', 'name_11', 'name_12') and id <> 10 and id not in (11, 12)"),
+      Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"))))
+
+  }
 
   test("query on stream table with dictionary, sort_columns and complex column") {
     executeStreamingIngest(

