blob: 33c21abea7d49b5c3345bbb376aec51279ffbc41 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("regression_test_variant_logdata", "nonConcurrent,p2"){
def set_be_config = { key, value ->
String backend_id;
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
}
def load_json_data = {table_name, file_name ->
// load the json data
streamLoad {
table "${table_name}"
// set http request header params
set 'read_json_by_line', 'true'
set 'format', 'json'
set 'max_filter_ratio', '0.1'
set 'memtable_on_sink_node', 'true'
file file_name // import json file
time 10000 // limit inflight 10s
// if declared a check callback, the default check condition will ignore.
// So you must check all condition
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
logger.info("Stream load ${file_name} result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
// assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
}
def create_table = { table_name, key_type="DUPLICATE", buckets=(new Random().nextInt(15) + 1).toString() ->
sql "DROP TABLE IF EXISTS ${table_name}"
sql """
CREATE TABLE IF NOT EXISTS ${table_name} (
k bigint,
v variant
)
${key_type} KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS ${buckets}
properties("replication_num" = "1", "disable_auto_compaction" = "false");
"""
}
// 12. streamload remote file
def table_name = "logdata"
create_table.call(table_name, "DUPLICATE", "4")
// sql "set enable_two_phase_read_opt = false;"
// no sparse columns
load_json_data.call(table_name, """${getS3Url() + '/regression/load/logdata.json'}""")
qt_sql_32 """ select json_extract(v, "\$.json.parseFailed") from logdata where cast(json_extract(v, "\$.json.parseFailed") as string) != 'null' order by k limit 1;"""
qt_sql_32_1 """select cast(v['json']['parseFailed'] as string) from logdata where cast(v['json']['parseFailed'] as string) is not null and k = 162 limit 1;"""
sql "truncate table ${table_name}"
// 0.95 default ratio
load_json_data.call(table_name, """${getS3Url() + '/regression/load/logdata.json'}""")
qt_sql_33 """ select json_extract(v,"\$.json.parseFailed") from logdata where cast(json_extract(v,"\$.json.parseFailed") as string) != 'null' order by k limit 1;"""
qt_sql_33_1 """select cast(v['json']['parseFailed'] as string) from logdata where cast(v['json']['parseFailed'] as string) is not null and k = 162 limit 1;"""
sql "truncate table ${table_name}"
// always sparse column
load_json_data.call(table_name, """${getS3Url() + '/regression/load/logdata.json'}""")
qt_sql_34 """ select json_extract(v, "\$.json.parseFailed") from logdata where cast(json_extract(v,"\$.json.parseFailed") as string) != 'null' order by k limit 1;"""
sql "truncate table ${table_name}"
qt_sql_35 """select json_extract(v,"\$.json.parseFailed") from logdata where k = 162 and cast(json_extract(v,"\$.json.parseFailed") as string) != 'null';"""
qt_sql_35_1 """select cast(v['json']['parseFailed'] as string) from logdata where cast(v['json']['parseFailed'] as string) is not null and k = 162 limit 1;"""
// TODO add test case that some certain columns are materialized in some file while others are not materilized(sparse)
// unique table
}