carbondata-commits mailing list archives

From ravipes...@apache.org
Subject carbondata git commit: [CARBONDATA-1398] Support query from specified segments
Date Fri, 10 Nov 2017 04:45:39 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 2de240632 -> fd0bdf6f6


[CARBONDATA-1398] Support query from specified segments

A new property is introduced to set the segment numbers.
The user sets the property (carbon.input.segments.<database_name>.<table_name>) to
specify the segment numbers to query.
During CarbonScan, data will be read from the specified segments only.
If the property is not set, all segments are considered, which is the default behavior.

This closes #1367
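
As a usage sketch (the table t1 under database "default" is hypothetical; the same
commands are exercised by the new TestSegmentReading suite below):

    sql("SET carbon.input.segments.default.t1=1,2") // queries on t1 now read only segments 1 and 2
    sql("select count(*) from t1")
    sql("SET carbon.input.segments.default.t1=*")   // reset to the default: all segments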


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fd0bdf6f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fd0bdf6f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fd0bdf6f

Branch: refs/heads/master
Commit: fd0bdf6f6c9ebe124789d4041ed98a9a790f38c1
Parents: 2de2406
Author: rahulforallp <rahul.kumar@knoldus.in>
Authored: Thu Sep 14 18:44:09 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Fri Nov 10 10:15:26 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   6 +
 .../apache/carbondata/core/util/CarbonUtil.java |  25 ++
 .../carbondata/core/util/SessionParams.java     |  12 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  61 +++-
 .../src/test/resources/data1.csv                |  11 +
 .../org/apache/carbondata/api/CarbonStore.scala |   8 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   8 +
 .../spark/sql/CarbonCatalystOperators.scala     |   0
 .../spark/sql/CarbonCatalystOperators.scala     |   3 +-
 .../command/mutation/IUDCommonUtil.scala        |  78 ++++
 .../mutation/ProjectForDeleteCommand.scala      |   1 +
 .../mutation/ProjectForUpdateCommand.scala      |   1 +
 .../execution/command/CarbonHiveCommands.scala  |  12 +
 .../spark/sql/internal/CarbonSqlConf.scala      |   5 +
 .../org/apache/spark/util/ShowSegments.scala    |  10 +-
 .../segmentreading/TestSegmentReading.scala     | 352 +++++++++++++++++++
 16 files changed, 578 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd0bdf6f/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 17936d9..5f63cc1 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -50,6 +50,12 @@ public final class CarbonCommonConstants {
    * measure meta data file name
    */
   public static final String MEASURE_METADATA_FILE_NAME = "/msrMetaData_";
+
+  /**
+   * set the segment ids to query from the table
+   */
+  public static final String CARBON_INPUT_SEGMENTS = "carbon.input.segments.";
+
   /**
    * location of the carbon member, hierarchy and fact files
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd0bdf6f/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 016b8b3..51b6f06 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -54,6 +54,7 @@ import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -2011,6 +2012,30 @@ public final class CarbonUtil {
     }
   }
 
+  public static boolean validateRangeOfSegmentList(String segmentId)
+      throws InvalidConfigurationException {
+    String[] values = segmentId.split(",");
+    try {
+      if (values.length == 0) {
+        throw new InvalidConfigurationException(
+            "carbon.input.segments.<database_name>.<table_name> value can't be
empty.");
+      }
+      for (String value : values) {
+        if (!value.equalsIgnoreCase("*")) {
+          Float aFloatValue = Float.parseFloat(value);
+          if (aFloatValue < 0 || aFloatValue > Float.MAX_VALUE) {
+            throw new InvalidConfigurationException(
+                "carbon.input.segments.<database_name>.<table_name> value range
should be greater "
+                    + "than 0 and less than " + Float.MAX_VALUE);
+          }
+        }
+      }
+    } catch (NumberFormatException nfe) {
+      throw new InvalidConfigurationException(
+          "carbon.input.segments.<database_name>.<table_name> value range is
not valid");
+    }
+    return true;
+  }
   /**
    * Below method will be used to check whether bitset applied on previous filter
    * can be used to apply on next column filter

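A minimal sketch of the validation behavior added above, assuming the helper is called
directly (inputs are illustrative):

    CarbonUtil.validateRangeOfSegmentList("0,1,2") // every id parses as a float: returns true
    CarbonUtil.validateRangeOfSegmentList("*")     // "*" bypasses the numeric check: returns true
    CarbonUtil.validateRangeOfSegmentList("2,a")   // NumberFormatException is rethrown as InvalidConfigurationException
    CarbonUtil.validateRangeOfSegmentList(",")     // splits to an empty array: "value can't be empty."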
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd0bdf6f/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 6d8c900..1878416 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 
 import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION;
@@ -144,8 +145,15 @@ public class SessionParams implements Serializable {
         isValid = true;
         break;
       default:
-        throw new InvalidConfigurationException(
-            "The key " + key + " not supported for dynamic configuration.");
+        if (key.startsWith(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
+          isValid = CarbonUtil.validateRangeOfSegmentList(value);
+          if (!isValid) {
+            throw new InvalidConfigurationException("Invalid CARBON_INPUT_SEGMENT_IDs");
+          }
+        } else {
+          throw new InvalidConfigurationException(
+              "The key " + key + " not supported for dynamic configuration.");
+        }
     }
     return isValid;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd0bdf6f/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index e22a5c6..57359fc 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -27,9 +27,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
@@ -108,6 +110,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   // comma separated list of input segment numbers
   public static final String INPUT_SEGMENT_NUMBERS =
       "mapreduce.input.carboninputformat.segmentnumbers";
+  public static final String VALIDATE_INPUT_SEGMENT_IDs =
+            "mapreduce.input.carboninputformat.validsegments";
   // comma separated list of input files
   public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
   public static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
@@ -253,6 +257,21 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   }
 
   /**
+   * set whether the segment list to access should be validated
+   */
+  public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) {
+    configuration.set(CarbonTableInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
+  }
+
+  /**
+   * get whether the segment list to access should be validated
+   */
+  public static boolean getValidateSegmentsToAccess(Configuration configuration) {
+    return configuration.get(CarbonTableInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true")
+        .equalsIgnoreCase("true");
+  }
+
+  /**
    * Set list of files to access
    */
   public static void setFilesToAccess(Configuration configuration, List<String> validFiles) {
@@ -280,24 +299,51 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
    */
   @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
+    SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
     TableDataMap blockletMap =
         DataMapStoreManager.getInstance().getDataMap(identifier, BlockletDataMap.NAME,
             BlockletDataMapFactory.class.getName());
     List<String> invalidSegments = new ArrayList<>();
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
-    List<String> validSegments = Arrays.asList(getSegmentsToAccess(job));
     List<String> streamSegments = null;
-    // get all valid segments and set them into the configuration
-    SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
-    if (validSegments.size() == 0) {
+    List<String> filteredSegmentToAccess = new ArrayList<>();
+    if (getValidateSegmentsToAccess(job.getConfiguration())) {
+
+      String[] segmentsToAccess = getSegmentsToAccess(job);
+      Set<String> segmentToAccessSet = new HashSet<>();
+      for (String segmentToAccess : segmentsToAccess) {
+        segmentToAccessSet.add(segmentToAccess);
+      }
+      // get all valid segments and set them into the configuration
       SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
       SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
           segmentStatusManager.getValidAndInvalidSegments();
-      validSegments = segments.getValidSegments();
+      List<String> validSegments = segments.getValidSegments();
       streamSegments = segments.getStreamSegments();
       if (validSegments.size() == 0) {
         return getSplitsOfStreaming(job, identifier, streamSegments);
       }
+      if (segmentsToAccess.length == 0 || segmentsToAccess[0].equalsIgnoreCase("*")) {
+        filteredSegmentToAccess.addAll(validSegments);
+      } else {
+        for (String validSegment : validSegments) {
+          if (segmentToAccessSet.contains(validSegment)) {
+            filteredSegmentToAccess.add(validSegment);
+          }
+        }
+        if (!filteredSegmentToAccess.containsAll(segmentToAccessSet)) {
+          List<String> filteredSegmentToAccessTemp = new ArrayList<>();
+          filteredSegmentToAccessTemp.addAll(filteredSegmentToAccess);
+          filteredSegmentToAccessTemp.removeAll(segmentToAccessSet);
+          LOG.info(
+              "Segments ignored are : " + Arrays.toString(filteredSegmentToAccessTemp.toArray()));
+        }
+      }
+      if (filteredSegmentToAccess.size() == 0) {
+        return new ArrayList<>(0);
+      } else {
+        setSegmentsToAccess(job.getConfiguration(), filteredSegmentToAccess);
+      }
       // remove entry in the segment index if there are invalid segments
       invalidSegments.addAll(segments.getInvalidSegments());
       for (String invalidSegmentId : invalidSegments) {
@@ -320,7 +366,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       }
     }
     // Clean segments if refresh is needed
-    for (String segment : validSegments) {
+    for (String segment : filteredSegmentToAccess) {
       if (DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
           .isRefreshNeeded(segment)) {
         toBeCleanedSegments.add(segment);
@@ -357,7 +403,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
 
     // do block filtering and get split
     List<InputSplit> splits =
-        getSplits(job, filterInterface, validSegments, matchedPartitions, partitionInfo, null);
+        getSplits(job, filterInterface, filteredSegmentToAccess, matchedPartitions, partitionInfo,
+            null);
     // pass the invalid segment to task side in order to remove index entry in task side
     if (invalidSegments.size() > 0) {
       for (InputSplit split : splits) {

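For callers that drive CarbonTableInputFormat directly, a hedged sketch of the new
validation flag together with the existing setSegmentsToAccess, which CarbonScanRDD
calls below; the Configuration instance is illustrative:

    import java.util.Arrays
    import org.apache.hadoop.conf.Configuration
    import org.apache.carbondata.hadoop.api.CarbonTableInputFormat

    val conf = new Configuration()
    // restrict the scan to segments 0 and 1
    CarbonTableInputFormat.setSegmentsToAccess(conf, Arrays.asList("0", "1"))
    // keep validation on (the default) so getSplits() intersects the list with valid segments
    CarbonTableInputFormat.setValidateSegmentsToAccess(conf, true)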
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd0bdf6f/integration/spark-common-test/src/test/resources/data1.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/data1.csv b/integration/spark-common-test/src/test/resources/data1.csv
new file mode 100644
index 0000000..9813184
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/data1.csv
@@ -0,0 +1,11 @@
+empno,empname,designation,doj,workgroupcategory,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate,attendance,utilization,salary
+101,arvind,SE,17-01-2007,1,developer,10,network,928478,17-02-2007,29-11-2016,96,96,5040
+120,krithin,SSE,29-05-2008,1,developer,11,protocol,928378,29-06-2008,30-12-2016,85,95,7124
+103,madhan,TPL,07-07-2009,2,tester,10,network,928478,07-08-2009,30-12-2016,88,99,9054
+140,anandh,SA,29-12-2010,3,manager,11,protocol,928278,29-01-2011,29-06-2016,77,92,11248
+15,anu,SSA,09-11-2011,1,developer,12,security,928375,09-12-2011,29-05-2016,99,91,13245
+160,pramod,SE,14-10-2012,1,developer,13,configManagement,928478,14-11-2012,29-12-2016,86,93,5040
+107,gawrav,PL,22-09-2013,2,tester,12,security,928778,22-10-2013,15-11-2016,78,97,9574
+181,sibi,TL,15-08-2014,2,tester,14,Learning,928176,15-09-2014,29-05-2016,84,98,7245
+119,shivani,PL,12-05-2015,1,developer,10,network,928977,12-06-2015,12-11-2016,88,91,11254
+210,bill,PM,01-12-2015,3,manager,14,Learning,928479,01-01-2016,30-11-2016,75,94,13547

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd0bdf6f/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index e77f5c3..a95bc01 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -66,11 +66,17 @@ object CarbonStore {
       loadMetadataDetailsSortedArray
           .filter(_.getVisibility.equalsIgnoreCase("true"))
           .map { load =>
+            val mergedTo = if (load.getMergedLoadName != null) {
+              load.getMergedLoadName
+            } else {
+              ""
+            }
             Row(
               load.getLoadName,
               load.getLoadStatus,
               new java.sql.Timestamp(load.getLoadStartTime),
-              new java.sql.Timestamp(load.getLoadEndTime)
+              new java.sql.Timestamp(load.getLoadEndTime),
+              mergedTo
             )
           }.toSeq
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd0bdf6f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 7ec6b7b..5e46417 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -349,6 +349,14 @@ class CarbonScanRDD(
         CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
       CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
     }
+    val dbName = identifier.getCarbonTableIdentifier.getDatabaseName.toLowerCase
+    val tbName = identifier.getCarbonTableIdentifier.getTableName.toLowerCase
+    val segmentNumbersFromProperty = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*")
+    if (!segmentNumbersFromProperty.trim.equals("*")) {
+      CarbonTableInputFormat
+        .setSegmentsToAccess(conf, segmentNumbersFromProperty.split(",").toList.asJava)
+    }
     format
   }
 

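The property read above can also be set programmatically through CarbonProperties; a
sketch for a hypothetical table t1 in database "default", equivalent in effect to the
SET command handled in CarbonHiveCommands below:

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // the key resolves to "carbon.input.segments.default.t1"
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + "default.t1", "1,2")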
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd0bdf6f/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd0bdf6f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index bf86aca..4acb82c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -85,7 +85,8 @@ case class ShowLoadsCommand(
     Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
       AttributeReference("Status", StringType, nullable = false)(),
       AttributeReference("Load Start Time", TimestampType, nullable = false)(),
-      AttributeReference("Load End Time", TimestampType, nullable = false)())
+      AttributeReference("Load End Time", TimestampType, nullable = false)(),
+      AttributeReference("Merged To", StringType, nullable = false)())
   }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd0bdf6f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
new file mode 100644
index 0000000..b18ab78
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.mutation
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.HiveSessionCatalog
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+/**
+ * Util for IUD common function
+ */
+object IUDCommonUtil {
+
+  /**
+   * iterates the plan and checks whether CarbonCommonConstants.CARBON_INPUT_SEGMENTS is set
+   * for any table or not
+   * @param sparkSession
+   * @param logicalPlan
+   */
+  def checkIfSegmentListIsSet(sparkSession: SparkSession, logicalPlan: LogicalPlan): Unit = {
+    val carbonProperties = CarbonProperties.getInstance()
+    logicalPlan.foreach {
+      case unresolvedRelation: UnresolvedRelation =>
+        val dbAndTb =
+          sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCurrentDatabase +
+          "." + unresolvedRelation.tableIdentifier.table
+        val segmentProperties = carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbAndTb, "")
+        if (!(segmentProperties.equals("") || segmentProperties.trim.equals("*"))) {
+          throw new MalformedCarbonCommandException("carbon.input.segments." + dbAndTb +
+                                                    " should not be set for table used in DELETE " +
+                                                    "query. Please reset the property to carbon" +
+                                                    ".input.segments." +
+                                                    dbAndTb + "=*")
+        }
+      case logicalRelation: LogicalRelation if (logicalRelation.relation
+        .isInstanceOf[CarbonDatasourceHadoopRelation]) =>
+        val dbAndTb =
+          logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+            .getDatabaseName + "." +
+          logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+            .getFactTableName
+        val segmentProperty = carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbAndTb, "")
+        if (!(segmentProperty.equals("") || segmentProperty.trim.equals("*"))) {
+          throw new MalformedCarbonCommandException("carbon.input.segments." + dbAndTb +
+                                                    " should not be set for table used in UPDATE " +
+                                                    "query. Please reset the property to carbon" +
+                                                    ".input.segments." +
+                                                    dbAndTb + "=*")
+        }
+      case filter: Filter => filter.subqueries.toList
+        .foreach(subquery => checkIfSegmentListIsSet(sparkSession, subquery))
+      case _ =>
+    }
+  }
+}

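In effect, once a segment list is pinned for any table referenced by an UPDATE or DELETE
plan, the command is rejected; a sketch mirroring the new tests (table t1 is hypothetical):

    sql("SET carbon.input.segments.default.t1=1")
    intercept[MalformedCarbonCommandException] {
      sql("delete from t1 where empno = 15") // t1 appears in the plan while its segment list is set
    }
    sql("SET carbon.input.segments.default.t1=*") // reset before issuing IUD commands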
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd0bdf6f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
index 10e6785..af971d0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
@@ -42,6 +42,7 @@ private[sql] case class ProjectForDeleteCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
     val dataFrame = Dataset.ofRows(sparkSession, plan)
     //    dataFrame.show(truncate = false)
     //    dataFrame.collect().foreach(println)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd0bdf6f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
index 2088396..faeb3af 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
@@ -43,6 +43,7 @@ private[sql] case class ProjectForUpdateCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(ProjectForUpdateCommand.getClass.getName)
+    IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
     val res = plan find {
       case relation: LogicalRelation if relation.relation
         .isInstanceOf[CarbonDatasourceHadoopRelation] =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd0bdf6f/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 1d8bb8a..56560fd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -22,7 +22,9 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.command._
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
   extends RunnableCommand {
@@ -61,6 +63,16 @@ case class CarbonSetCommand(command: SetCommand)
         if (isCarbonProperty) {
           sessionParms.addProperty(key, value)
         }
+        else if (key.startsWith(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
+          if (key.split("\\.").length == 5) {
+            sessionParms.addProperty(key.toLowerCase(), value)
+          }
+          else {
+            throw new MalformedCarbonCommandException(
+              "property should be in \" carbon.input.segments.<database_name>" +
+              ".<table_name>=<seg_id list> \" format.")
+          }
+        }
       case _ =>
 
     }

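The SET handler accepts the key only in its full five-part form; a short sketch of the
format check (database and table names are illustrative):

    sql("SET carbon.input.segments.default.carbon_table=1,2") // 5 dot-separated parts: stored in session params
    intercept[MalformedCarbonCommandException] {
      sql("SET carbon.input.segments.carbon_table=1") // only 4 parts: rejected as malformed
    }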
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd0bdf6f/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
index 51b29a1..6c91e7e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
@@ -101,6 +101,11 @@ class CarbonSQLConf(sparkSession: SparkSession) {
         .doc("Property to configure data format for date type columns.")
         .stringConf
         .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+    val CARBON_INPUT_SEGMENTS = SQLConfigBuilder(
+      "carbon.input.segments.<database_name>.<table_name>")
+      .doc("Property to configure the list of segments to query.").stringConf
+      .createWithDefault(carbonProperties
+        .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd0bdf6f/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
index 19d7dce..b98973e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
@@ -40,9 +40,9 @@ object ShowSegments {
   def showString(rows: Seq[Row]): String = {
     val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.s")
     val sb = new StringBuilder
-    sb.append("+------------+------------------+----------------------+----------------------+\n")
-      .append("|SegmentId   |Status            |Load Start Time       |Load End Time    
    |\n")
-      .append("+------------+------------------+----------------------+----------------------+\n")
+    sb.append("+------------+------------------+----------------------+----------------------+----------------------+\n")
+      .append("|SegmentId   |Status            |Load Start Time       |Load End Time    
    |Merged To             |\n")
+      .append("+------------+------------------+----------------------+----------------------+----------------------+\n")
       rows.foreach{row =>
         sb.append("|")
           .append(StringUtils.rightPad(row.getString(0), 12))
@@ -52,9 +52,11 @@ object ShowSegments {
           .append(sdf.format(row.getAs[java.sql.Timestamp](2)))
           .append("|")
           .append(sdf.format(row.getAs[java.sql.Timestamp](3)))
+          .append("|")
+          .append(StringUtils.rightPad(row.getString(4), 18))
           .append("|\n")
       }
-    sb.append("+------------+------------------+----------------------+----------------------+\n")
+    sb.append("+------------+------------------+----------------------+----------------------+----------------------+\n")
     sb.toString
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd0bdf6f/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
new file mode 100644
index 0000000..8309b34
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
@@ -0,0 +1,352 @@
+package org.apache.carbondata.spark.testsuite.segmentreading
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+/**
+ * Created by rahul on 19/9/17.
+ */
+class TestSegmentReading extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    sql("drop table if exists carbon_table")
+    sql(
+      "create table carbon_table(empno int, empname String, designation String, doj Timestamp,"
+
+      "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,"
+
+      "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,"
+
+      "utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table OPTIONS
+          |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data1.csv' INTO TABLE carbon_table OPTIONS
+          |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+  }
+
+  test("test SET -V for segment reading property") {
+    try {
+      checkExistence(sql("SET -v"), true, "Property to configure the list of segments to
query.")
+    }
+    finally {
+      sql("SET carbon.input.segments.default.carbon_table=*")
+    }
+  }
+  test("test count(*) for segment reading property") {
+    try {
+      sql("SET carbon.input.segments.default.carbon_table=1")
+      checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(10)))
+    }
+    finally {
+      sql("SET carbon.input.segments.default.carbon_table=*")
+    }
+  }
+  test("test SET propertyname for segment reading property") {
+    try {
+      sql("SET carbon.input.segments.default.carbon_table=1")
+      checkAnswer(sql("SET carbon.input.segments.default.carbon_table"),
+        Seq(Row("carbon.input.segments.default.carbon_table", "1"))
+      )
+    }
+    finally {
+      sql("SET carbon.input.segments.default.carbon_table=*")
+    }
+  }
+  test("set valid segments and query from table") {
+    try {
+      checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(20)))
+      sql("SET carbon.input.segments.default.carbon_table=1")
+      checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(10)))
+      sql("SET carbon.input.segments.default.carbon_table=*")
+      checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(20)))
+      sql("SET carbon.input.segments.default.carbon_table=0")
+      checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(10)))
+    }
+    finally {
+      sql("SET carbon.input.segments.default.carbon_table=*")
+    }
+  }
+  test("test Multiple times set segment") {
+    try {
+      sql("SET carbon.input.segments.default.carbon_table=0")
+      checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(10)))
+      sql("SET carbon.input.segments.default.carbon_table=1")
+      checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(10)))
+      sql("SET carbon.input.segments.default.carbon_table=1,0,1")
+      checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(20)))
+      sql("SET carbon.input.segments.default.carbon_table=2,0")
+      checkAnswer(sql("select count(*) from carbon_table"), Seq(Row(10)))
+      val trapped = intercept[Exception] {
+        sql("SET carbon.input.segments.default.carbon_table=2,a")
+      }
+      val trappedAgain = intercept[Exception] {
+        sql("SET carbon.input.segments.default.carbon_table=,")
+      }
+      assert(trapped.getMessage
+        .equalsIgnoreCase(
+          "carbon.input.segments.<database_name>.<table_name> value range is
not valid"))
+      assert(trappedAgain.getMessage
+        .equalsIgnoreCase("carbon.input.segments.<database_name>.<table_name>
value can't be empty."))
+    }
+    finally {
+      sql("SET carbon.input.segments.default.carbon_table=*")
+    }
+  }
+  test("test filter with segment reading"){
+    try {
+      sql("SET carbon.input.segments.default.carbon_table=*")
+      checkAnswer(sql("select count(empno) from carbon_table where empno = 15"),Seq(Row(2)))
+      sql("SET carbon.input.segments.default.carbon_table=1")
+      checkAnswer(sql("select count(empno) from carbon_table where empno = 15"),Seq(Row(1)))
+    }
+    finally {
+      sql("SET carbon.input.segments.default.carbon_table=*")
+    }
+  }
+  test("test group by with segment reading") {
+    try {
+      sql("SET carbon.input.segments.default.carbon_table=*")
+      checkAnswer(sql("select empno,count(empname) from carbon_table where empno = 15 group
by empno"),Seq(Row(15,2)))
+      sql("SET carbon.input.segments.default.carbon_table=1")
+      checkAnswer(sql("select empno,count(empname) from carbon_table where empno = 15 group
by empno"),Seq(Row(15,1)))
+    }
+    finally {
+      sql("SET carbon.input.segments.default.carbon_table=*")
+    }
+  }
+  test("test join with segment reading"){
+    try {
+      sql("SET carbon.input.segments.default.carbon_table=*")
+      sql("drop table if exists carbon_table_join")
+      sql(
+        "create table carbon_table_join(empno int, empname String, designation String, doj
Timestamp," +
+        "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,"
+
+        "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance
int," +
+        "utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+      sql("insert into carbon_table_join select * from carbon_table").show()
+      checkAnswer(sql("select count(a.empno) from carbon_table a inner join carbon_table_join
b on a.empno = b.empno"),Seq(Row(22)))
+      sql("SET carbon.input.segments.default.carbon_table=1")
+      checkAnswer(sql("select count(a.empno) from carbon_table a inner join carbon_table_join
b on a.empno = b.empno"),Seq(Row(11)))
+    }
+    finally {
+      sql("SET carbon.input.segments.default.carbon_table=*")
+    }
+  }
+  test("test aggregation with segment reading") {
+    try {
+      sql("SET carbon.input.segments.default.carbon_table=*")
+      checkAnswer(sql("select sum(empno) from carbon_table"), Seq(Row(1411)))
+      sql("SET carbon.input.segments.default.carbon_table=1")
+      checkAnswer(sql("select sum(empno) from carbon_table"), Seq(Row(1256)))
+    }
+    finally {
+      sql("SET carbon.input.segments.default.carbon_table=*")
+    }
+  }
+  test("test update query with segment reading"){
+    try {
+      sql("drop table if exists carbon_table_update")
+      sql(
+        "create table carbon_table_update(empno int, empname String, designation String,
doj Timestamp," +
+        "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,"
+
+        "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance
int," +
+        "utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+      sql(
+        s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_update
OPTIONS
+            |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+      sql(
+        s"""LOAD DATA local inpath '$resourcesPath/data1.csv' INTO TABLE carbon_table_update
OPTIONS
+            |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+      sql("SET carbon.input.segments.default.carbon_table=1")
+      intercept[MalformedCarbonCommandException]{
+        sql("update carbon_table_update a set(a.empname) = (select b.empname from carbon_table
b where a.empno=b.empno)").show()
+      }
+      sql("SET carbon.input.segments.default.carbon_table=*")
+      sql("SET carbon.input.segments.default.carbon_table_update=1")
+      intercept[MalformedCarbonCommandException]{
+        sql("update carbon_table_update a set(a.empname) = (select b.empname from carbon_table
b where a.empno=b.empno)").show()
+      }
+      sql("SET carbon.input.segments.default.carbon_table=*")
+      sql("SET carbon.input.segments.default.carbon_table_update=*")
+      checkAnswer(sql("select count(*) from carbon_table_update where empname='rahul'"),
Seq(Row(0)))
+      sql("update carbon_table_update a set(a.empname) = ('rahul')").show()
+      checkAnswer(sql("select count(*) from carbon_table_update where empname='rahul'"),
Seq(Row(20)))
+    }
+    finally {
+      sql("SET carbon.input.segments.default.carbon_table=*")
+    }
+  }
+  test("test delete query with segment reading"){
+    try {
+      sql("drop table if exists carbon_table_delete")
+      sql(
+        "create table carbon_table_delete(empno int, empname String, designation String,
doj Timestamp," +
+        "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,"
+
+        "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance
int," +
+        "utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+      sql(
+        s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_delete
OPTIONS
+            |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+      sql(
+        s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_delete
OPTIONS
+            |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+      sql("SET carbon.input.segments.default.carbon_table=*")
+      sql("SET carbon.input.segments.default.carbon_table=1")
+      intercept[MalformedCarbonCommandException]{
+        sql("delete from carbon_table_delete where empno IN (select empno from carbon_table
where empname='ayushi')").show()
+      }
+      sql("SET carbon.input.segments.default.carbon_table_delete=1")
+      intercept[MalformedCarbonCommandException]{
+        sql("delete from carbon_table_delete where empno IN (select empno from carbon_table
where empname='ayushi')").show()
+      }
+
+      sql("SET carbon.input.segments.default.carbon_table=*")
+      sql("SET carbon.input.segments.default.carbon_table_delete=*")
+      checkAnswer(sql("select count(*) from carbon_table_delete"), Seq(Row(20)))
+      sql("delete from carbon_table_delete where empno IN (select empno from carbon_table
where empname='ayushi')").show()
+      checkAnswer(sql("select count(*) from carbon_table_delete"), Seq(Row(18)))
+    }
+    finally {
+      sql("SET carbon.input.segments.default.carbon_table=*")
+    }
+  }
+  test("test show segments"){
+    try {
+      sql("drop table if exists carbon_table_show_seg")
+      sql(
+        "create table carbon_table_show_seg(empno int, empname String, designation String,
doj Timestamp," +
+        "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,"
+
+        "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance
int," +
+        "utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+      sql(
+        s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_show_seg
OPTIONS
+            |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+      sql(
+        s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_show_seg
OPTIONS
+            |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+      sql("alter table carbon_table_show_seg compact 'major'")
+      sql(
+        s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_show_seg
OPTIONS
+            |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+      val df = sql("SHOW SEGMENTS for table carbon_table_show_seg")
+      val col = df.collect().map{
+        row => Row(row.getString(0),row.getString(1),row.getString(4))
+      }.toSeq
+      assert(col.equals(Seq(Row("2","Success",""),
+        Row("1","Compacted","0.1"),
+        Row("0.1","Success",""),
+        Row("0","Compacted","0.1"))))
+    }
+    finally {
+      sql("SET carbon.input.segments.default.carbon_table=*")
+    }
+  }
+  test("test segment reading after compaction"){
+    sql("drop table if exists carbon_table_compact")
+    sql(
+      "create table carbon_table_compact(empno int, empname String, designation String, doj
Timestamp," +
+      "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,"
+
+      "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,"
+
+      "utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_compact
OPTIONS
+          |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_compact
OPTIONS
+          |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+    sql("alter table carbon_table_compact compact 'major'")
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_compact
OPTIONS
+          |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+    checkAnswer(sql("select count(*) from carbon_table_compact"),Seq(Row(30)))
+    sql(" SET carbon.input.segments.default.carbon_table_compact=0.1")
+    checkAnswer(sql("select count(*) from carbon_table_compact"),Seq(Row(20)))
+  }
+  test("set segment id then alter table name and check select query") {
+    try {
+      sql("drop table if exists carbon_table_alter")
+      sql("drop table if exists carbon_table_alter_new")
+      sql(
+        "create table carbon_table_alter(empno int, empname String, designation String, doj
" +
+        "Timestamp," +
+
+        "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,"
+
+        "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance
int," +
+        "utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+      sql(
+        s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_alter
OPTIONS
+            |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".
+          stripMargin)
+      sql(
+        s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_alter
OPTIONS
+            |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".
+          stripMargin)
+      checkAnswer(sql("select count(*) from carbon_table_alter"),
+        Seq(Row(20)))
+      sql(
+        "SET carbon.input.segments.default.carbon_table_alter=1")
+      checkAnswer(sql(
+        "select count(*) from carbon_table_alter"), Seq(Row(10)))
+      sql(
+        "alter table carbon_table_alter rename to carbon_table_alter_new")
+      checkAnswer(sql(
+        "select count(*) from carbon_table_alter_new")
+        , Seq(Row(20)))
+    }
+    finally {
+      sql(
+        "SET carbon.input.segments.default.carbon_table=*")
+    }
+  }
+
+  test("drop and recreate table to check segment reading") {
+    try {
+      sql("drop table if exists carbon_table_recreate")
+      sql(
+        "create table carbon_table_recreate(empno int, empname String, designation String,
doj " +
+        "Timestamp," +
+
+        "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,"
+
+        "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance
int," +
+        "utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+      sql(
+        s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_recreate
OPTIONS
+            |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".
+          stripMargin)
+      sql(
+        s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_recreate
OPTIONS
+            |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".
+          stripMargin)
+      checkAnswer(sql("select count(*) from carbon_table_recreate"),
+        Seq(Row(20)))
+      sql(
+        "SET carbon.input.segments.default.carbon_table_recreate=1")
+      checkAnswer(sql(
+        "select count(*) from carbon_table_recreate"), Seq(Row(10)))
+      sql("drop table if exists carbon_table_recreate")
+      sql(
+        "create table carbon_table_recreate(empno int, empname String, designation String,
doj " +
+        "Timestamp," +
+
+        "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,"
+
+        "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance
int," +
+        "utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+      sql(
+        s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_recreate
OPTIONS
+            |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".
+          stripMargin)
+      sql(
+        s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_recreate
OPTIONS
+            |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".
+          stripMargin)
+      checkAnswer(sql(
+        "select count(*) from carbon_table_recreate"), Seq(Row(10)))
+    }
+    finally {
+      sql(
+        "SET carbon.input.segments.default.carbon_table=*")
+    }
+  }
+}

