carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kumarvisha...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3297] Fix that the IndexoutOfBoundsException when creating table and dropping table are at the same time
Date Tue, 12 Mar 2019 09:58:23 GMT
This is an automated email from the ASF dual-hosted git repository.

kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 6840a18  [CARBONDATA-3297] Fix that the IndexoutOfBoundsException when creating table
and dropping table are at the same time
6840a18 is described below

commit 6840a183689ad6acd86e1850dedff8665bf126ae
Author: qiuchenjian <807169000@qq.com>
AuthorDate: Wed Feb 20 17:16:34 2019 +0800

    [CARBONDATA-3297] Fix that the IndexoutOfBoundsException when creating table and dropping
table are at the same time
    
    [Problem]
    An IndexOutOfBoundsException is thrown when one table is being created while another is
being dropped at the same time
    
    [Solution]
    The type of carbonTables in the MetaData class is ArrayBuffer, and ArrayBuffer is not
thread-safe,
    so this exception is thrown when a table is created and a table is dropped at the same time
    
    Use a read-write lock to guarantee thread safety
    
    This closes #3130
---
 .../spark/sql/hive/CarbonFileMetastore.scala       | 37 +++++++++++++++++++---
 1 file changed, 33 insertions(+), 4 deletions(-)

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index c1be154..ea3bba8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
 import java.net.URI
+import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -43,7 +44,8 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.{table, SchemaReader}
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.metadata.schema.table
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -53,9 +55,16 @@ import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.util.CarbonSparkUtil
 
 case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) {
+  // use to lock the carbonTables
+  val lock : ReentrantReadWriteLock = new ReentrantReadWriteLock
+  val readLock: Lock = lock.readLock()
+  val writeLock: Lock = lock.writeLock()
+
   // clear the metadata
   def clear(): Unit = {
+    writeLock.lock()
     carbonTables.clear()
+    writeLock.unlock()
   }
 }
 
@@ -192,9 +201,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
    * @return
    */
   def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
= {
-    metadata.carbonTables
+    metadata.readLock.lock()
+    val ret = metadata.carbonTables
       .find(table => table.getDatabaseName.equalsIgnoreCase(database) &&
         table.getTableName.equalsIgnoreCase(tableName))
+    metadata.readLock.unlock()
+    ret
   }
 
   def tableExists(
@@ -270,11 +282,14 @@ class CarbonFileMetastore extends CarbonMetaStore {
       }
     }
 
+
     wrapperTableInfo.map { tableInfo =>
       CarbonMetadata.getInstance().removeTable(tableUniqueName)
       CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
       val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+      metadata.writeLock.lock()
       metadata.carbonTables += carbonTable
+      metadata.writeLock.unlock()
       carbonTable
     }
   }
@@ -413,8 +428,11 @@ class CarbonFileMetastore extends CarbonMetaStore {
     CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
     removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+    metadata.writeLock.lock()
     metadata.carbonTables +=
       CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName)
+    metadata.writeLock.unlock()
+    metadata.carbonTables
   }
 
   /**
@@ -427,7 +445,9 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val carbonTableToBeRemoved: Option[CarbonTable] = getTableFromMetadataCache(dbName, tableName)
     carbonTableToBeRemoved match {
       case Some(carbonTable) =>
+        metadata.writeLock.lock()
         metadata.carbonTables -= carbonTable
+        metadata.writeLock.unlock()
       case None =>
         if (LOGGER.isDebugEnabled) {
           LOGGER.debug(s"No entry for table $tableName in database $dbName")
@@ -443,10 +463,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
       wrapperTableInfo.getTableUniqueName)
     for (i <- metadata.carbonTables.indices) {
+      metadata.writeLock.lock()
       if (wrapperTableInfo.getTableUniqueName.equals(
         metadata.carbonTables(i).getTableUniqueName)) {
         metadata.carbonTables(i) = carbonTable
       }
+      metadata.writeLock.unlock()
     }
   }
 
@@ -579,12 +601,14 @@ class CarbonFileMetastore extends CarbonMetaStore {
         FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
       if (!(lastModifiedTime ==
             tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
+        metadata.writeLock.lock()
         metadata.carbonTables = metadata.carbonTables.filterNot(
           table => table.getTableName.equalsIgnoreCase(tableIdentifier.table) &&
                    table.getDatabaseName
                      .equalsIgnoreCase(tableIdentifier.database
                        .getOrElse(SparkSession.getActiveSession.get.sessionState.catalog
                          .getCurrentDatabase)))
+        metadata.writeLock.unlock()
         updateSchemasUpdatedTime(lastModifiedTime)
         isRefreshed = true
       }
@@ -594,8 +618,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
 
   override def isReadFromHiveMetaStore: Boolean = false
 
-  override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] =
-    metadata.carbonTables
+  override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = {
+    metadata.readLock.lock
+    val ret = metadata.carbonTables.clone()
+    metadata.readLock.unlock
+    ret
+  }
+
 
   override def getThriftTableInfo(carbonTable: CarbonTable): TableInfo = {
     val tableMetadataFile = CarbonTablePath.getSchemaFilePath(carbonTable.getTablePath)


Mime
View raw message