SPIP: Spark on Kubernetes


SPIP: Spark on Kubernetes

Anirudh Ramanathan-2
The Spark on Kubernetes effort has been developed separately in a fork and linked back from the Apache Spark project as an experimental backend. We're ~6 months in and have had 5 releases:
  • 2 Spark versions maintained (2.1, and 2.2)
  • Extensive integration testing and refactoring efforts to maintain code quality
  • Developer and user-facing documentation
  • 10+ consistent code contributors from different organizations involved in actively maintaining and using the project, with several more members involved in testing and providing feedback.
  • The community has delivered several talks on Spark-on-Kubernetes generating lots of feedback from users.
  • In addition to these, we've seen spin-off efforts such as:

Following the SPIP process, I'm putting this SPIP up for a vote.
  • +1: Yeah, let's go forward and implement the SPIP.
  • +0: Don't really care.
  • -1: I don't think this is a good idea because of the following technical reasons.
If any further clarification is desired on the design or the implementation, please feel free to ask questions or provide feedback.


SPIP: Kubernetes as A Native Cluster Manager


Full Design Doc: link

JIRA: https://issues.apache.org/jira/browse/SPARK-18278

Kubernetes Issue: https://github.com/kubernetes/kubernetes/issues/34377


Authors: Yinan Li, Anirudh Ramanathan, Erik Erlandson, Andrew Ash, Matt Cheah,

Ilan Filonenko, Sean Suchter, Kimoon Kim

Background and Motivation

Containerization and cluster management technologies are constantly evolving in the cluster computing world. Apache Spark currently implements support for Apache Hadoop YARN and Apache Mesos, in addition to providing its own standalone cluster manager. In 2014, Google announced development of Kubernetes, which has its own unique feature set and differentiates itself from YARN and Mesos. Since its debut, it has seen contributions from over 1,300 contributors with over 50,000 commits. Kubernetes has cemented itself as a core player in the cluster computing world, and cloud platforms such as Google Container Engine, Google Compute Engine, Amazon Web Services, and Microsoft Azure support running Kubernetes clusters.


This document outlines a proposal for integrating Apache Spark with Kubernetes in a first-class way, adding Kubernetes to the list of cluster managers that Spark can be used with. Doing so would allow users to share their computing resources and containerization framework between their existing applications on Kubernetes and their computational Spark applications. Although there is existing support for running a Spark standalone cluster on Kubernetes, there are still major advantages and significant interest in having native execution support. For example, this integration provides better support for multi-tenancy and dynamic resource allocation. It also allows users to run applications built against different Spark versions of their choice in the same cluster.


The feature is being developed in a separate fork in order to minimize risk to the main project during development. Since the start of development in November 2016, it has received over 100 commits from over 20 contributors and supports two releases based on Spark 2.1 and 2.2, respectively. Documentation is also being actively worked on, both in the main project repository and in the repository https://github.com/apache-spark-on-k8s/userdocs. Regarding real-world use cases, we have seen cluster setups that use 1000+ cores, and we are seeing growing interest in this project from more and more organizations.


While it is easy to bootstrap the project in a forked repository, it is hard to maintain in the long run because of the tricky process of rebasing onto upstream and the lack of awareness in the larger Spark community. It would be beneficial to both the Spark and Kubernetes communities to see this feature merged upstream. On one hand, it gives Spark users the option of running their Spark workloads alongside other workloads that may already be running on Kubernetes, enabling better resource sharing and isolation and better cluster administration. On the other hand, it gives Kubernetes a leap forward in the area of large-scale data processing by making it an officially supported cluster manager for Spark. The risk of merging into upstream is low because most of the changes are purely incremental, i.e., new Kubernetes-aware implementations of existing interfaces/classes in Spark core are introduced. The development is also concentrated in a single place at resource-managers/kubernetes. The risk is further reduced by a comprehensive integration test framework and an active and responsive community of future maintainers.

Target Personas

DevOps engineers, data scientists, data engineers, application developers, and anyone who can benefit from having Kubernetes as a native cluster manager for Spark.

Goals

  • Make Kubernetes a first-class cluster manager for Spark, alongside Spark Standalone, YARN, and Mesos.

  • Support both client and cluster deployment modes.

  • Support dynamic resource allocation.

  • Support Spark Java/Scala, PySpark, and Spark R applications.

  • Support secure HDFS access.

  • Allow running applications of different Spark versions in the same cluster through the ability to specify the driver and executor Docker images on a per-application basis.

  • Support specification and enforcement of limits on both CPU cores and memory.

Non-Goals

  • Support cluster resource scheduling and sharing beyond capabilities offered natively by the Kubernetes per-namespace resource quota model.

Proposed API Changes

Most API changes are purely incremental, i.e., new Kubernetes-aware implementations of existing interfaces/classes in Spark core are introduced. Detailed changes are as follows.

  • A new cluster manager option, KUBERNETES, is introduced, and some changes are made to SparkSubmit to make it aware of this option (a sketch of the master-URL dispatch follows this list).

  • A new implementation of CoarseGrainedSchedulerBackend, namely KubernetesClusterSchedulerBackend, is responsible for managing the creation and deletion of executor Pods through the Kubernetes API.

  • A new implementation of TaskSchedulerImpl, namely KubernetesTaskSchedulerImpl, and a new implementation of TaskSetManager, namely KubernetesTaskSetManager, are introduced for Kubernetes-aware task scheduling.

  • When dynamic resource allocation is enabled, a new implementation of ExternalShuffleService, namely KubernetesExternalShuffleService, is introduced.
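
As a rough illustration of the first bullet, the sketch below shows how a KUBERNETES option can be selected from the master URL prefix, mirroring how SparkSubmit already distinguishes "yarn", "spark://", "mesos://", and "local" masters. This is a simplified standalone sketch; the constant values and the "k8s://" prefix handling are illustrative, not the fork's exact code.

    // Simplified, standalone sketch of master-URL dispatch; constants are illustrative.
    object ClusterManagerSketch {
      val YARN = 1
      val STANDALONE = 2
      val MESOS = 4
      val LOCAL = 8
      val KUBERNETES = 16   // the new option proposed in this SPIP

      def fromMaster(master: String): Int = master match {
        case "yarn"                        => YARN
        case m if m.startsWith("spark://") => STANDALONE
        case m if m.startsWith("mesos://") => MESOS
        case m if m.startsWith("k8s://")   => KUBERNETES
        case m if m.startsWith("local")    => LOCAL
        case other => throw new IllegalArgumentException(s"Unknown master URL: $other")
      }
    }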

Design Sketch

Below we briefly describe the design. For more details on the design and architecture, please refer to the architecture documentation. The main idea of this design is to run the Spark driver and executors inside Kubernetes Pods. A Pod is a co-located and co-scheduled group of one or more containers running in a shared context. The driver is responsible for creating and destroying executor Pods through the Kubernetes API, while Kubernetes is fully responsible for scheduling the Pods to run on available nodes in the cluster. In cluster mode, the driver also runs in a Pod in the cluster, created through the Kubernetes API by a Kubernetes-aware submission client called by the spark-submit script. Because the driver runs in a Pod, it is reachable by the executors in the cluster using its Pod IP. In client mode, the driver runs outside the cluster and calls the Kubernetes API to create and destroy executor Pods. The driver must be routable from within the cluster for the executors to communicate with it.
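
To make the Pod lifecycle concrete, here is a minimal sketch of what creating and deleting an executor Pod through the Kubernetes API looks like from JVM code, assuming a Java/Scala client such as the fabric8 kubernetes-client (which the fork builds on). The namespace, Pod name, labels, image, and environment variable below are illustrative values, not the fork's actual ones.

    import io.fabric8.kubernetes.api.model.PodBuilder
    import io.fabric8.kubernetes.client.DefaultKubernetesClient

    object ExecutorPodSketch {
      def main(args: Array[String]): Unit = {
        // Resolves credentials from kubeconfig or the in-cluster service account.
        val client = new DefaultKubernetesClient()

        val executorPod = new PodBuilder()
          .withNewMetadata()
            .withName("spark-exec-1")
            .addToLabels("spark-app-id", "spark-pi-1234")   // lets the driver find its own executors
          .endMetadata()
          .withNewSpec()
            .withRestartPolicy("Never")
            .addNewContainer()
              .withName("executor")
              .withImage("myrepo/spark-executor:2.2.0")     // pre-built image containing a Spark distribution
              .addNewEnv().withName("SPARK_EXECUTOR_ID").withValue("1").endEnv()
            .endContainer()
          .endSpec()
          .build()

        // The driver asks the API server to create the Pod; Kubernetes schedules it onto a node.
        client.pods().inNamespace("spark-jobs").create(executorPod)

        // Killing an executor ultimately translates into a deletion like this one.
        client.pods().inNamespace("spark-jobs").withName("spark-exec-1").delete()

        client.close()
      }
    }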


The main component running in the driver is the KubernetesClusterSchedulerBackend, an implementation of CoarseGrainedSchedulerBackend, which manages allocating and destroying executors via the Kubernetes API, as instructed by Spark core via calls to the methods doRequestTotalExecutors and doKillExecutors, respectively. Within the KubernetesClusterSchedulerBackend, a separate kubernetes-pod-allocator thread handles the creation of new executor Pods with appropriate throttling and monitoring. Throttling is achieved using a feedback loop that decides whether to submit new requests for executors based on whether previous executor Pod creation requests have completed. This indirection is necessary because the Kubernetes API server accepts requests for new Pods optimistically, with the anticipation of being able to eventually schedule them to run. However, it is undesirable to have a very large number of Pods that cannot be scheduled and stay pending within the cluster. The throttling mechanism gives us configurable control over how fast an application scales up and helps prevent Spark applications from DoS-ing the Kubernetes API server with too many Pod creation requests. The executor Pods simply run the CoarseGrainedExecutorBackend class from a pre-built Docker image that contains a Spark distribution.
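
The following is a minimal standalone sketch of that feedback-loop idea, not the fork's actual KubernetesClusterSchedulerBackend code: a background task only submits the next batch of Pod creation requests once the previously requested executors have registered. The class name, callbacks, and batch-size default are illustrative.

    import java.util.concurrent.{Executors, TimeUnit}
    import java.util.concurrent.atomic.AtomicInteger

    class ThrottledPodAllocator(
        requestExecutorPod: () => Unit,   // e.g. a pods().create(...) call against the API server
        runningExecutors: () => Int,      // executors that have registered back with the driver
        batchSize: Int = 5,
        intervalSeconds: Long = 1L) {

      private val requested = new AtomicInteger(0)    // Pods requested so far
      private val desiredTotal = new AtomicInteger(0) // target set via doRequestTotalExecutors
      private val scheduler = Executors.newSingleThreadScheduledExecutor()

      def setDesiredTotal(total: Int): Unit = desiredTotal.set(total)

      def start(): Unit = scheduler.scheduleWithFixedDelay(new Runnable {
        override def run(): Unit = {
          // Only ask for more Pods once the previous batch has actually come up,
          // so pending Pods never pile up against the API server.
          val outstanding = requested.get() - runningExecutors()
          if (outstanding <= 0) {
            val toRequest = math.min(batchSize, desiredTotal.get() - requested.get())
            (1 to toRequest).foreach { _ =>
              requestExecutorPod()
              requested.incrementAndGet()
            }
          }
        }
      }, 0L, intervalSeconds, TimeUnit.SECONDS)

      def stop(): Unit = scheduler.shutdownNow()
    }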


There are two auxiliary and optional components, the ResourceStagingServer and the KubernetesExternalShuffleService, which serve the specific purposes described below. The ResourceStagingServer serves as a file store (in the absence of a persistent storage layer in Kubernetes) for application dependencies uploaded from the submission client machine, which are then downloaded from the server by the init-containers in the driver and executor Pods. It is a Jetty server with JAX-RS and has two endpoints, for uploading and downloading files respectively. Security tokens are returned in the responses to file uploads and must be carried in the requests that download the files. The ResourceStagingServer is deployed as a Kubernetes Service backed by a Deployment in the cluster, and multiple instances may be deployed in the same cluster. Spark applications specify which ResourceStagingServer instance to use through a configuration property.
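
A stripped-down sketch of the two-endpoint idea is shown below as a JAX-RS resource: an upload endpoint that stores a blob and returns a token, and a download endpoint that requires the token. The real server is a Jetty application with JAX-RS as described above; the paths, parameter names, and token handling here are simplified and illustrative.

    import java.io.InputStream
    import java.nio.file.{Files, Path, StandardCopyOption}
    import java.util.UUID
    import javax.ws.rs.{Consumes, GET, NotFoundException, PUT, PathParam, Produces, Path => RsPath}
    import javax.ws.rs.core.MediaType

    @RsPath("/resources")
    class ResourceStagingResource {
      private val tokens = new java.util.concurrent.ConcurrentHashMap[String, Path]()

      @PUT
      @Consumes(Array(MediaType.APPLICATION_OCTET_STREAM))
      @Produces(Array(MediaType.TEXT_PLAIN))
      def upload(data: InputStream): String = {
        val token = UUID.randomUUID().toString
        val target = Files.createTempFile("staged-", ".dat")
        Files.copy(data, target, StandardCopyOption.REPLACE_EXISTING)
        tokens.put(token, target)
        token // the init-container presents this token when downloading
      }

      @GET
      @RsPath("/{token}")
      @Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
      def download(@PathParam("token") token: String): Array[Byte] = {
        val path = Option(tokens.get(token)).getOrElse(throw new NotFoundException("unknown token"))
        Files.readAllBytes(path)
      }
    }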


The KubernetesExternalShuffleService is used to support dynamic resource allocation, with which the number of executors of a Spark application can change at runtime based on resource needs. It provides an additional endpoint for drivers that allows the shuffle service to detect driver termination and clean up the shuffle files associated with the corresponding application. There are two ways of deploying the KubernetesExternalShuffleService: running a shuffle service Pod on each node in the cluster (or on a subset of the nodes) using a DaemonSet, or running a shuffle service container in each of the executor Pods. In the first option, each shuffle service container mounts a hostPath volume. The same hostPath volume is also mounted by each of the executor containers, which must also set the environment variable SPARK_LOCAL_DIRS to point to the hostPath. In the second option, a shuffle service container is co-located with an executor container in each of the executor Pods, and the two containers share an emptyDir volume where the shuffle data gets written. There may be multiple instances of the shuffle service deployed in a cluster, which may be used for different versions of Spark or for different priority levels with different resource quotas.
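
For the DaemonSet-based option, the essential wiring is that the executor container mounts the same hostPath directory as the shuffle service and points SPARK_LOCAL_DIRS at it. A sketch of that wiring with the fabric8 Pod builder follows; the directory path, names, and image are illustrative.

    import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

    object ShuffleDirWiringSketch {
      // Executor Pod sharing a node-local shuffle directory with a shuffle-service Pod.
      def executorPodWithShuffleDir(): Pod = new PodBuilder()
        .withNewMetadata().withName("spark-exec-1").endMetadata()
        .withNewSpec()
          .addNewVolume()
            .withName("spark-shuffle-dir")
            .withNewHostPath().withPath("/tmp/spark-local").endHostPath()   // node-local directory
          .endVolume()
          .addNewContainer()
            .withName("executor")
            .withImage("myrepo/spark-executor:2.2.0")
            .addNewVolumeMount().withName("spark-shuffle-dir").withMountPath("/tmp/spark-local").endVolumeMount()
            .addNewEnv().withName("SPARK_LOCAL_DIRS").withValue("/tmp/spark-local").endEnv()
          .endContainer()
        .endSpec()
        .build()
    }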


New Kubernetes-specific configuration options are also introduced to facilitate specification and customization of driver and executor Pods and related Kubernetes resources. For example, driver and executor Pods can be created in a particular Kubernetes namespace and on a particular set of nodes in the cluster, and users can apply labels and annotations to the driver and executor Pods.
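
As an illustration, a Spark application might carry Kubernetes-specific settings like the following. The property names follow the fork's documentation at the time of writing and should be treated as illustrative; they may change as the integration evolves.

    import org.apache.spark.SparkConf

    object KubernetesConfSketch {
      def exampleConf(): SparkConf = new SparkConf()
        .setMaster("k8s://https://kubernetes.example.com:6443")   // Kubernetes API server
        .setAppName("spark-pi")
        .set("spark.kubernetes.namespace", "spark-jobs")          // namespace in which Pods are created
        .set("spark.kubernetes.driver.docker.image", "myrepo/spark-driver:2.2.0")
        .set("spark.kubernetes.executor.docker.image", "myrepo/spark-executor:2.2.0")
        .set("spark.kubernetes.driver.labels", "team=analytics,env=staging")   // labels applied to the driver Pod
        .set("spark.kubernetes.driver.annotations", "owner=data-platform")     // annotations applied to the driver Pod
    }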


Additionally, secure HDFS support is being actively worked on following the design here. Both short-running jobs and long-running jobs that need periodic delegation token refresh are supported, leveraging built-in Kubernetes constructs like Secrets. Please refer to the design doc for details.
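
As a rough illustration of how Secrets fit in (the details are in the linked design doc), a delegation token can be stored in a Kubernetes Secret that the driver and executor Pods then mount. The sketch below uses the fabric8 client; the Secret name, key, namespace, and token bytes are purely illustrative.

    import java.util.Base64
    import io.fabric8.kubernetes.api.model.SecretBuilder
    import io.fabric8.kubernetes.client.DefaultKubernetesClient

    object DelegationTokenSecretSketch {
      def main(args: Array[String]): Unit = {
        val client = new DefaultKubernetesClient()

        // Placeholder bytes; in the actual design these come from the HDFS NameNode
        // and are refreshed periodically for long-running jobs.
        val tokenBytes: Array[Byte] = "example-delegation-token".getBytes("UTF-8")

        // Kubernetes Secret data values are base64-encoded strings.
        val secret = new SecretBuilder()
          .withNewMetadata().withName("spark-hdfs-tokens").endMetadata()
          .addToData("hadoop-token-file", Base64.getEncoder.encodeToString(tokenBytes))
          .build()

        client.secrets().inNamespace("spark-jobs").createOrReplace(secret)
        // Driver and executor Pods can mount this Secret as a volume so that
        // HADOOP_TOKEN_FILE_LOCATION points at the token file inside the container.
        client.close()
      }
    }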

Rejected Designs

Resource Staging by the Driver

A first implementation effectively included the ResourceStagingServer in the driver container itself. The driver container ran a custom command that opened an HTTP endpoint and waited for the submission client to send resources to it; the server would then run the driver code after it had received the resources from the submission client machine. The problem with this approach is that the submission client needs to deploy the driver in such a way that the driver itself is reachable from outside the cluster, and it is difficult for an automated framework that is not aware of the cluster's configuration to expose an arbitrary Pod in a generic way. The Service-based design that was chosen instead allows a cluster administrator to expose the ResourceStagingServer in a manner that makes sense for their cluster, such as with an Ingress or with a NodePort Service.

Kubernetes External Shuffle Service

Several alternatives were considered for the design of the shuffle service. The first design postulated the use of long-lived executor Pods with sidecar containers running the shuffle service. The advantage of this model was that it would let us use emptyDir for sharing rather than node-local storage, which guarantees better lifecycle management of the storage by Kubernetes. The apparent disadvantage was that it would be a departure from the traditional Spark methodology of keeping executors only for as long as required in dynamic allocation mode. It would additionally use up more resources than strictly necessary during the course of long-running jobs, partially losing the advantage of dynamic scaling.


Another alternative considered was to use a separate shuffle service manager as a nameserver. This design has a few drawbacks. First, it means another component that needs authentication/authorization management and maintenance. Second, this separate component needs to be kept in sync with the Kubernetes cluster. Last but not least, most of the functionality of this separate component can be provided by a combination of the in-cluster shuffle service and the Kubernetes API server.

Pluggable Scheduler Backends

Fully pluggable scheduler backends were considered as a more generalized solution, and they remain interesting as a possible avenue for future-proofing against new scheduling targets. For the purposes of this project, adding a new specialized scheduler backend for Kubernetes was chosen due to its very low impact on the core Spark code; making the scheduler fully pluggable would be a high-impact, high-risk modification to Spark's core libraries. The pluggable scheduler backends effort is being tracked in SPARK-19700.





Re: SPIP: Spark on Kubernetes

Erik Erlandson-2
+1 (non-binding)




Re: SPIP: Spark on Kubernetes

ifilonenko
In reply to this post by Anirudh Ramanathan-2
+1 (nonbinding)

Re: SPIP: Spark on Kubernetes

Sean Suchter
In reply to this post by Anirudh Ramanathan-2
+1 (non-binding)

Re: SPIP: Spark on Kubernetes

Kimoon Kim
+1 (non-binding)

Thanks,
Kimoon




Re: SPIP: Spark on Kubernetes

Timothy Chen
+1 (non-binding)

Tim



Re: SPIP: Spark on Kubernetes

Will Benton
In reply to this post by Anirudh Ramanathan-2
+1 (non-binding)



Re: SPIP: Spark on Kubernetes

Holden Karau
+1 (non-binding)

I (personally) think that Kubernetes as a scheduler backend should eventually get merged in and there is clearly a community interested in the work required to maintain it.

On Tue, Aug 15, 2017 at 9:51 AM William Benton <[hidden email]> wrote:
+1 (non-binding)

On Tue, Aug 15, 2017 at 10:32 AM, Anirudh Ramanathan <[hidden email]> wrote:
Spark on Kubernetes effort has been developed separately in a fork, and linked back from the Apache Spark project as an experimental backend. We're ~6 months in, have had 5 releases
  • 2 Spark versions maintained (2.1, and 2.2)
  • Extensive integration testing and refactoring efforts to maintain code quality
  • Developer and user-facing documentation
  • 10+ consistent code contributors from different organizations involved in actively maintaining and using the project, with several more members involved in testing and providing feedback.
  • The community has delivered several talks on Spark-on-Kubernetes generating lots of feedback from users.
  • In addition to these, we've seen efforts spawn off such as:

Following the SPIP process, I'm putting this SPIP up for a vote.
  • +1: Yeah, let's go forward and implement the SPIP.
  • +0: Don't really care.
  • -1: I don't think this is a good idea because of the following technical reasons.
If there is any further clarification desired, on the design or the implementation, please feel free to ask questions or provide feedback.


SPIP: Kubernetes as A Native Cluster Manager


Full Design Doc: link

JIRA: https://issues.apache.org/jira/browse/SPARK-18278

Kubernetes Issue: https://github.com/kubernetes/kubernetes/issues/34377


Authors: Yinan Li, Anirudh Ramanathan, Erik Erlandson, Andrew Ash, Matt Cheah,

Ilan Filonenko, Sean Suchter, Kimoon Kim

Background and Motivation

Containerization and cluster management technologies are constantly evolving in the cluster computing world. Apache Spark currently implements support for Apache Hadoop YARN and Apache Mesos, in addition to providing its own standalone cluster manager. In 2014, Google announced development of Kubernetes which has its own unique feature set and differentiates itself from YARN and Mesos. Since its debut, it has seen contributions from over 1300 contributors with over 50000 commits. Kubernetes has cemented itself as a core player in the cluster computing world, and cloud-computing providers such as Google Container Engine, Google Compute Engine, Amazon Web Services, and Microsoft Azure support running Kubernetes clusters.


This document outlines a proposal for integrating Apache Spark with Kubernetes in a first class way, adding Kubernetes to the list of cluster managers that Spark can be used with. Doing so would allow users to share their computing resources and containerization framework between their existing applications on Kubernetes and their computational Spark applications. Although there is existing support for running a Spark standalone cluster on Kubernetes, there are still major advantages and significant interest in having native execution support. For example, this integration provides better support for multi-tenancy and dynamic resource allocation. It also allows users to run applications of different Spark versions of their choices in the same cluster.


The feature is being developed in a separate fork in order to minimize risk to the main project during development. Since the start of the development in November of 2016, it has received over 100 commits from over 20 contributors and supports two releases based on Spark 2.1 and 2.2 respectively. Documentation is also being actively worked on both in the main project repository and also in the repository https://github.com/apache-spark-on-k8s/userdocs. Regarding real-world use cases, we have seen cluster setup that uses 1000+ cores. We are also seeing growing interests on this project from more and more organizations.


While it is easy to bootstrap the project in a forked repository, it is hard to maintain it in the long run because of the tricky process of rebasing onto the upstream and lack of awareness in the large Spark community. It would be beneficial to both the Spark and Kubernetes community seeing this feature being merged upstream. On one hand, it gives Spark users the option of running their Spark workloads along with other workloads that may already be running on Kubernetes, enabling better resource sharing and isolation, and better cluster administration. On the other hand, it gives Kubernetes a leap forward in the area of large-scale data processing by being an officially supported cluster manager for Spark. The risk of merging into upstream is low because most of the changes are purely incremental, i.e., new Kubernetes-aware implementations of existing interfaces/classes in Spark core are introduced. The development is also concentrated in a single place at resource-managers/kubernetes. The risk is further reduced by a comprehensive integration test framework, and an active and responsive community of future maintainers.

Target Personas

Devops, data scientists, data engineers, application developers, anyone who can benefit from having Kubernetes as a native cluster manager for Spark.

Goals

  • Make Kubernetes a first-class cluster manager for Spark, alongside Spark Standalone, Yarn, and Mesos.

  • Support both client and cluster deployment mode.

  • Support dynamic resource allocation.

  • Support Spark Java/Scala, PySpark, and Spark R applications.

  • Support secure HDFS access.

  • Allow running applications of different Spark versions in the same cluster through the ability to specify the driver and executor Docker images on a per-application basis.

  • Support specification and enforcement of limits on both CPU cores and memory.

Non-Goals

  • Support cluster resource scheduling and sharing beyond capabilities offered natively by the Kubernetes per-namespace resource quota model.

Proposed API Changes

Most API changes are purely incremental, i.e., new Kubernetes-aware implementations of existing interfaces/classes in Spark core are introduced. Detailed changes are as follows.

  • A new cluster manager option KUBERNETES is introduced and some changes are made to SparkSubmit to make it be aware of this option.

  • A new implementation of CoarseGrainedSchedulerBackend, namely KubernetesClusterSchedulerBackend is responsible for managing the creation and deletion of executor Pods through the Kubernetes API.

  • A new implementation of TaskSchedulerImpl, namely KubernetesTaskSchedulerImpl, and a new implementation of TaskSetManager, namely Kubernetes TaskSetManager, are introduced for Kubernetes-aware task scheduling.

  • When dynamic resource allocation is enabled, a new implementation of ExternalShuffleService, namely KubernetesExternalShuffleService is introduced.

Design Sketch

Below we briefly describe the design. For more details on the design and architecture, please refer to the architecture documentation. The main idea of this design is to run Spark driver and executors inside Kubernetes Pods. Pods are a co-located and co-scheduled group of one or more containers run in a shared context. The driver is responsible for creating and destroying executor Pods through the Kubernetes API, while Kubernetes is fully responsible for scheduling the Pods to run on available nodes in the cluster. In the cluster mode, the driver also runs in a Pod in the cluster, created through the Kubernetes API by a Kubernetes-aware submission client called by the spark-submit script. Because the driver runs in a Pod, it is reachable by the executors in the cluster using its Pod IP. In the client mode, the driver runs outside the cluster and calls the Kubernetes API to create and destroy executor Pods. The driver must be routable from within the cluster for the executors to communicate with it.


The main component running in the driver is the KubernetesClusterSchedulerBackend, an implementation of CoarseGrainedSchedulerBackend, which manages allocating and destroying executors via the Kubernetes API, as instructed by Spark core via calls to methods doRequestTotalExecutors and doKillExecutors, respectively. Within the KubernetesClusterSchedulerBackend, a separate kubernetes-pod-allocator thread handles the creation of new executor Pods with appropriate throttling and monitoring. Throttling is achieved using a feedback loop that makes decision on submitting new requests for executors based on whether previous executor Pod creation requests have completed. This indirection is necessary because the Kubernetes API server accepts requests for new Pods optimistically, with the anticipation of being able to eventually schedule them to run. However, it is undesirable to have a very large number of Pods that cannot be scheduled and stay pending within the cluster. The throttling mechanism gives us control over how fast an application scales up (which can be configured), and helps prevent Spark applications from DOS-ing the Kubernetes API server with too many Pod creation requests. The executor Pods simply run the CoarseGrainedExecutorBackend class from a pre-built Docker image that contains a Spark distribution.


There are auxiliary and optional components: ResourceStagingServer and KubernetesExternalShuffleService, which serve specific purposes described below. The ResourceStagingServer serves as a file store (in the absence of a persistent storage layer in Kubernetes) for application dependencies uploaded from the submission client machine, which then get downloaded from the server by the init-containers in the driver and executor Pods. It is a Jetty server with JAX-RS and has two endpoints for uploading and downloading files, respectively. Security tokens are returned in the responses for file uploading and must be carried in the requests for downloading the files. The ResourceStagingServer is deployed as a Kubernetes Service backed by a Deployment in the cluster and multiple instances may be deployed in the same cluster. Spark applications specify which ResourceStagingServer instance to use through a configuration property.


The KubernetesExternalShuffleService is used to support dynamic resource allocation, with which the number of executors of a Spark application can change at runtime based on the resource needs. It provides an additional endpoint for drivers that allows the shuffle service to delete driver termination and clean up the shuffle files associated with corresponding application. There are two ways of deploying the KubernetesExternalShuffleService: running a shuffle service Pod on each node in the cluster or a subset of the nodes using a DaemonSet, or running a shuffle service container in each of the executor Pods. In the first option, each shuffle service container mounts a hostPath volume. The same hostPath volume is also mounted by each of the executor containers, which must also have the environment variable SPARK_LOCAL_DIRS point to the hostPath. In the second option, a shuffle service container is co-located with an executor container in each of the executor Pods. The two containers share an emptyDir volume where the shuffle data gets written to. There may be multiple instances of the shuffle service deployed in a cluster that may be used for different versions of Spark, or for different priority levels with different resource quotas.


New Kubernetes-specific configuration options are also introduced to facilitate specification and customization of the driver and executor Pods and related Kubernetes resources. For example, driver and executor Pods can be created in a particular Kubernetes namespace and on a particular set of nodes in the cluster, and users can apply labels and annotations to the driver and executor Pods.
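A hypothetical example of this per-Pod customization pattern is sketched below; the exact property names and the label, annotation, and node-selector keys and values are assumptions for illustration only.

import org.apache.spark.SparkConf

object KubernetesPodCustomizationSketch {
  // Illustrative pattern for the per-Pod customization options described
  // above; property names, keys, and values are placeholders.
  def customizedConf(base: SparkConf): SparkConf = base
    .set("spark.kubernetes.namespace", "analytics")
    .set("spark.kubernetes.driver.label.team", "data-platform")
    .set("spark.kubernetes.driver.annotation.owner", "spark-operators@example.com")
    .set("spark.kubernetes.executor.label.team", "data-platform")
    .set("spark.kubernetes.node.selector.disktype", "ssd")  // restrict Pods to a node subset
}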


Additionally, secure HDFS support is being actively worked on following the design here. Both short-running jobs and long-running jobs that need periodic delegation token refresh are supported, leveraging built-in Kubernetes constructs like Secrets. Please refer to the design doc for details.
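As a rough illustration of the Kubernetes primitive involved (not the fork's delegation token code), the sketch below stores a serialized Hadoop delegation token in a Secret using the fabric8 kubernetes-client; the namespace, Secret name, and data key are placeholders.

import java.nio.charset.StandardCharsets
import java.util.Base64
import io.fabric8.kubernetes.api.model.SecretBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient

// Rough illustration only: store a serialized delegation token in a
// Kubernetes Secret so that driver/executor Pods can consume or refresh it.
object DelegationTokenSecretSketch {
  def main(args: Array[String]): Unit = {
    val tokenBytes = "serialized-delegation-token".getBytes(StandardCharsets.UTF_8)
    val client = new DefaultKubernetesClient() // uses in-cluster or kubeconfig credentials

    val secret = new SecretBuilder()
      .withNewMetadata()
        .withName("hdfs-delegation-token")
        .withNamespace("spark-jobs")
      .endMetadata()
      .addToData("hadoop-token", Base64.getEncoder.encodeToString(tokenBytes))
      .build()

    client.secrets().inNamespace("spark-jobs").createOrReplace(secret)
    client.close()
  }
}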

Rejected Designs

Resource Staging by the Driver

A first implementation effectively included the ResourceStagingServer in the driver container itself. The driver container ran a custom command that opened an HTTP endpoint and waited for the submission client to send resources to it; the server would then run the driver code once it had received the resources from the submission client machine. The problem with this approach is that the submission client needs to deploy the driver in such a way that the driver itself is reachable from outside of the cluster, but it is difficult for an automated framework that is not aware of the cluster's configuration to expose an arbitrary Pod in a generic way. The chosen Service-based design allows a cluster administrator to expose the ResourceStagingServer in a manner that makes sense for their cluster, such as with an Ingress or with a NodePort Service.

Kubernetes External Shuffle Service

Several alternatives were considered for the design of the shuffle service. The first design postulated long-lived executor Pods with sidecar containers running the shuffle service. The advantage of this model was that it would allow the use of emptyDir volumes for sharing shuffle data rather than node-local storage, giving better lifecycle management of that storage by Kubernetes. The disadvantage was that it would be a departure from the traditional Spark approach of keeping executors only as long as required in dynamic allocation mode. It would additionally use more resources than strictly necessary over the course of long-running jobs, partially losing the advantage of dynamic scaling.


Another alternative considered was to use a separate shuffle service manager as a nameserver. This design has a few drawbacks. First, it means another component that needs authentication/authorization management and maintenance. Second, this separate component needs to be kept in sync with the Kubernetes cluster. Last but not least, most of the functionality of this separate component can be provided by a combination of the in-cluster shuffle service and the Kubernetes API server.

Pluggable Scheduler Backends

Fully pluggable scheduler backends were considered as a more generalized solution, and remain interesting as a possible avenue for future-proofing against new scheduling targets. For the purposes of this project, adding a new specialized scheduler backend for Kubernetes was chosen due to its very low impact on the core Spark code; making the scheduler fully pluggable would be a high-impact, high-risk modification to Spark's core libraries. The pluggable scheduler backends effort is being tracked in SPARK-19700.





Re: SPIP: Spark on Kubernetes

Seshu Adunuthula
+1 (non-binding)

eBay has a roadmap for standardizing service workloads on Kubernetes. Running batch (big data) workloads on the same platform as our services is very interesting to us.

Re: SPIP: Spark on Kubernetes

Daniel Imberman
In reply to this post by Holden Karau
+1 (non-binding)

Glad to see this moving forward :D

On Tue, Aug 15, 2017 at 10:10 AM Holden Karau <[hidden email]> wrote:
+1 (non-binding)

I (personally) think that Kubernetes as a scheduler backend should eventually get merged in and there is clearly a community interested in the work required to maintain it.

On Tue, Aug 15, 2017 at 9:51 AM William Benton <[hidden email]> wrote:
+1 (non-binding)


Re: SPIP: Spark on Kubernetes

Erik Erlandson-2
In reply to this post by Erik Erlandson-2

Kubernetes has evolved into an important container orchestration platform; it has a large and growing user base and an active ecosystem.  Users of Apache Spark who are also deploying applications on Kubernetes (or are planning to) will have convergence-related motivations for migrating their Spark applications to Kubernetes as well. It avoids the need for deploying separate cluster infra for Spark workloads and allows Spark applications to take full advantage of inhabiting the same orchestration environment as other applications.  In this respect, native Kubernetes support for Spark represents a way to optimize uptake and retention of Apache Spark among the members of the expanding Kubernetes community.

On Tue, Aug 15, 2017 at 8:43 AM, Erik Erlandson <[hidden email]> wrote:
+1 (non-binding)



Re: SPIP: Spark on Kubernetes

Shubham Chopra
+1 (non-binding)

~Shubham.


Re: SPIP: Spark on Kubernetes

Jiri Kremser
+1 (non-binding)

On Tue, Aug 15, 2017 at 10:19 PM, Shubham Chopra <[hidden email]> wrote:
+1 (non-binding)

~Shubham.

On Tue, Aug 15, 2017 at 2:11 PM, Erik Erlandson <[hidden email]> wrote:

Kubernetes has evolved into an important container orchestration platform; it has a large and growing user base and an active ecosystem.  Users of Apache Spark who are also deploying applications on Kubernetes (or are planning to) will have convergence-related motivations for migrating their Spark applications to Kubernetes as well. It avoids the need for deploying separate cluster infra for Spark workloads and allows Spark applications to take full advantage of inhabiting the same orchestration environment as other applications.  In this respect, native Kubernetes support for Spark represents a way to optimize uptake and retention of Apache Spark among the members of the expanding Kubernetes community.

On Tue, Aug 15, 2017 at 8:43 AM, Erik Erlandson <[hidden email]> wrote:
+1 (non-binding)


On Tue, Aug 15, 2017 at 8:32 AM, Anirudh Ramanathan <[hidden email]> wrote:
Spark on Kubernetes effort has been developed separately in a fork, and linked back from the Apache Spark project as an experimental backend. We're ~6 months in, have had 5 releases
  • 2 Spark versions maintained (2.1, and 2.2)
  • Extensive integration testing and refactoring efforts to maintain code quality
  • Developer and user-facing documentation
  • 10+ consistent code contributors from different organizations involved in actively maintaining and using the project, with several more members involved in testing and providing feedback.
  • The community has delivered several talks on Spark-on-Kubernetes generating lots of feedback from users.
  • In addition to these, we've seen efforts spawn off such as:

Following the SPIP process, I'm putting this SPIP up for a vote.
  • +1: Yeah, let's go forward and implement the SPIP.
  • +0: Don't really care.
  • -1: I don't think this is a good idea because of the following technical reasons.
If there is any further clarification desired, on the design or the implementation, please feel free to ask questions or provide feedback.


SPIP: Kubernetes as A Native Cluster Manager


Full Design Doc: link

JIRA: https://issues.apache.org/jira/browse/SPARK-18278

Kubernetes Issue: https://github.com/kubernetes/kubernetes/issues/34377


Authors: Yinan Li, Anirudh Ramanathan, Erik Erlandson, Andrew Ash, Matt Cheah,

Ilan Filonenko, Sean Suchter, Kimoon Kim

Background and Motivation

Containerization and cluster management technologies are constantly evolving in the cluster computing world. Apache Spark currently implements support for Apache Hadoop YARN and Apache Mesos, in addition to providing its own standalone cluster manager. In 2014, Google announced development of Kubernetes which has its own unique feature set and differentiates itself from YARN and Mesos. Since its debut, it has seen contributions from over 1300 contributors with over 50000 commits. Kubernetes has cemented itself as a core player in the cluster computing world, and cloud-computing providers such as Google Container Engine, Google Compute Engine, Amazon Web Services, and Microsoft Azure support running Kubernetes clusters.


This document outlines a proposal for integrating Apache Spark with Kubernetes in a first class way, adding Kubernetes to the list of cluster managers that Spark can be used with. Doing so would allow users to share their computing resources and containerization framework between their existing applications on Kubernetes and their computational Spark applications. Although there is existing support for running a Spark standalone cluster on Kubernetes, there are still major advantages and significant interest in having native execution support. For example, this integration provides better support for multi-tenancy and dynamic resource allocation. It also allows users to run applications of different Spark versions of their choices in the same cluster.


The feature is being developed in a separate fork in order to minimize risk to the main project during development. Since the start of the development in November of 2016, it has received over 100 commits from over 20 contributors and supports two releases based on Spark 2.1 and 2.2 respectively. Documentation is also being actively worked on both in the main project repository and also in the repository https://github.com/apache-spark-on-k8s/userdocs. Regarding real-world use cases, we have seen cluster setup that uses 1000+ cores. We are also seeing growing interests on this project from more and more organizations.


While it is easy to bootstrap the project in a forked repository, it is hard to maintain it in the long run because of the tricky process of rebasing onto the upstream and lack of awareness in the large Spark community. It would be beneficial to both the Spark and Kubernetes community seeing this feature being merged upstream. On one hand, it gives Spark users the option of running their Spark workloads along with other workloads that may already be running on Kubernetes, enabling better resource sharing and isolation, and better cluster administration. On the other hand, it gives Kubernetes a leap forward in the area of large-scale data processing by being an officially supported cluster manager for Spark. The risk of merging into upstream is low because most of the changes are purely incremental, i.e., new Kubernetes-aware implementations of existing interfaces/classes in Spark core are introduced. The development is also concentrated in a single place at resource-managers/kubernetes. The risk is further reduced by a comprehensive integration test framework, and an active and responsive community of future maintainers.

Target Personas

Devops, data scientists, data engineers, application developers, anyone who can benefit from having Kubernetes as a native cluster manager for Spark.

Goals

  • Make Kubernetes a first-class cluster manager for Spark, alongside Spark Standalone, Yarn, and Mesos.

  • Support both client and cluster deployment mode.

  • Support dynamic resource allocation.

  • Support Spark Java/Scala, PySpark, and Spark R applications.

  • Support secure HDFS access.

  • Allow running applications of different Spark versions in the same cluster through the ability to specify the driver and executor Docker images on a per-application basis.

  • Support specification and enforcement of limits on both CPU cores and memory.

Non-Goals

  • Support cluster resource scheduling and sharing beyond capabilities offered natively by the Kubernetes per-namespace resource quota model.

Proposed API Changes

Most API changes are purely incremental, i.e., new Kubernetes-aware implementations of existing interfaces/classes in Spark core are introduced. Detailed changes are as follows.

  • A new cluster manager option KUBERNETES is introduced, and some changes are made to SparkSubmit to make it aware of this option.

  • A new implementation of CoarseGrainedSchedulerBackend, namely KubernetesClusterSchedulerBackend, is responsible for managing the creation and deletion of executor Pods through the Kubernetes API (a minimal shape sketch follows this list).

  • A new implementation of TaskSchedulerImpl, namely KubernetesTaskSchedulerImpl, and a new implementation of TaskSetManager, namely KubernetesTaskSetManager, are introduced for Kubernetes-aware task scheduling.

  • When dynamic resource allocation is enabled, a new implementation of ExternalShuffleService, namely KubernetesExternalShuffleService, is introduced.
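
To make the shape of these changes concrete, below is a minimal, self-contained sketch that mimics the scheduler-backend piece. It is illustrative only: the real KubernetesClusterSchedulerBackend extends Spark's CoarseGrainedSchedulerBackend and creates and deletes actual Pods through the Kubernetes API rather than entries in an in-memory map.

  // Illustrative mimic of the proposed backend's responsibilities; not the fork's code.
  object KubernetesBackendShapeSketch {
    final case class ExecutorPod(id: String)

    class KubernetesClusterSchedulerBackendSketch {
      private var executors = Map.empty[String, ExecutorPod]

      // Hook through which Spark's dynamic allocation asks for a new total executor count.
      def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
        while (executors.size < requestedTotal) {
          val id = s"exec-${executors.size + 1}"
          executors += id -> ExecutorPod(id) // real code: create an executor Pod via the Kubernetes API
        }
        true
      }

      // Hook for scaling down; the real code deletes the corresponding executor Pods.
      def doKillExecutors(executorIds: Seq[String]): Boolean = {
        executors --= executorIds
        true
      }

      def executorCount: Int = executors.size
    }

    def main(args: Array[String]): Unit = {
      val backend = new KubernetesClusterSchedulerBackendSketch
      backend.doRequestTotalExecutors(3)
      backend.doKillExecutors(Seq("exec-2"))
      println(backend.executorCount) // prints 2
    }
  }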

Design Sketch

Below we briefly describe the design. For more details on the design and architecture, please refer to the architecture documentation. The main idea of this design is to run the Spark driver and executors inside Kubernetes Pods. A Pod is a co-located and co-scheduled group of one or more containers running in a shared context. The driver is responsible for creating and destroying executor Pods through the Kubernetes API, while Kubernetes is fully responsible for scheduling the Pods onto available nodes in the cluster. In cluster mode, the driver also runs in a Pod in the cluster, created through the Kubernetes API by a Kubernetes-aware submission client invoked by the spark-submit script. Because the driver runs in a Pod, it is reachable by the executors in the cluster via its Pod IP. In client mode, the driver runs outside the cluster and calls the Kubernetes API to create and destroy executor Pods; the driver must be routable from within the cluster for the executors to communicate with it.
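
As an illustration of the submission flow just described, here is a minimal cluster-mode launch sketch using Spark's SparkLauncher API. The k8s:// master URL scheme and the spark.kubernetes.* image property names follow the fork's conventions but should be treated as assumptions here, as should the example host, namespace, and image names.

  import org.apache.spark.launcher.SparkLauncher

  // Minimal cluster-mode submission sketch against a Kubernetes API server.
  // Master URL scheme, property names, and image names are assumptions for illustration.
  object SubmitToKubernetesSketch {
    def main(args: Array[String]): Unit = {
      val handle = new SparkLauncher()
        .setMaster("k8s://https://kubernetes.example.com:6443")
        .setDeployMode("cluster")
        .setAppName("spark-pi-on-k8s")
        .setMainClass("org.apache.spark.examples.SparkPi")
        .setAppResource("local:///opt/spark/examples/jars/spark-examples_2.11-2.2.0.jar")
        .setConf("spark.kubernetes.namespace", "spark")
        .setConf("spark.kubernetes.driver.docker.image", "spark-driver:2.2.0")
        .setConf("spark.kubernetes.executor.docker.image", "spark-executor:2.2.0")
        .startApplication()
      // The handle reports state transitions as the driver Pod is created and the app runs.
      println(handle.getState)
    }
  }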


The main component running in the driver is the KubernetesClusterSchedulerBackend, an implementation of CoarseGrainedSchedulerBackend, which manages allocating and destroying executors via the Kubernetes API, as instructed by Spark core through calls to the methods doRequestTotalExecutors and doKillExecutors, respectively. Within the KubernetesClusterSchedulerBackend, a separate kubernetes-pod-allocator thread handles the creation of new executor Pods with appropriate throttling and monitoring. Throttling is achieved using a feedback loop that decides whether to submit new requests for executors based on whether previous executor Pod creation requests have completed. This indirection is necessary because the Kubernetes API server accepts requests for new Pods optimistically, with the anticipation of being able to eventually schedule them to run. However, it is undesirable to have a very large number of Pods that cannot be scheduled and stay pending within the cluster. The throttling mechanism gives us control over how fast an application scales up (which can be configured), and helps prevent Spark applications from DoS-ing the Kubernetes API server with too many Pod creation requests. The executor Pods simply run the CoarseGrainedExecutorBackend class from a pre-built Docker image that contains a Spark distribution.
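
A rough sketch of that throttling feedback loop is shown below, assuming the fabric8 Kubernetes client. The batch size, poll interval, labels, namespace, and executor image are illustrative assumptions, and the real allocator runs inside the scheduler backend rather than as a standalone program.

  import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
  import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient}
  import scala.collection.JavaConverters._

  // Standalone sketch of a throttled executor-pod allocator: request a new batch of
  // executor Pods only when previously requested Pods are no longer Pending.
  object PodAllocatorSketch {
    val namespace = "spark"
    val appId = "spark-app-0001"
    val batchSize = 5

    def executorPod(execId: Int): Pod =
      new PodBuilder()
        .withNewMetadata()
          .withName(s"$appId-exec-$execId")
          .addToLabels("spark-app-id", appId)
          .addToLabels("spark-role", "executor")
        .endMetadata()
        .withNewSpec()
          .withRestartPolicy("Never")
          .addNewContainer()
            .withName("executor")
            .withImage("spark-executor:2.2.0") // pre-built image containing a Spark distribution
          .endContainer()
        .endSpec()
        .build()

    def main(args: Array[String]): Unit = {
      val client: KubernetesClient = new DefaultKubernetesClient()
      val targetExecutors = 20
      var requested = 0
      while (requested < targetExecutors) {
        val pendingPods = client.pods().inNamespace(namespace)
          .withLabel("spark-app-id", appId)
          .list().getItems.asScala
          .count(p => Option(p.getStatus).exists(_.getPhase == "Pending"))
        // Feedback: only submit the next batch once the previous requests have been scheduled.
        if (pendingPods == 0) {
          val batch = math.min(batchSize, targetExecutors - requested)
          (1 to batch).foreach(i => client.pods().inNamespace(namespace).create(executorPod(requested + i)))
          requested += batch
        }
        Thread.sleep(1000) // allocator tick
      }
      client.close()
    }
  }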


There are two auxiliary and optional components, the ResourceStagingServer and the KubernetesExternalShuffleService, which serve the specific purposes described below. The ResourceStagingServer serves as a file store (in the absence of a persistent storage layer in Kubernetes) for application dependencies uploaded from the submission client machine, which are then downloaded from the server by the init-containers in the driver and executor Pods. It is a Jetty server using JAX-RS, with two endpoints for uploading and downloading files, respectively. Security tokens are returned in the responses to file uploads and must be carried in the requests for downloading the files. The ResourceStagingServer is deployed as a Kubernetes Service backed by a Deployment in the cluster, and multiple instances may be deployed in the same cluster. Spark applications specify which ResourceStagingServer instance to use through a configuration property.
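
The upload/download flow might look roughly like the sketch below. The endpoint paths, the use of the response body as the token, and the Authorization header are assumptions made purely for illustration; the actual JAX-RS resource contract is defined in the fork.

  import java.io.File
  import java.net.{HttpURLConnection, URL}
  import java.nio.file.Files

  // Hypothetical client for the ResourceStagingServer flow: upload a dependency,
  // receive a token, and later download the dependency by presenting that token.
  object StagingClientSketch {
    val serverUri = "http://resource-staging-server.spark.svc:10000" // assumed Service address

    def upload(jar: File): String = {
      val conn = new URL(s"$serverUri/resources").openConnection().asInstanceOf[HttpURLConnection]
      conn.setRequestMethod("POST")
      conn.setDoOutput(true)
      conn.setRequestProperty("Content-Type", "application/octet-stream")
      val out = conn.getOutputStream
      try out.write(Files.readAllBytes(jar.toPath)) finally out.close()
      // Assume the response body carries the security token required for later downloads.
      scala.io.Source.fromInputStream(conn.getInputStream).mkString.trim
    }

    def download(resourceId: String, token: String, dest: File): Unit = {
      val conn = new URL(s"$serverUri/resources/$resourceId").openConnection().asInstanceOf[HttpURLConnection]
      conn.setRequestProperty("Authorization", s"Bearer $token") // header name is an assumption
      Files.copy(conn.getInputStream, dest.toPath)
    }
  }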


The KubernetesExternalShuffleService is used to support dynamic resource allocation, with which the number of executors of a Spark application can change at runtime based on resource needs. It provides an additional endpoint for drivers that allows the shuffle service to detect driver termination and clean up the shuffle files associated with the corresponding application. There are two ways of deploying the KubernetesExternalShuffleService: running a shuffle service Pod on each node in the cluster (or on a subset of the nodes) using a DaemonSet, or running a shuffle service container in each of the executor Pods. In the first option, each shuffle service container mounts a hostPath volume. The same hostPath volume is also mounted by each of the executor containers, which must also have the environment variable SPARK_LOCAL_DIRS point to the hostPath directory. In the second option, a shuffle service container is co-located with an executor container in each of the executor Pods, and the two containers share an emptyDir volume where the shuffle data is written. There may be multiple instances of the shuffle service deployed in a cluster, which may be used for different versions of Spark, or for different priority levels with different resource quotas.
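
The executor-side volume wiring for the first option might look like the following sketch, again assuming the fabric8 client; the directory path, image, and label values are illustrative. The shuffle-service DaemonSet would mount the same hostPath directory on each node.

  import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

  // Executor Pod sketch for the DaemonSet-based shuffle service option: the executor
  // writes shuffle data under a hostPath directory that the node's shuffle-service Pod
  // also mounts, and SPARK_LOCAL_DIRS points Spark at that directory.
  object ShuffleVolumeSketch {
    val localDir = "/var/spark-local"

    val executorPod: Pod = new PodBuilder()
      .withNewMetadata()
        .withName("spark-exec-1")
        .addToLabels("spark-role", "executor")
      .endMetadata()
      .withNewSpec()
        .withRestartPolicy("Never")
        .addNewVolume()
          .withName("spark-local-dirs")
          .withNewHostPath().withPath(localDir).endHostPath()
        .endVolume()
        .addNewContainer()
          .withName("executor")
          .withImage("spark-executor:2.2.0")
          .addNewEnv().withName("SPARK_LOCAL_DIRS").withValue(localDir).endEnv()
          .addNewVolumeMount().withName("spark-local-dirs").withMountPath(localDir).endVolumeMount()
        .endContainer()
      .endSpec()
      .build()

    def main(args: Array[String]): Unit =
      println(executorPod.getSpec.getVolumes) // inspect the generated Pod spec
  }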


New Kubernetes-specific configuration options are also introduced to facilitate specification and customization of the driver and executor Pods and related Kubernetes resources. For example, driver and executor Pods can be created in a particular Kubernetes namespace and scheduled onto a particular set of nodes in the cluster, and users can apply labels and annotations to the driver and executor Pods.
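
For instance, per-application customization might be expressed through configuration properties along these lines; the property names follow the fork's spark.kubernetes.* naming convention and, together with the master URL and values, should be read as assumptions rather than the final configuration surface.

  import org.apache.spark.SparkConf

  // Illustrative Kubernetes-specific configuration: namespace plus per-Pod labels and
  // annotations. Property names and values are assumptions for illustration.
  object KubernetesConfSketch {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
        .setMaster("k8s://https://kubernetes.example.com:6443")
        .setAppName("spark-on-k8s-demo")
        .set("spark.kubernetes.namespace", "analytics")
        .set("spark.kubernetes.driver.label.team", "data-eng")
        .set("spark.kubernetes.driver.annotation.owner", "spark-operators@example.com")
        .set("spark.kubernetes.executor.label.team", "data-eng")
      conf.getAll.foreach { case (k, v) => println(s"$k=$v") }
    }
  }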


Additionally, secure HDFS support is being actively worked on following the design here. Both short-running jobs and long-running jobs that need periodic delegation token refresh are supported, leveraging built-in Kubernetes constructs like Secrets. Please refer to the design doc for details.
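
The sketch below is not the referenced design itself; it only illustrates the kind of Kubernetes construct it leverages, namely storing a delegation token in a Secret that driver and executor Pods can consume. The Secret name, data key, and namespace are assumptions, and the fabric8 client is assumed as before.

  import java.util.Base64
  import io.fabric8.kubernetes.api.model.SecretBuilder
  import io.fabric8.kubernetes.client.DefaultKubernetesClient

  // Store a (placeholder) HDFS delegation token in a Kubernetes Secret.
  object DelegationTokenSecretSketch {
    def main(args: Array[String]): Unit = {
      val client = new DefaultKubernetesClient()
      val tokenBytes = "serialized-hdfs-delegation-token".getBytes("UTF-8") // placeholder payload
      val secret = new SecretBuilder()
        .withNewMetadata().withName("spark-hdfs-delegation-token").endMetadata()
        .addToData("hadoop-token", Base64.getEncoder.encodeToString(tokenBytes))
        .build()
      client.secrets().inNamespace("spark").createOrReplace(secret)
      client.close()
    }
  }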

Rejected Designs

Resource Staging by the Driver

A first implementation effectively included the ResourceStagingServer in the driver container itself. The driver container ran a custom command that opened an HTTP endpoint and waited for the submission client to send resources to it; the server would then run the driver code after it had received the resources from the submission client machine. The problem with this approach is that the submission client needs to deploy the driver in such a way that the driver itself is reachable from outside of the cluster, and it is difficult for an automated framework that is not aware of the cluster's configuration to expose an arbitrary Pod in a generic way. The Service-based design that was chosen instead allows a cluster administrator to expose the ResourceStagingServer in a manner that makes sense for their cluster, such as with an Ingress or with a NodePort Service.

Kubernetes External Shuffle Service

Several alternatives were considered for the design of the shuffle service. The first design proposed long-lived executor Pods with sidecar containers running the shuffle service. The advantage of this model was that it would let us use emptyDir volumes for sharing shuffle data, as opposed to node-local storage, which guarantees better lifecycle management of the storage by Kubernetes. The apparent disadvantage was that it would be a departure from the traditional Spark approach of keeping executors only for as long as required in dynamic allocation mode. It would additionally use more resources than strictly necessary during the course of long-running jobs, partially losing the advantage of dynamic scaling.


Another alternative considered was to use a separate shuffle service manager as a nameserver. This design has a few drawbacks. First, it means another component that needs authentication/authorization management and maintenance. Second, this separate component needs to be kept in sync with the Kubernetes cluster. Last but not least, most of the functionality of this separate component can be performed by a combination of the in-cluster shuffle service and the Kubernetes API server.

Pluggable Scheduler Backends

Fully pluggable scheduler backends were considered as a more generalized solution, and remain interesting as a possible avenue for future-proofing against new scheduling targets. For the purposes of this project, adding a new specialized scheduler backend for Kubernetes was chosen as the approach due to its very low impact on the core Spark code; making the scheduler fully pluggable would be a high-impact, high-risk modification to Spark's core libraries. The pluggable scheduler backends effort is being tracked in SPARK-19700.







Re: SPIP: Spark on Kubernetes

liyinan926
In reply to this post by Anirudh Ramanathan-2
+1 (non-binding)

Re: SPIP: Spark on Kubernetes

Andrew Ash
+1 (non-binding)

We're moving large amounts of infrastructure from a combination of open source and homegrown cluster management systems to unify on Kubernetes and want to bring Spark workloads along with us.







Re: SPIP: Spark on Kubernetes

李书明
In reply to this post by Jiri Kremser
+1



On 2017-08-16 at 04:53, [hidden email] wrote:
+1 (non-binding)

On Tue, Aug 15, 2017 at 10:19 PM, Shubham Chopra <[hidden email]> wrote:
+1 (non-binding)

~Shubham.

On Tue, Aug 15, 2017 at 2:11 PM, Erik Erlandson <[hidden email]> wrote:

Kubernetes has evolved into an important container orchestration platform; it has a large and growing user base and an active ecosystem.  Users of Apache Spark who are also deploying applications on Kubernetes (or are planning to) will have convergence-related motivations for migrating their Spark applications to Kubernetes as well. It avoids the need for deploying separate cluster infra for Spark workloads and allows Spark applications to take full advantage of inhabiting the same orchestration environment as other applications.  In this respect, native Kubernetes support for Spark represents a way to optimize uptake and retention of Apache Spark among the members of the expanding Kubernetes community.

On Tue, Aug 15, 2017 at 8:43 AM, Erik Erlandson <[hidden email]> wrote:
+1 (non-binding)



Re: SPIP: Spark on Kubernetes

lucas.gary@gmail.com
In reply to this post by Andrew Ash
From our perspective, we have invested heavily in Kubernetes as our cluster manager of choice.  

We also make quite heavy use of Spark. We've been experimenting with these builds (2.1 with PySpark enabled) quite heavily. Given that we've already 'paid the price' to operate Kubernetes in AWS, it seems rational to move our jobs over to Spark on k8s. Having this project merged into the master branch will significantly ease keeping our data-munging toolchain primarily on Spark.


Gary Lucas
Data Ops Team Lead
Unbounce








Re: SPIP: Spark on Kubernetes

Ismaël Mejía
+1 (non-binding)

This is something really great to have. More schedulers and runtime environments are a HUGE win for the Spark ecosystem. Amazing work, and big kudos to the people who created this and continue working on it.



Re: SPIP: Spark on Kubernetes

Jean-Baptiste Onofré
+1 as well.

Regards
JB


Re: SPIP: Spark on Kubernetes

Alexander Bezzubov
In reply to this post by Ismaël Mejía
+1 (non-binding)


Looking forward to using it as part of an Apache Spark release, instead of a Standalone cluster deployed on top of k8s.


--
Alex


