carbondata-commits mailing list archives

From manishgupt...@apache.org
Subject [1/2] carbondata git commit: [CARBONDATA-2274] Fixed partition table with more than 4 partition columns returning zero records
Date Fri, 30 Mar 2018 14:40:27 GMT
Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 97e7eca64 -> c10353070


[CARBONDATA-2274] Fixed partition table with more than 4 partition columns returning zero records


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4b07db43
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4b07db43
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4b07db43

Branch: refs/heads/branch-1.3
Commit: 4b07db4337dda9b7e9642d87ae3c75f0c7db09f7
Parents: 97e7eca
Author: rahulforallp <rahul.kumar@knoldus.in>
Authored: Fri Mar 23 20:19:43 2018 +0530
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Fri Mar 30 20:12:28 2018 +0530

----------------------------------------------------------------------
 .../StandardPartitionTableLoadingTestCase.scala | 21 ++++++++++++++++++++
 .../carbondata/spark/util/CarbonScalaUtil.scala | 18 ++++++++++-------
 .../datasources/CarbonFileFormat.scala          | 18 +++++++++++------
 3 files changed, 44 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
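
For context on the fix: Scala's immutable Map preserves insertion order only up to 4
entries (the specialized Map1 to Map4 classes); from 5 entries on it switches to a
hash-backed implementation whose iteration order is not guaranteed to follow insertion
order. With five partition columns the spec's column order could therefore be scrambled,
which is why the commit threads a mutable.LinkedHashMap through the partition handling
instead. A minimal sketch of the difference (column names borrowed from the test below;
not part of the commit):

    import scala.collection.mutable

    object MapOrderSketch extends App {
      val cols = Seq("utilization" -> "96", "salary" -> "5040",
        "workgroupcategory" -> "1", "empname" -> "arvind", "designation" -> "SE")

      // immutable Map: with 5+ entries it is hash-backed, so iteration
      // order is no longer guaranteed to follow insertion order
      println(Map(cols: _*).keys.mkString(", "))

      // mutable.LinkedHashMap always iterates in insertion order
      println(mutable.LinkedHashMap(cols: _*).keys.mkString(", "))
    }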


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b07db43/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index c8f7be3..e577fc6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -137,6 +137,26 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname,
deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary
from originTable order by empno"))
   }
 
+  test("data loading for partition table for five partition column") {
+    sql(
+      """
+        | CREATE TABLE partitionfive (empno int, doj Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int)
+        | PARTITIONED BY (utilization int,salary int,workgroupcategory int, empname String,
+        | designation String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionfive OPTIONS('DELIMITER'=
',', 'QUOTECHAR'= '"')""")
+
+    validateDataFiles("default_partitionfive", "0", 10)
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname,
deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary
from partitionfive order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname,
deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary
from originTable order by empno"))
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname,
deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary
from partitionfive where empno>15 order by empno "),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname,
deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary
from originTable where empno>15 order by empno"))
+  }
 
   test("multiple data loading for partition table for three partition column") {
     sql(
@@ -523,6 +543,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     sql("drop table if exists partitionone")
     sql("drop table if exists partitiontwo")
     sql("drop table if exists partitionthree")
+    sql("drop table if exists partitionfive")
     sql("drop table if exists partitionmultiplethree")
     sql("drop table if exists insertpartitionthree")
     sql("drop table if exists staticpartitionone")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b07db43/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 773ea16..3e6aded 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -23,6 +23,8 @@ import java.nio.charset.Charset
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import scala.collection.mutable
+
 import com.univocity.parsers.common.TextParsingException
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
@@ -337,13 +339,12 @@ object CarbonScalaUtil {
    * Update partition values as per the right date and time format
    * @return updated partition spec
    */
-  def updatePartitions(
-      partitionSpec: Map[String, String],
-  table: CarbonTable): Map[String, String] = {
+  def updatePartitions(partitionSpec: mutable.LinkedHashMap[String, String],
+      table: CarbonTable): mutable.LinkedHashMap[String, String] = {
     val cacheProvider: CacheProvider = CacheProvider.getInstance
     val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
       cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
-    partitionSpec.map{ case (col, pvalue) =>
+    partitionSpec.map { case (col, pvalue) =>
       // replace special string with empty value.
       val value = if (pvalue == null) {
         hivedefaultpartition
@@ -383,11 +384,14 @@ object CarbonScalaUtil {
   def updatePartitions(
       parts: Seq[CatalogTablePartition],
       table: CarbonTable): Seq[CatalogTablePartition] = {
-    parts.map{ f =>
+    parts.map { f =>
+      val specLinkedMap: mutable.LinkedHashMap[String, String] = mutable.LinkedHashMap
+        .empty[String, String]
+      f.spec.foreach(fSpec => specLinkedMap.put(fSpec._1, fSpec._2))
       val changedSpec =
         updatePartitions(
-          f.spec,
-          table)
+          specLinkedMap,
+          table).toMap
       f.copy(spec = changedSpec)
    }.groupBy(p => p.spec).map(f => f._2.head).toSeq // avoid duplicates by grouping on spec
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b07db43/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index bff65be..79b0ac4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -280,10 +280,14 @@ private class CarbonOutputWriter(path: String,
     }
   }
   var (updatedPartitions, partitionData) = if (partitions.nonEmpty) {
+    val linkedMap = mutable.LinkedHashMap[String, String]()
     val updatedPartitions = partitions.map(splitPartition)
-    (updatedPartitions, updatePartitions(updatedPartitions.map(_._2)))
+    updatedPartitions.foreach {
+      case (k, v) => linkedMap.put(k, v)
+    }
+    (linkedMap, updatePartitions(updatedPartitions.map(_._2)))
   } else {
-    (Map.empty[String, String].toArray, Array.empty)
+    (mutable.LinkedHashMap.empty[String, String], Array.empty)
   }
 
   private def splitPartition(p: String) = {
@@ -309,8 +313,10 @@ private class CarbonOutputWriter(path: String,
       val index = currPartitions.indexOf(writeSpec)
       if (index > -1) {
         val spec = currPartitions.get(index)
-        updatedPartitions = spec.getPartitions.asScala.map(splitPartition).toArray
-        partitionData = updatePartitions(updatedPartitions.map(_._2))
+        spec.getPartitions.asScala.map(splitPartition).foreach {
+          case (k, v) => updatedPartitions.put(k, v)
+        }
+        partitionData = updatePartitions(updatedPartitions.map(_._2).toSeq)
       }
     }
     updatedPath
@@ -397,7 +403,7 @@ private class CarbonOutputWriter(path: String,
     val formattedPartitions =
     // All dynamic partitions need to be converted to proper format
       CarbonScalaUtil.updatePartitions(
-        updatedPartitions.toMap,
+        updatedPartitions.asInstanceOf[mutable.LinkedHashMap[String, String]],
         model.getCarbonDataLoadSchema.getCarbonTable)
     formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2))
     SegmentFileStore.writeSegmentFile(
@@ -415,7 +421,7 @@ private class CarbonOutputWriter(path: String,
       val formattedPartitions =
       // All dynamic partitions need to be converted to proper format
         CarbonScalaUtil.updatePartitions(
-          updatedPartitions.toMap,
+          updatedPartitions.asInstanceOf[mutable.LinkedHashMap[String, String]],
           model.getCarbonDataLoadSchema.getCarbonTable)
       val partitionstr = formattedPartitions.map{p =>
         ExternalCatalogUtils.escapePathName(p._1) + "=" + ExternalCatalogUtils.escapePathName(p._2)
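
The CarbonOutputWriter hunks above show where the order matters: the partition directory
string is built by iterating the spec (col1=val1/col2=val2/...), so a hash-ordered spec
can emit the segments in the wrong order, and the loaded data is then not found at the
expected path, giving zero records at query time. A minimal sketch of that dependency
(path escaping via ExternalCatalogUtils is omitted; this is an illustration, not the
commit's code):

    import scala.collection.mutable

    object PartitionPathSketch {
      // The emitted path follows the spec's iteration order, so only an
      // insertion-ordered map reproduces the declared column order.
      def partitionPath(spec: mutable.LinkedHashMap[String, String]): String =
        spec.map { case (col, value) => s"$col=$value" }.mkString("/")
    }

    // PartitionPathSketch.partitionPath(mutable.LinkedHashMap(
    //   "empname" -> "arvind", "designation" -> "SE"))
    // returns "empname=arvind/designation=SE"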

