How to tune the performance of Tpch query5 within Spark

I rewrote TPC-H query 5 using the DataFrame API:
val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders").filter("o_orderdate < '1995-01-01' and o_orderdate >= '1994-01-01'").select("o_custkey", "o_orderkey")
val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region").where("r_name = 'ASIA'").select($"r_regionkey")
val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
  .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
  .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
  .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
  .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
  .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
  .groupBy($"n_name")
  .agg(sum($"value").as("revenue"))
  .sort($"revenue".desc)
res.show()
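One tuning idea not in the original query: the `decrease` UDF is a black box to the Catalyst optimizer, so it blocks expression-level optimizations and whole-stage codegen for that projection. The same value can be expressed with built-in column arithmetic, as in this sketch:

```scala
// Sketch: replace the Scala UDF with built-in column arithmetic so
// Catalyst can optimize and code-generate the expression.
import org.apache.spark.sql.functions.lit

val value = $"l_extendedprice" * (lit(1.0) - $"l_discount")
// then in the query: .select($"n_name", value.as("value"))
```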

My environment is one master (HDFS namenode) and four workers (HDFS datanodes), each with 40 cores and 128 GB of memory. The data is TPC-H at scale factor 100, stored on HDFS in Parquet format.
The query runs in about 1.5 minutes. I noticed that reading these six tables with spark.read.parquet happens sequentially. How can I make these reads run in parallel?
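One way to overlap the six reads (a sketch, not from the original post, and assuming the same HDFS base path) is to issue them from separate threads with Scala Futures. Note that `spark.read.parquet` itself mostly reads Parquet footers to resolve the schema; the heavy table scan only happens when an action runs, but the footer reads can still be overlapped this way:

```scala
// Sketch: kick off the six table loads concurrently with Futures,
// then wait for all of them. Assumes an active `spark` session.
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

val base = "hdfs://dell127:20500/SparkParquetDoubleTimestamp100G"
val names = Seq("orders", "lineitem", "customer", "supplier", "region", "nation")

// Each Future triggers schema/footer reading on its own thread.
val futures = names.map(t => Future(spark.read.parquet(s"$base/$t")))
val Seq(orders, lineitem, customer, supplier, region, nation) =
  futures.map(Await.result(_, Duration.Inf))
```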
I have already configured data locality, spark.default.parallelism, and spark.serializer, and I use the G1 garbage collector, but the runtime has not improved.
Do you have any other advice for tuning this query?
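Another common tuning step for this query shape (an assumption on my part, not something the original post tried): the region and nation tables are tiny in TPC-H (5 and 25 rows), so broadcasting them avoids shuffling the large intermediate result for those two joins. A sketch using the same DataFrames as above:

```scala
// Sketch: hint Spark to broadcast the small dimension tables.
// Spark may already do this automatically if their size is below
// spark.sql.autoBroadcastJoinThreshold (10 MB by default).
import org.apache.spark.sql.functions.broadcast

val joined = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
  .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
  .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") &&
                   $"c_nationkey" === fsupplier("s_nationkey"))
  .join(broadcast(fnation), $"s_nationkey" === fnation("n_nationkey"))
  .join(broadcast(fregion), $"n_regionkey" === fregion("r_regionkey"))
```

You can confirm the plan with `joined.explain()` and check that the nation/region joins appear as BroadcastHashJoin rather than SortMergeJoin.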
Thank you.
Wenting He.