carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject carbondata git commit: [CARBONDATA-2642] Added configurable Lock path property
Date Fri, 06 Jul 2018 12:14:08 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master c82e3e85f -> cb10d03a7


[CARBONDATA-2642] Added configurable Lock path property

A new property, "carbon.lock.path", is exposed to allow the user to configure the path where lock files are created.
Refactored code to create a separate implementation for S3CarbonFile.

This closes #2642


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cb10d03a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cb10d03a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cb10d03a

Branch: refs/heads/master
Commit: cb10d03a79ae0aed7367250f5f7c2a0d50ea4dc7
Parents: c82e3e8
Author: kunal642 <kunalkapoor642@gmail.com>
Authored: Thu Jun 21 15:40:05 2018 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Fri Jul 6 17:42:33 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   7 +
 .../core/datamap/DataMapStoreManager.java       |   2 +-
 .../status/DiskBasedDataMapStatusProvider.java  |   2 +-
 .../filesystem/AbstractDFSCarbonFile.java       |  25 ++--
 .../datastore/filesystem/AlluxioCarbonFile.java |  13 --
 .../core/datastore/filesystem/CarbonFile.java   |   2 +-
 .../datastore/filesystem/HDFSCarbonFile.java    |  17 ---
 .../datastore/filesystem/LocalCarbonFile.java   |   4 +-
 .../core/datastore/filesystem/S3CarbonFile.java | 127 +++++++++++++++++++
 .../datastore/filesystem/ViewFSCarbonFile.java  |  13 --
 .../datastore/impl/DefaultFileTypeProvider.java |   7 +-
 .../core/datastore/impl/FileFactory.java        |  20 ++-
 .../core/datastore/impl/FileTypeInterface.java  |   1 -
 .../core/locks/CarbonLockFactory.java           |  62 ++++++---
 .../carbondata/core/locks/HdfsFileLock.java     |  17 ---
 .../carbondata/core/locks/LocalFileLock.java    |  10 --
 .../core/metadata/SegmentFileStore.java         |  10 +-
 .../core/metadata/schema/table/CarbonTable.java |   9 +-
 .../core/writer/CarbonIndexFileMergeWriter.java |   3 +-
 .../carbon/AbsoluteTableIdentifierTest.java     |   5 +-
 .../TestBlockletDataMapFactory.java             |   6 +-
 .../bloom/BloomCoarseGrainDataMapFactory.java   |   7 +-
 docs/configuration-parameters.md                |   1 +
 .../org/apache/carbondata/api/CarbonStore.scala |   4 +-
 .../spark/rdd/AlterTableDropColumnRDD.scala     |   1 -
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   1 +
 .../apache/carbondata/spark/rdd/CarbonRDD.scala |   2 +
 .../carbondata/spark/util/CommonUtil.scala      |   7 +-
 .../sql/test/ResourceRegisterAndCopier.scala    |   2 +-
 .../scala/org/apache/spark/util/FileUtils.scala |   6 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   5 +-
 .../org/apache/spark/sql/CarbonSession.scala    |   1 -
 .../datamap/CarbonDropDataMapCommand.scala      |   8 +-
 .../command/table/CarbonDropTableCommand.scala  |  13 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    |   3 +-
 .../carbondata/lcm/locks/LocalFileLockTest.java |  37 +++++-
 .../lcm/locks/ZooKeeperLockingTest.java         |   3 +-
 .../sdk/file/CarbonReaderBuilder.java           |   4 +-
 38 files changed, 300 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 7470603..c420853 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -734,6 +734,13 @@ public final class CarbonCommonConstants {
   public static final String LOCK_TYPE = "carbon.lock.type";
 
   /**
+   * Specifies the path where the lock files have to be created.
+   * By default, lock files are created in table path.
+   */
+  @CarbonProperty
+  public static final String LOCK_PATH = "carbon.lock.path";
+
+  /**
    * ZOOKEEPER_ENABLE_DEFAULT the default value for zookeeper will be true for carbon
    */
   public static final String LOCK_TYPE_DEFAULT = "LOCALLOCK";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 0d3e40d..dd8af27 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -407,7 +407,7 @@ public final class DataMapStoreManager {
       try {
         carbonTable = CarbonTable
             .buildFromTablePath(identifier.getTableName(), identifier.getDatabaseName(),
-                identifier.getTablePath());
+                identifier.getTablePath(), identifier.getCarbonTableIdentifier().getTableId());
       } catch (IOException e) {
         LOGGER.error("failed to get carbon table from table Path");
         // ignoring exception

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
index 83e141d..41d98fb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
@@ -188,7 +188,7 @@ public class DiskBasedDataMapStatusProvider implements DataMapStatusStorageProvi
 
   private static ICarbonLock getDataMapStatusLock() {
     return CarbonLockFactory
-        .getCarbonLockObj(CarbonProperties.getInstance().getSystemFolderLocation(),
+        .getSystemLevelCarbonLockObj(CarbonProperties.getInstance().getSystemFolderLocation(),
             LockUsage.DATAMAP_STATUS_LOCK);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index 05f96c5..5128022 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -269,17 +269,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
       // append to a file only if file already exists else file not found
       // exception will be thrown by hdfs
       if (CarbonUtil.isFileExists(path)) {
-        if (FileFactory.FileType.S3 == fileType) {
-          DataInputStream dataInputStream = fileSystem.open(pt);
-          int count = dataInputStream.available();
-          // create buffer
-          byte[] byteStreamBuffer = new byte[count];
-          int bytesRead = dataInputStream.read(byteStreamBuffer);
-          stream = fileSystem.create(pt, true, bufferSize);
-          stream.write(byteStreamBuffer, 0, bytesRead);
-        } else {
-          stream = fileSystem.append(pt, bufferSize);
-        }
+        stream = fileSystem.append(pt, bufferSize);
       } else {
         stream = fileSystem.create(pt, true, bufferSize);
       }
@@ -461,7 +451,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
     return fs.delete(path, true);
   }
 
-  @Override public boolean mkdirs(String filePath, FileFactory.FileType fileType)
+  @Override public boolean mkdirs(String filePath)
       throws IOException {
     filePath = filePath.replace("\\", "/");
     Path path = new Path(filePath);
@@ -549,8 +539,15 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
    */
   protected abstract CarbonFile[] getFiles(FileStatus[] listStatus);
 
-  protected abstract List<CarbonFile> getFiles(RemoteIterator<LocatedFileStatus> listStatus)
-      throws IOException;
+  protected List<CarbonFile> getFiles(RemoteIterator<LocatedFileStatus> listStatus)
+      throws IOException {
+    List<CarbonFile> carbonFiles = new ArrayList<>();
+    while (listStatus.hasNext()) {
+      Path filePath = listStatus.next().getPath();
+      carbonFiles.add(FileFactory.getCarbonFile(filePath.toString()));
+    }
+    return carbonFiles;
+  }
 
   @Override
   public String[] getLocations() throws IOException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
index 61316f8..eabfa48 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AlluxioCarbonFile.java
@@ -27,9 +27,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 
 
@@ -70,17 +68,6 @@ public class AlluxioCarbonFile extends AbstractDFSCarbonFile {
   }
 
   @Override
-  protected List<CarbonFile> getFiles(RemoteIterator<LocatedFileStatus> listStatus)
-      throws IOException {
-    List<CarbonFile> carbonFiles = new ArrayList<>();
-    while (listStatus.hasNext()) {
-      Path filePath = listStatus.next().getPath();
-      carbonFiles.add(new AlluxioCarbonFile(filePath));
-    }
-    return carbonFiles;
-  }
-
-  @Override
   public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
     CarbonFile[] files = listFiles();
     if (files != null && files.length >= 1) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
index a104137..abfed37 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/CarbonFile.java
@@ -144,7 +144,7 @@ public interface CarbonFile {
 
   boolean deleteFile(String filePath, FileFactory.FileType fileType) throws IOException;
 
-  boolean mkdirs(String filePath, FileFactory.FileType fileType) throws IOException;
+  boolean mkdirs(String filePath) throws IOException;
 
   DataOutputStream getDataOutputStreamUsingAppend(String path, FileFactory.FileType fileType)
       throws IOException;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
index fc5420d..ee1388f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFile.java
@@ -27,9 +27,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 
 public class HDFSCarbonFile extends AbstractDFSCarbonFile {
@@ -76,17 +74,6 @@ public class HDFSCarbonFile extends AbstractDFSCarbonFile {
   }
 
   @Override
-  protected List<CarbonFile> getFiles(RemoteIterator<LocatedFileStatus> listStatus)
-      throws IOException {
-    List<CarbonFile> carbonFiles = new ArrayList<>();
-    while (listStatus.hasNext()) {
-      Path filePath = listStatus.next().getPath();
-      carbonFiles.add(new HDFSCarbonFile(filePath));
-    }
-    return carbonFiles;
-  }
-
-  @Override
   public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
     CarbonFile[] files = listFiles();
     if (files != null && files.length >= 1) {
@@ -120,10 +107,6 @@ public class HDFSCarbonFile extends AbstractDFSCarbonFile {
         ((DistributedFileSystem) fs).rename(fileStatus.getPath(), new Path(changetoName),
             org.apache.hadoop.fs.Options.Rename.OVERWRITE);
         return true;
-      } else if (fileStatus.getPath().toString().startsWith("s3n")
-          || fileStatus.getPath().toString().startsWith("s3a")) {
-        fs.delete(new Path(changetoName), true);
-        return fs.rename(fileStatus.getPath(), new Path(changetoName));
       } else {
         return fs.rename(fileStatus.getPath(), new Path(changetoName));
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 5b6f657..f0794f4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -433,10 +433,10 @@ public class LocalCarbonFile implements CarbonFile {
     return FileFactory.deleteAllFilesOfDir(file);
   }
 
-  @Override public boolean mkdirs(String filePath, FileFactory.FileType fileType)
+  @Override public boolean mkdirs(String filePath)
       throws IOException {
     filePath = filePath.replace("\\", "/");
-    filePath = FileFactory.getUpdatedFilePath(filePath, fileType);
+    filePath = FileFactory.getUpdatedFilePath(filePath);
     File file = new File(filePath);
     return file.mkdirs();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/S3CarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/S3CarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/S3CarbonFile.java
new file mode 100644
index 0000000..8c80065
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/S3CarbonFile.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.filesystem;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class S3CarbonFile extends HDFSCarbonFile {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(HDFSCarbonFile.class.getName());
+
+  public S3CarbonFile(String filePath) {
+    super(filePath);
+  }
+
+  public S3CarbonFile(String filePath, Configuration hadoopConf) {
+    super(filePath, hadoopConf);
+  }
+
+  public S3CarbonFile(Path path, Configuration hadoopConf) {
+    super(path, hadoopConf);
+  }
+
+  /**
+    TODO: The current implementation of renameForce is not correct as it deletes the destination
+          object and then performs rename(copy).
+          If the copy fails then there is not way to recover the old file.
+           This can happen when tablestatus.write is renamed to tablestatus.
+          One solution can be to read the content and rewrite the file as write with the same name
+          will overwrite the file by default. Need to discuss.
+          Refer CARBONDATA-2670 for tracking this.
+   */
+  @Override
+  public boolean renameForce(String changetoName) {
+    FileSystem fs;
+    try {
+      deleteFile(changetoName, FileFactory.getFileType(changetoName));
+      fs = fileStatus.getPath().getFileSystem(hadoopConf);
+      return fs.rename(fileStatus.getPath(), new Path(changetoName));
+    } catch (IOException e) {
+      LOGGER.error("Exception occured: " + e.getMessage());
+      return false;
+    }
+  }
+
+  /**
+   * @param listStatus
+   * @return
+   */
+  @Override
+  protected CarbonFile[] getFiles(FileStatus[] listStatus) {
+    if (listStatus == null) {
+      return new CarbonFile[0];
+    }
+    CarbonFile[] files = new CarbonFile[listStatus.length];
+    for (int i = 0; i < files.length; i++) {
+      files[i] = new HDFSCarbonFile(listStatus[i]);
+    }
+    return files;
+  }
+
+  @Override
+  public DataOutputStream getDataOutputStreamUsingAppend(String path, FileFactory.FileType fileType)
+      throws IOException {
+    return getDataOutputStream(path, fileType, CarbonCommonConstants.BYTEBUFFER_SIZE, true);
+  }
+
+  @Override public DataOutputStream getDataOutputStream(String path, FileFactory.FileType fileType,
+      int bufferSize, boolean append) throws IOException {
+    Path pt = new Path(path);
+    FileSystem fileSystem = pt.getFileSystem(FileFactory.getConfiguration());
+    FSDataOutputStream stream;
+    if (append) {
+      // append to a file only if file already exists else file not found
+      // exception will be thrown by hdfs
+      if (CarbonUtil.isFileExists(path)) {
+        DataInputStream dataInputStream = fileSystem.open(pt);
+        int count = dataInputStream.available();
+        // create buffer
+        byte[] byteStreamBuffer = new byte[count];
+        int bytesRead = dataInputStream.read(byteStreamBuffer);
+        stream = fileSystem.create(pt, true, bufferSize);
+        stream.write(byteStreamBuffer, 0, bytesRead);
+      } else {
+        stream = fileSystem.create(pt, true, bufferSize);
+      }
+    } else {
+      stream = fileSystem.create(pt, true, bufferSize);
+    }
+    return stream;
+  }
+
+  @Override
+  public CarbonFile getParentFile() {
+    Path parent = fileStatus.getPath().getParent();
+    return null == parent ? null : new S3CarbonFile(parent, hadoopConf);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java
index c6c5206..6650b9c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/ViewFSCarbonFile.java
@@ -26,9 +26,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.viewfs.ViewFileSystem;
 
 public class ViewFSCarbonFile extends AbstractDFSCarbonFile {
@@ -67,17 +65,6 @@ public class ViewFSCarbonFile extends AbstractDFSCarbonFile {
   }
 
   @Override
-  protected List<CarbonFile> getFiles(RemoteIterator<LocatedFileStatus> listStatus)
-      throws IOException {
-    List<CarbonFile> carbonFiles = new ArrayList<>();
-    while (listStatus.hasNext()) {
-      Path filePath = listStatus.next().getPath();
-      carbonFiles.add(new ViewFSCarbonFile(filePath));
-    }
-    return carbonFiles;
-  }
-
-  @Override
   public CarbonFile[] listFiles(final CarbonFileFilter fileFilter) {
     CarbonFile[] files = listFiles();
     if (files != null && files.length >= 1) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
index f54e9af..c4761c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/DefaultFileTypeProvider.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.filesystem.AlluxioCarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.HDFSCarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.S3CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.ViewFSCarbonFile;
 
 import org.apache.hadoop.conf.Configuration;
@@ -47,8 +48,9 @@ public class DefaultFileTypeProvider implements FileTypeInterface {
       case LOCAL:
         return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
       case HDFS:
-      case S3:
         return new HDFSCarbonFile(path);
+      case S3:
+        return new S3CarbonFile(path);
       case ALLUXIO:
         return new AlluxioCarbonFile(path);
       case VIEWFS:
@@ -63,8 +65,9 @@ public class DefaultFileTypeProvider implements FileTypeInterface {
       case LOCAL:
         return new LocalCarbonFile(FileFactory.getUpdatedFilePath(path, fileType));
       case HDFS:
-      case S3:
         return new HDFSCarbonFile(path, conf);
+      case S3:
+        return new S3CarbonFile(path, conf);
       case ALLUXIO:
         return new AlluxioCarbonFile(path);
       case VIEWFS:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 5c46bcf..e353623 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -88,9 +88,9 @@ public final class FileFactory {
   public static CarbonFile getCarbonFile(String path, FileType fileType) {
     return fileFileTypeInterface.getCarbonFile(path, fileType);
   }
-  public static CarbonFile getCarbonFile(String path, FileType fileType,
+  public static CarbonFile getCarbonFile(String path,
       Configuration hadoopConf) {
-    return fileFileTypeInterface.getCarbonFile(path, fileType, hadoopConf);
+    return fileFileTypeInterface.getCarbonFile(path, getFileType(path), hadoopConf);
   }
 
   public static DataInputStream getDataInputStream(String path, FileType fileType)
@@ -256,7 +256,11 @@ public final class FileFactory {
   }
 
   public static boolean mkdirs(String filePath, FileType fileType) throws IOException {
-    return getCarbonFile(filePath).mkdirs(filePath, fileType);
+    return getCarbonFile(filePath).mkdirs(filePath);
+  }
+
+  public static boolean mkdirs(String filePath, Configuration configuration) throws IOException {
+    return getCarbonFile(filePath, configuration).mkdirs(filePath);
   }
 
   /**
@@ -269,15 +273,7 @@ public final class FileFactory {
    */
   public static DataOutputStream getDataOutputStreamUsingAppend(String path, FileType fileType)
       throws IOException {
-    if (FileType.S3 == fileType) {
-      CarbonFile carbonFile = getCarbonFile(path);
-      if (carbonFile.exists()) {
-        carbonFile.delete();
-      }
-      return carbonFile.getDataOutputStream(path,fileType);
-    } else {
-      return getCarbonFile(path).getDataOutputStreamUsingAppend(path, fileType);
-    }
+    return getCarbonFile(path).getDataOutputStreamUsingAppend(path, fileType);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
index 84da148..358d2ef 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileTypeInterface.java
@@ -27,6 +27,5 @@ public interface FileTypeInterface {
   FileReader getFileHolder(FileFactory.FileType fileType);
   CarbonFile getCarbonFile(String path, FileFactory.FileType fileType);
   CarbonFile getCarbonFile(String path, FileFactory.FileType fileType, Configuration configuration);
-
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
index 3226a63..769e752 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockFactory.java
@@ -39,6 +39,10 @@ public class CarbonLockFactory {
    */
   private static String lockTypeConfigured;
 
+  private static String lockPath = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.LOCK_PATH, "")
+      .toLowerCase();
+
   static {
     CarbonLockFactory.getLockTypeConfigured();
   }
@@ -52,44 +56,56 @@ public class CarbonLockFactory {
    */
   public static ICarbonLock getCarbonLockObj(AbsoluteTableIdentifier absoluteTableIdentifier,
       String lockFile) {
-
-    String tablePath = absoluteTableIdentifier.getTablePath();
+    String absoluteLockPath;
+    if (lockPath.isEmpty()) {
+      absoluteLockPath = absoluteTableIdentifier.getTablePath();
+    } else {
+      if (absoluteTableIdentifier
+          .getCarbonTableIdentifier().getTableId().isEmpty()) {
+        throw new RuntimeException("Table id is empty");
+      }
+      absoluteLockPath =
+          getLockpath(absoluteTableIdentifier.getCarbonTableIdentifier().getTableId());
+    }
     if (lockTypeConfigured.equals(CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER)) {
-      return new ZooKeeperLocking(absoluteTableIdentifier, lockFile);
-    } else if (tablePath.startsWith(CarbonCommonConstants.S3A_PREFIX) ||
-        tablePath.startsWith(CarbonCommonConstants.S3N_PREFIX) ||
-            tablePath.startsWith(CarbonCommonConstants.S3_PREFIX)) {
+      return new ZooKeeperLocking(absoluteLockPath, lockFile);
+    } else if (absoluteLockPath.startsWith(CarbonCommonConstants.S3A_PREFIX) || absoluteLockPath
+        .startsWith(CarbonCommonConstants.S3N_PREFIX) || absoluteLockPath
+        .startsWith(CarbonCommonConstants.S3_PREFIX)) {
       lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_S3;
-      return new S3FileLock(absoluteTableIdentifier, lockFile);
-    } else if (tablePath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
+      return new S3FileLock(absoluteLockPath,
+          lockFile);
+    } else if (absoluteLockPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
       lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS;
-      return new HdfsFileLock(absoluteTableIdentifier, lockFile);
+      return new HdfsFileLock(absoluteLockPath, lockFile);
     } else {
       lockTypeConfigured = CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL;
-      return new LocalFileLock(absoluteTableIdentifier, lockFile);
+      return new LocalFileLock(absoluteLockPath, lockFile);
     }
   }
 
   /**
-   *
+   * If user has configured carbon.lock.path the same property will be used to store lock files.
+   * If not configured then use locFileLocation parameter.
    * @param locFileLocation
-   * @param lockFile
    * @return carbon lock
    */
-  public static ICarbonLock getCarbonLockObj(String locFileLocation, String lockFile) {
+  public static ICarbonLock getSystemLevelCarbonLockObj(String locFileLocation, String lockFile) {
+    String lockFileLocation;
+    if (lockPath.isEmpty()) {
+      lockFileLocation = locFileLocation;
+    } else {
+      lockFileLocation = getLockpath("1");
+    }
     switch (lockTypeConfigured) {
       case CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL:
-        return new LocalFileLock(locFileLocation, lockFile);
-
+        return new LocalFileLock(lockFileLocation, lockFile);
       case CarbonCommonConstants.CARBON_LOCK_TYPE_ZOOKEEPER:
-        return new ZooKeeperLocking(locFileLocation, lockFile);
-
+        return new ZooKeeperLocking(lockFileLocation, lockFile);
       case CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS:
-        return new HdfsFileLock(locFileLocation, lockFile);
-
+        return new HdfsFileLock(lockFileLocation, lockFile);
       case CarbonCommonConstants.CARBON_LOCK_TYPE_S3:
-        return new S3FileLock(locFileLocation, lockFile);
-
+        return new S3FileLock(lockFileLocation, lockFile);
       default:
         throw new UnsupportedOperationException("Not supported the lock type");
     }
@@ -105,4 +121,8 @@ public class CarbonLockFactory {
     LOGGER.info("Configured lock type is: " + lockTypeConfigured);
   }
 
+  public static String getLockpath(String tableId) {
+    return lockPath + CarbonCommonConstants.FILE_SEPARATOR + tableId;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
index 3c28f9d..ade4212 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
@@ -57,22 +56,6 @@ public class HdfsFileLock extends AbstractCarbonLock {
     initRetry();
   }
 
-  /**
-   * @param lockFilePath
-   */
-  public HdfsFileLock(String lockFilePath) {
-    this.lockFilePath = lockFilePath;
-    initRetry();
-  }
-
-  /**
-   * @param absoluteTableIdentifier
-   * @param lockFile
-   */
-  public HdfsFileLock(AbsoluteTableIdentifier absoluteTableIdentifier, String lockFile) {
-    this(absoluteTableIdentifier.getTablePath(), lockFile);
-  }
-
   /* (non-Javadoc)
    * @see org.apache.carbondata.core.locks.ICarbonLock#lock()
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
index 6983562..1148ae2 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
@@ -27,7 +27,6 @@ import java.nio.file.StandardOpenOption;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
@@ -73,15 +72,6 @@ public class LocalFileLock extends AbstractCarbonLock {
   }
 
   /**
-   * @param tableIdentifier
-   * @param lockFile
-   */
-  public LocalFileLock(AbsoluteTableIdentifier tableIdentifier, String lockFile) {
-    this(tableIdentifier.getTablePath(), lockFile);
-    initRetry();
-  }
-
-  /**
    * Lock API for locking of the file channel of the lock file.
    *
    * @return

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 0b1c1e3..3d3b245 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -93,7 +93,7 @@ public class SegmentFileStore {
     String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
     CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
     if (!carbonFile.exists()) {
-      carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+      carbonFile.mkdirs(writePath);
     }
     CarbonFile tempFolder =
         FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
@@ -178,7 +178,7 @@ public class SegmentFileStore {
       String segmentFileFolder = CarbonTablePath.getSegmentFilesLocation(tablePath);
       CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFileFolder);
       if (!carbonFile.exists()) {
-        carbonFile.mkdirs(segmentFileFolder, FileFactory.getFileType(segmentFileFolder));
+        carbonFile.mkdirs(segmentFileFolder);
       }
       String segmentFileName = genSegmentFileName(segmentId, UUID) + CarbonTablePath.SEGMENT_EXT;
       // write segment info to new file.
@@ -280,8 +280,8 @@ public class SegmentFileStore {
    * @return boolean which determines whether status update is done or not.
    * @throws IOException
    */
-  public static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile)
-      throws IOException {
+  public static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile,
+      String tableId) throws IOException {
     boolean status = false;
     String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);
     if (!FileFactory.isFileExist(tableStatusPath)) {
@@ -289,7 +289,7 @@ public class SegmentFileStore {
     }
     String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
     AbsoluteTableIdentifier absoluteTableIdentifier =
-        AbsoluteTableIdentifier.from(tablePath, null, null);
+        AbsoluteTableIdentifier.from(tablePath, null, null, tableId);
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
     int retryCount = CarbonLockUtil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 6f6ef81..be42f3f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -271,11 +271,12 @@ public class CarbonTable implements Serializable {
     return CarbonTable.buildFromTableInfo(tableInfoInfer);
   }
 
-  public static CarbonTable buildFromTablePath(String tableName, String dbName, String tablePath)
-      throws IOException {
-    return SchemaReader
-        .readCarbonTableFromStore(AbsoluteTableIdentifier.from(tablePath, dbName, tableName));
+  public static CarbonTable buildFromTablePath(String tableName, String dbName, String tablePath,
+      String tableId) throws IOException {
+    return SchemaReader.readCarbonTableFromStore(
+        AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableId));
   }
+
   /**
    * @param tableInfo
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index cb53c0b..c293064 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -154,7 +154,8 @@ public class CarbonIndexFileMergeWriter {
     String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath())
         + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
     SegmentFileStore.writeSegmentFile(sfs.getSegmentFile(), path);
-    SegmentFileStore.updateSegmentFile(table.getTablePath(), segmentId, newSegmentFileName);
+    SegmentFileStore.updateSegmentFile(table.getTablePath(), segmentId, newSegmentFileName,
+        table.getCarbonTableIdentifier().getTableId());
 
     for (CarbonFile file : indexFiles) {
       file.delete();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/test/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifierTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifierTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifierTest.java
index f34008d..3dfc515 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifierTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifierTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.carbon;
 
+import java.util.UUID;
+
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 
@@ -93,7 +95,8 @@ public class AbsoluteTableIdentifierTest {
 
   @Test public void fromTablePathTest() {
     AbsoluteTableIdentifier absoluteTableIdentifierTest =
-        AbsoluteTableIdentifier.from("storePath/databaseName/tableName", "databaseName", "tableName");
+        AbsoluteTableIdentifier.from("storePath/databaseName/tableName", "databaseName", "tableName",
+            UUID.randomUUID().toString());
     Assert.assertTrue(absoluteTableIdentifierTest.getTablePath()
         .equals(absoluteTableIdentifier4.getTablePath()));
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
index fa7bf08..8c234b5 100644
--- a/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
+++ b/core/src/test/java/org/apache/carbondata/core/indexstore/blockletindex/TestBlockletDataMapFactory.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
@@ -71,8 +72,9 @@ public class TestBlockletDataMapFactory {
             .getDeclaredConstructors()[0];
     constructor.setAccessible(true);
     carbonTable = (CarbonTable) constructor.newInstance();
-    absoluteTableIdentifier =
-        AbsoluteTableIdentifier.from("/opt/store/default/carbon_table/", "default", "carbon_table");
+    absoluteTableIdentifier = AbsoluteTableIdentifier
+        .from("/opt/store/default/carbon_table/", "default", "carbon_table",
+            UUID.randomUUID().toString());
     Deencapsulation.setField(tableInfo, "identifier", absoluteTableIdentifier);
     Deencapsulation.setField(carbonTable, "tableInfo", tableInfo);
     blockletDataMapFactory = new BlockletDataMapFactory(carbonTable, new DataMapSchema());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 741e5fb..57c645f 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -337,9 +337,10 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa
 
   @Override
   public void clear() {
-    List<String> segments = new ArrayList<>(segmentMap.keySet());
-    for (String segmentId : segments) {
-      clear(new Segment(segmentId, null, null));
+    if (segmentMap.size() > 0) {
+      for (String segmentId : segmentMap.keySet().toArray(new String[segmentMap.size()])) {
+        clear(new Segment(segmentId, null, null));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index f81959e..c46157c 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -106,6 +106,7 @@ This section provides the details of all the configurations required for CarbonD
 |---------------------------------------------|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | carbon.sort.file.write.buffer.size | 16384 | File write buffer size used during sorting. Minimum allowed buffer size is 10240 byte and Maximum allowed buffer size is 10485760 byte. |
 | carbon.lock.type | LOCALLOCK | This configuration specifies the type of lock to be acquired during concurrent operations on table. There are following types of lock implementation: - LOCALLOCK: Lock is created on local file system as file. This lock is useful when only one spark driver (thrift server) runs on a machine and no other CarbonData spark application is launched concurrently. - HDFSLOCK: Lock is created on HDFS file system as file. This lock is useful when multiple CarbonData spark applications are launched and no ZooKeeper is running on cluster and HDFS supports file based locking. |
+| carbon.lock.path | TABLEPATH | This configuration specifies the path where lock files have to be created. It is recommended to configure the ZooKeeper lock type, or to set this property to an HDFS path, when using the S3 file system, as locking is not feasible on S3. |
 | carbon.sort.intermediate.files.limit | 20 | Minimum number of intermediate files after which merged sort can be started (minValue = 2, maxValue=50). |
 | carbon.block.meta.size.reserved.percentage | 10 | Space reserved in percentage for writing block meta data in CarbonData file. |
 | carbon.csv.read.buffersize.byte | 1048576 | csv reading buffer size. |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 456916f..26c32ee 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -142,7 +142,7 @@ object CarbonStore {
     LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
     var carbonCleanFilesLock: ICarbonLock = null
     val absoluteTableIdentifier = if (forceTableClean) {
-      AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
     } else {
       carbonTable.getAbsoluteTableIdentifier
     }
@@ -317,7 +317,7 @@ object CarbonStore {
       tableName: String,
       storePath: String,
       segmentId: String): Boolean = {
-    val identifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName)
+    val identifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName, tableName)
     val validAndInvalidSegments: SegmentStatusManager.ValidAndInvalidSegmentsInfo = new
         SegmentStatusManager(
           identifier).getValidAndInvalidSegments

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index e14524e..0dbb4f0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -21,7 +21,6 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 2fca57e..401ba29 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.util.CarbonException

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index b985459..54a7530 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -25,6 +25,8 @@ import scala.reflect.ClassTag
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.metadata.schema.table.TableInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index de1ac49..2f08d07 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -824,19 +824,18 @@ object CommonUtil {
             if (tableFolder.isDirectory) {
               val tablePath = databaseLocation +
                               CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
-              val identifier =
-                AbsoluteTableIdentifier.from(tablePath, dbName, tableFolder.getName)
+              val tableUniqueName = dbName + "_" + tableFolder.getName
               val tableStatusFile =
                 CarbonTablePath.getTableStatusFilePath(tablePath)
               if (FileFactory.isFileExist(tableStatusFile, fileType)) {
                 try {
                   val carbonTable = CarbonMetadata.getInstance
-                    .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)
+                    .getCarbonTable(tableUniqueName)
                   SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
                 } catch {
                   case _: Exception =>
                     LOGGER.warn(s"Error while cleaning table " +
-                                s"${ identifier.getCarbonTableIdentifier.getTableUniqueName }")
+                                s"${ tableUniqueName }")
                 }
               }
             }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
index aeabdf1..e5552db 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
@@ -49,7 +49,7 @@ object ResourceRegisterAndCopier {
       sys.error(s"""Provided path $hdfsPath does not exist""")
     }
     LOGGER.audit("Try downloading resource data")
-    val lock = new HdfsFileLock(hdfsPath + "/resource.lock")
+    val lock = new HdfsFileLock(hdfsPath, "/resource.lock")
     var bool = false
     try {
       bool = lockWithRetries(lock)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
index 12ce17f..db63f6e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
@@ -75,8 +75,7 @@ object FileUtils {
       val filePaths = inputPath.split(",")
       for (i <- 0 until filePaths.size) {
         val filePath = CarbonUtil.checkAndAppendHDFSUrl(filePaths(i))
-        val fileType = FileFactory.getFileType(filePath)
-        val carbonFile = FileFactory.getCarbonFile(filePaths(i), fileType, hadoopConf)
+        val carbonFile = FileFactory.getCarbonFile(filePaths(i), hadoopConf)
         if (!carbonFile.exists()) {
           throw new DataLoadingException(
             s"The input file does not exist: ${CarbonUtil.removeAKSK(filePaths(i))}" )
@@ -99,8 +98,7 @@ object FileUtils {
     } else {
       val filePaths = inputPath.split(",")
       for (i <- 0 until filePaths.size) {
-        val fileType = FileFactory.getFileType(filePaths(i))
-        val carbonFile = FileFactory.getCarbonFile(filePaths(i), fileType, hadoopConfiguration)
+        val carbonFile = FileFactory.getCarbonFile(filePaths(i), hadoopConfiguration)
         size = size + carbonFile.getSize
       }
       size

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 5d53ccc..2334871 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -99,7 +99,7 @@ object CarbonDataRDDFactory {
       CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT).trim
 
     configuredMdtPath = CarbonUtil.checkAndAppendFileSystemURIScheme(configuredMdtPath)
-    val lock = CarbonLockFactory.getCarbonLockObj(
+    val lock = CarbonLockFactory.getSystemLevelCarbonLockObj(
       configuredMdtPath + CarbonCommonConstants.FILE_SEPARATOR +
         CarbonCommonConstants.SYSTEM_LEVEL_COMPACTION_LOCK_FOLDER,
       LockUsage.SYSTEMLEVEL_COMPACTION_LOCK)
@@ -519,7 +519,8 @@ object CarbonDataRDDFactory {
       SegmentFileStore.updateSegmentFile(
         carbonTable.getTablePath,
         carbonLoadModel.getSegmentId,
-        segmentFileName)
+        segmentFileName,
+        carbonTable.getCarbonTableIdentifier.getTableId)
       operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
         carbonLoadModel.getSegmentId)
       val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 30cb464..c59bb08 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -332,7 +332,6 @@ object CarbonSession {
             sparkConf.setAppName(randomAppName)
           }
           val sc = SparkContext.getOrCreate(sparkConf)
-          CarbonInputFormatUtil.setS3Configurations(sc.hadoopConfiguration)
           // maybe this is an existing SparkContext, update its SparkConf which maybe used
           // by SparkSession
           options.foreach { case (k, v) => sc.conf.set(k, v) }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 722119e..bae00ee 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -66,8 +66,6 @@ case class CarbonDropDataMapCommand(
       val carbonEnv = CarbonEnv.getInstance(sparkSession)
       val catalog = carbonEnv.carbonMetastore
       val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
-      val tableIdentifier =
-        AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
       catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
       if (mainTable == null) {
         mainTable = try {
@@ -79,6 +77,12 @@ case class CarbonDropDataMapCommand(
             null
         }
       }
+      val tableIdentifier =
+        AbsoluteTableIdentifier
+          .from(tablePath,
+            dbName.toLowerCase,
+            tableName.toLowerCase,
+            mainTable.getCarbonTableIdentifier.getTableId)
       // forceDrop will be true only when parent table schema updation has failed.
       // This method will forcefully drop child table instance from metastore.
       if (forceDrop) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 776750b..5d74c2c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -28,14 +28,15 @@ import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
-import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.events._
 
 case class CarbonDropTableCommand(
@@ -179,6 +180,14 @@ case class CarbonDropTableCommand(
         val file = FileFactory.getCarbonFile(tablePath, fileType)
         CarbonUtil.deleteFoldersAndFilesSilent(file)
       }
+      // Delete lock directory if external lock path is specified.
+      if (CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_PATH, "").toLowerCase
+        .nonEmpty) {
+        val tableLockPath = CarbonLockFactory
+          .getLockpath(carbonTable.getCarbonTableIdentifier.getTableId)
+        val file = FileFactory.getCarbonFile(tableLockPath)
+        CarbonUtil.deleteFoldersAndFilesSilent(file)
+      }
       if (carbonTable.hasDataMapSchema && childDropCommands.nonEmpty) {
         // drop all child tables
         childDropCommands.foreach(_.processData(sparkSession))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 900f54c..5254933 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -386,7 +386,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
     val fileType = FileFactory.getFileType(schemaMetadataPath)
     if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
-      val isDirCreated = FileFactory.mkdirs(schemaMetadataPath, fileType)
+      val isDirCreated = FileFactory
+        .mkdirs(schemaMetadataPath, SparkSession.getActiveSession.get.sessionState.newHadoopConf())
       if (!isDirCreated) {
         throw new IOException(s"Failed to create the metadata directory $schemaMetadataPath")
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java b/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
index 8d5f3d4..4f9f76c 100644
--- a/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
+++ b/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
@@ -17,7 +17,12 @@
 package org.apache.carbondata.lcm.locks;
 
 import java.io.File;
+import java.lang.reflect.Field;
+import java.util.UUID;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.locks.CarbonLockFactory;
+import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.locks.LocalFileLock;
 import org.apache.carbondata.core.locks.LockUsage;
@@ -32,11 +37,15 @@ import org.junit.Test;
  */
 public class LocalFileLockTest {
 
+  private  String rootPath;
+
+  private Class<?> secretClass = CarbonLockFactory.class;
+
   /**
    * @throws java.lang.Exception
    */
   @Before public void setUp() throws Exception {
-    String rootPath = new File(this.getClass().getResource("/").getPath()
+    rootPath = new File(this.getClass().getResource("/").getPath()
         + "../../..").getCanonicalPath();
     String storeLocation = rootPath + "/target/store";
     CarbonProperties.getInstance()
@@ -47,19 +56,22 @@ public class LocalFileLockTest {
    * @throws java.lang.Exception
    */
   @After public void tearDown() throws Exception {
+    Field f = secretClass.getDeclaredField("lockPath");
+    f.setAccessible(true);
+    f.set(secretClass, "");
   }
 
   @Test public void testingLocalFileLockingByAcquiring2Locks() {
 
     AbsoluteTableIdentifier absoluteTableIdentifier = AbsoluteTableIdentifier
         .from(CarbonProperties.getInstance().getProperty("carbon.storelocation"), "databaseName",
-            "tableName");
+            "tableName", UUID.randomUUID().toString());
     LocalFileLock localLock1 =
-        new LocalFileLock(absoluteTableIdentifier,
+        new LocalFileLock(absoluteTableIdentifier.getTablePath(),
             LockUsage.METADATA_LOCK);
     Assert.assertTrue(localLock1.lock());
     LocalFileLock localLock2 =
-        new LocalFileLock(absoluteTableIdentifier,
+        new LocalFileLock(absoluteTableIdentifier.getTablePath(),
             LockUsage.METADATA_LOCK);
     Assert.assertTrue(!localLock2.lock());
 
@@ -68,4 +80,21 @@ public class LocalFileLockTest {
     Assert.assertTrue(localLock2.unlock());
   }
 
+  @Test public void testConfigurablePathForLock() throws Exception {
+    try {
+      Field f = secretClass.getDeclaredField("lockPath");
+      f.setAccessible(true);
+      f.set(secretClass, rootPath + "/target/");
+      AbsoluteTableIdentifier absoluteTableIdentifier = AbsoluteTableIdentifier
+          .from(CarbonProperties.getInstance().getProperty("carbon.storelocation"), "databaseName",
+              "tableName", "1");
+      ICarbonLock carbonLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK);
+      carbonLock.lockWithRetries();
+      assert (new File(rootPath + "/target/1/LockFiles/tablestatus.lock").exists());
+      assert (!new File(absoluteTableIdentifier.getTablePath() + "/LockFiles").exists());
+    } finally {
+      tearDown();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java b/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
index e7a0602..2234d81 100644
--- a/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
+++ b/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
@@ -34,6 +34,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.util.Properties;
+import java.util.UUID;
 
 /**
  * ZooKeeperLocking Test cases
@@ -96,7 +97,7 @@ public class ZooKeeperLockingTest {
 
     AbsoluteTableIdentifier tableIdentifier = AbsoluteTableIdentifier
         .from(CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION),
-            "dbName", "tableName");
+            "dbName", "tableName", UUID.randomUUID().toString());
     ZooKeeperLocking zkl =
         new ZooKeeperLocking(tableIdentifier,
             LockUsage.METADATA_LOCK);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb10d03a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index cf90515..e3dabb1 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.UUID;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -179,7 +180,8 @@ public class CarbonReaderBuilder {
     // DB name is not applicable for SDK reader as, table will be never registered.
     CarbonTable table;
     if (isTransactionalTable) {
-      table = CarbonTable.buildFromTablePath(tableName, "default", tablePath);
+      table = CarbonTable
+          .buildFromTablePath(tableName, "default", tablePath, UUID.randomUUID().toString());
     } else {
       if (filterExpression != null) {
         table = CarbonTable.buildTable(tablePath, tableName);


Mime
View raw message