[FLINK-22545][coordination] Fix check during creation of Source Coordinator thread.

The check was meant as a safeguard to prevent re-instantiation after fatal errors killed a previous thread.
But the check was susceptible to thread termination due to idleness in the executor.

This updates the check to only fail if there is in fact an instantiation next to a running thread, or after a
previously crashed thread.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 563ed28..1660027 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -27,6 +27,8 @@
 import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 
+import javax.annotation.Nullable;
+
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -85,13 +87,16 @@
     }
 
     /** A thread factory class that provides some helper methods. */
-    public static class CoordinatorExecutorThreadFactory implements ThreadFactory {
+    public static class CoordinatorExecutorThreadFactory
+            implements ThreadFactory, Thread.UncaughtExceptionHandler {
 
         private final String coordinatorThreadName;
         private final ClassLoader cl;
         private final Thread.UncaughtExceptionHandler errorHandler;
 
-        private Thread t;
+        @Nullable private Thread t;
+
+        @Nullable private volatile Throwable previousFailureReason;
 
         CoordinatorExecutorThreadFactory(
                 final String coordinatorThreadName, final ClassLoader contextClassLoader) {
@@ -110,18 +115,32 @@
 
         @Override
         public synchronized Thread newThread(Runnable r) {
-            if (t != null) {
+            if (t != null && t.isAlive()) {
                 throw new Error(
-                        "This indicates that a fatal error has happened and caused the "
-                                + "coordinator executor thread to exit. Check the earlier logs"
-                                + "to see the root cause of the problem.");
+                        "Source Coordinator Thread already exists. There should never be more than one "
+                                + "thread driving the actions of a Source Coordinator. Existing Thread: "
+                                + t);
+            }
+            if (t != null && previousFailureReason != null) {
+                throw new Error(
+                        "The following fatal error has happened in a previously spawned "
+                                + "Source Coordinator thread. No new thread can be spawned.",
+                        previousFailureReason);
             }
             t = new Thread(r, coordinatorThreadName);
             t.setContextClassLoader(cl);
-            t.setUncaughtExceptionHandler(errorHandler);
+            t.setUncaughtExceptionHandler(this);
             return t;
         }
 
+        @Override
+        public synchronized void uncaughtException(Thread t, Throwable e) {
+            if (previousFailureReason == null) {
+                previousFailureReason = e;
+            }
+            errorHandler.uncaughtException(t, e);
+        }
+
         String getCoordinatorThreadName() {
             return coordinatorThreadName;
         }