blob: f0deb2c66d88e6a7a8f3c3d2d4f9cf6e432aaac8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("load_two_step") {
// Import once, use unique key, use seq and delete
// Map[tableName, rowCount]
def tables = [customer: 1500000, lineitem: 59986052, nation: 25, orders: 15000000, part: 2000000, partsupp: 8000000, region: 5, supplier: 100000]
def s3BucketName = getS3BucketName()
def s3WithProperties = """WITH S3 (
|"AWS_ACCESS_KEY" = "${getS3AK()}",
|"AWS_SECRET_KEY" = "${getS3SK()}",
|"AWS_ENDPOINT" = "${getS3Endpoint()}",
|"AWS_REGION" = "${getS3Region()}")
|PROPERTIES(
|"exec_mem_limit" = "8589934592",
|"load_parallelism" = "3")""".stripMargin()
// set fe configuration
sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')"
def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
tables.each { table, rows ->
// create table if not exists
try{
sql new File("""${context.file.parentFile.parent}/ddl/${table}_sequence.sql""").text
def loadLabel = table + "_" + uniqueID
// load data from cos
def loadSql = new File("""${context.file.parentFile.parent}/ddl/${table}_load_sequence.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties
sql loadSql
// check load state
while (true) {
def stateResult = sql "show load where Label = '${loadLabel}'"
def loadState = stateResult[stateResult.size() - 1][2].toString()
if ("CANCELLED".equalsIgnoreCase(loadState)) {
throw new IllegalStateException("load ${loadLabel} failed.")
} else if ("FINISHED".equalsIgnoreCase(loadState)) {
sql 'sync'
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${table}"
logger.info("select ${table} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == rows)
}
sql new File("""${context.file.parentFile.parent}/ddl/${table}_delete.sql""").text
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${table}"
logger.info("select ${table} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == 0)
}
break
}
sleep(5000)
}
}
finally {
try_sql("DROP TABLE IF EXISTS ${table}")
}
}
}