Spark Scheduler - Task and job levels - How it Works?


Miguel F. S. Vasconcelos
Hello,

I'm new to Spark and trying to understand how exactly the Spark scheduler works.

In the article /"Resilient Distributed Datasets: A Fault-Tolerant
Abstraction for In-Memory Cluster Computing"/ in section  5.1 Job
Scheduling" its said that:
/
"Whenever a user runs an action (e.g., count or save) on an RDD, the
scheduler examines that RDD’s lineage graph to build a DAG of stages to
execute, as illustrated in Figure 5. Each stage contains as many pipelined
transformations with narrow dependencies as possible. The boundaries of the
stages are the shuffle operations required for wide dependencies, or any
already computed partitions that can short-circuit the computation of a
parent RDD. The scheduler then launches tasks to compute missing partitions
from each stage until it has computed the target RDD.
Our scheduler assigns tasks to machines based on data locality using delay
scheduling [32]. If a task needs to process a partition that is available in
memory on a node, we send it to that node. Otherwise, if a task processes a
partition for which the containing RDD provides preferred locations (e.g.,
an HDFS file), we send it to those."

After reading the gitbook "Mastering Apache Spark" by Jacek Laskowski and
some of Spark's code, what I have understood about scheduling in Spark is
this:

When an action is performed on an RDD, Spark sends it as a job to the
DAGScheduler;
The DAGScheduler computes the execution DAG based on the RDD's lineage and
splits the job into stages (at the wide dependencies);
The resulting stages are turned into sets of tasks, which are sent to the
TaskScheduler;
The TaskScheduler sends the tasks to the executors, where they run.

Is this flow correct?
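
To make the flow concrete, here is a minimal sketch of what I mean (the file
path is just a placeholder, and the comments reflect my current understanding
of the driver-side call path, so please correct me if they are wrong):

import org.apache.spark.{SparkConf, SparkContext}

object SchedulerFlowSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("scheduler-flow").setMaster("local[2]"))

    val pairs  = sc.textFile("input.txt")    // narrow transformations so far
      .flatMap(_.split(" "))
      .map(word => (word, 1))
    val counts = pairs.reduceByKey(_ + _)    // wide dependency -> stage boundary

    // The action below is what actually submits a job:
    // RDD.count() -> SparkContext.runJob -> DAGScheduler (builds the stage DAG,
    // splitting at the shuffle) -> TaskSets handed to the TaskScheduler -> executors.
    println("distinct words: " + counts.count())

    sc.stop()
  }
}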

And are jobs discovered during the application's execution and sent
sequentially to the DAGScheduler?

In the file DAGScheduler.scala there's this comment:

"The high-level scheduling layer that implements stage-oriented scheduling.
It computes a DAG of stages for each job, keeps track of which RDDs and
stage outputs are materialized, and *finds a minimal schedule to run the
job*. It then submits stages as TaskSets to an underlying TaskScheduler
implementation that runs them on the cluster. A TaskSet contains fully
independent tasks that can run right away based on the data that's already
on the cluster (e.g. map output files from previous stages), though it may
fail if this data becomes unavailable."

Regarding the part "finds a minimal schedule to run the job": I have not
found the algorithm for getting this minimal schedule. Can you help me?


And, based on these comments:

File TaskScheduler.scala:
"Low-level task scheduler interface, currently implemented exclusively by
[[org.apache.spark.scheduler.TaskSchedulerImpl]]. This interface allows
plugging in different task schedulers. Each TaskScheduler schedules tasks
for a single SparkContext. These schedulers get sets of tasks submitted to
them from the DAGScheduler for each stage, and are responsible for sending
the tasks to the cluster, running them, retrying if there are failures, and
mitigating stragglers. They return events to the DAGScheduler."

File TaskSchedulerImpl.scala:
"Schedules tasks for multiple types of clusters by acting through a
SchedulerBackend."

File SchedulerBackend.scala:
"A backend interface for scheduling systems that allows plugging in
different ones under TaskSchedulerImpl. We assume a Mesos-like model where
the application gets resource offers as machines become available and can
launch tasks on them."

And this from Spark docs:

/"Scheduling Within an Application
Inside a given Spark application (SparkContext instance), multiple parallel
jobs can run simultaneously if they were submitted from separate threads. By
“job”, in this section, we mean a Spark action (e.g. save, collect) and any
tasks that need to run to evaluate that action. Spark’s scheduler is fully
thread-safe and supports this use case to enable applications that serve
multiple requests (e.g. queries for multiple users).
By default, Spark’s scheduler runs jobs in FIFO fashion. Each job is divided
into “stages” (e.g. map and reduce phases), and the first job gets priority
on all available resources while its stages have tasks to launch, then the
second job gets priority, etc. If the jobs at the head of the queue don’t
need to use the whole cluster, later jobs can start to run right away, but
if the jobs at the head of the queue are large, then later jobs may be
delayed significantly.
Starting in Spark 0.8, it is also possible to configure fair sharing between
jobs. Under fair sharing, Spark assigns tasks between jobs in a “round
robin” fashion, so that all jobs get a roughly equal share of cluster
resources. This means that short jobs submitted while a long job is running
can start receiving resources right away and still get good response times,
without waiting for the long job to finish. This mode is best for multi-user
settings."/

I'm not sure whether the scheduling happens at the task level, the job
level, or both. Are the FIFO and FAIR scheduling modes for tasks or for jobs?
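
(For reference, the switch the docs are describing is, as far as I can tell,
the spark.scheduler.mode property; here is a minimal sketch of turning on
fair sharing when creating the SparkContext:)

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-scheduling-sketch")
  .setMaster("local[4]")
  .set("spark.scheduler.mode", "FAIR")  // the default is FIFO
  // optionally describe pools in an allocation file:
  // .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
val sc = new SparkContext(conf)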

Also, since TaskScheduler is an interface, it is possible to "plug in"
different scheduling algorithms, correct?

But what about the DAGScheduler: is there any interface that allows plugging
different scheduling algorithms into it?

In the video "Introduction to AmpLab Spark Internals" it is said that
pluggable inter-job scheduling is a possible future extension. Does anyone
know if this has already been addressed?

I'm starting a master's degree and I'd really like to contribute to Spark.
Are there any suggestions for issues in Spark scheduling that could be
addressed?

Best,
Miguel




Re: Spark Scheduler - Task and job levels - How it Works?

Imran Rashid-4
Hi Miguel,

On Sun, Jan 6, 2019 at 11:35 AM Miguel F. S. Vasconcelos <[hidden email]> wrote:
When an action is performed on an RDD, Spark sends it as a job to the
DAGScheduler;
The DAGScheduler computes the execution DAG based on the RDD's lineage and
splits the job into stages (at the wide dependencies);
The resulting stages are turned into sets of tasks, which are sent to the
TaskScheduler;
The TaskScheduler sends the tasks to the executors, where they run.

Is this flow correct?

yes, more or less, though that's really just the beginning.  Then there is an endless back-and-forth as executors complete tasks and send info back to the driver; the driver updates some state, perhaps just launches more tasks in the existing tasksets, or creates more, or finishes jobs, etc.

And are jobs discovered during the application's execution and sent
sequentially to the DAGScheduler?

yes, there are very specific APIs to create a job -- in the user guide, these are called "actions".  Most of these are blocking, e.g. when the user calls rdd.count(), a job is created, potentially with a very long lineage and many stages, and only when the entire job is completed does rdd.count() complete.  There are a few async versions (e.g. countAsync()), but from what I've seen, the more common way to submit multiple concurrent jobs is to use the regular APIs from multiple threads.
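
e.g. something roughly like this (just a sketch; the RDDs are made up, and sc
is an existing SparkContext):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// each Future body runs on its own thread, so the two count() actions become
// two concurrent jobs handled by the same DAGScheduler
val rddA = sc.parallelize(1 to 1000000)
val rddB = sc.parallelize(1 to 1000000).filter(_ % 2 == 0)

val jobA = Future { rddA.count() }
val jobB = Future { rddB.count() }

val countA = Await.result(jobA, 10.minutes)
val countB = Await.result(jobB, 10.minutes)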
 
Regarding the part "finds a minimal schedule to run the job": I have not
found the algorithm for getting this minimal schedule. Can you help me?

I think it's just using narrow dependencies and re-using existing cached data & shuffle data whenever possible; that's implicit in the DAGScheduler & RDD code.
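
you can see that with a sketch like this (data.txt is just a placeholder) --
the second action re-uses what the first already computed instead of
re-running the whole lineage:

val pairs  = sc.textFile("data.txt").map(line => (line.length, 1))
val counts = pairs.reduceByKey(_ + _).cache()

counts.count()    // job 1: runs both stages, writes shuffle output, fills the cache
counts.collect()  // job 2: the DAGScheduler sees the cached partitions (and the
                  // shuffle output) and skips the earlier stage -- that's the
                  // "minimal schedule" in practice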
 
I'm not sure whether the scheduling happens at the task level, the job
level, or both. Are the FIFO and FAIR scheduling modes for tasks or for jobs?

FIFO and FAIR only matter if you've got multiple concurrent jobs.  But then it controls how you schedule tasks *within* those jobs.  When a job is submitted, the DAGScheduler will still compute the DAG and the next taskset to run for that job, even if the entire cluster is busy.  But then as resources free up, it needs to decide which job to submit tasks from.  Note you might have 2 active jobs, with 5 stages ready to submit tasks, and another 20 stages still waiting for their dependencies to be computed, further down the pipeline for those jobs.
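
with FAIR mode on, you can also route jobs from different threads into
different pools (sketch; "interactive" and "batch" are pool names you would
define in your allocation file, and the RDDs are placeholders):

// in the thread submitting short, interactive jobs:
sc.setLocalProperty("spark.scheduler.pool", "interactive")
shortRdd.count()

// in another thread submitting a long batch job:
sc.setLocalProperty("spark.scheduler.pool", "batch")
bigRdd.saveAsTextFile("hdfs:///tmp/out")

// reset to the default pool for later jobs in this thread
sc.setLocalProperty("spark.scheduler.pool", null)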
 
Also, since TaskScheduler is an interface, it is possible to "plug in"
different scheduling algorithms, correct?

yes, though Spark itself only has one implementation (I believe Facebook has their own, not sure of others?).  There is an ExternalClusterManager API to let users plug in their own.
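
roughly, the trait looks like this (quoting from memory, so double-check the
actual source for the exact signatures); implementations are discovered via
Java's ServiceLoader:

// org.apache.spark.scheduler.ExternalClusterManager (approximate shape)
private[spark] trait ExternalClusterManager {
  // can this manager handle the given master URL?
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  def createSchedulerBackend(sc: SparkContext, masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}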
 
But what about the DAGScheduler: is there any interface that allows plugging
different scheduling algorithms into it?

there is no interface currently. 

In the video "Introduction to AmpLab Spark Internals" it is said that
pluggable inter-job scheduling is a possible future extension. Does anyone
know if this has already been addressed?

don't think so.
 
I'm starting a master's degree and I'd really like to contribute to Spark.
Are there any suggestions for issues in Spark scheduling that could be
addressed?

there is lots to do, but this is tough to answer.  It depends on your interests in particular.  Also, to be honest, the work that needs to be done often doesn't align well with the kind of work you need to do for a research project.  For example, adding existing features to cluster managers (e.g. kubernetes), or adding tests & chasing down concurrency bugs, might not interest a student.  OTOH, if you create an entirely different implementation of the DAGScheduler and do some tests on its properties under various loads -- that would be really interesting, but it's also unlikely to get accepted, given the quality of code that normally comes from research projects and without finding folks in the community that understand it well and are ready to maintain it.


there was some high-level discussion a while back about a set of changes we might consider making to the scheduler, particularly for dealing with failures on large clusters, but this never really picked up steam.  Might be interesting for a research project: https://issues.apache.org/jira/browse/SPARK-20178