// blob: 8381fd28a8eefe1085b0713c7fcb432d3f718909 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.apache.doris.regression.suite.Suite
import org.codehaus.groovy.runtime.IOGroovyMethods
/**
 * Suite plugin: triggers a cumulative compaction for every tablet of
 * {@code tableName} through the backend HTTP API, then polls each tablet's
 * run status until no compaction is reported as running.
 *
 * All HTTP requests are sent to a single backend: the first backend listed by
 * {@code show backends} for the first cloud cluster name encountered.
 *
 * Fails (via the suite's assertions) when no backend is listed or when any
 * {@code curl} invocation exits with a non-zero code.
 *
 * @param tableName table whose tablets should be compacted
 */
Suite.metaClass.doCloudCompaction = { String tableName /* param */ ->
    // The suite that invoked this plugin function.
    Suite suite = delegate as Suite

    suite.getLogger().info("Test plugin: suiteName: ${suite.name}, tableName:${tableName}".toString())

    // Columns of `show backends`:
    //BackendId,Cluster,IP,HeartbeatPort,BePort,HttpPort,BrpcPort,LastStartTime,LastHeartbeat,Alive,SystemDecommissioned,ClusterDecommissioned,TabletNum,DataUsedCapacity,AvailCapacity,TotalCapacity,UsedPct,MaxDiskUsedPct,RemoteUsedCapacity,Tag,ErrMsg,Version,Status
    String[][] backends = sql """ show backends; """
    suite.getLogger().info("backends: ${backends}".toString())
    assertTrue(backends.size() > 0)

    def backendIdToBackendIP = [:]
    def backendIdToBackendHttpPort = [:]
    def clusterToBackendId = [:]
    for (String[] backend in backends) {
        backendIdToBackendIP.put(backend[0], backend[2])
        backendIdToBackendHttpPort.put(backend[0], backend[5])
        // Column 19 (Tag) is parsed as JSON; its cloud_cluster_name field names the cluster.
        def tagJson = parseJson(backend[19])
        // Remember only the first backend seen for each cluster.
        if (!clusterToBackendId.containsKey(tagJson.cloud_cluster_name)) {
            clusterToBackendId.put(tagJson.cloud_cluster_name, backend[0])
        }
    }
    suite.getLogger().info("backendIdToBackendIP: ${backendIdToBackendIP}".toString())
    suite.getLogger().info("backendIdToBackendHttpPort: ${backendIdToBackendHttpPort}".toString())
    suite.getLogger().info("clusterToBackendId: ${clusterToBackendId}".toString())

    def cluster0 = clusterToBackendId.keySet()[0]
    def backend_id0 = clusterToBackendId.get(cluster0)

    // Columns of `show tablets`:
    //TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus
    String[][] tablets = sql """ show tablets from ${tableName}; """

    // Runs an external command and returns [code, out, err].
    // stdout and stderr are drained while the process runs, so a chatty child
    // cannot block on a full pipe buffer before it exits. Every variable is
    // local, so concurrent invocations do not share state through the script
    // binding.
    def runCommand = { String command ->
        def proc = command.execute()
        def stdout = new StringBuilder()
        def stderr = new StringBuilder()
        proc.waitForProcessOutput(stdout, stderr)
        return [code: proc.exitValue(), out: stdout.toString(), err: stderr.toString()]
    }

    def doCompaction = { backend_id, compact_type ->
        def baseUrl = "http://${backendIdToBackendIP.get(backend_id)}:${backendIdToBackendHttpPort.get(backend_id)}"

        // Trigger a compaction for every tablet of the table.
        for (String[] tablet in tablets) {
            Thread.sleep(10)
            String tablet_id = tablet[0]
            String command = "curl -X POST ${baseUrl}/api/compaction/run?tablet_id=${tablet_id}&compact_type=${compact_type}".toString()
            def result = runCommand(command)
            logger.info("Run compaction: code=" + result.code + ", out=" + result.out + ", err=" + result.err)
            assertEquals(0, result.code)
            // The parsed value is not inspected; parsing still rejects a non-JSON response.
            parseJson(result.out.trim())
            //assertEquals("success", compactJson.status.toLowerCase())
        }

        // Wait for all compactions to finish.
        // NOTE(review): this polling loop has no timeout; it spins for as long
        // as the backend keeps reporting run_status as true.
        for (String[] tablet in tablets) {
            String tablet_id = tablet[0]
            String command = "curl -X GET ${baseUrl}/api/compaction/run_status?tablet_id=${tablet_id}".toString()
            boolean running = true
            do {
                Thread.sleep(1000)
                logger.info(command)
                def result = runCommand(command)
                logger.info("Get compaction status: code=" + result.code + ", out=" + result.out + ", err=" + result.err)
                assertEquals(0, result.code)
                def compactionStatus = parseJson(result.out.trim())
                //assertEquals("success", compactionStatus.status.toLowerCase())
                running = compactionStatus.run_status
            } while (running)
        }
    }

    doCompaction.call(backend_id0, "cumulative")
    //doCompaction.call(backend_id0, "base")
    return
}
// Runs once when this plugin script is evaluated: records that the
// doCloudCompaction closure has been attached to Suite's metaClass.
logger.info("Added 'doCloudCompaction' function to Suite")