[DISCUSS][SPARK-25299] SPIP: Shuffle storage API

[DISCUSS][SPARK-25299] SPIP: Shuffle storage API

Yifei Huang (PD)

Hi everyone,

For the past several months, we have been working on an API for pluggable storage of shuffle data. In this SPIP, we describe the proposed API, its implications, and how it fits into other work being done in the Spark shuffle space. If you're interested in Spark shuffle, and especially if you have done some work in this area already, please take a look at the SPIP and give us your thoughts and feedback.

Jira Ticket: https://issues.apache.org/jira/browse/SPARK-25299
SPIP: https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit

Thank you!

Yifei Huang and Matt Cheah

 


Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

Mridul Muralidharan

Unfortunately, I do not have the bandwidth to do a detailed review, but a few things come to mind after a quick read:

- While it might be tactically beneficial to align with the existing implementation, a clean design that does not tie into the existing shuffle implementation would be preferable (if it can be done without over-engineering). The shuffle implementation can change, and there are custom implementations and experiments that differ quite a bit from what ships with Apache Spark.


- Please keep speculative execution in mind while designing the interfaces: in Spark, the task scheduler logic implicitly ensures that you won’t have conflicts at an executor for a (shuffleId, mapId) or (shuffleId, mapId, reducerId) tuple.
When you externalize the storage, there can be conflicts: passing a way to distinguish different tasks for the same partition would be necessary for nontrivial implementations.


This would be a welcome and much-needed enhancement to Spark. Looking forward to its progress!


Regards,
Mridul



On Wed, May 8, 2019 at 11:24 AM Yifei Huang (PD) <[hidden email]> wrote:

Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

Yifei Huang (PD)

Hi Mridul - thanks for taking the time to give us feedback! Thoughts on the points that you mentioned:

 

The API is meant to work with the existing SortShuffleManager algorithm. There aren't strict requirements on how other ShuffleManager implementations must behave, so it seems impractical to design an API that could also satisfy those unknown requirements. However, we do believe that the API is rather generic, using OutputStreams for writes and InputStreams for reads, and indexing the data by a shuffleId-mapId-reduceId combo, so if other shuffle algorithms treat the data in the same chunks and want an interface for storage, then they can also use this API from within their implementation.
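As a rough illustration of the shape described above, here is a minimal Java sketch of a store that hands out OutputStreams for writes and InputStreams for reads, indexed by shuffleId, mapId, and reduceId. The names `BlockKey`, `ShuffleStore`, and `InMemoryShuffleStore` are hypothetical and are not the interfaces proposed in the SPIP; the in-memory map only stands in for whatever backing storage a plugin would actually use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical index for one block of shuffle data (not a Spark class).
record BlockKey(int shuffleId, int mapId, int reduceId) {}

// Hypothetical storage interface: an OutputStream for writes, an InputStream for reads.
interface ShuffleStore {
    OutputStream openForWrite(BlockKey key);
    InputStream openForRead(BlockKey key) throws IOException;
}

// Toy implementation backed by a map; a block becomes visible when its stream is closed.
class InMemoryShuffleStore implements ShuffleStore {
    private final Map<BlockKey, byte[]> blocks = new ConcurrentHashMap<>();

    @Override
    public OutputStream openForWrite(BlockKey key) {
        return new ByteArrayOutputStream() {
            @Override
            public void close() {
                blocks.put(key, toByteArray());
            }
        };
    }

    @Override
    public InputStream openForRead(BlockKey key) throws IOException {
        byte[] data = blocks.get(key);
        if (data == null) {
            throw new IOException("No data stored for " + key);
        }
        return new ByteArrayInputStream(data);
    }
}

class ShuffleStoreDemo {
    // Writes a payload under one key and reads it back through the same interface.
    static String roundTrip(String payload) {
        ShuffleStore store = new InMemoryShuffleStore();
        BlockKey key = new BlockKey(0, 3, 7);
        try {
            try (OutputStream out = store.openForWrite(key)) {
                out.write(payload.getBytes(StandardCharsets.UTF_8));
            }
            try (InputStream in = store.openForRead(key)) {
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("partition bytes"));
    }
}
```

Any shuffle algorithm that partitions its data into the same (shuffleId, mapId, reduceId) blocks could, in principle, sit on top of an interface of this shape without caring what is behind it.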

 

About speculative execution: we originally assumed that each shuffle task is deterministic, which meant that even if a later mapper overwrote a previously committed mapper's output, the contents would still be the same. Having searched some tickets and read https://github.com/apache/spark/pull/22112/files more carefully, I think there are problems with our original assumption if the writer writes all attempts of a task to the same location. One example is a writer implementation that writes each partition to the remote host as a sequence of chunks. In that situation, a reducer might read data half written by the original task and half written by the running speculative task, which will not be the correct contents if the mapper output is unordered. Therefore, writes by a single mapper might have to be transactional, which is not clear from the API and seems rather complex to reason about, so we shouldn't expect this from the implementer.
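The failure mode can be made concrete with a small simulation. This sketch assumes a hypothetical remote location, modelled as an array of chunk slots, that is shared by every attempt of the same mapper; the two attempts emit the same records in different orders (unordered mapper output) and write chunk by chunk. All class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SharedLocationDemo {
    // One remote location shared by all attempts of the same (shuffleId, mapId, reduceId).
    static final class SharedLocation {
        private final String[] chunks;

        SharedLocation(int numChunks) {
            chunks = new String[numChunks];
        }

        void writeChunk(int index, String data) {
            chunks[index] = data;
        }

        List<String> readAll() {
            return new ArrayList<>(Arrays.asList(chunks));
        }
    }

    static List<String> interleavedRead() {
        // Same set of records in a different order: both are valid outputs of the mapper.
        String[] original = {"a", "b", "c", "d"};
        String[] speculative = {"d", "c", "b", "a"};

        SharedLocation location = new SharedLocation(4);
        // The original attempt finishes and writes all four chunks.
        for (int i = 0; i < 4; i++) {
            location.writeChunk(i, original[i]);
        }
        // The speculative attempt is still running and has overwritten only two chunks.
        for (int i = 0; i < 2; i++) {
            location.writeChunk(i, speculative[i]);
        }
        // A reducer reading at this point sees half of each attempt.
        return location.readAll();
    }

    public static void main(String[] args) {
        System.out.println(interleavedRead()); // prints [d, c, c, d]
    }
}
```

In this toy run the reducer receives "c" and "d" twice and never sees "a" or "b", even though each attempt on its own would have produced a correct partition.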

 

However, this doesn't affect the fundamentals of the API, since we only need to add an attemptId to the storage data index (which can be stored within the MapStatus) to solve the problem of concurrent writes. This would also make it clearer that the writer should use the attempt ID as an index to ensure that writes from speculative tasks don't interfere with one another (we can add that to the API docs as well).
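A minimal sketch of that idea, under stated assumptions: `AttemptBlockKey` and `AttemptIsolationDemo` are hypothetical names, the `committedAttempt` map is only a stand-in for the attempt ID that a MapStatus could carry, and the "first commit wins" rule is an assumption of this sketch rather than something specified in the thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical storage index that includes the task attempt (not a Spark class).
record AttemptBlockKey(int shuffleId, int mapId, int reduceId, long attemptId) {}

class AttemptIsolationDemo {
    private final Map<AttemptBlockKey, String> storage = new HashMap<>();
    // Stand-in for MapStatus: the committed attempt for each (shuffleId, mapId).
    private final Map<String, Long> committedAttempt = new HashMap<>();

    void write(AttemptBlockKey key, String data) {
        storage.put(key, data);
    }

    // Assumption of this sketch: the first attempt to commit wins.
    void commit(int shuffleId, int mapId, long attemptId) {
        committedAttempt.putIfAbsent(shuffleId + "-" + mapId, attemptId);
    }

    // Readers resolve the committed attempt first, then read only that attempt's data.
    String read(int shuffleId, int mapId, int reduceId) {
        Long attemptId = committedAttempt.get(shuffleId + "-" + mapId);
        if (attemptId == null) {
            throw new IllegalStateException("Map output not committed yet");
        }
        return storage.get(new AttemptBlockKey(shuffleId, mapId, reduceId, attemptId));
    }

    static String demo() {
        AttemptIsolationDemo store = new AttemptIsolationDemo();
        // The original attempt (100) writes its full partition and commits.
        store.write(new AttemptBlockKey(0, 1, 0, 100L), "abcd");
        store.commit(0, 1, 100L);
        // A speculative attempt (101) is still writing, under its own key.
        store.write(new AttemptBlockKey(0, 1, 0, 101L), "dc");
        return store.read(0, 1, 0);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints abcd
    }
}
```

Because each attempt writes under its own key, the partially written speculative output never overlaps the committed data, and the implementer does not need to make writes transactional.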

 

From: Mridul Muralidharan <[hidden email]>
Date: Wednesday, May 8, 2019 at 8:18 PM
To: "Yifei Huang (PD)" <[hidden email]>
Cc: Bo Yang <[hidden email]>, Ilan Filonenko <[hidden email]>, Imran Rashid <[hidden email]>, Justin Uang <[hidden email]>, Liang Tang <[hidden email]>, Marcelo Vanzin <[hidden email]>, Matei Zaharia <[hidden email]>, Matt Cheah <[hidden email]>, Min Shen <[hidden email]>, Reynold Xin <[hidden email]>, Ryan Blue <[hidden email]>, Vinoo Ganesh <[hidden email]>, Will Manning <[hidden email]>, "[hidden email]" <[hidden email]>, "[hidden email]" <[hidden email]>, "[hidden email]" <[hidden email]>, "[hidden email]" <[hidden email]>, "[hidden email]" <[hidden email]>, "[hidden email]" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: [DISCUSS][SPARK-25299] SPIP: Shuffle storage API

 

 
