re: overcommit: cpus / vcores

re: overcommit: cpus / vcores

peterliu
Hi there,

I have a system with 80 vcores and a relatively light Spark streaming workload. Overcommitting the vcore resource (i.e. > 80) in the config (see (a) below) seems to improve the average Spark batch time (see (b) below).

Is there any best-practice guideline on resource overcommit with cpu / vcores, such as yarn config options, candidate cases ideal for overcommitting vcores, etc.?

The slide below (from 2016, though) seems to address the memory overcommit topic and hints at a "future" topic on cpu overcommit:

I would like to know whether this is a reasonable config practice and why this improvement is not achievable without overcommit. Any help/hint would be very much appreciated!

Thanks!

Peter

(a) yarn-site.xml
<property>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>110</value>
</property>

<property>
    <name>yarn.scheduler.maximum-allocation-vcores</name>
    <value>110</value>
</property>


(b)
FYI: overcommitting the vcore resource (here 100) seems to help the average Spark batch time; I need more understanding of this practice.

Skylake (1 x 900K msg/sec)   total batch# (avg)   avg batch time in ms (avg)   avg user cpu (%)   nw read (MB/sec)
70 vcores                    178.20               8154.69                      n/a                n/a
80 vcores                    177.40               7865.44                      27.85              222.31
100 vcores                   177.00               7209.37                      30.02              220.86


Re: overcommit: cpus / vcores

Khaled Zaouk
Hi Peter,

What parameters are you using in your Spark streaming configuration? How many executor instances and how many cores per executor are you setting in your Spark job?

Best,

Khaled



Re: overcommit: cpus / vcores

peterliu
Hi Khaled,

I have attached the spark streaming config below in (a).
For the 100-vcore run (see the initial email), I used 50 executors, each with 2 vcores and 3g of memory. For the 70-vcore case, 35 executors; for the 80-vcore case, 40 executors.
In the yarn config (yarn-site.xml, (b) below), the available vcores are set above 80 (I call this "overcommit").
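
For reference, here is a minimal sketch of how that 50 x 2 x 3g layout maps onto the standard spark.executor.* settings (the builder call, app name and master are illustrative, not my exact submit command). With 50 executors x 2 cores each, the job requests 100 vcores from yarn, which only fits because the node advertises more vcores than its 80 hardware threads:

   import org.apache.spark.sql.SparkSession

   // Illustrative only: the 50-2-3g executor layout expressed as Spark configs.
   // 50 executors x 2 cores = 100 vcores requested from YARN (> 80 hardware threads).
   val spark = SparkSession.builder()
     .master("yarn")                             // assumption: running on YARN
     .appName("streaming-overcommit-test")       // hypothetical app name
     .config("spark.executor.instances", "50")   // number of executors
     .config("spark.executor.cores", "2")        // vcores per executor
     .config("spark.executor.memory", "3g")      // heap per executor
     .getOrCreate()

   // Equivalent spark-submit flags:
   //   --num-executors 50 --executor-cores 2 --executor-memory 3g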

I am not sure whether there is a more proper way to do this (overcommit), or what the best practice would be in this type of situation (say, a light cpu workload in a dedicated yarn cluster) to increase cpu utilization for better performance.

Any help would be very much appreciated.

Thanks ...

Peter

(a)

   // Streaming query as submitted. External helpers (kafkaCluster, Variables,
   // mySchema, campaigns and the millisTime UDF) are defined elsewhere in the job;
   // note that the chain ends with .start(), so df is the running StreamingQuery handle.
   import java.sql.Timestamp
   import org.apache.spark.sql.functions._
   import spark.implicits._

   val df = spark
      .readStream
      .format("kafka")                                                   // read events from Kafka
      .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
      .option("startingOffsets", "latest")
      .option("subscribe", Variables.EVENTS_TOPIC)
      .option("kafkaConsumer.pollTimeoutMs", "5000")
      .load()
      .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
      .select(from_json($"value", mySchema).as("data"), $"timestamp")    // parse the JSON payload
      .select("data.*", "timestamp")
      .where($"event_type" === "view")                                   // keep only "view" events
      .select($"ad_id", $"event_time")
      .join(campaigns.toSeq.toDS().cache(), Seq("ad_id"))                // map ad_id -> campaign_id
      .groupBy(millisTime(window($"event_time", "10 seconds").getField("start")) as 'time_window, $"campaign_id")
      .agg(count("*") as 'count, max('event_time) as 'lastUpdate)        // per-campaign counts per 10-second window
      .select(to_json(struct("*")) as 'value)
      .writeStream
      .format("kafka")                                                   // write results back to Kafka
      .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString) //original
      .option("topic", Variables.OUTPUT_TOPIC)
      .option("checkpointLocation", s"/tmp/${java.util.UUID.randomUUID()}") //TBD: ram disk?
      .outputMode("update")
      .start()

(b)
<property>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>110</value>
</property>
<property>
    <name>yarn.scheduler.maximum-allocation-vcores</name>
    <value>110</value>
</property>



Re: overcommit: cpus / vcores

Khaled Zaouk
Hi Peter,

I actually meant the Spark configuration that you pass to your spark-submit command (such as --conf spark.executor.instances=..., --conf spark.executor.memory=..., etc.).

I advise you to check the number of partitions that you get in each stage of your workload in the Spark UI while the workload is running. I suspect this number is beyond 80, and this is why overcommitting cpu cores can achieve better latency when the workload is not cpu intensive.
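
For reference, one concrete driver of the per-stage task count is the shuffle parallelism; a small sketch (spark.sql.shuffle.partitions is the standard Spark setting, and 100 is just an illustrative value matching the Kafka partition count):

   // spark.sql.shuffle.partitions (default 200) sets the number of tasks in
   // shuffle stages such as the groupBy/window aggregation in this query,
   // so it can easily exceed the 80 vcores of a single node.
   println(spark.conf.get("spark.sql.shuffle.partitions"))
   // Illustrative: align the shuffle parallelism with the Kafka partition count.
   spark.conf.set("spark.sql.shuffle.partitions", "100")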

Another question: did you try different values for spark.executor.cores (for example 3, 4 or 5 cores per executor in addition to 2)? Try to play a bit with this parameter and check how it affects your latency.
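
As a sketch of that kind of sweep (the instance count and memory below are illustrative, chosen only to keep the total core count roughly constant, not taken from your setup):

   import org.apache.spark.sql.SparkSession

   // Same ~100 total cores, different executor shapes:
   //   50 executors x 2 cores, 25 x 4, or 20 x 5.
   val spark = SparkSession.builder()
     .config("spark.executor.instances", "25")   // e.g. 25 instead of 50
     .config("spark.executor.cores", "4")        // e.g. 4 instead of 2
     .config("spark.executor.memory", "6g")      // scaled with cores (illustrative)
     .getOrCreate()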

Best,

Khaled





Re: configure yarn to use more vcores than the node provides?

peterliu
Hi Khaled,

The 50-2-3g I mentioned earlier refers to the --conf spark.executor.* config, in particular spark.executor.instances=50, spark.executor.cores=2 and spark.executor.memory=3g.
For each run, I configured the streaming producer and Kafka broker so that the partitions are aligned with the consumer side (in this case, 100 partitions), which is viewable in the Spark UI.
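
As a side note, the per-batch numbers can also be read programmatically instead of from the Spark UI. A small sketch; here query stands for the StreamingQuery handle returned by .start() in the code I shared earlier (in that snippet it was assigned to df):

   // Progress of the most recent micro-batch: input rows, processing rate,
   // and per-phase durations (addBatch, getBatch, triggerExecution, ...).
   // May be null before the first batch has completed.
   val progress = query.lastProgress
   println(progress.numInputRows)
   println(progress.durationMs)   // map of phase name -> milliseconds
   println(progress.json)         // full progress report as JSON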

The yarn scheduler does not seem to check the actual hardware threads (logical cores) on the node. On the other hand, there seems to be some mechanism in yarn to allow overcommit for certain jobs (with high or low watermarks, etc.).

I am not sure how this should work as good practice. I would appreciate any hint from the experts here.

Thanks for your reply!

Peter / Gang
