title: "Custom Operator Plugins"
weight: 5
type: docs
aliases:
The operator provides a customizable platform for Flink resource management. Users can develop plugins to tailor the operator behaviour to their own needs.
FlinkResourceValidator, an interface for validating FlinkDeployment and FlinkSessionJob resources, is a pluggable component based on the Plugins mechanism. During development, we can customize the implementation of FlinkResourceValidator, making sure to retain the service definition in META-INF/services. The following steps demonstrate how to develop and use a custom validator.
Implement the FlinkResourceValidator interface:
    package org.apache.flink.kubernetes.operator.validation;

    import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
    import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;

    import java.util.Optional;

    /** Custom validator implementation of {@link FlinkResourceValidator}. */
    public class CustomValidator implements FlinkResourceValidator {

        @Override
        public Optional<String> validateDeployment(FlinkDeployment deployment) {
            if (deployment.getSpec().getFlinkVersion() == null) {
                return Optional.of("Flink Version must be defined.");
            }
            return Optional.empty();
        }

        @Override
        public Optional<String> validateSessionJob(
                FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
            if (sessionJob.getSpec().getJob() == null) {
                return Optional.of("The job spec should not be empty");
            }
            return Optional.empty();
        }
    }
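The validator above signals success with an empty Optional and failure with an Optional carrying the error message. The following self-contained sketch illustrates that contract without any operator dependency. The Spec and Validator types and the firstError helper are hypothetical stand-ins, and reporting the first error is an assumption about how results could be combined, not a description of the operator's internals.

```java
import java.util.List;
import java.util.Optional;

public class ValidationContractSketch {

    /** Hypothetical, simplified stand-in for a resource spec. */
    public static final class Spec {
        final String flinkVersion;
        final String job;

        public Spec(String flinkVersion, String job) {
            this.flinkVersion = flinkVersion;
            this.job = job;
        }
    }

    /** Hypothetical stand-in for a validator: empty means valid, otherwise the error message. */
    public interface Validator {
        Optional<String> validate(Spec spec);
    }

    /** Returns the first error reported by any validator, or empty if all of them accept the spec. */
    public static Optional<String> firstError(List<Validator> validators, Spec spec) {
        for (Validator validator : validators) {
            Optional<String> error = validator.validate(spec);
            if (error.isPresent()) {
                return error;
            }
        }
        return Optional.empty();
    }

    /** Runs two sample checks that mirror the messages used by CustomValidator. */
    public static Optional<String> check(String flinkVersion, String job) {
        Validator versionCheck = spec -> spec.flinkVersion == null
                ? Optional.of("Flink Version must be defined.")
                : Optional.empty();
        Validator jobCheck = spec -> spec.job == null
                ? Optional.of("The job spec should not be empty")
                : Optional.empty();
        return firstError(List.of(versionCheck, jobCheck), new Spec(flinkVersion, job));
    }

    public static void main(String[] args) {
        // Sample values only; they do not refer to a real deployment.
        System.out.println(check(null, "sample-job"));
        System.out.println(check("sample-version", "sample-job"));
    }
}
```

Returning an Optional instead of throwing keeps each validator side-effect free, so several validators can be evaluated in sequence and the caller decides how to surface the message.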
Create the service definition file org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator in META-INF/services. For the custom FlinkResourceValidator implementation above, the file contains the following line:
org.apache.flink.kubernetes.operator.validation.CustomValidator
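Service definition files under META-INF/services follow the convention of Java's java.util.ServiceLoader: the file is named after the interface and lists implementation class names, one per line. The sketch below demonstrates that convention in isolation, assuming the plugin mechanism discovers implementations in a comparable way. The ResourceValidator interface and SampleValidator class are hypothetical stand-ins, and the service file is written to a temporary directory only to keep the example runnable on its own.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class ServiceDiscoverySketch {

    /** Hypothetical stand-in for the validator interface. */
    public interface ResourceValidator {
        String name();
    }

    /** Hypothetical implementation; it needs a public no-argument constructor. */
    public static class SampleValidator implements ResourceValidator {
        @Override
        public String name() {
            return "sample";
        }
    }

    /** Writes a service definition file and discovers the listed implementation through it. */
    public static List<String> discover() {
        try {
            Path root = Files.createTempDirectory("plugin-sketch");
            Path services = Files.createDirectories(root.resolve("META-INF/services"));
            // File name: fully qualified interface name. Content: implementation class name.
            Files.writeString(
                    services.resolve(ResourceValidator.class.getName()),
                    SampleValidator.class.getName() + "\n");

            List<String> found = new ArrayList<>();
            // The temporary directory plays the role of a plugin JAR on the class path.
            try (URLClassLoader loader = new URLClassLoader(
                    new URL[] {root.toUri().toURL()},
                    ResourceValidator.class.getClassLoader())) {
                for (ResourceValidator validator
                        : ServiceLoader.load(ResourceValidator.class, loader)) {
                    found.add(validator.name());
                }
            }
            return found;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(discover());
    }
}
```

If the service definition file is missing from the packaged JAR, or its name does not match the interface, a lookup of this kind finds nothing. That is why the file has to be retained when building the plugin.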
Use the Maven tool to package the project and generate the custom validator JAR.
Create a Dockerfile that builds a custom image from the official apache/flink-kubernetes-operator image and copies the generated JAR to the custom validator plugin directory. /opt/flink/plugins is the value of the FLINK_PLUGINS_DIR environment variable in the flink-kubernetes-operator helm chart. The structure of the custom validator directory under /opt/flink/plugins is as follows:
    /opt/flink/plugins
    ├── custom-validator
    │   ├── custom-validator.jar
    └── ...
With the custom validator directory location, the Dockerfile is defined as follows:
    FROM apache/flink-kubernetes-operator
    ENV FLINK_PLUGINS_DIR=/opt/flink/plugins
    ENV CUSTOM_VALIDATOR_DIR=custom-validator
    RUN mkdir $FLINK_PLUGINS_DIR/$CUSTOM_VALIDATOR_DIR
    COPY custom-validator.jar $FLINK_PLUGINS_DIR/$CUSTOM_VALIDATOR_DIR/
Install the flink-kubernetes-operator helm chart with the custom image and verify that the deploy/flink-kubernetes-operator log contains a line like the following:
2022-05-04 14:01:46,551 o.a.f.k.o.u.FlinkUtils [INFO ] Discovered resource validator from plugin directory[/opt/flink/plugins]: org.apache.flink.kubernetes.operator.validation.CustomValidator.
The Flink Kubernetes Operator allows users to listen to events and status updates triggered for the Flink Resources managed by the operator. This feature enables tighter integration with the user's own data platform.
By implementing the FlinkResourceListener interface, users can listen to both events and status updates per resource type (FlinkDeployment / FlinkSessionJob). These methods are called after the respective events have been triggered by the system. Using the context provided to each listener method, users can also access the related Flink resource and the KubernetesClient itself in order to trigger further events on demand.
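The callback pattern described above can be sketched without the operator on the classpath. Everything in this example is a hypothetical stand-in: the Context and Listener types, their method names, and the sample values are illustrative only, and the actual method signatures should be taken from the FlinkResourceListener interface itself.

```java
import java.util.ArrayList;
import java.util.List;

public class ResourceListenerSketch {

    /** Hypothetical stand-in for the context passed to each listener method. */
    public static final class Context {
        private final String resourceName;
        private final String detail;

        public Context(String resourceName, String detail) {
            this.resourceName = resourceName;
            this.detail = detail;
        }

        public String getResourceName() {
            return resourceName;
        }

        public String getDetail() {
            return detail;
        }
    }

    /** Hypothetical listener contract: separate callbacks for events and status updates. */
    public interface Listener {
        void onEvent(Context ctx);

        void onStatusUpdate(Context ctx);
    }

    /** Records every notification, for example to forward it to a data platform later. */
    public static final class RecordingListener implements Listener {
        private final List<String> records = new ArrayList<>();

        @Override
        public void onEvent(Context ctx) {
            records.add("EVENT " + ctx.getResourceName() + ": " + ctx.getDetail());
        }

        @Override
        public void onStatusUpdate(Context ctx) {
            records.add("STATUS " + ctx.getResourceName() + ": " + ctx.getDetail());
        }

        public List<String> getRecords() {
            return records;
        }
    }

    /** Plays the role of the system: notifies the listener after something has happened. */
    public static List<String> simulate() {
        RecordingListener listener = new RecordingListener();
        listener.onEvent(new Context("example-deployment", "sample event"));
        listener.onStatusUpdate(new Context("example-deployment", "sample status change"));
        return listener.getRecords();
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

Because the callbacks run after the fact, a listener of this shape observes what happened and cannot veto it. Rejecting a resource is the job of a validator.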
Similar to custom validator implementations, resource listeners are loaded via the Flink Plugins mechanism.
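In order to enable your custom FlinkResourceListener you need to implement the interface, add your listener class to the service definition file org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener in META-INF/services, and package your JAR into the plugins directory of your operator image (/opt/flink/plugins).

In some cases, users may need to add required dependencies onto the operator classpath.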
When building the custom image, copy the additional dependencies to /opt/flink/operator-lib, the directory referenced by the OPERATOR_LIB environment variable. That folder is added to the classpath upon initialization.
    FROM apache/flink-kubernetes-operator
    ARG ARTIFACT1=custom-artifact1.jar
    ARG ARTIFACT2=custom-artifact2.jar
    COPY target/$ARTIFACT1 $OPERATOR_LIB
    COPY target/$ARTIFACT2 $OPERATOR_LIB