blob: 35dc06c1c77a1b260f9700189787e38225ece361 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Quick performance comparison of scalar vs. vectorized Python UDFs.
//
// For each test case the same computation is registered twice: once from
// python_udf_scalar_ops.zip, and once from python_udf_vector_ops.zip with
// "vectorized" = "true". The identical query is then timed against each.
// Timings are only logged; the only assertions are on the stream-load result.
suite("test_pythonudf_performance_comparison") {
    // Quick performance comparison: Scalar vs Vector Python UDF
    // Lightweight test for quick performance checks
    def scalarPyPath = """${context.file.parent}/udf_scripts/python_udf_scalar_ops.zip"""
    def vectorPyPath = """${context.file.parent}/udf_scripts/python_udf_vector_ops.zip"""
    // Presumably copies each archive to every BE so the "file://" paths used in
    // CREATE FUNCTION resolve there (helper is defined by the test framework).
    scp_udf_file_to_all_be(scalarPyPath)
    scp_udf_file_to_all_be(vectorPyPath)
    def runtime_version = "3.10.12"
    sql "CREATE DATABASE IF NOT EXISTS test_pythonudf_performance_comparison"
    sql "USE test_pythonudf_performance_comparison"

    // Quick test with smaller dataset
    def TEST_ROWS = 100000 // 100K rows for quick testing (nominal size of benchmark_data_100k.csv)
    // Number of timed runs per query; each query also gets one untimed warm-up run.
    def BENCH_RUNS = 3

    // (Re)creates the Python UDF `fnName` with the signature and return type of
    // test case `tc`, backed by `symbol` inside the archive at `zipPath`.
    // Any previous definition with that signature is dropped first.
    // When `vectorized` is true, the "vectorized" = "true" property is added.
    def createPyUdf = { String fnName, Map tc, String zipPath, String symbol, boolean vectorized ->
        def props = [
            "type"           : "PYTHON_UDF",
            "file"           : "file://${zipPath}",
            "symbol"         : symbol,
            "runtime_version": runtime_version,
            "always_nullable": "true"
        ]
        if (vectorized) {
            props["vectorized"] = "true"
        }
        def propsSql = props.collect { k, v -> "\"${k}\" = \"${v}\"" }.join(",\n")
        sql """ DROP FUNCTION IF EXISTS ${fnName}${tc.params}; """
        sql """
            CREATE FUNCTION ${fnName}${tc.params}
            RETURNS ${tc.returns}
            PROPERTIES (
                ${propsSql}
            );
        """
    }

    // Runs `query` once as a warm-up, then BENCH_RUNS more times.
    // Returns the wall-clock duration (ms) of each timed run.
    def timeQuery = { String query ->
        sql query
        (1..BENCH_RUNS).collect {
            def start = System.currentTimeMillis()
            sql query
            System.currentTimeMillis() - start
        }
    }

    // Arithmetic mean of a non-empty list of durations.
    def avgOf = { List times -> times.sum() / times.size() }

    // Division that returns NaN instead of throwing when the divisor is zero,
    // e.g. if a timed query averages under 1 ms.
    def safeDiv = { num, den -> den == 0 ? Double.NaN : num / den }

    log.info("=" * 80)
    log.info("PYTHON UDF PERFORMANCE COMPARISON")
    log.info("Quick test with ${TEST_ROWS} rows")
    log.info("=" * 80)
    try {
        // Create test table
        sql """ DROP TABLE IF EXISTS perf_comparison_table; """
        sql """
            CREATE TABLE perf_comparison_table (
                id INT,
                val1 INT,
                val2 INT,
                price DOUBLE,
                discount DOUBLE,
                text STRING
            ) ENGINE=OLAP
            DUPLICATE KEY(id)
            DISTRIBUTED BY HASH(id) BUCKETS 10
            PROPERTIES("replication_num" = "1");
        """

        // Load test data using streamLoad from CSV file
        log.info("Loading ${TEST_ROWS} rows using streamLoad from CSV file...")
        def loadStartTime = System.currentTimeMillis()
        streamLoad {
            db 'test_pythonudf_performance_comparison'
            table "perf_comparison_table"
            // Set column separator to tab
            set 'column_separator', '\t'
            // File path relative to regression-test/data/pythonudf_p0/
            file 'benchmark_data_100k.csv'
            time 60000 // 60 seconds timeout
            // Fail the suite unless every row in the file was loaded.
            check { result, exception, startTime, endTime ->
                if (exception != null) {
                    throw exception
                }
                log.info("Stream load result: ${result}".toString())
                def json = parseJson(result)
                assertEquals("success", json.Status.toLowerCase())
                assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
            }
        }
        def loadEndTime = System.currentTimeMillis()
        log.info("Data loaded in ${loadEndTime - loadStartTime} ms")
        sql "sync"
        def actualRows = sql "SELECT COUNT(*) FROM perf_comparison_table"
        log.info("Verified row count: ${actualRows[0][0]}\nData ready. Starting performance tests...\n")
        // Not an assertion: the load check above already guarantees the whole
        // file was ingested; this only flags a dataset of an unexpected size.
        if (actualRows[0][0] != TEST_ROWS) {
            log.warn("Expected ${TEST_ROWS} rows but table has ${actualRows[0][0]}; timings are not for the nominal dataset size")
        }

        // Define test cases. `{UDF}` in each query is replaced by the function name.
        // COUNT(result), not COUNT(*), is used so that the UDF output is consumed.
        // Otherwise the optimizer may prune the unused column, and the UDF would never be invoked.
        def testCases = [
            [
                name: "Integer Multiplication",
                scalar_symbol: "python_udf_scalar_ops.multiply_with_default",
                vector_symbol: "python_udf_vector_ops.multiply_by_constant",
                params: "(INT, INT, INT)",
                returns: "INT",
                query: "SELECT COUNT(result) FROM (SELECT id, {UDF}(val1, 10, 1) AS result FROM perf_comparison_table) t"
            ],
            [
                name: "Price Calculation",
                scalar_symbol: "python_udf_scalar_ops.calculate_discount_price",
                vector_symbol: "python_udf_vector_ops.calculate_discount",
                params: "(DOUBLE, DOUBLE)",
                returns: "DOUBLE",
                query: "SELECT COUNT(result) FROM (SELECT id, {UDF}(price, discount) AS result FROM perf_comparison_table) t"
            ],
            [
                name: "String Length",
                scalar_symbol: "python_udf_scalar_ops.string_length_custom",
                vector_symbol: "python_udf_vector_ops.string_length",
                params: "(STRING)",
                returns: "INT",
                query: "SELECT COUNT(result) FROM (SELECT id, {UDF}(text) AS result FROM perf_comparison_table) t"
            ]
        ]

        def results = []
        testCases.each { testCase ->
            log.info("-" * 80)
            log.info("Test: ${testCase.name}")
            log.info("-" * 80)

            // Test Scalar UDF
            createPyUdf("py_scalar_test", testCase, scalarPyPath, testCase.scalar_symbol, false)
            def scalarTimes = timeQuery(testCase.query.replace("{UDF}", "py_scalar_test"))
            def scalarAvg = avgOf(scalarTimes)
            log.info(" Scalar UDF: ${scalarTimes} ms, Avg: ${scalarAvg} ms")

            // Test Vector UDF
            createPyUdf("py_vector_test", testCase, vectorPyPath, testCase.vector_symbol, true)
            def vectorTimes = timeQuery(testCase.query.replace("{UDF}", "py_vector_test"))
            def vectorAvg = avgOf(vectorTimes)
            log.info(" Vector UDF: ${vectorTimes} ms, Avg: ${vectorAvg} ms")

            // NaN (rather than an exception) if either average is 0 ms.
            def speedup = safeDiv(scalarAvg, vectorAvg)
            def improvement = safeDiv((scalarAvg - vectorAvg) * 100, scalarAvg)
            log.info(" Speedup: ${String.format('%.2f', speedup)}x")
            log.info(" Improvement: ${String.format('%.1f', improvement)}%")
            results.add([
                name: testCase.name,
                scalar: scalarAvg,
                vector: vectorAvg,
                speedup: speedup,
                improvement: improvement
            ])

            // Cleanup
            sql """ DROP FUNCTION IF EXISTS py_scalar_test${testCase.params}; """
            sql """ DROP FUNCTION IF EXISTS py_vector_test${testCase.params}; """
        }

        // Print summary
        def summary = new StringBuilder()
        summary.append("\n" + "=" * 80 + "\n")
        summary.append("PERFORMANCE COMPARISON SUMMARY\n")
        summary.append("=" * 80 + "\n")
        summary.append(String.format("%-30s %12s %12s %10s %12s", "Test Case", "Scalar(ms)", "Vector(ms)", "Speedup", "Improvement") + "\n")
        summary.append("-" * 80 + "\n")
        results.each { r ->
            summary.append(String.format("%-30s %12.1f %12.1f %9.2fx %11.1f%%",
                    r.name, r.scalar, r.vector, r.speedup, r.improvement) + "\n")
        }
        def avgSpeedup = results.collect { it.speedup }.sum() / results.size()
        def avgImprovement = results.collect { it.improvement }.sum() / results.size()
        summary.append("-" * 80 + "\n")
        summary.append(String.format("%-30s %12s %12s %9.2fx %11.1f%%",
                "AVERAGE", "-", "-", avgSpeedup, avgImprovement) + "\n")
        summary.append("=" * 80)
        log.info(summary.toString())
    } finally {
        // Cleanup: dropping the database also removes any UDFs left behind
        // if a test case failed before its own DROP FUNCTION statements ran.
        try_sql("DROP TABLE IF EXISTS perf_comparison_table;")
        try_sql("DROP DATABASE IF EXISTS test_pythonudf_performance_comparison;")
        log.info("\nPerformance comparison completed.")
    }
}