[DISCUSS][SPARK-25299] SPIP: Shuffle storage API


Yifei Huang (PD)

Hi everyone,

For the past several months, we have been working on an API for pluggable storage of shuffle data. In this SPIP, we describe the proposed API, its implications, and how it fits into other work being done in the Spark shuffle space. If you're interested in Spark shuffle, and especially if you have done some work in this area already, please take a look at the SPIP and give us your thoughts and feedback.

Jira Ticket: https://issues.apache.org/jira/browse/SPARK-25299
SPIP: https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit

Thank you!

Yifei Huang and Matt Cheah


Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

Mridul Muralidharan

Unfortunately I do not have bandwidth to do a detailed review, but a few things come to mind after a quick read:

- While it might be tactically beneficial to align with the existing implementation, a clean design that does not tie into the existing shuffle implementation would be preferable (if it can be done without over-engineering). The shuffle implementation can change, and there are custom implementations and experiments that differ quite a bit from what ships with Apache Spark.


- Please keep speculative execution in mind while designing the interfaces: in Spark, implicitly due to the task scheduler logic, you won't have conflicts at an executor for the (shuffleId, mapId) and (shuffleId, mapId, reducerId) tuples. When you externalize storage, there can be conflicts: passing a way to distinguish different task attempts for the same partition would be necessary for nontrivial implementations (a toy sketch of the collision follows below).
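To make the conflict concrete, here is a toy sketch (class and key names are hypothetical, not from the SPIP) of an external store keyed only by (shuffleId, mapId, reduceId):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Toy store, illustrative only. Within one executor the task scheduler
    // never runs two attempts of the same task, so (shuffleId, mapId, reduceId)
    // is unique there. Once storage is externalized, a speculative attempt on
    // another executor produces the *same* key, and the writers clobber each
    // other.
    class NaiveExternalShuffleStore {
      private final Map<String, byte[]> blocks = new ConcurrentHashMap<>();

      void putPartition(int shuffleId, int mapId, int reduceId, byte[] data) {
        // Attempt 1 on executor A and speculative attempt 2 on executor B
        // both write under this key: last writer wins, or worse, readers see
        // a mix of the two.
        blocks.put(shuffleId + "_" + mapId + "_" + reduceId, data);
      }
    }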


This would be a welcome and much-needed enhancement to Spark. Looking forward to its progress!


Regards,
Mridul

Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

Yifei Huang (PD)

Hi Mridul - thanks for taking the time to give us feedback! Thoughts on the points you mentioned:

The API is meant to work with the existing SortShuffleManager algorithm. There aren't strict requirements on how other ShuffleManager implementations must behave, so it seems impractical to design an API that could also satisfy those unknown requirements. However, we do believe the API is rather generic: it uses OutputStreams for writes and InputStreams for reads, and indexes the data by a (shuffleId, mapId, reduceId) combination. So if other shuffle algorithms treat the data in the same chunks and want an interface for storage, they can also use this API from within their implementations.
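For readers of the thread who haven't opened the document, a minimal sketch of that shape might look like the following; the interface and method names here are invented for illustration and are not the SPIP's actual API:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    // Hedged sketch of the shape described above; all interface and method
    // names are made up for illustration and are not the SPIP's actual API.
    interface ShuffleStorageWriter {
      // Open a stream for the bytes that map task `mapId` of shuffle
      // `shuffleId` produced for reduce partition `reduceId`.
      OutputStream openPartitionStream(int shuffleId, int mapId, int reduceId)
          throws IOException;

      // Signal that every partition written by this map task is durably stored.
      void commit(int shuffleId, int mapId) throws IOException;
    }

    interface ShuffleStorageReader {
      // Fetch the bytes previously written for (shuffleId, mapId, reduceId).
      InputStream openPartitionStream(int shuffleId, int mapId, int reduceId)
          throws IOException;
    }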

About speculative execution: we originally assumed that each shuffle task is deterministic, so that even if a later attempt overwrote a previously committed attempt's output, the contents would be the same. Having searched some tickets and read https://github.com/apache/spark/pull/22112/files more carefully, I think there are problems with our original thinking if the writer writes all attempts of a task to the same location. For example, suppose the writer implementation writes each partition to the remote host as a sequence of chunks. A reducer might then read data half written by the original task and half written by the still-running speculative task, which will not be the correct contents if the map output is unordered. Writes by a single mapper might therefore have to be transactional, which is not clear from the API and seems rather complex to reason about, so we shouldn't expect this from the implementer.
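As a toy sketch of that failure mode (again, names invented for illustration), consider a partition stored as an append-only sequence of chunks; without a transactional commit, chunks from two attempts can interleave under the same key:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: a partition stored remotely as an append-only list
    // of chunks. If the original attempt and a speculative attempt append
    // concurrently under the same key, a reducer can observe chunks from both
    // attempts interleaved, which is wrong whenever the map output is unordered.
    class ChunkedPartition {
      private final List<byte[]> chunks = new ArrayList<>();

      synchronized void appendChunk(byte[] chunk) {
        chunks.add(chunk); // both attempts land here, interleaved arbitrarily
      }

      synchronized List<byte[]> readAll() {
        return new ArrayList<>(chunks); // may mix data from two attempts
      }
    }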

However, this doesn't affect the fundamentals of the API, since we only need to add an attemptId to the storage data index (which can be stored within the MapStatus) to solve the problem of concurrent writes. This would also make it clearer that the writer should use the attempt ID as part of the index to ensure that writes from speculative tasks don't interfere with one another (we can add that to the API docs as well).
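As a rough illustration of that fix (all names here are invented for the example), the storage key simply grows an attemptId, and readers fetch only the attempt recorded as committed:

    // Hypothetical sketch: extending the storage index with the task attempt
    // ID gives concurrent attempts disjoint write locations. The driver can
    // record the winning attempt's ID (e.g. inside the MapStatus) so that
    // readers never touch a slower speculative attempt's data.
    final class ShuffleBlockKey {
      final int shuffleId;
      final int mapId;
      final long attemptId; // distinguishes speculative attempts of one task
      final int reduceId;

      ShuffleBlockKey(int shuffleId, int mapId, long attemptId, int reduceId) {
        this.shuffleId = shuffleId;
        this.mapId = mapId;
        this.attemptId = attemptId;
        this.reduceId = reduceId;
      }
    }

    // A reducer would then fetch exactly the committed attempt, e.g.:
    //   reader.open(new ShuffleBlockKey(shuffleId, mapId, winningAttemptId, reduceId));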


Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

Matt Cheah

Hi everyone,

I wanted to pick this back up again. The discussion has quieted down, both on this thread and on the document.

We made a few revisions to the document to hopefully make it easier to read and to clarify our criteria for success in the project. Some of the APIs have also been adjusted based on further discussion and things we've learned.

I was hoping to discuss what our next steps could be here. Specifically:

  1. Would any PMC member be willing to become the shepherd for this SPIP?
  2. Is there any more feedback regarding this proposal?
  3. What would we need to do to take this to a voting phase and to begin proposing our work against upstream Spark?

Thanks,

-Matt Cheah


Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

Saisai Shao
I'm currently working with MemVerge on the Splash project (an implementation of remote shuffle storage) and have followed this ticket for a while. I would be happy to be the shepherd if no one else has volunteered.

Best regards,
Saisai


Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

Imran Rashid
In reply to this post by Matt Cheah
I would be happy to shepherd this.


Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

Saisai Shao
I think maybe we could start a vote on this SPIP.

This has been discussed for a while, and the current doc is pretty complete for now. We have also seen lots of demand in the community for building their own shuffle storage.

Thanks
Saisai


Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

Matt Cheah

We opened a thread for voting yesterday, so please participate!

-Matt Cheah

From: Yue Li <[hidden email]>
Date: Thursday, June 13, 2019 at 7:22 PM

+ Cedric, who is our lead developer for the Splash shuffle manager at MemVerge.

Fully agreed with Saisai. Thanks!

Best,

Yue
