carbondata-commits mailing list archives

From gvram...@apache.org
Subject [2/2] carbondata git commit: [CARBONDATA-2666] updated rename command so that table directory is not renamed
Date Mon, 09 Jul 2018 08:36:31 GMT
[CARBONDATA-2666] updated rename command so that table directory is not renamed

Rename will no longer rename the table folder on disk; it only updates the table metadata.

This closes #2420
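
The behavioral change, as a hedged sketch (assuming a SparkSession named spark with CarbonData configured; the table names are illustrative and not part of this commit): after ALTER TABLE ... RENAME TO, the on-disk table directory keeps its old name and only the schema/catalog metadata changes.

    // Sketch only: check that rename leaves the table directory untouched
    import org.apache.spark.sql.CarbonEnv

    spark.sql("CREATE TABLE t_old(id INT) STORED BY 'carbondata'")
    val pathBefore =
      CarbonEnv.getCarbonTable(Some("default"), "t_old")(spark).getTablePath
    spark.sql("ALTER TABLE t_old RENAME TO t_new")
    val pathAfter =
      CarbonEnv.getCarbonTable(Some("default"), "t_new")(spark).getTablePath
    // directory is not renamed; only metadata (schema, catalog entry) changed
    assert(pathBefore == pathAfter)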


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7a1d12aa
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7a1d12aa
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7a1d12aa

Branch: refs/heads/master
Commit: 7a1d12aa1d6d0dfb0b2a8f40317e60115081cd2b
Parents: c3bc1ba
Author: kunal642 <kunalkapoor642@gmail.com>
Authored: Wed Jun 27 12:28:04 2018 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Mon Jul 9 14:04:47 2018 +0530

----------------------------------------------------------------------
 .../core/scan/executor/util/QueryUtil.java      |  19 +--
 .../carbondata/core/scan/filter/FilterUtil.java |   2 +-
 .../apache/carbondata/core/util/CarbonUtil.java |  12 --
 .../core/util/path/CarbonTablePath.java         |  28 ++---
 .../carbondata/core/util/CarbonUtilTest.java    |  10 --
 .../datamap/bloom/BloomCoarseGrainDataMap.java  |   6 +-
 .../hadoop/api/CarbonTableOutputFormat.java     |  25 ++--
 .../TestTimeSeriesUnsupportedSuite.scala        |   2 +-
 .../badrecordloger/BadRecordActionTest.scala    |  16 +--
 .../StandardPartitionBadRecordLoggerTest.scala  |   4 +-
 .../streaming/StreamSinkFactory.scala           |   4 +
 .../command/carbonTableSchemaCommon.scala       |  23 +++-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  10 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   8 +-
 .../management/CarbonLoadDataCommand.scala      |  13 +-
 .../management/RefreshCarbonTableCommand.scala  |  11 +-
 .../preaaggregate/PreAggregateListeners.scala   |  13 +-
 .../schema/CarbonAlterTableRenameCommand.scala  | 119 +------------------
 .../table/CarbonCreateTableCommand.scala        |  19 ++-
 .../datasources/SparkCarbonTableFormat.scala    |   4 +
 .../sql/execution/strategy/DDLStrategy.scala    |   3 +
 .../spark/sql/hive/CarbonFileMetastore.scala    |   6 +-
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |   8 +-
 .../org/apache/spark/util/AlterTableUtil.scala  |  31 +----
 .../BadRecordPathLoadOptionTest.scala           |  11 +-
 .../TestStreamingTableOperation.scala           |   9 +-
 .../restructure/AlterTableRevertTestCase.scala  |  12 --
 .../AlterTableValidationTestCase.scala          |   2 +-
 .../vectorreader/AddColumnTestCases.scala       |   6 +-
 .../loading/BadRecordsLoggerProvider.java       |  29 +----
 .../loading/CarbonDataLoadConfiguration.java    |  10 ++
 .../loading/DataLoadProcessBuilder.java         |   2 +
 .../converter/impl/FieldEncoderFactory.java     |   8 +-
 .../converter/impl/RowConverterImpl.java        |   8 +-
 .../loading/model/CarbonLoadModel.java          |  12 ++
 .../loading/model/CarbonLoadModelBuilder.java   |  12 +-
 .../processing/loading/model/LoadOption.java    |  11 --
 .../processing/util/CarbonBadRecordUtil.java    |  39 +++++-
 38 files changed, 234 insertions(+), 333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index 7986e8a..c8b0f6e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -61,7 +61,6 @@ import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnRes
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.commons.lang3.ArrayUtils;
 
@@ -378,8 +377,8 @@ public class QueryUtil {
         ColumnIdentifier columnIdentifier;
         if (null != dimension.getColumnSchema().getParentColumnTableRelations() && !dimension
             .getColumnSchema().getParentColumnTableRelations().isEmpty()) {
-          dictionarySourceAbsoluteTableIdentifier = getTableIdentifierForColumn(dimension,
-              carbonTable.getAbsoluteTableIdentifier());
+          dictionarySourceAbsoluteTableIdentifier =
+              getTableIdentifierForColumn(dimension);
           columnIdentifier = new ColumnIdentifier(
               dimension.getColumnSchema().getParentColumnTableRelations().get(0).getColumnId(),
               dimension.getColumnProperties(), dimension.getDataType());
@@ -397,8 +396,14 @@ public class QueryUtil {
     return dictionaryColumnUniqueIdentifiers;
   }
 
-  public static AbsoluteTableIdentifier getTableIdentifierForColumn(CarbonDimension carbonDimension,
-      AbsoluteTableIdentifier identifier) {
+  public static AbsoluteTableIdentifier getTableIdentifierForColumn(
+      CarbonDimension carbonDimension) {
+    RelationIdentifier parentRelationIdentifier =
+        carbonDimension.getColumnSchema().getParentColumnTableRelations().get(0)
+            .getRelationIdentifier();
+    String parentTablePath = CarbonMetadata.getInstance()
+        .getCarbonTable(parentRelationIdentifier.getDatabaseName(),
+            parentRelationIdentifier.getTableName()).getTablePath();
     RelationIdentifier relation = carbonDimension.getColumnSchema()
         .getParentColumnTableRelations()
         .get(0)
@@ -406,9 +411,7 @@ public class QueryUtil {
     String parentTableName = relation.getTableName();
     String parentDatabaseName = relation.getDatabaseName();
     String parentTableId = relation.getTableId();
-    String newTablePath =
-        CarbonTablePath.getNewTablePath(identifier.getTablePath(), parentTableName);
-    return AbsoluteTableIdentifier.from(newTablePath, parentDatabaseName, parentTableName,
+    return AbsoluteTableIdentifier.from(parentTablePath, parentDatabaseName, parentTableName,
         parentTableId);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index b5fd0b7..d23e2d6 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -1237,7 +1237,7 @@ public final class FilterUtil {
       if (null != carbonDimension.getColumnSchema().getParentColumnTableRelations()
           && carbonDimension.getColumnSchema().getParentColumnTableRelations().size() == 1) {
         dictionarySourceAbsoluteTableIdentifier = QueryUtil
-            .getTableIdentifierForColumn(carbonDimension, carbonTable.getAbsoluteTableIdentifier());
+            .getTableIdentifierForColumn(carbonDimension);
         columnIdentifier = new ColumnIdentifier(
             carbonDimension.getColumnSchema().getParentColumnTableRelations().get(0).getColumnId(),
             carbonDimension.getColumnProperties(), carbonDimension.getDataType());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index eaa2b74..c568f90 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -29,7 +29,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
@@ -356,17 +355,6 @@ public final class CarbonUtil {
     });
   }
 
-  public static String getBadLogPath(String storeLocation) {
-    String badLogStoreLocation = CarbonProperties.getInstance()
-        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
-    if (null == badLogStoreLocation) {
-      badLogStoreLocation =
-          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
-    }
-    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
-    return badLogStoreLocation;
-  }
-
   public static void deleteFoldersAndFilesSilent(final CarbonFile... file)
       throws IOException, InterruptedException {
     UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index fe68adf..14117f6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -25,8 +25,6 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.locks.LockUsage;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 
-import org.apache.hadoop.fs.Path;
-
 /**
  * Helps to get Table content paths.
  */
@@ -386,21 +384,6 @@ public class CarbonTablePath {
   }
 
   /**
-   * get the parent folder of old table path and returns the new tablePath by appending new
-   * tableName to the parent
-   *
-   * @param tablePath         Old tablePath
-   * @param newTableName      new table name
-   * @return the new table path
-   */
-  public static String getNewTablePath(
-      String tablePath,
-      String newTableName) {
-    Path parentPath = new Path(tablePath).getParent();
-    return parentPath.toString() + CarbonCommonConstants.FILE_SEPARATOR + newTableName;
-  }
-
-  /**
    * Return store path for datamap based on the taskNo,if three tasks get launched during loading,
    * then three folders will be created based on the shard name and lucene index file will be
    * written into those folders
@@ -772,4 +755,15 @@ public class CarbonTablePath {
     return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
         + TABLE_STATUS_HISTORY_FILE;
   }
+
+  public static String generateBadRecordsPath(String badLogStoreLocation, String segmentId,
+      String taskNo, boolean isTransactionalTable) {
+    if (!isTransactionalTable) {
+      return badLogStoreLocation + File.separator + "SdkWriterBadRecords"
+          + CarbonCommonConstants.FILE_SEPARATOR + taskNo;
+    } else {
+      return badLogStoreLocation + File.separator + segmentId + CarbonCommonConstants.FILE_SEPARATOR
+          + taskNo;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index 5f8d199..4846841 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -206,16 +206,6 @@ public class CarbonUtilTest {
     assertTrue(!testDir.exists());
   }
 
-  @Test public void testToGetBadLogPath() throws InterruptedException {
-    new MockUp<CarbonProperties>() {
-      @SuppressWarnings("unused") @Mock public String getProperty(String key) {
-        return "../unibi-solutions/system/carbon/badRecords";
-      }
-    };
-    String badLogStoreLocation = CarbonUtil.getBadLogPath("badLogPath");
-    assertEquals(badLogStoreLocation.replace("\\", "/"), "../unibi-solutions/system/carbon/badRecords/badLogPath");
-  }
-
   @Test public void testToDeleteFoldersAndFilesForCarbonFileSilently()
       throws IOException, InterruptedException {
     LocalCarbonFile testDir = new LocalCarbonFile("../core/src/test/resources/testDir");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
index 47ba79d..01bd804 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMap.java
@@ -124,9 +124,9 @@ public class BloomCoarseGrainDataMap extends CoarseGrainDataMap {
             CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
             CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
         dataField.setTimestampFormat(tsFormat);
-        FieldConverter fieldConverter =
-            FieldEncoderFactory.getInstance().createFieldEncoder(dataField, absoluteTableIdentifier,
-                i, nullFormat, null, false, localCaches[i], false);
+        FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
+            .createFieldEncoder(dataField, absoluteTableIdentifier, i, nullFormat, null, false,
+                localCaches[i], false, carbonTable.getTablePath());
         this.name2Converters.put(indexedColumn.get(i).getColName(), fieldConverter);
       }
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index e11efc4..78c670a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWra
 import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -279,9 +280,9 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf));
     model.setTableName(CarbonTableOutputFormat.getTableName(conf));
     model.setCarbonTransactionalTable(true);
-    model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(getCarbonTable(conf)));
+    CarbonTable carbonTable = getCarbonTable(conf);
+    model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable));
     model.setTablePath(getTablePath(conf));
-
     setFileHeader(conf, model);
     model.setSerializationNullFormat(conf.get(SERIALIZATION_NULL_FORMAT, "\\N"));
     model.setBadRecordsLoggerEnable(
@@ -345,14 +346,18 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
                     CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
                     CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))));
 
-    model.setBadRecordsLocation(
-        conf.get(BAD_RECORD_PATH,
-            carbonProperty.getProperty(
-                CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-                carbonProperty.getProperty(
-                    CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-                    CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))));
-
+    String badRecordsPath = conf.get(BAD_RECORD_PATH);
+    if (StringUtils.isEmpty(badRecordsPath)) {
+      badRecordsPath =
+          carbonTable.getTableInfo().getFactTable().getTableProperties().get("bad_records_path");
+      if (StringUtils.isEmpty(badRecordsPath)) {
+        badRecordsPath = carbonProperty
+            .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, carbonProperty
+                .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+                    CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL));
+      }
+    }
+    model.setBadRecordsLocation(badRecordsPath);
     model.setUseOnePass(
         conf.getBoolean(IS_ONE_PASS_LOAD,
             Boolean.parseBoolean(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala
index 1bcd6ec..a7e425c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesUnsupportedSuite.scala
@@ -251,7 +251,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with BeforeAndAfterAll wi
       sql("alter table maintable_agg1_minute rename to maintable_agg1_minute_new")
     }
     assert(e.getMessage.contains(
-      "Rename operation for pre-aggregate table is not supported."))
+      "Rename operation for datamaps is not supported."))
 
     // check datamap after alter
     checkExistence(sql("SHOW DATAMAP ON TABLE mainTable"), true, "maintable_agg1_minute")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
index 959aa6a..82337a3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
@@ -125,19 +125,6 @@ class BadRecordActionTest extends QueryTest {
       Seq(Row(2)))
   }
 
-  test("test bad record REDIRECT but not having location should throw exception") {
-    sql("drop table if exists sales")
-    sql(
-      """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
-          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
-    val exMessage = intercept[Exception] {
-      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
-          "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
-          " ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='','timestampformat'='yyyy/MM/dd')")
-    }
-    assert(exMessage.getMessage.contains("Invalid bad records location."))
-  }
-
   test("test bad record REDIRECT but not having empty location in option should throw exception") {
     sql("drop table if exists sales")
     sql(
@@ -153,7 +140,8 @@ class BadRecordActionTest extends QueryTest {
             "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
             " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
       }
-      assert(exMessage.getMessage.contains("Invalid bad records location."))
+      assert(exMessage.getMessage
+        .contains("Cannot redirect bad records as bad record location is not provided."))
     }
     finally {
       CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
index 8e1f13b..d9e5d3c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
@@ -43,8 +43,8 @@ class StandardPartitionBadRecordLoggerTest extends QueryTest with BeforeAndAfter
 
   test("test partition redirect") {
     sql(
-      """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp,
-          actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata'""")
+      s"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORDS_PATH'='$warehouse')""")
 
     val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
     sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index c162ea7..840b03f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -22,6 +22,7 @@ import java.util
 
 import scala.collection.JavaConverters._
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession
@@ -40,6 +41,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
+import org.apache.carbondata.processing.util.CarbonBadRecordUtil
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
 import org.apache.carbondata.streaming.segment.StreamSegment
@@ -255,6 +257,8 @@ object StreamSinkFactory {
       optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getTableName)
         .asScala.map(_.getColName).mkString(","))
     }
+    optionsFinal
+      .put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(parameters.asJava, carbonTable))
     val carbonLoadModel = new CarbonLoadModel()
     new CarbonLoadModelBuilder(carbonTable).build(
       parameters.asJava,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 6fee8c7..e50a8fd 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -32,6 +32,8 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
@@ -43,7 +45,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema,
   ParentColumnTableRelation}
 import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
-import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CompactionType
@@ -814,6 +816,11 @@ class TableNewProcessor(cm: TableModel) {
     }
     // Add table comment to table properties
     tablePropertiesMap.put("comment", cm.tableComment.getOrElse(""))
+    val badRecordsPath = getBadRecordsPath(tablePropertiesMap,
+      cm.tableName,
+      tableSchema.getTableId,
+      cm.databaseNameOp.getOrElse("default"))
+    tablePropertiesMap.put("bad_records_path", badRecordsPath)
     tableSchema.setTableProperties(tablePropertiesMap)
     if (cm.bucketFields.isDefined) {
       val bucketCols = cm.bucketFields.get.bucketColumns.map { b =>
@@ -862,6 +869,20 @@ class TableNewProcessor(cm: TableModel) {
     tableInfo
   }
 
+  private def getBadRecordsPath(tablePropertiesMap: util.HashMap[String, String],
+      tableName: String,
+      tableId: String,
+      databaseName: String): String = {
+    val badRecordsPath = tablePropertiesMap.asScala
+      .getOrElse("bad_records_path", CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)
+    if (badRecordsPath == null || badRecordsPath.isEmpty) {
+      CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL
+    } else {
+      badRecordsPath + CarbonCommonConstants.FILE_SEPARATOR + databaseName +
+      CarbonCommonConstants.FILE_SEPARATOR + s"${tableName}_$tableId"
+    }
+  }
+
   /**
    * Method to check to get the encoder from parent or not
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 1f65fce..3c9039d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.SparkTypeConverter
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.ColumnIdentifier
+import org.apache.carbondata.core.metadata.{CarbonMetadata, ColumnIdentifier}
 import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -275,9 +275,13 @@ case class CarbonDictionaryDecoder(
               if (null != carbonDimension.getColumnSchema.getParentColumnTableRelations &&
                   !carbonDimension
                     .getColumnSchema.getParentColumnTableRelations.isEmpty) {
+                val parentRelationIdentifier = carbonDimension.getColumnSchema
+                  .getParentColumnTableRelations.get(0).getRelationIdentifier
+                val parentTablePath = CarbonMetadata.getInstance()
+                  .getCarbonTable(parentRelationIdentifier.getDatabaseName,
+                    parentRelationIdentifier.getTableName).getTablePath
                 (QueryUtil
-                  .getTableIdentifierForColumn(carbonDimension,
-                    atiMap(tableName).getAbsoluteTableIdentifier),
+                  .getTableIdentifierForColumn(carbonDimension),
                   new ColumnIdentifier(carbonDimension.getColumnSchema
                     .getParentColumnTableRelations.get(0).getColumnId,
                     carbonDimension.getColumnProperties,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index bd5260f..5650187 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -166,7 +166,7 @@ object CarbonEnv {
       .addListener(classOf[DeleteFromTablePreEvent], DeletePreAggregatePreListener)
       .addListener(classOf[DeleteFromTablePreEvent], DeletePreAggregatePreListener)
       .addListener(classOf[AlterTableDropColumnPreEvent], PreAggregateDropColumnPreListener)
-      .addListener(classOf[AlterTableRenamePreEvent], PreAggregateRenameTablePreListener)
+      .addListener(classOf[AlterTableRenamePreEvent], RenameTablePreListener)
       .addListener(classOf[AlterTableDataTypeChangePreEvent], PreAggregateDataTypeChangePreListener)
       .addListener(classOf[AlterTableAddColumnPreEvent], PreAggregateAddColumnsPreListener)
       .addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener)
@@ -220,11 +220,9 @@ object CarbonEnv {
       identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
       identifier.table)
     if (carbonEnv.carbonMetastore
-          .checkSchemasModifiedTimeAndReloadTable(identifier)) {
+          .checkSchemasModifiedTimeAndReloadTable(identifier) && table.isDefined) {
       sparkSession.sessionState.catalog.refreshTable(identifier)
-      val tablePath = CarbonProperties.getStorePath + File.separator + identifier.database
-        .getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase) +
-                      File.separator + identifier.table
+      val tablePath = table.get.getTablePath
       DataMapStoreManager.getInstance().
         clearDataMaps(AbsoluteTableIdentifier.from(tablePath,
           identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 38bdbcf..6b1865a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -71,7 +71,7 @@ import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions
-import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
+import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
 import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark}
@@ -102,6 +102,8 @@ case class CarbonLoadDataCommand(
 
   var currPartitions: util.List[PartitionSpec] = _
 
+  var parentTablePath: String = _
+
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
@@ -129,6 +131,12 @@ case class CarbonLoadDataCommand(
         }.head
       sizeInBytes = logicalPartitionRelation.relation.sizeInBytes
     }
+    if (table.isChildDataMap) {
+      val parentTableIdentifier = table.getTableInfo.getParentRelationIdentifiers.get(0)
+      parentTablePath = CarbonEnv
+        .getCarbonTable(Some(parentTableIdentifier.getDatabaseName),
+          parentTableIdentifier.getTableName)(sparkSession).getTablePath
+    }
     operationContext.setProperty("isOverwrite", isOverwriteTable)
     if(CarbonUtil.hasAggregationDataMap(table)) {
       val loadMetadataEvent = new LoadMetadataEvent(table, false)
@@ -182,11 +190,14 @@ case class CarbonLoadDataCommand(
           carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
             CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
 
+      optionsFinal
+        .put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
       val factPath = if (dataFrame.isDefined) {
         ""
       } else {
         FileUtils.getPaths(factPathFromUser, hadoopConf)
       }
+      carbonLoadModel.setParentTablePath(parentTablePath)
       carbonLoadModel.setFactFilePath(factPath)
       carbonLoadModel.setCarbonTransactionalTable(table.getTableInfo.isTransactionalTable)
       carbonLoadModel.setAggLoadRequest(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index 1d91458..cf88fb9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -94,7 +94,7 @@ case class RefreshCarbonTableCommand(
           // 2.2.1 Register the aggregate tables to hive
           registerAggregates(databaseName, dataMapSchemaList)(sparkSession)
         }
-        registerTableWithHive(databaseName, tableName, tableInfo)(sparkSession)
+        registerTableWithHive(databaseName, tableName, tableInfo, tablePath)(sparkSession)
         // Register partitions to hive metastore in case of hive partitioning carbon table
         if (tableInfo.getFactTable.getPartitionInfo != null &&
             tableInfo.getFactTable.getPartitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
@@ -144,14 +144,16 @@ case class RefreshCarbonTableCommand(
    */
   def registerTableWithHive(dbName: String,
       tableName: String,
-      tableInfo: TableInfo)(sparkSession: SparkSession): Any = {
+      tableInfo: TableInfo,
+      tablePath: String)(sparkSession: SparkSession): Any = {
     val operationContext = new OperationContext
     try {
       val refreshTablePreExecutionEvent: RefreshTablePreExecutionEvent =
         new RefreshTablePreExecutionEvent(sparkSession,
           tableInfo.getOrCreateAbsoluteTableIdentifier())
       OperationListenerBus.getInstance.fireEvent(refreshTablePreExecutionEvent, operationContext)
-      CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false).run(sparkSession)
+      CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false, tableLocation = Some(tablePath))
+        .run(sparkSession)
       LOGGER.audit(s"Table registration with Database name [$dbName] and Table name " +
                    s"[$tableName] is successful.")
     } catch {
@@ -199,7 +201,6 @@ case class RefreshCarbonTableCommand(
    */
   def registerAggregates(dbName: String,
       dataMapSchemaList: util.List[DataMapSchema])(sparkSession: SparkSession): Any = {
-    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     dataMapSchemaList.asScala.foreach(dataMap => {
       val tableName = dataMap.getChildSchema.getTableName
       if (!sparkSession.sessionState.catalog.listTables(dbName)
@@ -208,7 +209,7 @@ case class RefreshCarbonTableCommand(
         val absoluteTableIdentifier = AbsoluteTableIdentifier
           .from(tablePath, dbName, tableName)
         val tableInfo = SchemaReader.getTableInfo(absoluteTableIdentifier)
-        registerTableWithHive(dbName, tableName, tableInfo)(sparkSession)
+        registerTableWithHive(dbName, tableName, tableInfo, tablePath)(sparkSession)
       }
     })
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index a41f78c..2fb2902 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable}
@@ -783,7 +783,7 @@ object PreAggregateDropColumnPreListener extends OperationEventListener {
   }
 }
 
-object PreAggregateRenameTablePreListener extends OperationEventListener {
+object RenameTablePreListener extends OperationEventListener {
   /**
    * Called on a specified event occurrence
    *
@@ -796,12 +796,17 @@ object PreAggregateRenameTablePreListener extends OperationEventListener {
     val carbonTable = renameTablePostListener.carbonTable
     if (carbonTable.isChildDataMap) {
       throw new UnsupportedOperationException(
-        "Rename operation for pre-aggregate table is not supported.")
+        "Rename operation for datamaps is not supported.")
     }
-    if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
+    if (carbonTable.hasAggregationDataMap) {
       throw new UnsupportedOperationException(
         "Rename operation is not supported for table with pre-aggregate tables")
     }
+    val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
+    if (!indexSchemas.isEmpty) {
+      throw new UnsupportedOperationException(
+        "Rename operation is not supported for table with datamaps")
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 779b937..0e3033e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.command.schema
 
-import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
@@ -27,7 +26,6 @@ import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
@@ -36,8 +34,6 @@ import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.SchemaEvolutionEntry
 
@@ -86,20 +82,11 @@ private[sql] case class CarbonAlterTableRenameCommand(
       throw new MalformedCarbonCommandException("alter rename is not supported for index datamap")
     }
 
-    val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
-      LockUsage.COMPACTION_LOCK,
-      LockUsage.DELETE_SEGMENT_LOCK,
-      LockUsage.CLEAN_FILES_LOCK,
-      LockUsage.DROP_TABLE_LOCK)
-    var locks = List.empty[ICarbonLock]
     var timeStamp = 0L
     var carbonTable: CarbonTable = null
     // lock file path to release locks after operation
     var carbonTableLockFilePath: String = null
     try {
-      locks = AlterTableUtil
-        .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
-          sparkSession)
       carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
         .asInstanceOf[CarbonRelation].carbonTable
       carbonTableLockFilePath = carbonTable.getTablePath
@@ -111,7 +98,6 @@ private[sql] case class CarbonAlterTableRenameCommand(
       val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
       // get the latest carbon table and check for column existence
-      val tableMetadataFile = oldTableIdentifier.getTablePath
       val operationContext = new OperationContext
       // TODO: Pass new Table Path in pre-event.
       val alterTableRenamePreEvent: AlterTableRenamePreEvent = AlterTableRenamePreEvent(
@@ -126,14 +112,10 @@ private[sql] case class CarbonAlterTableRenameCommand(
       schemaEvolutionEntry.setTableName(newTableName)
       timeStamp = System.currentTimeMillis()
       schemaEvolutionEntry.setTime_stamp(timeStamp)
-      renameBadRecords(oldTableName, newTableName, oldDatabaseName)
-      val fileType = FileFactory.getFileType(tableMetadataFile)
       val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
         newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
       val oldIdentifier = TableIdentifier(oldTableName, Some(oldDatabaseName))
       val newIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName))
-      var newTablePath = CarbonTablePath.getNewTablePath(
-        oldTableIdentifier.getTablePath, newTableIdentifier.getTableName)
       metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
       var partitions: Seq[CatalogTablePartition] = Seq.empty
       if (carbonTable.isHivePartitionTable) {
@@ -144,43 +126,9 @@ private[sql] case class CarbonAlterTableRenameCommand(
       sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterTableRename(
           oldIdentifier,
           newIdentifier,
-          newTablePath)
-      // changed the rename order to deal with situation when carbon table and hive table
-      // will point to the same tablePath
-      if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-        val rename = FileFactory.getCarbonFile(oldTableIdentifier.getTablePath, fileType)
-          .renameForce(
-            CarbonTablePath.getNewTablePath(oldTableIdentifier.getTablePath, newTableName))
-        if (!rename) {
-          renameBadRecords(newTableName, oldTableName, oldDatabaseName)
-          sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
-        }
-      }
-      val updatedParts = updatePartitionLocations(
-        partitions,
-        oldTableIdentifier.getTablePath,
-        newTablePath,
-        sparkSession,
-        newIdentifier.table,
-        oldDatabaseName)
+        oldTableIdentifier.getTablePath)
 
-      val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(newIdentifier)
-      // Update the storage location with new path
-      sparkSession.sessionState.catalog.alterTable(
-        catalogTable.copy(storage = sparkSession.sessionState.catalog.
-          asInstanceOf[CarbonSessionCatalog].updateStorageLocation(
-          new Path(newTablePath),
-          catalogTable.storage,
-          newIdentifier.table,
-          oldDatabaseName)))
-      if (updatedParts.nonEmpty) {
-        // Update the new updated partitions specs with new location.
-        sparkSession.sessionState.catalog.alterPartitions(
-          newIdentifier,
-          updatedParts)
-      }
-
-      newTablePath = metastore.updateTableSchemaForAlter(
+      metastore.updateTableSchemaForAlter(
         newTableIdentifier,
         carbonTable.getCarbonTableIdentifier,
         tableInfo,
@@ -190,12 +138,11 @@ private[sql] case class CarbonAlterTableRenameCommand(
       val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
         carbonTable,
         alterTableRenameModel,
-        newTablePath,
+        oldTableIdentifier.getTablePath,
         sparkSession)
       OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, operationContext)
 
       sparkSession.catalog.refreshTable(newIdentifier.quotedString)
-      carbonTableLockFilePath = newTablePath
       LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
       LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
     } catch {
@@ -209,71 +156,11 @@ private[sql] case class CarbonAlterTableRenameCommand(
             carbonTable,
             timeStamp)(
             sparkSession)
-          renameBadRecords(newTableName, oldTableName, oldDatabaseName)
         }
         throwMetadataException(oldDatabaseName, oldTableName,
           s"Alter table rename table operation failed: ${e.getMessage}")
-    } finally {
-      // case specific to rename table as after table rename old table path will not be found
-      if (carbonTable != null) {
-        AlterTableUtil
-          .releaseLocksManually(locks,
-            locksToBeAcquired,
-            oldDatabaseName,
-            newTableName,
-            carbonTableLockFilePath)
-      }
     }
     Seq.empty
   }
 
-  /**
-   * Update partitions with new table location
-   *
-   */
-  private def updatePartitionLocations(
-      partitions: Seq[CatalogTablePartition],
-      oldTablePath: String,
-      newTablePath: String,
-      sparkSession: SparkSession,
-      newTableName: String,
-      dbName: String): Seq[CatalogTablePartition] = {
-    partitions.map{ part =>
-      if (part.storage.locationUri.isDefined) {
-        val path = new Path(part.location)
-        if (path.toString.contains(oldTablePath)) {
-          val newPath = new Path(path.toString.replace(oldTablePath, newTablePath))
-          part.copy(storage = sparkSession.sessionState.catalog.
-            asInstanceOf[CarbonSessionCatalog].updateStorageLocation(
-              newPath,
-              part.storage,
-              newTableName,
-              dbName))
-        } else {
-          part
-        }
-      } else {
-        part
-      }
-    }
-  }
-
-  private def renameBadRecords(
-      oldTableName: String,
-      newTableName: String,
-      dataBaseName: String): Unit = {
-    val oldPath = CarbonUtil
-      .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + oldTableName)
-    val newPath = CarbonUtil
-      .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + newTableName)
-    val fileType = FileFactory.getFileType(oldPath)
-    if (FileFactory.isFileExist(oldPath, fileType)) {
-      val renameSuccess = FileFactory.getCarbonFile(oldPath, fileType)
-        .renameForce(newPath)
-      if (!renameSuccess) {
-        sys.error(s"BadRecords Folder Rename Failed for table $dataBaseName.$oldTableName")
-      }
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 16e99b5..543ba39 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -26,12 +26,13 @@ import org.apache.spark.sql.execution.command.MetadataCommand
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.spark.util.CarbonSparkUtil
 
@@ -56,7 +57,7 @@ case class CarbonCreateTableCommand(
     tableInfo.setDatabaseName(dbName)
     tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName))
     LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tableName]")
-
+    val isTransactionalTable = tableInfo.isTransactionalTable
     if (sparkSession.sessionState.catalog.listTables(dbName)
       .exists(_.table.equalsIgnoreCase(tableName))) {
       if (!ifNotExistsSet) {
@@ -66,8 +67,19 @@ case class CarbonCreateTableCommand(
         throw new TableAlreadyExistsException(dbName, tableName)
       }
     } else {
-      val tablePath = tableLocation.getOrElse(
+      val path = tableLocation.getOrElse(
         CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession))
+      val tablePath = if (FileFactory.getCarbonFile(path).exists() && !isExternal &&
+                          isTransactionalTable && tableLocation.isEmpty) {
+        path + "_" + tableInfo.getFactTable.getTableId
+      } else {
+        path
+      }
+      val streaming = tableInfo.getFactTable.getTableProperties.get("streaming")
+      if (path.startsWith("s3") && streaming != null &&
+          streaming.equalsIgnoreCase("true")) {
+        throw new UnsupportedOperationException("streaming is not supported with s3 store")
+      }
       tableInfo.setTablePath(tablePath)
       val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
 
@@ -90,7 +102,6 @@ case class CarbonCreateTableCommand(
       OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
       val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
       val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
-      val isTransactionalTable = tableInfo.isTransactionalTable
       if (createDSTable) {
         try {
           val tablePath = tableIdentifier.getTablePath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index 42f1f77..b02ea55 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -50,6 +51,7 @@ import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutpu
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
+import org.apache.carbondata.processing.util.CarbonBadRecordUtil
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, SparkDataTypeConverterImpl, Util}
 
 class SparkCarbonTableFormat
@@ -92,6 +94,8 @@ with Serializable {
       carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
         carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
           CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+    optionsFinal
+      .put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
     val partitionStr =
       table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map(
         _.getColumnName.toLowerCase).mkString(",")

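With this change the writer resolves the bad-records location per table instead of reading the global CARBON_BADRECORDS_LOC property. CarbonBadRecordUtil.getBadRecordsPath itself is not shown in this diff; the sketch below only illustrates the precedence the patch appears to rely on (explicit load option first, then the table's bad_records_path property), with both key names taken from other hunks in this commit:

    def resolveBadRecordsPath(loadOptions: Map[String, String],
        tableProperties: Map[String, String]): Option[String] =
      // Load-time option wins; otherwise fall back to the table property.
      loadOptions.get("bad_record_path")
        .orElse(tableProperties.get("bad_records_path"))
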
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index f5c5188..416dbec 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -274,6 +274,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         // TODO remove this limitation later
         val property = properties.find(_._1.equalsIgnoreCase("streaming"))
         if (property.isDefined) {
+          if (carbonTable.getTablePath.startsWith("s3") && property.get._2.equalsIgnoreCase("true")) {
+            throw new UnsupportedOperationException("streaming is not supported with s3 store")
+          }
           if (carbonTable.isStreamingSink) {
             throw new MalformedCarbonCommandException(
               "Streaming property can not be changed once it is 'true'")

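Together with the create-table guard earlier in this commit, the restriction is now symmetric: streaming on an s3 store is rejected both at CREATE TABLE time and when enabling it later via SET TBLPROPERTIES. Stripped of the surrounding strategy code, the check reduces to (illustrative only):

    def assertStreamingAllowed(tablePath: String, streamingValue: String): Unit =
      if (tablePath.startsWith("s3") && streamingValue.equalsIgnoreCase("true")) {
        throw new UnsupportedOperationException("streaming is not supported with s3 store")
      }
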
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 5254933..1f8e359 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -286,15 +286,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
     }
-    val newTablePath = CarbonTablePath.getNewTablePath(
-      identifier.getTablePath, newTableIdentifier.getTableName)
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
       thriftTableInfo,
       newTableIdentifier.getDatabaseName,
       newTableIdentifier.getTableName,
-      newTablePath)
+      identifier.getTablePath)
     val newAbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
-      newTablePath,
+      identifier.getTablePath,
       newTableIdentifier.getDatabaseName,
       newTableIdentifier.getTableName,
       oldTableIdentifier.getTableId)

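This hunk is the heart of the fix: instead of computing newTablePath from the new table name, the metastore keeps identifier.getTablePath, so a rename updates the schema while the directory stays put. Conceptually, with types simplified to a sketch:

    case class TableIdent(databaseName: String, tableName: String, tablePath: String)

    // Rename is metadata-only: the name changes, the physical path does not.
    def rename(old: TableIdent, newTableName: String): TableIdent =
      old.copy(tableName = newTableName)
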
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 2e6ebee..02ee67e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -155,16 +155,14 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
   private def updateHiveMetaStoreForAlter(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
-      oldTablePath: String,
+      tablePath: String,
       sparkSession: SparkSession,
       schemaConverter: ThriftWrapperSchemaConverterImpl) = {
-    val newTablePath =
-      CarbonTablePath.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
     val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
       thriftTableInfo,
       newTableIdentifier.getDatabaseName,
       newTableIdentifier.getTableName,
-      newTablePath)
+      tablePath)
     val dbName = newTableIdentifier.getDatabaseName
     val tableName = newTableIdentifier.getTableName
     val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
@@ -175,7 +173,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
     removeTableFromMetadata(dbName, tableName)
     CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-    newTablePath
+    tablePath
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index b5450b3..67c33ed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -102,32 +102,6 @@ object AlterTableUtil {
   }
 
   /**
-   * This method will release the locks by manually forming a lock path. Specific usage for
-   * rename table
-   *
-   * @param locks
-   * @param locksAcquired
-   * @param dbName
-   * @param tableName
-   * @param tablePath
-   */
-  def releaseLocksManually(locks: List[ICarbonLock],
-      locksAcquired: List[String],
-      dbName: String,
-      tableName: String,
-      tablePath: String): Unit = {
-    val lockLocation = tablePath
-    locks.zip(locksAcquired).foreach { case (carbonLock, lockType) =>
-      val lockFilePath = CarbonTablePath.getLockFilePath(lockLocation, lockType)
-      if (carbonLock.releaseLockManually(lockFilePath)) {
-        LOGGER.info(s"Alter table lock released successfully: ${ lockType }")
-      } else {
-        LOGGER.error("Unable to release lock during alter table operation")
-      }
-    }
-  }
-
-  /**
    * @param carbonTable
    * @param schemaEvolutionEntry
    * @param thriftTable
@@ -195,7 +169,6 @@ object AlterTableUtil {
     val oldCarbonTableIdentifier = oldCarbonTable.getCarbonTableIdentifier
     val database = oldCarbonTable.getDatabaseName
     val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId)
-    val newTablePath = CarbonTablePath.getNewTablePath(tablePath, newTableName)
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val fileType = FileFactory.getFileType(tablePath)
     if (FileFactory.isFileExist(tablePath, fileType)) {
@@ -204,10 +177,8 @@ object AlterTableUtil {
       val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
       if (updatedTime == timeStamp) {
         LOGGER.error(s"Reverting changes for $database.${oldCarbonTable.getTableName}")
-        FileFactory.getCarbonFile(tablePath, fileType)
-          .renameForce(CarbonTablePath.getNewTablePath(tablePath, oldCarbonTable.getTableName))
         val absoluteTableIdentifier = AbsoluteTableIdentifier.from(
-          newTablePath,
+          tablePath,
           newCarbonTableIdentifier)
         metastore.revertTableSchemaInAlterFailure(oldCarbonTableIdentifier,
           tableInfo, absoluteTableIdentifier)(sparkSession)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
index a59ae67..986365e 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
@@ -19,6 +19,7 @@ package org.apache.spark.carbondata
 
 import java.io.File
 
+import org.apache.spark.sql.CarbonEnv
 import org.apache.spark.sql.common.util.Spark2QueryTest
 import org.apache.spark.sql.hive.HiveContext
 import org.scalatest.BeforeAndAfterAll
@@ -44,8 +45,8 @@ class BadRecordPathLoadOptionTest extends Spark2QueryTest with BeforeAndAfterAll
 
   test("data load log file and csv file written at the configured location") {
     sql(
-      """CREATE TABLE IF NOT EXISTS salestest(ID BigInt, date Timestamp, country String,
-          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
+      s"""CREATE TABLE IF NOT EXISTS salestest(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORDS_PATH'='$warehouse')""")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
     val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
@@ -63,9 +64,9 @@ class BadRecordPathLoadOptionTest extends Spark2QueryTest with BeforeAndAfterAll
   }
 
   def isFilesWrittenAtBadStoreLocation: Boolean = {
-    val badStorePath = CarbonProperties.getInstance()
-                         .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC) +
-                       "/default/salestest/0/0"
+    val badStorePath =
+      CarbonEnv.getCarbonTable(Some("default"), "salestest")(sqlContext.sparkSession).getTableInfo
+        .getFactTable.getTableProperties.get("bad_records_path") + "/0/0"
     val carbonFile: CarbonFile = FileFactory
       .getCarbonFile(badStorePath, FileFactory.getFileType(badStorePath))
     var exists: Boolean = carbonFile.exists()

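The assertion now reads the location back from the table schema rather than the global property, since BAD_RECORDS_PATH has become a table property. What the updated test computes, restated as a sketch (the /0/0 suffix is segment id and task number):

    def expectedBadStorePath(badRecordsPathProperty: String,
        segmentId: String = "0", taskNo: String = "0"): String =
      s"$badRecordsPathProperty/$segmentId/$taskNo"
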
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 9d9a9f5..9b2af33 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -53,6 +53,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
   val badRecordFilePath: File = new File(currentPath + "/target/test/badRecords")
 
   override def beforeAll {
+    badRecordFilePath.delete()
     badRecordFilePath.mkdirs()
     CarbonProperties.getInstance().addProperty(
       CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
@@ -191,6 +192,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("USE default")
     sql("DROP DATABASE IF EXISTS streaming CASCADE")
     var csvDataDir = integrationPath + "/spark2/target/csvdatanew"
+    badRecordFilePath.delete()
     new File(csvDataDir).delete()
     csvDataDir = integrationPath + "/spark2/target/csvdata"
     new File(csvDataDir).delete()
@@ -1627,8 +1629,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       generateBadRecords = true,
       badRecordAction = "redirect",
       autoHandoff = false,
-      badRecordsPath = badRecordFilePath.getCanonicalPath
-    )
+      badRecordsPath = badRecordFilePath.getCanonicalPath)
     assert(new File(badRecordFilePath.getCanonicalFile + "/streaming/bad_record_redirect").isDirectory)
     checkAnswer(sql("select count(*) from streaming.bad_record_redirect"), Seq(Row(19)))
   }
@@ -2199,7 +2200,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
          | )
          | STORED BY 'carbondata'
          | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
-         | 'sort_columns'='name', 'dictionary_include'='city,register')
+         | 'sort_columns'='name', 'dictionary_include'='city,register', 'BAD_RECORDS_PATH'='$badRecordFilePath')
          | """.stripMargin)
 
     if (withBatchLoad) {
@@ -2228,7 +2229,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
          | )
          | STORED BY 'carbondata'
          | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
-         | 'sort_columns'='name', 'dictionary_include'='id,name,salary,tax,percent,updated')
+         | 'sort_columns'='name', 'dictionary_include'='id,name,salary,tax,percent,updated', 'BAD_RECORDS_PATH'='$badRecordFilePath')
          | """.stripMargin)
 
     if (withBatchLoad) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 27ed1bd..34abc92 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -98,18 +98,6 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("test to check if exception during rename table does not throws table not found exception") {
-    val locks = AlterTableUtil
-      .validateTableAndAcquireLock("default", "reverttest", List("meta.lock"))(sqlContext
-        .sparkSession)
-    val exception = intercept[ProcessMetaDataException] {
-      sql("alter table reverttest rename to revert")
-    }
-    AlterTableUtil.releaseLocks(locks)
-    assert(exception.getMessage.contains(
-      "Alter table rename table operation failed: Acquire table lock failed after retry, please try after some time"))
-  }
-
   override def afterAll() {
     hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
     sql("drop table if exists reverttest")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
index 648ed11..c7219f8 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
@@ -547,7 +547,7 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl
       " a,sum(b) from PreAggMain group by a")
     assert(intercept[ProcessMetaDataException] {
       sql("alter table preAggmain_preagg1 rename to preagg2")
-    }.getMessage.contains("Rename operation for pre-aggregate table is not supported."))
+    }.getMessage.contains("Rename operation for datamaps is not supported."))
     assert(intercept[ProcessMetaDataException] {
       sql("alter table preaggmain rename to preaggmain_new")
     }.getMessage.contains("Rename operation is not supported for table with pre-aggregate tables"))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
index d94570a..ba42670 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -21,13 +21,14 @@ import java.io.{File, FileOutputStream, FileWriter}
 import java.math.{BigDecimal, RoundingMode}
 import java.sql.{Date, Timestamp}
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.common.util.Spark2QueryTest
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
@@ -679,6 +680,9 @@ class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll {
     sql("alter table t5 rename to t6")
     sql("create table t5 (c1 string, c2 int,c3 string) stored by 'carbondata'")
     sql("insert into t5 select 'asd',1,'sdf'")
+    val t5: CarbonTable = CarbonEnv.getCarbonTable(None, "t5")(sqlContext.sparkSession)
+    assert(t5.getTablePath
+      .contains(t5.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId))
     checkAnswer(sql("select * from t5"),Seq(Row("asd",1,"sdf")))
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
index c2ddff8..25ae1c1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
@@ -17,13 +17,10 @@
 
 package org.apache.carbondata.processing.loading;
 
-import java.io.File;
-
 import org.apache.carbondata.common.constants.LoggerAction;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 
 /**
@@ -71,34 +68,18 @@ public class BadRecordsLoggerProvider {
     }
     CarbonTableIdentifier identifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
-    String storeLocation = "";
-    if (configuration.isCarbonTransactionalTable()) {
-      storeLocation =
-          identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-              .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId()
-              + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo();
-    } else {
-      storeLocation =
-          "SdkWriterBadRecords" + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo();
-    }
 
     return new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
         identifier.getTableName() + '_' + System.currentTimeMillis(),
-        getBadLogStoreLocation(configuration, storeLocation), badRecordsLogRedirect,
+        getBadLogStoreLocation(configuration), badRecordsLogRedirect,
         badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
   }
 
-  public static String getBadLogStoreLocation(CarbonDataLoadConfiguration configuration,
-      String storeLocation) {
+  public static String getBadLogStoreLocation(CarbonDataLoadConfiguration configuration) {
     String badLogStoreLocation = (String) configuration
         .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
-    if (null == badLogStoreLocation) {
-      badLogStoreLocation =
-          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
-    }
-    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
-
-    return badLogStoreLocation;
+    return CarbonTablePath.generateBadRecordsPath(badLogStoreLocation, configuration.getSegmentId(),
+        configuration.getTaskNo(), configuration.isCarbonTransactionalTable());
   }
 }
 

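The segment/task layout that used to be assembled inline moves into CarbonTablePath.generateBadRecordsPath, whose body is not part of this diff. Judging from the removed code and the logger-path test above, it plausibly behaves like the sketch below (the redirect test suggests the redirect writer may additionally nest database/table directories, so treat this strictly as an approximation):

    def generateBadRecordsPath(base: String, segmentId: String,
        taskNo: String, isTransactional: Boolean): String =
      if (isTransactional) s"$base/$segmentId/$taskNo"  // per-load hierarchy
      else s"$base/SdkWriterBadRecords/$taskNo"         // SDK (non-transactional) writes
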
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 9418efb..46ad32f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -119,9 +119,19 @@ public class CarbonDataLoadConfiguration {
    */
   private String dataWritePath;
 
+  private String parentTablePath;
+
   public CarbonDataLoadConfiguration() {
   }
 
+  public String getParentTablePath() {
+    return parentTablePath;
+  }
+
+  public void setParentTablePath(String parentTablePath) {
+    this.parentTablePath = parentTablePath;
+  }
+
   public void setDataFields(DataField[] dataFields) {
     this.dataFields = dataFields;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 2a9ab6d..aa9aa01 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -217,6 +217,7 @@ public final class DataLoadProcessBuilder {
     CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration();
     CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
     AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+    configuration.setParentTablePath(loadModel.getParentTablePath());
     configuration.setTableIdentifier(identifier);
     configuration.setCarbonTransactionalTable(loadModel.isCarbonTransactionalTable());
     configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
@@ -240,6 +241,7 @@ public final class DataLoadProcessBuilder {
         loadModel.getSkipEmptyLine());
     configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH,
         loadModel.getFactFilePath());
+    configuration.setParentTablePath(loadModel.getParentTablePath());
     configuration
         .setDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, loadModel.getSortScope());
     configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 470c092..39c12a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -32,7 +32,6 @@ import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
@@ -67,7 +66,7 @@ public class FieldEncoderFactory {
   public FieldConverter createFieldEncoder(DataField dataField,
       AbsoluteTableIdentifier absoluteTableIdentifier, int index, String nullFormat,
       DictionaryClient client, Boolean useOnePass, Map<Object, Integer> localCache,
-      boolean isEmptyBadRecord) throws IOException {
+      boolean isEmptyBadRecord, String parentTablePath) throws IOException {
     // Converters are only needed for dimensions and measures it return null.
     if (dataField.getColumn().isDimension()) {
       if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
@@ -100,10 +99,7 @@ public class FieldEncoderFactory {
               new ColumnIdentifier(parentColumnTableRelation.getColumnId(), null,
                   dataField.getColumn().getDataType());
           AbsoluteTableIdentifier parentAbsoluteTableIdentifier =
-              AbsoluteTableIdentifier.from(
-                  CarbonTablePath.getNewTablePath(
-                      absoluteTableIdentifier.getTablePath(), parentTableIdentifier.getTableName()),
-                  parentTableIdentifier);
+              AbsoluteTableIdentifier.from(parentTablePath, parentTableIdentifier);
           identifier = new DictionaryColumnUniqueIdentifier(parentAbsoluteTableIdentifier,
               parentColumnIdentifier, dataField.getColumn().getDataType());
           return new DictionaryFieldConverterImpl(dataField.getColumn(),

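Deriving the parent table's location by string-rewriting the child's path (the removed getNewTablePath call) stops being safe once directory names no longer track table names, so the parent path is now threaded in explicitly from the load model, through the load configuration, into the field encoder. The plumbing, reduced to a sketch with simplified types:

    case class LoadModel(parentTablePath: String)
    case class LoadConfig(parentTablePath: String)

    // DataLoadProcessBuilder copies the path across unchanged ...
    def build(model: LoadModel): LoadConfig = LoadConfig(model.parentTablePath)

    // ... and the encoder consumes it directly instead of rewriting the
    // child table's path to guess where the parent lives.
    def parentDictionaryLocation(cfg: LoadConfig): String = cfg.parentTablePath
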
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index 8f0557a..a5e5138 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -93,8 +93,9 @@ public class RowConverterImpl implements RowConverter {
     for (int i = 0; i < fields.length; i++) {
       localCaches[i] = new ConcurrentHashMap<>();
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
-          .createFieldEncoder(fields[i], configuration.getTableIdentifier(), i, nullFormat,
-              client, configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord);
+          .createFieldEncoder(fields[i], configuration.getTableIdentifier(), i, nullFormat, client,
+              configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord,
+              configuration.getParentTablePath());
       fieldConverterList.add(fieldConverter);
     }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
@@ -213,7 +214,8 @@ public class RowConverterImpl implements RowConverter {
       try {
         fieldConverter = FieldEncoderFactory.getInstance()
             .createFieldEncoder(fields[i], configuration.getTableIdentifier(), i, nullFormat,
-                client, configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord);
+                client, configuration.getUseOnePass(), localCaches[i], isEmptyBadRecord,
+                configuration.getParentTablePath());
       } catch (IOException e) {
         throw new RuntimeException(e);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a1d12aa/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index b9b42b2..146d9af 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -50,6 +50,8 @@ public class CarbonLoadModel implements Serializable {
 
   private String tablePath;
 
+  private String parentTablePath;
+
   /*
      This points if the carbonTable is a Non Transactional Table or not.
      The path will be pointed by the tablePath. And there will be
@@ -232,6 +234,14 @@ public class CarbonLoadModel implements Serializable {
     isAggLoadRequest = aggLoadRequest;
   }
 
+  public String getParentTablePath() {
+    return parentTablePath;
+  }
+
+  public void setParentTablePath(String parentTablePath) {
+    this.parentTablePath = parentTablePath;
+  }
+
   /**
    * get escape char
    *
@@ -459,6 +469,7 @@ public class CarbonLoadModel implements Serializable {
     copy.isLoadWithoutConverterStep = isLoadWithoutConverterStep;
     copy.sortColumnsBoundsStr = sortColumnsBoundsStr;
     copy.loadMinSize = loadMinSize;
+    copy.parentTablePath = parentTablePath;
     return copy;
   }
 
@@ -513,6 +524,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.isAggLoadRequest = isAggLoadRequest;
     copyObj.sortColumnsBoundsStr = sortColumnsBoundsStr;
     copyObj.loadMinSize = loadMinSize;
+    copyObj.parentTablePath = parentTablePath;
     return copyObj;
   }
 

