carbondata-commits mailing list archives

From gvram...@apache.org
Subject [2/3] carbondata git commit: [CARBONDATA-2313] Support unmanaged carbon table read and write
Date Thu, 05 Apr 2018 13:27:46 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala
new file mode 100644
index 0000000..bfb9471
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.{File, FileFilter}
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.junit.Assert
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+
+
+class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
+
+  var writerPath = new File(this.getClass.getResource("/").getPath
+                            + "../../src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+    .getCanonicalPath
+  // getCanonicalPath returns '\' separators on Windows, but the code below expects '/'
+  writerPath = writerPath.replace("\\", "/")
+
+  // prepare SDK writer output
+  def buildTestData(persistSchema: Boolean, outputMultipleFiles: Boolean): Unit = {
+
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"age\":\"int\"},\n")
+      .append("   {\"height\":\"double\"}\n")
+      .append("]")
+      .toString()
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        if (persistSchema) {
+          builder.persistSchemaFile(true)
+          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).unManagedTable(true)
+            .uniqueIdentifier(
+              System.currentTimeMillis)
+            .buildWriterForCSVInput()
+        } else {
+          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).unManagedTable(true)
+            .uniqueIdentifier(
+              System.currentTimeMillis).withBlockSize(1).withBlockletSize(1)
+            .buildWriterForCSVInput()
+        }
+      var i = 0
+      val rows = if (outputMultipleFiles) 1000000 else 3
+      while (i < rows) {
+        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+        i += 1
+      }
+      writer.close()
+    } catch {
+      case ex: Exception =>
+        // fail fast instead of silently swallowing writer errors
+        Assert.fail(ex.getMessage)
+    }
+  }
+
+  def cleanTestData(): Unit = {
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  def deleteFile(path: String, extension: String): Unit = {
+    val file: CarbonFile = FileFactory
+      .getCarbonFile(path, FileFactory.getFileType(path))
+
+    for (eachDir <- file.listFiles) {
+      if (!eachDir.isDirectory) {
+        if (eachDir.getName.endsWith(extension)) {
+          CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+        }
+      } else {
+        deleteFile(eachDir.getPath, extension)
+      }
+    }
+  }
+
+  override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+  }
+
+  test("test create External Table with Schema with partition, should ignore schema and partition")
+  {
+    buildTestData(false, false)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    // with partition
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(name string) PARTITIONED BY (age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+  test("read unmanaged table, files written from sdk Writer Output)") {
+    buildTestData(false, false)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable1")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable1 STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkOutputTable1"), Seq(Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
+    checkAnswer(sql("select name from sdkOutputTable1"), Seq(Row("robot0"),
+      Row("robot1"),
+      Row("robot2")))
+
+    checkAnswer(sql("select age from sdkOutputTable1"), Seq(Row(0), Row(1), Row(2)))
+
+    checkAnswer(sql("select * from sdkOutputTable1 where age > 1 and age < 8"),
+      Seq(Row("robot2", 2, 1.0)))
+
+    checkAnswer(sql("select * from sdkOutputTable1 where name = 'robot2'"),
+      Seq(Row("robot2", 2, 1.0)))
+
+    checkAnswer(sql("select * from sdkOutputTable1 where name like '%obot%' limit 2"),
+      Seq(Row("robot0", 0, 0.0),
+        Row("robot1", 1, 0.5)))
+
+    checkAnswer(sql("select sum(age) from sdkOutputTable1 where name like 'robot%'"), Seq(Row(3)))
+
+    checkAnswer(sql("select count(*) from sdkOutputTable1 where name like 'robot%' "), Seq(Row(3)))
+
+    checkAnswer(sql("select count(*) from sdkOutputTable1"), Seq(Row(3)))
+
+    sql("DROP TABLE sdkOutputTable1")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+  test("Test Blocked operations for unmanaged table ") {
+    buildTestData(false, false)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    //1. alter datatype
+    var exception = intercept[MalformedCarbonCommandException] {
+      sql("Alter table sdkOutputTable change age age BIGINT")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //2. Load
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("LOAD DATA LOCAL INPATH '/path/to/data' INTO TABLE sdkOutputTable ")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //3. Datamap creation
+    exception = intercept[MalformedCarbonCommandException] {
+      sql(
+        "CREATE DATAMAP agg_sdkOutputTable ON TABLE sdkOutputTable USING \"preaggregate\" AS " +
+        "SELECT name, sum(age) FROM sdkOutputTable GROUP BY name,age")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //4. Insert Into
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("insert into table sdkOutputTable SELECT 20,'robotX',2.5")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //5. compaction
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("ALTER TABLE sdkOutputTable COMPACT 'MAJOR'")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //6. Show segments
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("Show segments for table sdkOutputTable").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //7. Delete segment by ID
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("DELETE FROM TABLE sdkOutputTable WHERE SEGMENT.ID IN (0)")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //8. Delete segment by date
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("DELETE FROM TABLE sdkOutputTable WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //9. Update Segment
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("UPDATE sdkOutputTable SET (age) = (age + 9) ").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //10. Delete Segment
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("DELETE FROM sdkOutputTable where name='robot1'").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //11. Show partition
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("Show partitions sdkOutputTable").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on unmanaged table"))
+
+    //12. Streaming table creation
+    // External tables don't accept table properties, so streaming cannot be enabled here
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+  test("test create External Table With Schema, should ignore the schema provided") {
+    buildTestData(false, false)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    // with schema
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+  test("Read sdk writer output file without Carbondata file should fail") {
+    buildTestData(false, false)
+    deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    val exception = intercept[Exception] {
+      // table creation validates the path, so this should already fail
+      sql(
+        s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+           |'$writerPath' """.stripMargin)
+    }
+    assert(exception.getMessage()
+      .contains("Operation not allowed: Invalid table path provided:"))
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+
+  test("Read sdk writer output file without any file should fail") {
+    buildTestData(false, false)
+    deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    deleteFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    val exception = intercept[Exception] {
+      // table creation validates the path, so this should already fail
+      sql(
+        s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+           |'$writerPath' """.stripMargin)
+
+      sql("select * from sdkOutputTable").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("Operation not allowed: Invalid table path provided:"))
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+  test("Read sdk writer output multiple files ") {
+    buildTestData(false, true)
+    assert(new File(writerPath).exists())
+    val folder = new File(writerPath)
+    val dataFiles = folder.listFiles(new FileFilter() {
+      override def accept(pathname: File): Boolean = {
+        pathname.getName
+          .endsWith(CarbonCommonConstants.FACT_FILE_EXT)
+      }
+    })
+    Assert.assertNotNull(dataFiles)
+    Assert.assertNotEquals(1, dataFiles.length)
+
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select count(*) from sdkOutputTable"), Seq(Row(1000000)))
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+
+}
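
A minimal sketch of the end-to-end flow the tests above exercise, using only the
builder calls introduced in this patch (the output path is a placeholder):

    import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

    object UnmanagedWriteSketch {
      def main(args: Array[String]): Unit = {
        // Field-name -> type JSON, in the same shape the tests use.
        val schemaJson = """[{"name":"string"},{"age":"int"},{"height":"double"}]"""
        val writer = CarbonWriter.builder()
          .withSchema(Schema.parseJson(schemaJson))
          .outputPath("/tmp/carbon-unmanaged") // placeholder output directory
          .unManagedTable(true)                // mark the output as an unmanaged table
          .uniqueIdentifier(System.currentTimeMillis)
          .buildWriterForCSVInput()
        writer.write(Array("robot0", "0", "0.0"))
        writer.close()
        // The directory is then mounted from Spark, exactly as in the tests:
        //   CREATE EXTERNAL TABLE t STORED BY 'carbondata' LOCATION '/tmp/carbon-unmanaged'
      }
    }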

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index b56bd6e..0c96247 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
@@ -69,7 +70,7 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory {
   /**
    * Get the datamap for segmentid
    */
-  override def getDataMaps(segment: Segment): java.util.List[CoarseGrainDataMap] = {
+  override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): java.util.List[CoarseGrainDataMap] = {
     val file = FileFactory.getCarbonFile(
       CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
 
@@ -87,8 +88,7 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory {
   /**
    * Get datamaps for distributable object.
    */
-  override def getDataMaps(
-      distributable: DataMapDistributable): java.util.List[CoarseGrainDataMap] = {
+  override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): java.util.List[CoarseGrainDataMap] = {
     val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
     val dataMap: CoarseGrainDataMap = new CGDataMap()
     dataMap.init(new DataMapModel(mapDistributable.getFilePath))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index bb8d7f8..270c676 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
@@ -53,9 +54,9 @@ class C2DataMapFactory() extends CoarseGrainDataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
 
-  override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ???
+  override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
 
   override def createWriter(segment: Segment, dataWritePath: String): DataMapWriter =
     DataMapWriterSuite.dataMapWriterC2Mock(identifier, segment, dataWritePath)
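
The added ReadCommittedScope parameter changes the DataMapFactory contract, so
out-of-tree factories must be updated as well. A minimal migration sketch
(factory name hypothetical; unrelated abstract members are left unimplemented
by keeping the class abstract):

    import java.util

    import org.apache.carbondata.core.datamap.{DataMapDistributable, Segment}
    import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory}
    import org.apache.carbondata.core.readcommitter.ReadCommittedScope

    abstract class MyDataMapFactory extends CoarseGrainDataMapFactory {

      // Both overloads now receive the scope that decides which files a read may see.
      override def getDataMaps(segment: Segment,
          readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???

      override def getDataMaps(distributable: DataMapDistributable,
          readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
    }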

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 3bfc7b9..060d06a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -25,7 +25,7 @@ import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.datamap.dev.{DataMapModel}
+import org.apache.carbondata.core.datamap.dev.DataMapModel
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datamap.dev.fgdatamap.{FineGrainBlocklet, FineGrainDataMap, FineGrainDataMapFactory}
 import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapWriter}
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
@@ -71,7 +72,7 @@ class FGDataMapFactory extends FineGrainDataMapFactory {
   /**
    * Get the datamap for segmentid
    */
-  override def getDataMaps(segment: Segment): java.util.List[FineGrainDataMap] = {
+  override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): java.util.List[FineGrainDataMap] = {
     val file = FileFactory
       .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
 
@@ -88,8 +89,7 @@ class FGDataMapFactory extends FineGrainDataMapFactory {
   /**
    * Get datamap for distributable object.
    */
-  override def getDataMaps(
-      distributable: DataMapDistributable): java.util.List[FineGrainDataMap]= {
+  override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): java.util.List[FineGrainDataMap] = {
     val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable]
     val dataMap: FineGrainDataMap = new FGDataMap()
     dataMap.init(new DataMapModel(mapDistributable.getFilePath))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
index 32c10ef..be9cc51 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Se
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
@@ -183,9 +184,15 @@ class TestDataMap() extends CoarseGrainDataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable,
+      readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = {
+    ???
+  }
 
-  override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ???
+  override def getDataMaps(segment: Segment,
+      readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = {
+    ???
+  }
 
   override def createWriter(segment: Segment, writeDirectoryPath: String): DataMapWriter = {
     new DataMapWriter(identifier, segment, writeDirectoryPath) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 1f4b7fe..65857b1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, RelationIdentifier}
+import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
@@ -276,9 +277,9 @@ class WaitingDataMap() extends CoarseGrainDataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): util.List[CoarseGrainDataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
 
-  override def getDataMaps(segment: Segment): util.List[CoarseGrainDataMap] = ???
+  override def getDataMaps(segment: Segment, readCommitted: ReadCommittedScope): util.List[CoarseGrainDataMap] = ???
 
   override def createWriter(segment: Segment, writeDirectoryPath: String): DataMapWriter = {
     new DataMapWriter(identifier, segment, writeDirectoryPath) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 8c433ea..d34d009 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -45,7 +45,7 @@ import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block.Distributable
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.filter.FilterUtil
 import org.apache.carbondata.core.scan.model.QueryModel
@@ -495,6 +495,8 @@ class CarbonScanRDD(
     if (partitionNames != null) {
       CarbonInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
     }
+
+    CarbonInputFormat.setUnmanagedTable(conf, tableInfo.isUnManagedTable)
     createInputFormat(conf)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 605777d..22a0112 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -158,7 +158,7 @@ object PartitionUtils {
       .createCarbonTableInputFormat(identifier, partitionIds.asJava, job)
     CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
     val splits = format.getSplitsOfOneSegment(job, segmentId,
-      oldPartitionIdList.map(_.asInstanceOf[Integer]).asJava, partitionInfo)
+        oldPartitionIdList.map(_.asInstanceOf[Integer]).asJava, partitionInfo)
     val blockList = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
     val tableBlockInfoList = CarbonInputSplit.createBlocks(blockList.asJava)
     tableBlockInfoList

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index 5f78397..b9e2442 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -33,14 +33,14 @@ object CarbonSparkUtil {
 
   def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
     val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getTableName)
-        .asScala.map(x => x.getColName) // wf : may be problem
+      .asScala.map(x => x.getColName) // wf : may be problem
     val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getTableName)
-        .asScala.map(x => x.getColName)
+      .asScala.map(x => x.getColName)
     val dictionary =
       carbonTable.getDimensionByTableName(carbonTable.getTableName).asScala.map { f =>
         (f.getColName.toLowerCase,
-            f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-                !f.getDataType.isComplexType)
+          f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+          !f.getDataType.isComplexType)
       }
     CarbonMetaData(dimensionsAttr,
       measureAttr,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index b27a150..e29986a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -76,6 +76,8 @@ case class CarbonCountStar(
     SparkHadoopUtil.get.addCredentials(jobConf)
     val job = new Job(jobConf)
     FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
+    CarbonInputFormat
+      .setUnmanagedTable(job.getConfiguration, carbonTable.getTableInfo.isUnManagedTable)
     (job, carbonInputFormat)
   }
 }
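
Both the scan path (CarbonScanRDD above) and this count(*) path stamp the flag
onto the Hadoop configuration before the input format is used; a condensed
sketch of the shared step (the CarbonInputFormat import path is assumed, as it
is not shown in the hunks):

    import org.apache.hadoop.mapreduce.Job

    import org.apache.carbondata.core.metadata.schema.table.TableInfo
    import org.apache.carbondata.hadoop.api.CarbonInputFormat

    object UnmanagedFlag {
      // Tell CarbonInputFormat whether this table is unmanaged, i.e. whether
      // segments must be discovered from the file system instead of table status.
      def propagate(job: Job, tableInfo: TableInfo): Unit =
        CarbonInputFormat.setUnmanagedTable(job.getConfiguration, tableInfo.isUnManagedTable)
    }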

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 693b6c8..ef23926 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -299,7 +299,10 @@ object CarbonSource {
       tableDesc.copy(storage = updatedFormat)
     } else {
       val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
-      if (!metaStore.isReadFromHiveMetaStore) {
+      val isExternal = properties.getOrElse("isExternal", "false")
+      val isUnManagedTable = properties.getOrElse("isUnManaged", "false").contains("true")
+      tableInfo.setUnManagedTable(isUnManagedTable)
+      if (!isUnManagedTable && !metaStore.isReadFromHiveMetaStore) {
         // save to disk
         metaStore.saveToDisk(tableInfo, properties("tablePath"))
         // remove schema string from map as we don't store carbon schema to hive metastore
@@ -320,16 +323,20 @@ object CarbonSource {
       properties: Map[String, String]): Map[String, String] = {
     val model = createTableInfoFromParams(properties, dataSchema, identifier)
     val tableInfo: TableInfo = TableNewProcessor(model)
+    val isExternal = properties.getOrElse("isExternal", "false")
+    val isUnManagedTable = properties.getOrElse("isUnManaged", "false").contains("true")
+    val tablePath = properties.getOrElse("path", "")
     tableInfo.setTablePath(identifier.getTablePath)
+    tableInfo.setUnManagedTable(isUnManagedTable)
     tableInfo.setDatabaseName(identifier.getDatabaseName)
     val schemaEvolutionEntry = new SchemaEvolutionEntry
     schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
     tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
-    val map = if (metaStore.isReadFromHiveMetaStore) {
-      CarbonUtil.convertToMultiStringMap(tableInfo)
-    } else {
+    val map = if (!metaStore.isReadFromHiveMetaStore && !isUnManagedTable) {
       metaStore.saveToDisk(tableInfo, identifier.getTablePath)
       new java.util.HashMap[String, String]()
+    } else {
+      CarbonUtil.convertToMultiStringMap(tableInfo)
     }
     properties.foreach(e => map.put(e._1, e._2))
     map.put("tablepath", identifier.getTablePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index a46d514..be5287f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -55,6 +55,11 @@ case class CarbonCreateDataMapCommand(
         CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession)
       case _ => null
     }
+
+    if (mainTable != null && mainTable.getTableInfo.isUnManagedTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    }
+
     if (mainTable != null && mainTable.getDataMapSchema(dataMapName) != null) {
       if (!ifNotExistsSet) {
         throw new MalformedDataMapCommandException(s"DataMap name '$dataMapName' already exist")
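
The same unmanaged-table guard recurs in each command changed below (load,
compaction, segment management, IUD, partition and streaming DDL). A sketch of
a helper that could factor it out, assuming a resolved CarbonTable (helper name
hypothetical):

    import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable

    object UnmanagedTableGuard {
      // Reject managed-table-only operations before any locks or events fire.
      def assertManaged(carbonTable: CarbonTable): Unit = {
        if (carbonTable.getTableInfo.isUnManagedTable) {
          throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
        }
      }
    }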

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index bccc4dd..dc96399 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -77,6 +77,10 @@ case class CarbonAlterTableCompactionCommand(
       }
       relation.carbonTable
     }
+    if (table.getTableInfo.isUnManagedTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    }
+
     if (CarbonUtil.hasAggregationDataMap(table) ||
         (table.isChildDataMap && null == operationContext.getProperty(table.getTableName))) {
       // If the compaction request is of 'streaming' type then we need to generate loadCommands

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index 81427a1..57ccd82 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, OperationContext, OperationListenerBus}
@@ -35,6 +36,10 @@ case class CarbonDeleteLoadByIdCommand(
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
 
+    if (carbonTable.getTableInfo.isUnManagedTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    }
+
     // if insert overwrite in progress, do not allow delete segment
     if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index 1d76bda..7d0655d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteSegmentByDatePostEvent, DeleteSegmentByDatePreEvent, OperationContext, OperationListenerBus}
@@ -36,6 +37,10 @@ case class CarbonDeleteLoadByLoadDateCommand(
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
 
+    if (carbonTable.getTableInfo.isUnManagedTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    }
+
     // if insert overwrite in progress, do not allow delete segment
     if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index f0a9a9e..3f86ca4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -153,6 +153,9 @@ case class CarbonLoadDataCommand(
     } else {
       null
     }
+    if (table.getTableInfo.isUnManagedTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    }
     // get the value of 'spark.executor.cores' from spark conf, default value is 1
     val sparkExecutorCores = sparkSession.sparkContext.conf.get("spark.executor.cores", "1")
     // get the value of 'carbon.number.of.cores.while.loading' from carbon properties,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index 1e5885e..1e65887 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.types.{StringType, TimestampType}
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 
 case class CarbonShowLoadsCommand(
     databaseNameOp: Option[String],
@@ -43,6 +44,9 @@ case class CarbonShowLoadsCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+    if (carbonTable.getTableInfo.isUnManagedTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    }
     CarbonStore.showSegments(
       limit,
       carbonTable.getMetadataPath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 230378b..ca9a6a1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
@@ -44,6 +45,10 @@ private[sql] case class CarbonProjectForDeleteCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+    if (carbonTable.getTableInfo.isUnManagedTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    }
+
     if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "loading", "data delete")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 2a92478..24ac80c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.ArrayType
 import org.apache.spark.storage.StorageLevel
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
@@ -56,6 +57,9 @@ private[sql] case class CarbonProjectForUpdateCommand(
       return Seq.empty
     }
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+    if (carbonTable.getTableInfo.isUnManagedTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    }
     if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "loading", "data update")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index e0e51ed..65c6269 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -39,6 +39,7 @@ case class CarbonCreateTableCommand(
     tableInfo: TableInfo,
     ifNotExistsSet: Boolean = false,
     tableLocation: Option[String] = None,
+    isExternal: Boolean = false,
     createDSTable: Boolean = true,
     isVisible: Boolean = true)
   extends MetadataCommand {
@@ -89,6 +90,7 @@ case class CarbonCreateTableCommand(
       OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
       val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
       val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
+      val isUnmanaged = tableInfo.isUnManagedTable
       if (createDSTable) {
         try {
           val tablePath = tableIdentifier.getTablePath
@@ -128,6 +130,8 @@ case class CarbonCreateTableCommand(
                |  dbName "$dbName",
                |  tablePath "$tablePath",
                |  path "$tablePath",
+               |  isExternal "$isExternal",
+               |  isUnManaged "$isUnmanaged",
                |  isVisible "$isVisible"
                |  $carbonSchemaString)
                |  $partitionString

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
index 2daece3..788a820 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.datamap.{DataMapChooser, DataMapStoreManager,
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, ColumnarFormatVersion}
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
 import org.apache.carbondata.core.reader.CarbonHeaderReader
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
@@ -222,6 +223,8 @@ class SparkCarbonFileFormat extends FileFormat
 
 
         val segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null")
+        val readCommittedScope = new LatestFilesReadCommittedScope(
+          identifier.getTablePath + "/Fact/Part0/Segment_null/")
         val indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentPath)
         if (indexFiles.size() == 0) {
           throw new SparkException("Index file not present to read the carbondata file")
@@ -233,7 +236,7 @@ class SparkCarbonFileFormat extends FileFormat
           .choose(tab, model.getFilterExpressionResolverTree)
 
         // TODO : handle the partition for CarbonFileLevelFormat
-        val prunedBlocklets = dataMapExprWrapper.prune(segments, null)
+        val prunedBlocklets = dataMapExprWrapper.prune(segments, null, readCommittedScope)
 
         val detailInfo = prunedBlocklets.get(0).getDetailInfo
         detailInfo.readColumnSchema(detailInfo.getColumnSchemaBinary)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index a5a96af..ad3ad2e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -117,6 +117,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           if (carbonTable != null && carbonTable.isFileLevelFormat) {
             throw new MalformedCarbonCommandException(
               "Unsupported alter operation on Carbon external fileformat table")
+          } else if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) {
+            throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
           } else {
             ExecutedCommandExec(dataTypeChange) :: Nil
           }
@@ -133,6 +135,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           if (carbonTable != null && carbonTable.isFileLevelFormat) {
             throw new MalformedCarbonCommandException(
               "Unsupported alter operation on Carbon external fileformat table")
+          } else if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) {
+            throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
           } else {
             ExecutedCommandExec(addColumn) :: Nil
           }
@@ -149,6 +153,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           if (carbonTable != null && carbonTable.isFileLevelFormat) {
             throw new MalformedCarbonCommandException(
               "Unsupported alter operation on Carbon external fileformat table")
+          } else if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) {
+            throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
           } else {
             ExecutedCommandExec(dropColumn) :: Nil
           }
@@ -181,6 +187,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         if (isCarbonTable) {
           val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
             .lookupRelation(t)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
+          if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) {
+            throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+          }
           if (!carbonTable.isHivePartitionTable) {
             ExecutedCommandExec(CarbonShowCarbonPartitionsCommand(t)) :: Nil
           } else {
@@ -231,6 +240,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         // if the table has 'preaggregate' DataMap, it doesn't support streaming now
         val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
           .lookupRelation(tableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
+        if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) {
+          throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+        }
 
         // TODO remove this limitation later
         val property = properties.find(_._1.equalsIgnoreCase("streaming"))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 7a8601a..0075c13 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
 import java.net.URI
-import java.util.UUID
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -43,11 +42,10 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.table
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.{table, SchemaReader}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -105,7 +103,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
       case Some(t) =>
         CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(t), t)
       case None =>
-        readCarbonSchema(absIdentifier) match {
+        readCarbonSchema(absIdentifier,
+          parameters.getOrElse("isUnManaged", "false").toBoolean) match {
           case Some(meta) =>
             CarbonRelation(database, tableName,
               CarbonSparkUtil.createSparkMeta(meta), meta)
@@ -207,25 +206,51 @@ class CarbonFileMetastore extends CarbonMetaStore {
     true
   }
 
-  private def readCarbonSchema(identifier: AbsoluteTableIdentifier): Option[CarbonTable] = {
+  def isTableInMetastore(identifier: AbsoluteTableIdentifier,
+      sparkSession: SparkSession): Boolean = {
+    sparkSession.sessionState.catalog.listTables(identifier.getDatabaseName)
+      .exists(_.table.equalsIgnoreCase(identifier.getTableName))
+  }
+
+  private def readCarbonSchema(identifier: AbsoluteTableIdentifier,
+      inferSchema: Boolean): Option[CarbonTable] = {
+
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
     val tableName = identifier.getCarbonTableIdentifier.getTableName
+    val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName)
     val tablePath = identifier.getTablePath
-    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
-    val fileType = FileFactory.getFileType(tableMetadataFile)
-    if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-      val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName)
-      val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
-      val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val wrapperTableInfo =
+    if (inferSchema) {
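+      // no schema file exists for an unmanaged table; infer the schema from the data files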
+      val thriftTableInfo = schemaConverter
+        .fromWrapperToExternalTableInfo(SchemaReader.inferSchema(identifier, false),
+          dbName, tableName)
       val wrapperTableInfo =
-        schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
+        schemaConverter
+          .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, tablePath)
+      wrapperTableInfo.getFactTable.getTableProperties.put("_external", "true")
+      wrapperTableInfo.setUnManagedTable(true)
+      Some(wrapperTableInfo)
+    } else {
+      val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
+      val fileType = FileFactory.getFileType(tableMetadataFile)
+      if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+        val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
+        val wrapperTableInfo =
+          schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
+        Some(wrapperTableInfo)
+      } else {
+        None
+      }
+    }
+
+    wrapperTableInfo.map { tableInfo =>
       CarbonMetadata.getInstance().removeTable(tableUniqueName)
-      CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
+      CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
       val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
       metadata.carbonTables += carbonTable
-      Some(carbonTable)
-    } else {
-      None
+      carbonTable
     }
   }
 
@@ -448,6 +473,34 @@ class CarbonFileMetastore extends CarbonMetaStore {
       val tableIdentifier = TableIdentifier(tableName, Option(dbName))
       sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
       DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
+    } else {
+      if (isUnmanagedCarbonTable(absoluteTableIdentifier, sparkSession)) {
+        removeTableFromMetadata(dbName, tableName)
+        CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
+        // discard cached table info in cachedDataSourceTables
+        val tableIdentifier = TableIdentifier(tableName, Option(dbName))
+        sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
+        DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
+      }
+    }
+  }
+
+  def isUnmanagedCarbonTable(identifier: AbsoluteTableIdentifier,
+      sparkSession: SparkSession): Boolean = {
+    if (sparkSession.sessionState.catalog.listTables(identifier.getDatabaseName)
+      .exists(_.table.equalsIgnoreCase(identifier.getTableName))) {
+
+      val table = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
+        .getCarbonEnv().carbonMetastore
+        .getTableFromMetadataCache(identifier.getDatabaseName, identifier.getTableName)
+
+      table match {
+        case Some(t) => t.getTableInfo.isUnManagedTable
+        case _ => false
+      }
+    } else {
+      false
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 7b464f2..33eac61 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -258,6 +258,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
     }
     // validate tblProperties
     val bucketFields = parser.getBucketFields(tableProperties, fields, options)
+    var unManagedTable : Boolean = false
 
     val tableInfo = if (external) {
       // read table info from schema file in the provided table path
@@ -267,9 +268,13 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
         tableIdentifier.table)
       val table = try {
         val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
-        if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath)) &&
-            provider.equalsIgnoreCase("'carbonfile'")) {
-          SchemaReader.inferSchema(identifier)
+        if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
+          if (provider.equalsIgnoreCase("'carbonfile'")) {
+            SchemaReader.inferSchema(identifier, true)
+          } else {
+            unManagedTable = true
+            SchemaReader.inferSchema(identifier, false)
+          }
         }
         else {
           SchemaReader.getTableInfo(identifier)
@@ -302,6 +307,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
         tableComment)
       TableNewProcessor(tableModel)
     }
+    tableInfo.setUnManagedTable(unManagedTable)
     selectQuery match {
       case query@Some(q) =>
         CarbonCreateTableAsSelectCommand(
@@ -313,7 +319,8 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
         CarbonCreateTableCommand(
           tableInfo = tableInfo,
           ifNotExistsSet = ifNotExists,
-          tableLocation = tablePath)
+          tableLocation = tablePath,
+          external)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index c21c57f..829c17e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -110,6 +110,8 @@ public class CarbonDataLoadConfiguration {
 
   private SortColumnRangeInfo sortColumnRangeInfo;
 
+  private boolean carbonUnmanagedTable;
+
   /**
    * Folder path where data should be written for this load.
    */
@@ -377,4 +379,12 @@ public class CarbonDataLoadConfiguration {
   public void setSortColumnRangeInfo(SortColumnRangeInfo sortColumnRangeInfo) {
     this.sortColumnRangeInfo = sortColumnRangeInfo;
   }
+
+  public boolean isCarbonUnmanagedTable() {
+    return carbonUnmanagedTable;
+  }
+
+  public void setCarbonUnmanagedTable(boolean carbonUnmanagedTable) {
+    this.carbonUnmanagedTable = carbonUnmanagedTable;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 21a4b3e..6d3f596 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -185,6 +185,7 @@ public final class DataLoadProcessBuilder {
     CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
     AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
     configuration.setTableIdentifier(identifier);
+    configuration.setCarbonUnmanagedTable(loadModel.isCarbonUnmanagedTable());
     configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
     configuration.setHeader(loadModel.getCsvHeaderColumns());
     configuration.setSegmentId(loadModel.getSegmentId());
@@ -214,6 +215,7 @@ public final class DataLoadProcessBuilder {
         loadModel.getGlobalSortPartitions());
     configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
         loadModel.getBadRecordsLocation());
     CarbonMetadata.getInstance().addCarbonTable(carbonTable);
     List<CarbonDimension> dimensions =
         carbonTable.getDimensionByTableName(carbonTable.getTableName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index ebf3cf1..fd39563 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -47,6 +47,13 @@ public class CarbonLoadModel implements Serializable {
 
   private String tablePath;
 
+  /*
+     Indicates whether the carbonTable is an unmanaged table.
+     Its data is located directly at tablePath, and no Metadata
+     folder is present for an unmanaged table.
+   */
+  private boolean carbonUnmanagedTable;
+
   private String csvHeader;
   private String[] csvHeaderColumns;
   private String csvDelimiter;
@@ -410,6 +417,7 @@ public class CarbonLoadModel implements Serializable {
     copy.defaultTimestampFormat = defaultTimestampFormat;
     copy.maxColumns = maxColumns;
     copy.tablePath = tablePath;
+    copy.carbonUnmanagedTable = carbonUnmanagedTable;
     copy.useOnePass = useOnePass;
     copy.dictionaryServerHost = dictionaryServerHost;
     copy.dictionaryServerPort = dictionaryServerPort;
@@ -463,6 +471,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.defaultTimestampFormat = defaultTimestampFormat;
     copyObj.maxColumns = maxColumns;
     copyObj.tablePath = tablePath;
+    copyObj.carbonUnmanagedTable = carbonUnmanagedTable;
     copyObj.useOnePass = useOnePass;
     copyObj.dictionaryServerHost = dictionaryServerHost;
     copyObj.dictionaryServerPort = dictionaryServerPort;
@@ -825,4 +834,12 @@ public class CarbonLoadModel implements Serializable {
     LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metadataPath);
     setLoadMetadataDetails(Arrays.asList(details));
   }
+
+  public boolean isCarbonUnmanagedTable() {
+    return carbonUnmanagedTable;
+  }
+
+  public void setCarbonUnmanagedTable(boolean carbonUnmanagedTable) {
+    this.carbonUnmanagedTable = carbonUnmanagedTable;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 8c005c3..8eb5ed1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -60,7 +60,7 @@ public class CarbonLoadModelBuilder {
    * @return a new CarbonLoadModel instance
    */
   public CarbonLoadModel build(
-      Map<String, String> options) throws InvalidLoadOptionException, IOException {
+      Map<String, String> options, long UUID) throws InvalidLoadOptionException, IOException {
     Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(options);
 
     if (!options.containsKey("fileheader")) {
@@ -72,10 +72,13 @@ public class CarbonLoadModelBuilder {
       optionsFinal.put("fileheader", Strings.mkString(columns, ","));
     }
     CarbonLoadModel model = new CarbonLoadModel();
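+    // carry the unmanaged flag into the model and use the caller-supplied UUID as the fact timestamp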
+    model.setCarbonUnmanagedTable(table.isUnManagedTable());
+    model.setFactTimeStamp(UUID);
 
     // we have provided 'fileheader', so hadoopConf can be null
     build(options, optionsFinal, model, null);
 
     // set default values
     model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
     model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index e605b9e..089b8c7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -244,8 +244,9 @@ public class LoadOption {
       }
     }
 
-    if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName(), csvColumns,
-        carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) {
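+    // unmanaged (SDK) loads derive the header from the table schema, so header validation is skipped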
+    if (!carbonLoadModel.isCarbonUnmanagedTable() && !CarbonDataProcessorUtil
+        .isHeaderValid(carbonLoadModel.getTableName(), csvColumns,
+            carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) {
       if (csvFile == null) {
         LOG.error("CSV header in DDL is not proper."
             + " Column names in schema and CSV header are not the same.");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 1d892e0..0625a66 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -366,8 +366,14 @@ public class CarbonFactDataHandlerModel {
       return paths;
     }
     AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
-    String carbonDataDirectoryPath = CarbonTablePath
-        .getSegmentPath(absoluteTableIdentifier.getTablePath(), configuration.getSegmentId() + "");
+    String carbonDataDirectoryPath;
+    if (configuration.isCarbonUnmanagedTable()) {
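+      // unmanaged tables have no Segment_x folders; data files go directly under the table path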
+      carbonDataDirectoryPath = absoluteTableIdentifier.getTablePath();
+    } else {
+      carbonDataDirectoryPath = CarbonTablePath
+          .getSegmentPath(absoluteTableIdentifier.getTablePath(),
+              configuration.getSegmentId() + "");
+    }
     CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
     return carbonDataDirectoryPath;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
index 210d516..716ec2a 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReader.java
@@ -84,8 +84,8 @@ public class CarbonReader<T> {
   /**
    * Return a new {@link CarbonReaderBuilder} instance
    */
-  public static CarbonReaderBuilder builder(String tablePath) {
-    return new CarbonReaderBuilder(tablePath);
+  public static CarbonReaderBuilder builder(String tablePath, String tableName) {
+    return new CarbonReaderBuilder(tablePath, tableName);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 894b973..7f00b49 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -45,9 +45,11 @@ public class CarbonReaderBuilder {
   private String tablePath;
   private String[] projectionColumns;
   private Expression filterExpression;
+  private String tableName;
 
-  CarbonReaderBuilder(String tablePath) {
+  CarbonReaderBuilder(String tablePath, String tableName) {
     this.tablePath = tablePath;
+    this.tableName = tableName;
   }
 
   public CarbonReaderBuilder projection(String[] projectionColumnNames) {
@@ -63,7 +65,7 @@ public class CarbonReaderBuilder {
   }
 
   public <T> CarbonReader<T> build() throws IOException, InterruptedException {
-    CarbonTable table = CarbonTable.buildFromTablePath("_temp", tablePath);
+    CarbonTable table = CarbonTable.buildFromTablePath(tableName, tablePath);
 
     final CarbonFileInputFormat format = new CarbonFileInputFormat();
     final Job job = new Job(new Configuration());
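
A minimal usage sketch of the two-argument builder introduced above. The path
and table name are placeholders, and the iteration calls (hasNext(),
readNextRow()) are assumed from the existing SDK reader API rather than shown
in this patch:

    CarbonReader reader = CarbonReader
        .builder("./testWriteFiles", "_temp")   // table name is now explicit
        .projection(new String[]{"name", "age"})
        .build();
    while (reader.hasNext()) {
      Object[] row = (Object[]) reader.readNextRow();
      System.out.println(row[0] + ", " + row[1]);
    }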

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 5be60c4..f70e165 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -55,6 +55,8 @@ public class CarbonWriterBuilder {
   private boolean persistSchemaFile;
   private int blockletSize;
   private int blockSize;
+  private boolean isUnManagedTable;
+  private long UUID;
 
   public CarbonWriterBuilder withSchema(Schema schema) {
     Objects.requireNonNull(schema, "schema should not be null");
@@ -82,9 +84,21 @@ public class CarbonWriterBuilder {
     return this;
   }
 
+  public CarbonWriterBuilder unManagedTable(boolean isUnManagedTable) {
+    // no null check needed: a primitive boolean can never be null
+    this.isUnManagedTable = isUnManagedTable;
+    return this;
+  }
+
+  public CarbonWriterBuilder uniqueIdentifier(long UUID) {
+    // no null check needed: a primitive long can never be null
+    this.UUID = UUID;
+    return this;
+  }
+
   public CarbonWriterBuilder withBlockSize(int blockSize) {
-    if (blockSize <= 0) {
-      throw new IllegalArgumentException("blockSize should be greater than zero");
+    if (blockSize <= 0 || blockSize > 2048) {
+      throw new IllegalArgumentException("blockSize should be between 1 MB and 2048 MB");
     }
     this.blockSize = blockSize;
     return this;
@@ -129,7 +143,7 @@ public class CarbonWriterBuilder {
     }
 
     // build LoadModel
-    return buildLoadModel(table);
+    return buildLoadModel(table, UUID);
   }
 
   /**
@@ -152,8 +166,15 @@ public class CarbonWriterBuilder {
           new StructField(field.getFieldName(), field.getDataType()),
           sortColumnsList.contains(field.getFieldName()));
     }
-    String tableName = "_tempTable";
-    String dbName = "_tempDB";
+    String tableName;
+    String dbName;
+    if (!isUnManagedTable) {
+      tableName = "_tempTable";
+      dbName = "_tempDB";
+    } else {
+      dbName = null;
+      tableName = null;
+    }
     TableSchema schema = tableSchemaBuilder.build();
     schema.setTableName(tableName);
     CarbonTable table = CarbonTable.builder()
@@ -161,6 +182,7 @@ public class CarbonWriterBuilder {
         .databaseName(dbName)
         .tablePath(path)
         .tableSchema(schema)
+        .isUnManagedTable(isUnManagedTable)
         .build();
     return table;
   }
@@ -198,13 +220,13 @@ public class CarbonWriterBuilder {
   /**
    * Build a {@link CarbonLoadModel}
    */
-  private CarbonLoadModel buildLoadModel(CarbonTable table)
+  private CarbonLoadModel buildLoadModel(CarbonTable table, long UUID)
       throws InvalidLoadOptionException, IOException {
     Map<String, String> options = new HashMap<>();
     if (sortColumns != null) {
       options.put("sort_columns", Strings.mkString(sortColumns, ","));
     }
     CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
-    return builder.build(options);
+    return builder.build(options, UUID);
   }
 }
\ No newline at end of file
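
A condensed sketch of writing an unmanaged table through the new builder
options, mirroring the test below (the output path is a placeholder):

    Field[] fields = new Field[]{
        new Field("name", DataTypes.STRING),
        new Field("age", DataTypes.INT)};
    CarbonWriter writer = CarbonWriter.builder()
        .withSchema(new Schema(fields))
        .unManagedTable(true)                          // no Metadata folder or segments
        .uniqueIdentifier(System.currentTimeMillis())  // used as the load (fact) timestamp
        .outputPath("./unmanagedTable")
        .buildWriterForCSVInput();
    for (int i = 0; i < 10; i++) {
      writer.write(new String[]{"robot" + i, String.valueOf(i)});
    }
    writer.close();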

http://git-wip-us.apache.org/repos/asf/carbondata/blob/280a4003/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java
new file mode 100644
index 0000000..4bcdfff
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test suite for unmanaged table write via {@link CSVCarbonWriter}
+ */
+public class CSVUnManagedCarbonWriterTest {
+
+  @Test
+  public void testWriteFiles() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testWriteFilesJsonSchema() throws IOException {
+    String path = "./testWriteFilesJsonSchema";
+    FileUtils.deleteDirectory(new File(path));
+
+    String schema = new StringBuilder()
+        .append("[ \n")
+        .append("   {\"name\":\"string\"},\n")
+        .append("   {\"age\":\"int\"},\n")
+        .append("   {\"height\":\"double\"}\n")
+        .append("]")
+        .toString();
+
+    writeFilesAndVerify(Schema.parseJson(schema), path);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path) {
+    writeFilesAndVerify(schema, path, null);
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) {
+    writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1);
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) {
+    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1);
+  }
+
+  /**
+   * Invoke the CarbonWriter API to write carbon files and assert that the files are written
+   * @param rows number of rows to write
+   * @param schema schema of the file
+   * @param path local write path
+   * @param sortColumns sort columns
+   * @param persistSchema true to persist the schema file
+   * @param blockletSize blockletSize in the file, -1 for default size
+   * @param blockSize blockSize in the file, -1 for default size
+   */
+  private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns,
+      boolean persistSchema, int blockletSize, int blockSize) {
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .withSchema(schema)
+          .unManagedTable(true)
+          .uniqueIdentifier(System.currentTimeMillis())
+          .outputPath(path);
+      if (sortColumns != null) {
+        builder = builder.sortBy(sortColumns);
+      }
+      if (persistSchema) {
+        builder = builder.persistSchemaFile(true);
+      }
+      if (blockletSize != -1) {
+        builder = builder.withBlockletSize(blockletSize);
+      }
+      if (blockSize != -1) {
+        builder = builder.withBlockSize(blockSize);
+      }
+
+      CarbonWriter writer = builder.buildWriterForCSVInput();
+
+      for (int i = 0; i < rows; i++) {
+        writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
+      }
+      writer.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } catch (InvalidLoadOptionException l) {
+      l.printStackTrace();
+      Assert.fail(l.getMessage());
+    }
+
+    File segmentFolder = new File(path);
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertTrue(dataFiles.length > 0);
+  }
+
+  @Test
+  public void testAllPrimitiveDataType() throws IOException {
+    // TODO: write all data type and read by CarbonRecordReader to verify the content
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[9];
+    fields[0] = new Field("stringField", DataTypes.STRING);
+    fields[1] = new Field("intField", DataTypes.INT);
+    fields[2] = new Field("shortField", DataTypes.SHORT);
+    fields[3] = new Field("longField", DataTypes.LONG);
+    fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+    fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+    fields[6] = new Field("dateField", DataTypes.DATE);
+    fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+    fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .withSchema(new Schema(fields))
+          .uniqueIdentifier(System.currentTimeMillis())
+          .unManagedTable(true)
+          .outputPath(path);
+
+      CarbonWriter writer = builder.buildWriterForCSVInput();
+
+      for (int i = 0; i < 100; i++) {
+        String[] row = new String[]{
+            "robot" + (i % 10),
+            String.valueOf(i),
+            String.valueOf(i),
+            String.valueOf(Long.MAX_VALUE - i),
+            String.valueOf((double) i / 2),
+            String.valueOf(true),
+            "2019-03-02",
+            "2019-02-12 03:03:34"
+        };
+        writer.write(row);
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+
+    File segmentFolder = new File(path);
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertTrue(dataFiles.length > 0);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void test2Blocklet() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100);
+
+    // TODO: implement reader to verify the number of blocklet in the file
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void test2Block() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2);
+
+    File segmentFolder = new File(path);
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertEquals(2, dataFiles.length);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testSortColumns() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path, new String[]{"name"});
+
+    // TODO: implement reader and verify the data is sorted
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testPartitionOutput() {
+    // TODO: test write data with partition
+  }
+
+  @Test
+  public void testSchemaPersistence() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path, true);
+
+    String schemaFile = CarbonTablePath.getSchemaFilePath(path);
+    Assert.assertTrue(new File(schemaFile).exists());
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+}

