blob: a00385cde849edd330f464d162ba89046c150d95 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.spark.rest;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FENODES;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FILTER_QUERY;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_REQUEST_AUTH_USER;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_MIN;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLE_IDENTIFIER;
import static org.apache.doris.spark.util.ErrorMessages.CONNECT_FAILED_MESSAGE;
import static org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE;
import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.HashMap;
import java.util.Base64;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.cfg.SparkSettings;
import org.apache.doris.spark.exception.ConnectedFailedException;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.exception.ShouldNeverHappenException;
import org.apache.doris.spark.rest.models.Backend;
import org.apache.doris.spark.rest.models.BackendRow;
import org.apache.doris.spark.rest.models.BackendV2;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.rest.models.Tablet;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.StringEntity;
import org.slf4j.Logger;
import com.google.common.annotations.VisibleForTesting;
/**
* Service for communicate with Doris FE.
*/
public class RestService implements Serializable {
public final static int REST_RESPONSE_STATUS_OK = 200;
private static final String API_PREFIX = "/api";
private static final String SCHEMA = "_schema";
private static final String QUERY_PLAN = "_query_plan";
@Deprecated
private static final String BACKENDS = "/rest/v1/system?path=//backends";
private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
/**
* send request to Doris FE and get response json string.
* @param cfg configuration of request
* @param request {@link HttpRequestBase} real request
* @param logger {@link Logger}
* @return Doris FE response in json string
* @throws ConnectedFailedException throw when cannot connect to Doris FE
*/
private static String send(Settings cfg, HttpRequestBase request, Logger logger) throws
ConnectedFailedException {
int connectTimeout = cfg.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS,
ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT);
int socketTimeout = cfg.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS,
ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT);
int retries = cfg.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES,
ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT);
logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.",
connectTimeout, socketTimeout, retries);
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(connectTimeout)
.setSocketTimeout(socketTimeout)
.build();
request.setConfig(requestConfig);
String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, "");
String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user);
IOException ex = null;
int statusCode = -1;
for (int attempt = 0; attempt < retries; attempt++) {
logger.debug("Attempt {} to request {}.", attempt, request.getURI());
try {
String response;
if (request instanceof HttpGet){
response = getConnectionGet(request.getURI().toString(), user, password,logger);
} else {
response = getConnectionPost(request,user, password,logger);
}
if (response == null) {
logger.warn("Failed to get response from Doris FE {}, http code is {}",
request.getURI(), statusCode);
continue;
}
logger.trace("Success get response from Doris FE: {}, response is: {}.",
request.getURI(), response);
ObjectMapper mapper = new ObjectMapper();
Map map = mapper.readValue(response, Map.class);
//Handle the problem of inconsistent data format returned by http v1 and v2
if (map.containsKey("code") && map.containsKey("msg")) {
Object data = map.get("data");
return mapper.writeValueAsString(data);
} else {
return response;
}
} catch (IOException e) {
ex = e;
logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e);
}
}
logger.error(CONNECT_FAILED_MESSAGE, request.getURI(), ex);
throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex);
}
private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException {
URL realUrl = new URL(request);
// open connection
HttpURLConnection connection = (HttpURLConnection)realUrl.openConnection();
String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
connection.setRequestProperty("Authorization", "Basic " + authEncoding);
connection.connect();
return parseResponse(connection,logger);
}
private static String parseResponse(HttpURLConnection connection,Logger logger) throws IOException {
if (connection.getResponseCode() != HttpStatus.SC_OK) {
logger.warn("Failed to get response from Doris {}, http code is {}",
connection.getURL(), connection.getResponseCode());
throw new IOException("Failed to get response from Doris");
}
StringBuilder result = new StringBuilder("");
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8"));
String line;
while ((line = in.readLine()) != null) {
result.append(line);
}
if (in != null) {
in.close();
}
return result.toString();
}
private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException {
URL url = new URL(request.getURI().toString());
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod(request.getMethod());
String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
conn.setRequestProperty("Authorization", "Basic " + authEncoding);
InputStream content = ((HttpPost)request).getEntity().getContent();
String res = IOUtils.toString(content);
conn.setDoOutput(true);
conn.setDoInput(true);
PrintWriter out = new PrintWriter(conn.getOutputStream());
// send request params
out.print(res);
// flush
out.flush();
// read response
return parseResponse(conn,logger);
}
/**
* parse table identifier to array.
* @param tableIdentifier table identifier string
* @param logger {@link Logger}
* @return first element is db name, second element is table name
* @throws IllegalArgumentException table identifier is illegal
*/
@VisibleForTesting
static String[] parseIdentifier(String tableIdentifier, Logger logger) throws IllegalArgumentException {
logger.trace("Parse identifier '{}'.", tableIdentifier);
if (StringUtils.isEmpty(tableIdentifier)) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "table.identifier", tableIdentifier);
throw new IllegalArgumentException("table.identifier", tableIdentifier);
}
String[] identifier = tableIdentifier.split("\\.");
if (identifier.length != 2) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "table.identifier", tableIdentifier);
throw new IllegalArgumentException("table.identifier", tableIdentifier);
}
return identifier;
}
/**
* choice a Doris FE node to request.
* @param feNodes Doris FE node list, separate be comma
* @param logger slf4j logger
* @return the chosen one Doris FE node
* @throws IllegalArgumentException fe nodes is illegal
*/
@VisibleForTesting
static String randomEndpoint(String feNodes, Logger logger) throws IllegalArgumentException {
logger.trace("Parse fenodes '{}'.", feNodes);
if (StringUtils.isEmpty(feNodes)) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
throw new IllegalArgumentException("fenodes", feNodes);
}
List<String> nodes = Arrays.asList(feNodes.split(","));
Collections.shuffle(nodes);
return nodes.get(0).trim();
}
/**
* get a valid URI to connect Doris FE.
* @param cfg configuration of request
* @param logger {@link Logger}
* @return uri string
* @throws IllegalArgumentException throw when configuration is illegal
*/
@VisibleForTesting
static String getUriStr(Settings cfg, Logger logger) throws IllegalArgumentException {
String[] identifier = parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
return "http://" +
randomEndpoint(cfg.getProperty(DORIS_FENODES), logger) + API_PREFIX +
"/" + identifier[0] +
"/" + identifier[1] +
"/";
}
@VisibleForTesting
static String getUriStr(String feNode,Settings cfg, Logger logger) throws IllegalArgumentException {
String[] identifier = parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
return "http://" +
feNode + API_PREFIX +
"/" + identifier[0] +
"/" + identifier[1] +
"/";
}
/**
* discover Doris table schema from Doris FE.
* @param cfg configuration of request
* @param logger slf4j logger
* @return Doris table schema
* @throws DorisException throw when discover failed
*/
public static Schema getSchema(Settings cfg, Logger logger)
throws DorisException {
logger.trace("Finding schema.");
List<String> feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES), logger);
for (String feNode: feNodeList) {
try {
HttpGet httpGet = new HttpGet(getUriStr(feNode,cfg, logger) + SCHEMA);
String response = send(cfg, httpGet, logger);
logger.debug("Find schema response is '{}'.", response);
return parseSchema(response, logger);
} catch (ConnectedFailedException e) {
logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage());
}
}
String errMsg = "No Doris FE is available, please check configuration";
logger.error(errMsg);
throw new DorisException(errMsg);
}
/**
* translate Doris FE response to inner {@link Schema} struct.
* @param response Doris FE response
* @param logger {@link Logger}
* @return inner {@link Schema} struct
* @throws DorisException throw when translate failed
*/
@VisibleForTesting
public static Schema parseSchema(String response, Logger logger) throws DorisException {
logger.trace("Parse response '{}' to schema.", response);
ObjectMapper mapper = new ObjectMapper();
Schema schema;
try {
schema = mapper.readValue(response, Schema.class);
} catch (JsonParseException e) {
String errMsg = "Doris FE's response is not a json. res: " + response;
logger.error(errMsg, e);
throw new DorisException(errMsg, e);
} catch (JsonMappingException e) {
String errMsg = "Doris FE's response cannot map to schema. res: " + response;
logger.error(errMsg, e);
throw new DorisException(errMsg, e);
} catch (IOException e) {
String errMsg = "Parse Doris FE's response to json failed. res: " + response;
logger.error(errMsg, e);
throw new DorisException(errMsg, e);
}
if (schema == null) {
logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
throw new ShouldNeverHappenException();
}
if (schema.getStatus() != REST_RESPONSE_STATUS_OK) {
String errMsg = "Doris FE's response is not OK, status is " + schema.getStatus();
logger.error(errMsg);
throw new DorisException(errMsg);
}
logger.debug("Parsing schema result is '{}'.", schema);
return schema;
}
/**
* find Doris RDD partitions from Doris FE.
* @param cfg configuration of request
* @param logger {@link Logger}
* @return an list of Doris RDD partitions
* @throws DorisException throw when find partition failed
*/
public static List<PartitionDefinition> findPartitions(Settings cfg, Logger logger) throws DorisException {
String[] tableIdentifiers = parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
String sql = "select " + cfg.getProperty(DORIS_READ_FIELD, "*") +
" from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`";
if (!StringUtils.isEmpty(cfg.getProperty(DORIS_FILTER_QUERY))) {
sql += " where " + cfg.getProperty(DORIS_FILTER_QUERY);
}
logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
List<String> feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES), logger);
for (String feNode: feNodeList) {
try {
HttpPost httpPost = new HttpPost(getUriStr(feNode,cfg, logger) + QUERY_PLAN);
String entity = "{\"sql\": \""+ sql +"\"}";
logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
stringEntity.setContentEncoding("UTF-8");
stringEntity.setContentType("application/json");
httpPost.setEntity(stringEntity);
String resStr = send(cfg, httpPost, logger);
logger.debug("Find partition response is '{}'.", resStr);
QueryPlan queryPlan = getQueryPlan(resStr, logger);
Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger);
return tabletsMapToPartition(
cfg,
be2Tablets,
queryPlan.getOpaqued_query_plan(),
tableIdentifiers[0],
tableIdentifiers[1],
logger);
} catch (ConnectedFailedException e) {
logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage());
}
}
String errMsg = "No Doris FE is available, please check configuration";
logger.error(errMsg);
throw new DorisException(errMsg);
}
/**
* translate Doris FE response string to inner {@link QueryPlan} struct.
* @param response Doris FE response string
* @param logger {@link Logger}
* @return inner {@link QueryPlan} struct
* @throws DorisException throw when translate failed.
*/
@VisibleForTesting
static QueryPlan getQueryPlan(String response, Logger logger) throws DorisException {
ObjectMapper mapper = new ObjectMapper();
QueryPlan queryPlan;
try {
queryPlan = mapper.readValue(response, QueryPlan.class);
} catch (JsonParseException e) {
String errMsg = "Doris FE's response is not a json. res: " + response;
logger.error(errMsg, e);
throw new DorisException(errMsg, e);
} catch (JsonMappingException e) {
String errMsg = "Doris FE's response cannot map to schema. res: " + response;
logger.error(errMsg, e);
throw new DorisException(errMsg, e);
} catch (IOException e) {
String errMsg = "Parse Doris FE's response to json failed. res: " + response;
logger.error(errMsg, e);
throw new DorisException(errMsg, e);
}
if (queryPlan == null) {
logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
throw new ShouldNeverHappenException();
}
if (queryPlan.getStatus() != REST_RESPONSE_STATUS_OK) {
String errMsg = "Doris FE's response is not OK, status is " + queryPlan.getStatus();
logger.error(errMsg);
throw new DorisException(errMsg);
}
logger.debug("Parsing partition result is '{}'.", queryPlan);
return queryPlan;
}
/**
* select which Doris BE to get tablet data.
* @param queryPlan {@link QueryPlan} translated from Doris FE response
* @param logger {@link Logger}
* @return BE to tablets {@link Map}
* @throws DorisException throw when select failed.
*/
@VisibleForTesting
static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws DorisException {
Map<String, List<Long>> be2Tablets = new HashMap<>();
for (Map.Entry<String, Tablet> part : queryPlan.getPartitions().entrySet()) {
logger.debug("Parse tablet info: '{}'.", part);
long tabletId;
try {
tabletId = Long.parseLong(part.getKey());
} catch (NumberFormatException e) {
String errMsg = "Parse tablet id '" + part.getKey() + "' to long failed.";
logger.error(errMsg, e);
throw new DorisException(errMsg, e);
}
String target = null;
int tabletCount = Integer.MAX_VALUE;
for (String candidate : part.getValue().getRoutings()) {
logger.trace("Evaluate Doris BE '{}' to tablet '{}'.", candidate, tabletId);
if (!be2Tablets.containsKey(candidate)) {
logger.debug("Choice a new Doris BE '{}' for tablet '{}'.", candidate, tabletId);
List<Long> tablets = new ArrayList<>();
be2Tablets.put(candidate, tablets);
target = candidate;
break;
} else {
if (be2Tablets.get(candidate).size() < tabletCount) {
target = candidate;
tabletCount = be2Tablets.get(candidate).size();
logger.debug("Current candidate Doris BE to tablet '{}' is '{}' with tablet count {}.",
tabletId, target, tabletCount);
}
}
}
if (target == null) {
String errMsg = "Cannot choice Doris BE for tablet " + tabletId;
logger.error(errMsg);
throw new DorisException(errMsg);
}
logger.debug("Choice Doris BE '{}' for tablet '{}'.", target, tabletId);
be2Tablets.get(target).add(tabletId);
}
return be2Tablets;
}
/**
* tablet count limit for one Doris RDD partition
* @param cfg configuration of request
* @param logger {@link Logger}
* @return tablet count limit
*/
@VisibleForTesting
static int tabletCountLimitForOnePartition(Settings cfg, Logger logger) {
int tabletsSize = DORIS_TABLET_SIZE_DEFAULT;
if (cfg.getProperty(DORIS_TABLET_SIZE) != null) {
try {
tabletsSize = Integer.parseInt(cfg.getProperty(DORIS_TABLET_SIZE));
} catch (NumberFormatException e) {
logger.warn(PARSE_NUMBER_FAILED_MESSAGE, DORIS_TABLET_SIZE, cfg.getProperty(DORIS_TABLET_SIZE));
}
}
if (tabletsSize < DORIS_TABLET_SIZE_MIN) {
logger.warn("{} is less than {}, set to default value {}.",
DORIS_TABLET_SIZE, DORIS_TABLET_SIZE_MIN, DORIS_TABLET_SIZE_MIN);
tabletsSize = DORIS_TABLET_SIZE_MIN;
}
logger.debug("Tablet size is set to {}.", tabletsSize);
return tabletsSize;
}
/**
* choice a Doris BE node to request.
* @param logger slf4j logger
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
* Deprecated, use randomBackendV2 instead
*/
@Deprecated
@VisibleForTesting
public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException {
List<BackendV2.BackendRowV2> backends = getBackendRows(sparkSettings, logger);
Collections.shuffle(backends);
BackendV2.BackendRowV2 backend = backends.get(0);
return backend.getIp()+ ":" + backend.getHttpPort();
}
/**
* translate Doris FE response to inner {@link BackendRow} struct.
* @param response Doris FE response
* @param logger {@link Logger}
* @return inner {@link List<BackendRow>} struct
* @throws DorisException,IOException throw when translate failed
* */
@Deprecated
@VisibleForTesting
static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
Backend backend;
try {
backend = mapper.readValue(response, Backend.class);
} catch (com.fasterxml.jackson.core.JsonParseException e) {
String errMsg = "Doris BE's response is not a json. res: " + response;
logger.error(errMsg, e);
throw new DorisException(errMsg, e);
} catch (com.fasterxml.jackson.databind.JsonMappingException e) {
String errMsg = "Doris BE's response cannot map to schema. res: " + response;
logger.error(errMsg, e);
throw new DorisException(errMsg, e);
} catch (IOException e) {
String errMsg = "Parse Doris BE's response to json failed. res: " + response;
logger.error(errMsg, e);
throw new DorisException(errMsg, e);
}
if (backend == null) {
logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
throw new ShouldNeverHappenException();
}
List<BackendRow> backendRows = backend.getRows().stream().filter(v -> v.getAlive()).collect(Collectors.toList());
logger.debug("Parsing schema result is '{}'.", backendRows);
return backendRows;
}
/**
* get Doris BE node list.
* @param logger slf4j logger
* @return the Doris BE node list
* @throws IllegalArgumentException BE nodes is illegal
*/
@VisibleForTesting
public static List<BackendV2.BackendRowV2> getBackendRows(SparkSettings sparkSettings, Logger logger) throws DorisException {
List<String> feNodeList = allEndpoints(sparkSettings.getProperty(DORIS_FENODES), logger);
for (String feNode : feNodeList){
try {
String beUrl = String.format("http://%s" + BACKENDS_V2, feNode);
HttpGet httpGet = new HttpGet(beUrl);
String response = send(sparkSettings, httpGet, logger);
logger.info("Backend Info:{}", response);
List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
logger.trace("Parse beNodes '{}'.", backends);
if (backends == null || backends.isEmpty()) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
throw new IllegalArgumentException("beNodes", String.valueOf(backends));
}
return backends;
} catch (ConnectedFailedException e) {
logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage());
}
}
String errMsg = "No Doris FE is available, please check configuration";
logger.error(errMsg);
throw new DorisException(errMsg);
}
/**
* choice a Doris BE node to request.
* @param logger slf4j logger
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
*/
@VisibleForTesting
public static String randomBackendV2(SparkSettings sparkSettings, Logger logger) throws DorisException {
List<BackendV2.BackendRowV2> backends = getBackendRows(sparkSettings, logger);
Collections.shuffle(backends);
BackendV2.BackendRowV2 backend = backends.get(0);
return backend.getIp() + ":" + backend.getHttpPort();
}
static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException {
com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
BackendV2 backend;
try {
backend = mapper.readValue(response, BackendV2.class);
} catch (com.fasterxml.jackson.core.JsonParseException e) {
String errMsg = "Doris BE's response is not a json. res: " + response;
logger.error(errMsg, e);
throw new DorisException(errMsg, e);
} catch (com.fasterxml.jackson.databind.JsonMappingException e) {
String errMsg = "Doris BE's response cannot map to schema. res: " + response;
logger.error(errMsg, e);
throw new DorisException(errMsg, e);
} catch (IOException e) {
String errMsg = "Parse Doris BE's response to json failed. res: " + response;
logger.error(errMsg, e);
throw new DorisException(errMsg, e);
}
if (backend == null) {
logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
throw new ShouldNeverHappenException();
}
List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
logger.debug("Parsing schema result is '{}'.", backendRows);
return backendRows;
}
/**
* translate BE tablets map to Doris RDD partition.
* @param cfg configuration of request
* @param be2Tablets BE to tablets {@link Map}
* @param opaquedQueryPlan Doris BE execute plan getting from Doris FE
* @param database database name of Doris table
* @param table table name of Doris table
* @param logger {@link Logger}
* @return Doris RDD partition {@link List}
* @throws IllegalArgumentException throw when translate failed
*/
@VisibleForTesting
static List<PartitionDefinition> tabletsMapToPartition(Settings cfg, Map<String, List<Long>> be2Tablets,
String opaquedQueryPlan, String database, String table, Logger logger)
throws IllegalArgumentException {
int tabletsSize = tabletCountLimitForOnePartition(cfg, logger);
List<PartitionDefinition> partitions = new ArrayList<>();
for (Map.Entry<String, List<Long>> beInfo : be2Tablets.entrySet()) {
logger.debug("Generate partition with beInfo: '{}'.", beInfo);
HashSet<Long> tabletSet = new HashSet<>(beInfo.getValue());
beInfo.getValue().clear();
beInfo.getValue().addAll(tabletSet);
int first = 0;
while (first < beInfo.getValue().size()) {
Set<Long> partitionTablets = new HashSet<>(beInfo.getValue().subList(
first, Math.min(beInfo.getValue().size(), first + tabletsSize)));
first = first + tabletsSize;
PartitionDefinition partitionDefinition =
new PartitionDefinition(database, table, cfg,
beInfo.getKey(), partitionTablets, opaquedQueryPlan);
logger.debug("Generate one PartitionDefinition '{}'.", partitionDefinition);
partitions.add(partitionDefinition);
}
}
return partitions;
}
/**
* choice a Doris FE node to request.
*
* @param feNodes Doris FE node list, separate be comma
* @param logger slf4j logger
* @return the array of Doris FE nodes
* @throws IllegalArgumentException fe nodes is illegal
*/
@VisibleForTesting
static List<String> allEndpoints(String feNodes, Logger logger) throws IllegalArgumentException {
logger.trace("Parse fenodes '{}'.", feNodes);
if (StringUtils.isEmpty(feNodes)) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
throw new IllegalArgumentException("fenodes", feNodes);
}
List<String> nodes = Arrays.stream(feNodes.split(",")).map(String::trim).collect(Collectors.toList());
Collections.shuffle(nodes);
return nodes;
}
}