carbondata-commits mailing list archives

From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-1899] Optimize CarbonData concurrency test case
Date Mon, 02 Apr 2018 06:47:41 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master fca960e37 -> 52f8d7111


[CARBONDATA-1899] Optimize CarbonData concurrency test case

This closes #1713


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/52f8d711
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/52f8d711
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/52f8d711

Branch: refs/heads/master
Commit: 52f8d7111c0d29ced3974fb9e8a63b52fb57d5a4
Parents: fca960e
Author: xubo245 <601450868@qq.com>
Authored: Fri Dec 22 16:55:01 2017 +0800
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Mon Apr 2 14:47:06 2018 +0800

----------------------------------------------------------------------
 .../benchmark/ConcurrentQueryBenchmark.scala    | 573 +++++++++++++++++++
 .../carbondata/benchmark/DataGenerator.scala    |  83 +++
 .../org/apache/carbondata/benchmark/Query.scala |  27 +
 .../benchmark/SimpleQueryBenchmark.scala        | 341 +++++++++++
 .../carbondata/examples/CompareTest.scala       | 397 -------------
 .../carbondata/examples/ConcurrencyTest.scala   | 358 ------------
 6 files changed, 1024 insertions(+), 755 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/52f8d711/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
new file mode 100644
index 0000000..7da8c29
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.benchmark
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util
+import java.util.Date
+import java.util.concurrent.{Callable, Executors, Future, TimeUnit}
+
+import scala.util.Random
+
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonVersionConstants}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+
+// scalastyle:off println
+/**
+ * Test concurrent query performance of CarbonData
+ *
+ * This benchmark prints out the following information:
+ * 1. Environment information
+ * 2. Parameters information
+ * 3. Concurrent query performance result using the Parquet format
+ * 4. Concurrent query performance result using the CarbonData format
+ *
+ * This benchmark runs in local mode by default.
+ * Set 'runInLocal' to false to run on a cluster, and adjust the
+ * submit parameters as needed, for example:
+ *
+ * spark-submit \
+ * --class org.apache.carbondata.benchmark.ConcurrentQueryBenchmark \
+ * --master  yarn \
+ * --deploy-mode client \
+ * --driver-memory 16g \
+ * --executor-cores 4 \
+ * --executor-memory 24g \
+ * --num-executors 3  \
+ * concurrencyTest.jar \
+ * totalNum threadNum taskNum resultIsEmpty path runInLocal generateFile deleteFile
+ * See the initParameters method of this benchmark for details on these arguments.
+ */
+object ConcurrentQueryBenchmark {
+
+  // number of records to generate
+  var totalNum = 1 * 10 * 1000
+  // size of the thread pool
+  var threadNum = 16
+  // number of Spark SQL query tasks per query
+  var taskNum = 100
+  // whether to discard the query result; if true, an empty result is recorded
+  var resultIsEmpty = true
+  // the path where task details are stored
+  var path: String = "/tmp/carbondata"
+  // whether to run in local mode or on a cluster
+  var runInLocal = true
+  // whether to generate new data files
+  var generateFile = true
+  // whether to delete the data files after the test
+  var deleteFile = true
+
+  val cardinalityId = 100 * 1000 * 1000
+  val cardinalityCity = 6
+
+  def parquetTableName: String = "Num" + totalNum + "_" + "comparetest_parquet"
+
+  def orcTableName: String = "Num" + totalNum + "_" + "comparetest_orc"
+
+  def carbonTableName(version: String): String =
+    "Num" + totalNum + "_" + s"comparetest_carbonV$version"
+
+  // performance test queries, designed to exercise various data access types
+  val r = new Random()
+  lazy val tmpId = r.nextInt(cardinalityId) % totalNum
+  lazy val tmpCity = "city" + (r.nextInt(cardinalityCity) % totalNum)
+  // different query SQL
+  lazy val queries: Array[Query] = Array(
+    Query(
+      "select * from $table" + s" where id = '$tmpId' ",
+      "filter scan",
+      "filter on high card dimension"
+    )
+    , Query(
+      "select id from $table" + s" where id = '$tmpId' ",
+      "filter scan",
+      "filter on high card dimension"
+    ),
+    Query(
+      "select city from $table" + s" where id = '$tmpId' ",
+      "filter scan",
+      "filter on high card dimension"
+    ),
+    Query(
+      "select * from $table" + s" where city = '$tmpCity' limit 100",
+      "filter scan",
+      "filter on low card dimension, medium result set, fetch all columns"
+    ),
+
+    Query(
+      "select city from $table" + s" where city = '$tmpCity' limit 100",
+      "filter scan",
+      "filter on low card dimension"
+    ),
+
+    Query(
+      "select id from $table" + s" where city = '$tmpCity'  limit 100",
+      "filter scan",
+      "filter on low card dimension"
+    ),
+
+    Query(
+      "select country, sum(m1) from $table group by country",
+      "aggregate",
+      "group by on big data, on medium card column, medium result set,"
+    ),
+
+    Query(
+      "select country, sum(m1) from $table" +
+        s" where id = '$tmpId' group by country",
+      "aggregate",
+      "group by on big data, on medium card column, medium result set,"
+    ),
+
+    Query(
+      "select t1.country, sum(t1.m1) from $table t1 join $table t2"
+        + s" on t1.id = t2.id where t1.id = '$tmpId' group by t1.country",
+      "aggregate",
+      "group by on big data, on medium card column, medium result set,"
+    ),
+
+    Query(
+      "select t2.country, sum(t2.m1) " +
+        "from $table t1 join $table t2 join $table t3 " +
+        "join $table t4 join $table t5 join $table t6 join $table t7 " +
+        s"on t1.id=t2.id and t1.id=t3.id and t1.id=t4.id " +
+        s"and t1.id=t5.id and t1.id=t6.id and " +
+        s"t1.id=t7.id " +
+        s" where t2.id = '$tmpId' " +
+        s" group by t2.country",
+      "aggregate",
+      "group by on big data, on medium card column, medium result set,"
+    )
+  )
+
+  /**
+   * generate parquet format table
+   *
+   * @param spark SparkSession
+   * @param input DataFrame
+   * @param table table name
+   * @return the time of generating parquet format table
+   */
+  private def generateParquetTable(spark: SparkSession, input: DataFrame, table: String)
+  : Double = time {
+    // partitioned by last 1 digit of id column
+    val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10))
+    dfWithPartition.write
+      .partitionBy("partitionCol")
+      .mode(SaveMode.Overwrite)
+      .parquet(table)
+  }
+
+  /**
+   * generate ORC format table
+   *
+   * @param spark SparkSession
+   * @param input DataFrame
+   * @param table table name
+   * @return the time of generating ORC format table
+   */
+  private def generateOrcTable(spark: SparkSession, input: DataFrame, table: String): Double =
+    time {
+      // ORC table is written without partitioning
+      input.write
+        .mode(SaveMode.Overwrite)
+        .orc(table)
+    }
+
+  /**
+   * generate carbon format table
+   *
+   * @param spark     SparkSession
+   * @param input     DataFrame
+   * @param tableName table name
+   * @return the time of generating carbon format table
+   */
+  private def generateCarbonTable(spark: SparkSession, input: DataFrame, tableName: String)
+  : Double = {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+      "3"
+    )
+    spark.sql(s"drop table if exists $tableName")
+    time {
+      input.write
+        .format("carbondata")
+        .option("tableName", tableName)
+        .option("tempCSV", "false")
+        .option("single_pass", "true")
+        .option("dictionary_exclude", "id") // id is high cardinality column
+        .option("table_blocksize", "32")
+        .mode(SaveMode.Overwrite)
+        .save()
+    }
+  }
+
+  /**
+   * load data into the source table (Parquet or ORC) and the CarbonData table
+   *
+   * @param spark  SparkSession
+   * @param table1 table1 name
+   * @param table2 table2 name
+   */
+  def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = {
+    val df = if (generateFile) {
+      DataGenerator.generateDataFrame(spark, totalNum).cache
+    } else {
+      null
+    }
+
+    val table1Time = time {
+      if (table1.endsWith("parquet")) {
+        if (generateFile) {
+          generateParquetTable(spark, df, table1)
+        }
+        spark.read.parquet(table1).createOrReplaceTempView(table1)
+      } else if (table1.endsWith("orc")) {
+        if (generateFile) {
+          generateOrcTable(spark, df, table1)
+        }
+        spark.read.orc(table1).createOrReplaceTempView(table1)
+      } else {
+        sys.error("invalid table: " + table1)
+      }
+    }
+    println(s"$table1 completed, time: $table1Time sec")
+
+    val table2Time: Double = if (generateFile) {
+      generateCarbonTable(spark, df, table2)
+    } else {
+      0.0
+    }
+    println(s"$table2 completed, time: $table2Time sec")
+    if (null != df) {
+      df.unpersist()
+    }
+  }
+
+  /**
+   * Run all queries for the specified table
+   *
+   * @param spark     SparkSession
+   * @param tableName table name
+   */
+  private def runQueries(spark: SparkSession, tableName: String): Unit = {
+    println()
+    println(s"Start running queries for $tableName...")
+    println(
+      "Min: min time" +
+        "\tMax: max time" +
+        "\t90%: 90% time" +
+        "\t99%: 99% time" +
+        "\tAvg: average time" +
+        "\tCount: number of result" +
+        "\tQuery X: running different query sql" +
+        "\tResult: show it when ResultIsEmpty is false" +
+        "\tTotal execute time: total runtime")
+    queries.zipWithIndex.map { case (query, index) =>
+      val sqlText = query.sqlText.replace("$table", tableName)
+
+      val executorService = Executors.newFixedThreadPool(threadNum)
+      val tasks = new java.util.ArrayList[Callable[Results]]()
+      val tasksStartTime = System.nanoTime()
+      for (num <- 1 to taskNum) {
+        tasks.add(new QueryTask(spark, sqlText))
+      }
+      val results = executorService.invokeAll(tasks)
+
+      executorService.shutdown()
+      executorService.awaitTermination(600, TimeUnit.SECONDS)
+
+      val tasksEndTime = System.nanoTime()
+      val sql = s"Query ${index + 1}: $sqlText "
+      printResults(results, sql, tasksStartTime)
+      val taskTime = (tasksEndTime - tasksStartTime).toDouble / (1000 * 1000 * 1000)
+      println("Total execute time: " + taskTime.formatted("%.3f") + " s")
+
+      val timeString = new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())
+      writeResults(spark, results, sql, tasksStartTime,
+        path + s"/${tableName}_query${index + 1}_$timeString")
+    }
+  }
+
+  /**
+   * save the results for subsequent analysis
+   *
+   * @param spark    SparkSession
+   * @param results  Results
+   * @param sql      query sql
+   * @param start    tasks start time
+   * @param filePath write file path
+   */
+  def writeResults(
+      spark: SparkSession,
+      results: java.util.List[Future[Results]],
+      sql: String = "",
+      start: Long,
+      filePath: String): Unit = {
+    val timeArray = new Array[(Double, Double, Double)](results.size())
+    for (i <- 0 until results.size()) {
+      timeArray(i) =
+        ((results.get(i).get().startTime - start) / (1000.0 * 1000),
+          (results.get(i).get().endTime - start) / (1000.0 * 1000),
+          (results.get(i).get().endTime - results.get(i).get().startTime) / (1000.0 * 1000))
+    }
+    val timeArraySorted = timeArray.sortBy(x => x._1)
+    val timeArrayString = timeArraySorted.map { e =>
+      e._1.formatted("%.3f") + ",\t" + e._2.formatted("%.3f") + ",\t" + e._3.formatted("%.3f")
+    }
+    val saveArray = Array(sql, "startTime, endTime, runtime, times measured in milliseconds",
+      s"${timeArrayString.length}")
+      .union(timeArrayString)
+    val rdd = spark.sparkContext.parallelize(saveArray, 1)
+    rdd.saveAsTextFile(filePath)
+  }
+
+  /**
+   * print out results
+   *
+   * @param results        Results
+   * @param sql            query sql
+   * @param tasksStartTime tasks start time
+   */
+  def printResults(results: util.List[Future[Results]], sql: String = "", tasksStartTime: Long) {
+    val timeArray = new Array[Double](results.size())
+    val sqlResult = results.get(0).get().sqlResult
+    for (i <- 0 until results.size()) {
+      results.get(i).get()
+    }
+    for (i <- 0 until results.size()) {
+      timeArray(i) = results.get(i).get().time
+    }
+    val sortTimeArray = timeArray.sorted
+
+    // index of the time by which 90 percent of the queries have finished
+    val time90 = ((sortTimeArray.length) * 0.9).toInt - 1
+    // index of the time by which 99 percent of the queries have finished
+    val time99 = ((sortTimeArray.length) * 0.99).toInt - 1
+    print(
+      "Min: " + sortTimeArray.head.formatted("%.3f") + " s," +
+        "\tMax: " + sortTimeArray.last.formatted("%.3f") + " s," +
+        "\t90%: " + sortTimeArray(time90).formatted("%.3f") + " s," +
+        "\t99%: " + sortTimeArray(time99).formatted("%.3f") + " s," +
+        "\tAvg: " + (timeArray.sum / timeArray.length).formatted("%.3f") + " s," +
+        "\t\tCount: " + results.get(0).get.count +
+        "\t\t\t\t" + sql +
+        "\t" + sqlResult.mkString(",") + "\t")
+  }
+
+  /**
+   * save result after finishing each task/thread
+   *
+   * @param time      execution time of the query SQL for this task, in seconds
+   * @param sqlResult query SQL result
+   * @param count     result row count
+   * @param startTime task start time, from System.nanoTime
+   * @param endTime   task end time, from System.nanoTime
+   */
+  case class Results(
+      time: Double,
+      sqlResult: Array[Row],
+      count: Int,
+      startTime: Long,
+      endTime: Long)
+
+
+  class QueryTask(spark: SparkSession, query: String)
+    extends Callable[Results] with Serializable {
+    override def call(): Results = {
+      var result: Array[Row] = null
+      val startTime = System.nanoTime()
+      val rt = time {
+        result = spark.sql(query).collect()
+      }
+      val endTime = System.nanoTime()
+      if (resultIsEmpty) {
+        Results(rt, Array.empty[Row], count = result.length, startTime, endTime)
+      } else {
+        Results(rt, result, count = result.length, startTime, endTime)
+      }
+    }
+  }
+
+  /**
+   * run the test queries against both tables and print their results
+   *
+   * @param spark  SparkSession
+   * @param table1 table1 name
+   * @param table2 table2 name
+   */
+  def runTest(spark: SparkSession, table1: String, table2: String): Unit = {
+    // run queries on parquet and carbon
+    runQueries(spark, table1)
+    // do GC and sleep for some time before running next table
+    System.gc()
+    Thread.sleep(1000)
+    System.gc()
+    Thread.sleep(1000)
+    runQueries(spark, table2)
+  }
+
+  /**
+   * measure the running time of the given code block
+   *
+   * @param code the code block to execute
+   * @return the elapsed time in seconds
+   */
+  def time(code: => Unit): Double = {
+    val start = System.currentTimeMillis()
+    code
+    // return time in seconds
+    (System.currentTimeMillis() - start).toDouble / 1000
+  }
+
+  /**
+   * initialize benchmark parameters from command-line arguments
+   *
+   * @param arr command-line arguments
+   */
+  def initParameters(arr: Array[String]): Unit = {
+    if (arr.length > 0) {
+      totalNum = arr(0).toInt
+    }
+    if (arr.length > 1) {
+      threadNum = arr(1).toInt
+    }
+    if (arr.length > 2) {
+      taskNum = arr(2).toInt
+    }
+    if (arr.length > 3) {
+      resultIsEmpty = if (arr(3).equalsIgnoreCase("true")) {
+        true
+      } else if (arr(3).equalsIgnoreCase("false")) {
+        false
+      } else {
+        throw new Exception("error parameter, should be true or false")
+      }
+    }
+    if (arr.length > 4) {
+      path = arr(4)
+    }
+    if (arr.length > 5) {
+      runInLocal = if (arr(5).equalsIgnoreCase("true")) {
+        true
+      } else if (arr(5).equalsIgnoreCase("false")) {
+        false
+      } else {
+        throw new Exception("error parameter, should be true or false")
+      }
+    }
+    if (arr.length > 6) {
+      generateFile = if (arr(6).equalsIgnoreCase("true")) {
+        true
+      } else if (arr(6).equalsIgnoreCase("false")) {
+        false
+      } else {
+        throw new Exception("error parameter, should be true or false")
+      }
+    }
+    if (arr.length > 7) {
+      deleteFile = if (arr(7).equalsIgnoreCase("true")) {
+        true
+      } else if (arr(7).equalsIgnoreCase("false")) {
+        false
+      } else {
+        throw new Exception("error parameter, should be true or false")
+      }
+    }
+  }
+
+  /**
+   * main method of this benchmark
+   *
+   * @param args parameters
+   */
+  def main(args: Array[String]): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty("carbon.enable.vector.reader", "true")
+      .addProperty("enable.unsafe.sort", "true")
+      .addProperty("carbon.blockletgroup.size.in.mb", "32")
+      .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
+    import org.apache.spark.sql.CarbonSession._
+
+    // 1. initParameters
+    initParameters(args)
+    val table1 = parquetTableName
+    val table2 = carbonTableName("3")
+    val parameters = "totalNum: " + totalNum +
+      "\tthreadNum: " + threadNum +
+      "\ttaskNum: " + taskNum +
+      "\tresultIsEmpty: " + resultIsEmpty +
+      "\tfile path: " + path +
+      "\trunInLocal: " + runInLocal +
+      "\tgenerateFile: " + generateFile +
+      "\tdeleteFile: " + deleteFile
+
+    val spark = if (runInLocal) {
+      val rootPath = new File(this.getClass.getResource("/").getPath
+        + "../../../..").getCanonicalPath
+      val storeLocation = s"$rootPath/examples/spark2/target/store"
+      SparkSession
+        .builder()
+        .appName(parameters)
+        .master("local[8]")
+        .enableHiveSupport()
+        .getOrCreateCarbonSession(storeLocation)
+    } else {
+      SparkSession
+        .builder()
+        .appName(parameters)
+        .enableHiveSupport()
+        .getOrCreateCarbonSession()
+    }
+    spark.sparkContext.setLogLevel("ERROR")
+
+    println("\nEnvironment information:")
+    val env = Array(
+      "spark.master",
+      "spark.driver.cores",
+      "spark.driver.memory",
+      "spark.executor.cores",
+      "spark.executor.memory",
+      "spark.executor.instances")
+    env.foreach { each =>
+      println(each + ":\t" + spark.conf.get(each, "default value") + "\t")
+    }
+    println("SPARK_VERSION:" + spark.version + "\t")
+    println("CARBONDATA_VERSION:" + CarbonVersionConstants.CARBONDATA_VERSION + "\t")
+    println("\nParameters information:")
+    println(parameters)
+
+    // 2. prepareTable
+    prepareTable(spark, table1, table2)
+
+    // 3. runTest
+    runTest(spark, table1, table2)
+
+    if (deleteFile) {
+      CarbonUtil.deleteFoldersAndFiles(new File(table1))
+      spark.sql(s"drop table $table2")
+    }
+    spark.close()
+  }
+}
+
+// scalastyle:on println
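
The heart of the new ConcurrentQueryBenchmark is the pattern in runQueries and QueryTask: fan a fixed number of identical query tasks out over a bounded thread pool with invokeAll, collect each task's latency, and report min, max, 90%, 99% and average. The standalone sketch below is not part of the commit; the sleeping task body, pool size and task count are placeholder assumptions standing in for spark.sql(sqlText).collect(), but the threading and percentile shape is the same:

import java.util.concurrent.{Callable, Executors, TimeUnit}

import scala.collection.JavaConverters._

object ConcurrencyPatternSketch {
  def main(args: Array[String]): Unit = {
    val threadNum = 4   // assumed pool size (the benchmark defaults to 16)
    val taskNum = 20    // assumed number of tasks per query (the benchmark defaults to 100)

    val executor = Executors.newFixedThreadPool(threadNum)
    // each task measures its own latency in seconds, as QueryTask does around the query
    val tasks: java.util.List[Callable[Double]] = (1 to taskNum).map { _ =>
      new Callable[Double] {
        override def call(): Double = {
          val start = System.nanoTime()
          Thread.sleep(50)                   // placeholder for spark.sql(sqlText).collect()
          (System.nanoTime() - start) / 1e9
        }
      }
    }.asJava

    val futures = executor.invokeAll(tasks)  // blocks until every task has run
    executor.shutdown()
    executor.awaitTermination(600, TimeUnit.SECONDS)

    val times = futures.asScala.map(_.get()).toArray.sorted
    def percentile(p: Double): Double = times(math.max((times.length * p).toInt - 1, 0))
    println(f"Min: ${times.head}%.3f s  Max: ${times.last}%.3f s  " +
      f"90%%: ${percentile(0.9)}%.3f s  99%%: ${percentile(0.99)}%.3f s  " +
      f"Avg: ${times.sum / times.length}%.3f s")
  }
}

The benchmark applies this shape once per query and additionally persists each task's start/end/runtime triple through writeResults for offline analysis.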

http://git-wip-us.apache.org/repos/asf/carbondata/blob/52f8d711/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/DataGenerator.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/DataGenerator.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/DataGenerator.scala
new file mode 100644
index 0000000..e3e67b1
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/DataGenerator.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.benchmark
+
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+// scalastyle:off println
+object DataGenerator {
+  // Table schema:
+  // +-------------+-----------+-------------+-------------+------------+
+  // | Column name | Data type | Cardinality | Column type | Dictionary |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | id          | string    | 100,000,000 | dimension   | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | city        | string    | 6           | dimension   | yes        |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | country     | string    | 6           | dimension   | yes        |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | planet      | string    | 10,007      | dimension   | yes        |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m1          | short     | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m2          | int       | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m3          | big int   | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m4          | double    | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  // | m5          | decimal   | NA          | measure     | no         |
+  // +-------------+-----------+-------------+-------------+------------+
+  /**
+   * generate DataFrame with above table schema
+   *
+   * @param spark SparkSession
+   * @return Dataframe of test data
+   */
+  def generateDataFrame(spark: SparkSession, totalNum: Int): DataFrame = {
+    val rdd = spark.sparkContext
+      .parallelize(1 to totalNum, 4)
+      .map { x =>
+        ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, "planet" + x % 10007,
+          (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13,
+          BigDecimal.valueOf(x.toDouble / 11))
+      }.map { x =>
+      Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9)
+    }
+
+    val schema = StructType(
+      Seq(
+        StructField("id", StringType, nullable = false),
+        StructField("city", StringType, nullable = false),
+        StructField("country", StringType, nullable = false),
+        StructField("planet", StringType, nullable = false),
+        StructField("m1", ShortType, nullable = false),
+        StructField("m2", IntegerType, nullable = false),
+        StructField("m3", LongType, nullable = false),
+        StructField("m4", DoubleType, nullable = false),
+        StructField("m5", DecimalType(30, 10), nullable = false)
+      )
+    )
+
+    val df = spark.createDataFrame(rdd, schema)
+    println(s"Start generate ${df.count} records, schema: ${df.schema}")
+    df
+  }
+}
+// scalastyle:on println
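
Because DataGenerator derives every column from the row index with a modulus, the cardinalities in the table above follow directly from the code (id modulo 100,000,000; city and country modulo 6; planet modulo 10,007). A minimal, hypothetical driver could look like the following; the local master, app name and row count are illustrative assumptions, and the benchmarks themselves create their session via getOrCreateCarbonSession instead:

import org.apache.spark.sql.SparkSession

import org.apache.carbondata.benchmark.DataGenerator

object DataGeneratorSketch {
  def main(args: Array[String]): Unit = {
    // plain local SparkSession, enough for generating and inspecting the data
    val spark = SparkSession.builder()
      .appName("DataGeneratorSketch")
      .master("local[2]")
      .getOrCreate()

    // 10,000 rows with the schema documented above (id, city, country, planet, m1..m5)
    val df = DataGenerator.generateDataFrame(spark, totalNum = 10 * 1000)
    df.show(5, truncate = false)
    // low-cardinality columns repeat quickly; city is expected to have 6 distinct values
    println("distinct cities: " + df.select("city").distinct().count())
    spark.stop()
  }
}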

http://git-wip-us.apache.org/repos/asf/carbondata/blob/52f8d711/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/Query.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/Query.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/Query.scala
new file mode 100644
index 0000000..9978bda
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/Query.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.benchmark
+
+/**
+ * A query test case
+ *
+ * @param sqlText SQL statement
+ * @param queryType type of query: scan, filter, aggregate, topN
+ * @param desc description of the goal of this test case
+ */
+case class Query(sqlText: String, queryType: String, desc: String)
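
The $table token in sqlText is a literal placeholder rather than Scala string interpolation; both benchmarks substitute it with String.replace immediately before executing the statement. A small illustration, with an arbitrary table name:

import org.apache.carbondata.benchmark.Query

object QuerySketch {
  def main(args: Array[String]): Unit = {
    // no s"" interpolator here, so "$table" survives as plain text in sqlText
    val q = Query(
      "select sum(m1) from $table",
      "full scan",
      "full scan query, 1 aggregate")

    // the benchmarks do this substitution before calling spark.sql(...)
    val sqlText = q.sqlText.replace("$table", "comparetest_parquet")
    println(sqlText)  // select sum(m1) from comparetest_parquet
  }
}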

http://git-wip-us.apache.org/repos/asf/carbondata/blob/52f8d711/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala
new file mode 100644
index 0000000..880f476
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SimpleQueryBenchmark.scala
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.benchmark
+
+import java.io.File
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+
+
+
+// scalastyle:off println
+object SimpleQueryBenchmark {
+
+  def parquetTableName: String = "comparetest_parquet"
+  def orcTableName: String = "comparetest_orc"
+  def carbonTableName(version: String): String = s"comparetest_carbonV$version"
+
+  // performance test queries, designed to exercise various data access types
+  val queries: Array[Query] = Array(
+    // ===========================================================================
+    // ==                     FULL SCAN AGGREGATION                             ==
+    // ===========================================================================
+    Query(
+      "select sum(m1) from $table",
+      "full scan",
+      "full scan query, 1 aggregate"
+    ),
+    Query(
+      "select sum(m1), sum(m2) from $table",
+      "full scan",
+      "full scan query, 2 aggregate"
+    ),
+    Query(
+      "select sum(m1), sum(m2), sum(m3) from $table",
+      "full scan",
+      "full scan query, 3 aggregate"
+    ),
+    Query(
+      "select sum(m1), sum(m2), sum(m3), sum(m4) from $table",
+      "full scan",
+      "full scan query, 4 aggregate"
+    ),
+    Query(
+      "select sum(m1), sum(m2), sum(m3), sum(m4), avg(m5) from $table",
+      "full scan",
+      "full scan query, 5 aggregate"
+    ),
+    Query(
+      "select count(distinct id) from $table",
+      "full scan",
+      "full scan and count distinct of high card column"
+    ),
+    Query(
+      "select count(distinct country) from $table",
+      "full scan",
+      "full scan and count distinct of medium card column"
+    ),
+    Query(
+      "select count(distinct city) from $table",
+      "full scan",
+      "full scan and count distinct of low card column"
+    ),
+    // ===========================================================================
+    // ==                      FULL SCAN GROUP BY AGGREGATE                     ==
+    // ===========================================================================
+    Query(
+      "select country, sum(m1) as metric from $table group by country order by metric",
+      "aggregate",
+      "group by on big data, on medium card column, medium result set,"
+    ),
+    Query(
+      "select city, sum(m1) as metric from $table group by city order by metric",
+      "aggregate",
+      "group by on big data, on low card column, small result set,"
+    ),
+    Query(
+      "select id, sum(m1) as metric from $table group by id order by metric desc limit 100",
+      "topN",
+      "top N on high card column"
+    ),
+    Query(
+      "select country,sum(m1) as metric from $table group by country order by metric desc limit 10",
+      "topN",
+      "top N on medium card column"
+    ),
+    Query(
+      "select city,sum(m1) as metric from $table group by city order by metric desc limit 10",
+      "topN",
+      "top N on low card column"
+    ),
+    // ===========================================================================
+    // ==                  FILTER SCAN GROUP BY AGGREGATION                     ==
+    // ===========================================================================
+    Query(
+      "select country, sum(m1) as metric from $table where city='city8' group by country " +
+          "order by metric",
+      "filter scan and aggregate",
+      "group by on large data, small result set"
+    ),
+    Query(
+      "select id, sum(m1) as metric from $table where planet='planet10' group by id " +
+          "order by metric",
+      "filter scan and aggregate",
+      "group by on medium data, large result set"
+    ),
+    Query(
+      "select city, sum(m1) as metric from $table where country='country12' group by city " +
+          "order by metric",
+      "filter scan and aggregate",
+      "group by on medium data, small result set"
+    ),
+    // ===========================================================================
+    // ==                             FILTER SCAN                               ==
+    // ===========================================================================
+    Query(
+      "select * from $table where city = 'city3' limit 10000",
+      "filter scan",
+      "filter on low card dimension, limit, medium result set, fetch all columns"
+    ),
+    Query(
+      "select * from $table where country = 'country9' ",
+      "filter scan",
+      "filter on low card dimension, medium result set, fetch all columns"
+    ),
+    Query(
+      "select * from $table where planet = 'planet101' ",
+      "filter scan",
+      "filter on medium card dimension, small result set, fetch all columns"
+    ),
+    Query(
+      "select * from $table where id = '408938' ",
+      "filter scan",
+      "filter on high card dimension"
+    ),
+    Query(
+      "select * from $table where country='country10000'  ",
+      "filter scan",
+      "filter on low card dimension, not exist"
+    ),
+    Query(
+      "select * from $table where country='country2' and city ='city8' ",
+      "filter scan",
+      "filter on 2 dimensions, small result set, fetch all columns"
+    ),
+    Query(
+      "select * from $table where city='city1' and country='country2' and planet ='planet3' ",
+      "filter scan",
+      "filter on 3 dimensions, small result set, fetch all columns"
+    ),
+    Query(
+      "select * from $table where m1 < 3",
+      "filter scan",
+      "filter on measure, small result set, fetch all columns"
+    ),
+    Query(
+      "select * from $table where id like '1%' ",
+      "fuzzy filter scan",
+      "like filter, big result set"
+    ),
+    Query(
+      "select * from $table where id like '%111'",
+      "fuzzy filter scan",
+      "like filter, medium result set"
+    ),
+    Query(
+      "select * from $table where id like 'xyz%' ",
+      "fuzzy filter scan",
+      "like filter, full scan but not exist"
+    )
+  )
+
+  private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String)
+  : Double = time {
+    // partitioned by last 1 digit of id column
+    val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10))
+    dfWithPartition.write
+        .partitionBy("partitionCol")
+        .mode(SaveMode.Overwrite)
+        .parquet(table)
+    spark.read.parquet(table).createOrReplaceTempView(table)
+  }
+
+  private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = time {
+    // ORC table is written without partitioning
+    input.write
+        .mode(SaveMode.Overwrite)
+        .orc(table)
+    spark.read.orc(table).createOrReplaceTempView(table)
+  }
+
+  private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+      "V3"
+    )
+    spark.sql(s"drop table if exists $tableName")
+    time {
+      input.write
+          .format("carbondata")
+          .option("tableName", tableName)
+          .option("single_pass", "true")
+          .option("table_blocksize", "32")
+          .mode(SaveMode.Overwrite)
+          .save()
+    }
+  }
+
+  // load data into the source table (Parquet or ORC) and the CarbonData table
+  private def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = {
+    val df = DataGenerator.generateDataFrame(spark, totalNum = 10 * 10 * 1000).cache
+    println(s"loading ${df.count} records, schema: ${df.schema}")
+    val table1Time = if (table1.endsWith("parquet")) {
+      loadParquetTable(spark, df, table1)
+    } else if (table1.endsWith("orc")) {
+      loadOrcTable(spark, df, table1)
+    } else {
+      sys.error("invalid table: " + table1)
+    }
+    val table2Time = loadCarbonTable(spark, df, table2)
+    println(s"load completed, time: $table1Time, $table2Time")
+    df.unpersist()
+  }
+
+  // Run all queries for the specified table
+  private def runQueries(spark: SparkSession, tableName: String): Array[(Double, Array[Row])] = {
+    println(s"start running queries for $tableName...")
+    var result: Array[Row] = null
+    queries.zipWithIndex.map { case (query, index) =>
+      val sqlText = query.sqlText.replace("$table", tableName)
+      print(s"running query ${index + 1}: $sqlText ")
+      val rt = time {
+        result = spark.sql(sqlText).collect()
+      }
+      println(s"=> $rt sec")
+      (rt, result)
+    }
+  }
+
+  private def printErrorIfNotMatch(index: Int, table1: String, result1: Array[Row],
+      table2: String, result2: Array[Row]): Unit = {
+    // check result size instead of result values, because some test cases include
+    // aggregation on a double column, which can give slightly different results since
+    // CarbonData records are sorted
+    if (result1.length != result2.length) {
+      val num = index + 1
+      println(s"$table1 result for query $num: ")
+      println(s"""${result1.mkString(",")}""")
+      println(s"$table2 result for query $num: ")
+      println(s"""${result2.mkString(",")}""")
+      sys.error(s"result not matching for query $num (${queries(index).desc})")
+    }
+  }
+
+  // run test cases and print the comparison result
+  private def runTest(spark: SparkSession, table1: String, table2: String): Unit = {
+    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+    val date = new Date
+    // run queries on parquet and carbon
+    val table1Result: Array[(Double, Array[Row])] = runQueries(spark, table1)
+    // do GC and sleep for some time before running next table
+    System.gc()
+    Thread.sleep(1000)
+    System.gc()
+    Thread.sleep(1000)
+    val table2Result: Array[(Double, Array[Row])] = runQueries(spark, table2)
+    // check result by comparing output from parquet and carbon
+    table1Result.zipWithIndex.foreach { case (result, index) =>
+      printErrorIfNotMatch(index, table1, result._2, table2, table2Result(index)._2)
+    }
+    // print all response times in JSON format so that they can be analyzed later
+    queries.zipWithIndex.foreach { case (query, index) =>
+      println("{" +
+          s""""query":"${index + 1}", """ +
+          s""""$table1 time":${table1Result(index)._1}, """ +
+          s""""$table2 time":${table2Result(index)._1}, """ +
+          s""""fetched":${table1Result(index)._2.length}, """ +
+          s""""type":"${query.queryType}", """ +
+          s""""desc":"${query.desc}",  """ +
+          s""""date": "${formatter.format(date)}" """ +
+          "}"
+      )
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    CarbonProperties.getInstance()
+        .addProperty("carbon.enable.vector.reader", "true")
+        .addProperty("enable.unsafe.sort", "true")
+        .addProperty("carbon.blockletgroup.size.in.mb", "32")
+        .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
+    import org.apache.spark.sql.CarbonSession._
+    val rootPath = new File(this.getClass.getResource("/").getPath
+        + "../../../..").getCanonicalPath
+    val storeLocation = s"$rootPath/examples/spark2/target/store"
+    val spark = SparkSession
+        .builder()
+        .master("local")
+        .enableHiveSupport()
+        .config("spark.driver.host", "127.0.0.1")
+        .getOrCreateCarbonSession(storeLocation)
+    spark.sparkContext.setLogLevel("warn")
+
+    val table1 = parquetTableName
+    val table2 = carbonTableName("3")
+    prepareTable(spark, table1, table2)
+    runTest(spark, table1, table2)
+
+    CarbonUtil.deleteFoldersAndFiles(new File(table1))
+    spark.sql(s"drop table if exists $table2")
+    spark.close()
+  }
+
+  def time(code: => Unit): Double = {
+    val start = System.currentTimeMillis()
+    code
+    // return time in seconds
+    (System.currentTimeMillis() - start).toDouble / 1000
+  }
+}
+// scalastyle:on println
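
Both benchmarks measure elapsed time with the same time helper: the block is passed as a by-name parameter, executed inside the method, and the wall-clock duration is returned in seconds. Below is a standalone sketch of that idiom, with a sleep as a placeholder workload in place of spark.sql(sqlText).collect():

object TimeSketch {
  // by-name parameter: `code` is evaluated inside the method body, not at the call site
  def time(code: => Unit): Double = {
    val start = System.currentTimeMillis()
    code
    (System.currentTimeMillis() - start).toDouble / 1000  // seconds
  }

  def main(args: Array[String]): Unit = {
    val seconds = time {
      Thread.sleep(200)  // placeholder workload
    }
    println(f"block took $seconds%.3f s")
  }
}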

http://git-wip-us.apache.org/repos/asf/carbondata/blob/52f8d711/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
deleted file mode 100644
index d27b1c4..0000000
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
+++ /dev/null
@@ -1,397 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.File
-import java.text.SimpleDateFormat
-import java.util.Date
-
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-
-/**
- * A query test case
- * @param sqlText SQL statement
- * @param queryType type of query: scan, filter, aggregate, topN
- * @param desc description of the goal of this test case
- */
-case class Query(sqlText: String, queryType: String, desc: String)
-
-// scalastyle:off println
-object CompareTest {
-
-  def parquetTableName: String = "comparetest_parquet"
-  def orcTableName: String = "comparetest_orc"
-  def carbonTableName(version: String): String = s"comparetest_carbonV$version"
-
-  // Table schema:
-  // +-------------+-----------+-------------+-------------+------------+
-  // | Column name | Data type | Cardinality | Column type | Dictionary |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | city        | string    | 8           | dimension   | yes        |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | country     | string    | 1103        | dimension   | yes        |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | planet      | string    | 10,007      | dimension   | yes        |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | id          | string    | 10,000,000  | dimension   | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m1          | short     | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m2          | int       | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m3          | big int   | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m4          | double    | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m5          | decimal   | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  private def generateDataFrame(spark: SparkSession): DataFrame = {
-    val rdd = spark.sparkContext
-        .parallelize(1 to 10 * 1000 * 1000, 4)
-        .map { x =>
-          ("city" + x % 8, "country" + x % 1103, "planet" + x % 10007, "IDENTIFIER" + x.toString,
-              (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13,
-              BigDecimal.valueOf(x.toDouble / 11))
-        }.map { x =>
-      Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9)
-    }
-
-    val schema = StructType(
-      Seq(
-        StructField("city", StringType, nullable = false),
-        StructField("country", StringType, nullable = false),
-        StructField("planet", StringType, nullable = false),
-        StructField("id", StringType, nullable = false),
-        StructField("m1", ShortType, nullable = false),
-        StructField("m2", IntegerType, nullable = false),
-        StructField("m3", LongType, nullable = false),
-        StructField("m4", DoubleType, nullable = false),
-        StructField("m5", DecimalType(30, 10), nullable = false)
-      )
-    )
-
-    spark.createDataFrame(rdd, schema)
-  }
-
-  // performance test queries, they are designed to test various data access type
-  val queries: Array[Query] = Array(
-    // ===========================================================================
-    // ==                     FULL SCAN AGGREGATION                             ==
-    // ===========================================================================
-    Query(
-      "select sum(m1) from $table",
-      "full scan",
-      "full scan query, 1 aggregate"
-    ),
-    Query(
-      "select sum(m1), sum(m2) from $table",
-      "full scan",
-      "full scan query, 2 aggregate"
-    ),
-    Query(
-      "select sum(m1), sum(m2), sum(m3) from $table",
-      "full scan",
-      "full scan query, 3 aggregate"
-    ),
-    Query(
-      "select sum(m1), sum(m2), sum(m3), sum(m4) from $table",
-      "full scan",
-      "full scan query, 4 aggregate"
-    ),
-    Query(
-      "select sum(m1), sum(m2), sum(m3), sum(m4), avg(m5) from $table",
-      "full scan",
-      "full scan query, 5 aggregate"
-    ),
-    Query(
-      "select count(distinct id) from $table",
-      "full scan",
-      "full scan and count distinct of high card column"
-    ),
-    Query(
-      "select count(distinct country) from $table",
-      "full scan",
-      "full scan and count distinct of medium card column"
-    ),
-    Query(
-      "select count(distinct city) from $table",
-      "full scan",
-      "full scan and count distinct of low card column"
-    ),
-    // ===========================================================================
-    // ==                      FULL SCAN GROUP BY AGGREGATE                     ==
-    // ===========================================================================
-    Query(
-      "select country, sum(m1) as metric from $table group by country order by metric",
-      "aggregate",
-      "group by on big data, on medium card column, medium result set,"
-    ),
-    Query(
-      "select city, sum(m1) as metric from $table group by city order by metric",
-      "aggregate",
-      "group by on big data, on low card column, small result set,"
-    ),
-    Query(
-      "select id, sum(m1) as metric from $table group by id order by metric desc limit 100",
-      "topN",
-      "top N on high card column"
-    ),
-    Query(
-      "select country,sum(m1) as metric from $table group by country order by metric desc limit 10",
-      "topN",
-      "top N on medium card column"
-    ),
-    Query(
-      "select city,sum(m1) as metric from $table group by city order by metric desc limit 10",
-      "topN",
-      "top N on low card column"
-    ),
-    // ===========================================================================
-    // ==                  FILTER SCAN GROUP BY AGGREGATION                     ==
-    // ===========================================================================
-    Query(
-      "select country, sum(m1) as metric from $table where city='city8' group by country " +
-          "order by metric",
-      "filter scan and aggregate",
-      "group by on large data, small result set"
-    ),
-    Query(
-      "select id, sum(m1) as metric from $table where planet='planet10' group by id " +
-          "order by metric",
-      "filter scan and aggregate",
-      "group by on medium data, large result set"
-    ),
-    Query(
-      "select city, sum(m1) as metric from $table where country='country12' group by city " +
-          "order by metric",
-      "filter scan and aggregate",
-      "group by on medium data, small result set"
-    ),
-    // ===========================================================================
-    // ==                             FILTER SCAN                               ==
-    // ===========================================================================
-    Query(
-      "select * from $table where city = 'city3' limit 10000",
-      "filter scan",
-      "filter on low card dimension, limit, medium result set, fetch all columns"
-    ),
-    Query(
-      "select * from $table where country = 'country9' ",
-      "filter scan",
-      "filter on low card dimension, medium result set, fetch all columns"
-    ),
-    Query(
-      "select * from $table where planet = 'planet101' ",
-      "filter scan",
-      "filter on medium card dimension, small result set, fetch all columns"
-    ),
-    Query(
-      "select * from $table where id = '408938' ",
-      "filter scan",
-      "filter on high card dimension"
-    ),
-    Query(
-      "select * from $table where country='country10000'  ",
-      "filter scan",
-      "filter on low card dimension, not exist"
-    ),
-    Query(
-      "select * from $table where country='country2' and city ='city8' ",
-      "filter scan",
-      "filter on 2 dimensions, small result set, fetch all columns"
-    ),
-    Query(
-      "select * from $table where city='city1' and country='country2' and planet ='planet3' ",
-      "filter scan",
-      "filter on 3 dimensions, small result set, fetch all columns"
-    ),
-    Query(
-      "select * from $table where m1 < 3",
-      "filter scan",
-      "filter on measure, small result set, fetch all columns"
-    ),
-    Query(
-      "select * from $table where id like '1%' ",
-      "fuzzy filter scan",
-      "like filter, big result set"
-    ),
-    Query(
-      "select * from $table where id like '%111'",
-      "fuzzy filter scan",
-      "like filter, medium result set"
-    ),
-    Query(
-      "select * from $table where id like 'xyz%' ",
-      "fuzzy filter scan",
-      "like filter, full scan but not exist"
-    )
-  )
-
-  private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String)
-  : Double = time {
-    // partitioned by last 1 digit of id column
-    val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10))
-    dfWithPartition.write
-        .partitionBy("partitionCol")
-        .mode(SaveMode.Overwrite)
-        .parquet(table)
-    spark.read.parquet(table).createOrReplaceTempView(table)
-  }
-
-  private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = time {
-    // partitioned by last 1 digit of id column
-    input.write
-        .mode(SaveMode.Overwrite)
-        .orc(table)
-    spark.read.orc(table).createOrReplaceTempView(table)
-  }
-
-  private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = {
-    CarbonProperties.getInstance().addProperty(
-      CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
-      "V3"
-    )
-    spark.sql(s"drop table if exists $tableName")
-    time {
-      input.write
-          .format("carbondata")
-          .option("tableName", tableName)
-          .option("single_pass", "true")
-          .option("table_blocksize", "32")
-          .mode(SaveMode.Overwrite)
-          .save()
-    }
-  }
-
-  // load data into parquet, carbonV2, carbonV3
-  private def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = {
-    val df = generateDataFrame(spark).cache
-    println(s"loading ${df.count} records, schema: ${df.schema}")
-    val table1Time = if (table1.endsWith("parquet")) {
-      loadParquetTable(spark, df, table1)
-    } else if (table1.endsWith("orc")) {
-      loadOrcTable(spark, df, table1)
-    } else {
-      sys.error("invalid table: " + table1)
-    }
-    val table2Time = loadCarbonTable(spark, df, table2)
-    println(s"load completed, time: $table1Time, $table2Time")
-    df.unpersist()
-  }
-
-  // Run all queries for the specified table
-  private def runQueries(spark: SparkSession, tableName: String): Array[(Double, Array[Row])] = {
-    println(s"start running queries for $tableName...")
-    var result: Array[Row] = null
-    queries.zipWithIndex.map { case (query, index) =>
-      val sqlText = query.sqlText.replace("$table", tableName)
-      print(s"running query ${index + 1}: $sqlText ")
-      val rt = time {
-        result = spark.sql(sqlText).collect()
-      }
-      println(s"=> $rt sec")
-      (rt, result)
-    }
-  }
-
-  private def printErrorIfNotMatch(index: Int, table1: String, result1: Array[Row],
-      table2: String, result2: Array[Row]): Unit = {
-    // check result size instead of result value, because some test case include
-    // aggregation on double column which will give different result since carbon
-    // records are sorted
-    if (result1.length != result2.length) {
-      val num = index + 1
-      println(s"$table1 result for query $num: ")
-      println(s"""${result1.mkString(",")}""")
-      println(s"$table2 result for query $num: ")
-      println(s"""${result2.mkString(",")}""")
-      sys.error(s"result not matching for query $num (${queries(index).desc})")
-    }
-  }
-
-  // run testcases and print comparison result
-  private def runTest(spark: SparkSession, table1: String, table2: String): Unit = {
-    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-    val date = new Date
-    // run queries on parquet and carbon
-    val table1Result: Array[(Double, Array[Row])] = runQueries(spark, table1)
-    // do GC and sleep for some time before running next table
-    System.gc()
-    Thread.sleep(1000)
-    System.gc()
-    Thread.sleep(1000)
-    val table2Result: Array[(Double, Array[Row])] = runQueries(spark, table2)
-    // check result by comparing output from parquet and carbon
-    table1Result.zipWithIndex.foreach { case (result, index) =>
-      printErrorIfNotMatch(index, table1, result._2, table2, table2Result(index)._2)
-    }
-    // print all response time in JSON format, so that it can be analyzed later
-    queries.zipWithIndex.foreach { case (query, index) =>
-      println("{" +
-          s""""query":"${index + 1}", """ +
-          s""""$table1 time":${table1Result(index)._1}, """ +
-          s""""$table2 time":${table2Result(index)._1}, """ +
-          s""""fetched":${table1Result(index)._2.length}, """ +
-          s""""type":"${query.queryType}", """ +
-          s""""desc":"${query.desc}",  """ +
-          s""""date": "${formatter.format(date)}" """ +
-          "}"
-      )
-    }
-  }
-
-  def main(args: Array[String]): Unit = {
-    CarbonProperties.getInstance()
-        .addProperty("carbon.enable.vector.reader", "true")
-        .addProperty("enable.unsafe.sort", "true")
-        .addProperty("carbon.blockletgroup.size.in.mb", "32")
-        .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
-    import org.apache.spark.sql.CarbonSession._
-    val rootPath = new File(this.getClass.getResource("/").getPath
-        + "../../../..").getCanonicalPath
-    val storeLocation = s"$rootPath/examples/spark2/target/store"
-    val spark = SparkSession
-        .builder()
-        .master("local")
-        .enableHiveSupport()
-        .config("spark.driver.host", "127.0.0.1")
-        .getOrCreateCarbonSession(storeLocation)
-    spark.sparkContext.setLogLevel("warn")
-
-    val table1 = parquetTableName
-    val table2 = carbonTableName("3")
-    prepareTable(spark, table1, table2)
-    runTest(spark, table1, table2)
-
-    CarbonUtil.deleteFoldersAndFiles(new File(table1))
-    spark.sql(s"drop table if exists $table2")
-    spark.close()
-  }
-
-  def time(code: => Unit): Double = {
-    val start = System.currentTimeMillis()
-    code
-    // return time in second
-    (System.currentTimeMillis() - start).toDouble / 1000
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/52f8d711/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala
deleted file mode 100644
index 09921cb..0000000
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/ConcurrencyTest.scala
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.examples
-
-import java.io.File
-import java.util
-import java.util.concurrent.{Callable, Executors, Future, TimeUnit}
-
-import scala.util.Random
-
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-
-// scalastyle:off println
-object ConcurrencyTest {
-
-  var totalNum = 100 * 1000 * 1000
-  var ThreadNum = 16
-  var TaskNum = 100
-  var ResultIsEmpty = true
-  val cardinalityId = 10000 * 10000
-  val cardinalityCity = 6
-
-  def parquetTableName: String = "comparetest_parquet"
-
-  def orcTableName: String = "comparetest_orc"
-
-  def carbonTableName(version: String): String = s"comparetest_carbonV$version"
-
-  // Table schema:
-  // +-------------+-----------+-------------+-------------+------------+
-  // | Column name | Data type | Cardinality | Column type | Dictionary |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | id          | string    | 100,000,000 | dimension   | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | city        | string    | 6           | dimension   | yes        |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | country     | string    | 6           | dimension   | yes        |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | planet      | string    | 100,007     | dimension   | yes        |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m1          | short     | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m2          | int       | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m3          | big int   | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m4          | double    | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-  // | m5          | decimal   | NA          | measure     | no         |
-  // +-------------+-----------+-------------+-------------+------------+
-
-  private def generateDataFrame(spark: SparkSession): DataFrame = {
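-    // the generated values follow the schema comment above: city and country each
-    // take 6 distinct values (x % 6), planet takes roughly 10,007 distinct values
-    // (x % 10007), and id wraps around every 100,000,000 rows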
-    val rdd = spark.sparkContext
-      .parallelize(1 to totalNum, 4)
-      .map { x =>
-        ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, "planet" + x % 10007,
-          (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13,
-          BigDecimal.valueOf(x.toDouble / 11))
-      }.map { x =>
-        Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9)
-      }
-
-    val schema = StructType(
-      Seq(
-        StructField("id", StringType, nullable = false),
-        StructField("city", StringType, nullable = false),
-        StructField("country", StringType, nullable = false),
-        StructField("planet", StringType, nullable = false),
-        StructField("m1", ShortType, nullable = false),
-        StructField("m2", IntegerType, nullable = false),
-        StructField("m3", LongType, nullable = false),
-        StructField("m4", DoubleType, nullable = false),
-        StructField("m5", DecimalType(30, 10), nullable = false)
-      )
-    )
-
-    spark.createDataFrame(rdd, schema)
-  }
-
-  // performance test queries, designed to test various data access types
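-  // "$table" in each sqlText is replaced with the actual table name in runQueries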
-  val r = new Random()
-  val tmpId = r.nextInt(cardinalityId) % totalNum
-  val tmpCity = "city" + (r.nextInt(cardinalityCity) % totalNum)
-  val queries: Array[Query] = Array(
-    Query(
-      "select * from $table" + s" where id = '$tmpId' ",
-      "filter scan",
-      "filter on high card dimension"
-    ),
-
-    Query(
-      "select id from $table" + s" where id = '$tmpId' ",
-      "filter scan",
-      "filter on high card dimension"
-    ),
-
-    Query(
-      "select * from $table" + s" where city = '$tmpCity' ",
-      "filter scan",
-      "filter on high card dimension"
-    ),
-
-    Query(
-      "select city from $table" + s" where city = '$tmpCity' ",
-      "filter scan",
-      "filter on high card dimension"
-    ),
-
-    Query(
-      "select country, sum(m1) from $table group by country",
-      "aggregate",
-      "group by on big data, on medium card column, medium result set,"
-    ),
-
-    Query(
-      "select country, sum(m1) from $table" +
-        s" where id = '$tmpId' group by country",
-      "aggregate",
-      "group by on big data, on medium card column, medium result set,"
-    ),
-
-    Query(
-      "select t1.country, sum(t1.m1) from $table t1 join $table t2"
-        + s" on t1.id = t2.id where t1.id = '$tmpId' group by t1.country",
-      "aggregate",
-      "group by on big data, on medium card column, medium result set,"
-    )
-    ,
-    Query(
-      "select t2.country, sum(t2.m1) " +
-        "from $table t1 join $table t2 join $table t3 " +
-        "join $table t4 join $table t5 join $table t6 join $table t7 " +
-        s"on t1.id=t2.id and t1.id=t3.id and t1.id=t4.id " +
-        s"and t1.id=t5.id and t1.id=t6.id and " +
-        s"t1.id=t7.id " +
-        s" where t2.id = '$tmpId' " +
-        s" group by t2.country",
-      "aggregate",
-      "group by on big data, on medium card column, medium result set,"
-    )
-  )
-
-  private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String)
-  : Double = time {
-    // partitioned by last 1 digit of id column
-    val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10))
-    dfWithPartition.write
-      .partitionBy("partitionCol")
-      .mode(SaveMode.Overwrite)
-      .parquet(table)
-    spark.read.parquet(table).createOrReplaceTempView(table)
-  }
-
-  private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = time {
-    // the ORC table is written without partitioning
-    input.write
-      .mode(SaveMode.Overwrite)
-      .orc(table)
-    spark.read.orc(table).createOrReplaceTempView(table)
-  }
-
-  private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = {
-    CarbonProperties.getInstance().addProperty(
-      CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
-      "3"
-    )
-    spark.sql(s"drop table if exists $tableName")
-    time {
-      input.write
-        .format("carbondata")
-        .option("tableName", tableName)
-        .option("tempCSV", "false")
-        .option("single_pass", "true")
-        .option("dictionary_exclude", "id") // id is high cardinality column
-        .option("table_blocksize", "32")
-        .mode(SaveMode.Overwrite)
-        .save()
-    }
-  }
-
-  // load data into table1 (parquet or orc) and table2 (carbon)
-  def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = {
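-    // cache the generated DataFrame so that both table loads read the same
-    // materialized data instead of regenerating it for each format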
-    val df = generateDataFrame(spark).cache
-    println(s"generating ${df.count} records, schema: ${df.schema}")
-    val table1Time = if (table1.endsWith("parquet")) {
-      loadParquetTable(spark, df, table1)
-    } else if (table1.endsWith("orc")) {
-      loadOrcTable(spark, df, table1)
-    } else {
-      sys.error("invalid table: " + table1)
-    }
-    val table2Time = loadCarbonTable(spark, df, table2)
-    println(s"load completed, time: $table1Time, $table2Time")
-    df.unpersist()
-  }
-
-  // Run all queries for the specified table
-  private def runQueries(spark: SparkSession, tableName: String): Unit = {
-    println(s"start running queries for $tableName...")
-    val start = System.currentTimeMillis()
-    println("90% time: xx.xx sec\t99% time: xx.xx sec\tlast time: xx.xx sec\t " +
-      "running query sql\taverage time: xx.xx sec\t result: show it when ResultIsEmpty is false")
-    queries.zipWithIndex.map { case (query, index) =>
-      val sqlText = query.sqlText.replace("$table", tableName)
-
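-      // a fresh pool of ThreadNum threads executes TaskNum identical copies of the
-      // query, so every query is measured under the same level of concurrency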
-      val executorService = Executors.newFixedThreadPool(ThreadNum)
-      val tasks = new java.util.ArrayList[Callable[Results]]()
-
-      for (num <- 1 to TaskNum) {
-        tasks.add(new QueryTask(spark, sqlText))
-      }
-      val results = executorService.invokeAll(tasks)
-
-      val sql = s"query ${index + 1}: $sqlText "
-      printResult(results, sql)
-      executorService.shutdown()
-      executorService.awaitTermination(600, TimeUnit.SECONDS)
-
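-      // note: taskTime is measured from the start of runQueries, so it accumulates
-      // across all queries run so far for this table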
-      val taskTime = (System.currentTimeMillis() - start).toDouble / 1000
-      println("task time: " + taskTime.formatted("%.3f") + " s")
-    }
-  }
-
-  def printResult(results: java.util.List[Future[Results]], sql: String = "") {
-    val timeArray = new Array[Double](results.size())
-    val sqlResult = results.get(0).get().sqlResult
-    for (i <- 0 until results.size()) {
-      results.get(i).get()
-    }
-    for (i <- 0 until results.size()) {
-      timeArray(i) = results.get(i).get().time
-    }
-    val sortTimeArray = timeArray.sorted
-
-    // index of the time by which 90 percent of the queries have finished
-    val time90 = ((sortTimeArray.length) * 0.9).toInt - 1
-    // index of the time by which 99 percent of the queries have finished
-    val time99 = ((sortTimeArray.length) * 0.99).toInt - 1
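-    // e.g. with 100 results, time90 = (100 * 0.9).toInt - 1 = 89, which selects
-    // the 90th fastest time in the sorted array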
-    print("90%:" + sortTimeArray(time90).formatted("%.3f") + " s," +
-      "\t99%:" + sortTimeArray(time99).formatted("%.3f") + " s," +
-      "\tlast:" + sortTimeArray.last.formatted("%.3f") + " s," +
-      "\t" + sql +
-      "\taverage:" + (timeArray.sum / timeArray.length).formatted("%.3f") + " s," +
-      "\t" + sqlResult.mkString(",") + "\t")
-  }
-
-  case class Results(time: Double, sqlResult: Array[Row])
-
-
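-  // each QueryTask runs one SQL statement and fetches only the first row via
-  // head(1); when ResultIsEmpty is true even that row is discarded, so only the
-  // elapsed time is kept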
-  class QueryTask(spark: SparkSession, query: String)
-    extends Callable[Results] with Serializable {
-    override def call(): Results = {
-      var result: Array[Row] = null
-      val rt = time {
-        result = spark.sql(query).head(1)
-      }
-      if (ResultIsEmpty) {
-        Results(rt, Array.empty[Row])
-      } else {
-        Results(rt, result)
-      }
-    }
-  }
-
-  // run the test queries against both tables (no result comparison is done here)
-  def runTest(spark: SparkSession, table1: String, table2: String): Unit = {
-    // run queries on parquet and carbon
-    runQueries(spark, table1)
-    // do GC and sleep for some time before running the next table
-    System.gc()
-    Thread.sleep(1000)
-    System.gc()
-    Thread.sleep(1000)
-    runQueries(spark, table2)
-  }
-
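-  // measures the wall-clock time of an arbitrary block, e.g. (illustrative only):
-  //   val seconds = time { spark.sql("select 1").collect() }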
-  def time(code: => Unit): Double = {
-    val start = System.currentTimeMillis()
-    code
-    // return elapsed time in seconds
-    (System.currentTimeMillis() - start).toDouble / 1000
-  }
-
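-  // optional program arguments, in order: totalNum, ThreadNum, TaskNum, ResultIsEmpty;
-  // e.g. (hypothetical values) "10000000 16 100 true" generates 10 million rows and
-  // runs each query with 16 threads and 100 tasks, dropping fetched rows when timing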
-  def initParameters(arr: Array[String]): Unit = {
-    if (arr.length > 0) {
-      totalNum = arr(0).toInt
-    }
-    if (arr.length > 1) {
-      ThreadNum = arr(1).toInt
-    }
-    if (arr.length > 2) {
-      TaskNum = arr(2).toInt
-    }
-    if (arr.length > 3) {
-      ResultIsEmpty = if (arr(3).equalsIgnoreCase("true")) {
-        true
-      } else if (arr(3).equalsIgnoreCase("false")) {
-        false
-      } else {
-        throw new Exception("invalid parameter, should be true or false")
-      }
-    }
-  }
-
-  def main(args: Array[String]): Unit = {
-    CarbonProperties.getInstance()
-      .addProperty("carbon.enable.vector.reader", "true")
-      .addProperty("enable.unsafe.sort", "true")
-      .addProperty("carbon.blockletgroup.size.in.mb", "32")
-      .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
-    import org.apache.spark.sql.CarbonSession._
-    val rootPath = new File(this.getClass.getResource("/").getPath
-      + "../../../..").getCanonicalPath
-    val storeLocation = s"$rootPath/examples/spark2/target/store"
-
-    val spark = SparkSession
-      .builder()
-      .master("local[8]")
-      .enableHiveSupport()
-      .config("spark.driver.host", "127.0.0.1")
-      .getOrCreateCarbonSession(storeLocation)
-    spark.sparkContext.setLogLevel("warn")
-
-    initParameters(args)
-
-    val table1 = parquetTableName
-    val table2 = carbonTableName("3")
-    prepareTable(spark, table1, table2)
-    println("totalNum:" + totalNum + "\tThreadNum:" + ThreadNum +
-      "\tTaskNum:" + TaskNum + "\tResultIsEmpty:" + ResultIsEmpty)
-    runTest(spark, table1, table2)
-
-    CarbonUtil.deleteFoldersAndFiles(new File(table1))
-    spark.sql(s"drop table $table2")
-    spark.close()
-  }
-}
-
-// scalastyle:on println

