[Spark][Scheduler] Spark DAGScheduler scheduling performance hindered on JobSubmitted Event

Ajith shetty

DAGScheduler becomes a bottleneck in a cluster when multiple JobSubmitted events have to be processed, because DAGSchedulerEventProcessLoop is single-threaded and a slow event blocks every other event in the queue, such as TaskCompletion.

The JobSubmitted event can be time consuming depending on the nature of the job (for example, calculating parent stage dependencies, shuffle dependencies, and partitions), and while it runs it blocks all subsequent events from being processed.
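
To make the blocking concrete, below is a minimal, self-contained sketch (illustrative only, not Spark's actual code) of the single-consumer pattern that DAGSchedulerEventProcessLoop follows: one thread drains the queue, so an expensive JobSubmitted handler stalls every TaskCompletion event queued behind it.

import java.util.concurrent.LinkedBlockingDeque

sealed trait SchedulerEvent
case class JobSubmitted(jobId: Int) extends SchedulerEvent
case class TaskCompletion(taskId: Long) extends SchedulerEvent

class EventLoopSketch {
  private val queue = new LinkedBlockingDeque[SchedulerEvent]()

  // Single consumer thread: every event, fast or slow, is handled here.
  private val consumer = new Thread {
    override def run(): Unit =
      while (!Thread.currentThread().isInterrupted) onReceive(queue.take())
  }

  def start(): Unit = consumer.start()

  def post(event: SchedulerEvent): Unit = queue.put(event)

  private def onReceive(event: SchedulerEvent): Unit = event match {
    case JobSubmitted(_) =>
      // Stand-in for expensive work: parent stage dependencies,
      // shuffle dependencies, partition calculation.
      Thread.sleep(10000)
    case TaskCompletion(_) =>
      // Cheap, but it cannot run until the JobSubmitted above finishes.
      ()
  }
}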

 

I see multiple JIRAs referring to this behavior:

https://issues.apache.org/jira/browse/SPARK-2647

https://issues.apache.org/jira/browse/SPARK-4961

 

Similarly, in my cluster, partition calculation for some jobs is time consuming (similar to the stack trace in SPARK-2647), which slows down the DAGSchedulerEventProcessLoop. This causes user jobs to slow down even when their tasks finish within seconds, because TaskCompletion events are processed at a slower rate due to the blockage.

 

I think we can split the JobSubmitted event into two events (a rough sketch follows the steps below):

Step 1. JobSubmittedPreparation - runs in a separate thread upon job submission; this covers the expensive work in org.apache.spark.scheduler.DAGScheduler#createResultStage.

Step 2. JobSubmittedExecution - if Step 1 succeeds, fire an event to DAGSchedulerEventProcessLoop and let it process the output of org.apache.spark.scheduler.DAGScheduler#createResultStage.
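
Under the assumption that the preparation can safely run off the event loop, the split might look roughly like this (every name here - ResultStageSketch, JobSubmittedExecution as a case class, prepare, post - is a hypothetical stand-in, not an existing Spark API):

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical stand-ins for the real result stage and the new event.
case class ResultStageSketch(jobId: Int)
case class JobSubmittedExecution(jobId: Int, stage: ResultStageSketch)

object SplitSubmissionSketch {
  // Stand-in for DAGSchedulerEventProcessLoop.post.
  def post(event: JobSubmittedExecution): Unit = println(s"posted $event")

  // Stand-in for DAGScheduler#createResultStage: parent stages, shuffle
  // dependencies and partitions would be computed here.
  def prepare(jobId: Int): ResultStageSketch = ResultStageSketch(jobId)

  // Step 1 runs on a separate thread; Step 2 posts only the cheap
  // remainder back to the single-threaded event loop.
  def submitJob(jobId: Int)(implicit ec: ExecutionContext): Unit =
    Future(prepare(jobId)).foreach { stage =>
      post(JobSubmittedExecution(jobId, stage))
    }
}

With this shape, a slow prepare only delays its own job's JobSubmittedExecution event; unrelated TaskCompletion events keep flowing through the loop.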

 

I can see that one effect of doing this is that job submissions may no longer be FIFO, depending on how much time Step 1 above consumes.

 

Does the above solution suffice for the problem described? And are there any other side effects of this solution?

 

Regards

Ajith


Re: [Spark][Scheduler] Spark DAGScheduler scheduling performance hindered on JobSubmitted Event

rxin
Rather than using a separate thread pool, perhaps we can just move the prep code to the call site thread?
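
Reusing the hypothetical names from the sketch earlier in the thread, the call-site variant might look like the following (again just a sketch, not the actual change):

// The expensive prep runs synchronously on whichever thread calls
// submitJob, so no extra threads are spawned, per-caller submission
// order stays FIFO, and the event loop only sees the cheap event.
def submitJobOnCallSite(jobId: Int): Unit = {
  val stage = SplitSubmissionSketch.prepare(jobId)
  SplitSubmissionSketch.post(JobSubmittedExecution(jobId, stage))
}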



Re: [Spark][Scheduler] Spark DAGScheduler scheduling performance hindered on JobSubmitted Event

Ryan Blue
I agree with Reynold. We don't need to use a separate pool, which would have the problem you raised about FIFO. We just need to do the planning outside of the scheduler loop. The call site thread sounds like a reasonable place to me.

--
Ryan Blue
Software Engineer
Netflix

Re: [Spark][Scheduler] Spark DAGScheduler scheduling performance hindered on JobSubmitted Event

Shivaram Venkataraman
The problem with doing work in the call-site thread is that there are a number of data structures that are updated during job submission, and these data structures are guarded by the event loop, which ensures that only one thread accesses them. I don't think there is a very easy fix for this given the structure of the DAGScheduler.

Thanks
Shivaram


Re: [Spark][Scheduler] Spark DAGScheduler scheduling performance hindered on JobSubmitted Event

rxin
It's mostly just hash maps from some ids to some state, and those can be replaced just with concurrent hash maps?

(I haven't actually looked at code and am just guessing based on recollection.)


RE: [Spark][Scheduler] Spark DAGScheduler scheduling performance hindered on JobSubmitted Event

Ajith shetty
Thank you all for the responses and feedback. I just checked the code, and as Reynold already mentioned, if we change the data structures below

private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
val jobIds = new HashSet[Int]
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]

to avoid concurrency issues, it should be good enough.
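
A sketch of what thread-safe replacements might look like (illustrative only; a real change would still need to audit every compound read-modify-write on these collections, since swapping the collection type alone does not make multi-step updates atomic):

import java.util.concurrent.ConcurrentHashMap
import scala.collection.concurrent.TrieMap

// TrieMap is a lock-free concurrent map from the Scala standard library.
private[scheduler] val stageIdToStage = new TrieMap[Int, Stage]

// A concurrent set view backed by a ConcurrentHashMap.
val jobIds = ConcurrentHashMap.newKeySet[Int]()

// Holding immutable Set values avoids racy in-place mutation of the
// per-job stage-id sets.
private[scheduler] val jobIdToStageIds = new TrieMap[Int, Set[Int]]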

I would like to work on this, so I will open a JIRA and follow up with a PR very soon. We can continue the discussion on the JIRA.

Regards
Ajith
