carbondata-commits mailing list archives

From kunalkap...@apache.org
Subject [carbondata] branch master updated: [CARBONDATA-3452] dictionary include udf handle all the scenarios
Date Mon, 09 Sep 2019 06:37:42 GMT
This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new f875b79  [CARBONDATA-3452] dictionary include udf handle all the scenarios
f875b79 is described below

commit f875b791f094b1a7ad690e53634d480d58dfb58b
Author: ajantha-bhat <ajanthabhat@gmail.com>
AuthorDate: Wed Aug 14 20:36:13 2019 +0530

    [CARBONDATA-3452] dictionary include udf handle all the scenarios
    
    Problem: a select query fails when substring() is applied to a dictionary
    column that participates in a join.
    Cause: when DICTIONARY_INCLUDE is present, the plan attribute's data type is
    updated from string to int, so substring() on the now-int column was left
    unresolved, and the join operation then references this unresolved attribute.
    Solution: handle this for all the scenarios in CarbonLateDecodeRule.
    
    This closes #3358
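
    To illustrate the failure mode in plain Scala (a toy sketch; these case
    classes stand in for Spark Catalyst's attribute nodes and are not CarbonData
    or Spark APIs): once a dictionary column's attribute is retyped from string
    to its int surrogate key, a string function over it no longer resolves.

        // Toy model of the bug: Attr stands in for Catalyst's AttributeReference.
        case class Attr(name: String, dataType: String)

        // substring only resolves against a string-typed input.
        def substringResolves(input: Attr): Boolean = input.dataType == "string"

        val hsCode = Attr("hs_code", "string")
        val surrogate = hsCode.copy(dataType = "int") // late decode retyped the dictionary column

        substringResolves(hsCode)    // true
        substringResolves(surrogate) // false: unresolved, and the join referencing it fails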
---
 .../hadoop/api/CarbonTableOutputFormat.java        |   5 +-
 .../spark/sql/optimizer/CarbonLateDecodeRule.scala | 141 ++++++++++++++-------
 .../carbondata/query/SubQueryJoinTestSuite.scala   |  19 +++
 .../processing/util/CarbonDataProcessorUtil.java   |   5 +-
 4 files changed, 120 insertions(+), 50 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index 9ba5e97..16703bf 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.hadoop.api;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -221,8 +222,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
       return (String[]) ObjectSerializationUtil.convertStringToObject(encodedString);
     }
     return new String[] {
-        System.getProperty("java.io.tmpdir") + "/" + System.nanoTime() + "_" + taskAttemptContext
-            .getTaskAttemptID().toString() };
+        System.getProperty("java.io.tmpdir") + "/" + UUID.randomUUID().toString().replace("-", "")
+            + "_" + taskAttemptContext.getTaskAttemptID().toString() };
   }
 
   @Override
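
This hunk also swaps the System.nanoTime() prefix of the local temp directory
for a random UUID, which removes any dependence on timer resolution for the
name's uniqueness. The new naming scheme in isolation (a sketch; localDirName
is a hypothetical helper, not a CarbonData API):

    import java.util.UUID

    // Sketch of the new naming scheme: a dashless random UUID plus the task
    // attempt id, rooted under java.io.tmpdir.
    def localDirName(taskAttemptId: String): String =
      System.getProperty("java.io.tmpdir") + "/" +
        UUID.randomUUID().toString.replace("-", "") + "_" + taskAttemptId

    // e.g. localDirName("attempt_0001_m_000000_0")
    //   -> /tmp/3f2a9c..._attempt_0001_m_000000_0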
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 961bf11..99a8e70 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -619,6 +619,21 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
+  private def needDataTypeUpdate(exp: Expression): Boolean = {
+    var needChangeDatatype: Boolean = true
+    exp.transform {
+      case attr: AttributeReference => attr
+      case a@Alias(attr: AttributeReference, _) => a
+      case others =>
+        // datatype need to change for dictionary columns if only alias
+        // or attribute ref present.
+        // If anything else present, no need to change data type.
+        needChangeDatatype = false
+        others
+    }
+    needChangeDatatype
+  }
+
   private def updateTempDecoder(plan: LogicalPlan,
       aliasMapOriginal: CarbonAliasDecoderRelation,
       attrMap: java.util.HashMap[AttributeReferenceWrapper, CarbonDecoderRelation]):
@@ -650,44 +665,71 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         cd
       case sort: Sort =>
         val sortExprs = sort.order.map { s =>
-          s.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-          }.asInstanceOf[SortOrder]
+          if (needDataTypeUpdate(s)) {
+            s.transform {
+              case attr: AttributeReference =>
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            }.asInstanceOf[SortOrder]
+          } else {
+            s
+          }
         }
         Sort(sortExprs, sort.global, sort.child)
       case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryCatalystDecoder] =>
         val aggExps = agg.aggregateExpressions.map { aggExp =>
-          aggExp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+          if (needDataTypeUpdate(aggExp)) {
+            aggExp.transform {
+              case attr: AttributeReference =>
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            }
+          } else {
+            aggExp
           }
         }.asInstanceOf[Seq[NamedExpression]]
-
         val grpExps = agg.groupingExpressions.map { gexp =>
-          gexp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+          if (needDataTypeUpdate(gexp)) {
+            gexp.transform {
+              case attr: AttributeReference =>
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            }
+          } else {
+            gexp
           }
         }
         Aggregate(grpExps, aggExps, agg.child)
       case expand: Expand =>
-        val ex = expand.transformExpressions {
-          case attr: AttributeReference =>
-            updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+        // can't use needDataTypeUpdate here as argument is of type Expand
+        var needChangeDatatype: Boolean = true
+        expand.transformExpressions {
+          case attr: AttributeReference => attr
+          case a@Alias(attr: AttributeReference, _) => a
+          case others =>
+            // datatype need to change for dictionary columns if only alias
+            // or attribute ref present.
+            // If anything else present, no need to change data type.
+            needChangeDatatype = false
+            others
         }
-        // Update the datatype of literal type as per the output type, otherwise codegen fails.
-        val updatedProj = ex.projections.map { projs =>
-          projs.zipWithIndex.map { case(p, index) =>
-            p.transform {
-              case l: Literal
-                if l.dataType != ex.output(index).dataType &&
-                   !isComplexColumn(ex.output(index), ex.child.output) =>
-                Literal(l.value, ex.output(index).dataType)
+        if (needChangeDatatype) {
+          val ex = expand.transformExpressions {
+            case attr: AttributeReference =>
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+          }
+          // Update the datatype of literal type as per the output type, otherwise codegen fails.
+          val updatedProj = ex.projections.map { projs =>
+            projs.zipWithIndex.map { case (p, index) =>
+              p.transform {
+                case l: Literal
+                  if l.dataType != ex.output(index).dataType &&
+                     !isComplexColumn(ex.output(index), ex.child.output) =>
+                  Literal(l.value, ex.output(index).dataType)
+              }
             }
           }
+          Expand(updatedProj, ex.output, ex.child)
+        } else {
+          expand
         }
-        Expand(updatedProj, ex.output, ex.child)
       case filter: Filter =>
         filter
       case j: Join =>
@@ -698,18 +740,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         u
       case p: Project if relations.nonEmpty =>
         val prExps = p.projectList.map { prExp =>
-          var needChangeDatatype = true
-          prExp.transform {
-            case attr: AttributeReference => attr
-            case a@Alias(attr: AttributeReference, _) => a
-            case others =>
-              // datatype need to change for dictionary columns if only alias
-              // or attribute ref present.
-              // If anything else present, no need to change data type.
-              needChangeDatatype = false
-              others
-          }
-          if (needChangeDatatype) {
+          if (needDataTypeUpdate(prExp)) {
             prExp.transform {
               case attr: AttributeReference =>
                 updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
@@ -721,27 +752,43 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         Project(prExps, p.child)
       case wd: Window if relations.nonEmpty =>
         val prExps = wd.output.map { prExp =>
-          prExp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+          if (needDataTypeUpdate(prExp)) {
+            prExp.transform {
+              case attr: AttributeReference =>
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            }
+          } else {
+            prExp
           }
         }.asInstanceOf[Seq[Attribute]]
         val wdExps = wd.windowExpressions.map { gexp =>
-          gexp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+          if (needDataTypeUpdate(gexp)) {
+            gexp.transform {
+              case attr: AttributeReference =>
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            }
+          } else {
+            gexp
           }
         }.asInstanceOf[Seq[NamedExpression]]
         val partitionSpec = wd.partitionSpec.map{ exp =>
-          exp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+          if (needDataTypeUpdate(exp)) {
+            exp.transform {
+              case attr: AttributeReference =>
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            }
+          } else {
+            exp
           }
         }
         val orderSpec = wd.orderSpec.map { exp =>
-          exp.transform {
-            case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+          if (needDataTypeUpdate(exp)) {
+            exp.transform {
+              case attr: AttributeReference =>
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            }
+          } else {
+            exp
           }
         }.asInstanceOf[Seq[SortOrder]]
         Window(wdExps, partitionSpec, orderSpec, wd.child)
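
Read together, these changes gate every updateDataType pass behind one rule:
rewrite a dictionary attribute's type only when the expression consists of
nothing but attribute references, possibly aliased; if any other node (a
substring call, say) wraps the attribute, the original string type is kept so
the function stays resolved. A toy model of that decision (a sketch with
stand-in case classes, not Catalyst's):

    sealed trait Expr
    case class AttrRef(name: String) extends Expr
    case class Alias(child: Expr, alias: String) extends Expr
    case class FuncCall(fn: String, child: Expr) extends Expr // e.g. substring

    // True only for a bare attribute or a direct alias of one; anything else
    // means a function depends on the original type, so leave it unchanged.
    def needDataTypeUpdate(e: Expr): Boolean = e match {
      case _: AttrRef           => true
      case Alias(_: AttrRef, _) => true
      case _                    => false
    }

    needDataTypeUpdate(AttrRef("hs_code"))                        // true
    needDataTypeUpdate(Alias(AttrRef("hs_code"), "hs"))           // true
    needDataTypeUpdate(FuncCall("substring", AttrRef("hs_code"))) // false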
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryJoinTestSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryJoinTestSuite.scala
index 635445a..4552b4f 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryJoinTestSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryJoinTestSuite.scala
@@ -56,4 +56,23 @@ class SubQueryJoinTestSuite extends Spark2QueryTest with BeforeAndAfterAll {
     sql("drop table t1")
     sql("drop table t2")
   }
+
+  test("test join with dictionary include with udf") {
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+    sql(
+      "create table t1 (m_month smallint, hs_code string, country smallint, dollar_value
double, " +
+      "quantity double, unit smallint, b_country smallint, imex int, y_year smallint) stored
by " +
+      "'carbondata' tblproperties('dictionary_include'='m_month,hs_code,b_country,unit,y_year,"
+
+      "imex', 'sort_columns'='y_year,m_month,country,b_country,imex')")
+    sql(
+      "create table t2(id bigint, hs string, hs_cn string, hs_en string) stored by 'carbondata'
" +
+      "tblproperties ('dictionary_include'='id,hs,hs_cn,hs_en')")
+    checkAnswer(sql(
+      "select a.hs,count(*) tb from (select substring(hs_code,1,2) as hs,count(*) v2000 from
t1 " +
+      "group by substring(hs_code,1,2),y_year) a left join t2 h on (a.hs=h.hs) group by a.hs"),
+      Seq())
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+  }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 9ce96d4..588d4ac 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -94,8 +94,11 @@ public final class CarbonDataProcessorUtil {
       if (dir.exists()) {
         LOGGER.warn("dir already exists, skip dir creation: " + loc);
       } else {
-        if (!dir.mkdirs()) {
+        if (!dir.mkdirs() && !dir.exists()) {
+          // concurrent scenario mkdir may fail, so checking dir
           LOGGER.error("Error occurs while creating dir: " + loc);
+        } else {
+          LOGGER.info("Successfully created dir: " + loc);
         }
       }
     }
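
The CarbonDataProcessorUtil change handles a benign race: File.mkdirs()
returns false both on genuine failure and when another thread or process
created the directory first, so existence is re-checked before an error is
logged. The pattern in isolation (a sketch; ensureDir is a hypothetical
helper, not a CarbonData API):

    import java.io.File

    // Sketch of the race-tolerant pattern: a failed mkdirs() is only an error
    // if the directory still does not exist afterwards.
    def ensureDir(loc: String): Boolean = {
      val dir = new File(loc)
      dir.mkdirs() || dir.isDirectory
    }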

