carbondata-commits mailing list archives

From gvram...@apache.org
Subject carbondata git commit: [CARBONDATA-1882] select with group by and insertoverwrite to another carbon table
Date Sat, 16 Dec 2017 16:48:03 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 9a2b33ebd -> 4a7fc666c


[CARBONDATA-1882] select with group by and insertoverwrite to another carbon table

This closes #1641
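
In plain terms, this change lets an INSERT OVERWRITE whose SELECT returns no rows (with or without GROUP BY) complete instead of failing with "No Data to load": the load is treated as a successful empty load, the new segment's table-status entry is written as MARKED_FOR_DELETE, and its empty folder is cleaned up. A minimal sketch of the scenario, adapted from the InsertIntoCarbonTableTestCase added below (table names are the ones the tests use):

    sql("create table OverwriteTable_t1(id int, name String, salary int) stored by 'carbondata'")
    sql("create table OverwriteTable_t2(id int, name String, salary int, age String) stored by 'carbondata'")

    // Overwrite t2 with an aggregation over t1. When t1 is empty, this now
    // succeeds and leaves t2 with zero rows and no stale segment folder,
    // rather than throwing "No Data to load".
    sql("insert overwrite table OverwriteTable_t2 " +
      "select id, name, sum(salary) as TotalSalary, '98' as age " +
      "from OverwriteTable_t1 group by id, name, salary")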


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4a7fc666
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4a7fc666
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4a7fc666

Branch: refs/heads/master
Commit: 4a7fc666c479dc3ce9b7c569eb0bbc3311dde49b
Parents: 9a2b33e
Author: kushalsaha <kushalsaha1988@gmail.com>
Authored: Mon Dec 11 21:11:21 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Sat Dec 16 22:16:28 2017 +0530

----------------------------------------------------------------------
 .../sdv/generated/DataLoadingTestCase.scala     |  46 +++----
 .../test/resources/overwriteTable1_noRecord.csv |   1 +
 .../resources/overwriteTable1_someRecord.csv    |   6 +
 .../test/resources/overwriteTable2_noRecord.csv |   1 +
 .../resources/overwriteTable2_someRecord.csv    |   4 +
 .../InsertIntoCarbonTableTestCase.scala         | 122 +++++++++++++++++++
 .../apache/spark/sql/test/util/QueryTest.scala  |   1 +
 .../spark/rdd/CarbonDataRDDFactory.scala        |  40 ++++--
 .../management/CarbonLoadDataCommand.scala      |   8 --
 .../processing/util/CarbonLoaderUtil.java       |  23 +++-
 10 files changed, 206 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a7fc666/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala
index c8c88e2..365547e 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala
@@ -227,38 +227,42 @@ class DataLoadingTestCase extends QueryTest with BeforeAndAfterAll {
 
   //Data load-->Extra_Column_incsv
   test("BadRecord_Dataload_019", Include) {
-     sql(s"""CREATE TABLE exceed_column_in_Csv (CUST_NAME String,date timestamp) STORED BY 'org.apache.carbondata.format'""").collect
-  intercept[Exception] {
-    sql(s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/extra_column.csv' into table exceed_column_in_Csv OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='REDIRECT','FILEHEADER'='CUST_NAME,date')""").collect
-    checkAnswer(
-      s"""select count(*) from exceed_column_in_Csv """,
-      Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_019")
-  }
-     sql(s"""drop table exceed_column_in_Csv """).collect
+    sql(
+      s"""CREATE TABLE exceed_column_in_Csv (CUST_NAME String,date timestamp) STORED BY 'org.apache.carbondata.format'""".stripMargin).collect
+      sql(s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/extra_column.csv' into table exceed_column_in_Csv OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='REDIRECT','FILEHEADER'='CUST_NAME,date')""".stripMargin).collect
+      checkAnswer(s"""select count(*) from exceed_column_in_Csv """,Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_019")
+      sql(s"""drop table exceed_column_in_Csv """).collect
   }
 
 
   //Data load-->Timestamp Exceed Range
   test("BadRecord_Dataload_020", Include) {
-     sql(s"""CREATE TABLE timestamp_range (date timestamp) STORED BY 'org.apache.carbondata.format'""").collect
-    intercept[Exception] {
-      sql(s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/timetsmap.csv' into table timestamp_range OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='REDIRECT','FILEHEADER'='date')""").collect
-    }
-    checkAnswer(s"""select count(*) from timestamp_range""",
-      Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_020")
-     sql(s"""drop table timestamp_range""").collect
+    sql(
+      s"""CREATE TABLE timestamp_range (date timestamp) STORED BY 'org.apache.carbondata.format'""".stripMargin).collect
+      sql(
+        s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/timetsmap.csv' into table timestamp_range OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='REDIRECT','FILEHEADER'='date')""".stripMargin).collect
+    checkAnswer(s"""select count(*) from timestamp_range""",Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_020")
+    sql(s"""drop table timestamp_range""").collect
   }
 
 
   //Show loads-->Delimeter_check
   test("BadRecord_Dataload_021", Include) {
-     sql(s"""CREATE TABLE bad_records_test5 (String_col string,integer_col int,decimal_column decimal,date timestamp,double_col double) STORED BY 'org.apache.carbondata.format'""").collect
-  intercept[Exception] {
-    sql(s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/badrecords_test5.csv' into table bad_records_test5 OPTIONS('DELIMITER'='*' , 'QUOTECHAR'='"','BAD_RECORDS_LOGGER_ENABLE'='FALSE', 'BAD_RECORDS_ACTION'='IGNORE','FILEHEADER'='String_col,integer_col,decimal_column,date,double_col') """).collect
-  }
-    checkAnswer(s"""select count(*) from bad_records_test5""",
+    sql(
+      s"""CREATE TABLE bad_records_test5 (String_col string,integer_col int,decimal_column
+         |decimal,date timestamp,double_col double) STORED BY 'org.apache.carbondata.format'"""
+        .stripMargin)
+      .collect
+      sql(
+        s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/badrecords_test5.csv' into table
+           |bad_records_test5 OPTIONS('DELIMITER'='*' , 'QUOTECHAR'='"',
+           |'BAD_RECORDS_LOGGER_ENABLE'='FALSE', 'BAD_RECORDS_ACTION'='IGNORE',
+           |'FILEHEADER'='String_col,integer_col,decimal_column,date,double_col') """.stripMargin)
+        .collect
+    checkAnswer(
+      s"""select count(*) from bad_records_test5""",
       Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_021")
-     sql(s"""drop table bad_records_test5 """).collect
+    sql(s"""drop table bad_records_test5 """).collect
   }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a7fc666/integration/spark-common-test/src/test/resources/overwriteTable1_noRecord.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/overwriteTable1_noRecord.csv b/integration/spark-common-test/src/test/resources/overwriteTable1_noRecord.csv
new file mode 100644
index 0000000..5aa5c94
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/overwriteTable1_noRecord.csv
@@ -0,0 +1 @@
+id,name,salary

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a7fc666/integration/spark-common-test/src/test/resources/overwriteTable1_someRecord.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/overwriteTable1_someRecord.csv b/integration/spark-common-test/src/test/resources/overwriteTable1_someRecord.csv
new file mode 100644
index 0000000..4f02689
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/overwriteTable1_someRecord.csv
@@ -0,0 +1,6 @@
+id,name,salary
+1,hello,2300
+2,hi,2500
+3,xyz,4000
+4,xyz1,5000
+5,xyz2,6000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a7fc666/integration/spark-common-test/src/test/resources/overwriteTable2_noRecord.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/overwriteTable2_noRecord.csv b/integration/spark-common-test/src/test/resources/overwriteTable2_noRecord.csv
new file mode 100644
index 0000000..a5da1cf
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/overwriteTable2_noRecord.csv
@@ -0,0 +1 @@
+id,name,salary,age
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a7fc666/integration/spark-common-test/src/test/resources/overwriteTable2_someRecord.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/overwriteTable2_someRecord.csv b/integration/spark-common-test/src/test/resources/overwriteTable2_someRecord.csv
new file mode 100644
index 0000000..3798b2a
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/overwriteTable2_someRecord.csv
@@ -0,0 +1,4 @@
+id,name,salary,age
+9,abc,48,20
+10,abc1,90,21
+11,abc2,99,22
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a7fc666/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 137e1cb..c9c8a59 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -18,12 +18,15 @@ package org.apache.carbondata.spark.testsuite.allqueries
 
 import java.io.File
 
+import org.apache.spark.sql.Row
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -38,6 +41,8 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists THive")
    sql("create table THive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
     sql(s"LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE THive")
+    sql("drop table if exists OverwriteTable_t1")
+    sql("drop table if exists OverwriteTable_t2")
   }
 
   test("insert from hive") {
@@ -276,6 +281,121 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     }
    sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' overwrite INTO table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')")
     assert(rowCount == sql("select imei from TCarbonSourceOverwrite").count())
+
+  }
+
+  test("insert overwrite in group by scenario with t1 no record and t2 no record") {
+    queryExecution("overwriteTable1_noRecord.csv","overwriteTable2_noRecord.csv")
+    sql ("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")
+    val exists_t1 = checkSegment("OverwriteTable_t1")
+    val exists_t2 = checkSegment("OverwriteTable_t2")
+    assert(!exists_t1)
+    assert(!exists_t2)
+    assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
+    checkAnswer(sql("select * from OverwriteTable_t2"),
+      Seq())
+    checkAnswer(sql("select * from OverwriteTable_t1"),
+      sql("select * from OverwriteTable_t2"))
+  }
+
+
+  test("insert overwrite in group by scenario with t1 no record and t2 some record") {
+    queryExecution("overwriteTable1_noRecord.csv","overwriteTable2_someRecord.csv")
+    sql ("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")
+    val exists_t1 = checkSegment("OverwriteTable_t1")
+    val exists_t2 = checkSegment("OverwriteTable_t2")
+    assert(!exists_t1)
+    assert(!exists_t2)
+    assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
+    checkAnswer(sql("select * from OverwriteTable_t2"),
+      Seq())
+    checkAnswer(sql("select * from OverwriteTable_t1"),
+      sql("select * from OverwriteTable_t2"))
+  }
+
+  test("insert overwrite in group by scenario having record in both table") {
+    queryExecution("overwriteTable1_someRecord.csv","overwriteTable2_someRecord.csv")
+    sql ("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")
+    val exists_t1 = checkSegment("OverwriteTable_t1")
+    val exists_t2 = checkSegment("OverwriteTable_t2")
+    assert(exists_t1)
+    assert(exists_t2)
+    assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
+    checkAnswer(sql("select count(*) from OverwriteTable_t1"), sql("select count(*) from OverwriteTable_t2"))
+  }
+
+  test("insert overwrite in group by scenario t1 some record and t2 no record") {
+    queryExecution("overwriteTable1_someRecord.csv","overwriteTable2_noRecord.csv")
+    sql("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")
+    val exists_t1 = checkSegment("OverwriteTable_t1")
+    val exists_t2 = checkSegment("OverwriteTable_t2")
+    assert(exists_t1)
+    assert(exists_t2)
+    assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
+    checkAnswer(sql("select count(*) from OverwriteTable_t1"), sql("select count(*) from OverwriteTable_t2"))
+  }
+
+  test("insert overwrite without group by scenario t1 no record and t2 no record") {
+    queryExecution("overwriteTable1_noRecord.csv","overwriteTable2_noRecord.csv")
+    sql ("insert overwrite table OverwriteTable_t2 select id,name,salary as TotalSalary,'98' as age from OverwriteTable_t1")
+    val exists_t1 = checkSegment("OverwriteTable_t1")
+    val exists_t2 = checkSegment("OverwriteTable_t2")
+    assert(!exists_t1)
+    assert(!exists_t2)
+    assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
+    checkAnswer(sql("select * from OverwriteTable_t2"),
+      Seq())
+    checkAnswer(sql("select * from OverwriteTable_t1"),
+      sql("select * from OverwriteTable_t2"))
+  }
+
+
+  test("insert overwrite without group by scenario with t1 no record and t2 some record") {
+    queryExecution("overwriteTable1_noRecord.csv","overwriteTable2_someRecord.csv")
+    sql ("insert overwrite table OverwriteTable_t2 select id,name,salary as TotalSalary,'98' as age from OverwriteTable_t1")
+    val exists_t1 = checkSegment("OverwriteTable_t1")
+    val exists_t2 = checkSegment("OverwriteTable_t2")
+    assert(!exists_t1)
+    assert(!exists_t2)
+    assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
+    checkAnswer(sql("select * from OverwriteTable_t2"),
+      Seq())
+    checkAnswer(sql("select * from OverwriteTable_t1"),
+      sql("select * from OverwriteTable_t2"))
+  }
+
+  test("insert overwrite without group by scenario having record in both table") {
+    queryExecution("overwriteTable1_someRecord.csv","overwriteTable2_someRecord.csv")
+    sql ("insert overwrite table OverwriteTable_t2 select id,name,salary as TotalSalary,'98' as age from OverwriteTable_t1")
+    val exists_t1 = checkSegment("OverwriteTable_t1")
+    val exists_t2 = checkSegment("OverwriteTable_t2")
+    assert(exists_t1)
+    assert(exists_t2)
+    assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
+    checkAnswer(sql("select count(*) from OverwriteTable_t1"), sql("select count(*) from OverwriteTable_t2"))
+  }
+
+  private def queryExecution(csvFileName1: String , csvFileName2: String) : Unit ={
+    sql("drop table if exists OverwriteTable_t1")
+    sql("drop table if exists OverwriteTable_t2")
+    sql("create table OverwriteTable_t1(id int,name String,salary int) stored by 'carbondata' ")
+    sql("LOAD DATA INPATH '" + resourcesPath + s"/$csvFileName1' INTO table OverwriteTable_t1")
+    sql("create table OverwriteTable_t2(id int,name String,salary int,age String) stored by 'carbondata'")
+    sql("LOAD DATA INPATH '" + resourcesPath + s"/$csvFileName2' INTO table OverwriteTable_t2")
+  }
+
+
+
+  private def checkSegment(tableName: String) : Boolean ={
+    val storePath_t1 = metastoredb + s"/warehouse/${tableName.toLowerCase()}/Fact/Part0"
+    val carbonFile_t1: CarbonFile = FileFactory
+      .getCarbonFile(storePath_t1, FileFactory.getFileType(storePath_t1))
+    var exists: Boolean = carbonFile_t1.exists()
+    if (exists) {
+      val listFiles: Array[CarbonFile] = carbonFile_t1.listFiles()
+      exists = listFiles.size > 0
+    }
+    exists
   }
 
   test("test show segments after clean files for insert overwrite") {
@@ -309,6 +429,8 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS student")
     sql("DROP TABLE IF EXISTS uniqdata")
     sql("DROP TABLE IF EXISTS show_insert")
+    sql("drop table if exists OverwriteTable_t1")
+    sql("drop table if exists OverwriteTable_t2")
 
     if (timeStampPropOrig != null) {
       CarbonProperties.getInstance()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a7fc666/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
index c01cfef..0079d1e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
@@ -104,6 +104,7 @@ class QueryTest extends PlanTest {
   lazy val storeLocation = CarbonProperties.getInstance().
     getProperty(CarbonCommonConstants.STORE_LOCATION)
   val resourcesPath = TestQueryExecutor.resourcesPath
+  val metastoredb = TestQueryExecutor.metastoredb
   val integrationPath = TestQueryExecutor.integrationPath
   val dblocation = TestQueryExecutor.location
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a7fc666/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 36a2e09..4933a45 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -376,7 +376,17 @@ object CarbonDataRDDFactory {
               }
           }
         } else {
-          loadStatus = SegmentStatus.LOAD_FAILURE
+          // if no value is there in data load, make load status Success
+          // and data load flow executes
+          if (dataFrame.isDefined && updateModel.isEmpty) {
+            val rdd = dataFrame.get.rdd
+            if (rdd.partitions == null || rdd.partitions.length == 0) {
+              LOGGER.warn("DataLoading finished. No data was loaded.")
+              loadStatus = SegmentStatus.SUCCESS
+            }
+          } else {
+            loadStatus = SegmentStatus.LOAD_FAILURE
+          }
         }
 
         if (loadStatus != SegmentStatus.LOAD_FAILURE &&
@@ -486,20 +496,21 @@ object CarbonDataRDDFactory {
                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         throw new Exception(status(0)._2._2.errorMsg)
       }
-      // if segment is empty then fail the data load
+      // as no record loaded in new segment, new segment should be deleted
+      var newEntryLoadStatus =
       if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
          !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
-        // update the load entry in table status file for changing the status to marked for delete
-        CommonUtil.updateTableStatusForFailure(carbonLoadModel)
-        LOGGER.info("********starting clean up**********")
-        CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
-        LOGGER.info("********clean up done**********")
+
         LOGGER.audit(s"Data load is failed for " +
                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
                      " as there is no data to load")
         LOGGER.warn("Cannot write load metadata file as data load failed")
-        throw new Exception("No Data to load")
+
+        SegmentStatus.MARKED_FOR_DELETE
+      } else {
+        loadStatus
       }
+
       writeDictionary(carbonLoadModel, result, writeAll = false)
       val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
         LoadTablePreStatusUpdateEvent(
@@ -508,7 +519,13 @@ object CarbonDataRDDFactory {
         carbonLoadModel)
       operationContext.setProperty("isOverwrite", overwriteTable)
       OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
-      val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
+      val done =
+        updateTableStatus(
+          status,
+          carbonLoadModel,
+          loadStatus,
+          newEntryLoadStatus,
+          overwriteTable)
       if (!done) {
         CommonUtil.updateTableStatusForFailure(carbonLoadModel)
         LOGGER.info("********starting clean up**********")
@@ -769,17 +786,18 @@ object CarbonDataRDDFactory {
       status: Array[(String, (LoadMetadataDetails, ExecutionErrors))],
       carbonLoadModel: CarbonLoadModel,
       loadStatus: SegmentStatus,
+      newEntryLoadStatus: SegmentStatus,
       overwriteTable: Boolean
   ): Boolean = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val metadataDetails = if (status != null && status(0) != null) {
+    val metadataDetails = if (status != null && status.size > 0 && status(0) != null) {
       status(0)._2._1
     } else {
       new LoadMetadataDetails
     }
     CarbonLoaderUtil.populateNewLoadMetaEntry(
       metadataDetails,
-      loadStatus,
+      newEntryLoadStatus,
       carbonLoadModel.getFactTimeStamp,
       true)
     CarbonUtil
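
One way to read the CarbonDataRDDFactory change above: the status of the load as a whole and the status written into the new segment's table-status entry are now decided separately. A toy model of that decision in plain Scala (stand-in status type and method names, illustrative only, not the actual CarbonData API):

    object StatusFlowSketch extends App {
      sealed trait Status
      case object Success extends Status
      case object LoadFailure extends Status
      case object MarkedForDelete extends Status

      // An input DataFrame with zero RDD partitions now counts as a
      // successful (empty) load rather than a failure.
      def loadStatus(writeSucceeded: Boolean, hasInputPartitions: Boolean): Status =
        if (writeSucceeded || !hasInputPartitions) Success else LoadFailure

      // Separately, a new segment holding no valid data gets a
      // marked-for-delete entry so its empty folder can be cleaned up,
      // instead of the old behaviour of throwing "No Data to load".
      def newEntryStatus(segmentIsValid: Boolean, load: Status): Status =
        if (!segmentIsValid) MarkedForDelete else load

      // Empty-source overwrite: the load succeeds, the entry is deletable.
      assert(loadStatus(writeSucceeded = false, hasInputPartitions = false) == Success)
      assert(newEntryStatus(segmentIsValid = false, load = Success) == MarkedForDelete)
    }

In the patch itself, updateTableStatus then receives both values: newEntryLoadStatus is written into the new metadata entry, while the overall loadStatus continues to drive the rest of the flow.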

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a7fc666/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 0affe79..0c6aeda 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -65,14 +65,6 @@ case class CarbonLoadDataCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    if (dataFrame.isDefined && updateModel.isEmpty) {
-      val rdd = dataFrame.get.rdd
-      if (rdd.partitions == null || rdd.partitions.length == 0) {
-        LOGGER.warn("DataLoading finished. No data was loaded.")
-        return Seq.empty
-      }
-    }
-
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a7fc666/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 6ac8307..b0c690b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -236,17 +236,18 @@ public final class CarbonLoaderUtil {
                 entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
                 // For insert overwrite, we will delete the old segment folder immediately
                 // So collect the old segments here
-                String path = carbonTablePath.getCarbonDataDirectoryPath("0", entry.getLoadName());
-                // add to the deletion list only if file exist else HDFS file system will throw
-                // exception while deleting the file if file path does not exist
-                if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
-                  staleFolders.add(FileFactory.getCarbonFile(path));
-                }
+                addToStaleFolders(carbonTablePath, staleFolders, entry);
               }
             }
           }
           listOfLoadFolderDetails.set(indexToOverwriteNewMetaEntry, newMetaEntry);
         }
+        // when no records are inserted then newSegmentEntry will be SegmentStatus.MARKED_FOR_DELETE
+        // so empty segment folder should be deleted
+        if (newMetaEntry.getSegmentStatus() == SegmentStatus.MARKED_FOR_DELETE) {
+          addToStaleFolders(carbonTablePath, staleFolders, newMetaEntry);
+        }
+
         SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
             .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
         // Delete all old stale segment folders
@@ -278,6 +279,16 @@ public final class CarbonLoaderUtil {
     return status;
   }
 
+  private static void addToStaleFolders(CarbonTablePath carbonTablePath,
+      List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException {
+    String path = carbonTablePath.getCarbonDataDirectoryPath("0", entry.getLoadName());
+    // add to the deletion list only if file exist else HDFS file system will throw
+    // exception while deleting the file if file path does not exist
+    if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
+      staleFolders.add(FileFactory.getCarbonFile(path));
+    }
+  }
+
   /**
    * Method to create new entry for load in table status file
    *

