blob: 05c43bbeebf12b91bf7b6b2b3c97ce83020a48b9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.apache.doris.regression.suite.Suite
import org.apache.doris.regression.util.Http
import org.apache.doris.regression.util.NodeType
@Grab(group='org.apache.httpcomponents', module='httpclient', version='4.5.13')
import org.apache.http.client.methods.*
import org.apache.http.impl.client.*
import org.apache.http.util.EntityUtils
import org.apache.http.client.config.RequestConfig
import org.apache.http.conn.ConnectTimeoutException
import org.apache.http.conn.HttpHostConnectException
import org.codehaus.groovy.runtime.IOGroovyMethods
Suite.metaClass.http_client = { String method, String url /* param */ ->
Suite suite = delegate as Suite
if (method != "GET" && method != "POST") {
throw new Exception("Invalid method: ${method}")
}
if (!url || !(url =~ /^https?:\/\/.+/)) {
throw new Exception("Invalid url: ${url}")
}
Integer timeout = 300 // seconds
Integer maxRetries = 10
Integer retryCount = 0
Integer sleepTime = 1000 // milliseconds
logger.info("HTTP request: ${method} ${url}")
CloseableHttpClient httpClient = HttpClients.custom()
.setRetryHandler(new DefaultHttpRequestRetryHandler(3, true))
.build()
int code
String err
String out
try {
while (retryCount < maxRetries) {
HttpRequestBase request
if (method == "GET") {
request = new HttpGet(url)
} else if (method == "POST") {
request = new HttpPost(url)
}
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(timeout * 1000)
.setSocketTimeout(timeout * 1000)
.build()
request.setConfig(requestConfig)
try {
CloseableHttpResponse response = httpClient.execute(request)
try {
code = response.getStatusLine().getStatusCode()
out = EntityUtils.toString(response.getEntity())
if (code >= 200 && code < 300) {
code = 0 // to be compatible with the old curl function
err = ""
return [code, out, err]
} else if (code == 500) {
return [code, out, "Internal Server Error"]
} else {
logger.warn("HTTP request failed with status code ${code}, response ${out}, retrying (${++retryCount}/${maxRetries})")
}
} finally {
response.close()
}
} catch (ConnectTimeoutException | HttpHostConnectException e) {
logger.warn("Connection failed, retrying (${++retryCount}/${maxRetries}): ${e.message}")
} catch (SocketTimeoutException e) {
timeout = timeout + 10
logger.warn("Read timed out, retrying (${++retryCount}/${maxRetries}): ${e.message}")
} catch (Exception e) {
code = 500 // Internal Server Error
logger.error("Error executing HTTP request: ${e.message}")
err = e.message
return [code, out, err]
}
sleep(sleepTime)
sleepTime = Math.min(sleepTime * 2, 60000)
}
logger.error("HTTP request failed after ${maxRetries} attempts")
err = "Failed after ${maxRetries} attempts"
code = 500 // Internal Server Error
return [code, out, err]
} finally {
httpClient.close()
}
}
logger.info("Added 'http_client' function to Suite")
Suite.metaClass.curl = { String method, String url, String body = null, Integer timeoutSec = 10 /* param */->
Suite suite = delegate as Suite
if (method != "GET" && method != "POST") {
throw new Exception(String.format("invalid curl method: %s", method))
}
if (url.isBlank()) {
throw new Exception("invalid curl url, blank")
}
Integer maxRetries = 10; // Maximum number of retries
Integer retryCount = 0; // Current retry count
Integer sleepTime = 5000; // Sleep time in milliseconds
String cmd
if (method == "POST" && body != null) {
cmd = String.format("curl --max-time %d -X %s -H Content-Type:application/json -d %s %s", timeoutSec, method, body, url).toString()
} else {
cmd = String.format("curl --max-time %d -X %s %s", timeoutSec, method, url).toString()
}
logger.info("curl cmd: " + cmd)
def process
int code
String err
String out
while (retryCount < maxRetries) {
process = cmd.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())))
out = process.getText()
// If the command was successful, break the loop
if (code == 0) {
break
}
// If the command was not successful, increment the retry count, sleep for a while and try again
retryCount++
sleep(sleepTime)
}
// If the command was not successful after maxRetries attempts, log the failure and return the result
if (code != 0) {
logger.error("Command curl failed after " + maxRetries + " attempts. code: " + code + ", err: " + err)
}
return [code, out, err]
}
logger.info("Added 'curl' function to Suite")
Suite.metaClass.show_be_config = { String ip, String port /*param */ ->
return curl("GET", String.format("http://%s:%s/api/show_config", ip, port))
}
logger.info("Added 'show_be_config' function to Suite")
Suite.metaClass.update_be_config = { String ip, String port, String key, String value /*param */ ->
return curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", ip, port, key, value))
}
logger.info("Added 'update_be_config' function to Suite")
Suite.metaClass.update_all_be_config = { String key, Object value ->
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
backendId_to_backendIP.each { beId, beIp ->
def port = backendId_to_backendHttpPort.get(beId)
def url = "http://${beIp}:${port}/api/update_config?${key}=${value}"
def result = Http.POST(url, null, true)
assert result.size() == 1, result.toString()
assert result[0].status == "OK", result.toString()
}
}
logger.info("Added 'update_all_be_config' function to Suite")
Suite.metaClass._be_report = { String ip, int port, String reportName ->
def url = "http://${ip}:${port}/api/report/${reportName}"
def result = Http.GET(url, true)
Http.checkHttpResult(result, NodeType.BE)
}
// before report, be need random sleep 5s
Suite.metaClass.be_report_disk = { String ip, int port ->
_be_report(ip, port, "disk")
}
logger.info("Added 'be_report_disk' function to Suite")
// before report, be need random sleep 5s
Suite.metaClass.be_report_tablet = { String ip, int port ->
_be_report(ip, port, "tablet")
}
logger.info("Added 'be_report_tablet' function to Suite")
// before report, be need random sleep 5s
Suite.metaClass.be_report_task = { String ip, int port ->
_be_report(ip, port, "task")
}
logger.info("Added 'be_report_task' function to Suite")
// check nested index file api
Suite.metaClass.check_nested_index_file = { ip, port, tablet_id, expected_rowsets_count, expected_indices_count, format ->
def (code, out, err) = http_client("GET", String.format("http://%s:%s/api/show_nested_index_file?tablet_id=%s", ip, port, tablet_id))
logger.info("Run show_nested_index_file_on_tablet: code=" + code + ", out=" + out + ", err=" + err)
// only when the expected_indices_count is 0, the tablet may not have the index file.
if (code == 500 && expected_indices_count == 0) {
assertEquals("E-6003", parseJson(out.trim()).status)
assertTrue(parseJson(out.trim()).msg.contains("not found"))
return
}
assertTrue(code == 0)
assertEquals(tablet_id, parseJson(out.trim()).tablet_id.toString())
def rowsets_count = parseJson(out.trim()).rowsets.size();
assertEquals(expected_rowsets_count, rowsets_count)
def index_files_count = 0
def segment_files_count = 0
for (def rowset in parseJson(out.trim()).rowsets) {
assertEquals(format, rowset.index_storage_format)
for (int i = 0; i < rowset.segments.size(); i++) {
def segment = rowset.segments[i]
assertEquals(i, segment.segment_id)
def indices_count = segment.indices.size()
assertEquals(expected_indices_count, indices_count)
if (format == "V1") {
index_files_count += indices_count
} else {
index_files_count++
}
}
segment_files_count += rowset.segments.size()
}
if (format == "V1") {
assertEquals(index_files_count, segment_files_count * expected_indices_count)
} else {
assertEquals(index_files_count, segment_files_count)
}
}
logger.info("Added 'check_nested_index_file' function to Suite")