carbondata-commits mailing list archives

From ravipes...@apache.org
Subject carbondata git commit: Problem: Insert into select fails because both RDDs run in a single task and share the same TaskContext; resources are cleared as soon as either RDD (the select query's ScanRDD) completes, so the other RDD (the LoadRDD) crashes when accessing the freed memory
Date Wed, 01 Aug 2018 13:41:08 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 3816e90e7 -> de9246066


Problem:
Insert into select fails because both RDDs run in a single task and share the same
TaskContext. Resources are cleared as soon as either RDD (the select query's ScanRDD)
completes, so the other RDD (the LoadRDD) crashes when it tries to access the already
freed memory.

Solution:
Check whether another RDD shares the same task context. If one does, do not clear the
resources at that point; the other RDD sharing the context should clear the memory once
its task finishes.
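
In code, the check amounts to inspecting the completion listeners already registered on
the shared TaskContext. Below is a condensed sketch of the change made in
CarbonScanRDD.scala (all names come from the diff that follows; a TaskContext named
context and a QueryModel named model are assumed to be in scope, as they are inside the
RDD's compute path):

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.util.{CarbonReflectionUtils, TaskCompletionListener}

    // Peek at the listeners already registered on this TaskContext via reflection.
    val listeners = CarbonReflectionUtils.getField("onCompleteCallbacks", context)
      .asInstanceOf[ArrayBuffer[TaskCompletionListener]]
    // An InsertTaskCompletionListener means a load RDD shares this context, so the
    // scan side must not free the unsafe memory; the load side will free it instead.
    val isAdded = listeners.exists(_.isInstanceOf[InsertTaskCompletionListener])
    model.setFreeUnsafeMemory(!isAdded)

QueryTaskCompletionListener receives the same flag, so exactly one listener per task
ends up calling UnsafeMemoryManager.freeMemoryAll.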

This closes #2591


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/de924606
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/de924606
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/de924606

Branch: refs/heads/master
Commit: de92460665bafb403a4b90b513a9136b6f1fb34c
Parents: 3816e90
Author: dhatchayani <dhatcha.official@gmail.com>
Authored: Tue Jul 31 23:11:49 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Wed Aug 1 19:10:56 2018 +0530

----------------------------------------------------------------------
 .../executor/impl/AbstractQueryExecutor.java    | 11 ++-
 .../carbondata/core/scan/model/QueryModel.java  | 11 +++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 64 +++++--------
 .../rdd/InsertTaskCompletionListener.scala      | 33 +++++++
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  3 +-
 .../spark/rdd/QueryTaskCompletionListener.scala | 94 ++++++++++++++++++++
 6 files changed, 168 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/de924606/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 5b67921..f87e46e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -87,6 +87,9 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
    */
   protected QueryExecutorProperties queryProperties;
 
+  // whether to clear/free unsafe memory or not
+  private boolean freeUnsafeMemory;
+
   /**
    * query result iterator which will execute the query
    * and give the result
@@ -114,6 +117,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         queryModel.getQueryId());
     LOGGER.info("Query will be executed on table: " + queryModel.getAbsoluteTableIdentifier()
         .getCarbonTableIdentifier().getTableName());
+    this.freeUnsafeMemory = queryModel.isFreeUnsafeMemory();
     // Initializing statistics list to record the query statistics
     // creating copy on write to handle concurrent scenario
     queryProperties.queryStatisticsRecorder = queryModel.getStatisticsRecorder();
@@ -641,8 +645,11 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         exceptionOccurred = e;
       }
     }
-    // clear all the unsafe memory used for the given task ID
-    UnsafeMemoryManager.INSTANCE.freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
+    // clear all the unsafe memory used for the given task ID only if it is necessary to be cleared
+    if (freeUnsafeMemory) {
+      UnsafeMemoryManager.INSTANCE
+          .freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
+    }
     if (null != queryProperties.executorService) {
       // In case of limit query when number of limit records is already found so executors
       // must stop all the running execution otherwise it will keep running and will hit

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de924606/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 55dafb9..31c7a86 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -114,6 +114,9 @@ public class QueryModel {
    */
   private boolean isFG;
 
+  // whether to clear/free unsafe memory or not
+  private boolean freeUnsafeMemory = true;
+
   private QueryModel(CarbonTable carbonTable) {
     tableBlockInfos = new ArrayList<TableBlockInfo>();
     invalidSegmentIds = new ArrayList<>();
@@ -390,4 +393,12 @@ public class QueryModel {
         projection.getDimensions().size() + projection.getMeasures().size(),
         filterExpressionResolverTree.getFilterExpression().toString());
   }
+
+  public boolean isFreeUnsafeMemory() {
+    return freeUnsafeMemory;
+  }
+
+  public void setFreeUnsafeMemory(boolean freeUnsafeMemory) {
+    this.freeUnsafeMemory = freeUnsafeMemory;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de924606/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 67ea332..6b43999 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.profiler.{GetPartition, Profiler, QueryTaskEnd}
 import org.apache.spark.sql.util.SparkSQLUtil.sessionState
+import org.apache.spark.util.{CarbonReflectionUtils, TaskCompletionListener}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
@@ -457,17 +458,29 @@ class CarbonScanRDD[T: ClassTag](
         }
       }
 
+      // create a statistics recorder
+      val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId())
+      model.setStatisticsRecorder(recorder)
+
+      // TODO: rewrite this logic to call free memory in FailureListener on failures. On success,
+      // no memory leak should be there, resources should be freed on success completion.
+      val listeners = CarbonReflectionUtils.getField("onCompleteCallbacks", context)
+        .asInstanceOf[ArrayBuffer[TaskCompletionListener]]
+      val isAdded = listeners.exists(p => p.isInstanceOf[InsertTaskCompletionListener])
+      model.setFreeUnsafeMemory(!isAdded)
      // add task completion before calling initialize as initialize method will internally call
       // for usage of unsafe method for processing of one blocklet and if there is any exception
       // while doing that the unsafe memory occupied for that task will not get cleared
-      context.addTaskCompletionListener { _ =>
-        closeReader.apply()
-        close()
-        logStatistics(executionId, taskId, queryStartTime, model.getStatisticsRecorder, split)
+      context.addTaskCompletionListener { new QueryTaskCompletionListener(!isAdded,
+        reader,
+        inputMetricsStats,
+        executionId,
+        taskId,
+        queryStartTime,
+        model.getStatisticsRecorder,
+        split,
+        queryId)
       }
-      // create a statistics recorder
-      val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId())
-      model.setStatisticsRecorder(recorder)
       // initialize the reader
       reader.initialize(inputSplit, attemptContext)
 
@@ -625,43 +638,6 @@ class CarbonScanRDD[T: ClassTag](
     format
   }
 
-  def logStatistics(
-      executionId: String,
-      taskId: Long,
-      queryStartTime: Long,
-      recorder: QueryStatisticsRecorder,
-      split: Partition
-  ): Unit = {
-    if (null != recorder) {
-      val queryStatistic = new QueryStatistic()
-      queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
-        System.currentTimeMillis - queryStartTime)
-      recorder.recordStatistics(queryStatistic)
-      // print executor query statistics for each task_id
-      val statistics = recorder.statisticsForTask(taskId, queryStartTime)
-      if (statistics != null && executionId != null) {
-        Profiler.invokeIfEnable {
-          val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
-          inputSplit.calculateLength()
-          val size = inputSplit.getLength
-          val files = inputSplit.getAllSplits.asScala.map { s =>
-            s.getSegmentId + "/" + s.getPath.getName
-          }.toArray[String]
-          Profiler.send(
-            QueryTaskEnd(
-              executionId.toLong,
-              queryId,
-              statistics.getValues,
-              size,
-              files
-            )
-          )
-        }
-      }
-      recorder.logStatisticsForTask(statistics)
-    }
-  }
-
   /**
   * This method will check and remove InExpression from filterExpression to prevent the List
   * Expression values from serializing and deserializing on executor

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de924606/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
new file mode 100644
index 0000000..9439ae5
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/InsertTaskCompletionListener.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import org.apache.spark.TaskContext
+import org.apache.spark.util.TaskCompletionListener
+
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo
+import org.apache.carbondata.processing.loading.DataLoadExecutor
+import org.apache.carbondata.spark.util.CommonUtil
+
+class InsertTaskCompletionListener(dataLoadExecutor: DataLoadExecutor)
+  extends TaskCompletionListener {
+  override def onTaskCompletion(context: TaskContext): Unit = {
+    dataLoadExecutor.close()
+    CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de924606/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 6b136bc..3848bad 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -370,8 +370,7 @@ class NewDataFrameLoaderRDD[K, V](
         loader.initialize()
         val executor = new DataLoadExecutor
         // in case of success, failure or cancelation clear memory and stop execution
-        context.addTaskCompletionListener { context => executor.close()
-          CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)}
+        context.addTaskCompletionListener (new InsertTaskCompletionListener(executor))
         executor.execute(model, loader.storeLocation, recordReaders.toArray)
       } catch {
         case e: NoRetryException =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de924606/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala
new file mode 100644
index 0000000..e4cb3f8
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/QueryTaskCompletionListener.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.profiler.{Profiler, QueryTaskEnd}
+import org.apache.spark.util.TaskCompletionListener
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.memory.UnsafeMemoryManager
+import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder}
+import org.apache.carbondata.core.util.{TaskMetricsMap, ThreadLocalTaskInfo}
+import org.apache.carbondata.spark.InitInputMetrics
+
+class QueryTaskCompletionListener(freeMemory: Boolean,
+    var reader: RecordReader[Void, Object],
+    inputMetricsStats: InitInputMetrics, executionId: String, taskId: Int, queryStartTime: Long,
+    queryStatisticsRecorder: QueryStatisticsRecorder, split: Partition, queryId: String)
+  extends TaskCompletionListener {
+  override def onTaskCompletion(context: TaskContext): Unit = {
+    if (reader != null) {
+      try {
+        reader.close()
+      } catch {
+        case e: Exception =>
+          LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(e)
+      }
+      reader = null
+    }
+    TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId)
+    inputMetricsStats.updateAndClose()
+    logStatistics(executionId, taskId, queryStartTime, queryStatisticsRecorder, split)
+    if (freeMemory) {
+      UnsafeMemoryManager.INSTANCE
+        .freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+    }
+  }
+
+  def logStatistics(
+      executionId: String,
+      taskId: Long,
+      queryStartTime: Long,
+      recorder: QueryStatisticsRecorder,
+      split: Partition
+  ): Unit = {
+    if (null != recorder) {
+      val queryStatistic = new QueryStatistic()
+      queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
+        System.currentTimeMillis - queryStartTime)
+      recorder.recordStatistics(queryStatistic)
+      // print executor query statistics for each task_id
+      val statistics = recorder.statisticsForTask(taskId, queryStartTime)
+      if (statistics != null && executionId != null) {
+        Profiler.invokeIfEnable {
+          val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
+          inputSplit.calculateLength()
+          val size = inputSplit.getLength
+          val files = inputSplit.getAllSplits.asScala.map { s =>
+            s.getSegmentId + "/" + s.getPath.getName
+          }.toArray[String]
+          Profiler.send(
+            QueryTaskEnd(
+              executionId.toLong,
+              queryId,
+              statistics.getValues,
+              size,
+              files
+            )
+          )
+        }
+      }
+      recorder.logStatisticsForTask(statistics)
+    }
+  }
+}
+

