carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From manishgupt...@apache.org
Subject carbondata git commit: [CARBONDATA-2181] Thread Leak during compaction processing on restructured table
Date Thu, 22 Mar 2018 09:31:05 GMT
Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 53200ccff -> 0299dd90a


[CARBONDATA-2181] Thread Leak during compaction processing on restructured table

Problem
Thread leak in compaction operation

Analysis
Compaction uses both query and data loading processes. During data laod operation during compaction
new threads are spawned in sorting, merger and
data writer step using executor service. These threads are not getting closed in case comapciton
fails or the operation is killed from spark UI
as observed by taking a thread dump after compaction failure.

Fix
Add a task completion listener to each compaction task that closes all executor service
instances and cleans up any other system
resources, preventing the thread leak.

This closes #2086


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0299dd90
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0299dd90
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0299dd90

Branch: refs/heads/branch-1.3
Commit: 0299dd90ab1b9737480245e994e2c2c83f49222e
Parents: 53200cc
Author: manishgupta88 <tomanishgupta18@gmail.com>
Authored: Wed Mar 21 12:08:01 2018 +0530
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Thu Mar 22 15:04:38 2018 +0530

----------------------------------------------------------------------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 29 ++++++++++++++++----
 .../merger/AbstractResultProcessor.java         |  6 ++++
 .../merger/CompactionResultSortProcessor.java   | 16 +++++++++++
 .../merger/RowResultMergerProcessor.java        |  8 ++++++
 4 files changed, 53 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0299dd90/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index e0dcffd..6932c79 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -98,6 +98,7 @@ class CarbonMergerRDD[K, V](
       var mergeStatus = false
       var mergeNumber = ""
       var exec: CarbonCompactionExecutor = null
+      var processor: AbstractResultProcessor = null
       try {
 
 
@@ -173,6 +174,10 @@ class CarbonMergerRDD[K, V](
         exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties,
           carbonTable, dataFileMetadataSegMapping, restructuredBlockExists)
 
+        // add task completion listener to clean up the resources
+        context.addTaskCompletionListener { _ =>
+          close()
+        }
         // fire a query and get the results.
         var result2: java.util.List[RawResultIterator] = null
         try {
@@ -199,7 +204,6 @@ class CarbonMergerRDD[K, V](
         )
 
         carbonLoadModel.setPartitionId("0")
-        var processor: AbstractResultProcessor = null
         if (restructuredBlockExists) {
           LOGGER.info("CompactionResultSortProcessor flow is selected")
           processor = new CompactionResultSortProcessor(
@@ -228,9 +232,25 @@ class CarbonMergerRDD[K, V](
         case e: Exception =>
           LOGGER.error(e)
           throw e
-      } finally {
-        // delete temp location data
+      }
+
+      private def close(): Unit = {
+        deleteLocalDataFolders()
+        // close all the query executor services and clean up memory acquired during query processing
+        if (null != exec) {
+          LOGGER.info("Cleaning up query resources acquired during compaction")
+          exec.finish()
+        }
+        // clean up the resources for processor
+        if (null != processor) {
+          LOGGER.info("Closing compaction processor instance to clean up loading resources")
+          processor.close()
+        }
+      }
+
+      private def deleteLocalDataFolders(): Unit = {
         try {
+          LOGGER.info("Deleting local folder store location")
           val isCompactionFlow = true
           TableProcessingOperations
             .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow, false)
@@ -238,9 +258,6 @@ class CarbonMergerRDD[K, V](
           case e: Exception =>
             LOGGER.error(e)
         }
-        if (null != exec) {
-          exec.finish()
-        }
       }
 
       var finished = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0299dd90/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index d3caa99..067a4c4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -41,6 +41,12 @@ public abstract class AbstractResultProcessor {
    */
   public abstract boolean execute(List<RawResultIterator> resultIteratorList);
 
+  /**
+   * This method will be used to clean up the resources and close all the spawned threads to avoid
+   * any kind of memory or thread leak
+   */
+  public abstract void close();
+
   protected void setDataFileAttributesInModel(CarbonLoadModel loadModel,
       CompactionType compactionType, CarbonTable carbonTable,
       CarbonFactDataHandlerModel carbonFactDataHandlerModel) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0299dd90/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index e7c4502..f3e4b8f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -186,6 +186,22 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor
{
     return isCompactionSuccess;
   }
 
+  @Override
+  public void close() {
+    // close the sorter executor service
+    if (null != sortDataRows) {
+      sortDataRows.close();
+    }
+    // close the final merger
+    if (null != finalMerger) {
+      finalMerger.close();
+    }
+    // close data handler
+    if (null != dataHandler) {
+      dataHandler.closeHandler();
+    }
+  }
+
   /**
    * This method will clean up the local folders and files created during compaction process
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0299dd90/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index b41829f..6e99a1a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -182,6 +182,14 @@ public class RowResultMergerProcessor extends AbstractResultProcessor
{
     return mergeStatus;
   }
 
+  @Override
+  public void close() {
+    // close data handler
+    if (null != dataHandler) {
+      dataHandler.closeHandler();
+    }
+  }
+
   /**
    * Below method will be used to add sorted row
    *


Mime
View raw message