[improve] Check the status of fe and be before connect (#311)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 0fcc01d..fcf31c3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -38,6 +38,7 @@
import org.apache.doris.flink.rest.models.QueryPlan;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.flink.rest.models.Tablet;
+import org.apache.doris.flink.sink.BackendUtil;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
@@ -280,7 +281,13 @@
}
List<String> nodes = Arrays.asList(feNodes.split(","));
Collections.shuffle(nodes);
- return nodes.get(0).trim();
+ for (String feNode : nodes) {
+ if (BackendUtil.tryHttpConnection(feNode)) {
+ return feNode;
+ }
+ }
+ throw new DorisRuntimeException(
+ "No Doris FE is available, please check configuration or cluster status.");
}
/**
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
index f909bb6..9a45ff0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
@@ -87,17 +87,18 @@
throw new DorisRuntimeException("no available backend.");
}
- public static boolean tryHttpConnection(String backend) {
+ public static boolean tryHttpConnection(String host) {
try {
- backend = "http://" + backend;
- URL url = new URL(backend);
+ LOG.info("try to connect host {}", host);
+ host = "http://" + host;
+ URL url = new URL(host);
HttpURLConnection co = (HttpURLConnection) url.openConnection();
co.setConnectTimeout(60000);
co.connect();
co.disconnect();
return true;
} catch (Exception ex) {
- LOG.warn("Failed to connect to backend:{}", backend, ex);
+ LOG.warn("Failed to connect to host:{}", host, ex);
return false;
}
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
index 5b553fc..4563876 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
@@ -20,6 +20,7 @@
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
+import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.HttpEntityMock;
import org.apache.doris.flink.sink.OptionUtils;
import org.apache.http.ProtocolVersion;
@@ -28,6 +29,7 @@
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicStatusLine;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -70,7 +72,8 @@
HttpEntityMock entityMock;
SchemaChangeManager schemaChangeManager;
- static MockedStatic<HttpClients> httpClientMockedStatic = mockStatic(HttpClients.class);
+ static MockedStatic<HttpClients> httpClientMockedStatic;
+ static MockedStatic<BackendUtil> backendUtilMockedStatic;
@Before
public void setUp() throws IOException {
@@ -89,7 +92,11 @@
when(httpResponse.getStatusLine()).thenReturn(normalLine);
when(httpResponse.getEntity()).thenReturn(entityMock);
+ httpClientMockedStatic = mockStatic(HttpClients.class);
httpClientMockedStatic.when(() -> HttpClients.createDefault()).thenReturn(httpClient);
+
+ backendUtilMockedStatic = mockStatic(BackendUtil.class);
+ backendUtilMockedStatic.when(() -> BackendUtil.tryHttpConnection(any())).thenReturn(true);
}
@Test
@@ -140,4 +147,10 @@
Assert.assertEquals(
"ALTER TABLE `test`.`test_flink` RENAME COLUMN `col` `col_new`", renameColumnDDL);
}
+
+ @After
+ public void after() {
+ httpClientMockedStatic.close();
+ backendUtilMockedStatic.close();
+ }
}