carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [41/50] [abbrv] carbondata git commit: [CARBONDATA-1977][PARTITION] Fix aggregation table loading after loading of partition table.
Date Tue, 09 Jan 2018 04:02:09 GMT
[CARBONDATA-1977][PARTITION] Fix aggregation table loading after loading of partition table.

Aggregate tables were not being loaded for partition tables, because the load events
were not fired during a partition table load.
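
A minimal sketch of the flow this commit introduces, using names from the diff below
(operationContext, conf, carbonTable, and loadModel are stand-ins for values assumed
to be in scope on each side):

    // Driver side: serialize the OperationContext into the job configuration
    // so that the output committer can re-create it at commit time.
    val operationContextStr = ObjectSerializationUtil.convertObjectToString(operationContext)
    conf.set(CarbonTableOutputFormat.OPERATION_CONTEXT, operationContextStr)

    // Committer side (commitJob): deserialize the context and fire the event,
    // so listeners such as the pre-aggregate loader also run for partition tables.
    val restoredContext = ObjectSerializationUtil
      .convertStringToObject(operationContextStr).asInstanceOf[OperationContext]
    OperationListenerBus.getInstance().fireEvent(
      new LoadEvents.LoadTablePreStatusUpdateEvent(
        carbonTable.getCarbonTableIdentifier, loadModel),
      restoredContext)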

This closes #1757


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/829e7aa4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/829e7aa4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/829e7aa4

Branch: refs/heads/branch-1.3
Commit: 829e7aa4e28e065d5162bd0ec48b4edfa4f6dc3d
Parents: cdf2c02
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Wed Jan 3 17:02:33 2018 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Sat Jan 6 02:19:09 2018 +0800

----------------------------------------------------------------------
 .../carbondata/events/OperationContext.java     |   5 +-
 .../hadoop/api/CarbonOutputCommitter.java       |  20 +++
 .../hadoop/api/CarbonTableOutputFormat.java     |   5 +
 .../preaggregate/TestPreAggregateLoad.scala     |  22 +++
 .../apache/carbondata/events/LoadEvents.scala   |  60 --------
 .../spark/rdd/CarbonDataRDDFactory.scala        |   7 +-
 .../org/apache/spark/sql/CarbonSession.scala    |   1 +
 .../management/CarbonLoadDataCommand.scala      |  21 ++-
 .../preaaggregate/PreAggregateListeners.scala   |   8 +-
 .../datasources/CarbonFileFormat.scala          |   4 +
 .../spark/sql/optimizer/CarbonFilters.scala     |  43 +++++-
 .../processing/loading/events/LoadEvents.java   | 152 +++++++++++++++++++
 12 files changed, 272 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/core/src/main/java/org/apache/carbondata/events/OperationContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/events/OperationContext.java b/core/src/main/java/org/apache/carbondata/events/OperationContext.java
index f6fe676..0fcd438 100644
--- a/core/src/main/java/org/apache/carbondata/events/OperationContext.java
+++ b/core/src/main/java/org/apache/carbondata/events/OperationContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.events;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -23,7 +24,9 @@ import java.util.Map;
  * One OperationContext per one operation.
  * OperationContext active till operation execution completes
  */
-public class OperationContext {
+public class OperationContext implements Serializable {
+
+  private static final long serialVersionUID = -8808813829717624986L;
 
   private Map<String, Object> operationProperties = new HashMap<String, Object>();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index d0a5fd9..525249a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -32,6 +32,10 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
+import org.apache.carbondata.events.OperationContext;
+import org.apache.carbondata.events.OperationListenerBus;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.processing.loading.events.LoadEvents;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.util.CarbonLoaderUtil;
 
@@ -94,6 +98,22 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     long segmentSize = CarbonLoaderUtil
         .addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), carbonTable);
     if (segmentSize > 0) {
+      String operationContextStr =
+          context.getConfiguration().get(
+              CarbonTableOutputFormat.OPERATION_CONTEXT,
+              null);
+      if (operationContextStr != null) {
+        OperationContext operationContext =
+            (OperationContext) ObjectSerializationUtil.convertStringToObject(operationContextStr);
+        LoadEvents.LoadTablePreStatusUpdateEvent event =
+            new LoadEvents.LoadTablePreStatusUpdateEvent(carbonTable.getCarbonTableIdentifier(),
+                loadModel);
+        try {
+          OperationListenerBus.getInstance().fireEvent(event, operationContext);
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+      }
       CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet);
       mergeCarbonIndexFiles(segmentPath);
       String updateTime =
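
On the consumer side, a listener receives this event through the usual
OperationEventListener callback. A sketch of a hypothetical consumer (the body is
illustrative only; the onEvent signature matches the Scala listeners later in this diff):

    object ExampleLoadListener extends OperationEventListener {
      override def onEvent(event: Event, operationContext: OperationContext): Unit = {
        val loadEvent = event.asInstanceOf[LoadEvents.LoadTablePreStatusUpdateEvent]
        // e.g. trigger the child (pre-aggregate) table load for this segment
        val segmentId = loadEvent.getCarbonLoadModel.getSegmentId
      }
    }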

http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 897c929..2c72b39 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -103,6 +103,11 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Stri
   public static final String SEGMENTS_TO_BE_DELETED =
       "mapreduce.carbontable.segments.to.be.removed";
 
+  /**
+   * It is used only to fire events in case of any child tables to be loaded.
+   */
+  public static final String OPERATION_CONTEXT = "mapreduce.carbontable.operation.context";
+
   private static final Log LOG = LogFactory.getLog(CarbonTableOutputFormat.class);
 
   private CarbonOutputCommitter committer;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
index ff1c330..d794f32 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
@@ -216,4 +216,26 @@ class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll {
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, originalBadRecordsAction)
   }
 
+  test("test partition load into main table with pre-aggregate table") {
+    sql("DROP TABLE IF EXISTS maintable")
+    sql(
+      """
+        | CREATE TABLE maintable(id int, city string, age int) partitioned by(name string)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    createAllAggregateTables("maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    checkAnswer(sql(s"select * from maintable_preagg_sum"),
+      Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    checkAnswer(sql(s"select * from maintable_preagg_avg"),
+      Seq(Row(1, 31, 1), Row(2, 27, 1), Row(3, 70, 2), Row(4, 55, 2)))
+    checkAnswer(sql(s"select * from maintable_preagg_count"),
+      Seq(Row(1, 1), Row(2, 1), Row(3, 2), Row(4, 2)))
+    checkAnswer(sql(s"select * from maintable_preagg_min"),
+      Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 26)))
+    checkAnswer(sql(s"select * from maintable_preagg_max"),
+      Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 29)))
+    sql("drop table if exists maintable")
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
deleted file mode 100644
index 022ad72..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.events
-
-import org.apache.spark.sql.SparkSession
-
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-
-/**
- * Class for handling operations before start of a load process.
- * Example usage: For validation purpose
- */
-case class LoadTablePreExecutionEvent(sparkSession: SparkSession,
-    carbonTableIdentifier: CarbonTableIdentifier,
-    carbonLoadModel: CarbonLoadModel,
-    factPath: String,
-    isDataFrameDefined: Boolean,
-    optionsFinal: scala.collection.mutable.Map[String, String],
-    // userProvidedOptions are needed if we need only the load options given by user
-    userProvidedOptions: Map[String, String],
-    isOverWriteTable: Boolean) extends Event with LoadEventInfo
-
-/**
- * Class for handling operations after data load completion and before final
- * commit of load operation. Example usage: For loading pre-aggregate tables
- */
-case class LoadTablePostExecutionEvent(sparkSession: SparkSession,
-    carbonTableIdentifier: CarbonTableIdentifier,
-    carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo
-
-/**
- * Event for handling operations after data load completion and before final
- * commit of load operation. Example usage: For loading pre-aggregate tables
- */
-case class LoadTablePreStatusUpdateEvent(sparkSession: SparkSession,
-    carbonTableIdentifier: CarbonTableIdentifier,
-    carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo
-
-/**
- * Class for handling clean up in case of any failure and abort the operation.
- */
-case class LoadTableAbortExecutionEvent(sparkSession: SparkSession,
-    carbonTableIdentifier: CarbonTableIdentifier,
-    carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index c6a1178..7982071 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -58,10 +58,11 @@ import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreStatusUpdateEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable}
+import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
 import org.apache.carbondata.processing.loading.exception.{CarbonDataLoadingException, NoRetryException}
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions
@@ -527,11 +528,9 @@ object CarbonDataRDDFactory {
 
       writeDictionary(carbonLoadModel, result, writeAll = false)
       val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
-        LoadTablePreStatusUpdateEvent(
-        sqlContext.sparkSession,
+        new LoadTablePreStatusUpdateEvent(
         carbonTable.getCarbonTableIdentifier,
         carbonLoadModel)
-      operationContext.setProperty("isOverwrite", overwriteTable)
       OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
       val done =
         updateTableStatus(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index b4e11c1..7ee3434 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo}
 import org.apache.carbondata.events._
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 01bb5b3..0c6879c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -62,12 +62,14 @@ import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
 import org.apache.carbondata.format
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.exception.{BadRecordFoundException, NoRetryException}
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -167,14 +169,15 @@ case class CarbonLoadDataCommand(
       try {
         val operationContext = new OperationContext
         val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
-          LoadTablePreExecutionEvent(sparkSession,
+          new LoadTablePreExecutionEvent(
             table.getCarbonTableIdentifier,
             carbonLoadModel,
             factPath,
             dataFrame.isDefined,
-            optionsFinal,
-            options,
+            optionsFinal.asJava,
+            options.asJava,
             isOverwriteTable)
+        operationContext.setProperty("isOverwrite", isOverwriteTable)
         OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
@@ -232,7 +235,7 @@ case class CarbonLoadDataCommand(
             operationContext)
         }
         val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
-          new LoadTablePostExecutionEvent(sparkSession,
+          new LoadTablePostExecutionEvent(
             table.getCarbonTableIdentifier,
             carbonLoadModel)
         OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
@@ -614,7 +617,8 @@ case class CarbonLoadDataCommand(
         sizeInBytes,
         isOverwriteTable,
         carbonLoadModel,
-        sparkSession)
+        sparkSession,
+        operationContext)
       val convertedPlan =
         CarbonReflectionUtils.getInsertIntoCommand(
           table = convertRelation,
@@ -679,7 +683,8 @@ case class CarbonLoadDataCommand(
       sizeInBytes: Long,
       overWrite: Boolean,
       loadModel: CarbonLoadModel,
-      sparkSession: SparkSession): LogicalRelation = {
+      sparkSession: SparkSession,
+      operationContext: OperationContext): LogicalRelation = {
     val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
     val metastoreSchema = StructType(catalogTable.schema.fields.map(_.copy(dataType = StringType)))
     val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
@@ -701,6 +706,7 @@ case class CarbonLoadDataCommand(
     val dataSchema =
       StructType(metastoreSchema
         .filterNot(field => partitionSchema.contains(field.name)))
+    val operationContextStr = ObjectSerializationUtil.convertObjectToString(operationContext)
     val options = new mutable.HashMap[String, String]()
     options ++= catalogTable.storage.properties
     options += (("overwrite", overWriteLocal.toString))
@@ -708,6 +714,7 @@ case class CarbonLoadDataCommand(
     options += (("dicthost", loadModel.getDictionaryServerHost))
     options += (("dictport", loadModel.getDictionaryServerPort.toString))
     options += (("staticpartition", partition.nonEmpty.toString))
+    options += (("operationcontext", operationContextStr))
     options ++= this.options
     if (updateModel.isDefined) {
       options += (("updatetimestamp", updateModel.get.updatedTimeStamp.toString))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 5e232f6..ea5cfed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.AlterTableModel
 import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
@@ -28,6 +29,7 @@ import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompact
 import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 
 object LoadPostAggregateListener extends OperationEventListener {
   /**
@@ -37,8 +39,8 @@ object LoadPostAggregateListener extends OperationEventListener {
    */
   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
     val loadEvent = event.asInstanceOf[LoadTablePreStatusUpdateEvent]
-    val sparkSession = loadEvent.sparkSession
-    val carbonLoadModel = loadEvent.carbonLoadModel
+    val sparkSession = SparkSession.getActiveSession.get
+    val carbonLoadModel = loadEvent.getCarbonLoadModel
     val table = CarbonEnv.getCarbonTable(Option(carbonLoadModel.getDatabaseName),
       carbonLoadModel.getTableName)(sparkSession)
     if (CarbonUtil.hasAggregationDataMap(table)) {
@@ -126,7 +128,7 @@ object LoadPreAggregateTablePreListener extends OperationEventListener {
    */
   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
     val loadEvent = event.asInstanceOf[LoadTablePreExecutionEvent]
-    val carbonLoadModel = loadEvent.carbonLoadModel
+    val carbonLoadModel = loadEvent.getCarbonLoadModel
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val isInternalLoadCall = carbonLoadModel.isAggLoadRequest
     if (table.isChildDataMap && !isInternalLoadCall) {
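
Since the events no longer carry a SparkSession, listeners now resolve it from Spark's
thread-local active session; this assumes the event is fired on a thread where a session
is active:

    // As used above; getActiveSession returns Option[SparkSession], and .get
    // relies on a session being active on the firing thread.
    val sparkSession = SparkSession.getActiveSession.get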

http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 15deae1..4b368de 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -124,6 +124,10 @@ with Serializable {
     if (segemntsTobeDeleted.isDefined) {
       conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segemntsTobeDeleted.get)
     }
+    val operationContextStr = options.get("operationcontext")
+    if (operationContextStr.isDefined) {
+      conf.set(CarbonTableOutputFormat.OPERATION_CONTEXT, operationContextStr.get)
+    }
     CarbonTableOutputFormat.setLoadModel(conf, model)
 
     new OutputWriterFactory {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 09546cd..5027a66 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.optimizer
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.CastExpressionOptimization
@@ -26,12 +28,14 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.CarbonSessionCatalog
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.PartitionMapFileStore
 import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
 import org.apache.carbondata.core.scan.expression.conditional._
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
@@ -418,6 +422,11 @@ object CarbonFilters {
   def getPartitions(partitionFilters: Seq[Expression],
       sparkSession: SparkSession,
       identifier: TableIdentifier): Seq[String] = {
+    // first try to read partitions in case if the trigger comes from the aggregation table load.
+    val partitionsForAggTable = getPartitionsForAggTable(sparkSession, identifier)
+    if (partitionsForAggTable.isDefined) {
+      return partitionsForAggTable.get
+    }
     val partitions = {
       try {
         if (CarbonProperties.getInstance().
@@ -448,5 +457,37 @@ object CarbonFilters {
     }.toSet.toSeq
   }
 
+  /**
+   * In case of loading aggregate tables it needs to be get only from the main table load in
+   * progress segment. So we should read from the partition map file of that segment.
+   */
+  def getPartitionsForAggTable(sparkSession: SparkSession,
+      identifier: TableIdentifier): Option[Seq[String]] = {
+    // when validate segments is disabled then only read from partitionmap
+    val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+    if (carbonSessionInfo != null) {
+      val validateSegments = carbonSessionInfo.getSessionParams
+        .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+                     CarbonEnv.getDatabaseName(identifier.database)(sparkSession) + "." +
+                     identifier.table, "true").toBoolean
+      if (!validateSegments) {
+        val segmentNumbersFromProperty = CarbonProperties.getInstance
+          .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+                       CarbonEnv.getDatabaseName(identifier.database)(sparkSession)
+                       + "." + identifier.table)
+        val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession)
+        val segmentPath =
+          CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNumbersFromProperty)
+        val partitionMapper = new PartitionMapFileStore()
+        partitionMapper.readAllPartitionsOfSegment(segmentPath)
+        Some(partitionMapper.getPartitionMap.asScala.map(_._2).flatMap(_.asScala).toSet.toSeq)
+      } else {
+        None
+      }
+    } else {
+      None
+    }
+  }
+
 
 }
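
The aggregate-table load is expected to disable segment validation and pin the
in-progress segment before querying the main table, so the lookup above reads the
partition map of just that segment. A hedged sketch of the properties involved: the
"default.maintable" suffix and segment id "0" are placeholders, the dbName.tableName key
convention follows the code above, and it is assumed SessionParams accepts these dynamic
keys the same way a SET command would:

    // Assumed setup performed by the child-table load before querying the main table.
    val sessionParams = ThreadLocalSessionInfo.getCarbonSessionInfo.getSessionParams
    sessionParams.addProperty(
      CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + "default.maintable", "false")
    CarbonProperties.getInstance.addProperty(
      CarbonCommonConstants.CARBON_INPUT_SEGMENTS + "default.maintable", "0")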

http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
new file mode 100644
index 0000000..b00a67e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.events;
+
+import java.util.Map;
+
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.events.Event;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+public class LoadEvents {
+  /**
+   * Class for handling operations before start of a load process.
+   * Example usage: For validation purpose
+   */
+  public static class LoadTablePreExecutionEvent extends Event {
+    private CarbonTableIdentifier carbonTableIdentifier;
+    private CarbonLoadModel carbonLoadModel;
+    private String factPath;
+    private boolean isDataFrameDefined;
+    private Map<String, String> optionsFinal;
+    // userProvidedOptions are needed if we need only the load options given by user
+    private Map<String, String> userProvidedOptions;
+    private boolean isOverWriteTable;
+
+    public LoadTablePreExecutionEvent(CarbonTableIdentifier carbonTableIdentifier,
+        CarbonLoadModel carbonLoadModel, String factPath, boolean isDataFrameDefined,
+        Map<String, String> optionsFinal, Map<String, String> userProvidedOptions,
+        boolean isOverWriteTable) {
+      this.carbonTableIdentifier = carbonTableIdentifier;
+      this.carbonLoadModel = carbonLoadModel;
+      this.factPath = factPath;
+      this.isDataFrameDefined = isDataFrameDefined;
+      this.optionsFinal = optionsFinal;
+      this.userProvidedOptions = userProvidedOptions;
+      this.isOverWriteTable = isOverWriteTable;
+    }
+
+    public CarbonTableIdentifier getCarbonTableIdentifier() {
+      return carbonTableIdentifier;
+    }
+
+    public CarbonLoadModel getCarbonLoadModel() {
+      return carbonLoadModel;
+    }
+
+    public String getFactPath() {
+      return factPath;
+    }
+
+    public boolean isDataFrameDefined() {
+      return isDataFrameDefined;
+    }
+
+    public Map<String, String> getOptionsFinal() {
+      return optionsFinal;
+    }
+
+    public Map<String, String> getUserProvidedOptions() {
+      return userProvidedOptions;
+    }
+
+    public boolean isOverWriteTable() {
+      return isOverWriteTable;
+    }
+  }
+
+  /**
+   * Class for handling operations after data load completion and before final
+   * commit of load operation. Example usage: For loading pre-aggregate tables
+   */
+
+  public static class LoadTablePostExecutionEvent extends Event {
+    private CarbonTableIdentifier carbonTableIdentifier;
+    private CarbonLoadModel carbonLoadModel;
+
+    public LoadTablePostExecutionEvent(CarbonTableIdentifier carbonTableIdentifier,
+        CarbonLoadModel carbonLoadModel) {
+      this.carbonTableIdentifier = carbonTableIdentifier;
+      this.carbonLoadModel = carbonLoadModel;
+    }
+
+    public CarbonTableIdentifier getCarbonTableIdentifier() {
+      return carbonTableIdentifier;
+    }
+
+    public CarbonLoadModel getCarbonLoadModel() {
+      return carbonLoadModel;
+    }
+  }
+
+  /**
+   * Event for handling operations after data load completion and before final
+   * commit of load operation. Example usage: For loading pre-aggregate tables
+   */
+
+  public static class LoadTablePreStatusUpdateEvent extends Event {
+    private CarbonLoadModel carbonLoadModel;
+    private CarbonTableIdentifier carbonTableIdentifier;
+
+    public LoadTablePreStatusUpdateEvent(CarbonTableIdentifier carbonTableIdentifier,
+        CarbonLoadModel carbonLoadModel) {
+      this.carbonTableIdentifier = carbonTableIdentifier;
+      this.carbonLoadModel = carbonLoadModel;
+    }
+
+    public CarbonLoadModel getCarbonLoadModel() {
+      return carbonLoadModel;
+    }
+
+    public CarbonTableIdentifier getCarbonTableIdentifier() {
+      return carbonTableIdentifier;
+    }
+  }
+
+  /**
+   * Class for handling clean up in case of any failure and abort the operation.
+   */
+
+  public static class LoadTableAbortExecutionEvent extends Event {
+    private CarbonTableIdentifier carbonTableIdentifier;
+    private CarbonLoadModel carbonLoadModel;
+    public LoadTableAbortExecutionEvent(CarbonTableIdentifier carbonTableIdentifier,
+        CarbonLoadModel carbonLoadModel) {
+      this.carbonTableIdentifier = carbonTableIdentifier;
+      this.carbonLoadModel = carbonLoadModel;
+    }
+
+    public CarbonTableIdentifier getCarbonTableIdentifier() {
+      return carbonTableIdentifier;
+    }
+
+    public CarbonLoadModel getCarbonLoadModel() {
+      return carbonLoadModel;
+    }
+  }
+}

