Enabling push-based shuffle in Spark

Enabling push-based shuffle in Spark

mshen
I'd like to start a discussion on enabling push-based shuffle in Spark.
This is meant to address the shuffle inefficiencies we see in a
large-scale Spark compute infrastructure deployment.
Facebook's previous talks on SOS shuffle
(https://databricks.com/session/sos-optimizing-shuffle-i-o) and the Cosco
shuffle service
(https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service)
describe solutions to a similar problem.
Note that this is somewhat orthogonal to the work in SPARK-25299
(https://issues.apache.org/jira/browse/SPARK-25299), which uses
remote storage to store shuffle data.
More details of our proposed design are in SPARK-30602
(https://issues.apache.org/jira/browse/SPARK-30602), with the SPIP attached.
We would appreciate comments and discussion from the community.



-----
Min Shen
Staff Software Engineer
LinkedIn

Re: Enabling push-based shuffle in Spark

rxin
Thanks for writing this up. 

Usually when people talk about push-based shuffle, they are motivated primarily by reducing the latency of short queries, by pipelining the map phase, the shuffle phase, and the reduce phase (which this design isn't going to address). It's interesting that you are targeting throughput by optimizing for random reads instead.

My main questions are ...

1. This is designed for HDDs. But SSD prices have gone lower than HDD prices this year, so most new data center storage will be using SSDs from now on. Are we introducing a lot of complexity to address a problem that only exists with legacy hardware that will be phased out soon?

2. Is there a simpler way to address this? E.g., you could simply merge map outputs on each node locally, without involving any kind of push. It seems to me that would address the same issues, with the same limitations (a memory buffer limiting the number of concurrent streams you can write to).

Re: Enabling push-based shuffle in Spark

mshen
Hi Reynold,

Thanks for the comments.
Although a big portion of the problem motivation in the SPIP doc is around
optimizing small random reads for shuffle, I believe the benefits of this
design go beyond that.

In terms of the approach we take, it is true that the map phase still needs
to materialize the intermediate shuffle data and the reduce phase does not
start until the map phase is done.
However, pushing the shuffle data to the locations where the reducers will
run while the map stage is running does provide additional latency
reduction beyond the disk read improvements.
In fact, for the benchmark job we used to measure improvements, the
reduction in shuffle fetch wait time accounts for only about 30% of the
total task runtime reduction.

Another benefit we can expect is improved shuffle reliability. By reducing
the number of blocks that need to be fetched remotely and by keeping two
replicas of the intermediate shuffle data, we can also reduce the
likelihood of shuffle fetch failures that lead to expensive retries.
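
As an illustrative back-of-the-envelope (not from the SPIP; assume each
remote fetch fails independently with probability p and a reducer reads the
output of M mappers):

\[
  P(\text{any of the } M \text{ small fetches fails}) = 1 - (1 - p)^{M},
  \qquad
  P(\text{both copies of a partition are lost}) \approx p^{2}.
\]

Fewer, larger fetches shrink the first quantity, and keeping the original
map output as a fallback means a fetch failure is fatal for a partition
only when both copies are gone.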

For the alternative approach of merging map outputs on each node locally,
which is similar to Facebook's SOS shuffle service, there are a few
downsides:
1. The merge ratio might not be high enough, depending on the average
number of mapper tasks per node.
2. It does not deliver the shuffle partition data to the reducers. Most of
the reducer task input still needs to be fetched remotely.
3. At least in Facebook's Riffle paper, the local merge is performed by the
shuffle service, since it needs to read multiple mappers' output. This means
the memory buffering happens on the shuffle service side instead of on the
executor side. While our approach also does memory buffering right now, we
do it on the executor side, which is much less constrained compared with
doing it inside the shuffle service.



-----
Min Shen
Staff Software Engineer
LinkedIn

Re: Enabling push-based shuffle in Spark

cloud0fan
The name "push-based shuffle" is a little misleading. This seems more like a better shuffle service that co-locates the shuffle blocks of one reducer during the map phase. I think this is a good idea. Is it possible to make it completely external via the shuffle plugin API? This looks like a good use case for the plugin API.


Re: Enabling push-based shuffle in Spark

mshen
Hi Wenchen,

Glad to know that you like this idea.
We also looked into making this pluggable in our early design phase.
While the ShuffleManager API for pluggable shuffle systems does provide
quite a bit of room for customizing Spark's shuffle behavior, we feel it is
still not enough for this case.

Right now, shuffle block location information is tracked inside
MapOutputTracker and updated by DAGScheduler.
Since we are relocating shuffle blocks to improve overall shuffle
throughput and efficiency, we need to be able to update the information
tracked inside MapOutputTracker so that reducers can access their shuffle
input more efficiently.
Letting DAGScheduler orchestrate this process also helps it cope better
with stragglers.
If DAGScheduler has no control over, or is agnostic of, the block push
progress, that leaves a few gaps.
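
To make this more concrete, below is a rough sketch of the kind of hook we
have in mind. The names MergedBlockStatus and registerMergedBlock are
hypothetical placeholders, not existing Spark APIs or the exact SPIP design:

    // Hypothetical sketch only; names and signatures are illustrative.
    import org.apache.spark.storage.BlockManagerId

    // Location and size of a merged shuffle partition produced by block push.
    case class MergedBlockStatus(
        shuffleId: Int,
        reduceId: Int,
        location: BlockManagerId,
        size: Long)

    trait MergedShuffleTracking {
      // Called as merged partitions become available, so DAGScheduler can
      // track push progress and pick good locations for reducers.
      def registerMergedBlock(status: MergedBlockStatus): Unit

      // Reducers prefer the merged copy and fall back to the original map
      // output locations in MapOutputTracker when it is unavailable.
      def getPreferredMergedLocation(shuffleId: Int, reduceId: Int): Option[BlockManagerId]
    }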

On the shuffle Netty protocol side, there is a lot that can be leveraged
from the existing code.
With the improvements in SPARK-24355 and SPARK-30512, the shuffle service
Netty server is becoming much more reliable.
The work in SPARK-6237 also provides quite a bit of leverage for streaming
push of shuffle blocks.
Instead of building all of this from scratch, we took the alternative route
of building on top of the existing Netty protocol to implement the shuffle
block push operation.

We feel that this design has the potential to further improve the Spark
shuffle system's scalability and efficiency, making Spark an even better
compute engine.
We would like to explore how we can leverage the shuffle plugin API to make
this design more acceptable.
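
As a starting point, here is a minimal sketch of how such a backend could be
wired in through the existing extension points, assuming the
spark.shuffle.manager config and the shuffle IO plugin config from
SPARK-25299; the com.example classes are hypothetical placeholders, not
existing code:

    // Minimal sketch; the com.example classes are hypothetical placeholders.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("push-based-shuffle-sketch")
      // Swap in a ShuffleManager that pushes blocks to remote merge locations.
      .config("spark.shuffle.manager", "com.example.shuffle.PushBasedShuffleManager")
      // SPARK-25299 plugin point for customizing how shuffle data is written and read.
      .config("spark.shuffle.sort.io.plugin.class", "com.example.shuffle.PushBasedShuffleDataIO")
      .getOrCreate()

The open question is whether these hooks alone give enough visibility into
MapOutputTracker and DAGScheduler to drive the block push and merge
described above.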



-----
Min Shen
Staff Software Engineer
LinkedIn

Re: Enabling push-based shuffle in Spark

mshen
Hi Andrew,

We are leveraging SPARK-6237 to control the off-heap memory consumption due to Netty.
With that change, the data is processed in a streaming fashion, so Netty does not buffer an entire RPC in memory before handing it over to RpcHandler.
We tested with our internal stress-testing framework and did not see much change in the memory consumption of the shuffle service.
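
To illustrate, here is a simplified sketch of stream-based handling on the
receiving side. StreamCallbackWithID is the existing interface from Spark's
network-common module; the file handling below is illustrative rather than
our actual implementation:

    // Sketch: append each chunk of a pushed block as it arrives, so the full
    // block is never buffered in Netty's off-heap memory.
    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel
    import java.nio.file.{Paths, StandardOpenOption}

    import org.apache.spark.network.client.StreamCallbackWithID

    class PushedBlockStreamCallback(blockId: String, mergeFile: String)
        extends StreamCallbackWithID {

      private val channel: FileChannel = FileChannel.open(
        Paths.get(mergeFile), StandardOpenOption.CREATE, StandardOpenOption.APPEND)

      override def getID(): String = blockId

      override def onData(streamId: String, buf: ByteBuffer): Unit = {
        // Write whatever bytes are available; no whole-RPC buffering.
        while (buf.hasRemaining) {
          channel.write(buf)
        }
      }

      override def onComplete(streamId: String): Unit = channel.close()

      override def onFailure(streamId: String, cause: Throwable): Unit = channel.close()
    }
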
In terms of sharing the code, we are not sure what would be an effective way to do that.
If you are interested, maybe we can set up a meeting to chat in more depth.

Best,
Min

On Mon, Jan 27, 2020 at 11:30 AM Long, Andrew <[hidden email]> wrote:
Hey Min,

One concern would be off-heap memory utilization due to Netty, depending on the number of connections that you create.

Would it be possible to take a look at your code? My team has a performance test harness that I'd like to test it with.

Cheers Andrew




Re: Enabling push-based shuffle in Spark

Long, Andrew-2

The easiest would be to create a fork of the code on GitHub. I can also accept diffs.

Cheers Andrew


Re: Enabling push-based shuffle in Spark

mshen
Our paper summarizing this work on push-based shuffle was recently accepted
by VLDB 2020.
We have uploaded a preprint version of the paper to the JIRA ticket
(https://issues.apache.org/jira/browse/SPARK-30602), along with the
production results we have so far.



-----
Min Shen
Staff Software Engineer
LinkedIn