carbondata-commits mailing list archives

From: manishgupt...@apache.org
Subject: carbondata git commit: [CARBONDATA-2285] Spark integration code refactor
Date: Fri, 30 Mar 2018 14:12:12 GMT
Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 e14c20e65 -> b835e76a7


[CARBONDATA-2285] Spark integration code refactor

This closes #2117


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b835e76a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b835e76a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b835e76a

Branch: refs/heads/branch-1.3
Commit: b835e76a7e22506269b717128298a12f347c9bb8
Parents: e14c20e
Author: mohammadshahidkhan <mohdshahidkhan1987@gmail.com>
Authored: Mon Mar 26 16:36:29 2018 +0530
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Fri Mar 30 19:34:36 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  6 ++
 .../hadoop/api/CarbonTableInputFormat.java      | 21 +++--
 .../spark/util/CarbonReflectionUtils.scala      |  6 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  8 +-
 .../spark/sql/hive/CarbonSessionCatalog.scala   | 63 +++++++++++++
 .../org/apache/spark/util/AlterTableUtil.scala  |  2 +-
 .../spark/sql/hive/CarbonSessionState.scala     | 18 ++--
 .../spark/sql/hive/CarbonSessionState.scala     | 66 ++++++-------
 .../spark/sql/hive/CarbonSessionUtil.scala      | 97 ++++++++++++++++++++
 .../spark/sql/common/util/Spark2QueryTest.scala |  4 +-
 10 files changed, 231 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b835e76a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index f210408..605d711 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1276,6 +1276,12 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_SESSIONSTATE_CLASSNAME = "spark.carbon.sessionstate.classname";
 
+  /**
+   * This property is used to configure the SQL AST builder class.
+   */
+  public static final String CARBON_SQLASTBUILDER_CLASSNAME =
+      "spark.carbon.sqlastbuilder.classname";
+
   public static final String CARBON_COMMON_LISTENER_REGISTER_CLASSNAME =
       "spark.carbon.common.listener.register.classname";
 
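The new property can be set on the SparkConf used to build the session. A minimal sketch, assuming a hypothetical custom builder class (any such class has to match the constructor shape expected by CarbonReflectionUtils.getAstBuilder, refactored below):

    import org.apache.spark.SparkConf

    // Point the new property at a custom AST builder class; the class name here
    // is hypothetical and only illustrates the override mechanism.
    val conf = new SparkConf()
      .set("spark.carbon.sqlastbuilder.classname",
        "org.example.MyCarbonSqlAstBuilder")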

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b835e76a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 2d6c03a..094b8c3 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -364,14 +364,13 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     if (getValidateSegmentsToAccess(job.getConfiguration())) {
       List<Segment> validSegments = segments.getValidSegments();
       streamSegments = segments.getStreamSegments();
-      streamSegments = getFilteredSegment(job,streamSegments);
+      streamSegments = getFilteredSegment(job,streamSegments, true);
       if (validSegments.size() == 0) {
         return getSplitsOfStreaming(job, identifier, streamSegments);
       }
 
-
-
-      List<Segment> filteredSegmentToAccess = getFilteredSegment(job, segments.getValidSegments());
+      List<Segment> filteredSegmentToAccess = getFilteredSegment(job, segments.getValidSegments(),
+          true);
       if (filteredSegmentToAccess.size() == 0) {
         return getSplitsOfStreaming(job, identifier, streamSegments);
       } else {
@@ -393,7 +392,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     validAndInProgressSegments.addAll(segments.getListOfInProgressSegments());
     // get updated filtered list
     List<Segment> filteredSegmentToAccess =
-        getFilteredSegment(job, new ArrayList<>(validAndInProgressSegments));
+        getFilteredSegment(job, new ArrayList<>(validAndInProgressSegments), false);
     // Clean the updated segments from memory if the update happens on segments
     List<Segment> toBeCleanedSegments = new ArrayList<>();
     for (SegmentUpdateDetails segmentUpdateDetail : updateStatusManager
@@ -462,7 +461,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
    * Return the segment list after filtering by the valid segments and the segments set by the
    * user via `INPUT_SEGMENT_NUMBERS` in the job configuration
    */
-  private List<Segment> getFilteredSegment(JobContext job, List<Segment> validSegments) {
+  private List<Segment> getFilteredSegment(JobContext job, List<Segment> validSegments,
+      boolean validationRequired) {
     Segment[] segmentsToAccess = getSegmentsToAccess(job);
     List<Segment> segmentToAccessSet =
         new ArrayList<>(new HashSet<>(Arrays.asList(segmentsToAccess)));
@@ -482,6 +482,13 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
           }
         }
       }
+      if (filteredSegmentToAccess.size() != segmentToAccessSet.size() && !validationRequired) {
+        for (Segment segment : segmentToAccessSet) {
+          if (!filteredSegmentToAccess.contains(segment)) {
+            filteredSegmentToAccess.add(segment);
+          }
+        }
+      }
       if (!filteredSegmentToAccess.containsAll(segmentToAccessSet)) {
         List<Segment> filteredSegmentToAccessTemp = new ArrayList<>(filteredSegmentToAccess);
         filteredSegmentToAccessTemp.removeAll(segmentToAccessSet);
@@ -944,7 +951,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     Map<String, Long> segmentAndBlockCountMapping = new HashMap<>();
 
     // TODO: currently only batch segment is supported, add support for streaming table
-    List<Segment> filteredSegment = getFilteredSegment(job, allSegments.getValidSegments());
+    List<Segment> filteredSegment = getFilteredSegment(job, allSegments.getValidSegments(), false);
 
     List<ExtendedBlocklet> blocklets = blockletMap.prune(filteredSegment, null, partitions);
     for (ExtendedBlocklet blocklet : blocklets) {
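
To illustrate the new validationRequired flag: when validation is required, only user-requested segments that are currently valid are kept; when it is not required, the requested segments are retained even if they fall outside the valid set. A minimal Scala sketch of that rule, using illustrative names rather than the actual CarbonData API:

    // Illustrative-only model of the filtering behaviour added in this change.
    def filterSegments(requested: Seq[String], valid: Seq[String],
        validationRequired: Boolean): Seq[String] = {
      val filtered = valid.filter(requested.contains)
      // Without validation, add back every requested segment that was filtered out.
      if (!validationRequired) (filtered ++ requested).distinct else filtered
    }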

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b835e76a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 2f15263..69eb021 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -183,8 +183,10 @@ object CarbonReflectionUtils {
       sqlParser: Object,
       sparkSession: SparkSession): AstBuilder = {
     if (SPARK_VERSION.startsWith("2.1") || SPARK_VERSION.startsWith("2.2")) {
-      createObject(
-        "org.apache.spark.sql.hive.CarbonSqlAstBuilder",
+      val className = sparkSession.sparkContext.conf.get(
+        CarbonCommonConstants.CARBON_SQLASTBUILDER_CLASSNAME,
+        "org.apache.spark.sql.hive.CarbonSqlAstBuilder")
+      createObject(className,
         conf,
         sqlParser, sparkSession)._1.asInstanceOf[AstBuilder]
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b835e76a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 0180d6a..4647bee 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -118,7 +118,7 @@ object CarbonEnv {
 
   def getInstance(sparkSession: SparkSession): CarbonEnv = {
     if (sparkSession.isInstanceOf[CarbonSession]) {
-      sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].carbonEnv
+      sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getCarbonEnv()
     } else {
       var carbonEnv: CarbonEnv = carbonEnvMap.get(sparkSession)
       if (carbonEnv == null) {
@@ -198,7 +198,11 @@ object CarbonEnv {
   def refreshRelationFromCache(identifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
     var isRefreshed = false
     val carbonEnv = getInstance(sparkSession)
-    if (carbonEnv.carbonMetastore.checkSchemasModifiedTimeAndReloadTable(identifier)) {
+    val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
+      identifier.database.getOrElse("default"), identifier.table)
+    if (table.isEmpty ||
+        (table.isDefined && carbonEnv.carbonMetastore
+          .checkSchemasModifiedTimeAndReloadTable(identifier))) {
       sparkSession.sessionState.catalog.refreshTable(identifier)
       DataMapStoreManager.getInstance().
         clearDataMaps(AbsoluteTableIdentifier.from(CarbonProperties.getStorePath,
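
The new condition reduces to: refresh when the table is missing from Carbon's metadata cache, or when the cached schema's modified time indicates a change on disk. (The table.isDefined guard in the second disjunct is already implied once table.isEmpty is false.) A minimal sketch of the condition, under illustrative names:

    // Illustrative-only restatement of the refresh condition.
    def needsRefresh(cachedTable: Option[AnyRef])(schemaModified: => Boolean): Boolean =
      cachedTable.isEmpty || schemaModified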

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b835e76a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
new file mode 100644
index 0000000..92b220d
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
@@ -0,0 +1,63 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition}
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+/**
+ * This trait defines the common APIs used by Carbon for the Spark 2.1 and Spark 2.2
+ * integrations that are not defined in SessionCatalog or HiveSessionCatalog, giving a
+ * contract to the concrete implementation classes, for example the CarbonSessionCatalog
+ * implementations for 2.1 and 2.2.
+ */
+trait CarbonSessionCatalog {
+  /**
+   * Implementation to be provided by each CarbonSessionCatalog based on the ExternalCatalog used.
+   *
+   * @return
+   */
+  def getClient(): org.apache.spark.sql.hive.client.HiveClient
+
+  /**
+   * The method returns the CarbonEnv instance
+   *
+   * @return
+   */
+  def getCarbonEnv(): CarbonEnv
+
+  /**
+   * This is an alternate way of getting partition information. It first fetches all partitions
+   * from Hive and then applies the filter, instead of querying Hive along with the filters.
+   *
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  def getPartitionsAlternate(partitionFilters: Seq[Expression], sparkSession: SparkSession,
+      identifier: TableIdentifier): Seq[CatalogTablePartition]
+
+  /**
+   * Update the storage format with the new location information.
+   */
+  def updateStorageLocation(path: Path, storage: CatalogStorageFormat): CatalogStorageFormat
+}
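
For illustration only, a hypothetical stub showing the shape a concrete implementation takes (the real implementations are the version-specific CarbonHiveSessionCatalog classes in the spark2.1 and spark2.2 diffs below; this stub reuses the imports of the file above):

    // Hypothetical stub, only to illustrate the contract of the trait.
    class StubCarbonSessionCatalog(sparkSession: SparkSession) extends CarbonSessionCatalog {

      override def getClient(): org.apache.spark.sql.hive.client.HiveClient =
        throw new UnsupportedOperationException("no Hive client in this stub")

      override def getCarbonEnv(): CarbonEnv = {
        val env = new CarbonEnv
        env.init(sparkSession)
        env
      }

      // Fetch all partitions from the session catalog; driver-side pruning is omitted here.
      override def getPartitionsAlternate(partitionFilters: Seq[Expression],
          sparkSession: SparkSession,
          identifier: TableIdentifier): Seq[CatalogTablePartition] =
        sparkSession.sessionState.catalog.listPartitions(identifier)

      // Spark 2.2 stores the location as a URI; the 2.1 implementation uses path.toString.
      override def updateStorageLocation(path: Path,
          storage: CatalogStorageFormat): CatalogStorageFormat =
        storage.copy(locationUri = Some(path.toUri))
    }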

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b835e76a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index c59b1a3..cff641f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, HiveExternalCatalog}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 
 import org.apache.carbondata.common.logging.LogServiceFactory

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b835e76a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index d381144..5c0f118 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -55,7 +55,7 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
  * @param conf
  * @param hadoopConf
  */
-class CarbonSessionCatalog(
+class CarbonHiveSessionCatalog(
     externalCatalog: HiveExternalCatalog,
     globalTempViewManager: GlobalTempViewManager,
     sparkSession: SparkSession,
@@ -70,14 +70,20 @@ class CarbonSessionCatalog(
     functionResourceLoader,
     functionRegistry,
     conf,
-    hadoopConf) {
+    hadoopConf) with CarbonSessionCatalog {
 
   lazy val carbonEnv = {
     val env = new CarbonEnv
     env.init(sparkSession)
     env
   }
-
+  /**
+   * Returns the CarbonEnv instance.
+   * @return
+   */
+  override def getCarbonEnv() : CarbonEnv = {
+    carbonEnv
+  }
   // Initialize all listeners to the Operation bus.
   CarbonEnv.init(sparkSession)
 
@@ -138,7 +144,7 @@ class CarbonSessionCatalog(
    *
    * @return
    */
-  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
     sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
   }
 
@@ -187,7 +193,7 @@ class CarbonSessionCatalog(
   /**
    * Update the storage format with the new location information.
    */
-  def updateStorageLocation(
+  override def updateStorageLocation(
       path: Path,
       storage: CatalogStorageFormat): CatalogStorageFormat = {
     storage.copy(locationUri = Some(path.toString))
@@ -254,7 +260,7 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
    * Internal catalog for managing table and database states.
    */
   override lazy val catalog = {
-    new CarbonSessionCatalog(
+    new CarbonHiveSessionCatalog(
       sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
       sparkSession.sharedState.globalTempViewManager,
       sparkSession,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b835e76a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index e82b485..182ba76 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -64,7 +64,7 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
  * @param conf
  * @param hadoopConf
  */
-class CarbonSessionCatalog(
+class CarbonHiveSessionCatalog(
     externalCatalog: HiveExternalCatalog,
     globalTempViewManager: GlobalTempViewManager,
     functionRegistry: FunctionRegistry,
@@ -73,7 +73,7 @@ class CarbonSessionCatalog(
     hadoopConf: Configuration,
     parser: ParserInterface,
     functionResourceLoader: FunctionResourceLoader)
-  extends HiveSessionCatalog(
+  extends HiveSessionCatalog (
     externalCatalog,
     globalTempViewManager,
     new HiveMetastoreCatalog(sparkSession),
@@ -82,15 +82,18 @@ class CarbonSessionCatalog(
     hadoopConf,
     parser,
     functionResourceLoader
-  ) {
+  ) with CarbonSessionCatalog {
 
   lazy val carbonEnv = {
     val env = new CarbonEnv
     env.init(sparkSession)
     env
   }
-
-  def getCarbonEnv() : CarbonEnv = {
+  /**
+   * Returns the CarbonEnv instance.
+   * @return
+   */
+  override def getCarbonEnv() : CarbonEnv = {
     carbonEnv
   }
 
@@ -102,28 +105,9 @@ class CarbonSessionCatalog(
 
   override def lookupRelation(name: TableIdentifier): LogicalPlan = {
     val rtnRelation = super.lookupRelation(name)
-    var toRefreshRelation = false
-    rtnRelation match {
-      case SubqueryAlias(_,
-      LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) =>
-        toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
-      case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
-        toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
-      case SubqueryAlias(_, relation) if
-      relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
-      relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
-      relation.getClass.getName.equals(
-        "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") =>
-        val catalogTable =
-          CarbonReflectionUtils.getFieldOfCatalogTable(
-            "tableMeta",
-            relation).asInstanceOf[CatalogTable]
-        toRefreshRelation =
-          CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
-      case _ =>
-    }
-
-    if (toRefreshRelation) {
+    val isRelationRefreshed =
+      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
+    if (isRelationRefreshed) {
       super.lookupRelation(name)
     } else {
       rtnRelation
@@ -135,7 +119,7 @@ class CarbonSessionCatalog(
    *
    * @return
    */
-  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
     sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
       .asInstanceOf[HiveExternalCatalog].client
   }
@@ -162,21 +146,16 @@ class CarbonSessionCatalog(
    * @param identifier
    * @return
    */
-  def getPartitionsAlternate(partitionFilters: Seq[Expression],
+  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
       sparkSession: SparkSession,
       identifier: TableIdentifier) = {
-    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
-    ExternalCatalogUtils.prunePartitionsByFilter(
-      sparkSession.sessionState.catalog.getTableMetadata(identifier),
-      allPartitions,
-      partitionFilters,
-      sparkSession.sessionState.conf.sessionLocalTimeZone)
+    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
   }
 
   /**
    * Update the storage format with the new location information.
    */
-  def updateStorageLocation(
+  override def updateStorageLocation(
       path: Path,
       storage: CatalogStorageFormat): CatalogStorageFormat = {
     storage.copy(locationUri = Some(path.toUri))
@@ -222,8 +201,8 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession,
   /**
    * Create a [[CarbonHiveSessionCatalog]].
    */
-  override protected lazy val catalog: CarbonSessionCatalog = {
-    val catalog = new CarbonSessionCatalog(
+  override protected lazy val catalog: CarbonHiveSessionCatalog = {
+    val catalog = new CarbonHiveSessionCatalog(
       externalCatalog,
       session.sharedState.globalTempViewManager,
       functionRegistry,
@@ -285,6 +264,13 @@ class CarbonOptimizer(
   extends SparkOptimizer(catalog, conf, experimentalMethods) {
 
   override def execute(plan: LogicalPlan): LogicalPlan = {
+    val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
+    super.execute(transFormedPlan)
+  }
+}
+
+object CarbonOptimizerUtil {
+  def transformForScalarSubQuery(plan: LogicalPlan): LogicalPlan = {
     // In case of a scalar subquery, add a flag in the relation to skip the decoder plan in the
     // optimizer rule, and optimize the whole plan at once.
     val transFormedPlan = plan.transform {
@@ -314,10 +300,10 @@ class CarbonOptimizer(
                 lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
                 lr
             }
-            In(value, Seq(ListQuery(tPlan, l.children , exprId)))
+            In(value, Seq(ListQuery(tPlan, l.children, exprId)))
         }
     }
-    super.execute(transFormedPlan)
+    transFormedPlan
   }
 }
 
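With the transformation extracted into CarbonOptimizerUtil, the optimizer body reduces to a single delegation, and the same pre-pass can be invoked on any analyzed plan outside CarbonOptimizer; a sketch, where analyzedPlan is an arbitrary LogicalPlan supplied by the caller:

    // Run the scalar-subquery pre-pass standalone, then hand the plan to any optimizer.
    val prepared: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(analyzedPlan)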

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b835e76a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
new file mode 100644
index 0000000..ac702ea
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -0,0 +1,97 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+
+/**
+ * This object refreshes the relation from the cache if the carbon table in the
+ * carbon catalog is not the same as the cached carbon relation's carbon table.
+ */
+object CarbonSessionUtil {
+
+  val LOGGER = LogServiceFactory.getLogService("CarbonSessionUtil")
+
+  /**
+   * The method refreshes the cache entry
+   *
+   * @param rtnRelation [[LogicalPlan]] represents the given table or view.
+   * @param name        tableName
+   * @param sparkSession
+   * @return
+   */
+  def refreshRelation(rtnRelation: LogicalPlan, name: TableIdentifier)
+    (sparkSession: SparkSession): Boolean = {
+    var isRelationRefreshed = false
+    rtnRelation match {
+      case SubqueryAlias(_,
+      LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)
+      ) =>
+        isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+      case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
+        isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+      case SubqueryAlias(_, relation) if
+      relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+      relation.getClass.getName
+        .equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
+      relation.getClass.getName.equals(
+        "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation"
+      ) =>
+        val catalogTable =
+          CarbonReflectionUtils.getFieldOfCatalogTable(
+            "tableMeta",
+            relation
+          ).asInstanceOf[CatalogTable]
+        isRelationRefreshed =
+          CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
+      case _ =>
+    }
+    isRelationRefreshed
+  }
+
+  /**
+   * This is an alternate way of getting partition information. It first fetches all partitions
+   * from Hive and then applies the filter, instead of querying Hive along with the filters.
+   *
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  def prunePartitionsByFilter(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
+    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
+    ExternalCatalogUtils.prunePartitionsByFilter(
+      sparkSession.sessionState.catalog.getTableMetadata(identifier),
+      allPartitions,
+      partitionFilters,
+      sparkSession.sessionState.conf.sessionLocalTimeZone
+    )
+  }
+
+}
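
A hedged usage sketch of the pruning helper (database and table names are illustrative; real callers pass filter expressions already resolved against the table's partition columns, and an empty filter list degenerates to listing all partitions):

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Hypothetical call: with no filters this simply returns every partition of default.t1.
    val identifier = TableIdentifier("t1", Some("default"))
    val partitions = CarbonSessionUtil.prunePartitionsByFilter(Seq.empty, sparkSession, identifier)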

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b835e76a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala
index 169bcf2..ff64c05 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala
@@ -18,13 +18,13 @@
 package org.apache.spark.sql.common.util
 
 import org.apache.spark.sql.CarbonSession
-import org.apache.spark.sql.hive.{CarbonSessionCatalog, HiveExternalCatalog}
+import org.apache.spark.sql.hive.{CarbonHiveSessionCatalog, HiveExternalCatalog}
 import org.apache.spark.sql.test.util.QueryTest
 
 
 class Spark2QueryTest extends QueryTest {
 
-  val hiveClient = sqlContext.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
+  val hiveClient = sqlContext.sparkSession.sessionState.catalog.asInstanceOf[CarbonHiveSessionCatalog]
     .getClient()
 
 }
\ No newline at end of file

