<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Spark Operator API
The core user-facing API of the Spark Kubernetes Operator consists of the `SparkApplication` and
`SparkCluster` Custom Resource Definitions (CRDs). A Spark custom resource extends the
standard k8s API, defines the Spark application spec, and tracks its status.
Once the Spark Operator is installed and running in your Kubernetes environment, it
continuously watches the SparkApplication(s) and SparkCluster(s) that users submit via a k8s API
client or kubectl, and orchestrates secondary resources (pods, configmaps, etc.).
Please also check out the [quickstart](../README.md) for installing the operator.
## SparkApplication
A SparkApplication can be defined in YAML format. Users may configure the application entry point
and configurations. Let's start with the [Spark-Pi example](../examples/pi.yaml):
```yaml
apiVersion: spark.apache.org/v1beta1
kind: SparkApplication
metadata:
  name: pi
spec:
  # Entry point for the app
  mainClass: "org.apache.spark.examples.SparkPi"
  jars: "local:///opt/spark/examples/jars/spark-examples.jar"
  sparkConf:
    spark.dynamicAllocation.enabled: "true"
    spark.dynamicAllocation.shuffleTracking.enabled: "true"
    spark.dynamicAllocation.maxExecutors: "3"
    spark.kubernetes.authenticate.driver.serviceAccountName: "spark"
    spark.kubernetes.container.image: "apache/spark:4.0.0"
  applicationTolerations:
    resourceRetainPolicy: OnFailure
  runtimeVersions:
    scalaVersion: "2.13"
    sparkVersion: "4.0.0"
```
After the application is submitted, the operator adds status information to your application
based on the observed state:
```bash
kubectl get sparkapp pi -o yaml
```
### Write and build your SparkApplication
It's straightforward to convert your spark-submit application to a `SparkApplication` YAML.
The operator constructs the driver spec in a similar way. To submit a Java / Scala application,
use `.spec.jars` and `.spec.mainClass`. Similarly, set `pyFiles` for Python applications.
When building images for the driver and executors, it's recommended to use the official
[Spark Docker](https://github.com/apache/spark-docker) images as base images. See also the pod
template support (`.spec.driverSpec.podTemplateSpec` and `.spec.executorSpec.podTemplateSpec`) for
setting a custom Spark home and work dir.
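
As an illustration of the Python case, the Spark-Pi example above could be adapted roughly as follows. This is a sketch rather than a tested manifest: the application name `pi-python` and the script path are assumptions (the path is where the official Spark images usually ship the Python examples), it assumes the image includes Python, and the remaining fields are copied from the Spark-Pi example.

```yaml
apiVersion: spark.apache.org/v1beta1
kind: SparkApplication
metadata:
  # Hypothetical name, chosen for this sketch
  name: pi-python
spec:
  # Python entry point instead of mainClass / jars; the path is an assumption
  pyFiles: "local:///opt/spark/examples/src/main/python/pi.py"
  sparkConf:
    spark.kubernetes.authenticate.driver.serviceAccountName: "spark"
    spark.kubernetes.container.image: "apache/spark:4.0.0"
  runtimeVersions:
    sparkVersion: "4.0.0"
```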
### Pod Template Support
It is possible to configure pod templates for driver and executor pods in order to set spec
fields that are not configurable through SparkConf.
Spark Operator supports defining pod templates for driver and executor pods in two ways:
1. Set `PodTemplateSpec` in `SparkApplication`
2. Configure `spark.kubernetes.[driver/executor].podTemplateFile`
If the pod template spec is set in the application spec (option 1), it takes precedence
over option 2. In that case, `spark.kubernetes.[driver/executor].podTemplateFile` is unset to
avoid multiple overrides.
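
A minimal sketch of option 1 might look like the following. It only uses standard Kubernetes `PodTemplateSpec` fields under `.spec.driverSpec.podTemplateSpec` and `.spec.executorSpec.podTemplateSpec`; the label and the toleration key/value are hypothetical and would need to match your own cluster setup.

```yaml
spec:
  driverSpec:
    podTemplateSpec:
      metadata:
        labels:
          # Hypothetical label, for illustration only
          team: "data-platform"
      spec:
        tolerations:
          # Hypothetical taint; lets the driver run on nodes tainted dedicated=spark
          - key: "dedicated"
            operator: "Equal"
            value: "spark"
            effect: "NoSchedule"
  executorSpec:
    podTemplateSpec:
      spec:
        tolerations:
          - key: "dedicated"
            operator: "Equal"
            value: "spark"
            effect: "NoSchedule"
```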
When the pod template is set as a remote file in conf properties (option 2), please ensure the
Spark Operator has the necessary permissions to access the remote file location, e.g. deploy the
operator with a workload identity that has access to the target S3 / Cloud Storage bucket. The
driver pod needs similar permissions: the operator needs template file access to create the
driver, and the driver needs the same to create executors.
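
For option 2, the template locations go into `sparkConf`. The sketch below assumes an S3 bucket reachable by both the operator and the driver; the bucket and object names are hypothetical.

```yaml
spec:
  sparkConf:
    # Hypothetical remote locations; both the operator and the driver must be able to read them
    spark.kubernetes.driver.podTemplateFile: "s3a://example-bucket/templates/driver-template.yaml"
    spark.kubernetes.executor.podTemplateFile: "s3a://example-bucket/templates/executor-template.yaml"
```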
Please be advised that Spark still overrides necessary pod configuration in both options. For
more details, refer to the
[Spark doc](https://spark.apache.org/docs/latest/running-on-kubernetes.html#pod-template).
## Enable Additional Ingress for Driver
The operator may create an [Ingress](https://kubernetes.io/docs/concepts/services-networking/ingress/)
for the Spark driver of running applications on demand. For example, to expose the Spark UI, which
is enabled by default on driver port 4040, you may configure:
```yaml
spec:
  driverServiceIngressList:
    - serviceMetadata:
        name: "spark-ui-service"
      serviceSpec:
        ports:
          - protocol: TCP
            port: 80
            targetPort: 4040
      ingressMetadata:
        name: "spark-ui-ingress"
        annotations:
          nginx.ingress.kubernetes.io/rewrite-target: /
      ingressSpec:
        ingressClassName: nginx-example
        rules:
          - http:
              paths:
                - path: "/"
                  pathType: Prefix
                  backend:
                    service:
                      name: spark-ui-service
                      port:
                        number: 80
```
By default, Spark Operator populates the `.spec.selector` field of the created Service to match
the driver labels. If `.ingressSpec.rules` is not provided, Spark Operator also populates one
default rule backed by the associated Service. It's recommended to always provide the ingress spec
to make sure it's compatible with your
[IngressController](https://kubernetes.io/docs/concepts/services-networking/ingress-controllers/).
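
To illustrate the defaulting behavior described above, the sketch below omits `.ingressSpec.rules` and leaves the Service selector unset, so the operator would be expected to fill in both. It reuses the names from the example above. Whether the resulting default rule suits your ingress controller is an assumption you would need to verify.

```yaml
spec:
  driverServiceIngressList:
    - serviceMetadata:
        name: "spark-ui-service"
      serviceSpec:
        # No selector here: the operator populates it to match the driver labels
        ports:
          - protocol: TCP
            port: 80
            targetPort: 4040
      ingressMetadata:
        name: "spark-ui-ingress"
      ingressSpec:
        # No rules here: the operator adds one default rule backed by spark-ui-service
        ingressClassName: nginx-example
```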
## Create and Mount ConfigMap
It is possible to ask the operator to create configmaps on the fly so that they can be used by
driver and/or executor pods. `configMapSpecs` allows you to specify the desired metadata and data
as string literals for the configmap(s) to be created.
```yaml
spec:
  configMapSpecs:
    - name: "example-config-map"
      data:
        foo: "bar"
```
Like other app-specific resources, the created configmap has an owner reference to the Spark
driver and therefore shares the same lifecycle and garbage collection mechanism as the associated
app. This feature can be used to create lightweight override config files for a given Spark app.
For example, the snippet below creates and mounts a configmap with a metrics property file, then
uses it in SparkConf:
```yaml
spec:
  sparkConf:
    spark.metrics.conf: "/etc/metrics/metrics.properties"
  driverSpec:
    podTemplateSpec:
      spec:
        containers:
          - volumeMounts:
              - name: "config-override"
                mountPath: "/etc/metrics"
                readOnly: true
        volumes:
          - name: config-override
            configMap:
              name: metrics-configmap
  executorSpec:
    podTemplateSpec:
      spec:
        containers:
          - volumeMounts:
              - name: "config-override"
                mountPath: "/etc/metrics"
                readOnly: true
        volumes:
          - name: config-override
            configMap:
              name: metrics-configmap
  configMapSpecs:
    - name: "metrics-configmap"
      data:
        metrics.properties: "*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink\n"
```
## Understanding Failure Types
In addition to the general `Failed` state (the driver pod fails or the driver container exits
with a non-zero code), Spark Operator introduces a few distinct failure states. These make it
easier to monitor app status at a high level and to set up different handlers when users
create / manage SparkApplications with external microservices or workflow engines.
Spark Operator recognizes "infrastructure failure" on a best-effort basis. It is possible to
configure different restart policies for general failure(s) vs. potential infrastructure
failure(s). For example, you may configure the app to restart only upon infrastructure
failures. If a Spark application fails as a result of `DriverStartTimedOut`,
`ExecutorsStartTimedOut`, or `SchedulingFailure`, it is more likely that the app failed for
infrastructure reason(s). These include scenarios where the driver or executors cannot be
scheduled or cannot initialize within the configured time window for scheduler reasons, for
example because of insufficient capacity, failure to get an IP allocated, failure to pull
images, or k8s API server issues at scheduling time.
Please be advised that this is a best-effort failure identification. You may still need to
debug the actual failure from the driver pods. Spark Operator stages the last observed
driver pod status with the stopping state for audit purposes.
## Configure the Tolerations for SparkApplication
### Restart
Spark Operator lets you configure app restart behavior for different failure types. Here's a
sample restart config snippet:
```yaml
restartConfig:
  # acceptable values are 'Never', 'Always', 'OnFailure' and 'OnInfrastructureFailure'
  restartPolicy: Never
  # operator would retry the application if configured. All resources from the current attempt
  # would be deleted before starting the next attempt
  maxRestartAttempts: 3
  # backoff time (in millis) that operator would wait before the next attempt
  restartBackoffMillis: 30000
```
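For instance, restarting only on the infrastructure-related failures described in the previous section might be expressed as below. This sketch assumes that `restartConfig` is nested under `.spec.applicationTolerations`, as the section title suggests; the attempt count and backoff are simply the values from the snippet above.

```yaml
spec:
  applicationTolerations:
    # Assumed nesting: restartConfig under applicationTolerations
    restartConfig:
      # Restart only when the failure looks infrastructure-related
      restartPolicy: OnInfrastructureFailure
      maxRestartAttempts: 3
      # Wait 30 seconds between attempts
      restartBackoffMillis: 30000
```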
### Timeouts
It's possible to configure applications to be proactively terminated and resubmitted in particular
cases to avoid resource deadlock.
| Field | Type | Default Value | Description |
|-------|------|---------------|-------------|
| .spec.applicationTolerations.applicationTimeoutConfig.driverStartTimeoutMillis | integer | 300000 | Time to wait for the driver to reach the running state after it is requested. |
| .spec.applicationTolerations.applicationTimeoutConfig.executorStartTimeoutMillis | integer | 300000 | Time to wait for the driver to acquire the minimal number of running executors. |
| .spec.applicationTolerations.applicationTimeoutConfig.forceTerminationGracePeriodMillis | integer | 300000 | Time to wait for force deletion of resources at the end of an attempt. |
| .spec.applicationTolerations.applicationTimeoutConfig.driverReadyTimeoutMillis | integer | 300000 | Time to wait for the driver to reach the ready state. |
| .spec.applicationTolerations.applicationTimeoutConfig.terminationRequeuePeriodMillis | integer | 2000 | Back-off time when releasing resources needs to be re-attempted for the application. |
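
Following the field paths in the table, overriding some of these timeouts could look like the sketch below. The values are illustrative only and are not recommendations. Fields that are left out keep their defaults.

```yaml
spec:
  applicationTolerations:
    applicationTimeoutConfig:
      # Allow 10 minutes for the driver to reach the running state
      driverStartTimeoutMillis: 600000
      # Allow 10 minutes for the minimal number of executors to come up
      executorStartTimeoutMillis: 600000
      # Wait 1 minute for force deletion of resources at the end of an attempt
      forceTerminationGracePeriodMillis: 60000
```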
### Instance Config
Instance Config helps the operator decide whether an application is running healthily. When
the underlying cluster has a batch scheduler enabled, you may configure apps to be
started if and only if there are sufficient resources. If, however, the cluster does not
have a batch scheduler, the operator may help avoid app hanging with `InstanceConfig`, which
describes the bare minimum tolerable scenario.
For example, with the spec below:
```yaml
applicationTolerations:
  instanceConfig:
    minExecutors: 3
    initExecutors: 5
    maxExecutors: 10
sparkConf:
  spark.executor.instances: "10"
```
Spark would try to bring up 10 executors as defined in SparkConf. In addition, from the
operator's perspective:
* If the Spark app acquires fewer than 5 executors in the given time window
  (`.spec.applicationTolerations.applicationTimeoutConfig.executorStartTimeoutMillis`) after
  being submitted, it is shut down proactively in order to avoid resource deadlock.
* The Spark app is marked as 'RunningWithBelowThresholdExecutors' if it loses executors after
  starting up successfully.
* The Spark app is marked as 'RunningHealthy' if it has at least min executors after
  starting up successfully.
### Delete Resources On Termination
By default, the operator deletes all created resources at the end of an attempt. It
tries to record the last observed driver status in the `status` field of the application for
troubleshooting purposes.
On the other hand, when developing an application, it's possible to configure
```yaml
applicationTolerations:
  # Acceptable values are 'Always', 'OnFailure', 'Never'
  resourceRetainPolicy: OnFailure
```
to prevent the operator from attempting to delete the driver pod and driver resources if the app
fails. Similarly, if `resourceRetainPolicy` is set to `Always`, the operator does not delete driver
resources when the app ends. Note that this applies only to operator-created resources (driver
pod, SparkConf configmap, etc.). You may also want to tune
`spark.kubernetes.driver.service.deleteOnTermination` and
`spark.kubernetes.executor.deleteOnTermination` to control the behavior of driver-created
resources.
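
As a sketch of how these settings might be combined while debugging, the fragment below retains operator-created resources and also asks Spark to keep the driver service and executor pods. The retention field is spelled `resourceRetainPolicy` here, following the Spark-Pi example earlier in this document. Retaining everything is only a debugging aid, since the resources then need to be cleaned up manually.

```yaml
spec:
  applicationTolerations:
    # Keep operator-created resources (driver pod, SparkConf configmap) after the app ends
    resourceRetainPolicy: Always
  sparkConf:
    # Keep driver-created resources as well
    spark.kubernetes.driver.service.deleteOnTermination: "false"
    spark.kubernetes.executor.deleteOnTermination: "false"
```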
## Spark Cluster
Spark Operator also supports launching Spark clusters in k8s via the `SparkCluster` custom
resource, which takes minimal effort to specify the desired master and worker instance specs.
To deploy a Spark cluster, you may start by specifying the desired Spark version, worker count, and
SparkConf as in the [example](../examples/qa-cluster-with-one-worker.yaml). Master and
worker instances are deployed as [StatefulSets](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/)
and exposed via k8s [service(s)](https://kubernetes.io/docs/concepts/services-networking/service/).
As with Pod Template Support for applications, it's also possible to submit template(s) for the
Spark instances of a `SparkCluster` to configure spec fields that are not supported via SparkConf.
Note that Spark may overwrite certain fields.
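
A rough sketch of such a resource is shown below. Only `kind`, `runtimeVersions.sparkVersion` and the image key in `sparkConf` are grounded in this document. The worker-count fields under `clusterTolerations.instanceConfig` and the resource name are assumptions made for illustration, so please treat the linked example file as the authoritative reference.

```yaml
apiVersion: spark.apache.org/v1beta1
kind: SparkCluster
metadata:
  # Hypothetical cluster name
  name: qa
spec:
  runtimeVersions:
    sparkVersion: "4.0.0"
  # Assumed field names for the worker count; verify against the linked example
  clusterTolerations:
    instanceConfig:
      initWorkers: 1
      minWorkers: 1
      maxWorkers: 1
  sparkConf:
    spark.kubernetes.container.image: "apache/spark:4.0.0"
```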