carbondata-commits mailing list archives

From ravipes...@apache.org
Subject carbondata git commit: [CARBONDATA-1738] [PreAgg] Block direct insert/load on pre-aggregate table
Date Mon, 04 Dec 2017 10:50:49 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 2fe7758be -> 5ae596b76


[CARBONDATA-1738] [PreAgg] Block direct insert/load on pre-aggregate table

This closes #1508


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5ae596b7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5ae596b7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5ae596b7

Branch: refs/heads/master
Commit: 5ae596b76f1c15bd78f992bec1c51ae76223f635
Parents: 2fe7758
Author: kunal642 <kunalkapoor642@gmail.com>
Authored: Thu Nov 16 17:58:50 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Mon Dec 4 16:20:23 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   6 +
 .../preaggregate/TestPreAggregateLoad.scala     |   7 +-
 .../TestPreAggregateTableSelection.scala        |   1 -
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   6 +
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   4 +
 .../org/apache/spark/sql/CarbonSession.scala    |  11 ++
 .../management/CarbonLoadDataCommand.scala      |  46 +++---
 .../CreatePreAggregateTableCommand.scala        |  26 +++-
 .../preaaggregate/PreAggregateListeners.scala   |  65 ++++++--
 .../preaaggregate/PreAggregateUtil.scala        |  12 +-
 .../sql/hive/CarbonPreAggregateRules.scala      | 155 ++++++++++---------
 .../execution/command/CarbonHiveCommands.scala  |   4 +
 .../sql/parser/CarbonSpark2SqlParser.scala      |  12 +-
 .../src/main/spark2.1/CarbonSessionState.scala  |   1 +
 .../src/main/spark2.2/CarbonSessionState.scala  |   1 +
 15 files changed, 239 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index a264583..43985b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -67,6 +67,12 @@ public final class CarbonCommonConstants {
   public static final String VALIDATE_CARBON_INPUT_SEGMENTS = "validate.carbon.input.segments.";
 
   /**
+   * Whether load/insert command is fired internally or by the user.
+   * Used to block load/insert on pre-aggregate if fired by user
+   */
+  public static final String IS_INTERNAL_LOAD_CALL = "is.internal.load.call";
+
+  /**
    * location of the carbon member, hierarchy and fact files
    */
   @CarbonProperty

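For illustration only: a minimal, self-contained Scala sketch (not part of the patch; the map and helper names are assumptions) of how this flag is meant to behave — an internal pre-aggregate load sets it explicitly, while a user-issued LOAD/INSERT never does, so it defaults to "false".

    // Standalone sketch of the flag's default-vs-explicit behaviour.
    val internalOptions = Map("is.internal.load.call" -> "true") // set only by internal pre-agg loads
    val userOptions     = Map.empty[String, String]              // a user LOAD/INSERT sets nothing
    def isInternal(opts: Map[String, String]): Boolean =
      opts.getOrElse("is.internal.load.call", "false").toBoolean
    assert(isInternal(internalOptions) && !isInternal(userOptions))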
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
index 5ac3534..1502c53 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
@@ -162,7 +162,12 @@ class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll {
         Row(2, 27),
         Row(3, 35),
         Row(4, 29)))
-    sql("drop table if exists maintable")
+  }
+
+  test("test to check if exception is thrown for direct load on pre-aggregate table") {
+    assert(intercept[RuntimeException] {
+      sql(s"insert into maintable_preagg_sum values(1, 30)")
+    }.getMessage.equalsIgnoreCase("Cannot insert/load data directly into pre-aggregate table"))
   }
 
 }

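A usage sketch in the style of the test above, showing the user-visible effect of this change. The datamap definition and the data path are illustrative assumptions, not taken from this patch; sql() and intercept come from the QueryTest scaffolding used in the suite.

    // create a pre-aggregate datamap on maintable, then try to load it directly
    sql("create datamap preagg_sum on table maintable using 'preaggregate' as " +
        "select id, sum(age) from maintable group by id")
    // rejected after this change:
    intercept[RuntimeException] {
      sql("insert into maintable_preagg_sum values(1, 30)")
    }
    // still supported: load the parent table and let the load listeners
    // populate maintable_preagg_sum internally
    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")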
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index 1480ae3..c29beec 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.carbondata.integration.spark.testsuite.preaggregate
 
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 4ad939c..094a629 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -1006,6 +1006,12 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
         "select preAGG() as preAgg, " + query
     }
 
+  lazy val addPreAggLoad: Parser[String] =
+    SELECT ~> restInput <~ opt(";") ^^ {
+      case query =>
+        "select preAggLoad() as preAggLoad, " + query
+    }
+
   protected lazy val primitiveFieldType: Parser[Field] =
     primitiveTypes ^^ {
       case e1 =>

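A rough illustration of the rewrite addPreAggLoad performs (the query text below is an assumption): the parser consumes the leading SELECT keyword and prefixes the remainder with the preAggLoad marker UDF.

    val userQuery = "select id, sum(age) from maintable group by id"
    val rewritten = "select preAggLoad() as preAggLoad, " + userQuery.stripPrefix("select ")
    // rewritten == "select preAggLoad() as preAggLoad, id, sum(age) from maintable group by id"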
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 811442b..53b20c2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -57,6 +57,10 @@ class CarbonEnv {
     // added for handling preaggregate table creation. when user will fire create ddl for
     // create table we are adding a udf so no need to apply PreAggregate rules.
     sparkSession.udf.register("preAgg", () => "")
+    // added to apply proper rules for loading data into pre-agg table. If this UDF is present
+    // only then the CarbonPreAggregateDataLoadingRules would be applied to split the average
+    // column to sum and count.
+    sparkSession.udf.register("preAggLoad", () => "")
     synchronized {
       if (!initialized) {
         // update carbon session parameters , preserve thread parameters

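A minimal Spark-only sketch of the marker-UDF idea (not Carbon code; assumes a local Spark session on the classpath): register a no-op UDF, then detect it later in the analyzed plan by its alias name, which is how the loading rule decides whether to fire.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[1]").appName("preAggLoadMarker").getOrCreate()
    spark.udf.register("preAggLoad", () => "")
    val analyzed = spark.sql("select preAggLoad() as preAggLoad, 1 as id").queryExecution.analyzed
    // CarbonPreAggregateDataLoadingRules (added later in this commit) only rewrites plans
    // in which it finds an Alias named "preAggLoad", such as the one above.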
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 89cbbe4..0cb6ca6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -232,6 +232,16 @@ object CarbonSession {
     ThreadLocalSessionInfo.setCarbonSessionInfo(currentThreadSessionInfo)
   }
 
+  def threadUnset(key: String): Unit = {
+    val currentThreadSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+    if (currentThreadSessionInfo != null) {
+      val currentThreadSessionInfoClone = currentThreadSessionInfo.clone()
+      val threadParams = currentThreadSessionInfoClone.getThreadParams
+      CarbonSetCommand.unsetValue(threadParams, key)
+      ThreadLocalSessionInfo.setCarbonSessionInfo(currentThreadSessionInfoClone)
+    }
+  }
+
  private[spark] def updateSessionInfoToCurrentThread(sparkSession: SparkSession): Unit = {
     val carbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.clone()
     val currentThreadSessionInfoOrig = ThreadLocalSessionInfo.getCarbonSessionInfo
@@ -260,5 +270,6 @@ object CarbonSession {
       .addListener(classOf[AlterTableDataTypeChangePreEvent], PreAggregateDataTypeChangePreListener)
       .addListener(classOf[AlterTableAddColumnPreEvent], PreAggregateAddColumnsPreListener)
       .addListener(classOf[DropDataMapPostEvent], DropDataMapPostListener)
+      .addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener)
   }
 }

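A sketch of how threadSet and the new threadUnset are meant to pair around an internal child-table load, mirroring the listener changes later in this commit. It uses the Carbon classes from this diff, so it is not standalone; the database, table, and segment values are assumptions.

    val key = CarbonCommonConstants.CARBON_INPUT_SEGMENTS + "default.maintable"
    CarbonSession.threadSet(key, "0")  // restrict the child load to the newly created parent segment
    try {
      // fire the internal CarbonLoadDataCommand for the pre-aggregate table here
    } finally {
      CarbonSession.threadUnset(key)   // clear the thread-local override once the load finishes
    }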
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index ff13299..47467df 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -39,7 +39,8 @@ import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{LoadTablePostExecutionEvent, OperationContext}
+import org.apache.carbondata.events.{LoadTablePreExecutionEvent, OperationListenerBus}
 import org.apache.carbondata.format
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -58,8 +59,8 @@ case class CarbonLoadDataCommand(
     var inputSqlString: String = null,
     dataFrame: Option[DataFrame] = None,
     updateModel: Option[UpdateTableModel] = None,
-    var tableInfoOp: Option[TableInfo] = None)
-  extends DataCommand {
+    var tableInfoOp: Option[TableInfo] = None,
+    internalOptions: Map[String, String] = Map.empty) extends DataCommand {
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -127,6 +128,8 @@ case class CarbonLoadDataCommand(
           CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser), hadoopConf)
       }
       carbonLoadModel.setFactFilePath(factPath)
+      carbonLoadModel.setAggLoadRequest(internalOptions
+          .getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
       DataLoadingUtil.buildCarbonLoadModel(
         table,
         carbonProperty,
@@ -135,17 +138,18 @@ case class CarbonLoadDataCommand(
         carbonLoadModel,
         hadoopConf
       )
-      val operationContext = new OperationContext
-      val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
-        new LoadTablePreExecutionEvent(sparkSession,
-          null,
-          carbonLoadModel,
-          factPath,
-          dataFrame.isDefined,
-          optionsFinal)
-      OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
 
-      try{
+
+      try {
+        val operationContext = new OperationContext
+        val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
+          LoadTablePreExecutionEvent(sparkSession,
+            table.getCarbonTableIdentifier,
+            carbonLoadModel,
+            factPath,
+            dataFrame.isDefined,
+            optionsFinal)
+        OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
         GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
@@ -345,14 +349,14 @@ case class CarbonLoadDataCommand(
     } else {
       (dataFrame, dataFrame)
     }
-
-    GlobalDictionaryUtil.generateGlobalDictionary(
-      sparkSession.sqlContext,
-      carbonLoadModel,
-      hadoopConf,
-      dictionaryDataFrame)
-    CarbonDataRDDFactory.loadCarbonData(
-      sparkSession.sqlContext,
+    if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) {
+      GlobalDictionaryUtil.generateGlobalDictionary(
+        sparkSession.sqlContext,
+        carbonLoadModel,
+        hadoopConf,
+        dictionaryDataFrame)
+    }
+    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
       carbonLoadModel,
       carbonLoadModel.getTablePath,
       columnar,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index a17e745..6cee0e8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.execution.command.preaaggregate
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
@@ -114,17 +116,27 @@ case class CreatePreAggregateTableCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // load child table if parent table has existing segments
     val dbName = CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession)
-    val tableName = tableIdentifier.table
-    val metastorePath = CarbonTablePath.getMetadataPath(
-      CarbonEnv.getTablePath(
-        parentTableIdentifier.database,
-        parentTableIdentifier.table)(sparkSession))
+    val parentCarbonTable = CarbonEnv.getCarbonTable(Some(dbName),
+      parentTableIdentifier.table)(sparkSession)
    // This will be used to check if the parent table has any segments or not. If not then no
    // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
     // table.
-    val loadAvailable = SegmentStatusManager.readLoadMetadata(metastorePath).nonEmpty
+    val loadAvailable = SegmentStatusManager.readLoadMetadata(parentCarbonTable.getMetaDataFilepath)
+      .nonEmpty
     if (loadAvailable) {
-      sparkSession.sql(s"insert into $dbName.$tableName $queryString")
+      val headers = parentCarbonTable.getTableInfo.getFactTable.getListOfColumns.
+        asScala.map(_.getColumnName).mkString(",")
+      val childDataFrame = sparkSession.sql(
+        new CarbonSpark2SqlParser().addPreAggLoadFunction(queryString))
+      CarbonLoadDataCommand(tableIdentifier.database,
+        tableIdentifier.table,
+        null,
+        Nil,
+        Map("fileheader" -> headers),
+        isOverwriteTable = false,
+        dataFrame = Some(childDataFrame),
+        internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")).
+        run(sparkSession)
     }
     Seq.empty
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 7bc120b..d314488 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.execution.command.preaaggregate
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.CarbonSession
-import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
@@ -39,23 +40,65 @@ object LoadPostAggregateListener extends OperationEventListener {
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     if (table.hasDataMapSchema) {
      for (dataMapSchema: DataMapSchema <- table.getTableInfo.getDataMapSchemaList.asScala) {
-        CarbonSession
-          .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
-                     carbonLoadModel.getDatabaseName + "." +
-                     carbonLoadModel.getTableName,
-            carbonLoadModel.getSegmentId)
-        CarbonSession.threadSet(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
-                                carbonLoadModel.getDatabaseName + "." +
-                                carbonLoadModel.getTableName, "false")
+        CarbonSession.threadSet(
+          CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+          carbonLoadModel.getDatabaseName + "." +
+          carbonLoadModel.getTableName,
+          carbonLoadModel.getSegmentId)
+        CarbonSession.threadSet(
+          CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+          carbonLoadModel.getDatabaseName + "." +
+          carbonLoadModel.getTableName, "false")
         val childTableName = dataMapSchema.getRelationIdentifier.getTableName
         val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
-        val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT QUERY")
-        sparkSession.sql(s"insert into $childDatabaseName.$childTableName $selectQuery")
+        val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
+          s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } ")).drop("preAggLoad")
+        val headers = dataMapSchema.getChildSchema.getListOfColumns.
+          asScala.map(_.getColumnName).mkString(",")
+        try {
+          CarbonLoadDataCommand(Some(childDatabaseName),
+            childTableName,
+            null,
+            Nil,
+            Map("fileheader" -> headers),
+            isOverwriteTable = false,
+            dataFrame = Some(childDataFrame),
+            internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")).
+            run(sparkSession)
+        } finally {
+          CarbonSession.threadUnset(
+            CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+            carbonLoadModel.getDatabaseName + "." +
+            carbonLoadModel.getTableName)
+          CarbonSession.threadUnset(
+            CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+            carbonLoadModel.getDatabaseName + "." +
+            carbonLoadModel.getTableName)
+        }
       }
     }
   }
 }
 
+object LoadPreAggregateTablePreListener extends OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event
+   * @param operationContext
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    val loadEvent = event.asInstanceOf[LoadTablePreExecutionEvent]
+    val carbonLoadModel = loadEvent.carbonLoadModel
+    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val isInternalLoadCall = carbonLoadModel.isAggLoadRequest
+    if (table.isChildDataMap && !isInternalLoadCall) {
+      throw new UnsupportedOperationException(
+        "Cannot insert/load data directly into pre-aggregate table")
+    }
+  }
+}
+
 object PreAggregateDataTypeChangePreListener extends OperationEventListener {
   /**
    * Called on a specified event occurrence

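The guard in LoadPreAggregateTablePreListener boils down to a single condition; below is a self-contained model of it (the case class and function names are illustrative, not the Carbon event API).

    final case class LoadRequest(targetIsChildDataMap: Boolean, isInternalLoadCall: Boolean)

    def checkDirectLoad(req: LoadRequest): Unit =
      if (req.targetIsChildDataMap && !req.isInternalLoadCall) {
        throw new UnsupportedOperationException(
          "Cannot insert/load data directly into pre-aggregate table")
      }

    // a user-issued insert into a child table throws; an internal load passes:
    //   checkDirectLoad(LoadRequest(targetIsChildDataMap = true, isInternalLoadCall = false)) // throws
    //   checkDirectLoad(LoadRequest(targetIsChildDataMap = true, isInternalLoadCall = true))  // ok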
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index c50e717..43dc39e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -199,6 +199,13 @@ object PreAggregateUtil {
           carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
           parentTableName,
           parentDatabaseName, parentTableId = parentTableId)
+      case count@Count(Seq(Cast(attr: AttributeReference, _))) =>
+        list += getField(attr.name,
+          attr.dataType,
+          count.prettyName,
+          carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+          parentTableName,
+          parentDatabaseName, parentTableId = parentTableId)
       case min@Min(attr: AttributeReference) =>
         list += getField(attr.name,
           attr.dataType,
@@ -253,8 +260,9 @@ object PreAggregateUtil {
           carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
           parentTableName,
           parentDatabaseName, parentTableId = parentTableId)
-      case _ =>
-        throw new MalformedCarbonCommandException("Un-Supported Aggregation Type")
+      case others@_ =>
+        throw new MalformedCarbonCommandException(s"Un-Supported Aggregation Type: ${
+          others.prettyName}")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index baa9008..2875817 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CarbonException
@@ -79,6 +78,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
       case al@Alias(udf: ScalaUDF, name) if name.equalsIgnoreCase("preAgg") =>
         needAnalysis = false
         al
+      case al@Alias(udf: ScalaUDF, name) if name.equalsIgnoreCase("preAggLoad") =>
+        needAnalysis = false
+        al
       // in case of query if any unresolve alias is present then wait for plan to be resolved
       // return the same plan as we can tranform the plan only when everything is resolved
       case unresolveAlias@UnresolvedAlias(_, _) =>
@@ -752,6 +754,80 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
   }
 }
 
+object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
+    plan transform {
+      case aggregate@Aggregate(_, aExp, _) if validateAggregateExpressions(aExp) =>
+        aExp.foreach {
+          case alias: Alias =>
+            validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias)
+          case _: UnresolvedAlias =>
+          case namedExpr: NamedExpression => validExpressionsMap.put(namedExpr.name, namedExpr)
+        }
+        aggregate.copy(aggregateExpressions = validExpressionsMap.values.toSeq)
+      case plan: LogicalPlan => plan
+    }
+  }
+
+    /**
+     * This method will split the avg column into sum and count and will return a sequence of tuple
+     * of unique name, alias
+     *
+     */
+    private def validateAggregateFunctionAndGetAlias(alias: Alias): Seq[(String,
+      NamedExpression)] = {
+      alias match {
+        case udf@Alias(_: ScalaUDF, name) =>
+          Seq((name, udf))
+        case alias@Alias(attrExpression: AggregateExpression, _) =>
+          attrExpression.aggregateFunction match {
+            case Sum(attr: AttributeReference) =>
+              (attr.name + "_sum", alias) :: Nil
+            case Sum(MatchCast(attr: AttributeReference, _)) =>
+              (attr.name + "_sum", alias) :: Nil
+            case Count(Seq(attr: AttributeReference)) =>
+              (attr.name + "_count", alias) :: Nil
+            case Count(Seq(MatchCast(attr: AttributeReference, _))) =>
+              (attr.name + "_count", alias) :: Nil
+            case Average(attr: AttributeReference) =>
+              Seq((attr.name + "_sum", Alias(attrExpression.
+                copy(aggregateFunction = Sum(attr),
+                  resultId = NamedExpression.newExprId), attr.name + "_sum")()),
+                (attr.name, Alias(attrExpression.
+                  copy(aggregateFunction = Count(attr),
+                    resultId = NamedExpression.newExprId), attr.name + "_count")()))
+            case Average(cast@MatchCast(attr: AttributeReference, _)) =>
+              Seq((attr.name + "_sum", Alias(attrExpression.
+                copy(aggregateFunction = Sum(cast),
+                  resultId = NamedExpression.newExprId),
+                attr.name + "_sum")()),
+                (attr.name, Alias(attrExpression.
+                  copy(aggregateFunction = Count(cast), resultId =
+                    NamedExpression.newExprId), attr.name + "_count")()))
+            case _ => Seq(("", alias))
+          }
+
+      }
+    }
+
+  /**
+   * Called by PreAggregateLoadingRules to validate if plan is valid for applying rules or not.
+   * If the plan has PreAggLoad i.e Loading UDF and does not have PreAgg i.e Query UDF then it is
+   * valid.
+   *
+   * @param namedExpression
+   * @return
+   */
+  private def validateAggregateExpressions(namedExpression: Seq[NamedExpression]): Boolean = {
+    val filteredExpressions = namedExpression.filterNot(_.isInstanceOf[UnresolvedAlias])
+    filteredExpressions.exists { expr =>
+          !expr.name.equalsIgnoreCase("PreAgg") && expr.name.equalsIgnoreCase("preAggLoad")
+      }
+  }
+}
+
 /**
  * Insert into carbon table from other source
  */
@@ -769,23 +845,14 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
 
   def castChildOutput(p: InsertIntoTable,
       relation: CarbonDatasourceHadoopRelation,
-      child: LogicalPlan)
-  : LogicalPlan = {
+      child: LogicalPlan): LogicalPlan = {
     if (relation.carbonRelation.output.size > CarbonCommonConstants
       .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
       CarbonException.analysisException("Maximum number of columns supported:" +
         s"${CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS}")
     }
-    val isAggregateTable = !relation.carbonRelation.carbonTable.getTableInfo
-      .getParentRelationIdentifiers.isEmpty
-    // transform logical plan if the load is for aggregate table.
-    val childPlan = if (isAggregateTable) {
-      transformAggregatePlan(child)
-    } else {
-      child
-    }
-    if (childPlan.output.size >= relation.carbonRelation.output.size) {
-      val newChildOutput = childPlan.output.zipWithIndex.map { columnWithIndex =>
+    if (child.output.size >= relation.carbonRelation.output.size) {
+      val newChildOutput = child.output.zipWithIndex.map { columnWithIndex =>
         columnWithIndex._1 match {
           case attr: Alias =>
             Alias(attr.child, s"col${ columnWithIndex._2 }")(attr.exprId)
@@ -795,7 +862,7 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
         }
       }
       val version = sparkSession.version
-      val newChild: LogicalPlan = if (newChildOutput == childPlan.output) {
+      val newChild: LogicalPlan = if (newChildOutput == child.output) {
         if (version.startsWith("2.1")) {
           CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
         } else if (version.startsWith("2.2")) {
@@ -804,7 +871,7 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
           throw new UnsupportedOperationException(s"Spark version $version is not supported")
         }
       } else {
-        Project(newChildOutput, childPlan)
+        Project(newChildOutput, child)
       }
 
       val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)
@@ -816,63 +883,5 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
     }
   }
 
-  /**
-   * Transform the logical plan with average(col1) aggregation type to sum(col1) and count(col1).
-   *
-   * @param logicalPlan
-   * @return
-   */
-  private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
-    val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
-    logicalPlan transform {
-      case aggregate@Aggregate(_, aExp, _) =>
-        aExp.foreach {
-          case alias: Alias =>
-            validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias)
-          case namedExpr: NamedExpression => validExpressionsMap.put(namedExpr.name, namedExpr)
-        }
-        aggregate.copy(aggregateExpressions = validExpressionsMap.values.toSeq)
-      case plan: LogicalPlan => plan
-    }
-  }
-
-  /**
-   * This method will split the avg column into sum and count and will return a sequence of tuple
-   * of unique name, alias
-   *
-   */
-  def validateAggregateFunctionAndGetAlias(alias: Alias): Seq[(String, NamedExpression)] = {
-    alias match {
-      case alias@Alias(attrExpression: AggregateExpression, _) =>
-        attrExpression.aggregateFunction match {
-          case Sum(attr: AttributeReference) =>
-            (attr.name + "_sum", alias) :: Nil
-          case Sum(Cast(attr: AttributeReference, _)) =>
-            (attr.name + "_sum", alias) :: Nil
-          case Count(Seq(attr: AttributeReference)) =>
-            (attr.name + "_count", alias) :: Nil
-          case Count(Seq(Cast(attr: AttributeReference, _))) =>
-            (attr.name + "_count", alias) :: Nil
-          case Average(attr: AttributeReference) =>
-            Seq((attr.name + "_sum", Alias(attrExpression
-              .copy(aggregateFunction = Sum(attr),
-                resultId = NamedExpression.newExprId), attr.name + "_sum")()),
-              (attr.name, Alias(attrExpression
-                .copy(aggregateFunction = Count(attr),
-                  resultId = NamedExpression.newExprId), attr.name + "_count")()))
-          case Average(cast@Cast(attr: AttributeReference, _)) =>
-            Seq((attr.name + "_sum", Alias(attrExpression
-              .copy(aggregateFunction = Sum(cast),
-                resultId = NamedExpression.newExprId),
-              attr.name + "_sum")()),
-              (attr.name, Alias(attrExpression
-                .copy(aggregateFunction = Count(cast),
-                  resultId = NamedExpression.newExprId), attr.name + "_count")()))
-          case _ => Seq(("", alias))
-        }
-
-    }
-  }
-
 }
 

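A self-contained sketch of the avg split that CarbonPreAggregateDataLoadingRules applies (the tiny ADT below is illustrative and not the Catalyst types used above): avg(col) is replaced by a sum/count pair, so the child table stores col_sum and col_count and the query rules can later reconstruct the average from them.

    sealed trait Agg { def col: String }
    case class SumOf(col: String)   extends Agg
    case class CountOf(col: String) extends Agg
    case class AvgOf(col: String)   extends Agg

    def split(agg: Agg): Seq[(String, Agg)] = agg match {
      case SumOf(c)   => Seq(s"${c}_sum"   -> SumOf(c))
      case CountOf(c) => Seq(s"${c}_count" -> CountOf(c))
      case AvgOf(c)   => Seq(s"${c}_sum" -> SumOf(c), s"${c}_count" -> CountOf(c))
    }

    // split(AvgOf("age")) == Seq("age_sum" -> SumOf("age"), "age_count" -> CountOf("age"))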
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index b358f83..6761e92 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -96,6 +96,10 @@ object CarbonSetCommand {
       sessionParams.addProperty(key.toLowerCase(), value)
     }
   }
+
+  def unsetValue(sessionParams: SessionParams, key: String): Unit = {
+    sessionParams.removeProperty(key)
+  }
 }
 
 case class CarbonResetCommand()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 6f7b89a..b01c6d9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -569,8 +569,16 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   def addPreAggFunction(sql: String): String = {
     addPreAgg(new lexical.Scanner(sql.toLowerCase)) match {
       case Success(query, _) => query
-      case failureOrError => throw new MalformedCarbonCommandException(
-        s"Unsupported query")
+      case _ =>
+        throw new MalformedCarbonCommandException(s"Unsupported query")
+    }
+  }
+
+  def addPreAggLoadFunction(sql: String): String = {
+    addPreAggLoad(new lexical.Scanner(sql.toLowerCase)) match {
+      case Success(query, _) => query
+      case _ =>
+        throw new MalformedCarbonCommandException(s"Unsupported query")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index 9f66737..911c25d 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -156,6 +156,7 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
     catalog.OrcConversions ::
     CarbonPreInsertionCasts(sparkSession) ::
     CarbonPreAggregateQueryRules(sparkSession) ::
+    CarbonPreAggregateDataLoadingRules ::
     CarbonIUDAnalysisRule(sparkSession) ::
     AnalyzeCreateTable(sparkSession) ::
     PreprocessTableInsertion(conf) ::

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ae596b7/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
index b3792cd..87aebc0 100644
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
@@ -248,6 +248,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
           ctx.partitionColumns,
           ctx.columns,
           ctx.tablePropertyList,
+          ctx.locationSpec(),
           Option(ctx.STRING()).map(string),
           ctx.AS)
     } else {

