[FLINK-35267][snapshot] FlinkStateSnapshot documentation and examples
diff --git a/docs/content/docs/concepts/controller-flow.md b/docs/content/docs/concepts/controller-flow.md
index b6d876b..8e8d84c 100644
--- a/docs/content/docs/concepts/controller-flow.md
+++ b/docs/content/docs/concepts/controller-flow.md
@@ -98,7 +98,7 @@
The `AbstractFlinkResourceReconciler` encapsulates the core reconciliation flow for all Flink resource types. Let’s take a look at the high level flow before we go into specifics for session, application and session job resources.
1. Check if the resource is ready for reconciliation or if there are any pending operations that should not be interrupted (manual savepoints for example)
-2. If this is the first deployment attempt for the resource, we simply deploy it. It’s important to note here that this is the only deploy operation where we use the `initialSavepointPath` provided in the spec.
+2. If this is the first deployment attempt for the resource, we simply deploy it. It’s important to note here that this is the only deploy operation where we use the `flinkStateSnapshotReference` provided in the spec.
3. Next we determine if the desired spec changed and the type of change: `IGNORE, SCALE, UPGRADE`. Only for scale and upgrade type changes do we need to execute further reconciliation logic.
4. If we have upgrade/scale spec changes we execute the upgrade logic specific for the resource type
5. If we did not receive any spec change we still have to ensure that the currently deployed resources are fully reconciled:
diff --git a/docs/content/docs/concepts/overview.md b/docs/content/docs/concepts/overview.md
index 2544f60..d1b1a9c 100644
--- a/docs/content/docs/concepts/overview.md
+++ b/docs/content/docs/concepts/overview.md
@@ -56,6 +56,8 @@
- Collect lag and utilization metrics
- Scale job vertices to the ideal parallelism
- Scale up and down as the load changes
+- [Snapshot management]({{< ref "docs/custom-resource/snapshots" >}})
+ - Manage snapshots via Kubernetes CRs
### Operations
- Operator [Metrics]({{< ref "docs/operations/metrics-logging#metrics" >}})
- Utilizes the well-established [Flink Metric System](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics)
diff --git a/docs/content/docs/custom-resource/job-management.md b/docs/content/docs/custom-resource/job-management.md
index 74d3b02..790c825 100644
--- a/docs/content/docs/custom-resource/job-management.md
+++ b/docs/content/docs/custom-resource/job-management.md
@@ -167,56 +167,6 @@
Restarts work exactly the same way as other application upgrades and follow the semantics detailed in the previous section.
-## Savepoint management
-
-Savepoints are triggered automatically by the system during the upgrade process as we have seen in the previous sections.
-
-For backup, job forking and other purposes savepoints can be triggered manually or periodically by the operator, however generally speaking these will not be used during upgrades and are not required for the correct operation.
-
-### Manual Savepoint Triggering
-
-Users can trigger savepoints manually by defining a new (different/random) value to the variable `savepointTriggerNonce` in the job specification:
-
-```yaml
- job:
- ...
- savepointTriggerNonce: 123
-```
-
-Changing the nonce value will trigger a new savepoint. Information about pending and last savepoint is stored in the resource status.
-
-### Periodic Savepoint Triggering
-
-The operator also supports periodic savepoint triggering through the following config option which can be configured on a per job level:
-
-```yaml
- flinkConfiguration:
- ...
- kubernetes.operator.periodic.savepoint.interval: 6h
-```
-
-There is no guarantee on the timely execution of the periodic savepoints as they might be delayed by unhealthy job status or other interfering user operation.
-
-### Savepoint History
-
-The operator automatically keeps track of the savepoint history triggered by upgrade or manual savepoint operations.
-This is necessary so cleanup can be performed by the operator for old savepoints.
-
-Users can control the cleanup behaviour by specifying a maximum age and maximum count for the savepoints in the history.
-
-```
-kubernetes.operator.savepoint.history.max.age: 24 h
-kubernetes.operator.savepoint.history.max.count: 5
-```
-
-{{< hint info >}}
-Savepoint cleanup happens lazily and only when the application is running.
-It is therefore very likely that savepoints live beyond the max age configuration.
-{{< /hint >}}
-
-To disable savepoint cleanup by the operator you can set `kubernetes.operator.savepoint.cleanup.enabled: false`.
-When savepoint cleanup is disabled the operator will still collect and populate the savepoint history but not perform any dispose operations.
-
## Recovery of missing job deployments
When HA is enabled, the operator can recover the Flink cluster deployments in cases when it was accidentally deleted
@@ -297,16 +247,17 @@
### Redeploy using the savepointRedeployNonce
-It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource from a target savepoint by using the combination of `savepointRedeployNonce` and `initialSavepointPath` in the job spec:
+It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource from a target savepoint by using the combination of `savepointRedeployNonce` and `flinkStateSnapshotReference` in the job spec:
```yaml
job:
- initialSavepointPath: file://redeploy-target-savepoint
+ flinkStateSnapshotReference:
+ path: file://redeploy-target-savepoint
# If not set previously, set to 1, otherwise increment, e.g. 2
savepointRedeployNonce: 1
```
-When changing the `savepointRedeployNonce` the operator will redeploy the job to the savepoint defined in the `initialSavepointPath`. The savepoint path must not be empty.
+When changing the `savepointRedeployNonce`, the operator will redeploy the job to the savepoint defined in the `flinkStateSnapshotReference`. The savepoint path must not be empty.
{{< hint warning >}}
Rollbacks are not supported after redeployments.
@@ -320,7 +271,7 @@
1. Locate the latest checkpoint/savepoint metafile in your configured checkpoint/savepoint directory.
2. Delete the `FlinkDeployment` resource for your application
3. Check that you have the current savepoint, and that your `FlinkDeployment` is deleted completely
- 4. Modify your `FlinkDeployment` JobSpec and set the `initialSavepointPath` to your last checkpoint location
+ 4. Modify your `FlinkDeployment` JobSpec and set `flinkStateSnapshotReference.path` to your last checkpoint location
5. Recreate the deployment
These steps ensure that the operator will start completely fresh from the user defined savepoint path and can hopefully fully recover.
diff --git a/docs/content/docs/custom-resource/overview.md b/docs/content/docs/custom-resource/overview.md
index 3c7cc94..6c3aadc 100644
--- a/docs/content/docs/custom-resource/overview.md
+++ b/docs/content/docs/custom-resource/overview.md
@@ -37,6 +37,9 @@
- Flink application managed by the `FlinkDeployment`
- Empty Flink session managed by the `FlinkDeployment` + multiple jobs managed by the `FlinkSessionJobs`. The operations on the session jobs are independent of each other.
+To help manage snapshots, there is another CR called FlinkStateSnapshot. It can be created by the operator for periodic and upgrade savepoints/checkpoints, or manually by the user to trigger a savepoint/checkpoint for a job.
+A FlinkStateSnapshot references the FlinkDeployment or FlinkSessionJob it belongs to in its spec.
+
## FlinkDeployment
FlinkDeployment objects are defined in YAML format by the user and must contain the following required fields:
@@ -218,6 +221,7 @@
## Further information
+ - [Snapshots]({{< ref "docs/custom-resource/snapshots" >}})
- [Job Management and Stateful upgrades]({{< ref "docs/custom-resource/job-management" >}})
- [Deployment customization and pod templates]({{< ref "docs/custom-resource/pod-template" >}})
- [Full Reference]({{< ref "docs/custom-resource/reference" >}})
diff --git a/docs/content/docs/custom-resource/snapshots.md b/docs/content/docs/custom-resource/snapshots.md
new file mode 100644
index 0000000..6c874c6
--- /dev/null
+++ b/docs/content/docs/custom-resource/snapshots.md
@@ -0,0 +1,195 @@
+---
+title: "Snapshots"
+weight: 3
+type: docs
+aliases:
+ - /custom-resource/snapshots.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Snapshots
+
+To create, list and delete snapshots, you can use the custom resource called FlinkStateSnapshot.
+The operator uses the same controller flow as for FlinkDeployment and FlinkSessionJob to trigger the savepoint/checkpoint and observe its status.
+
+This feature deprecates the old `savepointInfo` and `checkpointInfo` fields found in the Flink resource CR status, along with the spec fields `initialSavepointPath`, `savepointTriggerNonce` and `checkpointTriggerNonce`.
+It is enabled by default via the configuration option `kubernetes.operator.snapshot.resource.enabled`.
+If you set this to false, the operator will keep using the deprecated status fields to track snapshots.
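+
+For example, to switch back to the legacy behaviour, set the option to false in the operator configuration (a minimal fragment; how you supply operator configuration, e.g. via Helm values, depends on your setup):
+
+```yaml
+# Operator-level setting: disable FlinkStateSnapshot CRs and fall back to
+# the deprecated status-field based snapshot tracking.
+kubernetes.operator.snapshot.resource.enabled: false
+```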
+
+## Overview
+
+To create a savepoint or checkpoint, exactly one of the spec fields `savepoint` or `checkpoint` must be present.
+Furthermore, in the case of a savepoint you can signal to the operator that the savepoint already exists using the `alreadyExists` field, and the operator will mark it as a successful snapshot in the next reconciliation phase.
+
+You can also instruct the operator to start a new FlinkDeployment/FlinkSessionJob from an existing snapshot CR by using `flinkStateSnapshotReference` in the job spec.
+
+## Examples
+
+### Savepoint
+
+```yaml
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkStateSnapshot
+metadata:
+ name: example-savepoint
+spec:
+ backoffLimit: 1 # retry count, -1 for infinite, 0 for no retries (default: -1)
+ jobReference:
+ kind: FlinkDeployment # FlinkDeployment or FlinkSessionJob
+ name: example-deployment # name of the resource
+ savepoint:
+ alreadyExists: false # optional (default: false), if true, the path is considered to already exist and state will be COMPLETED on first reconciliation
+ disposeOnDelete: true # optional (default: true), dispose of savepoint when this FlinkStateSnapshot is removed, job needs to be running
+ formatType: CANONICAL # optional (default: CANONICAL), format type of savepoint
+ path: /flink-data/savepoints-custom # optional (default: job savepoint path)
+```
+
+### Checkpoint
+
+```yaml
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkStateSnapshot
+metadata:
+ name: example-checkpoint
+spec:
+ backoffLimit: 1
+ jobReference:
+ kind: FlinkDeployment
+ name: example-deployment
+ checkpoint: {}
+```
+
+### Start job from existing snapshot
+
+```yaml
+ job:
+ flinkStateSnapshotReference:
+ namespace: flink # not required if it's in the same namespace
+ name: example-savepoint
+```
+
+{{< hint warning >}}
+While it is possible to start a job from a FlinkStateSnapshot of checkpoint type, checkpoint data is owned by Flink and might be deleted by Flink at any time after the checkpoint was triggered.
+{{< /hint >}}
+
+
+## Snapshot CR lifecycle
+
+### Snapshot creation
+
+When a new FlinkStateSnapshot CR is created, in the first reconciliation phase the operator will trigger the savepoint/checkpoint for the referenced job via the Flink REST API.
+The resulting trigger ID will be added to the CR status.
+
+In the next observation phase the operator will check all in-progress snapshots and query their state.
+If the snapshot was successful, its path will be added to the CR status.
+
+If the triggered snapshot is a savepoint and `spec.savepoint.alreadyExists` is set to true, on the first reconciliation the operator will populate its `status` fields with `COMPLETED` state, and copy the savepoint path found in the spec to `status.path`.
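+
+As an illustrative sketch (the exact status layout is defined by the CRD and may differ), a successfully completed snapshot could report a status similar to:
+
+```yaml
+status:
+  state: COMPLETED   # or FAILED / ABANDONED, see below
+  path: /flink-data/savepoints/savepoint-000000-aec3dd08e76d  # resulting snapshot path
+  failures: 0        # number of snapshot attempts that have failed so far
+```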
+
+### Snapshot errors
+
+If the operator encounters any errors during snapshot observation/reconciliation, the `error` field will be populated in the CR status and the `failures` field will be incremented by 1.
+If the backoff limit specified in the spec is reached, the snapshot will enter a `FAILED` state and won't be retried.
+If it is not reached, the operator will keep retrying the snapshot with exponential backoff (10s, 20s, 40s, ...).
+
+In case of any error, a new Event containing the error message will also be generated for the snapshot resource.
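+
+These events can be inspected with standard kubectl tooling, for example:
+
+```sh
+kubectl describe flinkstatesnapshot example-savepoint
+```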
+
+{{< hint info >}}
+For checkpoints, after the operator has verified that the checkpoint was successful, it will attempt to fetch its final path via the Flink REST API.
+Any errors experienced during this step will generate a Kubernetes event and will not populate the `error` field; the checkpoint will still be marked `COMPLETED`, but its `path` field will stay empty.
+{{< /hint >}}
+
+### Snapshot abandonment
+
+If the referenced Flink job can't be found or is stopped after a snapshot was triggered, the snapshot will enter the `ABANDONED` state and won't be retried.
+
+### Savepoint disposal on deletion
+
+In the case of savepoints, if `spec.savepoint.disposeOnDelete` is true, the operator will automatically dispose of the savepoint data on the filesystem when the CR is deleted.
+This, however, requires the referenced Flink job to be running, as the operation is performed via the Flink REST API.
+
+This feature is not available for checkpoints.
+
+
+## Triggering snapshots
+
+Upgrade savepoints are triggered automatically by the system during the upgrade process, as described in [Job Management]({{< ref "docs/custom-resource/job-management" >}}).
+In this case, the savepoint path will also be recorded in the `upgradeSnapshotReference` job status field, which the operator will use when restarting the job.
+
+For backup, job forking and other purposes, savepoints and checkpoints can be triggered manually or periodically by the operator. Generally speaking, these will not be used during upgrades and are not required for correct operation.
+
+### Manual Snapshot Triggering
+
+Users can trigger snapshots manually by assigning a new (different/random) value to the `savepointTriggerNonce` or `checkpointTriggerNonce` field in the job specification:
+
+```yaml
+ job:
+ ...
+ savepointTriggerNonce: 123
+ checkpointTriggerNonce: 123
+ ...
+```
+
+Changing the nonce value will trigger a new snapshot. If FlinkStateSnapshot resources are enabled, a new snapshot CR will be automatically created.
+If disabled, information about pending and last snapshots is stored in the FlinkDeployment/FlinkSessionJob CR status.
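+
+With snapshot resources enabled, the automatically created CRs can be listed with kubectl (assuming the CRD's plural name is `flinkstatesnapshots`):
+
+```sh
+kubectl get flinkstatesnapshots
+```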
+
+### Periodic Snapshot Triggering
+
+The operator also supports periodic snapshot triggering through the following config options, which can be configured on a per-job level:
+
+```yaml
+ flinkConfiguration:
+ ...
+ kubernetes.operator.periodic.savepoint.interval: 6h
+ kubernetes.operator.periodic.checkpoint.interval: 6h
+```
+
+There is no guarantee on the timely execution of periodic snapshots, as they might be delayed by an unhealthy job status or other interfering user operations.
+
+### Snapshot History
+
+The operator automatically keeps track of the history of snapshots triggered by upgrade, manual and periodic snapshot operations.
+This is necessary so that the operator can clean up old snapshots.
+
+Users can control the cleanup behaviour by specifying a maximum age and maximum count for the savepoint and checkpoint resources in the history.
+
+```
+kubernetes.operator.savepoint.history.max.age: 24 h
+kubernetes.operator.savepoint.history.max.count: 5
+
+kubernetes.operator.checkpoint.history.max.age: 24 h
+kubernetes.operator.checkpoint.history.max.count: 5
+```
+
+{{< hint warning >}}
+Checkpoint history cleanup is only supported if FlinkStateSnapshot resources are enabled.
+This operation will only delete the FlinkStateSnapshot CR, and will never delete any checkpoint data on the filesystem.
+{{< /hint >}}
+
+{{< hint info >}}
+Savepoint cleanup happens lazily and only when the Flink resource associated with the snapshot is running.
+It is therefore very likely that savepoints live beyond the max age configuration.
+{{< /hint >}}
+
+To also dispose of savepoint data on savepoint cleanup, set `kubernetes.operator.savepoint.dispose-on-delete: true`.
+This config will set `spec.savepoint.disposeOnDelete` to true for FlinkStateSnapshot CRs created for periodic savepoints and for manual ones triggered via `savepointTriggerNonce`.
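+
+For example, a per-job configuration fragment (illustrative values) that combines periodic savepoints with disposal on cleanup:
+
+```yaml
+  flinkConfiguration:
+    kubernetes.operator.periodic.savepoint.interval: 6h
+    kubernetes.operator.savepoint.dispose-on-delete: "true"
+```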
+
+To disable savepoint/checkpoint cleanup by the operator you can set `kubernetes.operator.savepoint.cleanup.enabled: false` and `kubernetes.operator.checkpoint.cleanup.enabled: false`.
+
diff --git a/docs/content/docs/operations/upgrade.md b/docs/content/docs/operations/upgrade.md
index e9b7ec9..65f188b 100644
--- a/docs/content/docs/operations/upgrade.md
+++ b/docs/content/docs/operations/upgrade.md
@@ -148,19 +148,20 @@
```
5. Restore the job:
- Deploy the previously deleted job using this [FlinkDeployemnt](https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml) with `v1beta1` and explicitly set the `job.initialSavepointPath` to the savepoint location obtained from the step 1.
+   Deploy the previously deleted job using this [FlinkDeployment](https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml) with `v1beta1` and explicitly set the `job.flinkStateSnapshotReference.path` to the savepoint location obtained in step 1.
```
spec:
...
job:
- initialSavepointPath: /flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata
+ flinkStateSnapshotReference:
+ path: /flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata
upgradeMode: savepoint
...
```
Alternatively, we may use this command to edit and deploy the manifest:
```sh
- wget -qO - https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml| yq w - "spec.job.initialSavepointPath" "/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata"| kubectl apply -f -
+ wget -qO - https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml| yq w - "spec.job.flinkStateSnapshotReference.path" "/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata"| kubectl apply -f -
```
Finally, verify that `deploy/basic-checkpoint-ha-example` log has:
```
diff --git a/examples/snapshot/checkpoint.yaml b/examples/snapshot/checkpoint.yaml
new file mode 100644
index 0000000..0e3a56d
--- /dev/null
+++ b/examples/snapshot/checkpoint.yaml
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkStateSnapshot
+metadata:
+ name: example-checkpoint
+spec:
+ backoffLimit: 0
+ jobReference:
+ kind: FlinkDeployment
+ name: example-deployment
+ checkpoint: {} # This will specify that we want a checkpoint
diff --git a/examples/snapshot/job-from-savepoint.yaml b/examples/snapshot/job-from-savepoint.yaml
new file mode 100644
index 0000000..698ad22
--- /dev/null
+++ b/examples/snapshot/job-from-savepoint.yaml
@@ -0,0 +1,69 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Creates a FlinkStateSnapshot from an already existing savepoint on the filesystem, and starts a new job from that snapshot.
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkStateSnapshot
+metadata:
+ name: example-savepoint
+spec:
+ savepoint:
+ alreadyExists: true
+ disposeOnDelete: false
+ path: file:///flink-data/savepoints/savepoint-45305c-d867504446e0
+
+---
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+ name: example-deployment
+spec:
+ image: flink:1.19
+ flinkVersion: v1_19
+ flinkConfiguration:
+ state.checkpoints.dir: file:///flink-data/checkpoints
+ state.savepoints.dir: file:///flink-data/savepoints
+ serviceAccount: flink
+ jobManager:
+ resource:
+ memory: "2048m"
+ cpu: 1
+ taskManager:
+ resource:
+ memory: "2048m"
+ cpu: 1
+ podTemplate:
+ spec:
+ containers:
+ - name: flink-main-container
+ volumeMounts:
+ - mountPath: /flink-data
+ name: flink-volume
+ volumes:
+ - name: flink-volume
+ hostPath:
+ path: /tmp/flink
+ type: Directory
+ job:
+ jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+ parallelism: 2
+ upgradeMode: savepoint
+ flinkStateSnapshotReference:
+ name: example-savepoint
+ namespace: flink
diff --git a/examples/snapshot/savepoint-already-exists.yaml b/examples/snapshot/savepoint-already-exists.yaml
new file mode 100644
index 0000000..ebacde1
--- /dev/null
+++ b/examples/snapshot/savepoint-already-exists.yaml
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkStateSnapshot
+metadata:
+ name: example-savepoint
+spec:
+ savepoint:
+ alreadyExists: true
+ disposeOnDelete: false
+ path: /flink-data/savepoints/savepoint-98682a-21931bf10c39
diff --git a/examples/snapshot/savepoint-job-defaults.yaml b/examples/snapshot/savepoint-job-defaults.yaml
new file mode 100644
index 0000000..9cd26e3
--- /dev/null
+++ b/examples/snapshot/savepoint-job-defaults.yaml
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkStateSnapshot
+metadata:
+ name: example-savepoint
+spec:
+ backoffLimit: 0
+ jobReference:
+ kind: FlinkDeployment
+ name: example-deployment
+ savepoint: {} # This will use the savepoint path configured in the job config.
diff --git a/examples/snapshot/savepoint-job-full-spec.yaml b/examples/snapshot/savepoint-job-full-spec.yaml
new file mode 100644
index 0000000..5a0356a
--- /dev/null
+++ b/examples/snapshot/savepoint-job-full-spec.yaml
@@ -0,0 +1,32 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkStateSnapshot
+metadata:
+ name: example-savepoint
+spec:
+ backoffLimit: 0
+ jobReference:
+ kind: FlinkDeployment
+ name: example-deployment
+ savepoint:
+ alreadyExists: false
+ disposeOnDelete: true
+ formatType: CANONICAL
+ path: /flink-data/savepoints-custom