// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.spark.rest;

import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FENODES;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FILTER_QUERY;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_REQUEST_AUTH_USER;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_MIN;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLE_IDENTIFIER;
import static org.apache.doris.spark.util.ErrorMessages.CONNECT_FAILED_MESSAGE;
import static org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE;
import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.HashMap;
import java.util.Base64;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.cfg.SparkSettings;
import org.apache.doris.spark.exception.ConnectedFailedException;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.exception.ShouldNeverHappenException;
import org.apache.doris.spark.rest.models.Backend;
import org.apache.doris.spark.rest.models.BackendRow;
import org.apache.doris.spark.rest.models.BackendV2;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.rest.models.Tablet;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.StringEntity;
import org.slf4j.Logger;

import com.google.common.annotations.VisibleForTesting;

/**
 * Service for communicate with Doris FE.
 */
public class RestService implements Serializable {
    public final static int REST_RESPONSE_STATUS_OK = 200;
    private static final String API_PREFIX = "/api";
    private static final String SCHEMA = "_schema";
    private static final String QUERY_PLAN = "_query_plan";
    @Deprecated
    private static final String BACKENDS = "/rest/v1/system?path=//backends";
    private static final String BACKENDS_V2 = "/api/backends?is_alive=true";

    /**
     * send request to Doris FE and get response json string.
     * @param cfg configuration of request
     * @param request {@link HttpRequestBase} real request
     * @param logger {@link Logger}
     * @return Doris FE response in json string
     * @throws ConnectedFailedException throw when cannot connect to Doris FE
     */
    private static String send(Settings cfg, HttpRequestBase request, Logger logger) throws
            ConnectedFailedException {
        int connectTimeout = cfg.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS,
                ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT);
        int socketTimeout = cfg.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS,
                ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT);
        int retries = cfg.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES,
                ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT);
        logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.",
                connectTimeout, socketTimeout, retries);

        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(connectTimeout)
                .setSocketTimeout(socketTimeout)
                .build();

        request.setConfig(requestConfig);
        String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, "");
        String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
        logger.info("Send request to Doris FE '{}' with user '{}'.", request.getURI(), user);
        IOException ex = null;
        int statusCode = -1;

        for (int attempt = 0; attempt < retries; attempt++) {
            logger.debug("Attempt {} to request {}.", attempt, request.getURI());
            try {
                String response;
                if (request instanceof HttpGet){
                    response = getConnectionGet(request.getURI().toString(), user, password,logger);
                } else {
                    response = getConnectionPost(request,user, password,logger);
                }
                if (response == null) {
                    logger.warn("Failed to get response from Doris FE {}, http code is {}",
                            request.getURI(), statusCode);
                    continue;
                }
                logger.trace("Success get response from Doris FE: {}, response is: {}.",
                        request.getURI(), response);
                ObjectMapper mapper = new ObjectMapper();
                Map map = mapper.readValue(response, Map.class);
                //Handle the problem of inconsistent data format returned by http v1 and v2
                if (map.containsKey("code") && map.containsKey("msg")) {
                    Object data = map.get("data");
                    return mapper.writeValueAsString(data);
                } else {
                    return response;
                }
            } catch (IOException e) {
                ex = e;
                logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e);
            }
        }

        logger.error(CONNECT_FAILED_MESSAGE, request.getURI(), ex);
        throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex);
    }

    private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException {
        URL realUrl = new URL(request);
        // open connection
        HttpURLConnection connection = (HttpURLConnection)realUrl.openConnection();
        String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
        connection.setRequestProperty("Authorization", "Basic " + authEncoding);

        connection.connect();
        return parseResponse(connection,logger);
    }

    private static String parseResponse(HttpURLConnection connection,Logger logger) throws IOException {
        if (connection.getResponseCode() != HttpStatus.SC_OK) {
            logger.warn("Failed to get response from Doris  {}, http code is {}",
                    connection.getURL(), connection.getResponseCode());
            throw new IOException("Failed to get response from Doris");
        }
        StringBuilder result = new StringBuilder("");
        BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream(), "utf-8"));
        String line;
        while ((line = in.readLine()) != null) {
            result.append(line);
        }
        if (in != null) {
            in.close();
        }
        return result.toString();
    }

    private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException {
        URL url = new URL(request.getURI().toString());
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod(request.getMethod());
        String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
        InputStream content = ((HttpPost)request).getEntity().getContent();
        String res = IOUtils.toString(content);
        conn.setDoOutput(true);
        conn.setDoInput(true);
        PrintWriter out = new PrintWriter(conn.getOutputStream());
        // send request params
        out.print(res);
        // flush
        out.flush();
        // read response
        return parseResponse(conn,logger);
    }
    /**
     * parse table identifier to array.
     * @param tableIdentifier table identifier string
     * @param logger {@link Logger}
     * @return first element is db name, second element is table name
     * @throws IllegalArgumentException table identifier is illegal
     */
    @VisibleForTesting
    static String[] parseIdentifier(String tableIdentifier, Logger logger) throws IllegalArgumentException {
        logger.trace("Parse identifier '{}'.", tableIdentifier);
        if (StringUtils.isEmpty(tableIdentifier)) {
            logger.error(ILLEGAL_ARGUMENT_MESSAGE, "table.identifier", tableIdentifier);
            throw new IllegalArgumentException("table.identifier", tableIdentifier);
        }
        String[] identifier = tableIdentifier.split("\\.");
        if (identifier.length != 2) {
            logger.error(ILLEGAL_ARGUMENT_MESSAGE, "table.identifier", tableIdentifier);
            throw new IllegalArgumentException("table.identifier", tableIdentifier);
        }
        return identifier;
    }

    /**
     * choice a Doris FE node to request.
     * @param feNodes Doris FE node list, separate be comma
     * @param logger slf4j logger
     * @return the chosen one Doris FE node
     * @throws IllegalArgumentException fe nodes is illegal
     */
    @VisibleForTesting
    static String randomEndpoint(String feNodes, Logger logger) throws IllegalArgumentException {
        logger.trace("Parse fenodes '{}'.", feNodes);
        if (StringUtils.isEmpty(feNodes)) {
            logger.error(ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
            throw new IllegalArgumentException("fenodes", feNodes);
        }
        List<String> nodes = Arrays.asList(feNodes.split(","));
        Collections.shuffle(nodes);
        return nodes.get(0).trim();
    }

    /**
     * get a valid URI to connect Doris FE.
     * @param cfg configuration of request
     * @param logger {@link Logger}
     * @return uri string
     * @throws IllegalArgumentException throw when configuration is illegal
     */
    @VisibleForTesting
    static String getUriStr(Settings cfg, Logger logger) throws IllegalArgumentException {
        String[] identifier = parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
        return "http://" +
                randomEndpoint(cfg.getProperty(DORIS_FENODES), logger) + API_PREFIX +
                "/" + identifier[0] +
                "/" + identifier[1] +
                "/";
    }

    @VisibleForTesting
    static String getUriStr(String feNode,Settings cfg, Logger logger) throws IllegalArgumentException {
        String[] identifier = parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
        return "http://" +
                feNode + API_PREFIX +
                "/" + identifier[0] +
                "/" + identifier[1] +
                "/";
    }


    /**
     * discover Doris table schema from Doris FE.
     * @param cfg configuration of request
     * @param logger slf4j logger
     * @return Doris table schema
     * @throws DorisException throw when discover failed
     */
    public static Schema getSchema(Settings cfg, Logger logger)
            throws DorisException {
        logger.trace("Finding schema.");
        List<String> feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES), logger);
        for (String feNode: feNodeList) {
            try {
                HttpGet httpGet = new HttpGet(getUriStr(feNode,cfg, logger) + SCHEMA);
                String response = send(cfg, httpGet, logger);
                logger.debug("Find schema response is '{}'.", response);
                return parseSchema(response, logger);
            } catch (ConnectedFailedException e) {
                logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage());
            }
        }
        String errMsg = "No Doris FE is available, please check configuration";
        logger.error(errMsg);
        throw new DorisException(errMsg);
    }

    /**
     * translate Doris FE response to inner {@link Schema} struct.
     * @param response Doris FE response
     * @param logger {@link Logger}
     * @return inner {@link Schema} struct
     * @throws DorisException throw when translate failed
     */
    @VisibleForTesting
    public static Schema parseSchema(String response, Logger logger) throws DorisException {
        logger.trace("Parse response '{}' to schema.", response);
        ObjectMapper mapper = new ObjectMapper();
        Schema schema;
        try {
            schema = mapper.readValue(response, Schema.class);
        } catch (JsonParseException e) {
            String errMsg = "Doris FE's response is not a json. res: " + response;
            logger.error(errMsg, e);
            throw new DorisException(errMsg, e);
        } catch (JsonMappingException e) {
            String errMsg = "Doris FE's response cannot map to schema. res: " + response;
            logger.error(errMsg, e);
            throw new DorisException(errMsg, e);
        } catch (IOException e) {
            String errMsg = "Parse Doris FE's response to json failed. res: " + response;
            logger.error(errMsg, e);
            throw new DorisException(errMsg, e);
        }

        if (schema == null) {
            logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
            throw new ShouldNeverHappenException();
        }

        if (schema.getStatus() != REST_RESPONSE_STATUS_OK) {
            String errMsg = "Doris FE's response is not OK, status is " + schema.getStatus();
            logger.error(errMsg);
            throw new DorisException(errMsg);
        }
        logger.debug("Parsing schema result is '{}'.", schema);
        return schema;
    }

    /**
     * find Doris RDD partitions from Doris FE.
     * @param cfg configuration of request
     * @param logger {@link Logger}
     * @return an list of Doris RDD partitions
     * @throws DorisException throw when find partition failed
     */
    public static List<PartitionDefinition> findPartitions(Settings cfg, Logger logger) throws DorisException {
        String[] tableIdentifiers = parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
        String sql = "select " + cfg.getProperty(DORIS_READ_FIELD, "*") +
                " from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`";
        if (!StringUtils.isEmpty(cfg.getProperty(DORIS_FILTER_QUERY))) {
            sql += " where " + cfg.getProperty(DORIS_FILTER_QUERY);
        }
        logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);

        List<String> feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES), logger);
        for (String feNode: feNodeList) {
            try {
                HttpPost httpPost = new HttpPost(getUriStr(feNode,cfg, logger) + QUERY_PLAN);
                String entity = "{\"sql\": \""+ sql +"\"}";
                logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
                StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
                stringEntity.setContentEncoding("UTF-8");
                stringEntity.setContentType("application/json");
                httpPost.setEntity(stringEntity);

                String resStr = send(cfg, httpPost, logger);
                logger.debug("Find partition response is '{}'.", resStr);
                QueryPlan queryPlan = getQueryPlan(resStr, logger);
                Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger);
                return tabletsMapToPartition(
                        cfg,
                        be2Tablets,
                        queryPlan.getOpaqued_query_plan(),
                        tableIdentifiers[0],
                        tableIdentifiers[1],
                        logger);
            } catch (ConnectedFailedException e) {
                logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage());
            }
        }
        String errMsg = "No Doris FE is available, please check configuration";
        logger.error(errMsg);
        throw new DorisException(errMsg);

    }

    /**
     * translate Doris FE response string to inner {@link QueryPlan} struct.
     * @param response Doris FE response string
     * @param logger {@link Logger}
     * @return inner {@link QueryPlan} struct
     * @throws DorisException throw when translate failed.
     */
    @VisibleForTesting
    static QueryPlan getQueryPlan(String response, Logger logger) throws DorisException {
        ObjectMapper mapper = new ObjectMapper();
        QueryPlan queryPlan;
        try {
            queryPlan = mapper.readValue(response, QueryPlan.class);
        } catch (JsonParseException e) {
            String errMsg = "Doris FE's response is not a json. res: " + response;
            logger.error(errMsg, e);
            throw new DorisException(errMsg, e);
        } catch (JsonMappingException e) {
            String errMsg = "Doris FE's response cannot map to schema. res: " + response;
            logger.error(errMsg, e);
            throw new DorisException(errMsg, e);
        } catch (IOException e) {
            String errMsg = "Parse Doris FE's response to json failed. res: " + response;
            logger.error(errMsg, e);
            throw new DorisException(errMsg, e);
        }

        if (queryPlan == null) {
            logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
            throw new ShouldNeverHappenException();
        }

        if (queryPlan.getStatus() != REST_RESPONSE_STATUS_OK) {
            String errMsg = "Doris FE's response is not OK, status is " + queryPlan.getStatus();
            logger.error(errMsg);
            throw new DorisException(errMsg);
        }
        logger.debug("Parsing partition result is '{}'.", queryPlan);
        return queryPlan;
    }

    /**
     * select which Doris BE to get tablet data.
     * @param queryPlan {@link QueryPlan} translated from Doris FE response
     * @param logger {@link Logger}
     * @return BE to tablets {@link Map}
     * @throws DorisException throw when select failed.
     */
    @VisibleForTesting
    static  Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws DorisException {
        Map<String, List<Long>> be2Tablets = new HashMap<>();
        for (Map.Entry<String, Tablet> part : queryPlan.getPartitions().entrySet()) {
            logger.debug("Parse tablet info: '{}'.", part);
            long tabletId;
            try {
                tabletId = Long.parseLong(part.getKey());
            } catch (NumberFormatException e) {
                String errMsg = "Parse tablet id '" + part.getKey() + "' to long failed.";
                logger.error(errMsg, e);
                throw new DorisException(errMsg, e);
            }
            String target = null;
            int tabletCount = Integer.MAX_VALUE;
            for (String candidate : part.getValue().getRoutings()) {
                logger.trace("Evaluate Doris BE '{}' to tablet '{}'.", candidate, tabletId);
                if (!be2Tablets.containsKey(candidate)) {
                    logger.debug("Choice a new Doris BE '{}' for tablet '{}'.", candidate, tabletId);
                    List<Long> tablets = new ArrayList<>();
                    be2Tablets.put(candidate, tablets);
                    target = candidate;
                    break;
                } else {
                    if (be2Tablets.get(candidate).size() < tabletCount) {
                        target = candidate;
                        tabletCount = be2Tablets.get(candidate).size();
                        logger.debug("Current candidate Doris BE to tablet '{}' is '{}' with tablet count {}.",
                                tabletId, target, tabletCount);
                    }
                }
            }
            if (target == null) {
                String errMsg = "Cannot choice Doris BE for tablet " + tabletId;
                logger.error(errMsg);
                throw new DorisException(errMsg);
            }

            logger.debug("Choice Doris BE '{}' for tablet '{}'.", target, tabletId);
            be2Tablets.get(target).add(tabletId);
        }
        return be2Tablets;
    }

    /**
     * tablet count limit for one Doris RDD partition
     * @param cfg configuration of request
     * @param logger {@link Logger}
     * @return tablet count limit
     */
    @VisibleForTesting
    static int tabletCountLimitForOnePartition(Settings cfg, Logger logger) {
        int tabletsSize = DORIS_TABLET_SIZE_DEFAULT;
        if (cfg.getProperty(DORIS_TABLET_SIZE) != null) {
            try {
                tabletsSize = Integer.parseInt(cfg.getProperty(DORIS_TABLET_SIZE));
            } catch (NumberFormatException e) {
                logger.warn(PARSE_NUMBER_FAILED_MESSAGE, DORIS_TABLET_SIZE, cfg.getProperty(DORIS_TABLET_SIZE));
            }
        }
        if (tabletsSize < DORIS_TABLET_SIZE_MIN) {
            logger.warn("{} is less than {}, set to default value {}.",
                    DORIS_TABLET_SIZE, DORIS_TABLET_SIZE_MIN, DORIS_TABLET_SIZE_MIN);
            tabletsSize = DORIS_TABLET_SIZE_MIN;
        }
        logger.debug("Tablet size is set to {}.", tabletsSize);
        return tabletsSize;
    }

    /**
     * choice a Doris BE node to request.
     * @param logger slf4j logger
     * @return the chosen one Doris BE node
     * @throws IllegalArgumentException BE nodes is illegal
     * Deprecated, use randomBackendV2 instead
     */
    @Deprecated
    @VisibleForTesting
    public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException {
        List<BackendV2.BackendRowV2> backends = getBackendRows(sparkSettings, logger);
        Collections.shuffle(backends);
        BackendV2.BackendRowV2 backend = backends.get(0);
        return backend.getIp()+ ":" + backend.getHttpPort();
    }

    /**
     * translate Doris FE response to inner {@link BackendRow} struct.
     * @param response Doris FE response
     * @param logger {@link Logger}
     * @return inner {@link List<BackendRow>} struct
     * @throws DorisException,IOException throw when translate failed
     * */
    @Deprecated
    @VisibleForTesting
    static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
        com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
        Backend backend;
        try {
            backend = mapper.readValue(response, Backend.class);
        } catch (com.fasterxml.jackson.core.JsonParseException e) {
            String errMsg = "Doris BE's response is not a json. res: " + response;
            logger.error(errMsg, e);
            throw new DorisException(errMsg, e);
        } catch (com.fasterxml.jackson.databind.JsonMappingException e) {
            String errMsg = "Doris BE's response cannot map to schema. res: " + response;
            logger.error(errMsg, e);
            throw new DorisException(errMsg, e);
        } catch (IOException e) {
            String errMsg = "Parse Doris BE's response to json failed. res: " + response;
            logger.error(errMsg, e);
            throw new DorisException(errMsg, e);
        }

        if (backend == null) {
            logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
            throw new ShouldNeverHappenException();
        }
        List<BackendRow> backendRows = backend.getRows().stream().filter(v -> v.getAlive()).collect(Collectors.toList());
        logger.debug("Parsing schema result is '{}'.", backendRows);
        return backendRows;
    }

    /**
     * get Doris BE node list.
     * @param logger slf4j logger
     * @return the Doris BE node list
     * @throws IllegalArgumentException BE nodes is illegal
     */
    @VisibleForTesting
    public static List<BackendV2.BackendRowV2> getBackendRows(SparkSettings sparkSettings,  Logger logger) throws DorisException {
        List<String> feNodeList = allEndpoints(sparkSettings.getProperty(DORIS_FENODES), logger);
        for (String feNode : feNodeList){
            try {
                String beUrl =   String.format("http://%s" + BACKENDS_V2, feNode);
                HttpGet httpGet = new HttpGet(beUrl);
                String response = send(sparkSettings, httpGet, logger);
                logger.info("Backend Info:{}", response);
                List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
                logger.trace("Parse beNodes '{}'.", backends);
                if (backends == null || backends.isEmpty()) {
                    logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
                    throw new IllegalArgumentException("beNodes", String.valueOf(backends));
                }
                return backends;
            } catch (ConnectedFailedException e) {
                logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage());
            }
        }
        String errMsg = "No Doris FE is available, please check configuration";
        logger.error(errMsg);
        throw new DorisException(errMsg);
    }

    /**
     * choice a Doris BE node to request.
     * @param logger slf4j logger
     * @return the chosen one Doris BE node
     * @throws IllegalArgumentException BE nodes is illegal
     */
    @VisibleForTesting
    public static String randomBackendV2(SparkSettings sparkSettings, Logger logger) throws DorisException {
        List<BackendV2.BackendRowV2> backends = getBackendRows(sparkSettings, logger);
        Collections.shuffle(backends);
        BackendV2.BackendRowV2 backend = backends.get(0);
        return backend.getIp() + ":" + backend.getHttpPort();
    }

    static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException {
        com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
        BackendV2 backend;
        try {
            backend = mapper.readValue(response, BackendV2.class);
        } catch (com.fasterxml.jackson.core.JsonParseException e) {
            String errMsg = "Doris BE's response is not a json. res: " + response;
            logger.error(errMsg, e);
            throw new DorisException(errMsg, e);
        } catch (com.fasterxml.jackson.databind.JsonMappingException e) {
            String errMsg = "Doris BE's response cannot map to schema. res: " + response;
            logger.error(errMsg, e);
            throw new DorisException(errMsg, e);
        } catch (IOException e) {
            String errMsg = "Parse Doris BE's response to json failed. res: " + response;
            logger.error(errMsg, e);
            throw new DorisException(errMsg, e);
        }

        if (backend == null) {
            logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
            throw new ShouldNeverHappenException();
        }
        List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
        logger.debug("Parsing schema result is '{}'.", backendRows);
        return backendRows;
    }

    /**
     * translate BE tablets map to Doris RDD partition.
     * @param cfg configuration of request
     * @param be2Tablets BE to tablets {@link Map}
     * @param opaquedQueryPlan Doris BE execute plan getting from Doris FE
     * @param database database name of Doris table
     * @param table table name of Doris table
     * @param logger {@link Logger}
     * @return Doris RDD partition {@link List}
     * @throws IllegalArgumentException throw when translate failed
     */
    @VisibleForTesting
    static List<PartitionDefinition> tabletsMapToPartition(Settings cfg, Map<String, List<Long>> be2Tablets,
            String opaquedQueryPlan, String database, String table, Logger logger)
            throws IllegalArgumentException {
        int tabletsSize = tabletCountLimitForOnePartition(cfg, logger);
        List<PartitionDefinition> partitions = new ArrayList<>();
        for (Map.Entry<String, List<Long>> beInfo : be2Tablets.entrySet()) {
            logger.debug("Generate partition with beInfo: '{}'.", beInfo);
            HashSet<Long> tabletSet = new HashSet<>(beInfo.getValue());
            beInfo.getValue().clear();
            beInfo.getValue().addAll(tabletSet);
            int first = 0;
            while (first < beInfo.getValue().size()) {
                Set<Long> partitionTablets = new HashSet<>(beInfo.getValue().subList(
                        first, Math.min(beInfo.getValue().size(), first + tabletsSize)));
                first = first + tabletsSize;
                PartitionDefinition partitionDefinition =
                        new PartitionDefinition(database, table, cfg,
                                beInfo.getKey(), partitionTablets, opaquedQueryPlan);
                logger.debug("Generate one PartitionDefinition '{}'.", partitionDefinition);
                partitions.add(partitionDefinition);
            }
        }
        return partitions;
    }

    /**
     * choice a Doris FE node to request.
     *
     * @param feNodes Doris FE node list, separate be comma
     * @param logger  slf4j logger
     * @return the array of Doris FE nodes
     * @throws IllegalArgumentException fe nodes is illegal
     */
    @VisibleForTesting
    static List<String> allEndpoints(String feNodes, Logger logger) throws IllegalArgumentException {
        logger.trace("Parse fenodes '{}'.", feNodes);
        if (StringUtils.isEmpty(feNodes)) {
            logger.error(ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
            throw new IllegalArgumentException("fenodes", feNodes);
        }
        List<String> nodes = Arrays.stream(feNodes.split(",")).map(String::trim).collect(Collectors.toList());
        Collections.shuffle(nodes);
        return nodes;
    }
}
