carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [1/2] incubator-carbondata git commit: Fixed broadcast filter join
Date Wed, 03 May 2017 09:19:29 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 6b01ed646 -> f2fdf2962


Fixed broadcast filter join

Fix testcase.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/c0fd3091
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/c0fd3091
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/c0fd3091

Branch: refs/heads/master
Commit: c0fd3091c5c40f08d71006d5cec6351da33dd640
Parents: 6b01ed6
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Mon May 1 07:16:29 2017 +0530
Committer: jackylk <jacky.likun@huawei.com>
Committed: Wed May 3 17:18:29 2017 +0800

----------------------------------------------------------------------
 .../sql/CarbonDatasourceHadoopRelation.scala      |  1 +
 .../bucketing/TableBucketingTestCase.scala        |  4 ++++
 .../carbondata/query/SubQueryTestSuite.scala      | 18 ++++++++++++++++++
 3 files changed, 23 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c0fd3091/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index ea3e40d..aab46af 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -75,4 +75,5 @@ case class CarbonDatasourceHadoopRelation(
     ", " + "Table name :" + carbonTable.getFactTableName + ", Schema :" + tableSchema + "
]"
   }
 
+  override def sizeInBytes: Long = carbonRelation.sizeInBytes
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c0fd3091/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 642b330..8d3eed7 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -30,10 +30,13 @@ import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
 
+  var threshold: Int = _
+
   override def beforeAll {
 
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    threshold = sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold").toInt
     sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")
     sql("DROP TABLE IF EXISTS t3")
     sql("DROP TABLE IF EXISTS t4")
@@ -212,5 +215,6 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS t6")
     sql("DROP TABLE IF EXISTS t7")
     sql("DROP TABLE IF EXISTS t8")
+    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", threshold.toString)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c0fd3091/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
index fbc859f..f6ad961 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/query/SubQueryTestSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.carbondata.query
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.scalatest.BeforeAndAfterAll
 
 class SubQueryTestSuite extends QueryTest with BeforeAndAfterAll {
@@ -38,6 +40,22 @@ class SubQueryTestSuite extends QueryTest with BeforeAndAfterAll {
       Seq(Row(2,"ghj",2.0), Row(3,"ghj",3.0)))
   }
 
+  test("test to check Broad cast filter works") {
+    sql("drop table if exists anothertable")
+    sql("create table anothertable(id int, name string, rating float) stored by 'carbondata'")
+    sql(s"load data local inpath '$tempDirPath/data1.csv' into table anothertable")
+
+    val executedPlan =
+      sql("select * from subquery t1, anothertable t2 where t1.id=t2.id").
+        queryExecution.executedPlan
+    var broadCastExists = false
+    executedPlan.collect {
+      case s: BroadcastHashJoinExec => broadCastExists = true
+    }
+    assert(broadCastExists, "Broad cast join does not exist on small table")
+    sql("drop table if exists anothertable")
+  }
+
   override def afterAll() {
     sql("drop table if exists subquery")
   }


Mime
View raw message