Integrating ML/DL frameworks with Spark


Integrating ML/DL frameworks with Spark

rxin
Hi all,

Xiangrui and I spoke with a heavy Apache Spark user last week about their experience integrating machine learning (and deep learning) frameworks with Spark, and about some of their pain points. A couple of things were obvious, and I wanted to share what we learned with the list.

(1) Most organizations already use Spark for data plumbing and want to run the ML part of their stack on Spark as well (not necessarily by re-implementing all the algorithms, but by integrating frameworks such as TensorFlow and MXNet with Spark).

(2) The integration is, however, painful from a systems perspective:

  • Performance: data exchange between Spark and other frameworks is slow, because UDFs across process boundaries (with native code) are slow. This works much better now with Pandas UDFs (given that many ML/DL frameworks are in Python), but there is likely still some low-hanging fruit here (see the sketch after this list).
  • Fault tolerance and execution model: Spark assumes fine-grained task recovery, i.e. if something fails, only that task is rerun. This doesn’t match the execution model of distributed ML/DL frameworks that are typically MPI-based, and rerunning a single task would lead to the entire system hanging. A whole stage needs to be re-run.
  • Accelerator-aware scheduling: The DL frameworks leverage GPUs and sometimes FPGAs as accelerators for speedup, and Spark’s scheduler isn’t aware of those resources, leading to either over-utilizing the accelerators or under-utilizing the CPUs.
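
To make the first point concrete, here is a minimal sketch of the kind of exchange we have in mind, using the scalar Pandas UDF API added in Spark 2.3 (Arrow-backed). The model and its predict step are placeholders for whatever a real ML/DL framework would provide; only the Spark-side API is real.

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.appName("arrow-exchange-sketch").getOrCreate()

class StubModel:
    # Placeholder for a model loaded from TensorFlow, MXNet, etc.
    def predict(self, batch: pd.Series) -> pd.Series:
        return batch * 2.0

@pandas_udf("double", PandasUDFType.SCALAR)
def predict(features: pd.Series) -> pd.Series:
    # Each call receives a whole Arrow record batch as a pandas Series,
    # avoiding the per-row JVM/Python round trips of classic UDFs.
    return StubModel().predict(features)

df = spark.range(1000).selectExpr("cast(id as double) as features")
df.select(predict("features").alias("prediction")).show(5)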

The good thing is that none of these seem very difficult to address (and we have already made progress on one of them). Xiangrui has graciously accepted the challenge of coming up with solutions and an SPIP for these.

Xiangrui - please also chime in if I didn’t capture everything. 



Re: Integrating ML/DL frameworks with Spark

Xiangrui Meng-2
Thanks Reynold for summarizing the offline discussion! I added a few comments inline. -Xiangrui

On Mon, May 7, 2018 at 5:37 PM Reynold Xin <[hidden email]> wrote:
  • Performance: data exchange between Spark and other frameworks is slow, because UDFs across process boundaries (with native code) are slow. This works much better now with Pandas UDFs (given that many ML/DL frameworks are in Python), but there is likely still some low-hanging fruit here.
The Arrow support behind Pandas UDFs can be reused to exchange data with other frameworks. One possible performance improvement is to support pipelining when supplying data to other frameworks: for example, while Spark is pumping data from external sources into TensorFlow, TensorFlow is already running the computation on GPUs. This would significantly improve speed and resource utilization (a rough sketch of this idea appears at the end of this message).
  • Fault tolerance and execution model: Spark assumes fine-grained task recovery, i.e. if something fails, only that task is rerun. This doesn’t match the execution model of distributed ML/DL frameworks that are typically MPI-based, and rerunning a single task would lead to the entire system hanging. A whole stage needs to be re-run.
This is useful not only for integrating with third-party frameworks, but also for scaling MLlib algorithms. One of my earliest attempts in Spark MLlib was to implement an all-reduce primitive (SPARK-1485), but we ended up with some compromised solutions. With the new execution model, we can set up a hybrid cluster and do all-reduce properly.
 
  • Accelerator-aware scheduling: The DL frameworks leverage GPUs and sometimes FPGAs as accelerators for speedup, and Spark’s scheduler isn’t aware of those resources, leading to either over-utilizing the accelerators or under-utilizing the CPUs.

The good thing is that none of these seem very difficult to address (and we have already made progress on one of them). Xiangrui has graciously accepted the challenge of coming up with solutions and an SPIP for these.


I will do more homework, exploring existing JIRAs or creating new ones for the proposals. We'd like to hear your feedback, and about past efforts in these directions if they were not fully captured in existing JIRAs.
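
To make the pipelining idea above concrete, here is a rough sketch (the training step is a placeholder, and it assumes TensorFlow's eager execution; tf.data's prefetch is what overlaps Spark's data pumping with GPU compute):

import numpy as np

def train_partition(rows):
    import tensorflow as tf  # imported on the worker

    def gen():
        for row in rows:                     # rows streamed from the Spark partition
            yield np.asarray(row, dtype=np.float32)

    ds = (tf.data.Dataset.from_generator(gen, output_types=tf.float32)
          .batch(256)
          .prefetch(2))                      # GPU works on batch i while Spark produces batch i+1

    for batch in ds:
        _ = tf.reduce_sum(batch)             # placeholder for a real training/inference step
    yield "done"

# Usage sketch, assuming `rdd` holds fixed-length numeric feature vectors:
# rdd.mapPartitions(train_partition).count()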
 


--

Xiangrui Meng

Software Engineer

Databricks Inc. http://databricks.com


Re: Integrating ML/DL frameworks with Spark

Jörn Franke
Hadoop/YARN 3.1 added GPU scheduling, and 3.2 is planned to add FPGA scheduling, so it might be worth making the last point generic so that not only the Spark scheduler but all supported schedulers can use GPUs.

For the other two points, I just wonder whether it makes more sense to address them in the ML frameworks themselves or in Spark.



Re: Integrating ML/DL frameworks with Spark

rxin
I don't think it's sufficient to have them in YARN (or any other service) without Spark being aware of them. If Spark is not aware of them, there is no way to efficiently utilize the accelerators when you run anything that also requires non-accelerator resources (which is almost 100% of real-world workloads).

For the other two, the point is not to implement all the ML/DL algorithms in Spark, but to make Spark integrate well with ML/DL frameworks. Otherwise you will have the problems I described (very low performance when exchanging data between Spark and ML/DL frameworks, and hanging issues with MPI-based programs).




Re: Integrating ML/DL frameworks with Spark

Jörn Franke
Hi,

You misunderstood me. I meant exactly that Spark should be aware of them, so I agree with you. The point is to also have YARN's GPU/FPGA scheduling as an option alongside a potential Spark GPU/FPGA scheduler.

For the other proposal: yes, the interfaces are slow, but one has to think about where they need to be improved for optimal performance, in the ML framework, in Spark, or in both. My gut feeling is in both.

Best regards




Re: Integrating ML/DL frameworks with Spark

Shivaram Venkataraman
In reply to this post by Xiangrui Meng-2

  • Fault tolerance and execution model: Spark assumes fine-grained task recovery, i.e. if something fails, only that task is rerun. This doesn’t match the execution model of distributed ML/DL frameworks that are typically MPI-based, and rerunning a single task would lead to the entire system hanging. A whole stage needs to be re-run.
This is useful not only for integrating with third-party frameworks, but also for scaling MLlib algorithms. One of my earliest attempts in Spark MLlib was to implement an all-reduce primitive (SPARK-1485), but we ended up with some compromised solutions. With the new execution model, we can set up a hybrid cluster and do all-reduce properly.
 
Is there a particular new execution model you are referring to, or do we plan to investigate a new execution model? For the MPI-like model, we also need gang scheduling (i.e., schedule all tasks at once or none of them), and I don't think we have support for that in the scheduler right now.




Re: Integrating ML/DL frameworks with Spark

Naveen Swamy
I am a committer on the MXNet project and very interested in working on integrating it with Spark.
I am wondering how training would proceed in the case of:
1) training on one host with multiple GPUs (I don't know whether Spark's capabilities can be leveraged here);
2) distributed training with data parallelism: how can we leverage Spark's map-reduce model for distributed training, given that the execution model here is more iterative in nature?

Please let me know.

Thanks, Naveen





Re: Integrating ML/DL frameworks with Spark

rxin
In reply to this post by Shivaram Venkataraman
I think that's what Xiangrui was referring to: instead of retrying a single task, retry the entire stage, and the entire stage of tasks needs to be scheduled all at once.





Re: Integrating ML/DL frameworks with Spark

Nan Zhu
Besides that, one thing needed by multiple frameworks is the ability to schedule tasks in a single wave,

i.e.,

if a framework like XGBoost or MXNet requires 50 parallel workers, Spark should provide the capability to ensure that either all 50 tasks run at once, or the complete application/job is aborted after some timeout period.
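
A rough sketch of the failure mode this is meant to avoid (rendezvous_wait is a stub standing in for the blocking handshake an MPI/Rabit-style worker performs at startup):

from pyspark import SparkContext

NUM_WORKERS = 50

def rendezvous_wait(index, expected_peers):
    # In a real framework this blocks until all expected_peers workers have
    # joined (tracker handshake, ring setup, ...). If Spark only scheduled a
    # subset of the tasks, the ones that did start would block here forever.
    return expected_peers

def framework_worker(index, rows):
    peers = rendezvous_wait(index, NUM_WORKERS)
    yield (index, peers)

sc = SparkContext.getOrCreate()

# Desired guarantee: either all NUM_WORKERS tasks run concurrently in one wave,
# or the job is failed after a timeout, rather than hanging with partial workers.
result = (sc.parallelize(range(NUM_WORKERS), NUM_WORKERS)
            .mapPartitionsWithIndex(framework_worker)
            .collect())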

Best,

Nan





Re: Integrating ML/DL frameworks with Spark

rxin
Yes, Nan, totally agree. To be on the same page, that's exactly what I wrote, wasn't it?





Re: Integrating ML/DL frameworks with Spark

Nan Zhu
.....how I skipped the last part........






Re: Integrating ML/DL frameworks with Spark

Xiangrui Meng-2
Shivaram: Yes, we can call it "gang scheduling" or "barrier synchronization". Spark doesn't support it now. The proposal is to have proper support in Spark's job scheduler, so we can integrate well with MPI-like frameworks.
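
To sketch what this could look like from the user side (the names below, e.g. barrier() and BarrierTaskContext, are illustrative assumptions for the proposed API, not something Spark exposes today):

from pyspark import SparkContext
from pyspark.taskcontext import BarrierTaskContext  # assumed/proposed API

def train(rows):
    ctx = BarrierTaskContext.get()
    workers = [info.address for info in ctx.getTaskInfos()]  # all peers in the stage
    ctx.barrier()            # global synchronization point across the whole stage
    # ... hand `workers` to an MPI/all-reduce framework and run training here ...
    yield len(workers)

sc = SparkContext.getOrCreate()
rdd = sc.parallelize(range(4), 4)

# All 4 tasks are scheduled together (gang scheduling); if any task fails,
# the whole stage is retried rather than a single task.
counts = rdd.barrier().mapPartitions(train).collect()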





--

Xiangrui Meng

Software Engineer

Databricks Inc. http://databricks.com


Re: Integrating ML/DL frameworks with Spark

Bryan Cutler
Thanks for starting this discussion. I'd also like to see some improvements in this area, and I'm glad to hear that the Pandas UDF / Arrow functionality might be useful. I'm wondering whether, from your initial investigations, you found anything lacking in the Arrow format, or possible improvements that would simplify the data representation? Also, while data could be handed off in a UDF, would it make sense to also discuss a more formal way to externalize the data that would also work for the Scala API?

Thanks,
Bryan




Re: Integrating ML/DL frameworks with Spark

Daniel Galvez
In reply to this post by rxin
Hi all,

Paul Ogilvie pointed this thread out to me; we overlapped a little at LinkedIn. It’s good to see that this kind of discussion is going on!

I have some thoughts regarding the discussion going on:

- Practically speaking, one of the lowest-hanging fruits is the ability for Spark to request GPUs (and, in general, devices). I would be happy to implement this myself if given the go-ahead, though I'm familiar only with YARN, not the Mesos or Kubernetes resource schedulers. It would be best to be forward-looking and think about how to request arbitrary Linux devices rather than just GPUs.

- The discussion here regarding ML/DL seems to focus on DL in particular, and the DL discussion seems to focus vaguely on data-parallel deep learning training. This is probably a fine starting point.

- It is generally challenging to utilize a GPU fully in each kernel call, but there are solutions like CUDA MPS that virtualize a physical GPU as many smaller GPUs. However, each physical GPU is still represented as a single character device, e.g., /dev/nvidia0. This does not mesh well with YARN's GPU isolation, which puts each executor in its own cgroup with only specific *physical* character devices whitelisted. Alas. Supporting CUDA MPS would be good to keep in mind for inference workloads; I could elaborate if desired.

- For things like all-reduce to work well, you need to keep in mind your I/O bandwidth, which means keeping in mind the "topology" of your compute devices (be they CPUs, GPUs, FPGAs, IPUs, or whatever). I'm not sure whether Spark is already aware of this at the Ethernet level, forgive me, but I am certain that it is not aware of it at the PCIe level. Ring all-reduce handles this automatically for you in some sense when it creates its "ring", but only if you give it control of your full topology, which is the traditional MPI style (i.e., you're normally not sharing a node with other jobs when using MPI). Secondly, InfiniBand connections let GPUs talk directly to one another via what is called "GPUDirect", effectively bypassing the CPU and running at the highest bandwidth possible today. This is a very popular approach, and not something that Spark would seemingly be able to touch. So I question Spark's ability to have a hand in large-scale distributed training of deep learning models.

- I would want to know more about the claims of UDFs being slow. For perspective, PCI Express Gen 3 (Gen 4 is not out yet…) has about 12 GB/s of effective bandwidth; split among 4 GPUs, you have 3 GB/s per GPU. In high-performance computing, this is always considered the bottleneck (a quick back-of-envelope version of these numbers follows this list).
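
Back-of-envelope version of those numbers (only the 12 GB/s figure comes from above; the 1 GB-per-step batch size is an arbitrary assumption for illustration):

pcie_gen3_gb_per_s = 12.0                      # effective bandwidth cited above
gpus_sharing_link = 4
per_gpu_gb_per_s = pcie_gen3_gb_per_s / gpus_sharing_link   # 3.0 GB/s per GPU

batch_gb = 1.0                                 # hypothetical data volume per training step
seconds_to_feed_batch = batch_gb / per_gpu_gb_per_s         # ~0.33 s spent just moving data

print(per_gpu_gb_per_s, seconds_to_feed_batch)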

Anyway, this is something I’m particularly interested in. Feel free to poke me if you want me to answer a specific question.

Sincerely,
Daniel



Re: Integrating ML/DL frameworks with Spark

Felix Cheung
In reply to this post by Bryan Cutler
Very cool. We would be very interested in this.

What is the plan forward to make progress in each of the three areas?






Re: Integrating ML/DL frameworks with Spark

Xiangrui Meng-2
Hi all,

Thanks for your feedback! I uploaded an SPIP doc for the barrier scheduling feature at https://issues.apache.org/jira/browse/SPARK-24374. Please take a look and leave your comments there. I had some offline discussion with [hidden email] to help me design the APIs. He is quite familiar with Spark's job scheduler, and he will share some design ideas on the JIRA.

I will work on SPIPs for the other two proposals: 1) fast data exchange and 2) accelerator-aware scheduling. I definitely need some help with the second one because I'm not familiar with YARN/Mesos/Kubernetes.
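
For the accelerator-aware scheduling part, the user-facing side might look roughly like the following; the property names are placeholders to illustrate the shape of the proposal, not configuration keys that exist in Spark today:

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (SparkConf()
        .set("spark.executor.resource.gpu.amount", "2")   # GPUs per executor (placeholder key)
        .set("spark.task.resource.gpu.amount", "1"))      # GPUs per task (placeholder key)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# With requests like these, the scheduler could co-locate CPU-only and GPU tasks
# without over-subscribing accelerators or leaving CPUs idle.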

Best,
Xiangrui



--

Xiangrui Meng

Software Engineer

Databricks Inc. http://databricks.com