carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject carbondata git commit: [CARBONDATA-2745] Added atomic file operations for S3
Date Tue, 24 Jul 2018 13:08:43 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master a75b9db6a -> 878bbd8a4


[CARBONDATA-2745] Added atomic file operations for S3

S3 supports atomic file overwrite. hdfs rename is atomic, while overwrite is not atomic and
can result in empty file read temporarily.
So separate implementations for both hdfs and S3 to ensure consistancy of overwrite and read

This closes #2511


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/878bbd8a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/878bbd8a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/878bbd8a

Branch: refs/heads/master
Commit: 878bbd8a487de1d4fbd5c67cb67e76b6d89dcf48
Parents: a75b9db
Author: kunal642 <kunalkapoor642@gmail.com>
Authored: Mon Jul 16 14:28:00 2018 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Tue Jul 24 18:33:10 2018 +0530

----------------------------------------------------------------------
 .../status/DiskBasedDataMapStatusProvider.java  |  5 +-
 .../AtomicFileOperationFactory.java             | 32 ++++++++++
 .../AtomicFileOperationS3Impl.java              | 64 ++++++++++++++++++++
 .../AtomicFileOperationsImpl.java               |  4 +-
 .../core/metadata/SegmentFileStore.java         |  6 +-
 .../reader/CarbonDeleteDeltaFileReaderImpl.java |  5 +-
 .../statusmanager/SegmentStatusManager.java     |  8 +--
 .../SegmentUpdateStatusManager.java             |  8 +--
 .../carbondata/core/writer/ThriftWriter.java    |  5 +-
 .../datamap/examples/MinMaxIndexDataMap.java    |  4 +-
 .../hadoop/testutil/StoreCreator.java           |  4 +-
 .../presto/util/CarbonDataStoreCreator.scala    |  7 +--
 .../processing/util/CarbonLoaderUtil.java       |  4 +-
 13 files changed, 123 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/878bbd8a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
index 41d98fb..22a7f6d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.locks.CarbonLockFactory;
 import org.apache.carbondata.core.locks.CarbonLockUtil;
@@ -160,8 +160,7 @@ public class DiskBasedDataMapStatusProvider implements DataMapStatusStorageProvi
    */
   private static void writeLoadDetailsIntoFile(String location,
       DataMapStatusDetail[] dataMapStatusDetails) throws IOException {
-    AtomicFileOperations fileWrite =
-        new AtomicFileOperationsImpl(location, FileFactory.getFileType(location));
+    AtomicFileOperations fileWrite = AtomicFileOperationFactory.getAtomicFileOperations(location);
     BufferedWriter brWriter = null;
     DataOutputStream dataOutputStream = null;
     Gson gsonObjectToWrite = new Gson();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/878bbd8a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationFactory.java
b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationFactory.java
new file mode 100644
index 0000000..b764eb9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.fileoperations;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+public class AtomicFileOperationFactory {
+
+  public static AtomicFileOperations getAtomicFileOperations(String filePath) {
+    FileFactory.FileType fileType = FileFactory.getFileType(filePath);
+    if (fileType == FileFactory.FileType.S3) {
+      return new AtomicFileOperationS3Impl(filePath);
+    } else {
+      return new AtomicFileOperationsImpl(filePath, fileType);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/878bbd8a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationS3Impl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationS3Impl.java
b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationS3Impl.java
new file mode 100644
index 0000000..71730f0
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationS3Impl.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.fileoperations;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * This Implementation for AtomicFileOperation is specific to S3 store.
+ * In this temporary files would not be written instead directly overwrite call
+ * would be fired on the desired file.
+ *
+ * This is required because deletion and recreation on tablestatus has a very small window
where the
+ * file would not exist in the Metadata directory. Any query which tries to access tablestatus
+ * during this time will fail. By this fix the complete object will be overwritten to the
bucket and
+ * S3 will ensure that either the old or the new file content is always available for read.
+ *
+ */
+class AtomicFileOperationS3Impl implements AtomicFileOperations {
+
+  private String filePath;
+
+  private DataOutputStream dataOutStream;
+
+  AtomicFileOperationS3Impl(String filePath) {
+    this.filePath = filePath;
+  }
+
+  @Override public DataInputStream openForRead() throws IOException {
+    return FileFactory.getDataInputStream(filePath, FileFactory.getFileType(filePath));
+  }
+
+  @Override public void close() throws IOException {
+    if (null != dataOutStream) {
+      CarbonUtil.closeStream(dataOutStream);
+    }
+  }
+
+  @Override public DataOutputStream openForWrite(FileWriteOperation operation) throws IOException
{
+    filePath = filePath.replace("\\", "/");
+    FileFactory.FileType fileType = FileFactory.getFileType(filePath);
+    dataOutStream = FileFactory.getDataOutputStream(filePath, fileType);
+    return dataOutStream;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/878bbd8a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
index 61690ff..af2456a 100644
--- a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
 import org.apache.carbondata.core.util.CarbonUtil;
 
-public class AtomicFileOperationsImpl implements AtomicFileOperations {
+class AtomicFileOperationsImpl implements AtomicFileOperations {
 
   private String filePath;
 
@@ -37,7 +37,7 @@ public class AtomicFileOperationsImpl implements AtomicFileOperations {
 
   private DataOutputStream dataOutStream;
 
-  public AtomicFileOperationsImpl(String filePath, FileType fileType) {
+  AtomicFileOperationsImpl(String filePath, FileType fileType) {
     this.filePath = filePath;
 
     this.fileType = fileType;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/878bbd8a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 28ac47e..32f6155 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -42,8 +42,8 @@ import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
@@ -217,7 +217,7 @@ public class SegmentFileStore {
    */
   public static void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException
{
     AtomicFileOperations fileWrite =
-        new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+        AtomicFileOperationFactory.getAtomicFileOperations(path);
     BufferedWriter brWriter = null;
     DataOutputStream dataOutputStream = null;
     Gson gsonObjectToWrite = new Gson();
@@ -429,7 +429,7 @@ public class SegmentFileStore {
     InputStreamReader inStream = null;
     SegmentFile segmentFile;
     AtomicFileOperations fileOperation =
-        new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath));
+        AtomicFileOperationFactory.getAtomicFileOperations(segmentFilePath);
 
     try {
       if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath)))
{

http://git-wip-us.apache.org/repos/asf/carbondata/blob/878bbd8a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
index 1c0b586..b2260f0 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -100,8 +100,7 @@ public class CarbonDeleteDeltaFileReaderImpl implements CarbonDeleteDeltaFileRea
     InputStreamReader inStream = null;
     DeleteDeltaBlockDetails deleteDeltaBlockDetails;
     AtomicFileOperations fileOperation =
-        new AtomicFileOperationsImpl(filePath, FileFactory.getFileType(filePath));
-
+        AtomicFileOperationFactory.getAtomicFileOperations(filePath);
     try {
       if (!FileFactory.isFileExist(filePath, FileFactory.getFileType(filePath))) {
         return new DeleteDeltaBlockDetails("");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/878bbd8a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index c7925c9..5c73259 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -35,8 +35,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.locks.CarbonLockFactory;
@@ -240,7 +240,7 @@ public class SegmentStatusManager {
     InputStreamReader inStream = null;
     LoadMetadataDetails[] listOfLoadFolderDetailsArray;
     AtomicFileOperations fileOperation =
-        new AtomicFileOperationsImpl(tableStatusPath, FileFactory.getFileType(tableStatusPath));
+        AtomicFileOperationFactory.getAtomicFileOperations(tableStatusPath);
 
     try {
       if (!FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath)))
{
@@ -497,7 +497,7 @@ public class SegmentStatusManager {
   public static void writeLoadDetailsIntoFile(String dataLoadLocation,
       LoadMetadataDetails[] listOfLoadFolderDetailsArray) throws IOException {
     AtomicFileOperations fileWrite =
-        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
+        AtomicFileOperationFactory.getAtomicFileOperations(dataLoadLocation);
     BufferedWriter brWriter = null;
     DataOutputStream dataOutputStream = null;
     Gson gsonObjectToWrite = new Gson();
@@ -851,7 +851,7 @@ public class SegmentStatusManager {
     BufferedWriter brWriter = null;
 
     AtomicFileOperations writeOperation =
-        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
+        AtomicFileOperationFactory.getAtomicFileOperations(dataLoadLocation);
 
     try {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/878bbd8a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 0c2098a..083325d 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -37,8 +37,8 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.locks.CarbonLockFactory;
 import org.apache.carbondata.core.locks.ICarbonLock;
@@ -662,8 +662,8 @@ public class SegmentUpdateStatusManager {
     String tableUpdateStatusPath =
         CarbonTablePath.getMetadataPath(identifier.getTablePath()) +
             CarbonCommonConstants.FILE_SEPARATOR + tableUpdateStatusIdentifier;
-    AtomicFileOperations fileOperation = new AtomicFileOperationsImpl(tableUpdateStatusPath,
-        FileFactory.getFileType(tableUpdateStatusPath));
+    AtomicFileOperations fileOperation =
+        AtomicFileOperationFactory.getAtomicFileOperations(tableUpdateStatusPath);
 
     try {
       if (!FileFactory
@@ -709,7 +709,7 @@ public class SegmentUpdateStatusManager {
             + CarbonUpdateUtil.getUpdateStatusFileName(updateStatusFileIdentifier);
 
     AtomicFileOperations fileWrite =
-        new AtomicFileOperationsImpl(fileLocation, FileFactory.getFileType(fileLocation));
+        AtomicFileOperationFactory.getAtomicFileOperations(fileLocation);
     BufferedWriter brWriter = null;
     DataOutputStream dataOutputStream = null;
     Gson gsonObjectToWrite = new Gson();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/878bbd8a/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
index 28f81f3..93cd238 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java
@@ -21,8 +21,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -91,8 +91,7 @@ public class ThriftWriter {
    * @throws IOException
    */
   public void open(FileWriteOperation fileWriteOperation) throws IOException {
-    FileFactory.FileType fileType = FileFactory.getFileType(fileName);
-    atomicFileOperationsWriter = new AtomicFileOperationsImpl(fileName, fileType);
+    atomicFileOperationsWriter = AtomicFileOperationFactory.getAtomicFileOperations(fileName);
     dataOutputStream = atomicFileOperationsWriter.openForWrite(fileWriteOperation);
     binaryOut = new TCompactProtocol(new TIOStreamTransport(dataOutputStream));
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/878bbd8a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
index 78868a9..314a168 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
@@ -33,7 +33,6 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.memory.MemoryException;
@@ -94,8 +93,7 @@ public class MinMaxIndexDataMap extends CoarseGrainDataMap {
     BufferedReader buffReader = null;
     InputStreamReader inStream = null;
     MinMaxIndexBlockDetails[] readMinMax = null;
-    AtomicFileOperations fileOperation =
-        new AtomicFileOperationsImpl(filePath, FileFactory.getFileType(filePath));
+    AtomicFileOperations fileOperation = AtomicFileOperationsF(filePath, FileFactory.getFileType(filePath));
 
     try {
       if (!FileFactory.isFileExist(filePath, FileFactory.getFileType(filePath))) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/878bbd8a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index 287f623..6e6a65b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -44,8 +44,8 @@ import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentif
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
@@ -439,7 +439,7 @@ public class StoreCreator {
     BufferedWriter brWriter = null;
 
     AtomicFileOperations writeOperation =
-        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
+        AtomicFileOperationFactory.getAtomicFileOperations(dataLoadLocation);
 
     try {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/878bbd8a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index ffa5b00..e57312b 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -39,7 +39,7 @@ import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumn
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.fileoperations.{AtomicFileOperations, AtomicFileOperationsImpl,
FileWriteOperation}
+import org.apache.carbondata.core.fileoperations.{AtomicFileOperationFactory, AtomicFileOperations,
FileWriteOperation}
 import org.apache.carbondata.core.metadata.converter.{SchemaConverter, ThriftWrapperSchemaConverterImpl}
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.encoder.Encoding
@@ -525,9 +525,8 @@ object CarbonDataStoreCreator {
       val dataLoadLocation: String = schema.getCarbonTable.getMetadataPath + File.separator
+
                                      CarbonTablePath.TABLE_STATUS_FILE
       val gsonObjectToWrite: Gson = new Gson()
-      val writeOperation: AtomicFileOperations = new AtomicFileOperationsImpl(
-        dataLoadLocation,
-        FileFactory.getFileType(dataLoadLocation))
+      val writeOperation: AtomicFileOperations = AtomicFileOperationFactory
+        .getAtomicFileOperations(dataLoadLocation)
       val dataOutputStream =
         writeOperation.openForWrite(FileWriteOperation.OVERWRITE)
       val brWriter = new BufferedWriter(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/878bbd8a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 8a0c2b6..272abec 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -40,8 +40,8 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
@@ -406,7 +406,7 @@ public final class CarbonLoaderUtil {
     BufferedWriter brWriter = null;
 
     AtomicFileOperations writeOperation =
-        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
+        AtomicFileOperationFactory.getAtomicFileOperations(dataLoadLocation);
 
     try {
       dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);


Mime
View raw message