[FLINK-33006] add e2e for flink operator ha
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 3734b59..d3970cf 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -87,6 +87,7 @@
- test_sessionjob_operations.sh
- test_multi_sessionjob.sh
- test_autoscaler.sh
+ - test_flink_operator_ha.sh
include:
- namespace: flink
extraArgs: '--create-namespace --set "watchNamespaces={default,flink}"'
@@ -113,12 +114,18 @@
test: test_autoscaler.sh
- version: v1_15
test: test_dynamic_config.sh
+ - version: v1_15
+ test: test_flink_operator_ha.sh
- version: v1_16
test: test_autoscaler.sh
- version: v1_16
test: test_dynamic_config.sh
+ - version: v1_16
+ test: test_flink_operator_ha.sh
- version: v1_17
test: test_dynamic_config.sh
+ - version: v1_17
+ test: test_flink_operator_ha.sh
- version: v1_15
java-version: 17
- version: v1_16
@@ -164,9 +171,14 @@
docker images
- name: Start the operator
run: |
+ if [[ "${{ matrix.test }}" == "test_flink_operator_ha.sh" ]]; then
+ sed -i "s/# kubernetes.operator.leader-election.enabled: false/kubernetes.operator.leader-election.enabled: true/" helm/flink-kubernetes-operator/conf/flink-conf.yaml
+ sed -i "s/# kubernetes.operator.leader-election.lease-name: flink-operator-lease/kubernetes.operator.leader-election.lease-name: flink-operator-lease/" helm/flink-kubernetes-operator/conf/flink-conf.yaml
+ sed -i "s/replicas: 1/replicas: 2/" helm/flink-kubernetes-operator/values.yaml
+ fi
helm --debug install flink-kubernetes-operator -n ${{ matrix.namespace }} helm/flink-kubernetes-operator --set image.repository=flink-kubernetes-operator --set image.tag=ci-latest ${{ matrix.extraArgs }}
kubectl wait --for=condition=Available --timeout=120s -n ${{ matrix.namespace }} deploy/flink-kubernetes-operator
- kubectl get pods
+ kubectl get pods -n ${{ matrix.namespace }}
- name: Run Flink e2e tests
run: |
sed -i "s/image: flink:.*/image: ${{ matrix.image }}/" e2e-tests/data/*.yaml
diff --git a/e2e-tests/test_dynamic_config.sh b/e2e-tests/test_dynamic_config.sh
index 26dfef3..3569d2c 100644
--- a/e2e-tests/test_dynamic_config.sh
+++ b/e2e-tests/test_dynamic_config.sh
@@ -28,11 +28,13 @@
TIMEOUT=360
-operator_namespace=${get_operator_pod_namespace}
+operator_namespace=$(get_operator_pod_namespace)
+operator_pod=$(get_operator_pod_name)
+echo "Current operator pod is ${operator_pod}"
create_namespace dynamic
kubectl config set-context --current --namespace="${operator_namespace}"
patch_flink_config '{"data": {"flink-conf.yaml": "kubernetes.operator.watched.namespaces: default,flink,dynamic"}}'
-wait_for_operator_logs "Setting default configuration to {kubernetes.operator.watched.namespaces=default,flink,dynamic}" ${TIMEOUT} || exit 1
+wait_for_operator_logs "${operator_pod}" "Setting default configuration to {kubernetes.operator.watched.namespaces=default,flink,dynamic}" ${TIMEOUT} || exit 1
echo "Successfully run the dynamic property test"
diff --git a/e2e-tests/test_flink_operator_ha.sh b/e2e-tests/test_flink_operator_ha.sh
new file mode 100644
index 0000000..4ed40d4
--- /dev/null
+++ b/e2e-tests/test_flink_operator_ha.sh
@@ -0,0 +1,71 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################
+
+# This script tests the operator HA:
+# 1. Deploy a new flink deployment and wait for job manager to come up
+# 2. Verify the operator log on existing leader
+# 3. Delete the leader operator pod
+# 4. Verify the new leader is different with the old one
+# 5. Check operator log for the flink deployment in the new leader
+
+SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
+source "${SCRIPT_DIR}/utils.sh"
+
+CLUSTER_ID="flink-example-statemachine"
+APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-cr.yaml"
+TIMEOUT=300
+
+on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $CLUSTER_ID
+
+retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
+
+wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
+jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
+
+wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
+wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
+wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
+
+job_id=$(kubectl logs $jm_pod_name -c flink-main-container | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}')
+
+
+# Verify operator status
+operator_namespace=$(get_operator_pod_namespace)
+display_current_lease_info
+old_operator_leader=$(find_operator_pod_with_leadership)
+
+echo "Current operator pod with leadership is ${old_operator_leader}"
+wait_for_operator_logs "${old_operator_leader}" ".default/flink-example-statemachine. Resource fully reconciled, nothing to do" ${TIMEOUT} || exit 1
+
+# Delete the leader operator pod
+delete_operator_pod_with_leadership
+
+# Wait for 20 seconds for leader election
+sleep 20
+display_current_lease_info
+new_operator_leader=$(find_operator_pod_with_leadership)
+echo "Current operator pod with leadership is ${new_operator_leader}"
+
+if [ "${new_operator_leader}" == "${old_operator_leader}" ];then
+ echo "The new operator pod with leadership is the same as old operator pod. New operator pod haven't acquire leadership"
+ exit 1
+fi
+
+wait_for_operator_logs "${new_operator_leader}" ".default/flink-example-statemachine. Resource fully reconciled, nothing to do" ${TIMEOUT} || exit 1
+echo "Successfully run the Flink Kubernetes application HA test in the new operator leader"
\ No newline at end of file
diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh
index e3f5b74..fe48acc 100755
--- a/e2e-tests/utils.sh
+++ b/e2e-tests/utils.sh
@@ -37,16 +37,16 @@
}
function wait_for_operator_logs {
- local successful_response_regex=$1
- local timeout=$2
- operator_pod_name=$(get_operator_pod_name)
+ local operator_pod_name=$1
+ local successful_response_regex=$2
+ local timeout=$3
operator_pod_namespace=$(get_operator_pod_namespace)
# wait or timeout until the log shows up
- echo "Waiting for operator log \"$1\"..."
+ echo "Waiting for operator log \"$2\"..."
for i in $(seq 1 ${timeout}); do
- if kubectl logs $operator_pod_name -c flink-kubernetes-operator -n "${operator_pod_namespace}" | grep -E "${successful_response_regex}" >/dev/null; then
- echo "Log \"$1\" shows up."
+ if kubectl logs "${operator_pod_name}" -c flink-kubernetes-operator -n "${operator_pod_namespace}" | grep -E "${successful_response_regex}" >/dev/null; then
+ echo "Log \"$2\" shows up."
return
fi
@@ -124,12 +124,14 @@
}
function get_operator_pod_namespace() {
- operator_pod_namespace=$(kubectl get pods --selector="app.kubernetes.io/name=flink-kubernetes-operator" -o jsonpath='{..metadata.namespace}' --all-namespaces)
- if [ "$(grep -c . <<<"${operator_pod_namespace}")" != 1 ]; then
- echo "Invalid operator pod namespace: ${operator_pod_namespace}" >&2
+ # It will return multiple namespaces split by empty space if there are multiple operator instance in HA mode
+ operator_pod_namespaces=$(kubectl get pods --selector="app.kubernetes.io/name=flink-kubernetes-operator" -o jsonpath='{..metadata.namespace}' --all-namespaces)
+ operator_pod_namespac_array=(${operator_pod_namespaces})
+ if [ "$(grep -c . <<<"${operator_pod_namespac_array[0]}")" != 1 ]; then
+ echo "Invalid operator pod namespace: ${operator_pod_namespac_array[0]}" >&2
exit 1
fi
- echo "${operator_pod_namespace}"
+ echo "${operator_pod_namespac_array[0]}"
}
function get_operator_pod_name() {
@@ -184,17 +186,52 @@
kubectl patch cm flink-operator-config -n "${operator_pod_namespace}" --type merge -p "${patch}"
}
+function display_current_lease_info() {
+ operator_pod_namespace=$(get_operator_pod_namespace)
+ lease=$(kubectl get lease flink-operator-lease -o yaml -n "${operator_pod_namespace}")
+ echo "Current lease content: ${lease}"
+}
+
+function find_operator_pod_with_leadership() {
+ operator_pod_namespace=$(get_operator_pod_namespace)
+ active_pod_name=$(kubectl get lease flink-operator-lease -o jsonpath='{..spec.holderIdentity}' -n "${operator_pod_namespace}")
+ if [ "$(grep -c . <<<"${active_pod_name}")" != 1 ]; then
+ echo "Invalid leader operator pod name: ${active_pod_name}" >&2
+ exit 1
+ fi
+ echo "${active_pod_name}"
+}
+
+function delete_operator_pod_with_leadership() {
+ active_pod_name=$(find_operator_pod_with_leadership)
+ echo "Leader Operator Pod is ${active_pod_name}"
+ kubectl delete pod "${active_pod_name}" -n "${operator_pod_namespace}"
+ echo "Leader Operator Pod ${active_pod_name} is deleted"
+}
+
function debug_and_show_logs {
echo "Debugging failed e2e test:"
echo "Currently existing Kubernetes resources"
kubectl get all
kubectl describe all
- echo "Operator logs:"
operator_pod_namespace=$(get_operator_pod_namespace)
- operator_pod_name=$(get_operator_pod_name)
- echo "Operator namespace: ${operator_pod_namespace} pod: ${operator_pod_name}"
- kubectl logs -n "${operator_pod_namespace}" "${operator_pod_name}"
+ operator_pod_names=$(get_operator_pod_name)
+ echo "Currently existing Kubernetes resources of operator namespace"
+ kubectl get all -n "${operator_pod_namespace}"
+ kubectl describe all -n "${operator_pod_namespace}"
+
+ operator_pod_namespaces_array=(${operator_pod_names})
+ length=${#operator_pod_namespaces_array[@]}
+
+ # There are two operator pods in HA mode
+ for (( i=0; i<${length}; i++ ));
+ do
+ echo "Operator ${operator_pod_namespaces_array[$i]} logs:"
+ echo "Operator namespace: ${operator_pod_namespace} pod: ${operator_pod_namespaces_array[$i]}}"
+
+ kubectl logs -n "${operator_pod_namespace}" "${operator_pod_namespaces_array[$i]}"
+ done
echo "Flink logs:"
kubectl get pods -o jsonpath='{range .items[*]}{.metadata.name}{"\n"}{end}' | while read pod;do