blob: a0a813fb800cc07b38995a44648fc76fd749e6ef [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.apache.doris.regression.suite.Suite
import org.apache.doris.regression.util.Http
import org.apache.doris.regression.util.NodeType
@Grab(group='org.apache.httpcomponents', module='httpclient', version='4.5.13')
import org.apache.http.client.methods.*
import org.apache.http.impl.client.*
import org.apache.http.util.EntityUtils
import org.apache.http.client.config.RequestConfig
import org.apache.http.conn.ConnectTimeoutException
import org.apache.http.conn.HttpHostConnectException
import org.apache.http.impl.client.CloseableHttpClient
import org.apache.http.impl.client.HttpClients
import org.apache.http.ssl.SSLContextBuilder
import org.codehaus.groovy.runtime.IOGroovyMethods
import java.security.KeyStore
import java.security.cert.CertificateFactory
import java.security.cert.X509Certificate
import java.security.PrivateKey
import java.security.spec.PKCS8EncodedKeySpec
import java.security.KeyFactory
import java.util.Base64
import javax.net.ssl.SSLContext
// Issues an HTTP GET or POST with Apache HttpClient and returns a three-element
// list [code, out, err], shaped like the result of the `curl` helper:
//   - 2xx response               -> [0, body, ""]
//   - HTTP 500                   -> [500, body, "Internal Server Error"] (returned at once, not retried)
//   - any other status code      -> logged and retried
//   - connect failure            -> logged and retried
//   - read timeout               -> timeout raised by 10s, then retried
//   - any other exception        -> [500, last body seen (may be null), exception message]
//   - all 10 attempts exhausted  -> [500, last body seen (may be null), "Failed after 10 attempts"]
// Between attempts the closure sleeps, starting at 1s and doubling up to a 60s cap.
//
// When the `enableTLS` entry of `otherConfigs` is "true" (case-insensitive), the URL
// scheme is upgraded to https and the client is built with the configured key store
// and trust store.
//
// Throws Exception when `method` is not GET/POST or `url` does not start with
// http:// or https://.
Suite.metaClass.http_client = { String method, String url /* param */ ->
    Suite suite = delegate as Suite
    if (method != "GET" && method != "POST") {
        throw new Exception("Invalid method: ${method}")
    }
    if (!url || !(url =~ /^https?:\/\/.+/)) {
        throw new Exception("Invalid url: ${url}")
    }

    Integer timeout = 300 // seconds
    Integer maxRetries = 10
    Integer retryCount = 0
    Integer sleepTime = 1000 // milliseconds

    logger.info("HTTP request: ${method} ${url}")

    CloseableHttpClient httpClient
    if ((suite.context.config.otherConfigs.get("enableTLS")?.toString()?.equalsIgnoreCase("true")) ?: false) {
        def otherConfigs = suite.context.config.otherConfigs

        // Upgrade only the scheme. The validation above guarantees the URL starts with
        // "http://" or "https://", so an anchored replacement is sufficient and leaves
        // any "http://" that appears later (e.g. inside a query parameter) untouched.
        url = url.replaceFirst(/^http:\/\//, "https://")

        // withStream closes the file stream even when load() throws.
        KeyStore ks = KeyStore.getInstance(otherConfigs.get("keyStoreType"))
        char[] keyStorePassword = otherConfigs.get("keyStorePassword")?.toCharArray()
        new FileInputStream(otherConfigs.get("keyStorePath")?.toString()).withStream { stream ->
            ks.load(stream, keyStorePassword)
        }

        KeyStore ts = KeyStore.getInstance(otherConfigs.get("trustStoreType"))
        char[] trustStorePassword = otherConfigs.get("trustStorePassword")?.toCharArray()
        new FileInputStream(otherConfigs.get("trustStorePath")?.toString()).withStream { stream ->
            ts.load(stream, trustStorePassword)
        }

        SSLContext sslContext = SSLContextBuilder.create()
                .loadKeyMaterial(ks, keyStorePassword)
                .loadTrustMaterial(ts, null)
                .build()

        httpClient = HttpClients.custom()
                .setRetryHandler(new DefaultHttpRequestRetryHandler(3, true))
                .setSSLContext(sslContext)
                .build()
    } else {
        httpClient = HttpClients.custom()
                .setRetryHandler(new DefaultHttpRequestRetryHandler(3, true))
                .build()
    }

    int code
    String err
    String out

    try {
        while (retryCount < maxRetries) {
            // Method was validated above, so it is either GET or POST here.
            HttpRequestBase request = (method == "GET") ? new HttpGet(url) : new HttpPost(url)

            // Rebuilt on every attempt because a read timeout increases `timeout`.
            RequestConfig requestConfig = RequestConfig.custom()
                    .setConnectTimeout(timeout * 1000)
                    .setSocketTimeout(timeout * 1000)
                    .build()
            request.setConfig(requestConfig)

            try {
                CloseableHttpResponse response = httpClient.execute(request)
                try {
                    code = response.getStatusLine().getStatusCode()
                    // A response may legitimately carry no entity (e.g. 204 No Content);
                    // EntityUtils.toString rejects a null entity, so treat it as an empty body.
                    def entity = response.getEntity()
                    out = (entity != null) ? EntityUtils.toString(entity) : ""

                    if (code >= 200 && code < 300) {
                        code = 0 // to be compatible with the old curl function
                        err = ""
                        return [code, out, err]
                    } else if (code == 500) {
                        return [code, out, "Internal Server Error"]
                    } else {
                        retryCount++
                        logger.warn("HTTP request failed with status code ${code}, response ${out}, retrying (${retryCount}/${maxRetries})")
                    }
                } finally {
                    response.close()
                }
            } catch (ConnectTimeoutException | HttpHostConnectException e) {
                retryCount++
                logger.warn("Connection failed, retrying (${retryCount}/${maxRetries}): ${e.message}")
            } catch (SocketTimeoutException e) {
                timeout = timeout + 10
                retryCount++
                logger.warn("Read timed out, retrying (${retryCount}/${maxRetries}): ${e.message}")
            } catch (Exception e) {
                code = 500 // Internal Server Error
                logger.error("Error executing HTTP request: ${e.message}")
                err = e.message
                return [code, out, err]
            }

            sleep(sleepTime)
            sleepTime = Math.min(sleepTime * 2, 60000)
        }

        logger.error("HTTP request failed after ${maxRetries} attempts")
        err = "Failed after ${maxRetries} attempts"
        code = 500 // Internal Server Error
        return [code, out, err]
    } finally {
        httpClient.close()
    }
}
logger.info("Added 'http_client' function to Suite")
// Runs the external `curl` binary and returns [exitCode, stdout, stderr].
//
// Parameters:
//   method     - "GET" or "POST"; anything else throws Exception.
//   url        - target URL; "http://" is prepended when no http/https scheme is present.
//                Null or blank throws Exception.
//   body       - optional request body, sent as JSON (`-d`) for POST requests only.
//   timeoutSec - value passed to curl's `--max-time`.
//   user, pwd  - optional credentials, passed as `-u<user>:<pwd>` when `user` is non-empty.
//
// When the `enableTLS` entry of `otherConfigs` is "true" (case-insensitive), the URL
// scheme is upgraded to https and the `trustCert`, `trustCAKey` and `trustCACert`
// config entries are passed as `--cert`, `--key` and `--cacert`.
//
// A non-zero exit code is retried up to 10 times with a 5s pause; the result of the
// last attempt is returned either way.
Suite.metaClass.curl = { String method, String url, String body = null, Integer timeoutSec = 10, String user = "", String pwd = "" ->
    Suite suite = delegate as Suite
    if (method != "GET" && method != "POST") {
        throw new Exception(String.format("invalid curl method: %s", method))
    }
    if (url == null || url.isBlank()) {
        throw new Exception("invalid curl url, blank")
    }

    Integer maxRetries = 10   // Maximum number of attempts
    Integer retryCount = 0    // Attempts that have failed so far
    Integer sleepTime = 5000  // Pause between attempts, in milliseconds

    if (!url.toLowerCase().startsWith("http://") && !url.toLowerCase().startsWith("https://")) {
        url = "http://${url}"
    }

    boolean tlsEnabled = (suite.context.config.otherConfigs.get("enableTLS")?.toString()?.equalsIgnoreCase("true")) ?: false
    if (tlsEnabled) {
        // Upgrade the scheme of the URL itself rather than searching the whole command
        // line, so that a body or credential containing "http"/"https" has no effect.
        url = url.replaceFirst(/^http:\/\//, "https://")
    }

    // Build the command as an argument list: String.execute() splits on whitespace
    // without honouring quotes, which breaks any body or path that contains a space.
    List<String> args = ["curl"]
    if (user) {
        args << String.format("-u%s:%s", user, pwd).toString()
    }
    args << "--max-time" << "${timeoutSec}".toString()
    args << "-X" << method
    if (method == "POST" && body != null) {
        args << "-H" << "Content-Type:application/json"
        args << "-d" << body
    }
    args << url
    if (tlsEnabled) {
        def otherConfigs = suite.context.config.otherConfigs
        args << "--cert" << "${otherConfigs.get('trustCert')}".toString()
        args << "--key" << "${otherConfigs.get('trustCAKey')}".toString()
        args << "--cacert" << "${otherConfigs.get('trustCACert')}".toString()
    }

    String cmd = args.join(" ")
    logger.info("curl cmd: " + cmd)

    def process
    int code
    String err
    String out

    while (retryCount < maxRetries) {
        process = args.execute()
        def outputGlobber = new ByteArrayOutputStream()
        def errorGlobber = new ByteArrayOutputStream()
        process.waitForProcessOutput(outputGlobber, errorGlobber)
        code = process.exitValue()
        err = errorGlobber.toString()
        out = outputGlobber.toString()

        // If the command was successful, break the loop
        if (code == 0) {
            break
        }

        logger.error("Command curl failed, code: " + code + ", err: " + err + ", retry after " + sleepTime + " ms")
        // If the command was not successful, increment the retry count, sleep for a while and try again
        retryCount++
        sleep(sleepTime)
    }

    // If the command was not successful after maxRetries attempts, log the failure and return the result
    if (code != 0) {
        logger.error("Command curl failed after " + maxRetries + " attempts. code: " + code + ", err: " + err)
    }
    return [code, out, err]
}
logger.info("Added 'curl' function to Suite")
// Queries `/api/show_config` on the BE at ip:port through the `curl` helper and
// returns that helper's result. The https scheme is used when the `enableTLS`
// entry of `otherConfigs` is "true" (case-insensitive), http otherwise.
Suite.metaClass.show_be_config = { String ip, String port /*param */ ->
    Suite suite = delegate as Suite
    def tlsFlag = suite.context.config.otherConfigs.get("enableTLS")?.toString()
    boolean tlsEnabled = tlsFlag?.equalsIgnoreCase("true") ?: false
    String urlTemplate = tlsEnabled ? "https://%s:%s/api/show_config" : "http://%s:%s/api/show_config"
    return curl("GET", String.format(urlTemplate, ip, port))
}
logger.info("Added 'show_be_config' function to Suite")
// Posts `key=value` to `/api/update_config` on the BE at ip:port through the `curl`
// helper and returns that helper's result. The https scheme is used when the
// `enableTLS` entry of `otherConfigs` is "true" (case-insensitive), http otherwise.
Suite.metaClass.update_be_config = { String ip, String port, String key, String value /*param */ ->
    Suite suite = delegate as Suite
    def tlsFlag = suite.context.config.otherConfigs.get("enableTLS")?.toString()
    boolean tlsEnabled = tlsFlag?.equalsIgnoreCase("true") ?: false
    String urlTemplate = tlsEnabled ?
            "https://%s:%s/api/update_config?%s=%s" :
            "http://%s:%s/api/update_config?%s=%s"
    return curl("POST", String.format(urlTemplate, ip, port, key, value))
}
logger.info("Added 'update_be_config' function to Suite")
// Posts `key=value` to `/api/update_config` on every backend reported by
// getBackendIpHttpPort, asserting that each response holds exactly one entry
// whose status is "OK". The https scheme is used when the `enableTLS` entry of
// `otherConfigs` is "true" (case-insensitive), http otherwise.
Suite.metaClass.update_all_be_config = { String key, Object value ->
    Suite suite = delegate as Suite
    def ipByBackendId = [:]
    def httpPortByBackendId = [:]
    getBackendIpHttpPort(ipByBackendId, httpPortByBackendId)

    for (backend in ipByBackendId) {
        def beIp = backend.value
        def port = httpPortByBackendId.get(backend.key)
        boolean tlsEnabled = (suite.context.config.otherConfigs.get("enableTLS")?.toString()?.equalsIgnoreCase("true")) ?: false
        def url = tlsEnabled ?
                "https://${beIp}:${port}/api/update_config?${key}=${value}" :
                "http://${beIp}:${port}/api/update_config?${key}=${value}"

        def result = Http.POST(url, null, true)
        assert result.size() == 1, result.toString()
        assert result[0].status == "OK", result.toString()
    }
}
logger.info("Added 'update_all_be_config' function to Suite")
// Shared implementation behind the be_report_* helpers: issues a GET to
// `/api/report/<reportName>` on the BE at ip:port and passes the response to
// Http.checkHttpResult with NodeType.BE. The https scheme is used when the
// `enableTLS` entry of `otherConfigs` is "true" (case-insensitive), http otherwise.
Suite.metaClass._be_report = { String ip, int port, String reportName ->
    Suite suite = delegate as Suite
    def tlsFlag = suite.context.config.otherConfigs.get("enableTLS")?.toString()
    boolean tlsEnabled = tlsFlag?.equalsIgnoreCase("true") ?: false
    def url = tlsEnabled ?
            "https://${ip}:${port}/api/report/${reportName}" :
            "http://${ip}:${port}/api/report/${reportName}"
    def result = Http.GET(url, true)
    Http.checkHttpResult(result, NodeType.BE)
}
// Triggers the "disk" report on the BE at ip:port via _be_report.
// NOTE(review): the original comment says the BE needs a random sleep of about 5s
// before reporting; presumably the effect is not immediate - confirm.
Suite.metaClass.be_report_disk = { String ip, int port ->
    return _be_report(ip, port, "disk")
}
logger.info("Added 'be_report_disk' function to Suite")
// Triggers the "tablet" report on the BE at ip:port via _be_report.
// NOTE(review): the original comment says the BE needs a random sleep of about 5s
// before reporting; presumably the effect is not immediate - confirm.
Suite.metaClass.be_report_tablet = { String ip, int port ->
    return _be_report(ip, port, "tablet")
}
logger.info("Added 'be_report_tablet' function to Suite")
// Triggers the "task" report on the BE at ip:port via _be_report.
// NOTE(review): the original comment says the BE needs a random sleep of about 5s
// before reporting; presumably the effect is not immediate - confirm.
Suite.metaClass.be_report_task = { String ip, int port ->
    return _be_report(ip, port, "task")
}
logger.info("Added 'be_report_task' function to Suite")
// Checks the BE `show_nested_index_file` API response for one tablet.
//
// Parameters:
//   ip, port                - BE HTTP address.
//   tablet_id               - tablet to inspect; compared against the response's tablet_id as a string.
//   expected_rowsets_count  - expected number of entries in `rowsets`.
//   expected_indices_count  - expected number of entries in every segment's `indices`.
//   format                  - expected `index_storage_format` of every rowset (e.g. "V1").
//
// Assertions performed:
//   - a 500 result is accepted only when expected_indices_count is 0, and must then
//     carry status "E-6003" and a message containing "not found";
//   - otherwise the request must succeed, segment ids must be consecutive from 0
//     within each rowset, and the index file count must equal
//     segments * expected_indices_count for "V1", or the segment count for any other format.
Suite.metaClass.check_nested_index_file = { ip, port, tablet_id, expected_rowsets_count, expected_indices_count, format ->
    def (code, out, err) = http_client("GET", String.format("http://%s:%s/api/show_nested_index_file?tablet_id=%s", ip, port, tablet_id))
    logger.info("Run show_nested_index_file_on_tablet: code=" + code + ", out=" + out + ", err=" + err)

    // only when the expected_indices_count is 0, the tablet may not have the index file.
    if (code == 500 && expected_indices_count == 0) {
        def errorBody = parseJson(out.trim())
        assertEquals("E-6003", errorBody.status)
        assertTrue(errorBody.msg.contains("not found"))
        return
    }
    assertTrue(code == 0)

    // Parse the response once and reuse it, instead of re-parsing the same text for every field.
    def tabletInfo = parseJson(out.trim())
    assertEquals(tablet_id, tabletInfo.tablet_id.toString())
    def rowsets_count = tabletInfo.rowsets.size()
    assertEquals(expected_rowsets_count, rowsets_count)

    def index_files_count = 0
    def segment_files_count = 0
    for (def rowset in tabletInfo.rowsets) {
        assertEquals(format, rowset.index_storage_format)
        for (int i = 0; i < rowset.segments.size(); i++) {
            def segment = rowset.segments[i]
            assertEquals(i, segment.segment_id)
            def indices_count = segment.indices.size()
            assertEquals(expected_indices_count, indices_count)
            // "V1" counts one index file per index; any other format counts one per segment.
            if (format == "V1") {
                index_files_count += indices_count
            } else {
                index_files_count++
            }
        }
        segment_files_count += rowset.segments.size()
    }

    if (format == "V1") {
        assertEquals(index_files_count, segment_files_count * expected_indices_count)
    } else {
        assertEquals(index_files_count, segment_files_count)
    }
}
logger.info("Added 'check_nested_index_file' function to Suite")