[HTTP][API] Add backends info API for spark/flink connector (#6984)
Doris should provide a http api to return backends list for connectors to submit stream load,
and without privilege checking, which can let common user to use it
diff --git a/src/main/java/org/apache/doris/flink/rest/RestService.java b/src/main/java/org/apache/doris/flink/rest/RestService.java
index 184afd3..1e6310c 100644
--- a/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -32,6 +32,7 @@
import org.apache.doris.flink.exception.ShouldNeverHappenException;
import org.apache.doris.flink.rest.models.Backend;
import org.apache.doris.flink.rest.models.BackendRow;
+import org.apache.doris.flink.rest.models.BackendV2;
import org.apache.doris.flink.rest.models.QueryPlan;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.flink.rest.models.Tablet;
@@ -83,7 +84,9 @@
private static final String API_PREFIX = "/api";
private static final String SCHEMA = "_schema";
private static final String QUERY_PLAN = "_query_plan";
+ @Deprecated
private static final String BACKENDS = "/rest/v1/system?path=//backends";
+ private static final String BACKENDS_V2 = "/api/backends?is_aliva=true";
private static final String FE_LOGIN = "/rest/v1/login";
/**
@@ -250,25 +253,29 @@
*/
@VisibleForTesting
public static String randomBackend(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
- List<BackendRow> backends = getBackends(options, readOptions, logger);
+ List<BackendV2.BackendRowV2> backends = getBackendsV2(options, readOptions, logger);
logger.trace("Parse beNodes '{}'.", backends);
if (backends == null || backends.isEmpty()) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
throw new IllegalArgumentException("beNodes", String.valueOf(backends));
}
Collections.shuffle(backends);
- BackendRow backend = backends.get(0);
- return backend.getIP() + ":" + backend.getHttpPort();
+ BackendV2.BackendRowV2 backend = backends.get(0);
+ return backend.getIp() + ":" + backend.getHttpPort();
}
/**
- * get Doris BE nodes to request.
+ * get Doris BE nodes to request.
*
* @param options configuration of request
* @param logger slf4j logger
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
+ *
+ * This method is deprecated. Because it needs ADMIN_PRIV to get backends, which is not suitable for common users.
+ * Use getBackendsV2 instead
*/
+ @Deprecated
@VisibleForTesting
static List<BackendRow> getBackends(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
String feNodes = options.getFenodes();
@@ -281,6 +288,7 @@
return backends;
}
+ @Deprecated
static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
ObjectMapper mapper = new ObjectMapper();
Backend backend;
@@ -310,6 +318,54 @@
}
/**
+ * get Doris BE nodes to request.
+ *
+ * @param options configuration of request
+ * @param logger slf4j logger
+ * @return the chosen one Doris BE node
+ * @throws IllegalArgumentException BE nodes is illegal
+ */
+ @VisibleForTesting
+ static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
+ String feNodes = options.getFenodes();
+ String feNode = randomEndpoint(feNodes, logger);
+ String beUrl = "http://" + feNode + BACKENDS_V2;
+ HttpGet httpGet = new HttpGet(beUrl);
+ String response = send(options, readOptions, httpGet, logger);
+ logger.info("Backend Info:{}", response);
+ List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
+ return backends;
+ }
+
+ static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException, IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ BackendV2 backend;
+ try {
+ backend = mapper.readValue(response, BackendV2.class);
+ } catch (JsonParseException e) {
+ String errMsg = "Doris BE's response is not a json. res: " + response;
+ logger.error(errMsg, e);
+ throw new DorisException(errMsg, e);
+ } catch (JsonMappingException e) {
+ String errMsg = "Doris BE's response cannot map to schema. res: " + response;
+ logger.error(errMsg, e);
+ throw new DorisException(errMsg, e);
+ } catch (IOException e) {
+ String errMsg = "Parse Doris BE's response to json failed. res: " + response;
+ logger.error(errMsg, e);
+ throw new DorisException(errMsg, e);
+ }
+
+ if (backend == null) {
+ logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
+ throw new ShouldNeverHappenException();
+ }
+ List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
+ logger.debug("Parsing schema result is '{}'.", backendRows);
+ return backendRows;
+ }
+
+ /**
* get a valid URI to connect Doris FE.
*
* @param options configuration of request
diff --git a/src/main/java/org/apache/doris/flink/rest/models/Backend.java b/src/main/java/org/apache/doris/flink/rest/models/Backend.java
index d74e46f..d91614f 100644
--- a/src/main/java/org/apache/doris/flink/rest/models/Backend.java
+++ b/src/main/java/org/apache/doris/flink/rest/models/Backend.java
@@ -25,6 +25,7 @@
/**
* Be response model
**/
+@Deprecated
@JsonIgnoreProperties(ignoreUnknown = true)
public class Backend {
diff --git a/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java b/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java
index 5b7df99..3dd0471 100644
--- a/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java
+++ b/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java
@@ -20,6 +20,7 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
+@Deprecated
@JsonIgnoreProperties(ignoreUnknown = true)
public class BackendRow {
diff --git a/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java b/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
new file mode 100644
index 0000000..5efb85e
--- /dev/null
+++ b/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.rest.models;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * Be response model
+ **/
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class BackendV2 {
+
+ @JsonProperty(value = "backends")
+ private List<BackendRowV2> backends;
+
+ public List<BackendRowV2> getBackends() {
+ return backends;
+ }
+
+ public void setBackends(List<BackendRowV2> backends) {
+ this.backends = backends;
+ }
+
+ public static class BackendRowV2 {
+ @JsonProperty("ip")
+ public String ip;
+ @JsonProperty("http_port")
+ public int httpPort;
+ @JsonProperty("is_alive")
+ public boolean isAlive;
+
+ public String getIp() {
+ return ip;
+ }
+
+ public void setIp(String ip) {
+ this.ip = ip;
+ }
+
+ public int getHttpPort() {
+ return httpPort;
+ }
+
+ public void setHttpPort(int httpPort) {
+ this.httpPort = httpPort;
+ }
+
+ public boolean isAlive() {
+ return isAlive;
+ }
+
+ public void setAlive(boolean alive) {
+ isAlive = alive;
+ }
+ }
+}