Writing a Dataflow Cost Benchmark to estimate the financial cost of executing a pipeline on Google Cloud Platform Dataflow requires 4 components in the repository:
1. A pipeline to run as the benchmark workload
2. A pipeline options file saved in the `.github/workflows/cost-benchmarks-pipeline-options` directory
3. A benchmark class that extends the `DataflowCostBenchmark` class
4. A step in the cost benchmarks GitHub Actions workflow to execute the benchmark

Pipelines that are worth benchmarking in terms of performance and cost have a few straightforward requirements. The workload should be identical on every run; for example, a RunInference benchmark should use the same model and version for each run, never pulling the latest release of a model. Additionally, the run() call for the pipeline should return a PipelineResult object, which the benchmark framework uses to query metrics from Dataflow after the run completes.
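As a point of reference, a minimal sketch of a pipeline module whose run() satisfies that last requirement could look like the following; the trivial Create/Map transforms stand in for the real workload:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run(argv=None, save_main_session=False):
  """Builds and runs the pipeline, returning its PipelineResult."""
  options = PipelineOptions(argv)
  options.view_as(SetupOptions).save_main_session = save_main_session
  pipeline = beam.Pipeline(options=options)
  (
      pipeline
      | 'Create' >> beam.Create(['benchmark', 'me'])
      | 'Upper' >> beam.Map(str.upper))
  result = pipeline.run()
  result.wait_until_finish()
  # Returning the PipelineResult (rather than discarding it) is what lets
  # the benchmark framework query Dataflow metrics once the run finishes.
  return result
```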
Once you have a functioning pipeline to configure as a benchmark, the options need to be saved as a .txt file in the `.github/workflows/cost-benchmarks-pipeline-options` directory. The file needs the Apache 2.0 license header at the top, and each flag must be provided on its own line, as in the sketch following this list. These arguments include:

- the autoscaling algorithm (set to `NONE` for a more consistent benchmark signal)
- `publish_to_big_query` (set to `true` for cost benchmarks)
- the metrics dataset to publish to (for RunInference workloads this will be `beam_run_inference`)
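A minimal sketch of such an options file is shown below; the specific flags and values are illustrative placeholders rather than required settings, and the Apache 2.0 license header is abbreviated:

```
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements...  (full Apache 2.0 license header)
--region=us-central1
--machine_type=n1-standard-2
--num_workers=5
--autoscaling_algorithm=NONE
--publish_to_big_query=true
--metrics_dataset=beam_cost_benchmarks
--metrics_table=python_wordcount
--influx_measurement=python_wordcount_cost_benchmark
```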
With the pipeline itself chosen and the arguments set, we can build out the test class that will execute the pipeline. Navigate to `sdks/python/apache_beam/testing/benchmarks` and select an appropriate sub-directory (or create one if necessary). The class for wordcount is shown below:
```python
import logging

from apache_beam.examples import wordcount
from apache_beam.testing.load_tests.dataflow_cost_benchmark import DataflowCostBenchmark


class WordcountCostBenchmark(DataflowCostBenchmark):
  def __init__(self):
    super().__init__()

  def test(self):
    extra_opts = {}
    extra_opts['output'] = self.pipeline.get_option('output_file')
    self.result = wordcount.run(
        self.pipeline.get_full_options_as_args(**extra_opts),
        save_main_session=False)


if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO)
  WordcountCostBenchmark().run()
```
There are two important notes here. First, if there are any arguments with common names (like `input` and `output`), you can use the `extra_opts` dictionary to map alternative names from the options file onto them; in the example above, `output_file` from the options file is mapped to the pipeline's `output` argument.
Second, make sure you save the output of the pipeline's run() call in the `self.result` field, as the framework will try to re-run the pipeline without the extra opts if that value is missing. Beyond those two key notes, the benchmarking framework handles all of the remaining work: setup, teardown, and metrics querying.
If the pipeline is a streaming use case, two versions need to be created: one operating on a backlog of work items (e.g. the entire test corpus is placed into the streaming source before the pipeline begins) and one operating at steady state (e.g. elements are added to the streaming source at a regular rate). The former is relatively simple: add an extra step to the test() function to stage the input data into the streaming source being read from. For the latter, a separate Python thread should be spun up to stage one element at a time repeatedly over a given time interval (the interval between elements and the duration of the staging should be defined as part of the benchmark configuration). Once the streaming pipeline runs out of data and does not receive more for an extended period of time, it will exit and the benchmarking framework will process the results in the same manner as the batch case. In the steady-state case, remember to call join() to close the staging thread after execution.
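The staging thread for the steady-state case might look something like the following minimal sketch; `publish_element` stands in for whatever write call the chosen streaming source requires (for example a Pub/Sub publish), and the interval and duration values come from the benchmark configuration:

```python
import threading
import time


def start_steady_state_staging(publish_element, elements, interval_secs, duration_secs):
  """Publishes one element at a time at a fixed interval for a fixed duration."""
  def _stage():
    deadline = time.monotonic() + duration_secs
    for element in elements:
      if time.monotonic() >= deadline:
        break
      publish_element(element)
      time.sleep(interval_secs)

  thread = threading.Thread(target=_stage)
  thread.start()
  return thread


# Inside the benchmark's test() method (sketch):
#   staging_thread = start_steady_state_staging(publisher.publish, corpus, 1.0, 600.0)
#   self.result = my_pipeline.run(self.pipeline.get_full_options_as_args(**extra_opts))
#   staging_thread.join()  # close the thread after execution
```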
Navigate to .github/workflows/beam_Python_CostBenchmarks_Dataflow.yml and make the following changes:
1. Add the path of the .txt file written above to the argument-file-paths list (a sketch of this entry follows the step example below). This loads those pipeline options into the workflow environment, with the entry available as `env.beam_Python_Cost_Benchmarks_Dataflow_test_arguments_X`, where X is an integer corresponding to the position of the text file in the list of files, starting from 1.
2. Add a step to run the new benchmark. The step for wordcount is shown below:

```yaml
- name: Run wordcount on Dataflow
  uses: ./.github/actions/gradle-command-self-hosted-action
  timeout-minutes: 30
  with:
    gradle-command: :sdks:python:apache_beam:testing:load_tests:run
    arguments: |
      -PloadTest.mainClass=apache_beam.testing.benchmarks.wordcount.wordcount \
      -Prunner=DataflowRunner \
      -PpythonVersion=3.10 \
      '-PloadTest.args=${{ env.beam_Python_Cost_Benchmarks_Dataflow_test_arguments_1 }} --job_name=benchmark-tests-wordcount-python-${{env.NOW_UTC}} --output_file=gs://temp-storage-for-end-to-end-tests/wordcount/result_wordcount-${{env.NOW_UTC}}.txt' \
```
The main class is the DataflowCostBenchmark subclass defined earlier, followed by the runner and Python version. The majority of the pipeline arguments are loaded from the .txt file, with the job name and output file specified here. Be sure to set a reasonable timeout for the step as well.
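For the first change, a rough sketch of appending the new options file to the workflow's existing argument-file-paths input is shown here; the wordcount.txt file name is a hypothetical placeholder and the surrounding argument-loading step is assumed to already exist in the workflow:

```yaml
argument-file-paths: |
  ${{ github.workspace }}/.github/workflows/cost-benchmarks-pipeline-options/wordcount.txt
```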