carbondata-commits mailing list archives

From qiang...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3913] Support table properties: dateformat, timestampformat
Date Fri, 24 Jul 2020 07:38:09 GMT
This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 79275aa  [CARBONDATA-3913] Support table properties: dateformat, timestampformat
79275aa is described below

commit 79275aa6f07f83d7e799af4c6977d9b5eaf3c95a
Author: Shreelekhya <shreelu_gampa@yahoo.com>
AuthorDate: Thu Jul 16 15:55:01 2020 +0530

    [CARBONDATA-3913] Support table properties: dateformat, timestampformat
    
    Why is this PR needed?
    To support dateformat and timestampformat at table level.
    
    What changes were proposed in this PR?
    Made the priority order of dateformat/timestampformat resolution as:
    1. Load command options
    2. carbon.options.date.format/carbon.options.timestamp.format session property
    3. Table level properties
    4. carbon.date.format/carbon.timestamp.format session property
    Updated the DDL and DML documents.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3849
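The four-level precedence above can be sketched as a chain of Option fallbacks. This is an illustration only (the function and parameter names below are not CarbonData APIs; the real resolution is spread across CommonLoadUtils, StreamSinkFactory, and CarbonLoadModelBuilder):

```scala
// Illustrative sketch only: the names below are not CarbonData APIs.
// Resolves a date format following the four-level precedence in this commit.
def resolveFormat(
    loadOptions: Map[String, String],     // 1. LOAD command OPTIONS
    sessionOverride: Option[String],      // 2. carbon.options.date.format session property
    tableProperties: Map[String, String], // 3. table-level TBLPROPERTIES
    sessionDefault: String                // 4. carbon.date.format session property / default
): String =
  loadOptions.get("dateformat")
    .orElse(sessionOverride)
    .orElse(tableProperties.get("dateformat"))
    .getOrElse(sessionDefault)

// A load-level option wins over everything else:
println(resolveFormat(Map("dateformat" -> "yyyy/MM/dd"),
  Some("dd-MM-yyyy"), Map("dateformat" -> "MM/dd/yyyy"), "yyyy-MM-dd")) // yyyy/MM/dd
```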
---
 docs/ddl-of-carbondata.md                          |  19 ++-
 docs/dml-of-carbondata.md                          |   6 +
 .../org/apache/carbondata/spark/CarbonOption.scala |   7 +
 .../spark/rdd/NewCarbonDataLoadRDD.scala           |  35 ++--
 .../carbondata/streaming/StreamSinkFactory.scala   |  18 +-
 .../apache/spark/sql/CarbonDataFrameWriter.scala   |   4 +-
 .../command/management/CommonLoadUtils.scala       |  16 +-
 .../table/CarbonDescribeFormattedCommand.scala     |   2 +
 .../datasources/SparkCarbonTableFormat.scala       |  15 ++
 .../sql/parser/CarbonSparkSqlParserUtil.scala      |   3 +-
 .../org/apache/spark/util/AlterTableUtil.scala     |   4 +-
 .../TestLoadDataWithDiffTimestampFormat.scala      | 186 ++++++++++++++++++++-
 .../spark/testsuite/merge/MergeTestCase.scala      |  71 +++++++-
 .../loading/model/CarbonLoadModelBuilder.java      |  15 +-
 14 files changed, 378 insertions(+), 23 deletions(-)

diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index e7cfb0c..228d9e7 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -109,7 +109,8 @@ CarbonData DDL statements are documented here,which includes:
 | [LOAD_MIN_SIZE_INMB](#load-minimum-data-size)                | Minimum input data size per node for data loading          |
 | [Range Column](#range-column)                                | partition input data by range                              |
 | [INDEX_CACHE_EXPIRATION_TIME_IN_SECONDS](#index-cache-expiration-time-in-seconds)| Table level time-based cache expiration in seconds |
-
+| [DATEFORMAT](#dateformattimestampformat)                                    | Table level date format |
+| [TIMESTAMPFORMAT](#dateformattimestampformat)                          | Table level timestamp format |
  Following are the guidelines for TBLPROPERTIES, CarbonData's additional table options can be set via carbon.properties.
 
    - ##### Local Dictionary Configuration
@@ -517,6 +518,22 @@ CarbonData DDL statements are documented here,which includes:
      ```
       ALTER TABLE [dbName].tableName SET TBLPROPERTIES ('index_cache_expiration_seconds'='3')
 
+   - ##### DATEFORMAT/TIMESTAMPFORMAT:
+   
+     Date and timestamp formats for the specified columns, set at table level.
+     
+     ```
+     TBLPROPERTIES('DATEFORMAT' = 'yyyy-MM-dd','TIMESTAMPFORMAT'='yyyy-MM-dd HH:mm:ss')
+     ```
+     After creating the table, or on an already created table, use the ALTER TABLE command to configure the DATEFORMAT/TIMESTAMPFORMAT.
+     
+     Syntax:
+     
+     ```
+      ALTER TABLE [dbName].tableName SET TBLPROPERTIES ('DATEFORMAT' = 'yyyy-MM-dd','TIMESTAMPFORMAT'='yyyy-MM-dd HH:mm:ss')
+     ```
+     **NOTE:** Date formats are specified by date pattern strings. The date pattern in CarbonData is the same as in JAVA. Refer to [SimpleDateFormat](http://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html).
+     
 ## CREATE TABLE AS SELECT
  This function allows user to create a Carbon table from any of the Parquet/Hive/Carbon table. This is beneficial when the user wants to create Carbon table from any other Parquet/Hive table and use the Carbon query engine to query and achieve better query results for cases where Carbon is faster than other file formats. Also this feature can be used for backing up the data.
 
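The new TBLPROPERTIES take the same pattern strings the NOTE above refers to. A quick stand-alone check of how such patterns behave, using plain JDK classes with no CarbonData involved:

```scala
import java.text.SimpleDateFormat

// TBLPROPERTIES/OPTIONS values are plain java.text.SimpleDateFormat patterns;
// a value round-trips through the pattern it was written with.
val dateFmt = new SimpleDateFormat("yyyy-MM-dd")
val tsFmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

val d = dateFmt.parse("2016-07-24")
val t = tsFmt.parse("2016-07-24 07:38:09")

println(dateFmt.format(d)) // 2016-07-24
println(tsFmt.format(t))   // 2016-07-24 07:38:09
```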
diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index 98a3289..0a8163a 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -191,6 +191,12 @@ CarbonData DML statements are documented here,which includes:
     ```
    **NOTE:** Date formats are specified by date pattern strings. The date pattern in CarbonData is the same as in JAVA. Refer to [SimpleDateFormat](http://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html).
 
+    Priority order for choosing DATEFORMAT/TIMESTAMPFORMAT is:
+    * Load Data Command
+    * ```carbon.options.date.format/carbon.options.timestamp.format``` session property.
+    * Table level Property
+    * ```carbon.date.format/carbon.timestamp.format``` session property.
+    
   - ##### SORT COLUMN BOUNDS:
 
     Range bounds for sort columns.
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 107cbe1..a4ebfbd 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -19,6 +19,9 @@ package org.apache.carbondata.spark
 
 import scala.util.Try
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
 /**
  * Contains all options for Spark data source
  */
@@ -77,5 +80,9 @@ class CarbonOption(options: Map[String, String]) {
   lazy val overwriteEnabled: Boolean =
     options.getOrElse("overwrite", "false").toBoolean
 
+  lazy val timestampformat: Option[String] = options.get("timestampformat")
+
+  lazy val dateformat: Option[String] = options.get("dateformat")
+
   def toMap: Map[String, String] = options
 }
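The two additions above are plain `Option` lookups against the DataFrame writer's option map. A minimal stand-in (the class name is illustrative, not a CarbonData API):

```scala
// Minimal stand-in for the CarbonOption additions (class name is illustrative):
// the two new fields are plain lookups against the writer's option map.
class FormatOptions(options: Map[String, String]) {
  lazy val timestampformat: Option[String] = options.get("timestampformat")
  lazy val dateformat: Option[String] = options.get("dateformat")
}

val opts = new FormatOptions(Map("dateformat" -> "yyyy/MM/dd"))
println(opts.dateformat)      // Some(yyyy/MM/dd)
println(opts.timestampformat) // None
```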
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 521f105..5670c1f 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -368,12 +368,19 @@ class NewRddIterator(rddIter: Iterator[Row],
     carbonLoadModel: CarbonLoadModel,
     context: TaskContext) extends CarbonIterator[Array[AnyRef]] {
 
-  private val timeStampformatString = CarbonProperties.getInstance()
-    .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  private var timeStampformatString = carbonLoadModel.getTimestampFormat
+  private var dateFormatString = carbonLoadModel.getDateFormat
+  if (timeStampformatString.isEmpty) {
+    timeStampformatString = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  }
+  if (dateFormatString.isEmpty) {
+    dateFormatString = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+  }
   private val timeStampFormat = new SimpleDateFormat(timeStampformatString)
-  private val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
-    .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
   private val dateFormat = new SimpleDateFormat(dateFormatString)
   private val complexDelimiters = carbonLoadModel.getComplexDelimiters
   private val serializationNullFormat =
@@ -430,13 +437,19 @@ class LazyRddIterator(serializer: SerializerInstance,
     carbonLoadModel: CarbonLoadModel,
     context: TaskContext) extends CarbonIterator[Array[AnyRef]] {
 
-  private val timeStampformatString = CarbonProperties.getInstance()
-    .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  private var timeStampformatString = carbonLoadModel.getTimestampFormat
+  private var dateFormatString = carbonLoadModel.getDateFormat
+  if (timeStampformatString.isEmpty) {
+    timeStampformatString = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  }
+  if (dateFormatString.isEmpty) {
+    dateFormatString = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+  }
   private val timeStampFormat = new SimpleDateFormat(timeStampformatString)
-  private val dateFormatString = CarbonProperties.getInstance()
-    .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
-      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
   private val dateFormat = new SimpleDateFormat(dateFormatString)
   private val complexDelimiters = carbonLoadModel.getComplexDelimiters
   private val serializationNullFormat =
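Both iterators now prefer the format carried on the CarbonLoadModel and fall back to the global CarbonProperties value only when the model's value is empty. The guard reduces to the following sketch (helper name is hypothetical):

```scala
// Hypothetical helper name; mirrors the guard both iterators now use:
// take the format from the load model unless it is absent/empty, in which
// case fall back to the global CarbonProperties default.
def formatOrDefault(fromLoadModel: String, globalDefault: => String): String =
  if (fromLoadModel == null || fromLoadModel.isEmpty) globalDefault else fromLoadModel

println(formatOrDefault("yyyy/MM/dd", "yyyy-MM-dd")) // yyyy/MM/dd
println(formatOrDefault("", "yyyy-MM-dd"))           // yyyy-MM-dd
```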
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index e76fad4..9a1d2d4 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -29,8 +29,9 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.command.management.CommonLoadUtils
 import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
 
+import org.apache.carbondata.common.Maps
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
@@ -174,6 +175,7 @@ object StreamSinkFactory {
       parameters: Map[String, String],
       segmentId: String): CarbonLoadModel = {
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    val tableProperties = carbonTable.getTableInfo.getFactTable.getTableProperties
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
     val optionsFinal = LoadOption.fillOptionWithDefaultValue(parameters.asJava)
     optionsFinal.put("sort_scope", "no_sort")
@@ -183,6 +185,20 @@ object StreamSinkFactory {
     }
     optionsFinal
       .put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(parameters.asJava, carbonTable))
+    // If DATEFORMAT is not present in load options, check from table properties.
+    if (optionsFinal.get("dateformat").isEmpty) {
+      optionsFinal.put("dateformat", Maps.getOrDefault(tableProperties,
+        "dateformat", CarbonProperties.getInstance
+          .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+            CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)))
+    }
+    // If TIMESTAMPFORMAT is not present in load options, check from table properties.
+    if (optionsFinal.get("timestampformat").isEmpty) {
+      optionsFinal.put("timestampformat", Maps.getOrDefault(tableProperties,
+        "timestampformat", CarbonProperties.getInstance
+          .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)))
+    }
     val carbonLoadModel = new CarbonLoadModel()
     new CarbonLoadModelBuilder(carbonTable).build(
       parameters.asJava,
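`Maps.getOrDefault` above is CarbonData's own helper; assuming it matches the semantics of `java.util.Map.getOrDefault`, the table-property fallback behaves like this JDK-only sketch:

```scala
import java.util.{HashMap => JHashMap}

// java.util.Map.getOrDefault is assumed here to match the semantics of
// CarbonData's Maps.getOrDefault helper: return the table property when the
// key is present, otherwise the supplied default.
val tableProperties = new JHashMap[String, String]()
tableProperties.put("dateformat", "yyyy/MM/dd")

val dateformat = tableProperties.getOrDefault("dateformat", "yyyy-MM-dd")
val timestampformat = tableProperties.getOrDefault("timestampformat", "yyyy-MM-dd HH:mm:ss")
println(dateformat)      // yyyy/MM/dd
println(timestampformat) // yyyy-MM-dd HH:mm:ss
```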
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 8331b54..7cafa59 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -86,7 +86,9 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       "TABLE_BLOCKSIZE" -> options.tableBlockSize,
       "TABLE_BLOCKLET_SIZE" -> options.tableBlockletSize,
       "TABLE_PAGE_SIZE_INMB" -> options.tablePageSizeInMb,
-      "STREAMING" -> Option(options.isStreaming.toString)
+      "STREAMING" -> Option(options.isStreaming.toString),
+      "DATEFORMAT" -> options.dateformat,
+      "TIMESTAMPFORMAT" -> options.timestampformat
     ).filter(_._2.isDefined)
       .map(property => s"'${property._1}' = '${property._2.get}'").mkString(",")
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 71649b8..fa2d178 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -43,7 +43,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CollectionAccumulator, SparkUtil}
 
-import org.apache.carbondata.common.Strings
+import org.apache.carbondata.common.{Maps, Strings}
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants, SortScopeOptions}
@@ -263,6 +263,20 @@ object CommonLoadUtils {
     }
     optionsFinal
       .put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
+    // If DATEFORMAT is not present in load options, check from table properties.
+    if (optionsFinal.get("dateformat").isEmpty) {
+      optionsFinal.put("dateformat", Maps.getOrDefault(tableProperties,
+        "dateformat", CarbonProperties.getInstance
+          .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+            CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)))
+    }
+    // If TIMESTAMPFORMAT is not present in load options, check from table properties.
+    if (optionsFinal.get("timestampformat").isEmpty) {
+      optionsFinal.put("timestampformat", Maps.getOrDefault(tableProperties,
+        "timestampformat", CarbonProperties.getInstance
+          .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)))
+    }
     optionsFinal
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 9969942..5e927a3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -137,6 +137,8 @@ private[sql] case class CarbonDescribeFormattedCommand(
       ("Table Blocklet Size ", carbonTable.getBlockletSizeInMB + " MB", ""),
       ("Comment", tblProps.getOrElse(CarbonCommonConstants.TABLE_COMMENT, ""), ""),
       ("Bad Record Path", tblProps.getOrElse("bad_record_path", ""), ""),
+      ("Date Format", tblProps.getOrElse("dateformat", ""), ""),
+      ("Timestamp Format", tblProps.getOrElse("timestampformat", ""), ""),
       ("Min Input Per Node Per Load",
         Strings.formatSize(
           tblProps.getOrElse(CarbonCommonConstants.CARBON_LOAD_MIN_SIZE_INMB,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index e25c6e1..1bf3946 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types._
 import org.apache.spark.TaskContext
 
+import org.apache.carbondata.common.Maps
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -119,6 +120,20 @@ with Serializable {
     }
     optionsFinal
       .put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
+    // If DATEFORMAT is not present in load options, check from table properties.
+    if (optionsFinal.get("dateformat").isEmpty) {
+      optionsFinal.put("dateformat", Maps.getOrDefault(tableProperties,
+        "dateformat", CarbonProperties.getInstance
+          .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+            CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)))
+    }
+    // If TIMESTAMPFORMAT is not present in load options, check from table properties.
+    if (optionsFinal.get("timestampformat").isEmpty) {
+      optionsFinal.put("timestampformat", Maps.getOrDefault(tableProperties,
+        "timestampformat", CarbonProperties.getInstance
+          .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)))
+    }
     val partitionStr =
       table.getTableInfo.getFactTable.getPartitionInfo.getColumnSchemaList.asScala.map(
         _.getColumnName.toLowerCase).mkString(",")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
index 694a9bc..4b9a126 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
@@ -558,7 +558,8 @@ object CarbonSparkSqlParserUtil {
    * @return returns <true> if lower case conversion is needed else <false>
    */
   def needToConvertToLowerCase(key: String): Boolean = {
-    val noConvertList = Array(CarbonCommonConstants.COMPRESSOR, "PATH", "bad_record_path")
+    val noConvertList = Array(CarbonCommonConstants.COMPRESSOR, "PATH", "bad_record_path",
+      "timestampformat", "dateformat")
     !noConvertList.exists(x => x.equalsIgnoreCase(key))
   }
 
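Keeping the two keys out of lower-case conversion matters because SimpleDateFormat patterns are case-sensitive, so a user-supplied pattern must keep its casing. A stand-alone copy of the predicate, with the literal "compressor" standing in for CarbonCommonConstants.COMPRESSOR:

```scala
// Stand-alone copy of the predicate; the literal "compressor" stands in for
// CarbonCommonConstants.COMPRESSOR. Values of keys in noConvertList keep
// their casing, which matters for format patterns: SimpleDateFormat is
// case-sensitive (MM = month, mm = minute; HH = 24-hour, hh = 12-hour).
def needToConvertToLowerCase(key: String): Boolean = {
  val noConvertList = Array("compressor", "PATH", "bad_record_path",
    "timestampformat", "dateformat")
  !noConvertList.exists(x => x.equalsIgnoreCase(key))
}

println(needToConvertToLowerCase("DATEFORMAT"))   // false
println(needToConvertToLowerCase("sort_columns")) // true
```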
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index a77ab19..3a1ac1e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -570,7 +570,9 @@ object AlterTableUtil {
       "SORT_COLUMNS",
       "GLOBAL_SORT_PARTITIONS",
       "LONG_STRING_COLUMNS",
-      "INDEX_CACHE_EXPIRATION_SECONDS")
+      "INDEX_CACHE_EXPIRATION_SECONDS",
+      "DATEFORMAT",
+      "TIMESTAMPFORMAT")
     supportedOptions.contains(propKey.toUpperCase)
   }
 
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
index 304b2bc..57c8f4b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
 import org.apache.spark.sql.Row
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 
@@ -122,6 +122,190 @@ class TestLoadDataWithDiffTimestampFormat extends QueryTest with BeforeAndAfterA
 
   }
 
+  test("test load data with date/timestamp format set at table level") {
+    sql("DROP TABLE IF EXISTS t3")
+    sql(
+      """
+           CREATE TABLE IF NOT EXISTS t3
+           (ID Int, date date, starttime Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           STORED AS carbondata TBLPROPERTIES('dateformat'='yyyy/MM/dd',
+           'timestampformat'='yyyy-MM-dd HH:mm')
+        """)
+    sql(
+      s"""
+           LOAD DATA LOCAL INPATH '$resourcesPath/timeStampFormatData1.csv' into table t3
+           """)
+    sql(
+      s"""
+           LOAD DATA LOCAL INPATH '$resourcesPath/timeStampFormatData2.csv' into table t3
+           OPTIONS('dateformat' = 'yyyy-MM-dd','timestampformat'='yyyy/MM/dd HH:mm:ss')
+           """)
+    val sdf = new SimpleDateFormat("yyyy-MM-dd")
+    checkAnswer(
+      sql("SELECT starttime FROM t3 WHERE ID = 1"),
+      Seq(Row(Timestamp.valueOf("2016-07-23 01:01:00")))
+    )
+    checkAnswer(
+      sql("SELECT starttime FROM t3 WHERE ID = 18"),
+      Seq(Row(Timestamp.valueOf("2016-07-25 02:32:02")))
+    )
+    checkAnswer(
+      sql("SELECT date FROM t3 WHERE ID = 1"),
+      Seq(Row(new Date(sdf.parse("2015-07-23").getTime)))
+    )
+    checkAnswer(
+      sql("SELECT date FROM t3 WHERE ID = 18"),
+      Seq(Row(new Date(sdf.parse("2015-07-25").getTime)))
+    )
+  }
+
+  test("test load data with date/timestamp format set at different levels") {
+    CarbonProperties.getInstance().addProperty(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, "yyyy/MM/dd")
+    CarbonProperties.getInstance().addProperty(
+      CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT, "yyyy-MM-dd HH:mm")
+    sql("DROP TABLE IF EXISTS t3")
+    sql(
+      """
+           CREATE TABLE IF NOT EXISTS t3
+           (ID Int, date date, starttime Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           STORED AS carbondata TBLPROPERTIES('dateformat'='yyyy/MM/dd',
+           'timestampformat'='yyyy-MM-dd')
+        """)
+    sql(
+      s"""
+           LOAD DATA LOCAL INPATH '$resourcesPath/timeStampFormatData1.csv' into table t3
+           """)
+    sql(
+      s"""
+           LOAD DATA LOCAL INPATH '$resourcesPath/timeStampFormatData2.csv' into table t3
+           OPTIONS('dateformat' = 'yyyy-MM-dd','timestampformat'='yyyy/MM/dd HH:mm:ss')
+           """)
+    val sdf = new SimpleDateFormat("yyyy-MM-dd")
+    checkAnswer(
+      sql("SELECT starttime FROM t3 WHERE ID = 1"),
+      Seq(Row(Timestamp.valueOf("2016-07-23 01:01:00")))
+    )
+    checkAnswer(
+      sql("SELECT starttime FROM t3 WHERE ID = 18"),
+      Seq(Row(Timestamp.valueOf("2016-07-25 02:32:02")))
+    )
+    checkAnswer(
+      sql("SELECT date FROM t3 WHERE ID = 1"),
+      Seq(Row(new Date(sdf.parse("2015-07-23").getTime)))
+    )
+    checkAnswer(
+      sql("SELECT date FROM t3 WHERE ID = 18"),
+      Seq(Row(new Date(sdf.parse("2015-07-25").getTime)))
+    )
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT)
+  }
+
+  test("test insert data with date/timestamp format set at table level") {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
+    sql("DROP TABLE IF EXISTS t3")
+    sql(
+      """
+           CREATE TABLE IF NOT EXISTS t3
+           (ID Int, date date, starttime Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           STORED AS carbondata TBLPROPERTIES('dateformat'='yyyy-MM-dd',
+           'timestampformat'='yyyy-MM-dd HH:mm')
+        """)
+    sql(
+      s"""
+           LOAD DATA LOCAL INPATH '$resourcesPath/timeStampFormatData1.csv' into table t3
+           OPTIONS('dateformat' = 'yyyy/MM/dd')
+           """)
+    sql(s"insert into t3 select 11,'2015-7-23','2016-7-23 01:01:30','china','aaa1','phone197'," +
+        s"'ASD69643',15000")
+    val sdf = new SimpleDateFormat("yyyy-MM-dd")
+    checkAnswer(
+      sql("SELECT starttime FROM t3 WHERE ID = 1"),
+      Seq(Row(Timestamp.valueOf("2016-07-23 01:01:00")))
+    )
+    checkAnswer(
+      sql("SELECT starttime FROM t3 WHERE ID = 11"),
+      Seq(Row(Timestamp.valueOf("2016-07-23 01:01:00")))
+    )
+    checkAnswer(
+      sql("SELECT date FROM t3 WHERE ID = 1"),
+      Seq(Row(new Date(sdf.parse("2015-07-23").getTime)))
+    )
+    checkAnswer(
+      sql("SELECT date FROM t3 WHERE ID = 11"),
+      Seq(Row(new Date(sdf.parse("2015-07-23").getTime)))
+    )
+  }
+
+  test("test alter table set and unset date,timestamp from properties") {
+    CarbonProperties.getInstance
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd hh:mm:ss")
+    sql("DROP TABLE IF EXISTS t3")
+    sql(
+      """
+           CREATE TABLE IF NOT EXISTS t3
+           (ID Int, date date, starttime Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           STORED AS carbondata
+        """)
+    sql("alter table t3 set tblproperties('dateformat'='yyyy/MM/dd'," +
+        "'timestampformat'='yyyy-MM-dd HH:mm')")
+    sql(
+      s"""
+           LOAD DATA LOCAL INPATH '$resourcesPath/timeStampFormatData1.csv' into table t3
+           """)
+    sql("alter table t3 unset tblproperties('dateformat','timestampformat')")
+    sql(
+      s"""
+           LOAD DATA LOCAL INPATH '$resourcesPath/timeStampFormatData2.csv' into table t3
+           """)
+    val sdf = new SimpleDateFormat("yyyy-MM-dd")
+    checkAnswer(
+      sql("SELECT starttime FROM t3 WHERE ID = 1"),
+      Seq(Row(Timestamp.valueOf("2016-07-23 01:01:00")))
+    )
+    checkAnswer(
+      sql("SELECT starttime FROM t3 WHERE ID = 18"),
+      Seq(Row(Timestamp.valueOf("2016-07-25 02:32:02")))
+    )
+    checkAnswer(
+      sql("SELECT date FROM t3 WHERE ID = 1"),
+      Seq(Row(new Date(sdf.parse("2015-07-23").getTime)))
+    )
+    checkAnswer(
+      sql("SELECT date FROM t3 WHERE ID = 18"),
+      Seq(Row(new Date(sdf.parse("2015-07-25").getTime)))
+    )
+  }
+
+  test("test create table with date/timestamp format and check describe formatted") {
+    sql("DROP TABLE IF EXISTS t3")
+    sql(
+      """
+           CREATE TABLE IF NOT EXISTS t3
+           (ID Int, date date, starttime Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           STORED AS carbondata TBLPROPERTIES('dateformat'='yyyy/MM/dd',
+           'timestampformat'='yyyy-MM-dd HH:mm')
+        """)
+    val descTable = sql(s"describe formatted t3").collect
+    descTable.find(_.get(0).toString.contains("Date Format")) match {
+      case Some(row) => assert(row.get(1).toString.contains("yyyy/MM/dd"))
+      case None => assert(false)
+    }
+    descTable.find(_.get(0).toString.contains("Timestamp Format")) match {
+      case Some(row) => assert(row.get(1).toString.contains("yyyy-MM-dd HH:mm"))
+      case None => assert(false)
+    }
+  }
+
   override def afterAll {
     sql("DROP TABLE IF EXISTS t3")
   }
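One detail the expected values above rely on: with a timestampformat of 'yyyy-MM-dd HH:mm', the seconds in the input are never parsed, so they come back as zero. A JDK-only check:

```scala
import java.text.SimpleDateFormat

// With pattern "yyyy-MM-dd HH:mm" the seconds in the input are never parsed,
// which is why the tests above get 01:01:00 back rather than 01:01:09.
val minutePrecision = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val fullPrecision = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

val parsed = minutePrecision.parse("2016-07-23 01:01:09")
println(fullPrecision.format(parsed)) // 2016-07-23 01:01:00
```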
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index 8c4725a..916846a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -17,7 +17,8 @@
 
 package org.apache.carbondata.spark.testsuite.merge
 
-import java.sql.Date
+import java.sql.{Date, Timestamp}
+import java.text.SimpleDateFormat
 import java.time.LocalDateTime
 
 import scala.collection.JavaConverters._
@@ -157,6 +158,47 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     (dwSelframe, odsframe)
   }
 
+  private def initializeWithDateTimeFormat = {
+    import sqlContext.implicits._
+    val sdf = new SimpleDateFormat("yyyy-MM-dd")
+    val initframe = sqlContext.sparkContext.parallelize(1 to 10, 4)
+      .map { x =>
+        ("id" + x, s"order$x", s"customer$x", x * 10, x * 75, 1, new Date(sdf
+          .parse("2015-07-23").getTime), Timestamp.valueOf("2015-03-03 12:25:03.205"))
+      }.toDF("id", "name", "c_name", "quantity", "price", "state", "date", "time")
+    val loadframe = sqlContext.sparkContext.parallelize(11 to 12, 4)
+      .map { x =>
+        ("id" + x, s"order$x", s"customer$x", x * 10, x * 75, 1, new Date(sdf
+          .parse("2020-07-23").getTime), Timestamp.valueOf("2020-04-04 09:40:05.205"))
+      }.toDF("id", "name", "c_name", "quantity", "price", "state", "date", "time")
+    // setting date and timestampformat table level
+    initframe.write
+      .format("carbondata")
+      .option("tableName", "order")
+      .option("dateformat", "yyyy-MM-dd")
+      .option("timestampformat", "yyyy-MM-dd HH:mm")
+      .mode(SaveMode.Overwrite)
+      .save()
+    // setting date and timestampformat for another load option
+    loadframe.write
+      .format("carbondata")
+      .option("tableName", "order")
+      .option("dateformat", "yyyy-MM")
+      .option("timestampformat", "yyyy-MM-dd HH:mm:ss.SSS")
+      .mode(SaveMode.Append)
+      .save()
+    val dwframe = sqlContext.read.format("carbondata").option("tableName", "order").load()
+    val dwSelframe = dwframe.as("A")
+
+    val odsframe = sqlContext.sparkContext.parallelize(1 to 4, 4)
+      .map { x =>
+        ("id" + x, s"order$x", s"customer$x", x * 10, x * 75, 2,
+          new Date(sdf.parse("2015-07-23").getTime), Timestamp.valueOf("2015-05-23 10:30:30"))
+      }.toDS().toDF("id", "name", "c_name", "quantity", "price", "state", "date", "time").as("B")
+
+    (dwSelframe, odsframe)
+  }
+
   test("test basic merge update with all mappings") {
     sql("drop table if exists order")
     val (dwSelframe, odsframe) = initialize
@@ -765,6 +807,33 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select * from target order by key"), Seq(Row("a1","0"),Row("d", "3")))
   }
 
+  test("test merge with table level date and timestamp format") {
+    sql("drop table if exists order")
+    val (dwSelframe, odsframe) = initializeWithDateTimeFormat
+    val insertMap = Map("id" -> "B.id",
+      "name" -> "B.name",
+      "c_name" -> "B.c_name",
+      "quantity" -> "B.quantity",
+      "price" -> "B.price",
+      "state" -> "B.state",
+      "date" -> "B.date",
+      "time" -> "B.time").asInstanceOf[Map[Any, Any]]
+    dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched().
+      insertExpr(insertMap).execute()
+    val sdf = new SimpleDateFormat("yyyy-MM-dd")
+    checkAnswer(
+      sql("select date,time from order where id = 'id1'"),
+      Seq(
+        Row(new Date(sdf.parse("2015-07-23").getTime), Timestamp.valueOf("2015-03-03 12:25:00")),
+        Row(new Date(sdf.parse("2015-07-23").getTime), Timestamp.valueOf("2015-05-23 10:30:00"))
+      ))
+    checkAnswer(
+      sql("select date,time from order where id = 'id11'"),
+      Seq(
+        Row(new Date(sdf.parse("2020-07-01").getTime), Timestamp.valueOf("2020-04-04 09:40:05.205"))
+      ))
+  }
+
   case class Target (id: Int, value: String, remark: String, mdt: String)
   case class Change (id: Int, value: String, change_type: String, mdt: String)
   private val numInitialRows = 10
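The id11 expectation above ('2020-07-01') follows from the second load's dateformat of 'yyyy-MM': the day component is presumably dropped when the value is written through that pattern and defaults to the first of the month when read back. JDK-only illustration:

```scala
import java.text.SimpleDateFormat

val monthOnly = new SimpleDateFormat("yyyy-MM")
val fullDate = new SimpleDateFormat("yyyy-MM-dd")

// Writing 2020-07-23 through a "yyyy-MM" format drops the day...
val stored = monthOnly.format(fullDate.parse("2020-07-23"))
println(stored) // 2020-07
// ...and reading it back defaults day-of-month to 1.
println(fullDate.format(monthOnly.parse(stored))) // 2020-07-01
```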
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index d2bb47e..199b5df 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -71,6 +71,7 @@ public class CarbonLoadModelBuilder {
   public CarbonLoadModel build(Map<String, String>  options, long timestamp, String taskNo)
       throws InvalidLoadOptionException, IOException {
     Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(options);
+    Map<String, String> tableProperties = table.getTableInfo().getFactTable().getTableProperties();
 
     if (!options.containsKey("fileheader")) {
       List<CarbonColumn> csvHeader = table.getCreateOrderColumn();
@@ -93,16 +94,22 @@ public class CarbonLoadModelBuilder {
     // we have provided 'fileheader', so it hadoopConf can be null
     build(options, optionsFinal, model, null);
     String timestampFormat = options.get("timestampformat");
+    // If TIMESTAMPFORMAT is not present in load options, check from table properties.
     if (timestampFormat == null) {
       timestampFormat = CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-              CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
+          .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+              Maps.getOrDefault(tableProperties, "timestampformat", CarbonProperties.getInstance()
+                  .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+                      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)));
     }
     String dateFormat = options.get("dateFormat");
+    // If DATEFORMAT is not present in load options, check from table properties.
     if (dateFormat == null) {
       dateFormat = CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
-              CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
+          .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+              Maps.getOrDefault(tableProperties, "dateformat", CarbonProperties.getInstance()
+                  .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+                      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)));
     }
     model.setDateFormat(dateFormat);
     model.setTimestampFormat(timestampFormat);

