carbondata-commits mailing list archives

From gvram...@apache.org
Subject carbondata git commit: [CARBONDATA-1859][CARBONDATA-1861][PARTITION] Support show and drop partitions
Date Wed, 20 Dec 2017 18:17:11 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 64b26513f -> 3ff55a2ee


[CARBONDATA-1859][CARBONDATA-1861][PARTITION] Support show and drop partitions

This closes #1674
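
For context, a usage sketch of the syntax this change enables, adapted from the new
StandardPartitionTableDropTestCase added below. The table name, columns, and CSV path are
illustrative only, and a SparkSession created through CarbonSession is assumed to be bound
to `spark`:

    // Create and load a standard (Hive-style) partition table.
    spark.sql(
      """
        | CREATE TABLE partitionone (empname String, designation String)
        | PARTITIONED BY (empno int)
        | STORED BY 'org.apache.carbondata.format'
      """.stripMargin)
    spark.sql("LOAD DATA LOCAL INPATH '/tmp/data.csv' INTO TABLE partitionone")
    // New in this commit: SHOW PARTITIONS on a standard partition table.
    spark.sql("SHOW PARTITIONS partitionone").show()
    // New in this commit: dropping a partition; data files are kept, only the
    // partition map files and the Hive metadata are rewritten.
    spark.sql("ALTER TABLE partitionone DROP PARTITION (empno='11')")
    spark.sql("SHOW PARTITIONS partitionone").show()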


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3ff55a2e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3ff55a2e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3ff55a2e

Branch: refs/heads/master
Commit: 3ff55a2eef1e2bc0e61ee909b62d485e5240243a
Parents: 64b2651
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Sat Dec 16 22:38:00 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Wed Dec 20 23:46:05 2017 +0530

----------------------------------------------------------------------
 .../indexstore/BlockletDataMapIndexStore.java   |  15 +-
 .../blockletindex/BlockletDataMap.java          |  26 ++-
 .../blockletindex/BlockletDataMapModel.java     |  13 +-
 .../core/metadata/PartitionMapFileStore.java    | 103 ++++++++++-
 .../hadoop/api/CarbonOutputCommitter.java       |  11 +-
 .../hadoop/api/CarbonTableInputFormat.java      |   7 +-
 .../hadoop/ft/CarbonOutputMapperTest.java       |   5 +-
 .../StandardPartitionTableDropTestCase.scala    | 172 +++++++++++++++++++
 .../spark/rdd/CarbonDropPartitionRDD.scala      | 140 +++++++++++++++
 .../org/apache/spark/sql/CarbonCountStar.scala  |  14 +-
 .../command/mutation/DeleteExecution.scala      |  12 +-
 .../CarbonStandardAlterTableDropPartition.scala | 142 +++++++++++++++
 .../table/CarbonCreateTableCommand.scala        |  10 +-
 .../strategy/CarbonLateDecodeStrategy.scala     |   2 +-
 .../sql/execution/strategy/DDLStrategy.scala    |  25 ++-
 .../iterator/CarbonOutputIteratorWrapper.java   |   4 +-
 16 files changed, 660 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 59af50b..f3e55e6 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -72,9 +72,9 @@ public class BlockletDataMapIndexStore
     BlockletDataMap dataMap = (BlockletDataMap) lruCache.get(lruCacheKey);
     if (dataMap == null) {
       try {
-        String segmentPath = CarbonTablePath
-            .getSegmentPath(identifier.getAbsoluteTableIdentifier().getTablePath(),
-                identifier.getSegmentId());
+        String segmentPath = CarbonTablePath.getSegmentPath(
+            identifier.getAbsoluteTableIdentifier().getTablePath(),
+            identifier.getSegmentId());
         SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
         indexFileStore.readAllIIndexOfSegment(segmentPath);
         PartitionMapFileStore partitionFileStore = new PartitionMapFileStore();
@@ -111,9 +111,9 @@ public class BlockletDataMapIndexStore
               segmentIndexFileStoreMap.get(identifier.getSegmentId());
           PartitionMapFileStore partitionFileStore =
               partitionFileStoreMap.get(identifier.getSegmentId());
-          String segmentPath = CarbonTablePath
-              .getSegmentPath(identifier.getAbsoluteTableIdentifier().getTablePath(),
-                  identifier.getSegmentId());
+          String segmentPath = CarbonTablePath.getSegmentPath(
+              identifier.getAbsoluteTableIdentifier().getTablePath(),
+              identifier.getSegmentId());
           if (indexFileStore == null) {
             indexFileStore = new SegmentIndexFileStore();
             indexFileStore.readAllIIndexOfSegment(segmentPath);
@@ -185,7 +185,8 @@ public class BlockletDataMapIndexStore
       dataMap = new BlockletDataMap();
       dataMap.init(new BlockletDataMapModel(identifier.getFilePath(),
           indexFileStore.getFileData(identifier.getCarbonIndexFileName()),
-          partitionFileStore.getPartitions(identifier.getCarbonIndexFileName())));
+          partitionFileStore.getPartitions(identifier.getCarbonIndexFileName()),
+          partitionFileStore.isPartionedSegment()));
       lruCache.put(identifier.getUniqueTableSegmentIdentifier(), dataMap,
           dataMap.getMemorySize());
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 70bae32..f1188cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -105,6 +105,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   private int[] columnCardinality;
 
+  private boolean isPartitionedSegment;
+
   @Override
   public void init(DataMapModel dataMapModel) throws IOException, MemoryException {
     long startTime = System.currentTimeMillis();
@@ -113,6 +115,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
     DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
     List<DataFileFooter> indexInfo = fileFooterConverter
         .getIndexInfo(blockletDataMapInfo.getFilePath(), blockletDataMapInfo.getFileData());
+    isPartitionedSegment = blockletDataMapInfo.isPartitionedSegment();
     DataMapRowImpl summaryRow = null;
     for (DataFileFooter fileFooter : indexInfo) {
       List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
@@ -394,6 +397,14 @@ public class BlockletDataMap implements DataMap, Cacheable {
         new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()]));
   }
 
+  /**
+   * Creates the schema to store summary information or the information which can be stored only
+   * once per datamap. It stores datamap level max/min of each column and partition information of
+   * datamap
+   * @param segmentProperties
+   * @param partitions
+   * @throws MemoryException
+   */
   private void createSummarySchema(SegmentProperties segmentProperties, List<String> partitions)
       throws MemoryException {
     List<CarbonRowSchema> taskMinMaxSchemas = new ArrayList<>(2);
@@ -518,8 +529,15 @@ public class BlockletDataMap implements DataMap, Cacheable {
   }
 
   @Override public List<Blocklet> prune(FilterResolverIntf filterExp, List<String> partitions) {
+    // First get the partitions which are stored inside datamap.
     List<String> storedPartitions = getPartitions();
-    if (storedPartitions != null && storedPartitions.size() > 0 && filterExp != null) {
+    // if this is a partitioned segment but no partition information is stored, it means the
+    // partitions were dropped, so return an empty list.
+    if (isPartitionedSegment && (storedPartitions == null || storedPartitions.size() == 0)) {
+      return new ArrayList<>();
+    }
+    if (storedPartitions != null && storedPartitions.size() > 0) {
+      // Check the exact match of partition information inside the stored partitions.
       boolean found = false;
       if (partitions != null && partitions.size() > 0) {
         found = partitions.containsAll(storedPartitions);
@@ -528,6 +546,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
         return new ArrayList<>();
       }
     }
+    // Prune with the filter if the partitions exist in this datamap
     return prune(filterExp);
   }
 
@@ -705,16 +724,17 @@ public class BlockletDataMap implements DataMap, Cacheable {
   }
 
   private List<String> getPartitions() {
-    List<String> partitions = new ArrayList<>();
     DataMapRow unsafeRow = unsafeMemorySummaryDMStore.getUnsafeRow(0);
     if (unsafeRow.getColumnCount() > 2) {
+      List<String> partitions = new ArrayList<>();
       DataMapRow row = unsafeRow.getRow(2);
       for (int i = 0; i < row.getColumnCount(); i++) {
         partitions.add(
             new String(row.getByteArray(i), CarbonCommonConstants.DEFAULT_CHARSET_CLASS));
       }
+      return partitions;
     }
-    return partitions;
+    return null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
index c98ef33..704e0f7 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
@@ -20,16 +20,23 @@ import java.util.List;
 
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 
+/**
+ * It is the model object to keep the information to build or initialize BlockletDataMap.
+ */
 public class BlockletDataMapModel extends DataMapModel {
 
   private byte[] fileData;
 
   private List<String> partitions;
 
-  public BlockletDataMapModel(String filePath, byte[] fileData, List<String> partitions) {
+  private boolean partitionedSegment;
+
+  public BlockletDataMapModel(String filePath, byte[] fileData, List<String> partitions,
+      boolean partitionedSegment) {
     super(filePath);
     this.fileData = fileData;
     this.partitions = partitions;
+    this.partitionedSegment = partitionedSegment;
   }
 
   public byte[] getFileData() {
@@ -39,4 +46,8 @@ public class BlockletDataMapModel extends DataMapModel {
   public List<String> getPartitions() {
     return partitions;
   }
+
+  public boolean isPartitionedSegment() {
+    return partitionedSegment;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
index 853c729..8578cfe 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
@@ -25,6 +25,8 @@ import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -47,6 +49,8 @@ import com.google.gson.Gson;
 public class PartitionMapFileStore {
 
   private Map<String, List<String>> partitionMap = new HashMap<>();
+
+  private boolean partionedSegment = false;
   /**
    * Write partitionmapp file to the segment folder with indexfilename and corresponding partitions.
    *
@@ -105,9 +109,9 @@ public class PartitionMapFileStore {
    * @param segmentPath
    * @throws IOException
    */
-  public void mergePartitionMapFiles(String segmentPath) throws IOException {
+  public void mergePartitionMapFiles(String segmentPath, String mergeFileName) throws IOException {
     CarbonFile[] partitionFiles = getPartitionFiles(segmentPath);
-    if (partitionFiles != null && partitionFiles.length > 1) {
+    if (partitionFiles != null && partitionFiles.length > 0) {
       PartitionMapper partitionMapper = null;
       for (CarbonFile file : partitionFiles) {
         PartitionMapper localMapper = readPartitionMap(file.getAbsolutePath());
@@ -119,10 +123,12 @@ public class PartitionMapFileStore {
         }
       }
       if (partitionMapper != null) {
-        String path = segmentPath + "/" + "mergedpartitions" + CarbonTablePath.PARTITION_MAP_EXT;
+        String path = segmentPath + "/" + mergeFileName + CarbonTablePath.PARTITION_MAP_EXT;
         writePartitionFile(partitionMapper, path);
         for (CarbonFile file : partitionFiles) {
-          FileFactory.deleteAllCarbonFilesOfDir(file);
+          if (!FileFactory.deleteAllCarbonFilesOfDir(file)) {
+            throw new IOException("Old partition map files cannot be deleted");
+          }
         }
       }
     }
@@ -146,7 +152,7 @@ public class PartitionMapFileStore {
    * @param partitionMapPath
    * @return
    */
-  public PartitionMapper readPartitionMap(String partitionMapPath) {
+  private PartitionMapper readPartitionMap(String partitionMapPath) {
     Gson gsonObjectToRead = new Gson();
     DataInputStream dataInputStream = null;
     BufferedReader buffReader = null;
@@ -176,9 +182,92 @@ public class PartitionMapFileStore {
   public void readAllPartitionsOfSegment(String segmentPath) {
     CarbonFile[] partitionFiles = getPartitionFiles(segmentPath);
     if (partitionFiles != null && partitionFiles.length > 0) {
+      partionedSegment = true;
+      int i = 0;
+      // Get the latest partition map file based on the timestamp of that file.
+      long [] partitionTimestamps = new long[partitionFiles.length];
       for (CarbonFile file : partitionFiles) {
-        PartitionMapper partitionMapper = readPartitionMap(file.getAbsolutePath());
-        partitionMap.putAll(partitionMapper.getPartitionMap());
+        partitionTimestamps[i++] =
+            Long.parseLong(file.getName().substring(
+                0, file.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length()));
+      }
+      Arrays.sort(partitionTimestamps);
+      PartitionMapper partitionMapper = readPartitionMap(
+          segmentPath + "/" + partitionTimestamps[partitionTimestamps.length - 1]
+              + CarbonTablePath.PARTITION_MAP_EXT);
+      partitionMap.putAll(partitionMapper.getPartitionMap());
+    }
+  }
+
+  public boolean isPartionedSegment() {
+    return partionedSegment;
+  }
+
+  /**
+   * Drops the partitions from the partition mapper file of the segment and writes to a new file.
+   * @param segmentPath
+   * @param partitionsToDrop
+   * @param uniqueId
+   * @throws IOException
+   */
+  public void dropPartitions(String segmentPath, List<String> partitionsToDrop, String uniqueId)
+      throws IOException {
+    readAllPartitionsOfSegment(segmentPath);
+    List<String> indexesToDrop = new ArrayList<>();
+    for (Map.Entry<String, List<String>> entry: partitionMap.entrySet()) {
+      for (String partition: partitionsToDrop) {
+        if (entry.getValue().contains(partition)) {
+          indexesToDrop.add(entry.getKey());
+        }
+      }
+    }
+    if (indexesToDrop.size() > 0) {
+      // Remove the indexes from partition map
+      for (String indexToDrop : indexesToDrop) {
+        partitionMap.remove(indexToDrop);
+      }
+      PartitionMapper mapper = new PartitionMapper();
+      mapper.setPartitionMap(partitionMap);
+      String path = segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT;
+      writePartitionFile(mapper, path);
+    }
+  }
+
+  /**
+   * It deletes the old partition mapper files in case of success, and in case of failure it
+   * removes the newly created file.
+   * @param segmentPath
+   * @param uniqueId
+   * @param success
+   */
+  public void commitPartitions(String segmentPath, final String uniqueId, boolean success) {
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+    // write partition info to new file.
+    if (carbonFile.exists()) {
+      CarbonFile[] carbonFiles = carbonFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
+          return file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT);
+        }
+      });
+      CarbonFile latestFile = null;
+      for (CarbonFile mapFile: carbonFiles) {
+        if (mapFile.getName().startsWith(uniqueId)) {
+          latestFile = mapFile;
+        }
+      }
+      if (latestFile != null) {
+        for (CarbonFile mapFile : carbonFiles) {
+          if (latestFile != mapFile) {
+            // Remove old files in case of success scenario
+            if (success) {
+              mapFile.delete();
+            }
+          }
+        }
+      }
+      // If it is failure scenario then remove the new file.
+      if (!success && latestFile != null) {
+        latestFile.delete();
       }
     }
   }
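
A rough sketch of how the new PartitionMapFileStore drop/commit API is intended to be used per
segment (it is driven by the CarbonDropPartitionRDD and the drop-partition command further below);
the segment path and partition value here are illustrative only:

    import java.util.Arrays
    import org.apache.carbondata.core.metadata.PartitionMapFileStore

    val segmentPath = "/store/default/t1/Fact/Part0/Segment_0"  // illustrative
    val uniqueId = System.currentTimeMillis().toString
    val store = new PartitionMapFileStore()
    // Writes a new <uniqueId>.partitionmap that no longer maps the index files
    // belonging to the dropped partition value.
    store.dropPartitions(segmentPath, Arrays.asList("empno=11"), uniqueId)
    // On overall success, remove the older .partitionmap files so only the new one remains.
    store.commitPartitions(segmentPath, uniqueId, true)
    // On failure, pass false instead to delete the new file and keep the old mapping:
    // store.commitPartitions(segmentPath, uniqueId, false)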

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index ec42adf..08fd1ac 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.metadata.PartitionMapFileStore;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.util.CarbonLoaderUtil;
 
@@ -75,16 +76,18 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     super.commitJob(context);
     boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration());
     CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration());
+    String segmentPath =
+        CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId());
+    // Merge all partition files into a single file.
+    new PartitionMapFileStore().mergePartitionMapFiles(segmentPath,
+        loadModel.getFactTimeStamp() + "");
     LoadMetadataDetails newMetaEntry = loadModel.getCurrentLoadMetadataDetail();
     CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS,
         loadModel.getFactTimeStamp(), true);
     CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(),
         loadModel.getCarbonDataLoadSchema().getCarbonTable());
     CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet);
-    String segmentPath =
-        CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId());
-    // Merge all partition files into a single file.
-    new PartitionMapFileStore().mergePartitionMapFiles(segmentPath);
+    new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 7db9c0a..6a2349a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -42,7 +42,6 @@ import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap;
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
@@ -918,8 +917,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
    * @throws IOException
    * @throws KeyGenException
    */
-  public BlockMappingVO getBlockRowCount(Job job, AbsoluteTableIdentifier identifier)
-      throws IOException, KeyGenException {
+  public BlockMappingVO getBlockRowCount(Job job, AbsoluteTableIdentifier identifier,
+      List<String> partitions) throws IOException {
     TableDataMap blockletMap = DataMapStoreManager.getInstance()
         .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier);
@@ -931,7 +930,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     // TODO: currently only batch segment is supported, add support for streaming table
     List<String> filteredSegment = getFilteredSegment(job, allSegments.getValidSegments());
 
-    List<ExtendedBlocklet> blocklets = blockletMap.prune(filteredSegment, null, null);
+    List<ExtendedBlocklet> blocklets = blockletMap.prune(filteredSegment, null, partitions);
     for (ExtendedBlocklet blocklet : blocklets) {
       String blockName = blocklet.getPath();
       blockName = CarbonTablePath.getCarbonDataFileName(blockName);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java
index 006ffd2..22a6e53 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java
@@ -58,12 +58,13 @@ public class CarbonOutputMapperTest extends TestCase {
     assert (file.exists());
     File[] listFiles = file.listFiles(new FilenameFilter() {
       @Override public boolean accept(File dir, String name) {
-        return name.endsWith(".carbondata") || name.endsWith(".carbonindex");
+        return name.endsWith(".carbondata") ||
+            name.endsWith(".carbonindex") ||
+            name.endsWith(".carbonindexmerge");
       }
     });
 
     assert (listFiles.length == 2);
-
   }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
new file mode 100644
index 0000000..9a9940b
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.standardpartition
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    dropTable
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
+    sql(
+      """
+        | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+  }
+
+  test("show partitions on partition table") {
+    sql(
+      """
+        | CREATE TABLE partitionshow (designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int, empname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionshow OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkExistence(sql(s"""SHOW PARTITIONS partitionshow"""), true, "empno=11", "empno=12")
+  }
+
+  test("droping on partition table for int partition column") {
+    sql(
+      """
+        | CREATE TABLE partitionone (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(
+      sql(s"""select count (*) from partitionone"""),
+      sql(s"""select count (*) from originTable"""))
+
+    checkAnswer(
+      sql(s"""select count (*) from partitionone where empno=11"""),
+      sql(s"""select count (*) from originTable where empno=11"""))
+
+    sql(s"""ALTER TABLE partitionone DROP PARTITION(empno='11')""")
+
+    checkExistence(sql(s"""SHOW PARTITIONS partitionone"""), false, "empno=11")
+
+    checkAnswer(
+      sql(s"""select count (*) from partitionone where empno=11"""),
+      Seq(Row(0)))
+  }
+
+    test("dropping partition on table for more partition columns") {
+      sql(
+        """
+          | CREATE TABLE partitionmany (empno int, empname String, designation String,
+          |  workgroupcategory int, workgroupcategoryname String, deptno int,
+          |  projectjoindate Timestamp, projectenddate Date,attendance int,
+          |  utilization int,salary int)
+          | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+          | STORED BY 'org.apache.carbondata.format'
+        """.stripMargin)
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmany OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmany OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+      sql(s"""ALTER TABLE partitionmany DROP PARTITION(deptname='Learning')""")
+      checkExistence(sql(s"""SHOW PARTITIONS partitionmany"""), false, "deptname=Learning", "projectcode=928479")
+      checkAnswer(
+        sql(s"""select count (*) from partitionmany where deptname='Learning'"""),
+        Seq(Row(0)))
+    }
+
+  test("dropping all partition on table") {
+    sql(
+      """
+        | CREATE TABLE partitionall (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionall OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionall OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='Learning')""")
+    sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='configManagement')""")
+    sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='network')""")
+    sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='protocol')""")
+    sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='security')""")
+    assert(sql(s"""SHOW PARTITIONS partitionall""").collect().length == 0)
+    checkAnswer(
+      sql(s"""select count (*) from partitionall"""),
+      Seq(Row(0)))
+  }
+
+  test("dropping static partition on table") {
+    sql(
+      """
+        | CREATE TABLE staticpartition (empno int, doj Timestamp,
+        |  workgroupcategoryname String, deptno int,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int,workgroupcategory int, empname String, designation String)
+        | PARTITIONED BY (deptname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""insert into staticpartition PARTITION(deptname='software') select empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""")
+
+    checkExistence(sql(s"""SHOW PARTITIONS staticpartition"""), true, "deptname=software")
+    assert(sql(s"""SHOW PARTITIONS staticpartition""").collect().length == 1)
+    sql(s"""ALTER TABLE staticpartition DROP PARTITION(deptname='software')""")
+    checkAnswer(
+      sql(s"""select count (*) from staticpartition"""),
+      Seq(Row(0)))
+    sql(s"""insert into staticpartition select empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation,deptname from originTable""")
+    checkExistence(sql(s"""SHOW PARTITIONS staticpartition"""), true, "deptname=protocol")
+    checkAnswer(
+      sql(s"""select count (*) from staticpartition"""),
+      sql(s"""select count (*) from originTable"""))
+
+  }
+
+  override def afterAll = {
+    dropTable
+  }
+
+  def dropTable = {
+    sql("drop table if exists originTable")
+    sql("drop table if exists originMultiLoads")
+    sql("drop table if exists partitionone")
+    sql("drop table if exists partitionall")
+    sql("drop table if exists partitionmany")
+    sql("drop table if exists partitionshow")
+    sql("drop table if exists staticpartition")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
new file mode 100644
index 0000000..09d9da1
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+
+import org.apache.carbondata.core.metadata.PartitionMapFileStore
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+case class CarbonDropPartition(rddId: Int, val idx: Int, segmentPath: String)
+  extends Partition {
+
+  override val index: Int = idx
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
+
+/**
+ * RDD to drop the partitions from partition mapper files of all segments.
+ * @param sc
+ * @param tablePath
+ * @param segments segments from which the partitions are dropped
+ */
+class CarbonDropPartitionRDD(
+    sc: SparkContext,
+    tablePath: String,
+    segments: Seq[String],
+    partitions: Seq[String],
+    uniqueId: String)
+  extends CarbonRDD[String](sc, Nil) {
+
+  override def getPartitions: Array[Partition] = {
+    segments.zipWithIndex.map {s =>
+      CarbonDropPartition(id, s._2, CarbonTablePath.getSegmentPath(tablePath, s._1))
+    }.toArray
+  }
+
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = {
+    val iter = new Iterator[String] {
+      val split = theSplit.asInstanceOf[CarbonDropPartition]
+      logInfo("Dropping partition information from : " + split.segmentPath)
+
+      new PartitionMapFileStore().dropPartitions(
+        split.segmentPath,
+        partitions.toList.asJava,
+        uniqueId)
+
+      var havePair = false
+      var finished = false
+
+      override def hasNext: Boolean = {
+        if (!finished && !havePair) {
+          finished = true
+          havePair = !finished
+        }
+        !finished
+      }
+
+      override def next(): String = {
+        if (!hasNext) {
+          throw new java.util.NoSuchElementException("End of stream")
+        }
+        havePair = false
+        ""
+      }
+
+    }
+    iter
+  }
+
+}
+
+/**
+ * RDD to roll back a partition drop performed in the previous step. It removes the newly
+ * created partition mapper files from each segment, restoring the earlier mapping.
+ * @param sc
+ * @param tablePath
+ * @param segments segments to roll back
+ */
+class CarbonDropPartitionRollbackRDD(
+    sc: SparkContext,
+    tablePath: String,
+    segments: Seq[String],
+    uniqueId: String)
+  extends CarbonRDD[String](sc, Nil) {
+
+  override def getPartitions: Array[Partition] = {
+    segments.zipWithIndex.map {s =>
+      CarbonDropPartition(id, s._2, CarbonTablePath.getSegmentPath(tablePath, s._1))
+    }.toArray
+  }
+
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = {
+    val iter = new Iterator[String] {
+      val split = theSplit.asInstanceOf[CarbonDropPartition]
+      logInfo("Commit partition information from : " + split.segmentPath)
+
+      new PartitionMapFileStore().commitPartitions(split.segmentPath, uniqueId, false)
+
+      var havePair = false
+      var finished = false
+
+      override def hasNext: Boolean = {
+        if (!finished && !havePair) {
+          finished = true
+          havePair = !finished
+        }
+        !finished
+      }
+
+      override def next(): String = {
+        if (!hasNext) {
+          throw new java.util.NoSuchElementException("End of stream")
+        }
+        havePair = false
+        ""
+      }
+
+    }
+    iter
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 0f745cc..833c6fe 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
@@ -24,9 +26,10 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -36,6 +39,7 @@ import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 case class CarbonCountStar(
     attributesRaw: Seq[Attribute],
     carbonTable: CarbonTable,
+    sparkSession: SparkSession,
     outUnsafeRows: Boolean = true) extends LeafExecNode {
 
   override def doExecute(): RDD[InternalRow] = {
@@ -45,7 +49,13 @@ case class CarbonCountStar(
 
     // get row count
     val rowCount = CarbonUpdateUtil.getRowCount(
-      tableInputFormat.getBlockRowCount(job, absoluteTableIdentifier),
+      tableInputFormat.getBlockRowCount(
+        job,
+        absoluteTableIdentifier,
+        CarbonFilters.getPartitions(
+          Seq.empty,
+          sparkSession,
+          TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName))).asJava),
       absoluteTableIdentifier)
     val value = new GenericInternalRow(Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]])
     val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index b043698..d2e8789 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -29,7 +29,9 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -92,8 +94,14 @@ object DeleteExecution {
     if (keyRdd.partitions.length == 0) {
       return true
     }
-
-    val blockMappingVO = carbonInputFormat.getBlockRowCount(job, absoluteTableIdentifier)
+    val blockMappingVO =
+      carbonInputFormat.getBlockRowCount(
+        job,
+        absoluteTableIdentifier,
+        CarbonFilters.getPartitions(
+          Seq.empty,
+          sparkSession,
+          TableIdentifier(tableName, databaseNameOp)).asJava)
     val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier)
     CarbonUpdateUtil
       .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala
new file mode 100644
index 0000000..b787aa7
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.partition
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.spark.rdd.{CarbonDropPartitionRDD, CarbonDropPartitionRollbackRDD}
+
+/**
+ * Drops the partitions from Hive and the carbon store. It drops the partitions in the following steps:
+ * 1. Drop the partitions from the carbon store; this just creates one new mapper file in each
+ *    segment with the uniqueId.
+ * 2. Drop the partitions from Hive.
+ * 3. If any of the above steps fails, roll back the newly created files.
+ * 4. After steps 1 and 2 succeed, commit the change by removing the old mapper files.
+ * No data is removed from the store here; during compaction the old data won't be considered.
+ * @param tableName
+ * @param specs
+ * @param ifExists
+ * @param purge
+ * @param retainData
+ */
+case class CarbonStandardAlterTableDropPartition(
+    tableName: TableIdentifier,
+    specs: Seq[TablePartitionSpec],
+    ifExists: Boolean,
+    purge: Boolean,
+    retainData: Boolean)
+  extends AtomicRunnableCommand {
+
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+    if (table.isHivePartitionTable) {
+      try {
+        specs.flatMap(f => sparkSession.sessionState.catalog.listPartitions(tableName, Some(f)))
+      } catch {
+        case e: Exception =>
+          if (!ifExists) {
+            throw e
+          } else {
+            log.warn(e.getMessage)
+            return Seq.empty[Row]
+          }
+      }
+
+      // Drop the partitions from hive.
+      AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData)
+        .run(sparkSession)
+    }
+    Seq.empty[Row]
+  }
+
+
+  override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
+    AlterTableAddPartitionCommand(tableName, specs.map((_, None)), ifExists)
+    val msg = s"Got exception $exception when processing data of drop partition." +
+              "Adding back partitions to the metadata"
+    LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg)
+    Seq.empty[Row]
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+    var locks = List.empty[ICarbonLock]
+    val uniqueId = System.currentTimeMillis().toString
+    try {
+      val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+        LockUsage.COMPACTION_LOCK,
+        LockUsage.DELETE_SEGMENT_LOCK,
+        LockUsage.DROP_TABLE_LOCK,
+        LockUsage.CLEAN_FILES_LOCK,
+        LockUsage.ALTER_PARTITION_LOCK)
+      locks = AlterTableUtil.validateTableAndAcquireLock(
+        table.getDatabaseName,
+        table.getTableName,
+        locksToBeAcquired)(sparkSession)
+      val partitionNames = specs.flatMap { f =>
+        f.map(k => k._1 + "=" + k._2)
+      }.toSet
+      val segments = new SegmentStatusManager(table.getAbsoluteTableIdentifier)
+        .getValidAndInvalidSegments.getValidSegments
+      try {
+        // First drop the partitions from partition mapper files of each segment
+        new CarbonDropPartitionRDD(sparkSession.sparkContext,
+          table.getTablePath,
+          segments.asScala,
+          partitionNames.toSeq,
+          uniqueId).collect()
+      } catch {
+        case e: Exception =>
+          // roll back the partition drop in the carbon store
+          new CarbonDropPartitionRollbackRDD(sparkSession.sparkContext,
+            table.getTablePath,
+            segments.asScala,
+            uniqueId).collect()
+          throw e
+      }
+      val segmentSet = new util.HashSet[String](new SegmentStatusManager(table
+        .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments)
+      CarbonUpdateUtil
+        .updateTableMetadataStatus(segmentSet,
+          table,
+          uniqueId,
+          true,
+          new util.ArrayList[String])
+      DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
+    } finally {
+      AlterTableUtil.releaseLocks(locks)
+    }
+    Seq.empty[Row]
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index f86b35f..dac08e0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -98,11 +98,11 @@ case class CarbonCreateTableCommand(
           val partitionString =
             if (partitionInfo != null &&
                 partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
-            s" PARTITIONED BY (${partitionInfo.getColumnSchemaList.asScala.map(
-              _.getColumnName).mkString(",")})"
-          } else {
-            ""
-          }
+              s" PARTITIONED BY (${partitionInfo.getColumnSchemaList.asScala.map(
+                _.getColumnName).mkString(",")})"
+            } else {
+              ""
+            }
           sparkSession.sql(
             s"""CREATE TABLE $dbName.$tableName
                |(${ rawSchema })

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 1e71714..ad519e6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -79,7 +79,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       case CountStarPlan(colAttr, PhysicalOperation(projectList, predicates, l: LogicalRelation))
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && driverSideCountStar(l) =>
         val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-        CarbonCountStar(colAttr, relation.carbonTable) :: Nil
+        CarbonCountStar(colAttr, relation.carbonTable, SparkSession.getActiveSession.get) :: Nil
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 21b81ce..eeadbf6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -23,12 +23,13 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand, RefreshCarbonTableCommand}
-import org.apache.spark.sql.execution.command.partition.CarbonShowCarbonPartitionsCommand
+import org.apache.spark.sql.execution.command.partition.{CarbonShowCarbonPartitionsCommand, CarbonStandardAlterTableDropPartition}
 import org.apache.spark.sql.execution.command.schema._
 import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedCommand, CarbonDropTableCommand}
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
 import org.apache.spark.sql.execution.datasources.RefreshTable
+import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -158,10 +159,30 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
           .tableExists(t)(sparkSession)
         if (isCarbonTable) {
-          ExecutedCommandExec(CarbonShowCarbonPartitionsCommand(t)) :: Nil
+          val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+            .lookupRelation(t)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
+          if (!carbonTable.isHivePartitionTable) {
+            ExecutedCommandExec(CarbonShowCarbonPartitionsCommand(t)) :: Nil
+          } else {
+            ExecutedCommandExec(ShowPartitionsCommand(t, cols)) :: Nil
+          }
         } else {
           ExecutedCommandExec(ShowPartitionsCommand(t, cols)) :: Nil
         }
+      case adp@AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData) =>
+        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .tableExists(tableName)(sparkSession)
+        if (isCarbonTable) {
+          ExecutedCommandExec(
+            CarbonStandardAlterTableDropPartition(
+              tableName,
+              specs,
+              ifExists,
+              purge,
+              retainData)) :: Nil
+        } else {
+          ExecutedCommandExec(adp) :: Nil
+        }
       case set@SetCommand(kv) =>
         ExecutedCommandExec(CarbonSetCommand(set)) :: Nil
       case reset@ResetCommand =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3ff55a2e/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
index 0cd1331..08d1497 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
@@ -70,7 +70,9 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
     if (loadBatch.isLoading()) {
       try {
         loadBatch.readyRead();
-        queue.put(loadBatch);
+        if (loadBatch.size > 0) {
+          queue.put(loadBatch);
+        }
       } catch (InterruptedException e) {
         throw new RuntimeException(e);
       }

