carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chenliang...@apache.org
Subject carbondata git commit: [CARBONDATA-2837] Added MVExample in example module
Date Mon, 03 Sep 2018 13:31:22 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 884ac915b -> d980d4cd8


[CARBONDATA-2837] Added MVExample in example module

Added MVExample in example module

This closes #2614


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d980d4cd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d980d4cd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d980d4cd

Branch: refs/heads/master
Commit: d980d4cd8d5aef5c4fbf06f73dc1ec300329c9f7
Parents: 884ac91
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Tue Aug 7 16:37:17 2018 +0530
Committer: chenliang613 <chenliang613@huawei.com>
Committed: Mon Sep 3 21:31:09 2018 +0800

----------------------------------------------------------------------
 examples/spark2/pom.xml                         |  10 +
 .../carbondata/examples/MVDataMapExample.scala  | 221 +++++++++++++++++++
 .../carbondata/examplesCI/RunExamples.scala     |   7 +
 3 files changed, 238 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d980d4cd/examples/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml
index 7a55333..9d1adde 100644
--- a/examples/spark2/pom.xml
+++ b/examples/spark2/pom.xml
@@ -204,5 +204,15 @@
         <maven.test.skip>true</maven.test.skip>
       </properties>
     </profile>
+    <profile>
+      <id>mv</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.carbondata</groupId>
+          <artifactId>carbondata-mv-core</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d980d4cd/examples/spark2/src/main/scala/org/apache/carbondata/examples/MVDataMapExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/MVDataMapExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/MVDataMapExample.scala
new file mode 100644
index 0000000..fbff90b
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/MVDataMapExample.scala
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+import java.util.Random
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.examples.util.ExampleUtils
+
+/**
+ * This example is for pre-aggregate tables.
+ */
+
+object MVDataMapExample {
+
+  def main(args: Array[String]) {
+    val spark = ExampleUtils.createCarbonSession("MVDataMapExample")
+    exampleBody(spark)
+    performanceTest(spark)
+    spark.close()
+  }
+
+  def exampleBody(spark: SparkSession): Unit = {
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val testData = s"$rootPath/integration/spark-common-test/src/test/resources/sample.csv"
+
+    // 1. simple usage for Pre-aggregate tables creation and query
+    spark.sql("DROP TABLE IF EXISTS mainTable")
+    spark.sql("DROP TABLE IF EXISTS dimtable")
+    spark.sql(
+      """
+        | CREATE TABLE mainTable
+        | (id Int,
+        | name String,
+        | city String,
+        | age Int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    spark.sql(
+      """
+        | CREATE TABLE dimtable
+        | (name String,
+        | address String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    spark.sql(s"""LOAD DATA LOCAL INPATH '$testData' into table mainTable""")
+
+    spark.sql(s"""insert into dimtable select name, concat(city, ' street1') as address from
+           |mainTable group by name, address""".stripMargin)
+
+
+    // 1. create simple sub projection MV datamap
+
+    // sub projections to be hit
+    spark.sql(s"""create datamap simple_sub_projection using 'mv' as
+         | select id,name from mainTable"""
+        .stripMargin)
+    spark.sql(s"""rebuild datamap simple_sub_projection""")
+
+    // Check the physical plan which uses MV table simple_sub_projection_table instead of mainTable
+    spark.sql(s"""select id from mainTable""").explain(true)
+
+    // Check the physical plan which uses MV table simple_sub_projection_table instead of mainTable
+    spark.sql(s"""select sum(id) from mainTable""").explain(true)
+
+    // 2. aggregate functions to be hit
+    spark.sql(
+      s"""create datamap simple_agg using 'mv' as
+         | select id,sum(age) from mainTable group by id""".stripMargin)
+
+    spark.sql(s"""rebuild datamap simple_agg""")
+
+    // Check the physical plan which uses MV table simple_agg_table instead of mainTable
+    spark.sql(s"""select id,sum(age) from mainTable group by id""").explain(true)
+
+    // Check the physical plan of subquery which uses MV table simple_agg_table instead of mainTable
+    spark.sql(s"""select sub.id from (select id ,sum(age) from mainTable group by id) sub where sub
+           |.id = 4""".stripMargin).explain(true)
+
+
+
+    // 3.join with another table and aggregate functions to be hit
+    spark.sql(s"""create datamap simple_agg_with_join using 'mv' as
+         | select id,address, sum(age) from mainTable inner join dimtable on mainTable
+         | .name=dimtable.name group by id ,address""".stripMargin)
+    spark.sql(s"""rebuild datamap simple_agg_with_join""")
+
+    // Check the physical plan which uses MV table simple_agg_with_join_table instead of
+    // mainTable and dimtable.
+    spark.sql(s"""select id,address, sum(age) from mainTable inner join dimtable on mainTable
+           |.name=dimtable.name group by id ,address""".stripMargin).explain(true)
+
+    // Check the physical plan which uses MV table simple_agg_with_join_table instead of
+    // mainTable and dimtable.
+    spark.sql(s"""select id,address, sum(age) from mainTable inner join dimtable on mainTable
+                 |.name=dimtable.name where id =1 group by id ,address""".stripMargin).explain(true)
+
+    // Show datamaps
+    spark.sql("show datamap").show(false)
+
+    // Drop datamap
+    spark.sql("drop datamap if exists simple_agg_with_join")
+
+    spark.sql("DROP TABLE IF EXISTS mainTable")
+    spark.sql("DROP TABLE IF EXISTS dimtable")
+  }
+
+  private def performanceTest(spark: SparkSession): Unit = {
+    spark.sql("DROP TABLE IF EXISTS employee_salary")
+    spark.sql("DROP TABLE IF EXISTS employee_salary_without_mv")
+    spark.sql("DROP TABLE IF EXISTS emp_address")
+
+    createFactTable(spark, "employee_salary")
+    createFactTable(spark, "employee_salary_without_mv")
+
+    spark.sql(
+      """
+        | CREATE TABLE emp_address
+        | (name String,
+        | address String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    spark.sql(
+      s"""insert into emp_address select name, concat(city, ' street1') as address from
+         |employee_salary group by name, address""".stripMargin)
+
+    spark.sql(
+      s"""create datamap simple_agg_employee using 'mv' as
+         | select id,sum(salary) from employee_salary group by id""".stripMargin)
+    spark.sql(s"""rebuild datamap simple_agg_employee""")
+
+    // Test performance of aggregate queries with mv datamap
+    val timeWithOutMv = time(spark
+      .sql("select id, name, sum(salary) from employee_salary_without_mv group by id,name")
+      .collect())
+    val timeWithMv = time(spark
+      .sql("select id,name,sum(salary) from employee_salary group by id,name").collect())
+    // scalastyle:off
+    println("Time of table with MV is : " + timeWithMv + " time withoutmv : " + timeWithOutMv)
+    // scalastyle:on
+    val timeWithOutMvFilter = time(spark
+      .sql(
+        "select id, name, sum(salary) from employee_salary_without_mv where name='name10' group " +
+        "by id,name")
+      .collect())
+    val timeWithMvFilter = time(spark
+      .sql("select id,name,sum(salary) from employee_salary where name='name10' group by id,name")
+      .collect())
+    // scalastyle:off
+    println("Time of table with MV with filter is : " + timeWithMvFilter + " time withoutmv : " +
+            timeWithOutMvFilter)
+    // scalastyle:on
+
+    // Tests performance of aggregate with join queries.
+    spark.sql(
+      s"""create datamap simple_join_agg_employee using 'mv' as
+         | select id,address, sum(salary) from employee_salary f join emp_address d
+         | on f.name=d.name group by id,address""".stripMargin)
+    spark.sql(s"""rebuild datamap simple_join_agg_employee""")
+
+    val timeWithMVJoin =
+      time(spark.sql(
+        s"""select id,address, sum(salary) from employee_salary f join emp_address d
+           | on f.name=d.name group by id,address""".stripMargin).collect())
+    val timeWithOutMVJoin =
+      time(spark.sql(
+        s"""select id,address, sum(salary) from employee_salary_without_mv f
+           |join emp_address d on f.name=d.name group by id,address""".stripMargin).collect())
+    // scalastyle:off
+    println("Time of table with MV with join is : " + timeWithMVJoin + " time withoutmv : " +
+            timeWithOutMVJoin)
+    // scalastyle:on
+
+    spark.sql("DROP TABLE IF EXISTS employee_salary")
+    spark.sql("DROP TABLE IF EXISTS emp_address")
+    spark.sql("DROP TABLE IF EXISTS employee_salary_without_mv")
+  }
+
+  private def createFactTable(spark: SparkSession, tableName: String): Unit = {
+    import spark.implicits._
+    val rand = new Random()
+    // Create fact table with datamap
+    val df = spark.sparkContext.parallelize(1 to 1000000)
+      .map(x => (x % 1000, "name" + x % 1000, "city" + x % 100, rand.nextInt()))
+      .toDF("id", "name", "city", "salary")
+
+    df.write
+      .format("carbondata")
+      .option("tableName", tableName)
+      .save()
+  }
+
+  // define time function
+  private def time(code: => Unit): Double = {
+    val start = System.currentTimeMillis()
+    code
+    // return time in second
+    (System.currentTimeMillis() - start).toDouble / 1000
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d980d4cd/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala b/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
index 2b9b999..d3623ba 100644
--- a/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
+++ b/examples/spark2/src/test/scala/org/apache/carbondata/examplesCI/RunExamples.scala
@@ -113,4 +113,11 @@ class RunExamples extends QueryTest with BeforeAndAfterAll {
   test("ExternalTableExample") {
     ExternalTableExample.exampleBody(spark)
   }
+
+  test("MVDataMapExample") {
+    // MV only works for 2.2 and above
+    if (!spark.sparkContext.version.startsWith("2.1")) {
+      MVDataMapExample.exampleBody(spark)
+    }
+  }
 }
\ No newline at end of file


Mime
View raw message