[FLINK-37236] Update e2e jar handling logic and improve debugging
diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml
index 9a1f6ce..26f90a8 100644
--- a/.github/workflows/e2e.yaml
+++ b/.github/workflows/e2e.yaml
@@ -84,10 +84,19 @@
if [[ "${{ inputs.append-java-version }}" == "true" ]]; then
FLINK_IMAGE=${FLINK_IMAGE}-java${{ inputs.java-version }}
fi
+
+ EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar"
+ if [[ "${{ inputs.flink-version }}" == v2* ]]; then # quoted: an empty input must compare false, not break the [[ ]] syntax
+ EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/2.0-preview1/flink-examples-streaming-2.0-preview1.jar"
+ fi
+ ESCAPED_EXAMPLES_JAR=$(printf '%s\n' "$EXAMPLES_JAR" | sed -e 's/[\/&]/\\&/g') # escape / and & so the URL is safe as a sed replacement
+
echo FLINK_IMAGE=${FLINK_IMAGE}
+ echo EXAMPLES_JAR=${EXAMPLES_JAR}
sed -i "s/image: flink:.*/image: ${FLINK_IMAGE}/" e2e-tests/data/*.yaml
sed -i "s/flinkVersion: .*/flinkVersion: ${{ inputs.flink-version }}/" e2e-tests/data/*.yaml
sed -i "s/mode: .*/mode: ${{ inputs.mode }}/" e2e-tests/data/*.yaml
+ sed -i "s/STREAMING_EXAMPLES_JAR_URL/${ESCAPED_EXAMPLES_JAR}/" e2e-tests/data/*.yaml
git diff HEAD
echo "Running e2e-tests/$test"
bash e2e-tests/${{ inputs.test }} || exit 1
diff --git a/e2e-tests/data/flinkdep-cr.yaml b/e2e-tests/data/flinkdep-cr.yaml
index 0a9703e..f484767 100644
--- a/e2e-tests/data/flinkdep-cr.yaml
+++ b/e2e-tests/data/flinkdep-cr.yaml
@@ -31,7 +31,7 @@
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
- high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+ high-availability.type: kubernetes
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
@@ -44,7 +44,7 @@
image: busybox:1.35.0
imagePullPolicy: IfNotPresent
# Use wget or other tools to get user jars from remote storage
- command: [ 'wget', 'https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar', '-O', '/flink-artifact/myjob.jar' ]
+ command: [ 'wget', 'STREAMING_EXAMPLES_JAR_URL', '-O', '/flink-artifact/myjob.jar' ]
volumeMounts:
- mountPath: /flink-artifact
name: flink-artifact
diff --git a/e2e-tests/data/multi-sessionjob.yaml b/e2e-tests/data/multi-sessionjob.yaml
index bc48c20..e8f84ce 100644
--- a/e2e-tests/data/multi-sessionjob.yaml
+++ b/e2e-tests/data/multi-sessionjob.yaml
@@ -31,7 +31,7 @@
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
- high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+ high-availability.type: kubernetes
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
@@ -79,7 +79,7 @@
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
- high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+ high-availability.type: kubernetes
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
@@ -120,7 +120,7 @@
spec:
deploymentName: session-cluster-1
job:
- jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar
+ jarURI: STREAMING_EXAMPLES_JAR_URL
parallelism: 2
upgradeMode: savepoint
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
@@ -134,7 +134,7 @@
spec:
deploymentName: session-cluster-1
job:
- jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar
+ jarURI: STREAMING_EXAMPLES_JAR_URL
parallelism: 2
upgradeMode: savepoint
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
diff --git a/e2e-tests/data/sessionjob-cr.yaml b/e2e-tests/data/sessionjob-cr.yaml
index f43e341..1d96988 100644
--- a/e2e-tests/data/sessionjob-cr.yaml
+++ b/e2e-tests/data/sessionjob-cr.yaml
@@ -31,7 +31,7 @@
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
- high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+ high-availability.type: kubernetes
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
@@ -73,7 +73,7 @@
spec:
deploymentName: session-cluster-1
job:
- jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar
+ jarURI: STREAMING_EXAMPLES_JAR_URL
parallelism: 2
upgradeMode: savepoint
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh
index 5356a40..bf2f0e6 100755
--- a/e2e-tests/utils.sh
+++ b/e2e-tests/utils.sh
@@ -231,8 +231,14 @@
function debug_and_show_logs {
echo "Debugging failed e2e test:"
echo "Currently existing Kubernetes resources"
+ kubectl get flinkdeployments
+ kubectl get flinksessionjobs
kubectl get all
+ kubectl get configmaps
+ kubectl describe flinkdeployments
+ kubectl describe flinksessionjobs
kubectl describe all
+ kubectl describe configmaps
operator_pod_namespace=$(get_operator_pod_namespace)
operator_pod_names=$(get_operator_pod_name)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index a6db17b..71ec3de 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -285,11 +285,13 @@
boolean cancellable = allowLastStateCancel(ctx);
if (running) {
- return getUpgradeModeBasedOnStateAge(ctx, deployConfig, cancellable);
+ var mode = getUpgradeModeBasedOnStateAge(ctx, deployConfig, cancellable);
+ LOG.info("Job is running, using {} for last-state upgrade", mode);
+ return mode;
}
if (cancellable) {
- LOG.info("Using cancel to perform last-state upgrade");
+ LOG.info("Job is not running, using cancel to perform last-state upgrade");
return JobUpgrade.lastStateUsingCancel();
}
}
@@ -356,8 +358,11 @@
}
var conf = ctx.getObserveConfig();
- return conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB)
- || !ctx.getFlinkService().isHaMetadataAvailable(conf);
+ if (!ctx.getFlinkService().isHaMetadataAvailable(conf)) {
+ LOG.info("HA metadata not available, cancel will be used instead of last-state");
+ return true;
+ }
+ return conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB);
}
protected void restoreJob(
diff --git a/helm/flink-kubernetes-operator/conf/log4j-operator.properties b/helm/flink-kubernetes-operator/conf/log4j-operator.properties
index e6d0318..7c4285f 100644
--- a/helm/flink-kubernetes-operator/conf/log4j-operator.properties
+++ b/helm/flink-kubernetes-operator/conf/log4j-operator.properties
@@ -27,11 +27,11 @@
# Do not log config loading
logger.conf.name = org.apache.flink.configuration.GlobalConfiguration
-logger.conf.level = WARN
+logger.conf.level = ERROR
# Avoid logging fallback key INFO messages
-logger.conf.name = org.apache.flink.configuration.Configuration
-logger.conf.level = WARN
+logger.fallbackconf.name = org.apache.flink.configuration.Configuration
+logger.fallbackconf.level = ERROR
# The monitor interval in seconds to enable log4j automatic reconfiguration
# monitorInterval = 30
\ No newline at end of file