Supports traversal of Doris FE nodes when searching for Doris BE (#67)
Co-authored-by: 顾忠辉 <guzhonghui@yiche.com>
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index b614e1a..a00385c 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -263,6 +263,17 @@
"/";
}
+ @VisibleForTesting
+ static String getUriStr(String feNode,Settings cfg, Logger logger) throws IllegalArgumentException {
+ String[] identifier = parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
+ return "http://" +
+ feNode + API_PREFIX +
+ "/" + identifier[0] +
+ "/" + identifier[1] +
+ "/";
+ }
+
+
/**
* discover Doris table schema from Doris FE.
* @param cfg configuration of request
@@ -273,10 +284,20 @@
public static Schema getSchema(Settings cfg, Logger logger)
throws DorisException {
logger.trace("Finding schema.");
- HttpGet httpGet = new HttpGet(getUriStr(cfg, logger) + SCHEMA);
- String response = send(cfg, httpGet, logger);
- logger.debug("Find schema response is '{}'.", response);
- return parseSchema(response, logger);
+ List<String> feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES), logger);
+ for (String feNode: feNodeList) {
+ try {
+ HttpGet httpGet = new HttpGet(getUriStr(feNode,cfg, logger) + SCHEMA);
+ String response = send(cfg, httpGet, logger);
+ logger.debug("Find schema response is '{}'.", response);
+ return parseSchema(response, logger);
+ } catch (ConnectedFailedException e) {
+ logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage());
+ }
+ }
+ String errMsg = "No Doris FE is available, please check configuration";
+ logger.error(errMsg);
+ throw new DorisException(errMsg);
}
/**
@@ -337,25 +358,36 @@
}
logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
- HttpPost httpPost = new HttpPost(getUriStr(cfg, logger) + QUERY_PLAN);
- String entity = "{\"sql\": \""+ sql +"\"}";
- logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
- StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
- stringEntity.setContentEncoding("UTF-8");
- stringEntity.setContentType("application/json");
- httpPost.setEntity(stringEntity);
+ List<String> feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES), logger);
+ for (String feNode: feNodeList) {
+ try {
+ HttpPost httpPost = new HttpPost(getUriStr(feNode,cfg, logger) + QUERY_PLAN);
+ String entity = "{\"sql\": \""+ sql +"\"}";
+ logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
+ StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
+ stringEntity.setContentEncoding("UTF-8");
+ stringEntity.setContentType("application/json");
+ httpPost.setEntity(stringEntity);
- String resStr = send(cfg, httpPost, logger);
- logger.debug("Find partition response is '{}'.", resStr);
- QueryPlan queryPlan = getQueryPlan(resStr, logger);
- Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger);
- return tabletsMapToPartition(
- cfg,
- be2Tablets,
- queryPlan.getOpaqued_query_plan(),
- tableIdentifiers[0],
- tableIdentifiers[1],
- logger);
+ String resStr = send(cfg, httpPost, logger);
+ logger.debug("Find partition response is '{}'.", resStr);
+ QueryPlan queryPlan = getQueryPlan(resStr, logger);
+ Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger);
+ return tabletsMapToPartition(
+ cfg,
+ be2Tablets,
+ queryPlan.getOpaqued_query_plan(),
+ tableIdentifiers[0],
+ tableIdentifiers[1],
+ logger);
+ } catch (ConnectedFailedException e) {
+ logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage());
+ }
+ }
+ String errMsg = "No Doris FE is available, please check configuration";
+ logger.error(errMsg);
+ throw new DorisException(errMsg);
+
}
/**
@@ -536,19 +568,27 @@
*/
@VisibleForTesting
public static List<BackendV2.BackendRowV2> getBackendRows(SparkSettings sparkSettings, Logger logger) throws DorisException {
- String feNodes = sparkSettings.getProperty(DORIS_FENODES);
- String feNode = randomEndpoint(feNodes, logger);
- String beUrl = String.format("http://%s" + BACKENDS_V2, feNode);
- HttpGet httpGet = new HttpGet(beUrl);
- String response = send(sparkSettings, httpGet, logger);
- logger.info("Backend Info:{}", response);
- List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
- logger.trace("Parse beNodes '{}'.", backends);
- if (backends == null || backends.isEmpty()) {
- logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
- throw new IllegalArgumentException("beNodes", String.valueOf(backends));
+ List<String> feNodeList = allEndpoints(sparkSettings.getProperty(DORIS_FENODES), logger);
+ for (String feNode : feNodeList){
+ try {
+ String beUrl = String.format("http://%s" + BACKENDS_V2, feNode);
+ HttpGet httpGet = new HttpGet(beUrl);
+ String response = send(sparkSettings, httpGet, logger);
+ logger.info("Backend Info:{}", response);
+ List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
+ logger.trace("Parse beNodes '{}'.", backends);
+ if (backends == null || backends.isEmpty()) {
+ logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
+ throw new IllegalArgumentException("beNodes", String.valueOf(backends));
+ }
+ return backends;
+ } catch (ConnectedFailedException e) {
+ logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage());
+ }
}
- return backends;
+ String errMsg = "No Doris FE is available, please check configuration";
+ logger.error(errMsg);
+ throw new DorisException(errMsg);
}
/**
@@ -629,4 +669,24 @@
}
return partitions;
}
+
+ /**
+ * choice a Doris FE node to request.
+ *
+ * @param feNodes Doris FE node list, separate be comma
+ * @param logger slf4j logger
+ * @return the array of Doris FE nodes
+ * @throws IllegalArgumentException fe nodes is illegal
+ */
+ @VisibleForTesting
+ static List<String> allEndpoints(String feNodes, Logger logger) throws IllegalArgumentException {
+ logger.trace("Parse fenodes '{}'.", feNodes);
+ if (StringUtils.isEmpty(feNodes)) {
+ logger.error(ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
+ throw new IllegalArgumentException("fenodes", feNodes);
+ }
+ List<String> nodes = Arrays.stream(feNodes.split(",")).map(String::trim).collect(Collectors.toList());
+ Collections.shuffle(nodes);
+ return nodes;
+ }
}