carbondata-commits mailing list archives

From jack...@apache.org
Subject carbondata git commit: Fix adding partition information while doing refresh(restore) table. And fix the case sensitivity issue of partition column
Date Fri, 05 Jan 2018 09:52:20 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 08b8af7b0 -> 2dbe6c975


Fix adding partition information while doing refresh (restore) of a table, and fix the case-sensitivity issue of partition columns

Fix comments
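
For context, a minimal sketch of the user-facing flow this change covers, assuming an existing CarbonSession `spark` and a placeholder csvPath (table name and schema are illustrative, adapted from the test case below):

// Illustrative only: back up a partitioned carbon table's files, drop it,
// copy the files back, and re-register it with REFRESH TABLE.
spark.sql(
  """CREATE TABLE restorepartition (doj TIMESTAMP, salary INT)
    | PARTITIONED BY (empno INT, empname STRING)
    | STORED BY 'org.apache.carbondata.format'""".stripMargin)
spark.sql(s"LOAD DATA local inpath '$csvPath' INTO TABLE restorepartition " +
  "PARTITION(empno='99', empname='ravi')")
// ... copy the table directory aside, DROP TABLE, copy the files back ...
spark.sql("REFRESH TABLE restorepartition")
// with this fix the partitions stored in each segment are re-registered in the
// Hive metastore, so SHOW PARTITIONS reflects the restored data again
spark.sql("SHOW PARTITIONS restorepartition").show()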


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2dbe6c97
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2dbe6c97
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2dbe6c97

Branch: refs/heads/master
Commit: 2dbe6c975b9aec3769750556b4b82a11930ea07d
Parents: 08b8af7
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Thu Jan 4 00:34:33 2018 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Fri Jan 5 17:51:49 2018 +0800

----------------------------------------------------------------------
 .../StandardPartitionTableLoadingTestCase.scala | 70 ++++++++++++++++++++
 .../management/RefreshCarbonTableCommand.scala  | 52 ++++++++++++++-
 .../strategy/CarbonLateDecodeStrategy.scala     | 13 +++-
 3 files changed, 131 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2dbe6c97/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 72e464e..b399138 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -16,6 +16,9 @@
  */
 package org.apache.carbondata.spark.testsuite.standardpartition
 
+import java.io.{File, IOException}
+
+import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.scalatest.BeforeAndAfterAll
@@ -325,6 +328,71 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     checkAnswer(sql(s"select count(*) from emp1"), rows)
   }
 
+  test("test restore partition table") {
+    sql(
+      """
+        | CREATE TABLE restorepartition (doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int, empname String, designation String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE restorepartition""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE restorepartition
PARTITION(empno='99', empname='ravi', designation='xx')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE restorepartition
PARTITION(empno='100', empname='indra', designation='yy')""")
+    val rows = sql("select count(*) from restorepartition").collect()
+    val partitions = sql("show partitions restorepartition").collect()
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_restorepartition")
+    val dblocation = table.getTablePath.substring(0, table.getTablePath.lastIndexOf("/"))
+    backUpData(dblocation, "restorepartition")
+    sql("drop table restorepartition")
+    restoreData(dblocation, "restorepartition")
+    sql("refresh table restorepartition")
+    checkAnswer(sql("select count(*) from restorepartition"), rows)
+    checkAnswer(sql("show partitions restorepartition"), partitions)
+  }
+
+  test("test case sensitive on partition columns") {
+    sql(
+      """
+        | CREATE TABLE casesensitivepartition (doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empNo int, empName String, designation String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE casesensitivepartition""")
+    checkAnswer(sql("select * from  casesensitivepartition where empNo=17"),
+      sql("select * from  casesensitivepartition where empno=17"))
+  }
+
+  def restoreData(dblocation: String, tableName: String) = {
+    val destination = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    val source = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    try {
+      FileUtils.copyDirectory(new File(source), new File(destination))
+      FileUtils.deleteDirectory(new File(source))
+    } catch {
+      case e: Exception =>
+        throw new IOException("carbon table data restore failed.", e)
+    }
+  }
+  def backUpData(dblocation: String, tableName: String) = {
+    val source = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    val destination = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    try {
+      FileUtils.copyDirectory(new File(source), new File(destination))
+    } catch {
+      case e: Exception =>
+        throw new IOException("carbon table data backup failed.", e)
+    }
+  }
+  }
+
 
   override def afterAll = {
     dropTable
@@ -347,6 +415,8 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     sql("drop table if exists loadstaticpartitiononeissue")
     sql("drop table if exists loadpartitionwithspecialchar")
     sql("drop table if exists emp1")
+    sql("drop table if exists restorepartition")
+    sql("drop table if exists casesensitivepartition")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2dbe6c97/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index 45ed298..2983ea4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -22,16 +22,20 @@ import java.util
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, MetadataCommand}
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, PartitionMapFileStore}
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent}
 import org.apache.carbondata.hadoop.util.SchemaReader
 
@@ -91,6 +95,11 @@ case class RefreshCarbonTableCommand(
           registerAggregates(databaseName, dataMapSchemaList)(sparkSession)
         }
         registerTableWithHive(databaseName, tableName, tableInfo)(sparkSession)
+        // Register partitions to hive metastore in case of hive partitioning carbon table
+        if (tableInfo.getFactTable.getPartitionInfo != null &&
+            tableInfo.getFactTable.getPartitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+          registerAllPartitionsToHive(absoluteTableIdentifier, sparkSession)
+        }
       } else {
         LOGGER.audit(
           s"Table registration with Database name [$databaseName] and Table name [$tableName]
" +
@@ -205,4 +214,41 @@ case class RefreshCarbonTableCommand(
       }
     })
   }
+
+  /**
+   * Read all the partition information which is stored in each segment and add to
+   * the hive metastore
+   */
+  private def registerAllPartitionsToHive(
+      absIdentifier: AbsoluteTableIdentifier,
+      sparkSession: SparkSession): Unit = {
+    val metadataDetails =
+      SegmentStatusManager.readLoadMetadata(
+        CarbonTablePath.getMetadataPath(absIdentifier.getTablePath))
+    // First read all partition information from each segment.
+    val allpartitions = metadataDetails.map{ metadata =>
+      if (metadata.getSegmentStatus == SegmentStatus.SUCCESS ||
+          metadata.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
+        val mapper = new PartitionMapFileStore()
+        mapper.readAllPartitionsOfSegment(
+          CarbonTablePath.getSegmentPath(absIdentifier.getTablePath, metadata.getLoadName))
+        Some(mapper.getPartitionMap.values().asScala)
+      } else {
+        None
+      }
+    }.filter(_.isDefined).map(_.get)
+    val identifier =
+      TableIdentifier(absIdentifier.getTableName, Some(absIdentifier.getDatabaseName))
+    // Register the partition information to the hive metastore
+    allpartitions.foreach { segPartitions =>
+      val specs: Seq[TablePartitionSpec] = segPartitions.map { indexPartitions =>
+        indexPartitions.asScala.map{ p =>
+          val spec = p.split("=")
+          (spec(0), spec(1))
+        }.toMap
+      }.toSeq
+      // Add partition information
+      AlterTableAddPartitionCommand(identifier, specs.map((_, None)), true).run(sparkSession)
+    }
+  }
 }
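
The registration above reads the "column=value" entries stored with each successful segment and turns them into Spark TablePartitionSpec maps. A minimal, self-contained sketch of that mapping step (plain Scala with illustrative data, not the CarbonData API; it splits on the first '=' only, which the command's own parsing does not do):

// each segment contributes a list of "column=value" partition entries
val segmentPartitions: Seq[Seq[String]] =
  Seq(Seq("empno=99", "empname=ravi"), Seq("empno=100", "empname=indra"))

// turn every entry list into the column -> value map handed to
// AlterTableAddPartitionCommand as a TablePartitionSpec
val specs: Seq[Map[String, String]] = segmentPartitions.map { entries =>
  entries.map { entry =>
    // split on the first '=' only, so values containing '=' stay intact
    val Array(column, value) = entry.split("=", 2)
    column -> value
  }.toMap
}
// specs == Seq(Map("empno" -> "99", "empname" -> "ravi"),
//              Map("empno" -> "100", "empname" -> "indra"))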

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2dbe6c97/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index ad519e6..6ef3d47 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -152,9 +152,20 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         .map(p => relation.output.find(_.name.equalsIgnoreCase(p)).get))
       val partitionKeyFilters =
         ExpressionSet(ExpressionSet(filterPredicates).filter(_.references.subsetOf(partitionSet)))
+      // Update the name with lower case as it is case sensitive while getting partition info.
+      val updatedPartitionFilters = partitionKeyFilters.map { exp =>
+        exp.transform {
+          case attr: AttributeReference =>
+            AttributeReference(
+              attr.name.toLowerCase,
+              attr.dataType,
+              attr.nullable,
+              attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+        }
+      }
       partitions =
         CarbonFilters.getPartitions(
-          partitionKeyFilters.toSeq,
+          updatedPartitionFilters.toSeq,
           SparkSession.getActiveSession.get,
           relation.catalogTable.get.identifier)
     }
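
The rewrite above lower-cases every attribute name inside the partition filters before they reach CarbonFilters.getPartitions, because the partition metadata lookup is case sensitive. A small sketch of the same transformation, assuming the Spark 2.x Catalyst API this branch builds against (the column name and filter are illustrative):

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
import org.apache.spark.sql.types.IntegerType

// a filter on a mixed-case partition column, e.g. WHERE empNo = 17
val filter: Expression = EqualTo(AttributeReference("empNo", IntegerType)(), Literal(17))

// rewrite every attribute to its lower-cased name so the lookup against the
// partition metadata no longer depends on the case used in the query
val normalized = filter.transform {
  case attr: AttributeReference =>
    AttributeReference(attr.name.toLowerCase, attr.dataType, attr.nullable,
      attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
}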

