How to tune the performance of Tpch query5 within Spark

8 messages
How to tune the performance of Tpch query5 within Spark

163
I rewrote TPC-H query 5 as a DataFrame query:
val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders").filter("o_orderdate < '1995-01-01' and o_orderdate >= '1994-01-01'").select("o_custkey", "o_orderkey")
val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region").where("r_name = 'ASIA'").select($"r_regionkey")
val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
  .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
  .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
  .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
  .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
  .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
  .groupBy($"n_name")
  .agg(sum($"value").as("revenue"))
  .sort($"revenue".desc).show()

My environment is one master (HDFS namenode) and four workers (HDFS datanodes), each with 40 cores and 128 GB of memory. The TPC-H 100 GB dataset is stored on HDFS in Parquet format.
The query takes about 1.5 minutes. I noticed that reading these six tables with spark.read.parquet happens sequentially; how can I make the reads run in parallel?
I have already set data locality, spark.default.parallelism, and spark.serializer, and switched to the G1 garbage collector, but the runtime has not improved.
Is there any other advice for tuning this query?
Thank you.

Wenting He


Re: How to tune the performance of Tpch query5 within Spark

cloud0fan
Try to replace your UDF with Spark built-in expressions; it should be as simple as `$"x" * (lit(1) - $"y")`.
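
For example, the `decrease` UDF in the query above could become plain column arithmetic. A minimal sketch, assuming `joined` stands for the result of the joins and `spark.implicits._` is in scope:

import org.apache.spark.sql.functions.{lit, sum}

// Built-in column expressions instead of the Scala UDF, so Catalyst can
// optimize and code-generate the computation:
val res = joined
  .select($"n_name", ($"l_extendedprice" * (lit(1) - $"l_discount")).as("value"))
  .groupBy($"n_name")
  .agg(sum($"value").as("revenue"))
  .sort($"revenue".desc)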


Re: How to tune the performance of Tpch query5 within Spark

163
I changed the UDF, but the performance is still slow. What else can I do?



Re: How to tune the performance of Tpch query5 within Spark

vaquarkhan
Could you please let us know your Spark version?


Regards, 
vaquar khan 


Re: How to tune the performance of Tpch query5 within Spark

163
2.1.1

Sent from NetEase Mail Master

Re: How to tune the performance of Tpch query5 within Spark

vaquarkhan
Verify your configuration; the following link covers all the Spark tuning points.
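
As a rough, generic illustration of the kind of settings such tuning guides cover (the values below are placeholders for a standalone cluster, not recommendations for this workload), typical spark-defaults.conf entries include:

spark.executor.memory            20g
spark.executor.cores             5
spark.cores.max                  160
spark.sql.shuffle.partitions     400
spark.serializer                 org.apache.spark.serializer.KryoSerializer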


Regards,
Vaquar khan 


Re: How to tune the performance of Tpch query5 within Spark

Pralabh Kumar
In reply to this post by vaquarkhan
Hi

To read the files in parallel, you can use the code below.


import java.util
import java.util.concurrent.{Callable, Executors, Future, TimeUnit}

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Each task reads one table; submitting all of them to a thread pool lets the
// driver launch the six read jobs concurrently instead of one after another.
case class readData(fileName: String, spark: SparkSession) extends Callable[Dataset[Row]] {
  override def call(): Dataset[Row] = {
    spark.read.parquet(fileName)
    // spark.read.csv(fileName)
  }
}

val spark = SparkSession.builder()
  .appName("practice")
  .config("spark.scheduler.mode", "FAIR")
  .enableHiveSupport().getOrCreate()

val pool = Executors.newFixedThreadPool(6)
val list = new util.ArrayList[Future[Dataset[Row]]]()

// Pass the full path of each input directory here if the tables are not
// under the default working directory.
for (fileName <- "orders,lineitem,customer,supplier,region,nation".split(",")) {
  val o1 = new readData(fileName, spark)
  list.add(pool.submit(o1))
}

// Future.get() blocks until each read has finished.
val rddList = new ArrayBuffer[Dataset[Row]]()
for (i <- 0 until list.size()) {
  rddList += list.get(i).get()
}

pool.shutdown()
pool.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)

for (finalData <- rddList) {
  finalData.show()
}


This will read the data in parallel, which I think is your main bottleneck.
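
For reference, a roughly equivalent sketch using Scala Futures instead of an explicit thread pool (basePath is a placeholder for the HDFS directory holding the tables, and spark is an existing SparkSession):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val basePath = "hdfs://namenode:8020/path/to/tables/"   // placeholder path
val tables = Seq("orders", "lineitem", "customer", "supplier", "region", "nation")

// Start all six reads concurrently on the driver, then wait for them to complete.
val futures = tables.map(t => Future(spark.read.parquet(basePath + t)))
val dfs = futures.map(f => Await.result(f, Duration.Inf))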

Regards
Pralabh Kumar




How to tune the performance of TpchQuery5 Multi Dataframe join within Spark

163
Thank you. I tried reading the Parquet files in parallel, and the execution time dropped by about 15 seconds, from 1.5 minutes to 1.3 minutes. Thanks a lot, @Pralabh Kumar.
I also followed the link on Spark tuning points: after setting "spark.sql.shuffle.partitions" to 320/480, the running time dropped to 1.1 minutes.

My configuration in conf/spark-defaults.conf:
spark.local.dir /dev/shm/hewenting/tmp
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:MaxGCPauseMillis=400
spark.sql.codegen.wholeStage false
spark.sql.shuffle.partitions 480
spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.default.parallelism 100
spark.scheduler.mode FAIR
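
For reference, these settings can also be applied when building the SparkSession rather than in spark-defaults.conf. A minimal sketch with the same values (the app name is arbitrary; spark.serializer and spark.default.parallelism must be set before the context starts, so builder time is the right place for them):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("tpch-q5")
  .config("spark.sql.shuffle.partitions", "480")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.default.parallelism", "100")
  .config("spark.scheduler.mode", "FAIR")
  .getOrCreate()

// spark.sql.shuffle.partitions is a SQL conf, so it can also be changed at runtime:
spark.conf.set("spark.sql.shuffle.partitions", "320")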

Below is the main running job. How can I speed up this query further?



My code:
import java.util.ArrayList
import java.util.concurrent.{Callable, ExecutorService, Executors, Future, TimeUnit}

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.functions.{lit, sum}

  def main(args: Array[String]): Unit = {
    case class readData(fileName: String, spark: SparkSession) extends Callable[Dataset[Row]] {
      override def call(): Dataset[Row] = {
        spark.read.parquet(fileName)
      }
    }

    val spark = SparkSession.builder()
      .appName("practice")
      .config("spark.master", "spark://10.61.2.127:7077")
      .config("spark.scheduler.mode", "FAIR")
      .getOrCreate()
    import spark.implicits._

    val pool: ExecutorService = Executors.newFixedThreadPool(6)
    val list = new ArrayList[Future[Dataset[Row]]]()

    for (fileName <- "orders,lineitem,customer,supplier,region,nation".split(",")) {
      val o1 = new readData("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/" + fileName, spark)
      val tmp: Future[Dataset[Row]] = pool.submit(o1)
      list.add(tmp)
    }
    val rddList = new ArrayBuffer[Dataset[Row]]()
    for (i <- 0 to 5) {
      rddList += list.get(i).get()
    }
    pool.shutdown()
    pool.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)

    val forders = rddList(0).filter("o_orderdate < '1995-01-01' and o_orderdate >= '1994-01-01'").select("o_custkey", "o_orderkey")
    val flineitem = rddList(1).select("l_orderkey", "l_suppkey", "l_discount", "l_extendedprice")
    val fregion = rddList(4).where("r_name = 'ASIA'").select("r_regionkey")
    val fnation = rddList(5).select("n_nationkey", "n_regionkey", "n_name")
    val fsupplier = rddList(3).select("s_nationkey", "s_suppkey")
    val fcustomer = rddList(2).select("c_custkey", "c_nationkey")

    val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
      .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
      .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
      .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
      .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
      .select($"n_name", ($"l_extendedprice" * (lit(1) - $"l_discount")).as("value"))
      .groupBy($"n_name")
      .agg(sum($"value").as("revenue"))
      .sort($"revenue".desc)
    println(res.collect())
  }



