blob: 131417f8820149e9f4c2b8f3b018da0686bbb0b8 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import CommonJobProperties as commonJobProperties
class PerformanceTestConfigurations {
// Name of the Jenkins job
String jobName
// Description of the Jenkins job
String jobDescription
// Phrase to trigger this Jenkins job
String jobTriggerPhrase
// Frequency of the job build, default to every 6 hours
String buildSchedule = 'H */6 * * *'
// A benchmark defined flag, will pass to benchmark as "--benchmarkName"
String benchmarkName = 'beam_integration_benchmark'
// A benchmark defined flag, will pass to benchmark as "--bigqueryTable"
String resultTable
// A benchmark defined flag, will pass to benchmark as "--beam_it_class"
String itClass
// A benchmark defined flag, will pass to benchmark as "--beam_it_module".
// It's a Gradle project that defines 'integrationTest' task. This task is executed by Perfkit
// Beam benchmark launcher and can be added by enablePythonPerformanceTest() defined in
// BeamModulePlugin.
String itModule
// A benchmark defined flag, will pass to benchmark as "--beam_python_sdk_location".
// It's the location of Python SDK distribution archive which is required for TestDataflowRunner.
String pythonSdkLocation = 'build/apache-beam.tar.gz'
// A benchmark defined flag, will pass to benchmark as "--beam_runner"
String runner = 'TestDataflowRunner'
// A benchmark defined flag, will pass to benchmark as "--beam_it_timeout"
Integer itTimeoutSec = 1200
// A benchmark defined flag, will pass to benchmark as "--beam_it_args"
Map extraPipelineArgs
// Common pipeline args for Dataflow job.
def dataflowPipelineArgs = [
project : 'apache-beam-testing',
staging_location: 'gs://temp-storage-for-end-to-end-tests/staging-it',
temp_location : 'gs://temp-storage-for-end-to-end-tests/temp-it',
// Configurations of each Jenkins job.
def testConfigurations = [
new PerformanceTestConfigurations(
jobName : 'beam_PerformanceTests_WordCountIT_Py27',
jobDescription : 'Python SDK Performance Test - Run WordCountIT in Py27 with 1Gb files',
jobTriggerPhrase : 'Run Python27 WordCountIT Performance Test',
resultTable : 'beam_performance.wordcount_py27_pkb_results',
itClass : 'apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it',
itModule : ':sdks:python:test-suites:dataflow:py2',
extraPipelineArgs : dataflowPipelineArgs + [
input: 'gs://apache-beam-samples/input_small_files/ascii_sort_1MB_input.0000*', // 1Gb
output: 'gs://temp-storage-for-end-to-end-tests/py-it-cloud/output',
expect_checksum: 'ea0ca2e5ee4ea5f218790f28d0b9fe7d09d8d710',
num_workers: '10',
autoscaling_algorithm: 'NONE', // Disable autoscale the worker pool.
new PerformanceTestConfigurations(
jobName : 'beam_PerformanceTests_WordCountIT_Py35',
jobDescription : 'Python SDK Performance Test - Run WordCountIT in Py35 with 1Gb files',
jobTriggerPhrase : 'Run Python35 WordCountIT Performance Test',
resultTable : 'beam_performance.wordcount_py35_pkb_results',
itClass : 'apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it',
itModule : ':sdks:python:test-suites:dataflow:py35',
extraPipelineArgs : dataflowPipelineArgs + [
input: 'gs://apache-beam-samples/input_small_files/ascii_sort_1MB_input.0000*', // 1Gb
output: 'gs://temp-storage-for-end-to-end-tests/py-it-cloud/output',
expect_checksum: 'ea0ca2e5ee4ea5f218790f28d0b9fe7d09d8d710',
num_workers: '10',
autoscaling_algorithm: 'NONE', // Disable autoscale the worker pool.
new PerformanceTestConfigurations(
jobName : 'beam_PerformanceTests_WordCountIT_Py36',
jobDescription : 'Python SDK Performance Test - Run WordCountIT in Py36 with 1Gb files',
jobTriggerPhrase : 'Run Python36 WordCountIT Performance Test',
resultTable : 'beam_performance.wordcount_py36_pkb_results',
itClass : 'apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it',
itModule : ':sdks:python:test-suites:dataflow:py36',
extraPipelineArgs : dataflowPipelineArgs + [
input: 'gs://apache-beam-samples/input_small_files/ascii_sort_1MB_input.0000*', // 1Gb
output: 'gs://temp-storage-for-end-to-end-tests/py-it-cloud/output',
expect_checksum: 'ea0ca2e5ee4ea5f218790f28d0b9fe7d09d8d710',
num_workers: '10',
autoscaling_algorithm: 'NONE', // Disable autoscale the worker pool.
new PerformanceTestConfigurations(
jobName : 'beam_PerformanceTests_WordCountIT_Py37',
jobDescription : 'Python SDK Performance Test - Run WordCountIT in Py37 with 1Gb files',
jobTriggerPhrase : 'Run Python37 WordCountIT Performance Test',
resultTable : 'beam_performance.wordcount_py37_pkb_results',
itClass : 'apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it',
itModule : ':sdks:python:test-suites:dataflow:py37',
extraPipelineArgs : dataflowPipelineArgs + [
input: 'gs://apache-beam-samples/input_small_files/ascii_sort_1MB_input.0000*', // 1Gb
output: 'gs://temp-storage-for-end-to-end-tests/py-it-cloud/output',
expect_checksum: 'ea0ca2e5ee4ea5f218790f28d0b9fe7d09d8d710',
num_workers: '10',
autoscaling_algorithm: 'NONE', // Disable autoscale the worker pool.
for (testConfig in testConfigurations) {
private void createPythonPerformanceTestJob(PerformanceTestConfigurations testConfig) {
// This job runs the Beam Python performance tests on PerfKit Benchmarker.
job(testConfig.jobName) {
// Set default Beam job properties.
// Run job in postcommit, don't trigger every push.
// Allows triggering this build against pull requests.
def argMap = [
beam_sdk : 'python',
benchmarks : testConfig.benchmarkName,
bigquery_table : testConfig.resultTable,
beam_it_class : testConfig.itClass,
beam_it_module : testConfig.itModule,
beam_prebuilt : 'true', // Python benchmark don't need to prebuild repo before running
beam_python_sdk_location: testConfig.pythonSdkLocation,
beam_runner : testConfig.runner,
beam_it_timeout : testConfig.itTimeoutSec.toString(),
beam_it_args : joinPipelineArgs(testConfig.extraPipelineArgs),
commonJobProperties.buildPerformanceTest(delegate, argMap)
// Helper function to join pipeline args from a map.
private static String joinPipelineArgs(Map pipelineArgs) {
def pipelineArgList = []
key, value -> pipelineArgList.add("--$key=$value")
return pipelineArgList.join(',')