carbondata-commits mailing list archives

From ajan...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-4066] data mismatch observed with SI and without SI when SI global sort and SI segment merge is true
Date Tue, 01 Dec 2020 16:02:25 GMT
This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 09be330  [CARBONDATA-4066] data mismatch observed with SI and without SI when SI global sort and SI segment merge is true
09be330 is described below

commit 09be330c2785216ae19f510185b50f7e10b7b170
Author: Mahesh Raju Somalaraju <mahesh.somalaraju@huawei.com>
AuthorDate: Tue Dec 1 16:42:42 2020 +0530

    [CARBONDATA-4066] data mismatch observed with SI and without SI when SI global sort and SI segment merge is true
    
    Why is this PR needed?
    A data mismatch is observed between query results with SI and without SI when the SI
    uses global sort and SI segment merge is enabled. After the SI data files are merged,
    the position reference column is sorted again, so rows point to the wrong position
    references, which causes the mismatch between results with and without SI.
    
    What changes were proposed in this PR?
    There is no need to recalculate position references after the data files are merged;
    the existing position reference column from the SI table should be used instead.
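    
    To see why re-deriving the references breaks lookups, here is a minimal,
    self-contained Scala sketch of the failure mode (data and names are
    illustrative, not CarbonData internals):
    
        // An SI row maps an indexed column value to the position of the
        // matching row in the main table (its "position reference").
        case class SIRow(indexedValue: String, positionReference: Int)
    
        object PositionRefSketch extends App {
          // Main-table rows at positions 0..3.
          val mainTable = Seq("rus", "hk", "china", "sri")
    
          // Correct SI rows: each value points at its main-table position.
          val siRows = mainTable.zipWithIndex.map { case (v, pos) => SIRow(v, pos) }
    
          // Buggy merge: rows are globally sorted and position references are
          // recomputed from the new, sorted order, losing the original mapping.
          val buggyMerge = siRows.map(_.indexedValue).sorted.zipWithIndex
            .map { case (v, pos) => SIRow(v, pos) }
    
          // Fixed merge: rows may be re-sorted, but each keeps the position
          // reference stored when the SI segment was written.
          val fixedMerge = siRows.sortBy(_.indexedValue)
    
          val buggyPos = buggyMerge.find(_.indexedValue == "china").get.positionReference
          val fixedPos = fixedMerge.find(_.indexedValue == "china").get.positionReference
          println(s"buggy lookup: ${mainTable(buggyPos)}")  // prints "rus" -- mismatch
          println(s"fixed lookup: ${mainTable(fixedPos)}")  // prints "china"
        }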
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4033
---
 .../TestSIWithComplexArrayType.scala               | 119 +++++++++++++++++++++
 .../secondaryindex/util/SecondaryIndexUtil.scala   |  11 +-
 .../spark/src/test/resources/secindex/array.csv    |   8 +-
 3 files changed, 126 insertions(+), 12 deletions(-)

diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
index 53ac7a2..f1156be 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
@@ -20,17 +20,27 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterEach
 
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
 
 class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
   // scalastyle:off lineLength
   override def beforeEach(): Unit = {
     sql("drop table if exists complextable")
+    sql("drop table if exists complextable2")
+    sql("drop table if exists complextable3")
+    sql("drop table if exists complextable4")
+    sql("drop table if exists complextable5")
   }
 
   override def afterEach(): Unit = {
     sql("drop index if exists index_1 on complextable")
     sql("drop table if exists complextable")
+    sql("drop table if exists complextable2")
+    sql("drop table if exists complextable3")
+    sql("drop table if exists complextable4")
+    sql("drop table if exists complextable5")
   }
 
   test("test array<string> on secondary index") {
@@ -100,6 +110,115 @@ class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
     checkAnswer(result, df)
   }
 
+  test("test SI global sort with si segment merge enabled for complex data types") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
+    sql("create table complextable2 (id int, name string, country array<string>) stored as " +
+      "carbondata tblproperties('sort_scope'='global_sort','sort_columns'='name')")
+    sql(
+      s"load data inpath '$resourcesPath/secindex/array.csv' into table complextable2 options('delimiter'=','," +
+        "'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$'," +
+        "'global_sort_partitions'='10')")
+    val result = sql(" select * from complextable2 where array_contains(country,'china')")
+    sql("create index index_2 on table complextable2(country) as 'carbondata' properties" +
+      "('sort_scope'='global_sort')")
+    checkAnswer(sql("select count(*) from complextable2 where array_contains(country,'china')"),
+      sql("select count(*) from complextable2 where ni(array_contains(country,'china'))"))
+    val df = sql(" select * from complextable2 where array_contains(country,'china')")
+    if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result, df)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+  }
+
+  test("test SI global sort with si segment merge enabled for primitive data types") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
+    sql("create table complextable3 (id int, name string, country array<string>) stored as " +
+      "carbondata tblproperties('sort_scope'='global_sort','sort_columns'='name')")
+    sql(
+      s"load data inpath '$resourcesPath/secindex/array.csv' into table complextable3 options('delimiter'=','," +
+        "'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$'," +
+        "'global_sort_partitions'='10')")
+    sql(
+      s"load data inpath '$resourcesPath/secindex/array.csv' into table complextable3 options('delimiter'=','," +
+        "'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$'," +
+        "'global_sort_partitions'='10')")
+    val result = sql(" select * from complextable3 where name='abc'")
+    sql("create index index_3 on table complextable3(name) as 'carbondata' properties" +
+      "('sort_scope'='global_sort')")
+    checkAnswer(sql("select count(*) from complextable3 where name='abc'"),
+      sql("select count(*) from complextable3 where ni(name='abc')"))
+    val df = sql(" select * from complextable3 where name='abc'")
+    if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result, df)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+  }
+
+  test("test SI global sort with si segment merge complex data types by rebuild command") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+    sql("create table complextable4 (id int, name string, country array<string>) stored as " +
+      "carbondata tblproperties('sort_scope'='global_sort','sort_columns'='name')")
+    sql(
+      s"load data inpath '$resourcesPath/secindex/array.csv' into table complextable4 options('delimiter'=','," +
+        "'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$'," +
+        "'global_sort_partitions'='10')")
+    val result = sql(" select * from complextable4 where array_contains(country,'china')")
+    sql("create index index_4 on table complextable4(country) as 'carbondata' properties" +
+      "('sort_scope'='global_sort')")
+    checkAnswer(sql("select count(*) from complextable4 where array_contains(country,'china')"),
+      sql("select count(*) from complextable4 where ni(array_contains(country,'china'))"))
+    sql("REFRESH INDEX index_4 ON TABLE complextable4")
+    checkAnswer(sql("select count(*) from complextable4 where array_contains(country,'china')"),
+      sql("select count(*) from complextable4 where ni(array_contains(country,'china'))"))
+    val df = sql(" select * from complextable4 where array_contains(country,'china')")
+    if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result, df)
+  }
+
+  test("test SI global sort with si segment merge primitive data types by rebuild command") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+    sql("create table complextable5 (id int, name string, country array<string>) stored as " +
+      "carbondata tblproperties('sort_scope'='global_sort','sort_columns'='name')")
+    sql(
+      s"load data inpath '$resourcesPath/secindex/array.csv' into table complextable5 options('delimiter'=','," +
+        "'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$'," +
+        "'global_sort_partitions'='10')")
+    sql(
+      s"load data inpath '$resourcesPath/secindex/array.csv' into table complextable5 options('delimiter'=','," +
+        "'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$'," +
+        "'global_sort_partitions'='10')")
+    val result = sql(" select * from complextable5 where name='abc'")
+    sql("create index index_5 on table complextable5(name) as 'carbondata' properties" +
+      "('sort_scope'='global_sort')")
+    checkAnswer(sql("select count(*) from complextable5 where name='abc'"),
+      sql("select count(*) from complextable5 where ni(name='abc')"))
+    sql("REFRESH INDEX index_5 ON TABLE complextable5")
+    checkAnswer(sql("select count(*) from complextable5 where name='abc'"),
+      sql("select count(*) from complextable5 where ni(name='abc')"))
+    val df = sql(" select * from complextable5 where name='abc'")
+    if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result, df)
+  }
   test("test si creation with struct and map type") {
    sql("create table complextable (country struct<b:string>, name string, id Map<string, string>, arr1 array<string>, arr2 array<string>) stored as carbondata")
     intercept[RuntimeException] {
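
One readability note on the pattern repeated in the added tests (a suggestion only, not
part of this commit): the if/else around assert(false)/assert(true) collapses to a
single assertion:

    // Equivalent to the if/else blocks in the new tests:
    assert(isFilterPushedDownToSI(df.queryExecution.sparkPlan))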
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 3506aac..c44e359 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -667,10 +667,6 @@ object SecondaryIndexUtil {
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
     CarbonInputFormat.setTableInfo(job.getConfiguration, indexCarbonTable.getTableInfo)
-    val proj = indexCarbonTable.getCreateOrderColumn
-      .asScala
-      .map(_.getColName)
-      .filterNot(_.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)).toSet
     var mergeStatus = ArrayBuffer[((String, Boolean), String)]()
     val mergeSize = getTableBlockSizeInMb(indexCarbonTable)(sparkSession) * 1024 * 1024
     val header = indexCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).toArray
@@ -682,10 +678,9 @@ object SecondaryIndexUtil {
       .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
     validSegments.foreach { segment =>
       outputModel.setSegmentId(segment.getSegmentNo)
-      val dataFrame = SecondaryIndexCreator.dataFrameOfSegments(sparkSession,
-        indexCarbonTable,
-        proj.mkString(","),
-        Array(segment.getSegmentNo))
+      val dataFrame = SparkSQLUtil.createInputDataFrame(
+        sparkSession,
+        indexCarbonTable)
       SecondaryIndexCreator.findCarbonScanRDD(dataFrame.rdd, null)
       val segList : java.util.List[Segment] = new util.ArrayList[Segment]()
       segList.add(segment)
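
The effect of the dropped projection, as a simplified sketch (column names are
illustrative and the constant value assumed for POSITION_REFERENCE is hypothetical;
this is not the real API):

    // Simplified view of the removed logic.
    val POSITION_REFERENCE = "positionReference"  // assumed constant value
    val createOrderColumns = Seq("country", POSITION_REFERENCE)

    // Before the fix: every create-order column EXCEPT the stored position
    // reference was selected, so the merge flow derived fresh references
    // over the re-sorted data.
    val projBefore = createOrderColumns
      .filterNot(_.equalsIgnoreCase(POSITION_REFERENCE))  // Seq("country")

    // After the fix: the SI rows are read without that projection, so the
    // stored position reference column survives the merge unchanged.
    val projAfter = createOrderColumns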
diff --git a/integration/spark/src/test/resources/secindex/array.csv b/integration/spark/src/test/resources/secindex/array.csv
index 7fbc89a..d739415 100644
--- a/integration/spark/src/test/resources/secindex/array.csv
+++ b/integration/spark/src/test/resources/secindex/array.csv
@@ -1,4 +1,4 @@
-1,'abc',china$india$us
-2,'xyz',sri$can
-3,'mno',rus$china
-4,'lok',hk$bang
\ No newline at end of file
+1,abc,china$india$us
+2,xyz,sri$can
+3,mno,rus$china
+4,lok,hk$bang
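
The CSV change matters because the load options set 'quotechar'='"': single quotes are
then ordinary characters and load as part of the value. A tiny illustration:

    // With quotechar = '"', a single-quoted field keeps its quotes.
    val loadedOld = "'abc'"     // old CSV row: 1,'abc',china$india$us
    val loadedNew = "abc"       // new CSV row: 1,abc,china$india$us
    assert(loadedOld != "abc")  // filter name='abc' would match nothing
    assert(loadedNew == "abc")  // filter matches as the new tests expect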

