carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject [22/50] [abbrv] incubator-carbondata git commit: [Bug] Direct Dictionary Complex primitive timestamp type support in data loading and query (#823)
Date Wed, 20 Jul 2016 10:13:50 GMT
[Bug] Direct Dictionary Complex primitive timestamp type support in data loading and query
(#823)

* Complex type support for timestamp primitives in data loading

* Query Flow Updated to support complex primitive timestamp types

* Fixed style issues and added test code


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f4c3d10e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f4c3d10e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f4c3d10e

Branch: refs/heads/master
Commit: f4c3d10e6be703ffd5fea9d77b376c18aaaceabe
Parents: a45dc4f
Author: nareshpr <prnaresh.naresh@gmail.com>
Authored: Sun Jul 17 10:26:05 2016 +0530
Committer: Ravindra Pesala <ravi.pesala@gmail.com>
Committed: Sun Jul 17 10:26:05 2016 +0530

----------------------------------------------------------------------
 .../query/carbon/executor/util/QueryUtil.java   |  9 ++-
 .../complex/querytypes/PrimitiveQueryType.java  | 32 ++++++-----
 .../src/test/resources/datasamplecomplex.csv    |  2 +
 ...plexPrimitiveTimestampDirectDictionary.scala | 60 ++++++++++++++++++++
 .../processing/datatypes/PrimitiveDataType.java |  2 +-
 .../CarbonCSVBasedDimSurrogateKeyGen.java       | 39 +++++++++++++
 .../FileStoreSurrogateKeyGenForCSV.java         |  4 ++
 .../processing/util/CarbonSchemaParser.java     | 16 +++++-
 8 files changed, 144 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f4c3d10e/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java b/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java
index 9de0ca1..b88da23 100644
--- a/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/carbondata/query/carbon/executor/util/QueryUtil.java
@@ -375,10 +375,11 @@ public class QueryUtil {
   private static void getChildDimensionDictionaryDetail(CarbonDimension queryDimensions,
       Set<String> dictionaryDimensionFromQuery) {
     for (int j = 0; j < queryDimensions.numberOfChild(); j++) {
+      List<Encoding> encodingList = queryDimensions.getListOfChildDimensions().get(j).getEncoder();
       if (queryDimensions.getListOfChildDimensions().get(j).numberOfChild() > 0) {
         getChildDimensionDictionaryDetail(queryDimensions.getListOfChildDimensions().get(j),
             dictionaryDimensionFromQuery);
-      } else {
+      } else if(!CarbonUtil.hasEncoding(encodingList, Encoding.DIRECT_DICTIONARY)) {
         dictionaryDimensionFromQuery
             .add(queryDimensions.getListOfChildDimensions().get(j).getColumnId());
       }
@@ -1077,13 +1078,17 @@ public class QueryUtil {
                   dimension.getColName(), ++parentBlockIndex));
           break;
         default:
+          boolean isDirectDictionary = CarbonUtil.hasEncoding(
+              dimension.getListOfChildDimensions().get(i).getEncoder(),
+              Encoding.DIRECT_DICTIONARY);
           parentQueryType.addChildren(
               new PrimitiveQueryType(dimension.getListOfChildDimensions().get(i).getColName(),
                   dimension.getColName(), ++parentBlockIndex,
                   dimension.getListOfChildDimensions().get(i).getDataType(),
                   eachComplexColumnValueSize[dimension.getListOfChildDimensions().get(i)
                       .getComplexTypeOrdinal()], columnIdToDictionaryMap
-                  .get(dimension.getListOfChildDimensions().get(i).getColumnId())));
+                  .get(dimension.getListOfChildDimensions().get(i).getColumnId()),
+                  isDirectDictionary));
       }
       if (dimension.getListOfChildDimensions().get(i).getNumberOfChild() > 0) {
         parentBlockIndex = fillChildrenDetails(eachComplexColumnValueSize, columnIdToDictionaryMap,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f4c3d10e/core/src/main/java/org/carbondata/query/complex/querytypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/complex/querytypes/PrimitiveQueryType.java b/core/src/main/java/org/carbondata/query/complex/querytypes/PrimitiveQueryType.java
index b19c52e..9c618ba 100644
--- a/core/src/main/java/org/carbondata/query/complex/querytypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/carbondata/query/complex/querytypes/PrimitiveQueryType.java
@@ -27,6 +27,9 @@ import java.util.List;
 
 import org.carbondata.core.cache.dictionary.Dictionary;
 import org.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.carbondata.core.keygenerator.mdkey.Bits;
 import org.carbondata.query.carbon.processor.BlocksChunkHolder;
 import org.carbondata.query.carbon.util.DataTypeUtil;
 
@@ -53,9 +56,11 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
 
   private org.carbondata.core.carbon.metadata.datatype.DataType dataType;
 
+  private boolean isDirectDictionary;
+
   public PrimitiveQueryType(String name, String parentname, int blockIndex,
       org.carbondata.core.carbon.metadata.datatype.DataType dataType, int keySize,
-      Dictionary dictionary) {
+      Dictionary dictionary, boolean isDirectDictionary) {
     super(name, parentname, blockIndex);
     this.dataType = dataType;
     this.keySize = keySize;
@@ -63,6 +68,7 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
     this.name = name;
     this.parentname = parentname;
     this.blockIndex = blockIndex;
+    this.isDirectDictionary = isDirectDictionary;
   }
 
   @Override public void addChildren(GenericQueryType children) {
@@ -159,9 +165,17 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
 
     byte[] data = new byte[keySize];
     surrogateData.get(data);
-    String dictionaryValueForKey =
-        dictionary.getDictionaryValueForKey(unsignedIntFromByteArray(data));
-    Object actualData = DataTypeUtil.getDataBasedOnDataType(dictionaryValueForKey, this.dataType);
+    Bits bit = new Bits(new int[]{keySize * 8});
+    int surrgateValue = (int)bit.getKeyArray(data)[0];
+    Object actualData = null;
+    if (isDirectDictionary) {
+      DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+          .getDirectDictionaryGenerator(dataType);
+      actualData = directDictionaryGenerator.getValueFromSurrogate(surrgateValue);
+    } else {
+      String dictionaryValueForKey = dictionary.getDictionaryValueForKey(surrgateValue);
+      actualData = DataTypeUtil.getDataBasedOnDataType(dictionaryValueForKey, this.dataType);
+    }
     if (null != actualData
         && this.dataType == org.carbondata.core.carbon.metadata.datatype.DataType.STRING)
{
       byte[] dataBytes = ((String) actualData).getBytes(Charset.defaultCharset());
@@ -169,14 +183,4 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
     }
     return actualData;
   }
-
-  private int unsignedIntFromByteArray(byte[] bytes) {
-    int res = 0;
-    if (bytes == null) return res;
-
-    for (int i = 0; i < bytes.length; i++) {
-      res = (res * 10) + ((bytes[i] & 0xff));
-    }
-    return res;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f4c3d10e/integration/spark/src/test/resources/datasamplecomplex.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/datasamplecomplex.csv b/integration/spark/src/test/resources/datasamplecomplex.csv
new file mode 100644
index 0000000..48f82c1
--- /dev/null
+++ b/integration/spark/src/test/resources/datasamplecomplex.csv
@@ -0,0 +1,2 @@
+11,2016-03-14 08:30:00.000,2016-03-14 08:30:09.000$2016-03-14 15:00:09.000$2016-03-14 17:30:35.000,2016-03-14 08:30:09.000$2016-03-14 17:30:35.000,5040.56
+12,2016-04-14 08:30:00.000,2016-04-14 09:30:09.000$2016-04-14 15:30:09.000$2016-04-14 17:30:35.000,2016-04-14 08:30:09.000$2016-04-14 18:30:35.000,1040.56

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f4c3d10e/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/complexType/TestComplexPrimitiveTimestampDirectDictionary.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/complexType/TestComplexPrimitiveTimestampDirectDictionary.scala b/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/complexType/TestComplexPrimitiveTimestampDirectDictionary.scala
new file mode 100644
index 0000000..43bf7ea
--- /dev/null
+++ b/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/complexType/TestComplexPrimitiveTimestampDirectDictionary.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.integration.spark.testsuite.complexType
+
+import java.io.File
+
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.Row
+import org.carbondata.core.carbon.CarbonTableIdentifier
+import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.carbondata.core.constants.CarbonCommonConstants
+import org.carbondata.core.util.CarbonProperties
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Tests loading and querying a carbon table whose complex columns (an array and a struct)
+ * contain Timestamp primitives, by comparing a select * result with an equivalent Hive table
+ * loaded from the same CSV file.
+ */
+class TestComplexPrimitiveTimestampDirectDictionary extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll: Unit = {
+    sql("drop table if exists complexcarbontimestamptable")
+    sql("drop table if exists complexhivetimestamptable")
+    // The sample CSV timestamps include milliseconds, so the parse format includes ".SSS".
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss.SSS")
+    sql("CREATE TABLE complexcarbontimestamptable (empno string,workdate Timestamp," +
+        "punchinout array<Timestamp>, worktime struct<begintime:Timestamp, endtime:Timestamp>," +
+        " salary double) STORED BY 'org.apache.carbondata.format'")
+    sql("LOAD DATA local inpath './src/test/resources/datasamplecomplex.csv' " +
+        "INTO TABLE complexcarbontimestamptable OPTIONS" +
+        "('DELIMITER'= ',', 'QUOTECHAR'= '\"', " +
+        "'FILEHEADER'='empno,workdate,punchinout,worktime,salary')")
+    // The Hive reference table splits array elements and struct fields on '$', as in the CSV.
+    sql("CREATE TABLE complexhivetimestamptable (empno string,workdate Timestamp," +
+        "punchinout array<Timestamp>, worktime struct<begintime:Timestamp, endtime:Timestamp>," +
+        " salary double)row format delimited fields terminated by ',' " +
+        "collection items terminated by '$'")
+    sql("LOAD DATA local inpath './src/test/resources/datasamplecomplex.csv' INTO TABLE " +
+        "complexhivetimestamptable")
+  }
+
+  test("select * query") {
+    checkAnswer(sql("select * from complexcarbontimestamptable"),
+      sql("select * from complexhivetimestamptable"))
+  }
+
+  override def afterAll {
+    sql("drop table if exists complexcarbontimestamptable")
+    sql("drop table if exists complexhivetimestamptable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f4c3d10e/processing/src/main/java/org/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/carbondata/processing/datatypes/PrimitiveDataType.java
index b3efbba..3d1da8c 100644
--- a/processing/src/main/java/org/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -156,7 +156,7 @@ public class PrimitiveDataType implements GenericDataType {
       String[] delimiter, int delimiterIndex, DataOutputStream dataOutputStream,
       CarbonCSVBasedDimSurrogateKeyGen surrogateKeyGen) throws KettleException, IOException
{
     dataOutputStream.writeInt(surrogateKeyGen.generateSurrogateKeys(inputString, tableName
-        + CarbonCommonConstants.UNDERSCORE + name));
+        + CarbonCommonConstants.UNDERSCORE + name, this.getColumnId()));
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f4c3d10e/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
index 70b6199..dac4b6c 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedDimSurrogateKeyGen.java
@@ -36,13 +36,17 @@ import org.carbondata.core.cache.dictionary.Dictionary;
 import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.file.manager.composite.IFileManagerComposite;
 import org.carbondata.core.keygenerator.KeyGenException;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.carbondata.core.writer.HierarchyValueWriterForCSV;
 import org.carbondata.processing.datatypes.GenericDataType;
 import org.carbondata.processing.schema.metadata.ArrayWrapper;
+import org.carbondata.processing.schema.metadata.ColumnSchemaDetails;
 import org.carbondata.processing.schema.metadata.ColumnsInfo;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
 import org.pentaho.di.core.exception.KettleException;
 
 public abstract class CarbonCSVBasedDimSurrogateKeyGen {
@@ -146,6 +150,29 @@ public abstract class CarbonCSVBasedDimSurrogateKeyGen {
     return key;
   }
 
+  /**
+   * Generates the surrogate key for a value of a complex column's primitive child.
+   * <p>
+   * If a dictionary cache is registered under {@code tabColumnName}, the key is looked up in
+   * that cache. Otherwise, if the column identified by {@code columnId} is a direct-dictionary
+   * column, the key is computed from {@code tuple} by the column type's direct dictionary
+   * generator.
+   *
+   * @param tuple         The string value whose surrogate key will be generated.
+   * @param tabColumnName The key of the dictionaryCaches map, for example "tablename_columnname"
+   * @param columnId      Unique id of the column, used to look up its column schema details
+   * @return the surrogate key, or null when the column has no dictionary cache and is not a
+   *     direct-dictionary column (including when no schema details are registered for it)
+   */
+  public Integer generateSurrogateKeys(String tuple, String tabColumnName, String columnId)
+      throws KettleException {
+    Dictionary dicCache = dictionaryCaches.get(tabColumnName);
+    if (null != dicCache) {
+      return dicCache.getSurrogateKey(tuple);
+    }
+    ColumnSchemaDetails columnSchemaDetails =
+        this.columnsInfo.getColumnSchemaDetailsWrapper().get(columnId);
+    // A column without registered schema details is treated like a non-direct-dictionary
+    // column instead of failing here with a NullPointerException.
+    if (null == columnSchemaDetails || !columnSchemaDetails.isDirectDictionary()) {
+      return null;
+    }
+    DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+        .getDirectDictionaryGenerator(columnSchemaDetails.getColumnType());
+    return directDictionaryGenerator.generateDirectSurrogateKey(tuple);
+  }
+
+
   public Integer generateSurrogateKeysForTimeDims(String tuple, String columnName, int index,
       Object[] props) throws KettleException {
     Integer key = null;
@@ -312,6 +339,7 @@ public abstract class CarbonCSVBasedDimSurrogateKeyGen {
   private void setDimensionTables(String[] dimeFileNames) {
     int noOfPrimitiveDims = 0;
     List<String> dimFilesForPrimitives = new ArrayList<String>();
+    List<Boolean> isDirectDictionary = new ArrayList<Boolean>();
     dictionaryCaches = new ConcurrentHashMap<String, Dictionary>();
     for (int i = 0; i < dimeFileNames.length; i++) {
       GenericDataType complexType = columnsInfo.getComplexTypesMap()
@@ -325,13 +353,24 @@ public abstract class CarbonCSVBasedDimSurrogateKeyGen {
                   .getName());
           eachPrimitive.setSurrogateIndex(noOfPrimitiveDims);
           noOfPrimitiveDims++;
+          ColumnSchemaDetails columnSchemaDetails =
+              columnsInfo.getColumnSchemaDetailsWrapper().get(eachPrimitive.getColumnId());
+          if (columnSchemaDetails.isDirectDictionary()) {
+            isDirectDictionary.add(true);
+          }
         }
       } else {
         dimFilesForPrimitives.add(dimeFileNames[i]);
         noOfPrimitiveDims++;
+        isDirectDictionary.add(false);
       }
     }
     max = new int[noOfPrimitiveDims];
+    for(int i = 0; i < isDirectDictionary.size(); i++) {
+      if (isDirectDictionary.get(i)) {
+        max[i] = Integer.MAX_VALUE;
+      }
+    }
     this.dimsFiles = dimFilesForPrimitives.toArray(new String[dimFilesForPrimitives.size()]);
 
     createRespectiveDimFilesForDimTables();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f4c3d10e/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
index 2af5c5e..a920d63 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
@@ -243,6 +243,10 @@ public class FileStoreSurrogateKeyGenForCSV extends CarbonCSVBasedDimSurrogateKe
         List<GenericDataType> primitiveChild = new ArrayList<GenericDataType>();
         complexType.getAllPrimitiveChildren(primitiveChild);
         for (GenericDataType eachPrimitive : primitiveChild) {
+          details = columnSchemaDetailsWrapper.get(eachPrimitive.getColumnId());
+          if (details.isDirectDictionary()) {
+            continue;
+          }
           ColumnIdentifier columnIdentifier = new ColumnIdentifier(eachPrimitive.getColumnId(),
               columnsInfo.getColumnProperties(eachPrimitive.getName()), details.getColumnType());
           String dimColumnName =

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f4c3d10e/processing/src/main/java/org/carbondata/processing/util/CarbonSchemaParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/util/CarbonSchemaParser.java b/processing/src/main/java/org/carbondata/processing/util/CarbonSchemaParser.java
index 3a789f1..e0b1f1a 100644
--- a/processing/src/main/java/org/carbondata/processing/util/CarbonSchemaParser.java
+++ b/processing/src/main/java/org/carbondata/processing/util/CarbonSchemaParser.java
@@ -1195,14 +1195,24 @@ public final class CarbonSchemaParser {
     ColumnSchemaDetailsWrapper columnSchemaDetailsWrapper = new ColumnSchemaDetailsWrapper();
     Map<String, ColumnSchemaDetails> columnSchemaDetailsMap =
         new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    fillColumnSchemaDetailsWithComplex(dimensions, columnSchemaDetailsMap);
+    columnSchemaDetailsWrapper.setColumnSchemaDetailsMap(columnSchemaDetailsMap);
+    return columnSchemaDetailsWrapper;
+  }
+
+  private static void fillColumnSchemaDetailsWithComplex(
+      List<CarbonDimension> dimensions,
+      Map<String, ColumnSchemaDetails> columnSchemaDetailsMap) {
     for (CarbonDimension cDimension : dimensions) {
       ColumnSchemaDetails details =
           new ColumnSchemaDetails(cDimension.getColName(), cDimension.getDataType(),
-              CarbonUtil.hasEncoding(cDimension.getEncoder(), Encoding.DIRECT_DICTIONARY));
+          CarbonUtil.hasEncoding(cDimension.getEncoder(), Encoding.DIRECT_DICTIONARY));
       columnSchemaDetailsMap.put(cDimension.getColumnSchema().getColumnUniqueId(), details);
+      if (cDimension.isComplex()) {
+        fillColumnSchemaDetailsWithComplex(cDimension.getListOfChildDimensions(),
+            columnSchemaDetailsMap);
+      }
     }
-    columnSchemaDetailsWrapper.setColumnSchemaDetailsMap(columnSchemaDetailsMap);
-    return columnSchemaDetailsWrapper;
   }
 
   /**


Mime
View raw message