barrier execution mode with DataFrame and dynamic allocation

Ilya Matiach-2

[Note: I sent this earlier but it looks like the email was blocked because I had another email group on the CC line]

Hi Spark Dev,

I would like to use the new barrier execution mode introduced in Spark 2.4 with LightGBM in the Spark package mmlspark, but I ran into some issues and have a couple of questions.

Currently, the LightGBM distributed learner tries to figure out the number of cores on the cluster and then does a coalesce and a mapPartitions. Inside the mapPartitions we do a NetworkInit (where the address:port of all workers needs to be passed in the constructor) and pass the data in-memory to the native layer of the distributed LightGBM learner.

 

With barrier execution mode, I think the code would become much more robust.  However, there are several issues that I am running into when trying to move my code over to the new barrier execution mode scheduler:

  1. It does not support dynamic allocation. However, I think it would be convenient if it restarted the job when the number of workers has decreased, and allowed the dev to decide whether to restart the job if the number of workers increased.
  2. It does not work with the DataFrame or Dataset API, but I think it would be much more convenient if it did.
  3. How does barrier execution mode deal with #partitions > #tasks?  If the number of partitions is larger than the number of “tasks” or workers, can barrier execution mode automatically coalesce the dataset to have # partitions == # tasks?
  4. It would be convenient to be able to get network information about all other workers in the cluster that are in the same barrier execution, e.g. the host address and some task number or identifier of all workers.

 

I would love to hear more about this new feature. I also had trouble finding documentation (JIRA: https://issues.apache.org/jira/browse/SPARK-24374, high-level design: https://www.slideshare.net/hadoop/the-zoo-expands?qid=b2efbd75-97af-4f71-9add-abf84970eaef&v=&b=&from_search=1). Are there any good examples of Spark packages that have moved to the new barrier execution mode in Spark 2.4?

 

Thank you, Ilya

Re: barrier execution mode with DataFrame and dynamic allocation

Xiangrui Meng


On Wed, Dec 19, 2018 at 7:34 AM Ilya Matiach <[hidden email]> wrote:

> Does not support dynamic allocation – however, I think it would be convenient if it restarted the job when the number of workers has decreased and allowed the dev to decide whether to restart the job if the number of workers increased

How does mmlspark handle dynamic allocation? Do you have a watch thread on the driver to restart the job if there are more workers? And when the number of workers decreases, can training continue without the driver being involved?

> Does not work with DataFrame or Dataset API, but I think it would be much more convenient if it did

DataFrame/Dataset do not have APIs to let users scan through the entire partition. The closest is Pandas UDF, which scans data per batch. I'm thinking about the following:

Suppose we change Pandas UDF to take an iterator of record batches (instead of a single batch), and per contract we say this iterator will iterate through the entire partition. Then you only need to do NetworkInit once.

> How does barrier execution mode deal with #partitions > #tasks?  If the number of partitions is larger than the number of “tasks” or workers, can barrier execution mode automatically coalesce the dataset to have # partitions == # tasks?

It will hang there and print warning messages. We didn't assume user code can correctly handle dynamic worker sizes.

> It would be convenient to be able to get network information about all other workers in the cluster that are in the same barrier execution, eg the host address and some task # or identifier of all workers

See getTaskInfos() at https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.BarrierTaskContext.

We also provide a barrier() method there to assist simple coordination among workers.
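
As a rough illustration of the two calls mentioned above, the sketch below runs a barrier stage and reads the peer addresses inside each task. It assumes Spark 2.4 or later on the classpath and a local[4] master chosen only for the demo; the names spark, rdd, and perTask are made up for this example, and the native NetworkInit call is only indicated by a comment.

```scala
import org.apache.spark.BarrierTaskContext
import org.apache.spark.sql.SparkSession

// Local session with 4 slots so that all 4 barrier tasks can start together.
val spark = SparkSession.builder()
  .master("local[4]")
  .appName("barrier-sketch")
  .getOrCreate()

// One partition per worker task; the count must not exceed the available slots.
val rdd = spark.sparkContext.parallelize(1 to 100, numSlices = 4)

val perTask = rdd.barrier().mapPartitions { rows =>
  val ctx = BarrierTaskContext.get()
  // Executor addresses of every task that belongs to this barrier stage.
  val peers = ctx.getTaskInfos().map(_.address).toSeq
  // Hypothetical placeholder: a native NetworkInit using `peers` would go here.
  // Block until every task in the stage has reached this point.
  ctx.barrier()
  // Consume the whole partition in one pass, as a native learner would.
  Iterator.single((ctx.partitionId(), peers.size, rows.size))
}.collect()

spark.stop()
```

Each element of perTask holds the partition id, the number of peers that task saw, and the number of rows it consumed. A real integration would likely derive its own listening ports from the peer list rather than reuse the executor addresses directly.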

>
>  
>
> I would love to hear more about this new feature – also I had trouble finding documentation (JIRA: https://issues.apache.org/jira/browse/SPARK-24374, High level design: https://www.slideshare.net/hadoop/the-zoo-expands?qid=b2efbd75-97af-4f71-9add-abf84970eaef&v=&b=&from_search=1), are there any good examples of spark packages that have moved to use the new barrier execution mode in spark 2.4?

Databricks (which I'm an employee of) implemented HorovodRunner, which fully utilizes barrier execution mode. There is also a work-in-progress open-source integration of Horovod/PySpark from the Horovod author. Doing distributed deep learning training was the main use case considered in the design.

Shall we have an offline meeting or open a JIRA to discuss more details about integrating mmlspark w/ barrier execution mode?

>
>  
>
> Thank you, Ilya

Re: barrier execution mode with DataFrame and dynamic allocation

Xiangrui Meng
(don't know why your email ends with ".invalid")

On Wed, Dec 19, 2018 at 9:13 AM Xiangrui Meng <[hidden email]> wrote:



RE: barrier execution mode with DataFrame and dynamic allocation

Ilya Matiach-2

Hi Xiangrui,

Thank you for the quick reply and the great questions.

 

“How does mmlspark handle dynamic allocation? Do you have a watch thread on the driver to restart the job if there are more workers? And when the number of workers decrease, can training continue without driver involved?”

Currently, in the released version, LightGBM doesn’t handle dynamic allocation. However, I have an ongoing PR (https://github.com/Azure/mmlspark/pull/369) to add dynamic allocation support: it detects on the driver whether more workers have been added, saves the learner at the current iteration on the workers, and then restarts the training process.  I haven’t fully tested it yet; I’ve seen some issues that I need to debug further, and it needs some refactoring. I was hoping that by moving to barrier execution mode in Spark 2.4 I could resolve some of these issues.  The code also always saves the learner trained at the current iteration, so when the number of workers decreases the job can be restarted.

 

“DataFrame/Dataset do not have APIs to let users scan through the entire partition. The closest is Pandas UDF, which scans data per batch.”

Sorry if I misunderstand, but isn’t this what mapPartitions does on a DataFrame (similar to an RDD)? If not, why does it not scan through the entire partition?  Right now with barrier execution we have to do:

rdd.barrier().mapPartitions { ...code… }

but why can’t we do something like:

dataframe.barrier().mapPartitions { …code… }
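
For illustration only, one possible workaround is to drop from the DataFrame to its underlying RDD of rows and call barrier() there. The sketch assumes Spark 2.4 or later, a local[2] master, and a toy DataFrame with a single "id" column; none of this comes from mmlspark, and the names dataframe and rowsPerTask are made up for the example.

```scala
import org.apache.spark.BarrierTaskContext
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("dataframe-barrier-sketch")
  .getOrCreate()

// A small DataFrame with exactly two partitions, matching the two local slots.
val dataframe = spark.range(0, 10).toDF("id").repartition(2)

// DataFrame has no barrier() method, but its underlying RDD[Row] does.
val rowsPerTask = dataframe.rdd.barrier().mapPartitions { rows =>
  val ctx = BarrierTaskContext.get()
  // Wait until every task in the stage has started.
  ctx.barrier()
  // The iterator covers the entire partition, so setup code runs once per task.
  Iterator.single(rows.map(_.getLong(0)).size)
}.collect()

spark.stop()
```

The trade-off is that the result is an RDD again, so any DataFrame-level optimizations stop at the conversion, and the output has to be turned back into a DataFrame by hand if one is needed.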

 

“It will hang there and print warning messages. We didn't assume user code can correctly handle dynamic worker sizes.”

Isn’t this the usual case in user code, though: a DataFrame will have more partitions than there are workers?  In that case, is the user expected to know how many workers there are and always repartition the DataFrame to that number?  It seems like it would be better for barrier execution mode to do that repartitioning automatically, no?
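
Until something like that exists, the manual version of this step might look like the sketch below. It assumes that defaultParallelism is a usable stand-in for the number of task slots, which holds for local[N] but depends on configuration on a real cluster. It also uses repartition rather than a shuffle-free coalesce because, as far as I understand, Spark rejects a barrier stage whose ancestor RDDs within the same stage have a different partition count. The names wide, fitted, and sizes are made up for the example.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("repartition-before-barrier-sketch")
  .getOrCreate()
val sc = spark.sparkContext

// Assumption: defaultParallelism approximates the number of task slots.
val slots = sc.defaultParallelism

// More partitions than slots: not all barrier tasks could start at once.
val wide = sc.parallelize(1 to 1000, numSlices = 8)

// Shuffle down to the slot count so the barrier stage begins after the shuffle boundary.
val fitted = if (wide.getNumPartitions > slots) wide.repartition(slots) else wide

// Each barrier task reports how many rows its partition contained.
val sizes = fitted.barrier().mapPartitions(rows => Iterator.single(rows.size)).collect()

spark.stop()
```

The extra shuffle has a cost, so this is only a stopgap for the automatic behavior asked about above.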

 

“Databricks (which I'm an employee of) implemented HorovodRunner, which fully utilizes barrier execution mode. There is also a work-in-process open-source integration of Horovod/PySpark from Horovod author. Doing distributed deep learning training was the main use case considered in the design.”

That is very cool!  I would be interested in taking a look at the Horovod/PySpark integration when it is available.

 

“Shall we have an offline meeting or open a JIRA to discuss more details about integrating mmlspark w/ barrier execution mode?”

Sure, that would be great. I’ve created a JIRA here: https://issues.apache.org/jira/browse/SPARK-26498.  I would like to help out with the effort and am interested in learning more about how we could enable dynamic allocation with barrier execution mode.

 

“(don't know why your email ends with ".invalid")”

I think that’s just what the Spark dev email list does by default to email senders; I’ve seen it added to other emails on the mailing list before.

 

Thank you and Happy Holidays, Ilya

 

From: Xiangrui Meng <[hidden email]>
Sent: Wednesday, December 19, 2018 12:16 PM
To: Ilya Matiach <[hidden email]>
Cc: [hidden email]
Subject: Re: barrier execution mode with DataFrame and dynamic allocation

 

(don't know why your email ends with ".invalid")

 
