carbondata-commits mailing list archives

From chenliang...@apache.org
Subject [28/50] [abbrv] incubator-carbondata git commit: [CARBONDATA-51] Support generate global dictionary from common dictionary files for multiple columns(#799)
Date Wed, 20 Jul 2016 10:13:56 GMT
[CARBONDATA-51] Support generate global dictionary from common dictionary files for multiple columns (#799)

1) Support loading the dictionary from the unique dictionary values of each column.
#801 supports loading dictionaries from dictionary files where each column has its own input file.
This PR, in contrast, supports loading dictionary values that are spread across multiple files,
with each file holding the unique/dictionary values of several columns in
"column_position, unique_value" format.
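
For illustration, an all-dictionary file is plain text in which every line pairs a column
position (0-based, following the CSV header order) with one distinct value, as in the
sample.dictionary test resource added by this commit:

    0,1
    1,david
    2,shenzhen
    0,2
    1,eason

The file, or a wildcard such as "/dictpath/*.dictionary", is passed to data loading through
the new ALL_DICTIONARY_PATH option. A sketch of the statement follows; the paths are only
illustrative, while the table and header come from the new test case:

    LOAD DATA LOCAL INPATH '/tmp/sample.csv' INTO TABLE sample
    options('FILEHEADER'='id,name,city,age', 'ALL_DICTIONARY_PATH'='/dictpath/*.dictionary')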

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/8c839545
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/8c839545
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/8c839545

Branch: refs/heads/master
Commit: 8c839545c0364bc42beb6009d9b4cb555397b79b
Parents: fcb9f84
Author: Gin-zhj <zhujin2@huawei.com>
Authored: Mon Jul 18 18:40:57 2016 +0800
Committer: Venkata Ramana G <g.ramana.v1@gmail.com>
Committed: Mon Jul 18 16:10:57 2016 +0530

----------------------------------------------------------------------
 .../src/main/resources/datawithoutheader.csv    |  10 +
 .../examples/AllDictionaryExample.scala         |  62 +++++
 .../examples/util/AllDictionaryUtil.scala       | 108 ++++++++
 .../carbondata/spark/load/CarbonLoadModel.java  |  10 +
 .../org/apache/spark/sql/CarbonSqlParser.scala  |   3 +-
 .../execution/command/carbonTableSchema.scala   |   3 +
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  87 ++++--
 .../spark/util/GlobalDictionaryUtil.scala       | 267 +++++++++++++++----
 .../20160423/1400_1405/complex.dictionary       |  20 ++
 .../sample/20160423/1400_1405/sample.dictionary |   9 +
 .../spark/util/AllDictionaryTestCase.scala      | 140 ++++++++++
 11 files changed, 640 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c839545/examples/src/main/resources/datawithoutheader.csv
----------------------------------------------------------------------
diff --git a/examples/src/main/resources/datawithoutheader.csv b/examples/src/main/resources/datawithoutheader.csv
new file mode 100644
index 0000000..df2b945
--- /dev/null
+++ b/examples/src/main/resources/datawithoutheader.csv
@@ -0,0 +1,10 @@
+1,2015/7/23,china,aaa1,phone197,ASD69643,15000
+2,2015/7/24,china,aaa2,phone756,ASD42892,15001
+3,2015/7/25,china,aaa3,phone1904,ASD37014,15002
+4,2015/7/26,china,aaa4,phone2435,ASD66902,15003
+5,2015/7/27,china,aaa5,phone2441,ASD90633,15004
+6,2015/7/28,china,aaa6,phone294,ASD59961,15005
+7,2015/7/29,china,aaa7,phone610,ASD14875,15006
+8,2015/7/30,china,aaa8,phone1848,ASD57308,15007
+9,2015/7/18,china,aaa9,phone706,ASD86717,15008
+10,2015/7/19,usa,aaa10,phone685,ASD30505,15009

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c839545/examples/src/main/scala/org/carbondata/examples/AllDictionaryExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/carbondata/examples/AllDictionaryExample.scala b/examples/src/main/scala/org/carbondata/examples/AllDictionaryExample.scala
new file mode 100644
index 0000000..bf14a99
--- /dev/null
+++ b/examples/src/main/scala/org/carbondata/examples/AllDictionaryExample.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.carbondata.examples
+
+import org.carbondata.core.constants.CarbonCommonConstants
+import org.carbondata.core.util.CarbonProperties
+import org.carbondata.examples.util.{AllDictionaryUtil, InitForExamples}
+
+object AllDictionaryExample {
+  def main(args: Array[String]) {
+    val cc = InitForExamples.createCarbonContext("CarbonExample")
+    val testData = InitForExamples.currentPath + "/src/main/resources/datawithoutheader.csv"
+    val csvHeader = "id,date,country,name,phonetype,serialname,salary"
+    val dictCol = "|date|country|name|phonetype|serialname|"
+    val allDictFile = InitForExamples.currentPath +
+      "/src/main/resources/datawithoutheader.dictionary"
+    // extract all dictionary files from source data
+    AllDictionaryUtil.extractDictionary(cc.sparkContext,
+      testData, allDictFile, csvHeader, dictCol)
+    // Specify timestamp format based on raw data
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+
+    cc.sql("DROP TABLE IF EXISTS t3")
+
+    cc.sql("""
+           CREATE TABLE IF NOT EXISTS t3
+           (ID Int, date Timestamp, country String,
+           name String, phonetype String, serialname String, salary Int)
+           STORED BY 'org.apache.carbondata.format'
+           """)
+
+    cc.sql(s"""
+           LOAD DATA LOCAL INPATH '$testData' into table t3
+           options('FILEHEADER'='id,date,country,name,phonetype,serialname,salary',
+           'ALL_DICTIONARY_PATH'='$allDictFile')
+           """)
+
+    cc.sql("""
+           SELECT * FROM t3
+           """).show()
+
+    cc.sql("DROP TABLE IF EXISTS t3")
+
+    // clean local dictionary files
+    AllDictionaryUtil.cleanDictionary(allDictFile)
+  }
+}
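
As a rough sketch of what AllDictionaryUtil.extractDictionary produces for this example
(the exact line order is not deterministic because of groupByKey): with the header
"id,date,country,name,phonetype,serialname,salary" and dictCol covering date, country,
name, phonetype and serialname, the generated datawithoutheader.dictionary contains
"column_position,distinct_value" pairs such as:

    1,2015/7/23
    2,china
    2,usa
    3,aaa1
    4,phone197
    5,ASD69643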

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c839545/examples/src/main/scala/org/carbondata/examples/util/AllDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/carbondata/examples/util/AllDictionaryUtil.scala b/examples/src/main/scala/org/carbondata/examples/util/AllDictionaryUtil.scala
new file mode 100644
index 0000000..78c39a6
--- /dev/null
+++ b/examples/src/main/scala/org/carbondata/examples/util/AllDictionaryUtil.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.carbondata.examples.util
+
+import java.io.DataOutputStream
+
+import scala.collection.mutable.{ArrayBuffer, HashSet}
+
+import org.apache.spark.Logging
+import org.apache.spark.SparkContext
+
+import org.carbondata.core.datastorage.store.impl.FileFactory
+
+object AllDictionaryUtil extends Logging{
+  def extractDictionary(sc: SparkContext,
+                        srcData: String,
+                        outputPath: String,
+                        fileHeader: String,
+                        dictCol: String): Unit = {
+    val fileHeaderArr = fileHeader.split(",")
+    val isDictCol = new Array[Boolean](fileHeaderArr.length)
+    for (i <- 0 until fileHeaderArr.length) {
+      if (dictCol.contains("|" + fileHeaderArr(i) + "|")) {
+        isDictCol(i) = true
+      } else {
+        isDictCol(i) = false
+      }
+    }
+    val dictionaryRdd = sc.textFile(srcData).flatMap(x => {
+      val tokens = x.split(",")
+      val result = new ArrayBuffer[(Int, String)]()
+      for (i <- 0 until isDictCol.length) {
+        if (isDictCol(i)) {
+          try {
+            result += ((i, tokens(i)))
+          } catch {
+            case ex: ArrayIndexOutOfBoundsException =>
+              logError("Read a bad record: " + x)
+          }
+        }
+      }
+      result
+    }).groupByKey().flatMap(x => {
+      val distinctValues = new HashSet[(Int, String)]()
+      for (value <- x._2) {
+        distinctValues.add(x._1, value)
+      }
+      distinctValues
+    })
+    val dictionaryValues = dictionaryRdd.map(x => x._1 + "," + x._2).collect()
+    saveToFile(dictionaryValues, outputPath)
+  }
+
+  def cleanDictionary(outputPath: String): Unit = {
+    try {
+      val fileType = FileFactory.getFileType(outputPath)
+      val file = FileFactory.getCarbonFile(outputPath, fileType)
+      if (file.exists()) {
+        file.delete()
+      }
+    } catch {
+      case ex: Exception =>
+        logError("Clean dictionary catching exception:" + ex)
+    }
+  }
+
+  def saveToFile(contents: Array[String], outputPath: String): Unit = {
+    var writer: DataOutputStream = null
+    try {
+      val fileType = FileFactory.getFileType(outputPath)
+      val file = FileFactory.getCarbonFile(outputPath, fileType)
+      if (!file.exists()) {
+        file.createNewFile()
+      }
+      writer = FileFactory.getDataOutputStream(outputPath, fileType)
+      for (content <- contents) {
+        writer.writeBytes(content + "\n")
+      }
+    } catch {
+      case ex: Exception =>
+        logError("Save dictionary to file catching exception:" + ex)
+    } finally {
+      if (writer != null) {
+        try {
+          writer.close()
+        } catch {
+          case ex: Exception =>
+            logError("Close output stream catching exception:" + ex)
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c839545/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoadModel.java b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoadModel.java
index 098c863..531fa0a 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoadModel.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoadModel.java
@@ -89,6 +89,8 @@ public class CarbonLoadModel implements Serializable {
    */
   private String segmentId;
 
+  private String allDictPath;
+
   /**
    * escape Char
    */
@@ -165,6 +167,14 @@ public class CarbonLoadModel implements Serializable {
     this.isDirectLoad = isDirectLoad;
   }
 
+  public String getAllDictPath() {
+    return allDictPath;
+  }
+
+  public void setAllDictPath(String allDictPath) {
+    this.allDictPath = allDictPath;
+  }
+
   public List<String> getFactFilesToProcess() {
     return factFilesToProcess;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c839545/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 2533321..7de452a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -953,7 +953,8 @@ class CarbonSqlParser()
     val options = partionDataOptions.get.groupBy(x => x._1)
     val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
       "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
-      "SERIALIZATION_NULL_FORMAT"
+      "SERIALIZATION_NULL_FORMAT",
+      "ALL_DICTIONARY_PATH"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c839545/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 82e9921..3a0e21c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -1534,6 +1534,7 @@ private[sql] case class LoadCube(
       val escapeChar = partionValues.getOrElse("escapechar", "\\")
       val columnDict = partionValues.getOrElse("columndict", null)
       val serializationNullFormat = partionValues.getOrElse("serialization_null_format", "\\N")
+      val allDictionaryPath = partionValues.getOrElse("all_dictionary_path", "")
       val complex_delimiter_level_1 = partionValues.getOrElse("complex_delimiter_level_1", "\\$")
       val complex_delimiter_level_2 = partionValues.getOrElse("complex_delimiter_level_2", "\\:")
       val multiLine = partionValues.getOrElse("multiline", "false").trim.toLowerCase match {
@@ -1559,6 +1560,8 @@ private[sql] case class LoadCube(
         carbonLoadModel.setComplexDelimiterLevel2(
           CarbonUtil.escapeComplexDelimiterChar(complex_delimiter_level_2))
       }
+      // set the all-dictionary file path on the load model
+      carbonLoadModel.setAllDictPath(allDictionaryPath)
 
       var partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
       try {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c839545/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index f229c8e..9abcceb 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -161,6 +161,69 @@ case class DictionaryLoadModel(table: CarbonTableIdentifier,
 case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends Serializable
 
 /**
+ * An RDD that combines the distinct dictionary values of all columns.
+ *
+ * @constructor create an RDD from an RDD[(String, Iterable[String])]
+ * @param prev the input RDD[(String, Iterable[String])]
+ * @param model the dictionary load model that carries the load info
+ */
+class CarbonAllDictionaryCombineRDD(
+                                       prev: RDD[(String, Iterable[String])],
+                                       model: DictionaryLoadModel)
+  extends RDD[(Int, ColumnDistinctValues)](prev) with Logging {
+
+  override def getPartitions: Array[Partition] =
+    firstParent[(String, Iterable[String])].partitions
+
+  override def compute(split: Partition, context: TaskContext
+                      ): Iterator[(Int, ColumnDistinctValues)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass().getName())
+
+    val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
+    /*
+     * in the all-dictionary case every column needs encoding and the
+     * isHighCardinalityColumn check, so there is no need to calculate a row count
+     */
+    val rowCount = 0L
+    try {
+      val dimensionParsers =
+        GlobalDictionaryUtil.createDimensionParsers(model, distinctValuesList)
+      val dimNum = model.dimensions.length
+      // Map[dimColName -> dimColNameIndex]
+      val columnIndexMap = new HashMap[String, Int]()
+      for (j <- 0 until dimNum) {
+        columnIndexMap.put(model.dimensions(j).getColName, j)
+      }
+
+      var row: (String, Iterable[String]) = null
+      val rddIter = firstParent[(String, Iterable[String])].iterator(split, context)
+      // generate block distinct value set
+      while (rddIter.hasNext) {
+        row = rddIter.next()
+        if (row != null) {
+          columnIndexMap.get(row._1) match {
+            case Some(index) =>
+              for (record <- row._2) {
+                dimensionParsers(index).parseString(record)
+              }
+            case None =>
+          }
+        }
+      }
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex)
+        throw ex
+    }
+
+    distinctValuesList.map { iter =>
+      val valueList = iter._2.toArray
+      (iter._1, ColumnDistinctValues(valueList, rowCount))
+    }.iterator
+  }
+}
+
+/**
  * A RDD to combine distinct values in block.
  *
  * @constructor create a RDD with RDD[Row]
@@ -181,23 +244,9 @@ class CarbonBlockDistinctValuesCombineRDD(
     val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
     var rowCount = 0L
     try {
-      // local combine set
+      val dimensionParsers =
+        GlobalDictionaryUtil.createDimensionParsers(model, distinctValuesList)
       val dimNum = model.dimensions.length
-      val primDimNum = model.primDimensions.length
-      val columnValues = new Array[HashSet[String]](primDimNum)
-      val mapColumnValuesWithId = new HashMap[String, HashSet[String]]
-      for (i <- 0 until primDimNum) {
-        columnValues(i) = new HashSet[String]
-        distinctValuesList += ((i, columnValues(i)))
-        mapColumnValuesWithId.put(model.primDimensions(i).getColumnId, columnValues(i))
-      }
-      val dimensionParsers = new Array[GenericParser](dimNum)
-      for (j <- 0 until dimNum) {
-        dimensionParsers(j) = GlobalDictionaryUtil.generateParserForDimension(
-          Some(model.dimensions(j)),
-          GlobalDictionaryUtil.createDataFormat(model.delimiters),
-          mapColumnValuesWithId).get
-      }
       var row: Row = null
       val rddIter = firstParent[Row].iterator(split, context)
       // generate block distinct value set
@@ -216,6 +265,7 @@ class CarbonBlockDistinctValuesCombineRDD(
         LOGGER.error(ex)
         throw ex
     }
+
     distinctValuesList.map { iter =>
       val valueList = iter._2.toArray
       (iter._1, ColumnDistinctValues(valueList, rowCount))
@@ -373,7 +423,6 @@ class CarbonGlobalDictionaryGenerateRDD(
     iter
   }
 }
-
 /**
 * Set column dictionary partition format
  *
@@ -444,7 +493,7 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
         } catch {
           case ex: Exception =>
             logError(s"Error in closing csvReader of " +
-            s"pre-defined dictionary file:${ex.getMessage}")
+              s"pre-defined dictionary file:${ex.getMessage}")
         }
       }
       if (inputStream != null) {
@@ -453,7 +502,7 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
         } catch {
           case ex: Exception =>
             logError(s"Error in closing inputStream of " +
-            s"pre-defined dictionary file:${ex.getMessage}")
+              s"pre-defined dictionary file:${ex.getMessage}")
         }
       }
     }
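
A minimal sketch of the input shape the new CarbonAllDictionaryCombineRDD consumes: an
RDD[(String, Iterable[String])] keyed by column name, as produced by
GlobalDictionaryUtil.readAllDictionaryFiles from the "column_position,value" lines. The
column names and values below are made up for illustration, and a live SparkContext sc
is assumed:

    import org.apache.spark.rdd.RDD

    // parsed "column_position,value" pairs, in the form readAllDictionaryFiles emits them
    val csvFileColumns = Array("id", "name", "city")
    val parsed = sc.parallelize(Seq((1, "david"), (2, "shenzhen"), (1, "eason"), (2, "wuhan")))
    // group the values per column index, then translate the index into the CSV column name
    val perColumn: RDD[(String, Iterable[String])] =
      parsed.groupByKey().map { case (idx, values) => (csvFileColumns(idx), values) }
    // perColumn is what gets partitioned by ColumnPartitioner and handed to
    // new CarbonAllDictionaryCombineRDD(perColumn, model)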

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c839545/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
index 49d3d5b..25093f8 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -17,7 +17,7 @@
 
 package org.carbondata.spark.util
 
-import java.io.IOException
+import java.io.{File, IOException}
 import java.nio.charset.Charset
 import java.util.regex.Pattern
 
@@ -29,7 +29,7 @@ import scala.util.control.Breaks.{break, breakable}
 
 import org.apache.commons.lang3.{ArrayUtils, StringUtils}
 import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{CarbonEnv, CarbonRelation, DataFrame}
 import org.apache.spark.sql.hive.CarbonMetastoreCatalog
 import org.apache.spark.sql.SQLContext
@@ -44,6 +44,7 @@ import org.carbondata.core.carbon.metadata.encoder.Encoding
 import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.carbondata.core.carbon.path.CarbonStorePath
 import org.carbondata.core.constants.CarbonCommonConstants
+import org.carbondata.core.datastorage.store.impl.FileFactory
 import org.carbondata.core.reader.CarbonDictionaryReader
 import org.carbondata.core.util.CarbonProperties
 import org.carbondata.core.writer.CarbonDictionaryWriter
@@ -52,8 +53,7 @@ import org.carbondata.processing.etl.DataLoadingException
 import org.carbondata.spark.load.CarbonLoaderUtil
 import org.carbondata.spark.load.CarbonLoadModel
 import org.carbondata.spark.partition.reader.CSVWriter
-import org.carbondata.spark.rdd._
-import org.carbondata.spark.rdd.{ArrayParser, CarbonBlockDistinctValuesCombineRDD, CarbonDataRDDFactory, CarbonGlobalDictionaryGenerateRDD, ColumnPartitioner, DataFormat, DictionaryLoadModel, GenericParser, PrimitiveParser, StructParser}
+import org.carbondata.spark.rdd.{ArrayParser, CarbonAllDictionaryCombineRDD, CarbonBlockDistinctValuesCombineRDD, CarbonColumnDictGenerateRDD, CarbonDataRDDFactory, CarbonGlobalDictionaryGenerateRDD, ColumnPartitioner, DataFormat, DictionaryLoadModel, GenericParser, PrimitiveParser, StructParser}
 import org.carbondata.spark.CarbonSparkFactory
 
 /**
@@ -91,7 +91,8 @@ object GlobalDictionaryUtil extends Logging {
 
   /**
    * use this method to judge whether CarbonDimension use some encoding or not
-   * @param dimension   carbonDimension
+    *
+    * @param dimension   carbonDimension
    * @param encoding   the coding way of dimension
    * @param excludeEncoding  the coding way to exclude
    */
@@ -460,7 +461,8 @@ object GlobalDictionaryUtil extends Logging {
 
   /**
    * set pre defined dictionary for dimension
-   * @param dimensions  all the dimensions
+    *
+    * @param dimensions  all the dimensions
    * @param table   carbon table identifier
    * @param colName  user specified  column name for predefined dict
    * @param colDictPath  column dictionary file path
@@ -540,6 +542,111 @@ object GlobalDictionaryUtil extends Logging {
     checkStatus(carbonLoadModel, sqlContext, dictLoadModel, statusList)
   }
 
+  /* generate Dimension Parsers
+   *
+   * @param model
+   * @param distinctValuesList
+   * @return dimensionParsers
+   */
+  def createDimensionParsers(model: DictionaryLoadModel,
+      distinctValuesList: ArrayBuffer[(Int, HashSet[String])]): Array[GenericParser] = {
+    // local combine set
+    val dimNum = model.dimensions.length
+    val primDimNum = model.primDimensions.length
+    val columnValues = new Array[HashSet[String]](primDimNum)
+    val mapColumnValuesWithId = new HashMap[String, HashSet[String]]
+    for (i <- 0 until primDimNum) {
+      columnValues(i) = new HashSet[String]
+      distinctValuesList += ((i, columnValues(i)))
+      mapColumnValuesWithId.put(model.primDimensions(i).getColumnId, columnValues(i))
+    }
+    val dimensionParsers = new Array[GenericParser](dimNum)
+    for (j <- 0 until dimNum) {
+      dimensionParsers(j) = GlobalDictionaryUtil.generateParserForDimension(
+        Some(model.dimensions(j)),
+        GlobalDictionaryUtil.createDataFormat(model.delimiters),
+        mapColumnValuesWithId).get
+    }
+    dimensionParsers
+  }
+
+  /**
+   * read local dictionary and prune column
+   *
+   * @param sqlContext
+   * @param csvFileColumns
+   * @param requireColumns
+   * @param allDictionaryPath
+   * @return allDictionaryRdd
+   */
+  private def readAllDictionaryFiles(sqlContext: SQLContext,
+                                       csvFileColumns: Array[String],
+                                       requireColumns: Array[String],
+                                     allDictionaryPath: String) = {
+    var allDictionaryRdd: RDD[(String, Iterable[String])] = null
+    try {
+      // read the local dictionary file and split each line into (columnIndex, columnValue)
+      val basicRdd = sqlContext.sparkContext.textFile(allDictionaryPath)
+        .map(x => {
+        val tokens = x.split("" + CSVWriter.DEFAULT_SEPARATOR)
+        var index: Int = 0
+        var value: String = ""
+        try {
+          index = tokens(0).toInt
+          value = tokens(1)
+        } catch {
+          case ex: Exception =>
+            logError("read a bad dictionary record" + x)
+        }
+        (index, value)
+      })
+      // group by column index, and filter required columns
+      val requireColumnsList = requireColumns.toList
+      allDictionaryRdd = basicRdd
+        .groupByKey()
+        .map(x => (csvFileColumns(x._1), x._2))
+        .filter(x => requireColumnsList.contains(x._1))
+    } catch {
+      case ex: Exception =>
+        logError("read local dictionary files failed")
+        throw ex
+    }
+
+    allDictionaryRdd
+  }
+
+  /**
+   * validate local dictionary files
+   *
+   * @param allDictionaryPath
+   * @return true if at least one non-empty dictionary file exists
+   */
+  private def validateAllDictionaryPath(allDictionaryPath: String): Boolean = {
+    val fileType = FileFactory.getFileType(allDictionaryPath)
+    val filePath = FileFactory.getCarbonFile(allDictionaryPath, fileType)
+    // wildcard file path pattern, e.g. "/path/*.dictionary"
+    if (filePath.getName.startsWith("*")) {
+      val dictExt = filePath.getName.substring(1)
+      val listFiles = filePath.getParentFile.listFiles()
+      if (listFiles.exists(file =>
+        file.getName.endsWith(dictExt) && file.getSize > 0)) {
+        true
+      } else {
+        logInfo("No dictionary files found or empty dictionary files! " +
+          "Won't generate new dictionary.")
+        false
+      }
+    } else {
+      if (filePath.exists() && filePath.getSize > 0) {
+        true
+      } else {
+        logInfo("No dictionary files found or empty dictionary files! " +
+          "Won't generate new dictionary.")
+        false
+      }
+    }
+  }
+
   /**
    * generate global dictionary with SQLContext and CarbonLoadModel
    *
@@ -547,73 +654,115 @@ object GlobalDictionaryUtil extends Logging {
    * @param carbonLoadModel  carbon load model
    */
   def generateGlobalDictionary(sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      hdfsLocation: String): Unit = {
+                               carbonLoadModel: CarbonLoadModel,
+                               hdfsLocation: String): Unit = {
     try {
       val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier
         .getCarbonTableIdentifier
       // create dictionary folder if not exists
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(hdfsLocation, table)
       val dictfolderPath = carbonTablePath.getMetadataDirectoryPath
-      // load data by using dataSource com.databricks.spark.csv
-      var df = loadDataFrame(sqlContext, carbonLoadModel)
       // columns which need to generate global dictionary file
       val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
       val dimensions = carbonTable.getDimensionByTableName(
         carbonTable.getFactTableName).asScala.toArray
-      val headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
-        df.columns
-      }
-      else {
-        carbonLoadModel.getCsvHeader.split("" + CSVWriter.DEFAULT_SEPARATOR)
-      }
       // generate global dict from pre defined column dict file
       carbonLoadModel.initPredefDictMap()
-      val colDictFilePath = carbonLoadModel.getColDictFilePath
-      if (colDictFilePath != null) {
-        // generate predefined dictionary
-        generatePredefinedColDictionary(colDictFilePath, table,
-          dimensions, carbonLoadModel, sqlContext, hdfsLocation, dictfolderPath)
-      }
-      // use fact file to generate global dict
-      val (requireDimension, requireColumnNames) = pruneDimensions(dimensions,
-        headers, df.columns)
-      if (requireDimension.nonEmpty) {
-        // select column to push down pruning
-        df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
-        val model = createDictionaryLoadModel(carbonLoadModel, table, requireDimension,
-          hdfsLocation, dictfolderPath, false)
-        // combine distinct value in a block and partition by column
-        val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
-          .partitionBy(new ColumnPartitioner(model.primDimensions.length))
-        // generate global dictionary files
-        val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
-        // check result status
-        checkStatus(carbonLoadModel, sqlContext, model, statusList)
+
+      val allDictionaryPath = carbonLoadModel.getAllDictPath
+      if(StringUtils.isEmpty(allDictionaryPath)) {
+        logInfo("Generate global dictionary from source data files!")
+        // load data by using dataSource com.databricks.spark.csv
+        var df = loadDataFrame(sqlContext, carbonLoadModel)
+        val headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
+          df.columns
+        }
+        else {
+          carbonLoadModel.getCsvHeader.split("" + CSVWriter.DEFAULT_SEPARATOR)
+        }
+        val colDictFilePath = carbonLoadModel.getColDictFilePath
+        if (colDictFilePath != null) {
+          // generate predefined dictionary
+          generatePredefinedColDictionary(colDictFilePath, table,
+            dimensions, carbonLoadModel, sqlContext, hdfsLocation, dictfolderPath)
+        }
+        // use fact file to generate global dict
+        val (requireDimension, requireColumnNames) = pruneDimensions(dimensions,
+          headers, df.columns)
+        if (requireDimension.nonEmpty) {
+          // select column to push down pruning
+          df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
+          val model = createDictionaryLoadModel(carbonLoadModel, table, requireDimension,
+            hdfsLocation, dictfolderPath, false)
+          // combine distinct value in a block and partition by column
+          val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
+            .partitionBy(new ColumnPartitioner(model.primDimensions.length))
+          // generate global dictionary files
+          val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
+          // check result status
+          checkStatus(carbonLoadModel, sqlContext, model, statusList)
+        } else {
+          logInfo("have no column need to generate global dictionary in Fact file")
+        }
+        // generate global dict from dimension file
+        if (carbonLoadModel.getDimFolderPath != null) {
+          val fileMapArray = carbonLoadModel.getDimFolderPath.split(",")
+          for (fileMap <- fileMapArray) {
+            val dimTableName = fileMap.split(":")(0)
+            var dimDataframe = loadDataFrame(sqlContext, carbonLoadModel)
+            val (requireDimensionForDim, requireColumnNamesForDim) =
+              pruneDimensions(dimensions, dimDataframe.columns, dimDataframe.columns)
+            if (requireDimensionForDim.length >= 1) {
+              dimDataframe = dimDataframe.select(requireColumnNamesForDim.head,
+                requireColumnNamesForDim.tail: _*)
+              val modelforDim = createDictionaryLoadModel(carbonLoadModel, table,
+                requireDimensionForDim, hdfsLocation, dictfolderPath, false)
+              val inputRDDforDim = new CarbonBlockDistinctValuesCombineRDD(
+                dimDataframe.rdd, modelforDim)
+                .partitionBy(new ColumnPartitioner(modelforDim.primDimensions.length))
+              val statusListforDim = new CarbonGlobalDictionaryGenerateRDD(
+                inputRDDforDim, modelforDim).collect()
+              checkStatus(carbonLoadModel, sqlContext, modelforDim, statusListforDim)
+            } else {
+              logInfo(s"No columns in dimension table $dimTableName to generate global dictionary")
+            }
+          }
+        }
       } else {
-        logInfo("have no column need to generate global dictionary in Fact file")
-      }
-      // generate global dict from dimension file
-      if (carbonLoadModel.getDimFolderPath != null) {
-        val fileMapArray = carbonLoadModel.getDimFolderPath.split(",")
-        for (fileMap <- fileMapArray) {
-          val dimTableName = fileMap.split(":")(0)
-          var dimDataframe = loadDataFrame(sqlContext, carbonLoadModel)
-          val (requireDimensionForDim, requireColumnNamesForDim) =
-            pruneDimensions(dimensions, dimDataframe.columns, dimDataframe.columns)
-          if (requireDimensionForDim.length >= 1) {
-            dimDataframe = dimDataframe.select(requireColumnNamesForDim.head,
-              requireColumnNamesForDim.tail: _*)
-            val modelforDim = createDictionaryLoadModel(carbonLoadModel, table,
-              requireDimensionForDim, hdfsLocation, dictfolderPath, false)
-            val inputRDDforDim = new CarbonBlockDistinctValuesCombineRDD(
-              dimDataframe.rdd, modelforDim)
-              .partitionBy(new ColumnPartitioner(modelforDim.primDimensions.length))
-            val statusListforDim = new CarbonGlobalDictionaryGenerateRDD(
-              inputRDDforDim, modelforDim).collect()
-            checkStatus(carbonLoadModel, sqlContext, modelforDim, statusListforDim)
+        logInfo("Generate global dictionary from all dictionary files!")
+        val isNonempty = validateAllDictionaryPath(allDictionaryPath)
+        if(isNonempty) {
+          // fill the map[columnIndex -> columnName]
+          var fileHeaders : Array[String] = null
+          if(!StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
+            val splitColumns = carbonLoadModel.getCsvHeader.split("" + CSVWriter.DEFAULT_SEPARATOR)
+            val fileHeadersArr = new ArrayBuffer[String]()
+            for(i <- 0 until splitColumns.length) {
+              fileHeadersArr += splitColumns(i).trim.toLowerCase()
+            }
+            fileHeaders = fileHeadersArr.toArray
+          } else {
+            logError("Not found file header! Please set fileheader")
+            throw new IOException("Failed to get file header")
+          }
+          // prune columns according to the CSV file header, dimension columns
+          val (requireDimension, requireColumnNames) =
+            pruneDimensions(dimensions, fileHeaders, fileHeaders)
+          if (requireDimension.nonEmpty) {
+            val model = createDictionaryLoadModel(carbonLoadModel, table, requireDimension,
+              hdfsLocation, dictfolderPath, false)
+            // read local dictionary file, and group by key
+            val allDictionaryRdd = readAllDictionaryFiles(sqlContext, fileHeaders,
+              requireColumnNames, allDictionaryPath)
+            // read exist dictionary and combine
+            val inputRDD = new CarbonAllDictionaryCombineRDD(allDictionaryRdd, model)
+              .partitionBy(new ColumnPartitioner(model.primDimensions.length))
+            // generate global dictionary files
+            val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
+            // check result status
+            checkStatus(carbonLoadModel, sqlContext, model, statusList)
           } else {
-            logInfo(s"No columns in dimension table $dimTableName to generate global dictionary")
+            logInfo("have no column need to generate global dictionary")
           }
         }
       }
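
For reference, validateAllDictionaryPath above accepts the ALL_DICTIONARY_PATH value in two
forms (the paths here are only illustrative); in both cases missing or empty dictionary files
make the load skip dictionary generation rather than fail:

    /dictpath/20160423/1400_1405/sample.dictionary    a single dictionary file
    /dictpath/20160423/1400_1405/*.dictionary         every *.dictionary file in that folder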

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c839545/integration/spark/src/test/resources/alldictionary/complex/20160423/1400_1405/complex.dictionary
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/alldictionary/complex/20160423/1400_1405/complex.dictionary b/integration/spark/src/test/resources/alldictionary/complex/20160423/1400_1405/complex.dictionary
new file mode 100644
index 0000000..223cabd
--- /dev/null
+++ b/integration/spark/src/test/resources/alldictionary/complex/20160423/1400_1405/complex.dictionary
@@ -0,0 +1,20 @@
+0,100080
+1,2355
+2,1ROM size
+3,29-11-2015
+4,1AA100080$2BB100080
+5,MAC283$MAC284$MAC285
+6,1:Chinese:Guangdong Province:shenzhen:longgang:matishan$1:India:Guangdong Province:shenzhen:longgang:matishan
+7,02-03-2016$02-03-2016:02-03-2016
+8,2355
+9,954
+0,100081
+1,1650
+2,6ROM size
+3,29-11-2015
+4,1AA100081$2BB100081
+5,MAC286$MAC287$MAC288
+6,1:Chinese:Guangdong Province:shenzhen:longgang:matishan$1:India:Guangdong Province:shenzhen:longgang:matishan
+7,03-03-2016$03-03-2016:03-03-2016
+8,1650
+9,613
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c839545/integration/spark/src/test/resources/alldictionary/sample/20160423/1400_1405/sample.dictionary
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/alldictionary/sample/20160423/1400_1405/sample.dictionary b/integration/spark/src/test/resources/alldictionary/sample/20160423/1400_1405/sample.dictionary
new file mode 100644
index 0000000..592e634
--- /dev/null
+++ b/integration/spark/src/test/resources/alldictionary/sample/20160423/1400_1405/sample.dictionary
@@ -0,0 +1,9 @@
+0,1
+1,david
+2,shenzhen
+0,2
+1,eason
+0,3
+1,jarry
+2,wuhan
+2,Bangalore
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/8c839545/integration/spark/src/test/scala/org/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark/src/test/scala/org/carbondata/spark/util/AllDictionaryTestCase.scala
new file mode 100644
index 0000000..756a35a
--- /dev/null
+++ b/integration/spark/src/test/scala/org/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.spark.util
+
+import java.io.File
+
+import org.apache.spark.sql.common.util.CarbonHiveContext.sql
+import org.apache.spark.sql.common.util.{CarbonHiveContext, QueryTest}
+import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
+import org.carbondata.core.carbon.CarbonDataLoadSchema
+import org.carbondata.spark.load.CarbonLoadModel
+import org.scalatest.BeforeAndAfterAll
+
+/**
+  * Test Case for org.carbondata.integration.spark.util.GlobalDictionaryUtil
+  *
+  * @date: Apr 10, 2016 10:34:58 PM
+  * @See org.carbondata.integration.spark.util.GlobalDictionaryUtil
+  */
+class AllDictionaryTestCase extends QueryTest with BeforeAndAfterAll {
+
+  var pwd: String = _
+  var sampleRelation: CarbonRelation = _
+  var complexRelation: CarbonRelation = _
+  var sampleAllDictionaryFile: String = _
+  var complexAllDictionaryFile: String = _
+
+  def buildCarbonLoadModel(relation: CarbonRelation,
+    filePath: String,
+    dimensionFilePath: String,
+    header: String,
+    allDictFilePath: String): CarbonLoadModel = {
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(relation.cubeMeta.carbonTableIdentifier.getDatabaseName)
+    carbonLoadModel.setTableName(relation.cubeMeta.carbonTableIdentifier.getTableName)
+    val table = relation.cubeMeta.carbonTable
+    val carbonSchema = new CarbonDataLoadSchema(table)
+    carbonLoadModel.setDatabaseName(table.getDatabaseName)
+    carbonLoadModel.setTableName(table.getFactTableName)
+    carbonLoadModel.setCarbonDataLoadSchema(carbonSchema)
+    carbonLoadModel.setFactFilePath(filePath)
+    carbonLoadModel.setDimFolderPath(dimensionFilePath)
+    carbonLoadModel.setCsvHeader(header)
+    carbonLoadModel.setCsvDelimiter(",")
+    carbonLoadModel.setComplexDelimiterLevel1("\\$")
+    carbonLoadModel.setComplexDelimiterLevel2("\\:")
+    carbonLoadModel.setAllDictPath(allDictFilePath)
+    carbonLoadModel
+  }
+
+  override def beforeAll {
+    sql("drop table if exists sample")
+    sql("drop table if exists complextypes")
+    buildTestData
+    // when re-running the test, comment out the following line
+    buildTable
+    buildRelation
+  }
+
+  def buildTestData() = {
+    pwd = new File(this.getClass.getResource("/").getPath + "/../../").getCanonicalPath
+    sampleAllDictionaryFile = pwd + "/src/test/resources/alldictionary/sample/20160423/1400_1405/*.dictionary"
+    complexAllDictionaryFile = pwd + "/src/test/resources/alldictionary/complex/20160423/1400_1405/*.dictionary"
+  }
+
+  def buildTable() = {
+    try {
+      sql(
+        "CREATE TABLE IF NOT EXISTS sample (id STRING, name STRING, city STRING, " +
+          "age INT) STORED BY 'org.apache.carbondata.format'"
+      )
+    } catch {
+      case ex: Throwable => logError(ex.getMessage + "\r\n" + ex.getStackTraceString)
+    }
+    try {
+      sql(
+        "create table complextypes (deviceInformationId string, channelsId string, " +
+          "ROMSize string, purchasedate string, mobile struct<imei: string, imsi: string>,
MAC " +
+          "array<string>, locationinfo array<struct<ActiveAreaId: INT, ActiveCountry:
string, " +
+          "ActiveProvince: string, Activecity: string, ActiveDistrict: string, ActiveStreet:
" +
+          "string>>, proddate struct<productionDate: string,activeDeactivedate:
array<string>>, " +
+          "gamePointId INT,contractNumber INT) STORED BY 'org.apache.carbondata.format'"
+
+          "TBLPROPERTIES('DICTIONARY_EXCLUDE'='ROMSize')"
+      )
+    } catch {
+      case ex: Throwable => logError(ex.getMessage + "\r\n" + ex.getStackTraceString)
+    }
+  }
+
+  def buildRelation() = {
+    val catalog = CarbonEnv.getInstance(CarbonHiveContext).carbonCatalog
+    sampleRelation = catalog.lookupRelation1(Option("default"), "sample", None)(CarbonHiveContext).asInstanceOf[CarbonRelation]
+    complexRelation = catalog.lookupRelation1(Option("default"), "complextypes", None)(CarbonHiveContext).asInstanceOf[CarbonRelation]
+  }
+
+  test("Support generate global dictionary from all dictionary files") {
+    val header = "id,name,city,age"
+    val carbonLoadModel = buildCarbonLoadModel(sampleRelation, null, null, header, sampleAllDictionaryFile)
+    GlobalDictionaryUtil
+      .generateGlobalDictionary(CarbonHiveContext,
+        carbonLoadModel,
+        sampleRelation.cubeMeta.storePath)
+
+    DictionaryTestCaseUtil.
+      checkDictionary(sampleRelation, "city", "shenzhen")
+  }
+
+  test("Support generate global dictionary from all dictionary files for complex type") {
+    val header = "deviceInformationId,channelsId,ROMSize,purchasedate,mobile,MAC,locationinfo,proddate,gamePointId,contractNumber"
+    val carbonLoadModel = buildCarbonLoadModel(complexRelation, null, null, header, complexAllDictionaryFile)
+    GlobalDictionaryUtil
+      .generateGlobalDictionary(CarbonHiveContext,
+      carbonLoadModel,
+      complexRelation.cubeMeta.storePath)
+
+    DictionaryTestCaseUtil.
+      checkDictionary(complexRelation, "channelsId", "1650")
+  }
+  
+  override def afterAll {
+    sql("drop table sample")
+    sql("drop table complextypes")
+  }
+}


