blob: 25019937ced8ea1d1ec4ee7ec4dfc41f74811b9b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import groovy.json.JsonOutput
import org.apache.doris.regression.suite.Suite
import org.codehaus.groovy.runtime.IOGroovyMethods
Suite.metaClass.repeate_stream_load_same_data = { String tableName, int loadTimes, String filePath->
for (int i = 0; i < loadTimes; i++) {
streamLoad {
table tableName
set 'column_separator', '|'
set 'compress_type', 'GZ'
file """${getS3Url()}/${filePath}"""
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
}
}
Suite.metaClass.stream_load_partial_update_data = { String tableName->
for (int i = 0; i < 20; i++) {
int start = i * 10 + 1
int end = (i + 1) * 10
def elements = (start..end).collect { "a$it" }
String columns = "id," + elements.join(',')
streamLoad {
table tableName
set 'column_separator', '|'
set 'compress_type', 'GZ'
set 'columns', columns
set 'partial_columns', 'true'
file """${getS3Url()}/regression/show_data/fullData.1.part${i+1}.gz"""
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
}
}
Suite.metaClass.get_tablets_from_table = { String table ->
def res = sql_return_maparray """show tablets from ${table}"""
logger.info("get tablets from ${table}:" + res)
return res
}
Suite.metaClass.show_tablet_compaction = { HashMap tablet ->
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET ")
sb.append(tablet["CompactionStatus"])
String command = sb.toString()
logger.info(command)
def process = command.execute()
def code = process.waitFor()
def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
def out = process.getText()
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
return parseJson(out.trim())
}
Suite.metaClass.trigger_tablet_compaction = { HashMap tablet, String compact_type ->
//support trigger base/cumulative/full compaction
def tabletStatusBeforeCompaction = show_tablet_compaction(tablet)
String tabletInBe = tablet
String showCompactionStatus = tablet["CompactionStatus"]
String triggerCompactionUrl = showCompactionStatus.split("show")[0] + "run?tablet_id=" + tablet["TabletId"] + "&compact_type=" + compact_type
StringBuilder sb = new StringBuilder();
sb.append("curl -X POST ")
sb.append(triggerCompactionUrl)
String command = sb.toString()
logger.info(command)
def process = command.execute()
def code = process.waitFor()
def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
def out = process.getText()
def outJson = parseJson(out)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
// if code = 0 means compaction happend, need to check
// other condition may indicate no suitable compaction condition
if ( code == 0 && outJson.status.toLowerCase() == "success" ){
def compactionStatus = "RUNNING"
def tabletStatusAfterCompaction = null
long startTime = System.currentTimeMillis()
long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min
do {
tabletStatusAfterCompaction = show_tablet_compaction(tablet)
logger.info("tabletStatusAfterCompaction class: " + tabletStatusAfterCompaction.class)
logger.info("hhhhhh: " + tabletStatusAfterCompaction.toString())
if (tabletStatusAfterCompaction.rowsets.size() < tabletStatusBeforeCompaction.rowsets.size()){
compactionStatus = 'FINISHED'
}
Thread.sleep(10 * 1000)
} while (timeoutTimestamp > System.currentTimeMillis() && (compactionStatus != 'FINISHED'))
if (compactionStatus != "FINISHED") {
logger.info("compaction not Finish or failed")
return false
}
}
}
Suite.metaClass.trigger_compaction = { List<List<Object>> tablets ->
for(def tablet: tablets) {
trigger_tablet_compaction(tablet, "full")
}
}
Suite.metaClass.caculate_table_data_size_in_backend_storage = { List<List<Object>> tablets ->
def storageType = context.config.otherConfigs.get("storageProvider")
Double storageSize = 0
List<String> tabletIds = []
for(def tablet: tablets) {
tabletIds.add(tablet["TabletId"])
}
if (storageType.toLowerCase() == "oss") {
//cbs means cluster backend storage
def ak = context.config.otherConfigs.get("cbsS3Ak")
def sk = context.config.otherConfigs.get("cbsS3Sk")
def endpoint = context.config.otherConfigs.get("cbsS3Endpoint")
def bucketName = context.config.otherConfigs.get("cbsS3Bucket")
def storagePrefix = context.config.otherConfigs.get("cbsS3Prefix")
def client = initOssClient(ak, sk, endpoint)
for(String tabletId: tabletIds) {
storageSize += calculateFolderLength(client, bucketName, storagePrefix + "data/" + tabletId)
}
shutDownOssClient(client)
}
if (storageType.toLowerCase() == "hdfs") {
def fsName = context.config.otherConfigs.get("cbsFsName")
def isKerberosFs = context.config.otherConfigs.get("cbsFsKerberos")
def fsUser = context.config.otherConfigs.get("cbsFsUser")
def storagePrefix = context.config.otherConfigs.get("cbsFsPrefix")
}
def round_size = new BigDecimal(storageSize/1024/1024).setScale(0, BigDecimal.ROUND_FLOOR);
return round_size
}
Suite.metaClass.translate_different_unit_to_MB = { String size, String unitField ->
Double sizeKb = 0.0
if (unitField == "KB") {
sizeKb = Double.parseDouble(size) / 1024
} else if (unitField == "MB") {
sizeKb = Double.parseDouble(size)
} else if (unitField == "GB") {
sizeKb = Double.parseDouble(size) * 1024 * 1024
} else if (unitField == "TB") {
sizeKb = Double.parseDouble(size) * 1024 * 1024 * 1024
}
return sizeKb
}
Suite.metaClass.show_table_data_size_through_mysql = { String table ->
logger.info("[show_table_data_size_through_mysql] 表名: ${table}")
def mysqlShowDataSize = 0L
def res = sql_return_maparray " show data from ${table}"
logger.info("[show_table_data_size_through_mysql] show data结果: " + res.toString())
def tableSizeInfo = res[0]
def fields = tableSizeInfo["Size"].split(" ")
if (fields.length == 2 ){
def sizeField = fields[0]
def unitField = fields[1]
mysqlShowDataSize = translate_different_unit_to_MB(sizeField, unitField)
}
def round_size = new BigDecimal(mysqlShowDataSize).setScale(0, BigDecimal.ROUND_FLOOR);
logger.info("[show_table_data_size_through_mysql] 最终结果: ${round_size} MB")
return round_size
}
Suite.metaClass.caculate_table_data_size_through_api = { List<List<Object>> tablets ->
Double apiCaculateSize = 0
for (HashMap tablet in tablets) {
def tabletId = tablet.TabletId
def tabletStatus = show_tablet_compaction(tablet)
logger.info("[caculate_table_data_size_through_api] tablet ID: ${tabletId}, status: " + tabletStatus.toString())
for(String rowset: tabletStatus.rowsets){
def fields = rowset.split(" ")
if (fields.length == 7) {
def sizeField = fields[-2] // the last field(size)
def unitField = fields[-1] // The second to last field(unit)
// 转换成 KB
apiCaculateSize += translate_different_unit_to_MB(sizeField, unitField )
}
}
}
def round_size = new BigDecimal(apiCaculateSize).setScale(0, BigDecimal.ROUND_FLOOR);
logger.info("[caculate_table_data_size_through_api] 最终结果: ${round_size} MB")
return round_size
}
Suite.metaClass.update_ms_config = { String ms_endpoint, String key, String value /*param */ ->
return curl("POST", String.format("http://%s/MetaService/http/v1/update_config?token=%s&configs=%s=%s", ms_endpoint, context.config.metaServiceToken, key, value))
}
Suite.metaClass.set_config_before_show_data_test = { ->
sql """admin set frontend config ("tablet_stat_update_interval_second" = "1")"""
sql """admin set frontend config ("catalog_trash_expire_second" = "1")"""
def backendIdToBackendIP = [:]
def backendIdToBackendHttpPort = [:]
getBackendIpHttpPort(backendIdToBackendIP, backendIdToBackendHttpPort);
def backendId = backendIdToBackendIP.keySet()[0]
def get_be_param = { paramName ->
// assuming paramName on all BEs have save value
def (code, out, err) = show_be_config(backendIdToBackendIP.get(backendId), backendIdToBackendHttpPort.get(backendId))
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List
for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == paramName) {
return ((List<String>) ele)[2]
}
}
}
def ms_endpoint = get_be_param("meta_service_endpoint");
def (code, out, err) = update_ms_config(ms_endpoint, "recycle_interval_seconds", "5")
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
update_ms_config(ms_endpoint, "retention_seconds", "0")
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
update_ms_config(ms_endpoint, "compacted_rowset_retention_seconds", "0")
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
update_ms_config(ms_endpoint, "recycle_job_lease_expired_ms", "0")
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
update_ms_config(ms_endpoint, "dropped_partition_retention_seconds", "0")
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
update_ms_config(ms_endpoint, "label_keep_max_second", "0")
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
update_ms_config(ms_endpoint, "copy_job_max_retention_second", "0")
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
}
Suite.metaClass.set_config_after_show_data_test = { ->
sql """admin set frontend config ("tablet_stat_update_interval_second" = "10")"""
sql """admin set frontend config ("catalog_trash_expire_second" = "600")"""
def backendIdToBackendIP = [:]
def backendIdToBackendHttpPort = [:]
getBackendIpHttpPort(backendIdToBackendIP, backendIdToBackendHttpPort);
def backendId = backendIdToBackendIP.keySet()[0]
def code, out, err
def get_be_param = { paramName ->
// assuming paramName on all BEs have save value
(code, out, err) = show_be_config(backendIdToBackendIP.get(backendId), backendIdToBackendHttpPort.get(backendId))
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List
for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == paramName) {
return ((List<String>) ele)[2]
}
}
}
def ms_endpoint = get_be_param("meta_service_endpoint");
update_ms_config(ms_endpoint, "recycle_interval_seconds", "600")
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
update_ms_config(ms_endpoint, "retention_seconds", "259200")
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
update_ms_config(ms_endpoint, "compacted_rowset_retention_seconds", "1800")
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
update_ms_config(ms_endpoint, "recycle_job_lease_expired_ms", "60000")
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
update_ms_config(ms_endpoint, "dropped_partition_retention_seconds", "10800")
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
update_ms_config(ms_endpoint, "label_keep_max_second", "300")
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
update_ms_config(ms_endpoint, "copy_job_max_retention_second", "259200")
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
}
//http://qa-build.oss-cn-beijing.aliyuncs.com/regression/show_data/fullData.1.part1.gz