carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [05/14] incubator-carbondata git commit: add range partition
Date Mon, 08 May 2017 09:48:17 GMT
add range partition


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6677497e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6677497e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6677497e

Branch: refs/heads/12-dev
Commit: 6677497e7ab9ceffe30e6239f5c7b0162616a317
Parents: 58883d6
Author: lionelcao <whucaolu@gmail.com>
Authored: Thu May 4 22:50:12 2017 +0800
Committer: lionelcao <whucaolu@gmail.com>
Committed: Thu May 4 22:50:12 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  6 ++-
 .../examples/CarbonPartitionExample.scala       |  4 +-
 .../carbondata/spark/util/CommonUtil.scala      | 10 ++---
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 46 +++++++++++++-------
 4 files changed, 43 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6677497e/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 841733a..55040a5 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -806,8 +806,10 @@ public final class CarbonCommonConstants {
   public static final String DICTIONARY_EXCLUDE = "dictionary_exclude";
   public static final String DICTIONARY_INCLUDE = "dictionary_include";
   public static final String SORT_COLUMNS = "sort_columns";
-  public static final String PARTITIONING = "partitioning";
-  public static final String PARTITIONCOUNT = "partitioncount";
+  public static final String PARTITION_TYPE = "partition_type";
+  public static final String HASH_NUMBER = "hash_number";
+  public static final String RANGE_INFO = "range_info";
+  public static final String LIST_INFO = "list_info";
   public static final String COLUMN_PROPERTIES = "columnproperties";
   // table block size in MB
   public static final String TABLE_BLOCKSIZE = "table_blocksize";

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6677497e/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
index 86f3553..03223f3 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
@@ -59,7 +59,9 @@ object CarbonPartitionExample {
                 | country String,
                 | area String
                 | )
+                | PARTITIONED BY (logdate Timestamp)
                 | STORED BY 'carbondata'
+                | TBLPROPERTIES('PARTITION_TYPE'='RANGE','RANGE_INFO'='20140101, 20150101,20160101 ')
               """.stripMargin)
 
     spark.sql("DROP TABLE IF EXISTS t3")
@@ -75,7 +77,7 @@ object CarbonPartitionExample {
        | )
        | PARTITIONED BY (vin String)
        | STORED BY 'carbondata'
-       | TBLPROPERTIES('PARTITIONING'='HASH','PARTITIONCOUNT'='5')
+       | TBLPROPERTIES('PARTITION_TYPE'='HASH','PARTITIONCOUNT'='5')
        """.stripMargin)
 
     // spark.sql(s"""

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6677497e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index ff4fa3a..c4701de 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -158,15 +158,15 @@ object CommonUtil {
   def validatePartitionColumns(tableProperties: Map[String, String],
       partitionCols: Seq[StructField]): Boolean = {
     var isValid: Boolean = true
-    val partitioning = tableProperties.get(CarbonCommonConstants.PARTITIONING)
-    val partitioncount = tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT)
+    val partitionType = tableProperties.get(CarbonCommonConstants.PARTITION_TYPE)
+    val hashNumber = tableProperties.get(CarbonCommonConstants.HASH_NUMBER)
 
     // partition column and partitioning should be both exist or not exist
-    if (partitionCols.isEmpty ^ partitioning.isEmpty) {
+    if (partitionCols.isEmpty ^ partitionType.isEmpty) {
       isValid = false
     } else if (partitionCols.nonEmpty) {
-      partitioning.get.toUpperCase() match {
-        case "HASH" => if (!partitioncount.isDefined) isValid = false
+      partitionType.get.toUpperCase() match {
+        case "HASH" => if (!hashNumber.isDefined) isValid = false
         case "LIST" => isValid = false
         case "RANGE" => isValid = false
         case "RANGE_INTERVAL" => isValid = false

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6677497e/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 611481e..7c87b61 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -34,7 +34,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.DataType
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
-import org.apache.carbondata.core.metadata.schema.partition.Partitioning
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.processing.constants.LoggerAction
@@ -358,14 +358,24 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
    */
   protected def getPartitionInfo(partitionCols: Seq[PartitionerField],
       tableProperties: Map[String, String]): Option[PartitionInfo] = {
-    var partitioning: String = ""
-    var partition_count = 0
+    var partitionType: String = ""
+    var hashNumber = 0
+    var rangeInfo: List[String] = List[String]()
+    var listInfo: List[List[String]] = List[List[String]]()
 
-    if (tableProperties.get(CarbonCommonConstants.PARTITIONING).isDefined) {
-      partitioning = tableProperties.get(CarbonCommonConstants.PARTITIONING).get
+    if (tableProperties.get(CarbonCommonConstants.PARTITION_TYPE).isDefined) {
+      partitionType = tableProperties.get(CarbonCommonConstants.PARTITION_TYPE).get
     }
-    if (tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).isDefined) {
-      partition_count = tableProperties.get(CarbonCommonConstants.PARTITIONCOUNT).get.toInt
+    if (tableProperties.get(CarbonCommonConstants.HASH_NUMBER).isDefined) {
+      hashNumber = tableProperties.get(CarbonCommonConstants.HASH_NUMBER).get.toInt
+    }
+    if (tableProperties.get(CarbonCommonConstants.RANGE_INFO).isDefined) {
+      rangeInfo = tableProperties.get(CarbonCommonConstants.RANGE_INFO).get.replace(" ","")
+        .split(",").toList
+    }
+    if (tableProperties.get(CarbonCommonConstants.LIST_INFO).isDefined) {
+      rangeInfo = tableProperties.get(CarbonCommonConstants.LIST_INFO).get.replace(" ","")
+        .split(",").toList
     }
     val cols : ArrayBuffer[ColumnSchema] = new ArrayBuffer[ColumnSchema]()
     partitionCols.foreach(partition_col => {
@@ -376,15 +386,21 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       cols += columnSchema
     })
 
-    val partitionInfo: Option[PartitionInfo] = partitioning.toUpperCase() match {
-      case "HASH" => Some(new PartitionInfo(cols.asJava,
-                              Partitioning.HASH, partition_count))
-      case "LIST" => None
-      case "RANGE" => None
-      case "RANGE_INTERVAL" => None
-      case _ => None
+    var partitionInfo : PartitionInfo = null
+    partitionType.toUpperCase() match {
+      case "HASH" => {
+        partitionInfo = new PartitionInfo(cols.asJava, PartitionType.HASH)
+        partitionInfo.setHashNumber(hashNumber)
+      }
+      case "RANGE" => {
+        partitionInfo = new PartitionInfo(cols.asJava, PartitionType.RANGE)
+        partitionInfo.setRangeInfo(rangeInfo.asJava)
+      }
+      case "LIST" => {
+        partitionInfo = new PartitionInfo(cols.asJava, PartitionType.LIST)
+      }
     }
-    partitionInfo
+    Some(partitionInfo)
   }
 
   protected def extractColumnProperties(fields: Seq[Field], tableProperties: Map[String, String]):


Mime
View raw message