| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| import CommonTestProperties.Runner |
| import CommonTestProperties.SDK |
| import LoadTestConfig.SerializableOption |
| import groovy.json.JsonBuilder |
| import org.codehaus.groovy.runtime.InvokerHelper |
| |
| import java.util.function.Predicate |
| |
| import static java.util.Objects.nonNull |
| import static java.util.Objects.requireNonNull |
| |
| /** |
| * This class provides a simple DSL for load test configuration, producing the configuration as a Map<String, Serializable> |
| * [{@link LoadTestConfig#config config} -- returns a configuration map] |
| * [{@link LoadTestConfig#templateConfig templateConfig} -- returns a reusable LoadTestConfig object] |
| * [{@link LoadTestConfig#fromTemplate fromTemplate} -- returns a configuration map built from the given template].<br><br> |
| * |
| * Example: |
| * <blockquote><pre> |
| * LoadTestConfig template = templateConfig { |
| *     title 'Load test' |
| *     test 'org.apache.beam.sdk.loadtests.SomeLoadTests' |
| *     dataflow() |
| *     pipelineOptions { |
| *         python() |
| *         jobName 'Any job name' |
| *         publishToBigQuery true |
| *         specificParameters([ |
| *             fanout: 4 |
| *         ]) |
| *         //other fields |
| *     } |
| * } |
| * Map<String, Serializable> configMap = fromTemplate(template) { |
| *     //fields can be changed and/or added |
| *     portable() |
| *     pipelineOptions { |
| *         parallelism 5 |
| *         inputOptions { |
| *             numRecords 20000 |
| *             keySize 1000 |
| *             valueSize 10 |
| *         } |
| *     } |
| * } |
| * </pre></blockquote> |
| */ |
| class LoadTestConfig implements SerializableOption<Map<String, Serializable>> { |
| |
| private String _title |
| private String _test |
| private Runner _runner |
| private PipelineOptions _pipelineOptions |
| |
| private LoadTestConfig() {} |
| |
| void title(final String title) { |
| _title = title |
| } |
| void test(final String test) { |
| _test = test |
| } |
| |
| //runners |
| void dataflow() { setRunnerAndUpdatePipelineOptions(Runner.DATAFLOW) } |
| void portable() { setRunnerAndUpdatePipelineOptions(Runner.PORTABLE) } |
| |
| private void setRunnerAndUpdatePipelineOptions(final Runner runner) { |
| _runner = runner |
| final def pipeline = _pipelineOptions ?: new PipelineOptions() |
| pipeline.i_runner = runner |
| _pipelineOptions = pipeline |
| } |
| |
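| /** |
| * Configures pipeline options. Repeated calls (including the implicit one made by the runner |
| * methods above) are applied to the same underlying PipelineOptions instance, so options can be |
| * set incrementally, e.g.: |
| * <blockquote><pre> |
| * pipelineOptions { python() } |
| * pipelineOptions { jobName 'job' } //merges with the previous call |
| * </pre></blockquote> |
| * @param cl Closure setting the fields |
| */ |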
| void pipelineOptions(final Closure cl = {}) { |
| final def options = _pipelineOptions ?: new PipelineOptions() |
| delegateAndInvoke(options, cl) |
| _pipelineOptions = options |
| } |
| |
| /** |
| * Returns a load test config object that can be reused as a template.<br> |
| * All possible fields that can be set: |
| * <blockquote><pre> |
| * templateConfig { |
| *     title [String] |
| *     test [String] |
| *     [dataflow(), portable()] -- runner |
| *     pipelineOptions { |
| *         [python(), java()] -- sdk |
| *         jobName [String] |
| *         appName [String] |
| *         project [String] |
| *         publishToBigQuery [boolean] |
| *         metricsDataset (python) [String] |
| *         metricsTable (python) [String] |
| *         bigQueryDataset (java) [String] |
| *         bigQueryTable (java) [String] |
| *         numWorkers [int] |
| *         parallelism [int] |
| *         tempLocation [String] |
| *         autoscalingAlgorithm [String] |
| *         jobEndpoint [String] |
| *         environmentType [String] |
| *         environmentConfig [String] |
| *         inputOptions/coInputOptions (python) { |
| *             numRecords [int] |
| *             keySize [int] |
| *             valueSize [int] |
| *             numHotKeys [int] |
| *             hotKeyFraction [double] |
| *         } |
| *         sourceOptions/coSourceOptions (java) { |
| *             numRecords [int] |
| *             keySizeBytes [int] |
| *             valueSizeBytes [int] |
| *             numHotKeys [int] |
| *             hotKeyFraction [double] |
| *             splitPointFrequencyRecords [int] |
| *         } |
| *         stepOptions { |
| *             outputRecordsPerInputRecord [int] |
| *             preservesInputKeyDistribution [boolean] |
| *         } |
| *         specificParameters [Map<String, Object>] |
| *     } |
| * } |
| * </pre></blockquote> |
| * @param cl Closure setting the fields |
| * @return a reusable LoadTestConfig object |
| */ |
| static LoadTestConfig templateConfig(final Closure cl = {}) { |
| final def config = new LoadTestConfig() |
| delegateAndInvoke(config, cl) |
| return config |
| } |
| |
| /** |
| * Returns a configuration map built from the given template. Any field can be changed and/or added. |
| * Validation is performed before the final map is returned (e.g. the Portable runner requires |
| * <b>environmentConfig</b> to be set). If validation fails, an exception is thrown.<br> |
| * Example result: |
| * <blockquote><pre> |
| * [ |
| *     title          : 'any given title', |
| *     test           : 'org.apache.beam.sdk.loadtests.SomeLoadTests', |
| *     runner         : CommonTestProperties.Runner.DATAFLOW, |
| *     pipelineOptions: [ |
| *         job_name            : 'any given job name', |
| *         publish_to_big_query: true, |
| *         project             : 'apache-beam-testing', |
| *         metrics_dataset     : 'given_dataset_name', |
| *         metrics_table       : 'given_table_name', |
| *         input_options       : '\'{"num_records": 200000000,"key_size": 1,"value_size":9}\'', |
| *         iterations          : 1, |
| *         fanout              : 1, |
| *         parallelism         : 5, |
| *         job_endpoint        : 'localhost:1234', |
| *         environment_config  : 'given_environment_config', |
| *         environment_type    : 'given_environment_type' |
| *     ] |
| * ] |
| * </pre></blockquote> |
| * @param templateConfig LoadTestConfig instance |
| * @param cl Closure setting the fields |
| * @return configuration map |
| * @see LoadTestConfig |
| * @see LoadTestConfig#templateConfig |
| */ |
| static Map<String, Serializable> fromTemplate(final LoadTestConfig templateConfig, final Closure cl = {}) { |
| final def newConfig = of(templateConfig) |
| delegateAndInvoke(newConfig, cl) |
| final def properties = newConfig.propertiesMap |
| verifyProperties(properties) |
| return ConfigHelper.convertProperties(properties) |
| } |
| |
| /** |
| * Returns a configuration map (see {@link LoadTestConfig#fromTemplate}) built directly from the given settings. |
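| * Example (illustrative values; the test class, dataset, table and bucket names are hypothetical): |
| * <blockquote><pre> |
| * Map<String, Serializable> configMap = config { |
| *     title 'Load test' |
| *     test 'org.apache.beam.sdk.loadtests.SomeLoadTests' |
| *     dataflow() |
| *     pipelineOptions { |
| *         java() |
| *         appName 'load-test' |
| *         project 'apache-beam-testing' |
| *         publishToBigQuery true |
| *         bigQueryDataset 'load_test_dataset' |
| *         bigQueryTable 'load_test_table' |
| *         numWorkers 5 |
| *         tempLocation 'gs://some-bucket/temp' |
| *         autoscalingAlgorithm 'NONE' |
| *         sourceOptions { |
| *             numRecords 1000 |
| *         } |
| *     } |
| * } |
| * </pre></blockquote> |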
| * @param cl Closure setting the fields |
| * @return configuration map |
| */ |
| static Map<String, Serializable> config(final Closure cl = {}) { |
| final def config = new LoadTestConfig() |
| delegateAndInvoke(config, cl) |
| final def properties = config.propertiesMap |
| verifyProperties(properties) |
| return ConfigHelper.convertProperties(properties) |
| } |
| |
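| //Runs the closure with the given object as its exclusive delegate (DELEGATE_ONLY), |
| //so unqualified method calls inside the closure resolve against the DSL object. |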
| private static void delegateAndInvoke(final delegate, final Closure cl = {}) { |
| final def code = cl.rehydrate(delegate, this, this) |
| code.resolveStrategy = Closure.DELEGATE_ONLY |
| code() |
| } |
| |
| private static LoadTestConfig of(final LoadTestConfig oldConfig) { |
| final def newConfig = new LoadTestConfig() |
| |
| //primitive values |
| InvokerHelper.setProperties(newConfig, oldConfig.propertiesMap) |
| |
| //non-primitive values |
| newConfig._pipelineOptions = oldConfig._pipelineOptions ? PipelineOptions.of(oldConfig._pipelineOptions) : null |
| |
| return newConfig |
| } |
| |
| @Override |
| Map<String, Serializable> toPrimitiveValues() { |
| final def map = propertiesMap |
| verifyProperties(map) |
| return ConfigHelper.convertProperties(map) |
| } |
| |
| LinkedHashMap<String, Object> getPropertiesMap() { |
| return [ |
| _title: _title, |
| _test: _test, |
| _runner: _runner, |
| _pipelineOptions: _pipelineOptions |
| ] |
| } |
| |
| private static void verifyProperties(final LinkedHashMap<String, Object> map) { |
| for (entry in map.entrySet()) { |
| requireNonNull(entry.value, "Missing ${entry.key.substring(1)} in configuration") |
| } |
| } |
| |
| private static class PipelineOptions implements SerializableOption<Map<String, Serializable>> { |
| private Map<String, Object> _specificParameters = new HashMap<>() |
| private boolean _streaming = false |
| private SourceOptions _coSourceOptions |
| private InputOptions _coInputOptions |
| private StepOptions _stepOptions |
| |
| //required |
| private String _project |
| //Boolean (not primitive boolean) so that a missing value fails validation instead of defaulting to false |
| private Boolean _publishToBigQuery |
| |
| //java required |
| private String _bigQueryDataset |
| private String _bigQueryTable |
| private String _appName |
| private SourceOptions _sourceOptions |
| |
| //python required |
| private String _metricsDataset |
| private String _metricsTable |
| private String _jobName |
| private InputOptions _inputOptions |
| |
| //internal usage |
| private SDK i_sdk |
| private Runner i_runner |
| private static final i_required = [ |
| "_project", |
| "_publishToBigQuery" |
| ] |
| private static final i_dataflowRequired = [ |
| "_numWorkers", |
| "_tempLocation", |
| "_autoscalingAlgorithm", |
| "_region" |
| ] |
| private static final i_portableRequired = [ |
| "_jobEndpoint", |
| "_environmentType", |
| "_environmentConfig", |
| "_parallelism" |
| ] |
| private static final i_javaRequired = [ |
| "_bigQueryDataset", |
| "_bigQueryTable", |
| "_sourceOptions", |
| "_appName" |
| ] |
| private static final i_pythonRequired = [ |
| "_metricsDataset", |
| "_metricsTable", |
| "_inputOptions", |
| "_jobName" |
| ] |
| |
| //dataflow required |
| private Integer _numWorkers |
| private String _tempLocation |
| private String _autoscalingAlgorithm |
| private String _region = 'us-central1' |
| |
| //portable runner (e.g. Flink) required |
| private String _jobEndpoint |
| private String _environmentType |
| private String _environmentConfig |
| private Integer _parallelism |
| |
| void jobName(final String name) { _jobName = name } |
| void appName(final String name) { _appName = name } |
| void project(final String project) { _project = project } |
| void tempLocation(final String location) { _tempLocation = location } |
| void publishToBigQuery(final boolean publish) { _publishToBigQuery = publish } |
| void metricsDataset(final String dataset) { _metricsDataset = dataset } |
| void metricsTable(final String table) { _metricsTable = table } |
| void inputOptions(final InputOptions options) { _inputOptions = options } |
| void numWorkers(final int workers) { _numWorkers = workers } |
| void autoscalingAlgorithm(final String algorithm) { _autoscalingAlgorithm = algorithm } |
| void region(final String region) { _region = region } |
| void jobEndpoint(final String endpoint) { _jobEndpoint = endpoint } |
| void environmentType(final String type) { _environmentType = type } |
| void environmentConfig(final String config) { _environmentConfig = config } |
| void parallelism(final int parallelism) { _parallelism = parallelism } |
| void bigQueryDataset(final String dataset) { _bigQueryDataset = dataset } |
| void bigQueryTable(final String table) { _bigQueryTable = table } |
| void streaming(final boolean isStreaming) { _streaming = isStreaming } |
| void sourceOptions(final Closure cl = {}) { _sourceOptions = makeSourceOptions(cl) } |
| void coSourceOptions(final Closure cl = {}) { _coSourceOptions = makeSourceOptions(cl) } |
| void inputOptions(final Closure cl = {}) { _inputOptions = makeInputOptions(cl) } |
| void coInputOptions(final Closure cl = {}) { _coInputOptions = makeInputOptions(cl) } |
| void stepOptions(final Closure cl = {}) { _stepOptions = makeStepOptions(cl) } |
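| //merges with (rather than replaces) any previously set specific parameters |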
| void specificParameters(final Map<String, Object> map) { _specificParameters.putAll(map) } |
| |
| //sdk -- determines the key format of the final map: snake_case (Python) vs camelCase (Java) |
| void python() { i_sdk = SDK.PYTHON } |
| void java() { i_sdk = SDK.JAVA } |
| |
| |
| private InputOptions makeInputOptions(final Closure cl = {}) { |
| return makeOptions(cl, _inputOptions ?: InputOptions.withSDK(i_sdk)) |
| } |
| |
| private SourceOptions makeSourceOptions(final Closure cl = {}) { |
| return makeOptions(cl, _sourceOptions ?: SourceOptions.withSDK(i_sdk)) |
| } |
| |
| private StepOptions makeStepOptions(final Closure cl = {}) { |
| return makeOptions(cl, _stepOptions ?: StepOptions.withSDK(i_sdk)) |
| } |
| |
| private <T> T makeOptions(final Closure cl = {}, final T options) { |
| final def code = cl.rehydrate(options, this, this) |
| code.resolveStrategy = Closure.DELEGATE_ONLY |
| code() |
| return options |
| } |
| |
| @Override |
| Map<String, Serializable> toPrimitiveValues() { |
| final def map = propertiesMap |
| verifyPipelineProperties(map) |
| return ConfigHelper.convertProperties(map, i_sdk) |
| } |
| |
| private void verifyPipelineProperties(final Map<String, Object> map) { |
| verifyRequired(map) |
| switch (i_runner) { |
| case Runner.DATAFLOW: |
| verifyDataflowProperties(map) |
| break |
| case Runner.PORTABLE: |
| verifyPortableProperties(map) |
| break |
| default: |
| break |
| } |
| } |
| |
| private void verifyRequired(final Map<String, Object> map) { |
| verifyCommonRequired(map) |
| switch (i_sdk) { |
| case SDK.PYTHON: |
| verifyPythonRequired(map) |
| break |
| case SDK.JAVA: |
| verifyJavaRequired(map) |
| break |
| default: |
| break |
| } |
| } |
| |
| private static void verifyCommonRequired(final Map<String, Object> map) { |
| verify(map, "") { i_required.contains(it.key) } |
| } |
| |
| private static void verifyPythonRequired(final Map<String, Object> map) { |
| verify(map, "for Python SDK") { i_pythonRequired.contains(it.key) } |
| } |
| |
| private static void verifyJavaRequired(final Map<String, Object> map) { |
| verify(map, "for Java SDK") { i_javaRequired.contains(it.key) } |
| } |
| |
| private static void verifyDataflowProperties(final Map<String, Object> map) { |
| verify(map, "for Dataflow runner") { i_dataflowRequired.contains(it.key) } |
| } |
| |
| private static void verifyPortableProperties(final Map<String, Object> map) { |
| verify(map, "for Portable runner") { i_portableRequired.contains(it.key) } |
| } |
| |
| private static void verify(final Map<String, Object> map, final String message, final Predicate<Map.Entry<String, Object>> predicate) { |
| map.entrySet() |
| .stream() |
| .filter(predicate) |
| .forEach { requireNonNull(it.value, "${it.key.substring(1)} is required ${message}".trim()) } |
| } |
| |
| static PipelineOptions of(final PipelineOptions options) { |
| final def newOptions = new PipelineOptions() |
| |
| //primitive values |
| InvokerHelper.setProperties(newOptions, options.propertiesMap) |
| |
| //non-primitive |
| newOptions._inputOptions = options._inputOptions ? InputOptions.of(options._inputOptions) : null |
| newOptions._coInputOptions = options._coInputOptions ? InputOptions.of(options._coInputOptions) : null |
| newOptions._sourceOptions = options._sourceOptions ? SourceOptions.of(options._sourceOptions) : null |
| newOptions._coSourceOptions = options._coSourceOptions ? SourceOptions.of(options._coSourceOptions) : null |
| newOptions._stepOptions = options._stepOptions ? StepOptions.of(options._stepOptions) : null |
| newOptions._specificParameters = new HashMap<>(options._specificParameters) |
| |
| return newOptions |
| } |
| |
| Map<String, Object> getPropertiesMap() { |
| return [ |
| i_sdk: i_sdk, |
| i_runner: i_runner, |
| _jobName: _jobName, |
| _appName: _appName, |
| _project: _project, |
| _tempLocation: _tempLocation, |
| _publishToBigQuery: _publishToBigQuery, |
| _metricsDataset: _metricsDataset, |
| _metricsTable: _metricsTable, |
| _numWorkers: _numWorkers, |
| _autoscalingAlgorithm: _autoscalingAlgorithm, |
| _region: _region, |
| _inputOptions: _inputOptions, |
| _coInputOptions: _coInputOptions, |
| _jobEndpoint: _jobEndpoint, |
| _environmentType: _environmentType, |
| _environmentConfig: _environmentConfig, |
| _parallelism: _parallelism, |
| _bigQueryDataset: _bigQueryDataset, |
| _bigQueryTable: _bigQueryTable, |
| _streaming: _streaming, |
| _sourceOptions: _sourceOptions, |
| _coSourceOptions: _coSourceOptions, |
| _stepOptions: _stepOptions |
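| //Groovy's Map.putAll(Collection<Map.Entry>) returns the map itself, with the specific parameters merged in |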
| ].putAll(_specificParameters.entrySet()) |
| } |
| |
| private static class InputOptions implements SerializableOption<String> { |
| private Integer _numRecords |
| private Integer _keySize |
| private Integer _valueSize |
| private Integer _numHotKeys |
| private Double _hotKeyFraction |
| |
| //internal usage |
| private SDK i_sdk |
| |
| private InputOptions() {} |
| |
| static InputOptions withSDK(final SDK sdk) { |
| final def input = new InputOptions() |
| input.i_sdk = sdk |
| return input |
| } |
| |
| void numRecords(final int num) { _numRecords = num } |
| void keySize(final int size) { _keySize = size } |
| void valueSize(final int size) { _valueSize = size } |
| void numHotKeys(final int num) { _numHotKeys = num } |
| void hotKeyFraction(final double fraction) { _hotKeyFraction = fraction } |
| |
| @Override |
| String toPrimitiveValues() { |
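| //the JSON is additionally wrapped in single quotes so it survives as one argument when interpolated into a shell command |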
| return "'${new JsonBuilder(ConfigHelper.convertProperties(propertiesMap, i_sdk)).toString()}'" |
| } |
| |
| static InputOptions of(final InputOptions oldOptions) { |
| final def newOptions = new InputOptions() |
| InvokerHelper.setProperties(newOptions, oldOptions.propertiesMap) |
| return newOptions |
| } |
| |
| LinkedHashMap<String, Object> getPropertiesMap() { |
| return [ |
| i_sdk: i_sdk, |
| _numRecords: _numRecords, |
| _keySize: _keySize, |
| _valueSize: _valueSize, |
| _numHotKeys: _numHotKeys, |
| _hotKeyFraction: _hotKeyFraction |
| ] as LinkedHashMap<String, Object> |
| } |
| } |
| |
| private static class SourceOptions implements SerializableOption<String> { |
| private Integer _numRecords |
| private Integer _keySizeBytes |
| private Integer _valueSizeBytes |
| private Integer _numHotKeys |
| private Double _hotKeyFraction |
| private Integer _splitPointFrequencyRecords |
| |
| //internal usage |
| private SDK i_sdk |
| |
| private SourceOptions() {} |
| |
| static SourceOptions withSDK(final SDK sdk) { |
| final def input = new SourceOptions() |
| input.i_sdk = sdk |
| return input |
| } |
| |
| void numRecords(final int num) { _numRecords = num } |
| void keySizeBytes(final int size) { _keySizeBytes = size } |
| void valueSizeBytes(final int size) { _valueSizeBytes = size } |
| void numHotKeys(final int num) { _numHotKeys = num } |
| void hotKeyFraction(final double fraction) { _hotKeyFraction = fraction } |
| void splitPointFrequencyRecords(final int splitPoint) { _splitPointFrequencyRecords = splitPoint } |
| |
| @Override |
| String toPrimitiveValues() { |
| return new JsonBuilder(ConfigHelper.convertProperties(propertiesMap, i_sdk)).toString() |
| } |
| |
| static SourceOptions of(final SourceOptions oldOptions) { |
| final def newOptions = new SourceOptions() |
| InvokerHelper.setProperties(newOptions, oldOptions.propertiesMap) |
| return newOptions |
| } |
| |
| Map<String, Object> getPropertiesMap() { |
| return [ |
| i_sdk: i_sdk, |
| _numRecords: _numRecords, |
| _keySizeBytes: _keySizeBytes, |
| _valueSizeBytes: _valueSizeBytes, |
| _numHotKeys: _numHotKeys, |
| _hotKeyFraction: _hotKeyFraction, |
| _splitPointFrequencyRecords: _splitPointFrequencyRecords |
| ] |
| } |
| } |
| |
| private static class StepOptions implements SerializableOption<String> { |
| private Integer _outputRecordsPerInputRecord |
| private boolean _preservesInputKeyDistribution |
| |
| //internal usage |
| private SDK i_sdk |
| |
| private StepOptions() {} |
| |
| static StepOptions withSDK(final SDK sdk) { |
| final def option = new StepOptions() |
| option.i_sdk = sdk |
| return option |
| } |
| |
| void outputRecordsPerInputRecord(final int records) { _outputRecordsPerInputRecord = records } |
| void preservesInputKeyDistribution(final boolean shouldPreserve) { _preservesInputKeyDistribution = shouldPreserve } |
| |
| @Override |
| String toPrimitiveValues() { |
| return new JsonBuilder(ConfigHelper.convertProperties(propertiesMap, i_sdk)).toString() |
| } |
| |
| Map<String, Object> getPropertiesMap() { |
| return [ |
| i_sdk: i_sdk, |
| _outputRecordsPerInputRecord: _outputRecordsPerInputRecord, |
| _preservesInputKeyDistribution: _preservesInputKeyDistribution |
| ] as Map<String, Object> |
| } |
| |
| static StepOptions of(final StepOptions oldOption) { |
| final def newOption = new StepOptions() |
| InvokerHelper.setProperties(newOption, oldOption.propertiesMap) |
| return newOption |
| } |
| } |
| } |
| |
| private interface SerializableOption<T> { |
| T toPrimitiveValues() |
| } |
| |
| private static class ConfigHelper { |
| private static final List<String> FIELDS_TO_REMOVE = ["class", "i_sdk", "i_runner"] |
| |
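| //Drops null and internal entries, strips the leading underscore from each key, converts the key |
| //to the SDK's naming convention and flattens any nested SerializableOption values. |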
| static Map<String, Serializable> convertProperties(final Map<String, Object> propertyMap, final SDK sdk = SDK.JAVA) { |
| return propertyMap |
| .findAll { nonNull(it.value) } |
| .findAll { !FIELDS_TO_REMOVE.contains(it.key) } |
| .collectEntries { key, value -> |
| [ |
| modifyKey(key, sdk), |
| toPrimitive(value) |
| ] |
| } as Map<String, Serializable> |
| } |
| |
| private static String modifyKey(final String key, final SDK sdk) { |
| final def result = key.startsWith('_') ? key.substring(1) : key |
| switch (sdk) { |
| case SDK.PYTHON: |
| return toSnakeCase(result) |
| case SDK.JAVA: |
| return toCamelCase(result) |
| default: |
| throw new IllegalArgumentException("SDK not specified") |
| } |
| } |
| |
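| //e.g. 'publishToBigQuery' -> 'publish_to_big_query' |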
| private static String toSnakeCase(final String text) { |
| return text.replaceAll(/([A-Z])/, /_$1/).toLowerCase().replaceAll(/^_/, '') |
| } |
| |
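| //e.g. 'publish_to_big_query' -> 'publishToBigQuery' |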
| private static String toCamelCase(final String text) { |
| return text.replaceAll( "(_)([A-Za-z0-9])", { Object[] it -> ((String) it[2]).toUpperCase() }) |
| } |
| |
| private static def toPrimitive(value) { |
| return value instanceof SerializableOption |
| ? value.toPrimitiveValues() |
| : value |
| } |
| } |
| } |