[FLINK-37236] Select the e2e streaming examples jar per Flink version and improve failure debugging
diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml
index 9a1f6ce..26f90a8 100644
--- a/.github/workflows/e2e.yaml
+++ b/.github/workflows/e2e.yaml
@@ -84,10 +84,19 @@
           if [[ "${{ inputs.append-java-version }}" == "true" ]]; then
             FLINK_IMAGE=${FLINK_IMAGE}-java${{ inputs.java-version }}
           fi
+
+          EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar"
+          if [[ "${{ inputs.flink-version }}" == v2* ]]; then
+            EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/2.0-preview1/flink-examples-streaming-2.0-preview1.jar"
+          fi
+          ESCAPED_EXAMPLES_JAR=$(printf '%s\n' "$EXAMPLES_JAR" | sed -e 's/[\/&]/\\&/g')
+
           echo FLINK_IMAGE=${FLINK_IMAGE}
+          echo EXAMPLES_JAR=${EXAMPLES_JAR}
           sed -i "s/image: flink:.*/image: ${FLINK_IMAGE}/" e2e-tests/data/*.yaml
           sed -i "s/flinkVersion: .*/flinkVersion: ${{ inputs.flink-version }}/" e2e-tests/data/*.yaml
           sed -i "s/mode: .*/mode: ${{ inputs.mode }}/" e2e-tests/data/*.yaml
+          sed -i "s/STREAMING_EXAMPLES_JAR_URL/${ESCAPED_EXAMPLES_JAR}/" e2e-tests/data/*.yaml
           git diff HEAD
           echo "Running e2e-tests/$test"
           bash e2e-tests/${{ inputs.test }} || exit 1
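
For reference, a minimal standalone sketch of the jar-selection and placeholder-substitution logic added in the step above, runnable outside the workflow. FLINK_VERSION is a stand-in for the `flink-version` workflow input (the default value below is only an assumed example), and the script assumes a checkout where e2e-tests/data/*.yaml still contain the STREAMING_EXAMPLES_JAR_URL placeholder.

```bash
#!/usr/bin/env bash
# Sketch only: mirrors the substitution done in the CI step above.
# Assumptions: run from the repository root, GNU sed (as in the CI runner).
set -euo pipefail

FLINK_VERSION="${1:-v1_20}"   # stand-in for the `flink-version` workflow input

EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar"
if [[ "$FLINK_VERSION" == v2* ]]; then
  EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/2.0-preview1/flink-examples-streaming-2.0-preview1.jar"
fi

# Escape '/' and '&' so the URL is safe to use inside a sed replacement string.
ESCAPED_EXAMPLES_JAR=$(printf '%s\n' "$EXAMPLES_JAR" | sed -e 's/[\/&]/\\&/g')

# Replace the placeholder in all e2e manifests with the chosen jar URL.
sed -i "s/STREAMING_EXAMPLES_JAR_URL/${ESCAPED_EXAMPLES_JAR}/" e2e-tests/data/*.yaml
echo "EXAMPLES_JAR=${EXAMPLES_JAR}"
```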
diff --git a/e2e-tests/data/flinkdep-cr.yaml b/e2e-tests/data/flinkdep-cr.yaml
index 0a9703e..f484767 100644
--- a/e2e-tests/data/flinkdep-cr.yaml
+++ b/e2e-tests/data/flinkdep-cr.yaml
@@ -31,7 +31,7 @@
       nginx.ingress.kubernetes.io/rewrite-target: "/$2"
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
-    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.type: kubernetes
     high-availability.storageDir: file:///opt/flink/volume/flink-ha
     state.checkpoints.dir: file:///opt/flink/volume/flink-cp
     state.savepoints.dir: file:///opt/flink/volume/flink-sp
@@ -44,7 +44,7 @@
           image: busybox:1.35.0
           imagePullPolicy: IfNotPresent
           # Use wget or other tools to get user jars from remote storage
-          command: [ 'wget', 'https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar', '-O', '/flink-artifact/myjob.jar' ]
+          command: [ 'wget', 'STREAMING_EXAMPLES_JAR_URL', '-O', '/flink-artifact/myjob.jar' ]
           volumeMounts:
             - mountPath: /flink-artifact
               name: flink-artifact
diff --git a/e2e-tests/data/multi-sessionjob.yaml b/e2e-tests/data/multi-sessionjob.yaml
index bc48c20..e8f84ce 100644
--- a/e2e-tests/data/multi-sessionjob.yaml
+++ b/e2e-tests/data/multi-sessionjob.yaml
@@ -31,7 +31,7 @@
       nginx.ingress.kubernetes.io/rewrite-target: "/$2"
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
-    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.type: kubernetes
     high-availability.storageDir: file:///opt/flink/volume/flink-ha
     state.checkpoints.dir: file:///opt/flink/volume/flink-cp
     state.savepoints.dir: file:///opt/flink/volume/flink-sp
@@ -79,7 +79,7 @@
       nginx.ingress.kubernetes.io/rewrite-target: "/$2"
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
-    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.type: kubernetes
     high-availability.storageDir: file:///opt/flink/volume/flink-ha
     state.checkpoints.dir: file:///opt/flink/volume/flink-cp
     state.savepoints.dir: file:///opt/flink/volume/flink-sp
@@ -120,7 +120,7 @@
 spec:
   deploymentName: session-cluster-1
   job:
-    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar
+    jarURI: STREAMING_EXAMPLES_JAR_URL
     parallelism: 2
     upgradeMode: savepoint
     entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
@@ -134,7 +134,7 @@
 spec:
   deploymentName: session-cluster-1
   job:
-    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar
+    jarURI: STREAMING_EXAMPLES_JAR_URL
     parallelism: 2
     upgradeMode: savepoint
     entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
diff --git a/e2e-tests/data/sessionjob-cr.yaml b/e2e-tests/data/sessionjob-cr.yaml
index f43e341..1d96988 100644
--- a/e2e-tests/data/sessionjob-cr.yaml
+++ b/e2e-tests/data/sessionjob-cr.yaml
@@ -31,7 +31,7 @@
       nginx.ingress.kubernetes.io/rewrite-target: "/$2"
   flinkConfiguration:
     taskmanager.numberOfTaskSlots: "2"
-    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+    high-availability.type: kubernetes
     high-availability.storageDir: file:///opt/flink/volume/flink-ha
     state.checkpoints.dir: file:///opt/flink/volume/flink-cp
     state.savepoints.dir: file:///opt/flink/volume/flink-sp
@@ -73,7 +73,7 @@
 spec:
   deploymentName: session-cluster-1
   job:
-    jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar
+    jarURI: STREAMING_EXAMPLES_JAR_URL
     parallelism: 2
     upgradeMode: savepoint
     entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
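
A quick, hypothetical local check (not part of the patch) to confirm the placeholder was replaced in every manifest before the tests run:

```bash
# Hypothetical sanity check: fail if any manifest still contains the raw
# placeholder, then print the jarURI lines produced by the sed substitution.
if grep -n "STREAMING_EXAMPLES_JAR_URL" e2e-tests/data/*.yaml; then
  echo "ERROR: unreplaced STREAMING_EXAMPLES_JAR_URL placeholder found" >&2
  exit 1
fi
grep -n "jarURI:" e2e-tests/data/*.yaml
```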
diff --git a/e2e-tests/utils.sh b/e2e-tests/utils.sh
index 5356a40..bf2f0e6 100755
--- a/e2e-tests/utils.sh
+++ b/e2e-tests/utils.sh
@@ -231,8 +231,14 @@
 function debug_and_show_logs {
     echo "Debugging failed e2e test:"
     echo "Currently existing Kubernetes resources"
+    kubectl get flinkdeployments
+    kubectl get flinksessionjobs
     kubectl get all
+    kubectl get configmaps
+    kubectl describe flinkdeployments
+    kubectl describe flinksessionjobs
     kubectl describe all
+    kubectl describe configmaps
 
     operator_pod_namespace=$(get_operator_pod_namespace)
     operator_pod_names=$(get_operator_pod_name)
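
A possible hardening of the new debug commands, sketched here rather than applied: the extra `kubectl get`/`kubectl describe` calls assume the Flink CRDs are installed, so guarding them would keep the rest of the debug output flowing if a CRD is missing or the script runs under `set -e`.

```bash
# Sketch only: tolerate a missing CRD so one failing kubectl call does not
# cut the debug output short (resource names taken from the patch above).
for resource in flinkdeployments flinksessionjobs configmaps; do
  kubectl get "$resource" || true
  kubectl describe "$resource" || true
done
```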
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index a6db17b..71ec3de 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -285,11 +285,13 @@
 
             boolean cancellable = allowLastStateCancel(ctx);
             if (running) {
-                return getUpgradeModeBasedOnStateAge(ctx, deployConfig, cancellable);
+                var mode = getUpgradeModeBasedOnStateAge(ctx, deployConfig, cancellable);
+                LOG.info("Job is running, using {} for last-state upgrade", mode);
+                return mode;
             }
 
             if (cancellable) {
-                LOG.info("Using cancel to perform last-state upgrade");
+                LOG.info("Job is not running, using cancel to perform last-state upgrade");
                 return JobUpgrade.lastStateUsingCancel();
             }
         }
@@ -356,8 +358,11 @@
         }
 
         var conf = ctx.getObserveConfig();
-        return conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB)
-                || !ctx.getFlinkService().isHaMetadataAvailable(conf);
+        if (!ctx.getFlinkService().isHaMetadataAvailable(conf)) {
+            LOG.info("HA metadata not available, cancel will be used instead of last-state");
+            return true;
+        }
+        return conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB);
     }
 
     protected void restoreJob(
diff --git a/helm/flink-kubernetes-operator/conf/log4j-operator.properties b/helm/flink-kubernetes-operator/conf/log4j-operator.properties
index e6d0318..7c4285f 100644
--- a/helm/flink-kubernetes-operator/conf/log4j-operator.properties
+++ b/helm/flink-kubernetes-operator/conf/log4j-operator.properties
@@ -27,11 +27,11 @@
 
 # Do not log config loading
 logger.conf.name = org.apache.flink.configuration.GlobalConfiguration
-logger.conf.level = WARN
+logger.conf.level = ERROR
 
 # Avoid logging fallback key INFO messages
-logger.conf.name = org.apache.flink.configuration.Configuration
-logger.conf.level = WARN
+logger.fallback.name = org.apache.flink.configuration.Configuration
+logger.fallback.level = ERROR
 
 # The monitor interval in seconds to enable log4j automatic reconfiguration
 # monitorInterval = 30
\ No newline at end of file