| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| import org.apache.doris.regression.suite.Suite |
| import org.apache.doris.regression.util.Http |
| import org.apache.doris.regression.util.NodeType |
| @Grab(group='org.apache.httpcomponents', module='httpclient', version='4.5.13') |
| import org.apache.http.client.methods.* |
| import org.apache.http.impl.client.* |
| import org.apache.http.util.EntityUtils |
| import org.apache.http.client.config.RequestConfig |
| import org.apache.http.conn.ConnectTimeoutException |
| import org.apache.http.conn.HttpHostConnectException |
| import org.codehaus.groovy.runtime.IOGroovyMethods |
| |
| |
| Suite.metaClass.http_client = { String method, String url /* param */ -> |
| Suite suite = delegate as Suite |
| if (method != "GET" && method != "POST") { |
| throw new Exception("Invalid method: ${method}") |
| } |
| if (!url || !(url =~ /^https?:\/\/.+/)) { |
| throw new Exception("Invalid url: ${url}") |
| } |
| |
| Integer timeout = 300 // seconds |
| Integer maxRetries = 10 |
| Integer retryCount = 0 |
| Integer sleepTime = 1000 // milliseconds |
| |
| logger.info("HTTP request: ${method} ${url}") |
| |
| CloseableHttpClient httpClient = HttpClients.custom() |
| .setRetryHandler(new DefaultHttpRequestRetryHandler(3, true)) |
| .build() |
| |
| int code |
| String err |
| String out |
| |
| try { |
| while (retryCount < maxRetries) { |
| HttpRequestBase request |
| if (method == "GET") { |
| request = new HttpGet(url) |
| } else if (method == "POST") { |
| request = new HttpPost(url) |
| } |
| |
| RequestConfig requestConfig = RequestConfig.custom() |
| .setConnectTimeout(timeout * 1000) |
| .setSocketTimeout(timeout * 1000) |
| .build() |
| request.setConfig(requestConfig) |
| |
| try { |
| CloseableHttpResponse response = httpClient.execute(request) |
| try { |
| code = response.getStatusLine().getStatusCode() |
| out = EntityUtils.toString(response.getEntity()) |
| |
| if (code >= 200 && code < 300) { |
| code = 0 // to be compatible with the old curl function |
| err = "" |
| return [code, out, err] |
| } else if (code == 500) { |
| return [code, out, "Internal Server Error"] |
| } else { |
| logger.warn("HTTP request failed with status code ${code}, response ${out}, retrying (${++retryCount}/${maxRetries})") |
| } |
| } finally { |
| response.close() |
| } |
| } catch (ConnectTimeoutException | HttpHostConnectException e) { |
| logger.warn("Connection failed, retrying (${++retryCount}/${maxRetries}): ${e.message}") |
| } catch (SocketTimeoutException e) { |
| timeout = timeout + 10 |
| logger.warn("Read timed out, retrying (${++retryCount}/${maxRetries}): ${e.message}") |
| } catch (Exception e) { |
| code = 500 // Internal Server Error |
| logger.error("Error executing HTTP request: ${e.message}") |
| err = e.message |
| return [code, out, err] |
| } |
| |
| sleep(sleepTime) |
| sleepTime = Math.min(sleepTime * 2, 60000) |
| } |
| |
| logger.error("HTTP request failed after ${maxRetries} attempts") |
| err = "Failed after ${maxRetries} attempts" |
| code = 500 // Internal Server Error |
| return [code, out, err] |
| } finally { |
| httpClient.close() |
| } |
| } |
| |
| logger.info("Added 'http_client' function to Suite") |
| |
| Suite.metaClass.curl = { String method, String url, String body = null, Integer timeoutSec = 10 /* param */-> |
| Suite suite = delegate as Suite |
| if (method != "GET" && method != "POST") { |
| throw new Exception(String.format("invalid curl method: %s", method)) |
| } |
| if (url.isBlank()) { |
| throw new Exception("invalid curl url, blank") |
| } |
| |
| Integer maxRetries = 10; // Maximum number of retries |
| Integer retryCount = 0; // Current retry count |
| Integer sleepTime = 5000; // Sleep time in milliseconds |
| |
| String cmd |
| if (method == "POST" && body != null) { |
| cmd = String.format("curl --max-time %d -X %s -H Content-Type:application/json -d %s %s", timeoutSec, method, body, url).toString() |
| } else { |
| cmd = String.format("curl --max-time %d -X %s %s", timeoutSec, method, url).toString() |
| } |
| |
| logger.info("curl cmd: " + cmd) |
| def process |
| int code |
| String err |
| String out |
| |
| while (retryCount < maxRetries) { |
| process = cmd.execute() |
| code = process.waitFor() |
| err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))) |
| out = process.getText() |
| |
| // If the command was successful, break the loop |
| if (code == 0) { |
| break |
| } |
| |
| // If the command was not successful, increment the retry count, sleep for a while and try again |
| retryCount++ |
| sleep(sleepTime) |
| } |
| |
| // If the command was not successful after maxRetries attempts, log the failure and return the result |
| if (code != 0) { |
| logger.error("Command curl failed after " + maxRetries + " attempts. code: " + code + ", err: " + err) |
| } |
| |
| return [code, out, err] |
| } |
| logger.info("Added 'curl' function to Suite") |
| |
| Suite.metaClass.show_be_config = { String ip, String port /*param */ -> |
| return curl("GET", String.format("http://%s:%s/api/show_config", ip, port)) |
| } |
| |
| logger.info("Added 'show_be_config' function to Suite") |
| |
| Suite.metaClass.update_be_config = { String ip, String port, String key, String value /*param */ -> |
| return curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", ip, port, key, value)) |
| } |
| |
| logger.info("Added 'update_be_config' function to Suite") |
| |
| Suite.metaClass.update_all_be_config = { String key, Object value -> |
| def backendId_to_backendIP = [:] |
| def backendId_to_backendHttpPort = [:] |
| getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); |
| backendId_to_backendIP.each { beId, beIp -> |
| def port = backendId_to_backendHttpPort.get(beId) |
| def url = "http://${beIp}:${port}/api/update_config?${key}=${value}" |
| def result = Http.POST(url, null, true) |
| assert result.size() == 1, result.toString() |
| assert result[0].status == "OK", result.toString() |
| } |
| } |
| |
| logger.info("Added 'update_all_be_config' function to Suite") |
| |
| Suite.metaClass._be_report = { String ip, int port, String reportName -> |
| def url = "http://${ip}:${port}/api/report/${reportName}" |
| def result = Http.GET(url, true) |
| Http.checkHttpResult(result, NodeType.BE) |
| } |
| |
| // before report, be need random sleep 5s |
| Suite.metaClass.be_report_disk = { String ip, int port -> |
| _be_report(ip, port, "disk") |
| } |
| |
| logger.info("Added 'be_report_disk' function to Suite") |
| |
| // before report, be need random sleep 5s |
| Suite.metaClass.be_report_tablet = { String ip, int port -> |
| _be_report(ip, port, "tablet") |
| } |
| |
| logger.info("Added 'be_report_tablet' function to Suite") |
| |
| // before report, be need random sleep 5s |
| Suite.metaClass.be_report_task = { String ip, int port -> |
| _be_report(ip, port, "task") |
| } |
| |
| logger.info("Added 'be_report_task' function to Suite") |
| |
| // check nested index file api |
| Suite.metaClass.check_nested_index_file = { ip, port, tablet_id, expected_rowsets_count, expected_indices_count, format -> |
| def (code, out, err) = http_client("GET", String.format("http://%s:%s/api/show_nested_index_file?tablet_id=%s", ip, port, tablet_id)) |
| logger.info("Run show_nested_index_file_on_tablet: code=" + code + ", out=" + out + ", err=" + err) |
| // only when the expected_indices_count is 0, the tablet may not have the index file. |
| if (code == 500 && expected_indices_count == 0) { |
| assertEquals("E-6003", parseJson(out.trim()).status) |
| assertTrue(parseJson(out.trim()).msg.contains("not found")) |
| return |
| } |
| assertTrue(code == 0) |
| assertEquals(tablet_id, parseJson(out.trim()).tablet_id.toString()) |
| def rowsets_count = parseJson(out.trim()).rowsets.size(); |
| assertEquals(expected_rowsets_count, rowsets_count) |
| def index_files_count = 0 |
| def segment_files_count = 0 |
| for (def rowset in parseJson(out.trim()).rowsets) { |
| assertEquals(format, rowset.index_storage_format) |
| for (int i = 0; i < rowset.segments.size(); i++) { |
| def segment = rowset.segments[i] |
| assertEquals(i, segment.segment_id) |
| def indices_count = segment.indices.size() |
| assertEquals(expected_indices_count, indices_count) |
| if (format == "V1") { |
| index_files_count += indices_count |
| } else { |
| index_files_count++ |
| } |
| } |
| segment_files_count += rowset.segments.size() |
| } |
| if (format == "V1") { |
| assertEquals(index_files_count, segment_files_count * expected_indices_count) |
| } else { |
| assertEquals(index_files_count, segment_files_count) |
| } |
| } |
| |
| logger.info("Added 'check_nested_index_file' function to Suite") |
| |