carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [50/50] carbondata git commit: [CARBONDATA-2749][dataload] In HDFS Empty tablestatus file is written during datalaod, iud or compaction when disk is full.
Date Mon, 30 Jul 2018 18:43:16 GMT
[CARBONDATA-2749][dataload] In HDFS, an empty tablestatus file is written
during data load, IUD or compaction when the disk is full.

Problem:
When a failure happens due to a full disk during load, IUD or compaction,
then while updating the tablestatus file, the tablestatus.tmp file used by the
atomic file operation remains empty, and in the finally block the empty
tablestatus.tmp file gets renamed to the actual file.
This leads to an empty tablestatus file. Once this problem happens, the
tablestatus file cannot be recovered and the already loaded data cannot be used.

Solution:
If a failure happens during the write, then the rename of the temporary file in the
finally block must be avoided.

This closes #2517


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/92f02112
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/92f02112
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/92f02112

Branch: refs/heads/branch-1.4
Commit: 92f021122e6484c9a6cbe48bdb22a7db5d28aec8
Parents: 5b46d4c
Author: mohammadshahidkhan <mohdshahidkhan1987@gmail.com>
Authored: Tue Jul 17 16:59:35 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Tue Jul 31 00:11:26 2018 +0530

----------------------------------------------------------------------
 .../status/DiskBasedDataMapStatusProvider.java  |  1 +
 .../AtomicFileOperationS3Impl.java              |  5 +-
 .../fileoperations/AtomicFileOperations.java    |  2 +
 .../AtomicFileOperationsImpl.java               | 24 ++++++++--
 .../core/metadata/SegmentFileStore.java         |  4 ++
 .../statusmanager/SegmentStatusManager.java     |  5 ++
 .../SegmentUpdateStatusManager.java             |  2 +
 .../hadoop/testutil/StoreCreator.java           |  4 ++
 .../processing/util/CarbonLoaderUtil.java       | 50 ++++----------------
 9 files changed, 50 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/92f02112/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
index 22a7f6d..d42c98a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
@@ -174,6 +174,7 @@ public class DiskBasedDataMapStatusProvider implements DataMapStatusStorageProvi
       brWriter.write(metadataInstance);
     } catch (IOException ioe) {
       LOG.error("Error message: " + ioe.getLocalizedMessage());
+      fileWrite.setFailed();
       throw ioe;
     } finally {
       if (null != brWriter) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/92f02112/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationS3Impl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationS3Impl.java
b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationS3Impl.java
index 71730f0..f5311cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationS3Impl.java
+++ b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationS3Impl.java
@@ -44,7 +44,6 @@ class AtomicFileOperationS3Impl implements AtomicFileOperations {
   AtomicFileOperationS3Impl(String filePath) {
     this.filePath = filePath;
   }
-
   @Override public DataInputStream openForRead() throws IOException {
     return FileFactory.getDataInputStream(filePath, FileFactory.getFileType(filePath));
   }
@@ -61,4 +60,8 @@ class AtomicFileOperationS3Impl implements AtomicFileOperations {
     dataOutStream = FileFactory.getDataOutputStream(filePath, fileType);
     return dataOutStream;
   }
+
+  @Override public void setFailed() {
+    // no implementation required
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/92f02112/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperations.java
b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperations.java
index ffaa1b1..a641a49 100644
--- a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperations.java
+++ b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperations.java
@@ -28,4 +28,6 @@ public interface AtomicFileOperations {
   void close() throws IOException;
 
   DataOutputStream openForWrite(FileWriteOperation operation) throws IOException;
+
+  void setFailed();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/92f02112/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
index af2456a..f9f8647 100644
--- a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java
@@ -21,6 +21,8 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -29,6 +31,11 @@ import org.apache.carbondata.core.util.CarbonUtil;
 
 class AtomicFileOperationsImpl implements AtomicFileOperations {
 
+  /**
+   * Logger instance
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(AtomicFileOperationsImpl.class.getName());
   private String filePath;
 
   private FileType fileType;
@@ -36,6 +43,7 @@ class AtomicFileOperationsImpl implements AtomicFileOperations {
   private String tempWriteFilePath;
 
   private DataOutputStream dataOutStream;
+  private boolean setFailed;
 
   AtomicFileOperationsImpl(String filePath, FileType fileType) {
     this.filePath = filePath;
@@ -70,12 +78,20 @@ class AtomicFileOperationsImpl implements AtomicFileOperations {
     if (null != dataOutStream) {
       CarbonUtil.closeStream(dataOutStream);
       CarbonFile tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType);
-      if (!tempFile.renameForce(filePath)) {
-        throw new IOException("temporary file renaming failed, src="
-            + tempFile.getPath() + ", dest=" + filePath);
+      if (!this.setFailed) {
+        if (!tempFile.renameForce(filePath)) {
+          throw new IOException(
+              "temporary file renaming failed, src=" + tempFile.getPath() + ", dest=" + filePath);
+        }
+      } else {
+        LOGGER.warn("The temporary file renaming skipped due to I/O error, deleting file "
+            + tempWriteFilePath);
+        tempFile.delete();
       }
     }
-
   }
 
+  @Override public void setFailed() {
+    this.setFailed = true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/92f02112/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 32f6155..67e58d1 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -229,6 +229,10 @@ public class SegmentFileStore {
       String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
       brWriter.write(metadataInstance);
       brWriter.flush();
+    } catch (IOException ie) {
+      LOGGER.error("Error message: " + ie.getLocalizedMessage());
+      fileWrite.setFailed();
+      throw ie;
     } finally {
       CarbonUtil.closeStreams(brWriter);
       fileWrite.close();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/92f02112/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index d5b456c..daf54a0 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -512,6 +512,7 @@ public class SegmentStatusManager {
       brWriter.write(metadataInstance);
     } catch (IOException ioe) {
       LOG.error("Error message: " + ioe.getLocalizedMessage());
+      fileWrite.setFailed();
       throw ioe;
     } finally {
       if (null != brWriter) {
@@ -881,6 +882,10 @@ public class SegmentStatusManager {
 
       String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
       brWriter.write(metadataInstance);
+    } catch (IOException ie) {
+      LOG.error("Error message: " + ie.getLocalizedMessage());
+      writeOperation.setFailed();
+      throw ie;
     } finally {
       try {
         if (null != brWriter) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/92f02112/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 5d5e8b0..c3daac5 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -724,6 +724,8 @@ public class SegmentUpdateStatusManager {
       brWriter.write(metadataInstance);
     } catch (IOException ioe) {
       LOG.error("Error message: " + ioe.getLocalizedMessage());
+      fileWrite.setFailed();
+      throw ioe;
     } finally {
       if (null != brWriter) {
         brWriter.flush();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/92f02112/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index 6e6a65b..c113228 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -449,6 +449,10 @@ public class StoreCreator {
 
       String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
       brWriter.write(metadataInstance);
+    } catch (IOException ioe) {
+      LOG.error("Error message: " + ioe.getLocalizedMessage());
+      writeOperation.setFailed();
+      throw ioe;
     } finally {
       try {
         if (null != brWriter) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/92f02112/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 272abec..19353d1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -16,14 +16,17 @@
  */
 package org.apache.carbondata.processing.util;
 
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.nio.charset.Charset;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -40,9 +43,6 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -62,7 +62,6 @@ import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
 
 import static org.apache.carbondata.core.enums.EscapeSequences.*;
 
-import com.google.gson.Gson;
 import org.apache.commons.lang3.StringUtils;
 
 public final class CarbonLoaderUtil {
@@ -397,39 +396,6 @@ public final class CarbonLoaderUtil {
     loadMetadataDetails.setLoadStartTime(loadStartTime);
   }
 
-  public static void writeLoadMetadata(AbsoluteTableIdentifier identifier,
-      List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
-    String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
-
-    DataOutputStream dataOutputStream;
-    Gson gsonObjectToWrite = new Gson();
-    BufferedWriter brWriter = null;
-
-    AtomicFileOperations writeOperation =
-        AtomicFileOperationFactory.getAtomicFileOperations(dataLoadLocation);
-
-    try {
-      dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
-      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
-              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
-      String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
-      brWriter.write(metadataInstance);
-    } finally {
-      try {
-        if (null != brWriter) {
-          brWriter.flush();
-        }
-      } catch (Exception e) {
-        LOGGER.error("error in  flushing ");
-
-      }
-      CarbonUtil.closeStreams(brWriter);
-      writeOperation.close();
-    }
-
-  }
-
   public static boolean isValidEscapeSequence(String escapeChar) {
     return escapeChar.equalsIgnoreCase(NEW_LINE.getName()) ||
         escapeChar.equalsIgnoreCase(CARRIAGE_RETURN.getName()) ||


Mime
View raw message