spark-dev mailing list archives

From 163 <hewenting_...@163.com>
Subject How to tune the performance of Tpch query5 within Spark
Date Fri, 14 Jul 2017 09:46:48 GMT
I rewrote TPC-H query 5 using the DataFrame API:
// Needed when running outside spark-shell.
import org.apache.spark.sql.functions.{sum, udf}
import spark.implicits._

// Date literals are quoted; unquoted, 1995-01-01 is parsed as integer subtraction.
val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
     .filter("o_orderdate < '1995-01-01' and o_orderdate >= '1994-01-01'")
     .select("o_custkey", "o_orderkey")
val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
     .where("r_name = 'ASIA'")
     .select($"r_regionkey")
val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
     .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
     .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
     .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
     .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
     .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
     .groupBy($"n_name")
     .agg(sum($"value").as("revenue"))
     .sort($"revenue".desc)
// show() returns Unit, so it is called separately to keep res as a DataFrame.
res.show()

My environment is one master (HDFS NameNode) and four workers (HDFS DataNodes), each with 40 cores
and 128 GB of memory. The TPC-H 100 GB dataset is stored on HDFS in Parquet format.
The query takes about 1.5 minutes. I found that the six tables are read sequentially by spark.read.parquet.
How can I make these reads run in parallel?
I have already configured data locality, spark.default.parallelism, and spark.serializer, and I am
using the G1 garbage collector, but the runtime has not decreased.
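
For reference, settings of this kind are usually supplied when the session is built. The snippet below is only a sketch: the application name and every value in it (320 partitions, a 3s locality wait, the Kryo serializer) are illustrative assumptions, not the values used for the run described above.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: all values are placeholders chosen for illustration.
val spark = SparkSession.builder()
  .appName("tpch-q5") // hypothetical application name
  // Applies to RDD shuffles; DataFrame joins and aggregations
  // take their partition count from spark.sql.shuffle.partitions.
  .config("spark.default.parallelism", "320")
  .config("spark.sql.shuffle.partitions", "320")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // How long a task waits for a data-local slot before falling back.
  .config("spark.locality.wait", "3s")
  // Enables the G1 collector on the executors.
  .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
  .getOrCreate()
```

In spark-shell the session already exists, so the same keys would be passed with --conf on the command line instead.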
Is there any other advice for tuning the performance of this query?
Thank you.

Wenting He

