[improve] Check the status of fe and be before connect (#311)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 0fcc01d..fcf31c3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -38,6 +38,7 @@
 import org.apache.doris.flink.rest.models.QueryPlan;
 import org.apache.doris.flink.rest.models.Schema;
 import org.apache.doris.flink.rest.models.Tablet;
+import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.HttpGet;
@@ -280,7 +281,13 @@
         }
         List<String> nodes = Arrays.asList(feNodes.split(","));
         Collections.shuffle(nodes);
-        return nodes.get(0).trim();
+        for (String feNode : nodes) {
+            if (BackendUtil.tryHttpConnection(feNode)) {
+                return feNode;
+            }
+        }
+        throw new DorisRuntimeException(
+                "No Doris FE is available, please check configuration or cluster status.");
     }
 
     /**
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
index f909bb6..9a45ff0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
@@ -87,17 +87,18 @@
         throw new DorisRuntimeException("no available backend.");
     }
 
-    public static boolean tryHttpConnection(String backend) {
+    public static boolean tryHttpConnection(String host) {
         try {
-            backend = "http://" + backend;
-            URL url = new URL(backend);
+            LOG.info("try to connect host {}", host);
+            host = "http://" + host;
+            URL url = new URL(host);
             HttpURLConnection co = (HttpURLConnection) url.openConnection();
             co.setConnectTimeout(60000);
             co.connect();
             co.disconnect();
             return true;
         } catch (Exception ex) {
-            LOG.warn("Failed to connect to backend:{}", backend, ex);
+            LOG.warn("Failed to connect to host:{}", host, ex);
             return false;
         }
     }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
index 5b553fc..4563876 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
@@ -20,6 +20,7 @@
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.HttpEntityMock;
 import org.apache.doris.flink.sink.OptionUtils;
 import org.apache.http.ProtocolVersion;
@@ -28,6 +29,7 @@
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.message.BasicStatusLine;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -70,7 +72,8 @@
 
     HttpEntityMock entityMock;
     SchemaChangeManager schemaChangeManager;
-    static MockedStatic<HttpClients> httpClientMockedStatic = mockStatic(HttpClients.class);
+    static MockedStatic<HttpClients> httpClientMockedStatic;
+    static MockedStatic<BackendUtil> backendUtilMockedStatic;
 
     @Before
     public void setUp() throws IOException {
@@ -89,7 +92,11 @@
         when(httpResponse.getStatusLine()).thenReturn(normalLine);
         when(httpResponse.getEntity()).thenReturn(entityMock);
 
+        httpClientMockedStatic = mockStatic(HttpClients.class);
         httpClientMockedStatic.when(() -> HttpClients.createDefault()).thenReturn(httpClient);
+
+        backendUtilMockedStatic = mockStatic(BackendUtil.class);
+        backendUtilMockedStatic.when(() -> BackendUtil.tryHttpConnection(any())).thenReturn(true);
     }
 
     @Test
@@ -140,4 +147,10 @@
         Assert.assertEquals(
                 "ALTER TABLE `test`.`test_flink` RENAME COLUMN `col` `col_new`", renameColumnDDL);
     }
+
+    @After
+    public void after() {
+        httpClientMockedStatic.close();
+        backendUtilMockedStatic.close();
+    }
 }