carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject carbondata git commit: [CARBONDATA-1592] Added preUpdateStatus Event Listeners, corrected event parameters,
Date Thu, 07 Dec 2017 20:46:21 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 6b7217a8d -> 0da0a4f61


[CARBONDATA-1592] Added preUpdateStatus Event Listeners, corrected event parameters,

This closes #1614


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0da0a4f6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0da0a4f6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0da0a4f6

Branch: refs/heads/master
Commit: 0da0a4f614130a30a3edd527c248ec27d6ac5ca8
Parents: 6b7217a
Author: Manohar <manohar.crazy09@gmail.com>
Authored: Tue Dec 5 18:44:29 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Fri Dec 8 02:16:04 2017 +0530

----------------------------------------------------------------------
 .../carbondata/events/AlterTableEvents.scala    | 50 ++++++++++++++------
 .../org/apache/carbondata/events/Events.scala   | 10 +++-
 .../apache/carbondata/events/LoadEvents.scala   |  4 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 31 +++++++-----
 .../spark/rdd/CarbonTableCompactor.scala        | 18 +++++--
 .../org/apache/spark/sql/CarbonSession.scala    |  2 +-
 .../CarbonAlterTableCompactionCommand.scala     | 22 +++++++--
 .../management/CarbonLoadDataCommand.scala      | 23 +++++----
 .../preaaggregate/PreAggregateListeners.scala   |  4 +-
 .../schema/CarbonAlterTableRenameCommand.scala  |  6 ++-
 10 files changed, 118 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 7caad43..0457e85 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -16,12 +16,16 @@
  */
 package org.apache.carbondata.events
 
+import java.util
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel,
AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping}
 
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.CompactionType
 
 /**
  *
@@ -144,37 +148,53 @@ case class AlterTableRenameAbortEvent(
     sparkSession: SparkSession) extends Event with AlterTableRenameEventInfo
 
 
-case class AlterTableCompactionPreEvent(
+/**
+ * Event for handling pre compaction operations, lister has to implement this event on pre
execution
+ *
+ * @param sparkSession
+ * @param carbonTable
+ */
+case class AlterTableCompactionPreEvent(sparkSession: SparkSession,
     carbonTable: CarbonTable,
     carbonMergerMapping: CarbonMergerMapping,
-    mergedLoadName: String,
-    sqlContext: SQLContext) extends Event with AlterTableCompactionEventInfo
-
+    mergedLoadName: String) extends Event with AlterTableCompactionEventInfo
 
 /**
- *
+ * Compaction Event for handling pre update status file opeartions, lister has to implement
this
+ * event before updating the table status file
+ * @param sparkSession
  * @param carbonTable
  * @param carbonMergerMapping
  * @param mergedLoadName
- * @param sQLContext
  */
-case class AlterTableCompactionPostEvent(
+case class AlterTableCompactionPostEvent(sparkSession: SparkSession,
     carbonTable: CarbonTable,
     carbonMergerMapping: CarbonMergerMapping,
-    mergedLoadName: String,
-    sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo
-
+    mergedLoadName: String) extends Event with AlterTableCompactionEventInfo
+/**
+ * Compaction Event for handling pre update status file opeartions, lister has to implement
this
+ * event before updating the table status file
+ * @param sparkSession
+ * @param carbonTable
+ * @param carbonMergerMapping
+ * @param carbonLoadModel
+ * @param mergedLoadName
+ */
+case class AlterTableCompactionPreStatusUpdateEvent(sparkSession: SparkSession,
+    carbonTable: CarbonTable,
+    carbonMergerMapping: CarbonMergerMapping,
+    carbonLoadModel: CarbonLoadModel,
+    mergedLoadName: String) extends Event with AlterTableCompactionStatusUpdateEventInfo
 
 /**
- * Class for handling clean up in case of any failure and abort the operation
+ * Compaction Event for handling clean up in case of any compaction failure and abort the
+ * operation, lister has to implement this event to handle failure scenarios
  *
  * @param carbonTable
  * @param carbonMergerMapping
  * @param mergedLoadName
- * @param sQLContext
  */
-case class AlterTableCompactionAbortEvent(
+case class AlterTableCompactionAbortEvent(sparkSession: SparkSession,
     carbonTable: CarbonTable,
     carbonMergerMapping: CarbonMergerMapping,
-    mergedLoadName: String,
-    sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo
+    mergedLoadName: String) extends Event with AlterTableCompactionEventInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 4af337b..8e69855 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -93,13 +93,21 @@ trait AlterTableAddColumnEventInfo {
 /**
  * event for alter_table_rename
  */
-trait AlterTableCompactionEventInfo {
+trait AlterTableCompactionStatusUpdateEventInfo {
   val carbonTable: CarbonTable
   val carbonMergerMapping: CarbonMergerMapping
   val mergedLoadName: String
 }
 
 /**
+ * event for alter_table_compaction
+ */
+trait AlterTableCompactionEventInfo {
+  val sparkSession: SparkSession
+  val carbonTable: CarbonTable
+}
+
+/**
  * event for DeleteSegmentById
  */
 trait DeleteSegmentbyIdEventInfo {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
index 84dde84..022ad72 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
@@ -45,8 +45,8 @@ case class LoadTablePostExecutionEvent(sparkSession: SparkSession,
     carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo
 
 /**
- * Class for handling operations after data load completion and before final commit of load
- * operation. Example usage: For loading pre-aggregate tables
+ * Event for handling operations after data load completion and before final
+ * commit of load operation. Example usage: For loading pre-aggregate tables
  */
 case class LoadTablePreStatusUpdateEvent(sparkSession: SparkSession,
     carbonTableIdentifier: CarbonTableIdentifier,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1d2934f..8f4af1b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -81,7 +81,8 @@ object CarbonDataRDDFactory {
       storeLocation: String,
       compactionType: CompactionType,
       carbonTable: CarbonTable,
-      compactionModel: CompactionModel): Unit = {
+      compactionModel: CompactionModel,
+      operationContext: OperationContext): Unit = {
     // taking system level lock at the mdt file location
     var configuredMdtPath = CarbonProperties.getInstance().getProperty(
       CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
@@ -114,7 +115,8 @@ object CarbonDataRDDFactory {
           carbonLoadModel,
           storeLocation,
           compactionModel,
-          lock
+          lock,
+          operationContext
         )
       } catch {
         case e: Exception =>
@@ -150,7 +152,8 @@ object CarbonDataRDDFactory {
       carbonLoadModel: CarbonLoadModel,
       storeLocation: String,
       compactionModel: CompactionModel,
-      compactionLock: ICarbonLock): Unit = {
+      compactionLock: ICarbonLock,
+      operationContext: OperationContext): Unit = {
     val executor: ExecutorService = Executors.newFixedThreadPool(1)
     // update the updated table status.
     if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
@@ -280,14 +283,15 @@ object CarbonDataRDDFactory {
   def loadCarbonData(
       sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      storePath: String,
       columnar: Boolean,
       partitionStatus: SegmentStatus = SegmentStatus.SUCCESS,
       result: Option[DictionaryServer],
       overwriteTable: Boolean,
       hadoopConf: Configuration,
       dataFrame: Option[DataFrame] = None,
-      updateModel: Option[UpdateTableModel] = None): Unit = {
+      updateModel: Option[UpdateTableModel] = None,
+      operationContext: OperationContext): Unit = {
+    val storePath: String = carbonLoadModel.getTablePath
     LOGGER.audit(s"Data load request has been received for table" +
                  s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName
}")
     // Check if any load need to be deleted before loading new data
@@ -494,10 +498,12 @@ object CarbonDataRDDFactory {
         throw new Exception("No Data to load")
       }
       writeDictionary(carbonLoadModel, result, writeAll = false)
-      val loadTablePreStatusUpdateEvent = LoadTablePreStatusUpdateEvent(sqlContext.sparkSession,
+      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+        LoadTablePreStatusUpdateEvent(
+        sqlContext.sparkSession,
         carbonTable.getCarbonTableIdentifier,
         carbonLoadModel)
-      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent)
+      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
       val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
       if (!done) {
         CommonUtil.updateTableStatusForFailure(carbonLoadModel)
@@ -518,7 +524,7 @@ object CarbonDataRDDFactory {
       }
       try {
         // compaction handling
-        handleSegmentMerging(sqlContext, carbonLoadModel, carbonTable)
+        handleSegmentMerging(sqlContext, carbonLoadModel, carbonTable, operationContext)
       } catch {
         case e: Exception =>
           throw new Exception(
@@ -682,7 +688,8 @@ object CarbonDataRDDFactory {
   private def handleSegmentMerging(
       sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      carbonTable: CarbonTable
+      carbonTable: CarbonTable,
+      operationContext: OperationContext
   ): Unit = {
     LOGGER.info(s"compaction need status is" +
                 s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
@@ -717,7 +724,8 @@ object CarbonDataRDDFactory {
           storeLocation,
           CompactionType.MINOR,
           carbonTable,
-          compactionModel
+          compactionModel,
+          operationContext
         )
       } else {
         val lock = CarbonLockFactory.getCarbonLockObj(
@@ -731,7 +739,8 @@ object CarbonDataRDDFactory {
               carbonLoadModel,
               storeLocation,
               compactionModel,
-              lock
+              lock,
+              operationContext
             )
           } catch {
             case e: Exception =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 3ebc957..5f5a3d1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCa
 
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent,
OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent,
AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.MergeResultImpl
@@ -147,7 +147,10 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     // trigger event for compaction
     val operationContext = new OperationContext
     val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
-      AlterTableCompactionPreEvent(carbonTable, carbonMergerMapping, mergedLoadName, sc)
+      AlterTableCompactionPreEvent(sqlContext.sparkSession,
+        carbonTable,
+        carbonMergerMapping,
+        mergedLoadName)
     OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
 
     var execInstance = "1"
@@ -195,9 +198,14 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         sc.sparkContext, Seq(mergedLoadNumber), tablePath, carbonTable, false)
 
       // trigger event for compaction
-      val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
-        AlterTableCompactionPostEvent(carbonTable, carbonMergerMapping, mergedLoadName, sc)
-      OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
+      val alterTableCompactionPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent
=
+      AlterTableCompactionPreStatusUpdateEvent(sc.sparkSession,
+        carbonTable,
+        carbonMergerMapping,
+        carbonLoadModel,
+        mergedLoadName)
+      OperationListenerBus.getInstance
+        .fireEvent(alterTableCompactionPreStatusUpdateEvent, operationContext)
 
       val endTime = System.nanoTime()
       LOGGER.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index a9b5455..e6ee535 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -271,7 +271,7 @@ object CarbonSession {
       .addListener(classOf[AlterTableAddColumnPreEvent], PreAggregateAddColumnsPreListener)
       .addListener(classOf[DropDataMapPostEvent], DropDataMapPostListener)
       .addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener)
-      .addListener(classOf[AlterTableCompactionPostEvent],
+      .addListener(classOf[AlterTableCompactionPreStatusUpdateEvent],
         AlterPreAggregateTableCompactionPostListener)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 462b055..5fdf62a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent,
OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
@@ -83,12 +84,18 @@ case class CarbonAlterTableCompactionCommand(
       CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
       System.getProperty("java.io.tmpdir"))
     storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
+    // trigger event for compaction
+    val operationContext = new OperationContext
+    val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
+      AlterTableCompactionPreEvent(sparkSession, table, null, null)
+    OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
     try {
       alterTableForCompaction(
         sparkSession.sqlContext,
         alterTableModel,
         carbonLoadModel,
-        storeLocation)
+        storeLocation,
+        operationContext)
     } catch {
       case e: Exception =>
         if (null != e.getMessage) {
@@ -99,13 +106,18 @@ case class CarbonAlterTableCompactionCommand(
             "Exception in compaction. Please check logs for more info.")
         }
     }
+    // trigger event for compaction
+    val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
+      AlterTableCompactionPostEvent(sparkSession, table, null, null)
+    OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
     Seq.empty
   }
 
   private def alterTableForCompaction(sqlContext: SQLContext,
       alterTableModel: AlterTableModel,
       carbonLoadModel: CarbonLoadModel,
-      storeLocation: String): Unit = {
+      storeLocation: String,
+      operationContext: OperationContext): Unit = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
     val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
     val compactionSize: Long = CarbonDataMergerUtil.getCompactionSize(compactionType)
@@ -167,7 +179,8 @@ case class CarbonAlterTableCompactionCommand(
         storeLocation,
         compactionType,
         carbonTable,
-        compactionModel
+        compactionModel,
+        operationContext
       )
     } else {
       // normal flow of compaction
@@ -194,7 +207,8 @@ case class CarbonAlterTableCompactionCommand(
             carbonLoadModel,
             storeLocation,
             compactionModel,
-            lock
+            lock,
+            operationContext
           )
         } catch {
           case e: Exception =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index f642785..ebdaa33 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -39,8 +39,7 @@ import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.events.{LoadTablePostExecutionEvent, OperationContext}
-import org.apache.carbondata.events.{LoadTablePreExecutionEvent, OperationListenerBus}
+import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent,
OperationContext, OperationListenerBus}
 import org.apache.carbondata.format
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -190,14 +189,16 @@ case class CarbonLoadDataCommand(
             carbonLoadModel,
             columnar,
             partitionStatus,
-            hadoopConf)
+            hadoopConf,
+            operationContext)
         } else {
           loadData(
             sparkSession,
             carbonLoadModel,
             columnar,
             partitionStatus,
-            hadoopConf)
+            hadoopConf,
+            operationContext)
         }
         val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
           new LoadTablePostExecutionEvent(sparkSession,
@@ -253,7 +254,8 @@ case class CarbonLoadDataCommand(
       carbonLoadModel: CarbonLoadModel,
       columnar: Boolean,
       partitionStatus: SegmentStatus,
-      hadoopConf: Configuration): Unit = {
+      hadoopConf: Configuration,
+      operationContext: OperationContext): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       .getCarbonTableIdentifier
@@ -314,14 +316,14 @@ case class CarbonLoadDataCommand(
     }
     CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
       carbonLoadModel,
-      carbonLoadModel.getTablePath,
       columnar,
       partitionStatus,
       server,
       isOverwriteTable,
       hadoopConf,
       dataFrame,
-      updateModel)
+      updateModel,
+      operationContext)
   }
 
   private def loadData(
@@ -329,7 +331,8 @@ case class CarbonLoadDataCommand(
       carbonLoadModel: CarbonLoadModel,
       columnar: Boolean,
       partitionStatus: SegmentStatus,
-      hadoopConf: Configuration): Unit = {
+      hadoopConf: Configuration,
+      operationContext: OperationContext): Unit = {
     val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
       val fields = dataFrame.get.schema.fields
       import org.apache.spark.sql.functions.udf
@@ -365,14 +368,14 @@ case class CarbonLoadDataCommand(
     }
     CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
       carbonLoadModel,
-      carbonLoadModel.getTablePath,
       columnar,
       partitionStatus,
       None,
       isOverwriteTable,
       hadoopConf,
       loadDataFrame,
-      updateModel)
+      updateModel,
+      operationContext)
   }
 
   private def updateTableMetadata(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 9168247..4315e05 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -70,10 +70,10 @@ object AlterPreAggregateTableCompactionPostListener extends OperationEventListen
    * @param operationContext
    */
   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
-    val compactionEvent = event.asInstanceOf[AlterTableCompactionPostEvent]
+    val compactionEvent = event.asInstanceOf[AlterTableCompactionPreStatusUpdateEvent]
     val carbonTable = compactionEvent.carbonTable
     val compactionType = compactionEvent.carbonMergerMapping.campactionType
-    val sparkSession = compactionEvent.sQLContext.sparkSession
+    val sparkSession = compactionEvent.sparkSession
     if (carbonTable.hasDataMapSchema) {
       carbonTable.getTableInfo.getDataMapSchemaList.asScala.foreach { dataMapSchema =>
         val childRelationIdentifier = dataMapSchema.getRelationIdentifier

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 6bf55db..1766064 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql.execution.command.schema
 
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
+import org.apache.spark.sql.{CarbonEnv, CarbonSession, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand}
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, HiveExternalCatalog}
@@ -170,12 +172,14 @@ private[sql] case class CarbonAlterTableRenameCommand(
       AlterTableUtil.releaseLocks(locks)
       // case specific to rename table as after table rename old table path will not be found
       if (carbonTable != null) {
+        val newTablePath = CarbonUtil
+          .getNewTablePath(new Path(carbonTable.getTablePath), newTableName)
         AlterTableUtil
           .releaseLocksManually(locks,
             locksToBeAcquired,
             oldDatabaseName,
             newTableName,
-            carbonTable.getTablePath)
+            newTablePath)
       }
     }
     Seq.empty


Mime
View raw message