carbondata-commits mailing list archives

From kumarvisha...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3506]Fix alter table failures on partition table with hive.metastore.disallow.incompatible.col.type.changes as true
Date Thu, 12 Sep 2019 07:26:28 GMT
This is an automated email from the ASF dual-hosted git repository.

kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 26f2c77  [CARBONDATA-3506]Fix alter table failures on partition table with hive.metastore.disallow.incompatible.col.type.changes as true
26f2c77 is described below

commit 26f2c778e5b8c10b2249862877250afdd0062a41
Author: akashrn5 <akashnilugal@gmail.com>
AuthorDate: Wed Aug 28 12:05:13 2019 +0530

    [CARBONDATA-3506]Fix alter table failures on partition table with hive.metastore.disallow.incompatible.col.type.changes as true
    
    Problem:
    In Spark 2.2 and above, when we call alterExternalCatalogForTableWithUpdatedSchema to
    update the new schema in the external catalog during an add column operation, Spark gets
    the catalog table and itself adds the partition columns (if the table is a partition
    table) to the new data schema sent by Carbon. This produces duplicate partition columns,
    so validation fails in Hive.
    When the table has only two columns and one of them is the partition column, dropping
    the non-partition column is invalid, because allowing it would leave a table whose
    columns are all partition columns. So with the above property set to true, drop column
    fails to update the Hive metastore.
    In Spark 2.2 and above, a datatype change on a partition column also fails with the
    above property set to true, because we do not send the partition columns when altering
    the schema in Hive.
    
    Solution:
    When sending the new schema to Spark to update the catalog in Spark 2.2 and above, do
    not send the partition columns, as Spark takes care of adding the partition columns to
    the schema we send.
    For the drop scenario above, do not allow the drop column operation if, after dropping
    the specified columns, the table would have only partition columns left.
    Block datatype change on partition columns in Spark 2.2 and above.
    
    This closes #3367
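
    The core of the fix, repeated in each of the three alter commands in the diff below, is
    to filter the partition columns out of the column list handed to the session catalog.
    A minimal sketch of that filtering, using the SparkUtil and CarbonTable calls visible in
    the diff (the standalone helper and its name are illustrative, not part of the patch):

        import scala.collection.JavaConverters._
        import org.apache.spark.util.SparkUtil
        import org.apache.carbondata.core.metadata.schema.table.CarbonTable
        import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema

        // On Spark 2.2+, the external catalog re-appends partition columns for partition
        // tables, so sending them again would duplicate them in the Hive schema.
        def columnsForCatalog(carbonTable: CarbonTable,
            allColumns: Seq[ColumnSchema]): Option[Seq[ColumnSchema]] = {
          if (SparkUtil.isSparkVersionXandAbove("2.2") && carbonTable.isHivePartitionTable) {
            val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
            Some(allColumns.filterNot(col => partitionColumns.contains(col)))
          } else {
            Some(allColumns)
          }
        }
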
---
 .../StandardPartitionTableQueryTestCase.scala      | 29 +++++++++++++----
 .../schema/CarbonAlterTableAddColumnCommand.scala  | 20 +++++++++---
 ...nAlterTableColRenameDataTypeChangeCommand.scala | 36 +++++++++++++++++++---
 .../schema/CarbonAlterTableDropColumnCommand.scala | 35 +++++++++++++++++----
 4 files changed, 99 insertions(+), 21 deletions(-)

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index c19c0b9..fb4b511 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -21,8 +21,10 @@ import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
 import org.apache.spark.sql.test.Spark2TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.util.SparkUtil
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
@@ -439,18 +441,32 @@ test("Creation of partition table should fail if the colname in table schema and
 
   test("validate data in partition table after dropping and adding a column") {
     sql("drop table if exists par")
-    sql("create table par(name string) partitioned by (age double) stored by " +
+    sql("create table par(name string, add string) partitioned by (age double) stored by
" +
               "'carbondata' TBLPROPERTIES('cache_level'='blocklet')")
-    sql(s"load data local inpath '$resourcesPath/uniqwithoutheader.csv' into table par options"
+
-        s"('header'='false')")
+    sql("insert into par select 'joey','NY',32 union all select 'chandler','NY',32")
     sql("alter table par drop columns(name)")
     sql("alter table par add columns(name string)")
-    sql(s"load data local inpath '$resourcesPath/uniqwithoutheader.csv' into table par options"
+
-        s"('header'='false')")
-    checkAnswer(sql("select name from par"), Seq(Row("a"),Row("b"), Row(null), Row(null)))
+    sql("insert into par select 'joey','NY',32 union all select 'joey','NY',32")
+    checkAnswer(sql("select name from par"), Seq(Row("NY"),Row("NY"), Row(null), Row(null)))
     sql("drop table if exists par")
   }
 
+  test("test drop column when after dropping only partition column remains and datatype change
on partition column") {
+    sql("drop table if exists onlyPart")
+    sql("create table onlyPart(name string) partitioned by (age int) stored by " +
+        "'carbondata' TBLPROPERTIES('cache_level'='blocklet')")
+    val ex1 = intercept[MalformedCarbonCommandException] {
+      sql("alter table onlyPart drop columns(name)")
+    }
+    assert(ex1.getMessage.contains("alter table drop column is failed, cannot have the table with all columns as partition column"))
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val ex2 = intercept[MalformedCarbonCommandException] {
+        sql("alter table onlyPart change age age bigint")
+      }
+      assert(ex2.getMessage.contains("Alter datatype of the partition column age is not allowed"))
+    }
+    sql("drop table if exists onlyPart")
+  }
 
   private def verifyPartitionInfo(frame: DataFrame, partitionNames: Seq[String]) = {
     val plan = frame.queryExecution.sparkPlan
@@ -488,6 +504,7 @@ test("Creation of partition table should fail if the colname in table schema and
     sql("drop table if exists staticpartitionextlocload_new")
     sql("drop table if exists staticpartitionlocloadother_new")
     sql("drop table if exists par")
+    sql("drop table if exists onlyPart")
   }
 
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 3fab2fb..6b0d709 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, MetadataCommand}
 import org.apache.spark.sql.hive.CarbonSessionCatalog
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, SparkUtil}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -102,12 +102,24 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
           carbonTable,
           schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
           thriftTable)(sparkSession)
+      // In case of spark2.2 and above, when we call
+      // alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
+      // in case of add column, spark gets the catalog table and then itself adds the partition
+      // columns if the table is a partition table for all the new data schema sent by carbon,
+      // so there will be duplicate partition columns, so send the columns without partition columns
+      val cols = if (SparkUtil.isSparkVersionXandAbove("2.2") && carbonTable.isHivePartitionTable) {
+        val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
+        val carbonColumnsWithoutPartition = carbonColumns.filterNot(col => partitionColumns.contains
+        (col))
+        Some(carbonColumnsWithoutPartition ++ sortedColsBasedActualSchemaOrder)
+      } else {
+        Some(carbonColumns ++ sortedColsBasedActualSchemaOrder)
+      }
       sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterAddColumns(
-        tableIdentifier, schemaParts, Some(carbonColumns ++ sortedColsBasedActualSchemaOrder))
+        tableIdentifier, schemaParts, cols)
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       val alterTablePostExecutionEvent: AlterTableAddColumnPostEvent =
-        new AlterTableAddColumnPostEvent(sparkSession,
-          carbonTable, alterTableAddColumnsModel)
+        AlterTableAddColumnPostEvent(sparkSession, carbonTable, alterTableAddColumnsModel)
       OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent, operationContext)
       LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
     } catch {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index 49f651e..d3d63eb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -21,10 +21,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, DataTypeInfo,
-  MetadataCommand}
+import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, DataTypeInfo, MetadataCommand}
 import org.apache.spark.sql.hive.CarbonSessionCatalog
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, SparkUtil}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -162,6 +161,18 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
         isDataTypeChange = true
       }
       if (isDataTypeChange) {
+        // if column datatype change operation is on partition column, then fail the datatype change
+        // operation
+        if (SparkUtil.isSparkVersionXandAbove("2.2") && null != carbonTable.getPartitionInfo) {
+          val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList
+          partitionColumns.asScala.foreach {
+            col =>
+              if (col.getColumnName.equalsIgnoreCase(oldColumnName)) {
+                throw new MalformedCarbonCommandException(
+                  s"Alter datatype of the partition column $newColumnName is not allowed")
+              }
+          }
+        }
         validateColumnDataType(alterTableColRenameAndDataTypeChangeModel.dataTypeInfo,
           oldCarbonColumn.head)
       }
@@ -257,7 +268,7 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
    * @param carbonTable              carbonTable
    * @param tableInfo                tableInfo
    * @param addColumnSchema          added column schema
-   * @param schemaEvolutionEntryList new SchemaEvolutionEntry
+   * @param schemaEvolutionEntry     new SchemaEvolutionEntry
    */
   private def updateSchemaAndRefreshTable(sparkSession: SparkSession,
       carbonTable: CarbonTable,
@@ -275,12 +286,27 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
     // update the schema changed column at the specific index in carbonColumns based on schema order
     carbonColumns
       .update(schemaOrdinal, schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
+    // In case of spark2.2 and above, when we call
+    // alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
+    // in case of rename column or change datatype, spark gets the catalog table and then itself
+    // adds the partition columns if the table is a partition table for all the new data schema
+    // sent by carbon, so there will be duplicate partition columns, so send the columns without
+    // partition columns
+    val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
+                      carbonTable.isHivePartitionTable) {
+      val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
+      val carbonColumnsWithoutPartition = carbonColumns.filterNot(col => partitionColumns.contains(
+        col))
+      Some(carbonColumnsWithoutPartition)
+    } else {
+      Some(carbonColumns)
+    }
     val (tableIdentifier, schemaParts) = AlterTableUtil.updateSchemaInfo(
       carbonTable,
       schemaEvolutionEntry,
       tableInfo)(sparkSession)
     sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
-      .alterColumnChangeDataTypeOrRename(tableIdentifier, schemaParts, Some(carbonColumns))
+      .alterColumnChangeDataTypeOrRename(tableIdentifier, schemaParts, columns)
     sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
   }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 31cfdaf..8a2e837 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, MetadataCommand}
 import org.apache.spark.sql.hive.CarbonSessionCatalog
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, SparkUtil}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -63,6 +63,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
           "alter table drop column is not supported for index datamap")
       }
       val partitionInfo = carbonTable.getPartitionInfo(tableName)
+      val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
       if (partitionInfo != null) {
         val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala
           .map(_.getColumnName)
@@ -74,9 +75,18 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
          throwMetadataException(dbName, tableName, "Partition columns cannot be dropped: " +
                                                   s"$partitionColumns")
         }
+
+        // this check is added because, when the table has only two columns, one partition and one
+        // non-partition, dropping the non-partition column means having a table with only the
+        // partition column, which is wrong
+        if (tableColumns.filterNot(col => alterTableDropColumnModel.columns
+          .contains(col.getColName)).map(_.getColName).equals(partitionColumnSchemaList)) {
+          throw new MalformedCarbonCommandException(
+            "alter table drop column is failed, cannot have the table with all columns as
" +
+            "partition columns")
+        }
       }
 
-      val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
       var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
       .ColumnSchema]()
       // TODO: if deleted column list includes bucketted column throw an error
@@ -139,11 +149,24 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
         schemaEvolutionEntry,
         tableInfo)(sparkSession)
       // get the columns in schema order and filter the dropped column in the column set
-      val cols = Some(carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
-        .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema}
-        .filterNot(column => delCols.contains(column)))
+      val cols = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
+        .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
+        .filterNot(column => delCols.contains(column))
+      // In case of spark2.2 and above, when we call
+      // alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
+      // in case of drop column, spark gets the catalog table and then itself adds the partition
+      // columns if the table is a partition table for all the new data schema sent by carbon,
+      // so there will be duplicate partition columns, so send the columns without partition columns
+      val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
+                        carbonTable.isHivePartitionTable) {
+        val partitionColumns = partitionInfo.getColumnSchemaList.asScala
+        val carbonColumnsWithoutPartition = cols.filterNot(col => partitionColumns.contains(col))
+        Some(carbonColumnsWithoutPartition)
+      } else {
+        Some(cols)
+      }
       sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
-        .alterDropColumns(tableIdentifier, schemaParts, cols)
+        .alterDropColumns(tableIdentifier, schemaParts, columns)
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       // TODO: 1. add check for deletion of index tables
       // delete dictionary files for dictionary column and clear dictionary cache from memory
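
For quick reference, the guards added above can be exercised from a CarbonData-enabled Spark
session, as in the new test case (a sketch; the onlyPart table and the asserted error
messages come directly from that test):

    sql("create table onlyPart(name string) partitioned by (age int) stored by 'carbondata'")
    // Throws MalformedCarbonCommandException: dropping 'name' would leave a table whose
    // only column is the partition column.
    sql("alter table onlyPart drop columns(name)")
    // Throws MalformedCarbonCommandException on Spark 2.2 and above:
    // "Alter datatype of the partition column age is not allowed"
    sql("alter table onlyPart change age age bigint")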

