[FLINK-22434] Retry in clusterclient if job state is unknown

Since the dispatcher now exposed the SUSPENDED job state it
needs to be handled in the ClusterClient. We decided to not
expose this state because it would be an unexpected change
for users relying on the old behavior.
In summary, the RestClusterClient will retry to receive a
different state for jobs in SUSPENDED state.

This closes #15964.
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 6659b94..f22dd0a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -32,6 +32,7 @@
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices;
@@ -161,6 +162,11 @@
     /** ExecutorService to run operations that can be retried on exceptions. */
     private ScheduledExecutorService retryExecutorService;
 
+    private final Predicate<Throwable> unknownJobStateRetryable =
+            exception ->
+                    ExceptionUtils.findThrowable(exception, JobStateUnknownException.class)
+                            .isPresent();
+
     public RestClusterClient(Configuration config, T clusterId) throws Exception {
         this(config, clusterId, HighAvailabilityServicesUtils.createClientHAService(config));
     }
@@ -270,7 +276,9 @@
 
     @Override
     public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
-        return getJobDetails(jobId).thenApply(JobDetailsInfo::getJobStatus);
+        final CheckedSupplier<CompletableFuture<JobStatus>> operation =
+                () -> requestJobStatus(jobId);
+        return retry(operation, unknownJobStateRetryable);
     }
 
     /**
@@ -283,12 +291,9 @@
      */
     @Override
     public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
-        return pollResourceAsync(
-                () -> {
-                    final JobMessageParameters messageParameters = new JobMessageParameters();
-                    messageParameters.jobPathParameter.resolve(jobId);
-                    return sendRequest(JobExecutionResultHeaders.getInstance(), messageParameters);
-                });
+        final CheckedSupplier<CompletableFuture<JobResult>> operation =
+                () -> requestJobResultInternal(jobId);
+        return retry(operation, unknownJobStateRetryable);
     }
 
     @Override
@@ -700,6 +705,44 @@
     // RestClient Helper
     // -------------------------------------------------------------------------
 
+    private CompletableFuture<JobStatus> requestJobStatus(JobID jobId) {
+        return getJobDetails(jobId)
+                .thenApply(JobDetailsInfo::getJobStatus)
+                .thenApply(
+                        jobStatus -> {
+                            if (jobStatus == JobStatus.SUSPENDED) {
+                                throw new JobStateUnknownException(
+                                        String.format("Job %s is in state SUSPENDED", jobId));
+                            }
+                            return jobStatus;
+                        });
+    }
+
+    private static class JobStateUnknownException extends RuntimeException {
+        public JobStateUnknownException(String message) {
+            super(message);
+        }
+    }
+
+    private CompletableFuture<JobResult> requestJobResultInternal(@Nonnull JobID jobId) {
+        return pollResourceAsync(
+                        () -> {
+                            final JobMessageParameters messageParameters =
+                                    new JobMessageParameters();
+                            messageParameters.jobPathParameter.resolve(jobId);
+                            return sendRequest(
+                                    JobExecutionResultHeaders.getInstance(), messageParameters);
+                        })
+                .thenApply(
+                        jobResult -> {
+                            if (jobResult.getApplicationStatus() == ApplicationStatus.UNKNOWN) {
+                                throw new JobStateUnknownException(
+                                        String.format("Result for Job %s is UNKNOWN", jobId));
+                            }
+                            return jobResult;
+                        });
+    }
+
     private <
                     M extends MessageHeaders<EmptyRequestBody, P, U>,
                     U extends MessageParameters,
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 0c94c3f..35b968f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -71,6 +71,8 @@
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.TriggerId;
 import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
@@ -129,6 +131,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -600,6 +603,18 @@
                         new RestHandlerException(
                                 "should trigger retry", HttpResponseStatus.SERVICE_UNAVAILABLE),
                         JobExecutionResultResponseBody.inProgress(),
+                        // On an UNKNOWN JobResult it should be retried
+                        JobExecutionResultResponseBody.created(
+                                new JobResult.Builder()
+                                        .applicationStatus(ApplicationStatus.UNKNOWN)
+                                        .jobId(jobId)
+                                        .netRuntime(Long.MAX_VALUE)
+                                        .accumulatorResults(
+                                                Collections.singletonMap(
+                                                        "testName",
+                                                        new SerializedValue<>(
+                                                                OptionalFailure.of(1.0))))
+                                        .build()),
                         JobExecutionResultResponseBody.created(
                                 new JobResult.Builder()
                                         .applicationStatus(ApplicationStatus.SUCCEEDED)
@@ -819,6 +834,48 @@
         }
     }
 
+    /**
+     * The SUSPENDED job status should never be returned by the client thus client retries until it
+     * either receives a different job status or the cluster is not reachable.
+     */
+    @Test
+    public void testNotShowSuspendedJobStatus() throws Exception {
+        final List<JobDetailsInfo> jobDetails = new ArrayList<>();
+        jobDetails.add(buildJobDetail(JobStatus.SUSPENDED));
+        jobDetails.add(buildJobDetail(JobStatus.RUNNING));
+        final TestJobStatusHandler jobStatusHandler =
+                new TestJobStatusHandler(jobDetails.iterator());
+
+        try (TestRestServerEndpoint restServerEndpoint =
+                createRestServerEndpoint(jobStatusHandler)) {
+            final RestClusterClient<?> restClusterClient =
+                    createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
+            try {
+                final CompletableFuture<JobStatus> future = restClusterClient.getJobStatus(jobId);
+                assertEquals(JobStatus.RUNNING, future.get());
+            } finally {
+                restClusterClient.close();
+            }
+        }
+    }
+
+    private JobDetailsInfo buildJobDetail(JobStatus jobStatus) {
+        return new JobDetailsInfo(
+                jobId,
+                "testJob",
+                true,
+                jobStatus,
+                1L,
+                2L,
+                1L,
+                8888L,
+                1984L,
+                new HashMap<>(),
+                new ArrayList<>(),
+                new HashMap<>(),
+                "{\"id\":\"1234\"}");
+    }
+
     private class TestClientCoordinationHandler
             extends TestHandler<
                     ClientCoordinationRequestBody,
@@ -957,6 +1014,29 @@
         }
     }
 
+    private class TestJobStatusHandler
+            extends TestHandler<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> {
+
+        private final Iterator<JobDetailsInfo> jobDetailsInfo;
+
+        private TestJobStatusHandler(@Nonnull Iterator<JobDetailsInfo> jobDetailsInfo) {
+            super(JobDetailsHeaders.getInstance());
+            checkState(jobDetailsInfo.hasNext(), "Job details are empty");
+            this.jobDetailsInfo = checkNotNull(jobDetailsInfo);
+        }
+
+        @Override
+        protected CompletableFuture<JobDetailsInfo> handleRequest(
+                @Nonnull HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
+                @Nonnull DispatcherGateway gateway)
+                throws RestHandlerException {
+            if (!jobDetailsInfo.hasNext()) {
+                throw new IllegalStateException("More job details were requested than configured");
+            }
+            return CompletableFuture.completedFuture(jobDetailsInfo.next());
+        }
+    }
+
     private abstract class TestHandler<
                     R extends RequestBody, P extends ResponseBody, M extends MessageParameters>
             extends AbstractRestHandler<DispatcherGateway, R, P, M> {