| # SPDX-License-Identifier: Apache-2.0 |
| # |
| # Modifications by Apache Solr contributors; see git log for details. |
| # Licensed under the Apache License, Version 2.0. |
| # |
| # The OpenSearch Contributors require contributions made to |
| # this file be licensed under the Apache-2.0 license or a |
| # compatible open source license. |
| # Modifications Copyright OpenSearch Contributors. See |
| # GitHub history for details. |
| # Licensed to Elasticsearch B.V. under one or more contributor |
| # license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright |
| # ownership. Elasticsearch B.V. licenses this file to you under |
| # the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| import os |
| import random |
| import re |
| import textwrap |
| import unittest.mock as mock |
| import urllib.error |
| from unittest import TestCase |
| |
| from solrorbit import exceptions, config |
| from solrorbit.workload import loader, workload |
| from solrorbit.utils import io |
| |
| |
def strip_ws(s):
    """Return ``s`` with every whitespace character removed."""
    whitespace = re.compile(r"\s")
    return whitespace.sub("", s)
| |
| |
class StaticClock:
    """Clock stand-in that always reports the same point in time."""

    # Fixed value handed out by now().
    NOW = 1453362707.0

    @staticmethod
    def now():
        """Return the constant stored in ``StaticClock.NOW``."""
        return StaticClock.NOW

    @staticmethod
    def stop_watch():
        """Return ``None``; this stand-in does not supply a stop watch."""
        return None
| |
| |
class InstanceOf:
    """Matcher that compares equal to any instance of a given class.

    Handy as an expected argument in mock call assertions when only the type
    of the actual argument matters.
    """

    def __init__(self, cls):
        # The class (or whatever isinstance() accepts) to match against.
        self.cls = cls

    def __eq__(self, other):
        """Return True when ``other`` is an instance of the wrapped class."""
        matches = isinstance(other, self.cls)
        return matches

    def __ne__(self, other):
        """Return True when ``other`` is not an instance of the wrapped class."""
        matches = isinstance(other, self.cls)
        return not matches

    def __repr__(self):
        """Render like a default object repr, using the wrapped class name."""
        wrapped_name = self.cls.__name__
        address = hex(id(self))
        return f"<{wrapped_name} object at {address}>"
| |
| |
class SimpleWorkloadRepositoryTests(TestCase):
    """Checks how ``loader.SimpleWorkloadRepository`` interprets a workload path."""

    @mock.patch("os.path.exists")
    @mock.patch("os.path.isdir")
    def test_workload_from_directory(self, is_dir, path_exists):
        """A directory path names the workload after the directory."""
        path_exists.return_value = True
        is_dir.return_value = True

        workload_path = "/path/to/workload/unit-test"
        repo = loader.SimpleWorkloadRepository(workload_path)

        self.assertEqual("unit-test", repo.workload_name)
        self.assertEqual(["unit-test"], repo.workload_names)
        self.assertEqual(workload_path, repo.workload_dir("unit-test"))
        self.assertEqual(f"{workload_path}/workload.json",
                         repo.workload_file("unit-test"))

    @mock.patch("os.path.exists")
    @mock.patch("os.path.isdir")
    @mock.patch("os.path.isfile")
    def test_workload_from_file(self, is_file, is_dir, path_exists):
        """A JSON file path names the workload after the file's base name."""
        path_exists.return_value = True
        is_dir.return_value = False
        is_file.return_value = True

        workload_dir = "/path/to/workload/unit-test"
        workload_file = f"{workload_dir}/my-workload.json"
        repo = loader.SimpleWorkloadRepository(workload_file)

        self.assertEqual("my-workload", repo.workload_name)
        self.assertEqual(["my-workload"], repo.workload_names)
        self.assertEqual(workload_dir, repo.workload_dir("my-workload"))
        self.assertEqual(workload_file, repo.workload_file("my-workload"))

    @mock.patch("os.path.exists")
    @mock.patch("os.path.isdir")
    @mock.patch("os.path.isfile")
    def test_workload_from_named_pipe(self, is_file, is_dir, path_exists):
        """A path that exists but is neither file nor directory is rejected."""
        path_exists.return_value = True
        is_dir.return_value = False
        is_file.return_value = False

        pipe_path = "a named pipe cannot point to a workload"
        with self.assertRaises(exceptions.SystemSetupError) as raised:
            loader.SimpleWorkloadRepository(pipe_path)

        expected_message = f"{pipe_path} is neither a file nor a directory"
        self.assertEqual(expected_message, raised.exception.args[0])

    @mock.patch("os.path.exists")
    def test_workload_from_non_existing_path(self, path_exists):
        """A path that does not exist is rejected."""
        path_exists.return_value = False

        missing_path = "/path/does/not/exist"
        with self.assertRaises(exceptions.SystemSetupError) as raised:
            loader.SimpleWorkloadRepository(missing_path)

        expected_message = f"Workload path {missing_path} does not exist"
        self.assertEqual(expected_message, raised.exception.args[0])

    @mock.patch("os.path.isdir")
    @mock.patch("os.path.exists")
    def test_workload_from_directory_without_workload(self, path_exists, is_dir):
        """A directory lacking workload.json is rejected."""
        is_dir.return_value = True
        # First existence check (the directory) succeeds, the second (the file) fails.
        path_exists.side_effect = [True, False]

        directory = "/path/to/not/a/workload"
        with self.assertRaises(exceptions.SystemSetupError) as raised:
            loader.SimpleWorkloadRepository(directory)

        expected_message = f"Could not find workload.json in {directory}"
        self.assertEqual(expected_message, raised.exception.args[0])

    @mock.patch("os.path.exists")
    @mock.patch("os.path.isdir")
    @mock.patch("os.path.isfile")
    def test_workload_from_file_but_not_json(self, is_file, is_dir, path_exists):
        """A file path without a JSON extension is rejected."""
        path_exists.return_value = True
        is_dir.return_value = False
        is_file.return_value = True

        xml_file = "/path/to/workload/unit-test/my-workload.xml"
        with self.assertRaises(exceptions.SystemSetupError) as raised:
            loader.SimpleWorkloadRepository(xml_file)

        expected_message = f"{xml_file} has to be a JSON file"
        self.assertEqual(expected_message, raised.exception.args[0])
| |
| |
class GitRepositoryTests(TestCase):
    """Checks ``loader.GitWorkloadRepository`` against a stubbed Git repository."""

    class MockGitRepo:
        """Repository stub that exposes nothing but ``repo_dir``."""

        def __init__(self, remote_url, root_dir, repo_name, resource_name, offline, fetch=True):
            # Only the checkout location is derived; the other arguments are ignored.
            self.repo_dir = f"{root_dir}/{repo_name}"

    @mock.patch("os.path.exists")
    @mock.patch("os.walk")
    def test_workload_from_existing_repo(self, walk, exists):
        """Workload names and paths are resolved from the directories os.walk reports."""
        available = ["unittest", "unittest2", "unittest3"]
        walk.return_value = iter([(".", available, [])])
        exists.return_value = True

        cfg = config.Config()
        settings = (
            ("workload", "workload.name", "unittest"),
            ("workload", "repository.name", "default"),
            ("system", "offline.mode", False),
            ("node", "root.dir", "/tmp"),
            ("benchmarks", "workload.repository.dir", "workloads"),
        )
        for section, key, value in settings:
            cfg.add(config.Scope.application, section, key, value)

        repo = loader.GitWorkloadRepository(
            cfg, fetch=False, update=False, repo_class=GitRepositoryTests.MockGitRepo)

        self.assertEqual("unittest", repo.workload_name)
        self.assertEqual(["unittest", "unittest2", "unittest3"],
                         list(repo.workload_names))

        expected_dir = "/tmp/workloads/default/unittest"
        self.assertEqual(expected_dir, repo.workload_dir("unittest"))
        self.assertEqual(f"{expected_dir}/workload.json",
                         repo.workload_file("unittest"))
| |
| |
| class WorkloadPreparationTests(TestCase): |
@mock.patch("solrorbit.utils.io.prepare_file_offset_table")
@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
def test_does_nothing_if_document_file_available(self, is_file, get_size, prepare_file_offset_table):
    """Prepare a document set whose files are all reported present and check the offset-table call."""
    # Every path is reported as an existing file of 2000 bytes.
    is_file.return_value = True
    get_size.return_value = 2000
    prepare_file_offset_table.return_value = 5

    downloader = loader.Downloader(offline=False, test_mode=False)
    decompressor = loader.Decompressor()
    preparator = loader.DocumentSetPreparator(workload_name="unit-test",
                                              downloader=downloader,
                                              decompressor=decompressor)

    docs = workload.Documents(
        source_format=workload.Documents.SOURCE_FORMAT_BULK,
        document_file="docs.json",
        document_archive="docs.json.bz2",
        number_of_documents=5,
        compressed_size_in_bytes=200,
        uncompressed_size_in_bytes=2000,
    )
    preparator.prepare_document_set(document_set=docs, data_root="/tmp")

    prepare_file_offset_table.assert_called_with(
        "/tmp/docs.json", None, None, InstanceOf(loader.Downloader))
| |
@mock.patch("solrorbit.utils.io.prepare_file_offset_table")
@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
def test_decompresses_if_archive_available(self, is_file, get_size, prepare_file_offset_table):
    """Prepare a document set with an archive declared and check the offset-table call."""
    # NOTE(review): the mock set-up and assertions here are the same as in
    # test_does_nothing_if_document_file_available; decompression itself is
    # neither mocked nor asserted -- confirm this is intended.
    is_file.return_value = True
    get_size.return_value = 2000
    prepare_file_offset_table.return_value = 5

    downloader = loader.Downloader(offline=False, test_mode=False)
    decompressor = loader.Decompressor()
    preparator = loader.DocumentSetPreparator(workload_name="unit-test",
                                              downloader=downloader,
                                              decompressor=decompressor)

    docs = workload.Documents(
        source_format=workload.Documents.SOURCE_FORMAT_BULK,
        document_file="docs.json",
        document_archive="docs.json.bz2",
        number_of_documents=5,
        compressed_size_in_bytes=200,
        uncompressed_size_in_bytes=2000,
    )
    preparator.prepare_document_set(document_set=docs, data_root="/tmp")

    prepare_file_offset_table.assert_called_with(
        "/tmp/docs.json", None, None, InstanceOf(loader.Downloader))
| |
@mock.patch("solrorbit.utils.io.decompress")
@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
def test_raise_error_on_wrong_uncompressed_file_size(self, is_file, get_size, decompress):
    """A decompressed file of the wrong size is reported as corrupt."""
    # os.path.isfile answers, in call order:
    #   1. the uncompressed file does not exist
    #   2. the compressed file exists
    #   3. after decompression the uncompressed file exists
    is_file.side_effect = [False, True, True]
    # os.path.getsize answers, in call order:
    #   1. the compressed file has 200 bytes
    #   2. the uncompressed file is corrupt, only 1 byte is available
    get_size.side_effect = [200, 1]

    downloader = loader.Downloader(offline=False, test_mode=False)
    decompressor = loader.Decompressor()
    preparator = loader.DocumentSetPreparator(workload_name="unit-test",
                                              downloader=downloader,
                                              decompressor=decompressor)

    with self.assertRaises(exceptions.DataError) as raised:
        preparator.prepare_document_set(
            document_set=workload.Documents(
                source_format=workload.Documents.SOURCE_FORMAT_BULK,
                document_file="docs.json",
                document_archive="docs.json.bz2",
                number_of_documents=5,
                compressed_size_in_bytes=200,
                uncompressed_size_in_bytes=2000,
            ),
            data_root="/tmp",
        )

    expected_message = "[/tmp/docs.json] is corrupt. Extracted [1] bytes but [2000] bytes are expected."
    self.assertEqual(expected_message, raised.exception.args[0])

    decompress.assert_called_with("/tmp/docs.json.bz2", "/tmp")
| |
@mock.patch("solrorbit.utils.io.decompress")
@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
def test_raise_error_if_compressed_does_not_contain_expected_document_file(self, is_file, get_size, decompress):
    """Decompression that does not produce the expected document file is an error."""
    # os.path.isfile answers, in call order:
    #   1. the uncompressed file does not exist
    #   2. the compressed file exists
    #   3. after decompression the uncompressed file still does not exist
    #      (e.g. because the archive's output file has a different name)
    is_file.side_effect = [False, True, False]
    # The compressed file has 200 bytes.
    get_size.return_value = 200

    downloader = loader.Downloader(offline=False, test_mode=False)
    decompressor = loader.Decompressor()
    preparator = loader.DocumentSetPreparator(workload_name="unit-test",
                                              downloader=downloader,
                                              decompressor=decompressor)

    with self.assertRaises(exceptions.DataError) as raised:
        preparator.prepare_document_set(
            document_set=workload.Documents(
                source_format=workload.Documents.SOURCE_FORMAT_BULK,
                base_url="http://benchmarks.opensearch.org/corpora/unit-test",
                document_file="docs.json",
                document_archive="docs.json.bz2",
                number_of_documents=5,
                compressed_size_in_bytes=200,
                uncompressed_size_in_bytes=2000,
            ),
            data_root="/tmp",
        )

    expected_message = (
        "Decompressing [/tmp/docs.json.bz2] did not create [/tmp/docs.json]. Please check with the workload author if the "
        "compressed archive has been created correctly."
    )
    self.assertEqual(expected_message, raised.exception.args[0])

    decompress.assert_called_with("/tmp/docs.json.bz2", "/tmp")
| |
@mock.patch("solrorbit.utils.io.prepare_file_offset_table")
@mock.patch("solrorbit.utils.io.decompress")
@mock.patch("solrorbit.utils.net.download")
@mock.patch("solrorbit.utils.io.ensure_dir")
@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
def test_download_document_archive_if_no_file_available(self, is_file, get_size, ensure_dir, download, decompress,
                                                        prepare_file_offset_table):
    """With no local files, the archive is downloaded from the base URL and decompressed."""
    # os.path.isfile answers, in call order:
    #   - the uncompressed file does not exist
    #   - the compressed file does not exist
    #   - after download the compressed file exists
    #   - after download the uncompressed file still does not exist (in main loop)
    #   - after download the compressed file exists (in main loop)
    #   - after decompression the uncompressed file exists
    # (the list carries extra trailing True entries for any further checks)
    is_file.side_effect = [False, False, True, False, True, True, True, True]
    # os.path.getsize answers, in call order:
    #   - the compressed file size is 200 after download
    #   - the compressed file size is 200 after download (in main loop)
    #   - the uncompressed file size is 2000 after decompression
    #   - the uncompressed file size is 2000 after decompression (in main loop)
    get_size.side_effect = [200, 200, 2000, 2000, None]

    prepare_file_offset_table.return_value = 5

    downloader = loader.Downloader(offline=False, test_mode=False)
    decompressor = loader.Decompressor()
    preparator = loader.DocumentSetPreparator(workload_name="unit-test",
                                              downloader=downloader,
                                              decompressor=decompressor)

    base_url = "http://benchmarks.opensearch.org/corpora/unit-test"
    docs = workload.Documents(
        source_format=workload.Documents.SOURCE_FORMAT_BULK,
        base_url=base_url,
        document_file="docs.json",
        document_archive="docs.json.bz2",
        number_of_documents=5,
        compressed_size_in_bytes=200,
        uncompressed_size_in_bytes=2000,
    )
    preparator.prepare_document_set(document_set=docs, data_root="/tmp")

    ensure_dir.assert_called_with("/tmp")
    decompress.assert_called_with("/tmp/docs.json.bz2", "/tmp")
    expected_downloads = [
        mock.call(f"{base_url}/docs.json.bz2", "/tmp/docs.json.bz2", 200, progress_indicator=mock.ANY),
    ]
    download.assert_has_calls(expected_downloads)
    prepare_file_offset_table.assert_called_with(
        "/tmp/docs.json", base_url, None, InstanceOf(loader.Downloader))
| |
@mock.patch("solrorbit.utils.io.prepare_file_offset_table")
@mock.patch("solrorbit.utils.io.decompress")
@mock.patch("solrorbit.utils.net.download")
@mock.patch("solrorbit.utils.io.ensure_dir")
@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
def test_download_document_archive_with_source_url_compressed(self, is_file, get_size, ensure_dir, download, decompress,
                                                              prepare_file_offset_table):
    """With no local files and a source URL given, the archive is downloaded from that URL."""
    # os.path.isfile answers, in call order:
    #   - the uncompressed file does not exist
    #   - the compressed file does not exist
    #   - after download the compressed file exists
    #   - after download the uncompressed file still does not exist (in main loop)
    #   - after download the compressed file exists (in main loop)
    #   - after decompression the uncompressed file exists
    # (the list carries extra trailing True entries for any further checks)
    is_file.side_effect = [False, False, True, False, True, True, True, True]
    # os.path.getsize answers, in call order:
    #   - the compressed file size is 200 after download
    #   - the compressed file size is 200 after download (in main loop)
    #   - the uncompressed file size is 2000 after decompression
    #   - the uncompressed file size is 2000 after decompression (in main loop)
    get_size.side_effect = [200, 200, 2000, 2000, None]

    prepare_file_offset_table.return_value = 5

    downloader = loader.Downloader(offline=False, test_mode=False)
    decompressor = loader.Decompressor()
    preparator = loader.DocumentSetPreparator(workload_name="unit-test",
                                              downloader=downloader,
                                              decompressor=decompressor)

    base_url = "http://benchmarks.opensearch.org/corpora"
    source_url = "http://benchmarks.opensearch.org/corpora/unit-test/docs.json.bz2"
    docs = workload.Documents(
        source_format=workload.Documents.SOURCE_FORMAT_BULK,
        base_url=base_url,
        source_url=source_url,
        document_file="docs.json",
        document_archive="docs.json.bz2",
        number_of_documents=5,
        compressed_size_in_bytes=200,
        uncompressed_size_in_bytes=2000,
    )
    preparator.prepare_document_set(document_set=docs, data_root="/tmp")

    ensure_dir.assert_called_with("/tmp")
    decompress.assert_called_with("/tmp/docs.json.bz2", "/tmp")
    download.assert_called_with(source_url, "/tmp/docs.json.bz2", 200, progress_indicator=mock.ANY)
    prepare_file_offset_table.assert_called_with(
        "/tmp/docs.json", base_url, source_url, InstanceOf(loader.Downloader))
| |
@mock.patch("solrorbit.utils.io.prepare_file_offset_table")
@mock.patch("solrorbit.utils.io.decompress")
@mock.patch("solrorbit.utils.net.download")
@mock.patch("solrorbit.utils.io.ensure_dir")
@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
def test_download_document_with_source_url_uncompressed(self, is_file, get_size, ensure_dir, download, decompress,
                                                        prepare_file_offset_table):
    """With a source URL and no archive, the uncompressed document file is downloaded directly."""
    # os.path.isfile answers, in call order:
    #   - the uncompressed file does not exist
    #   - after download the uncompressed file exists
    #   - after download the uncompressed file exists (main loop)
    is_file.side_effect = [False, True, True]
    # The uncompressed file size is 2000.
    get_size.return_value = 2000
    # NOTE(review): the scheme is picked at random, so a single run covers
    # only one of the four schemes.
    scheme = random.choice(["http", "https", "s3", "gs"])

    prepare_file_offset_table.return_value = 5

    downloader = loader.Downloader(offline=False, test_mode=False)
    decompressor = loader.Decompressor()
    preparator = loader.DocumentSetPreparator(workload_name="unit-test",
                                              downloader=downloader,
                                              decompressor=decompressor)

    base_url = f"{scheme}://benchmarks.opensearch.org/corpora/"
    source_url = f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/docs.json"
    docs = workload.Documents(
        source_format=workload.Documents.SOURCE_FORMAT_BULK,
        source_url=source_url,
        base_url=base_url,
        document_file="docs.json",
        # --> no document archive is provided here <--
        document_archive=None,
        number_of_documents=5,
        compressed_size_in_bytes=200,
        uncompressed_size_in_bytes=2000,
    )
    preparator.prepare_document_set(document_set=docs, data_root="/tmp")

    ensure_dir.assert_called_with("/tmp")
    download.assert_called_with(source_url, "/tmp/docs.json", 2000, progress_indicator=mock.ANY)
    prepare_file_offset_table.assert_called_with(
        "/tmp/docs.json", base_url, source_url, InstanceOf(loader.Downloader))
| |
@mock.patch("solrorbit.utils.io.prepare_file_offset_table")
@mock.patch("solrorbit.utils.io.decompress")
@mock.patch("solrorbit.utils.net.download")
@mock.patch("solrorbit.utils.io.ensure_dir")
@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
def test_download_document_with_trailing_baseurl_slash(self, is_file, get_size, ensure_dir, download, decompress,
                                                       prepare_file_offset_table):
    """A base URL ending in a slash must not yield a doubled slash in the download URL."""
    # os.path.isfile answers, in call order:
    #   - the uncompressed file does not exist
    #   - after download the uncompressed file exists
    #   - after download the uncompressed file exists (main loop)
    # (one more True is queued for any further check)
    is_file.side_effect = [False, True, True, True]
    # The uncompressed file size is 2000.
    get_size.return_value = 2000
    # NOTE(review): the scheme is picked at random, so a single run covers
    # only one of the four schemes.
    scheme = random.choice(["http", "https", "s3", "gs"])

    prepare_file_offset_table.return_value = 5

    downloader = loader.Downloader(offline=False, test_mode=False)
    decompressor = loader.Decompressor()
    preparator = loader.DocumentSetPreparator(workload_name="unit-test",
                                              downloader=downloader,
                                              decompressor=decompressor)

    base_url = f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/"
    docs = workload.Documents(
        source_format=workload.Documents.SOURCE_FORMAT_BULK,
        base_url=base_url,
        document_file="docs.json",
        # --> no document archive is provided here <--
        document_archive=None,
        number_of_documents=5,
        compressed_size_in_bytes=200,
        uncompressed_size_in_bytes=2000,
    )
    preparator.prepare_document_set(document_set=docs, data_root="/tmp")

    ensure_dir.assert_called_with("/tmp")
    expected_downloads = [
        mock.call(f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/docs.json",
                  "/tmp/docs.json", 2000, progress_indicator=mock.ANY),
    ]
    download.assert_has_calls(expected_downloads)
    prepare_file_offset_table.assert_called_with(
        "/tmp/docs.json", base_url, None, InstanceOf(loader.Downloader))
| |
@mock.patch("solrorbit.utils.io.prepare_file_offset_table")
@mock.patch("solrorbit.utils.net.download")
@mock.patch("solrorbit.utils.io.ensure_dir")
@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
def test_download_document_file_if_no_file_available(self, is_file, get_size, ensure_dir, download, prepare_file_offset_table):
    """With no archive declared and no local file, the document file is downloaded from the base URL."""
    # os.path.isfile answers, in call order:
    #   - the uncompressed file does not exist
    #   - after download the uncompressed file exists
    #   - after download the uncompressed file exists (main loop)
    # (one more True is queued for any further check)
    is_file.side_effect = [False, True, True, True]
    # The uncompressed file size is 2000.
    get_size.return_value = 2000

    prepare_file_offset_table.return_value = 5

    downloader = loader.Downloader(offline=False, test_mode=False)
    decompressor = loader.Decompressor()
    preparator = loader.DocumentSetPreparator(workload_name="unit-test",
                                              downloader=downloader,
                                              decompressor=decompressor)

    base_url = "http://benchmarks.opensearch.org/corpora/unit-test"
    docs = workload.Documents(
        source_format=workload.Documents.SOURCE_FORMAT_BULK,
        base_url=base_url,
        document_file="docs.json",
        # --> no document archive is provided here <--
        document_archive=None,
        number_of_documents=5,
        compressed_size_in_bytes=200,
        uncompressed_size_in_bytes=2000,
    )
    preparator.prepare_document_set(document_set=docs, data_root="/tmp")

    ensure_dir.assert_called_with("/tmp")
    expected_downloads = [
        mock.call(f"{base_url}/docs.json", "/tmp/docs.json", 2000, progress_indicator=mock.ANY),
    ]
    download.assert_has_calls(expected_downloads)
    prepare_file_offset_table.assert_called_with(
        "/tmp/docs.json", base_url, None, InstanceOf(loader.Downloader))
| |
@mock.patch("solrorbit.utils.net.download")
@mock.patch("solrorbit.utils.io.ensure_dir")
@mock.patch("os.path.isfile")
def test_raise_download_error_if_offline(self, is_file, ensure_dir, download):
    """In offline mode a missing document file raises instead of triggering a download."""
    # The uncompressed file does not exist.
    is_file.return_value = False

    offline_downloader = loader.Downloader(offline=True, test_mode=False)
    decompressor = loader.Decompressor()
    preparator = loader.DocumentSetPreparator(workload_name="unit-test",
                                              downloader=offline_downloader,
                                              decompressor=decompressor)

    with self.assertRaises(exceptions.SystemSetupError) as raised:
        preparator.prepare_document_set(
            document_set=workload.Documents(
                source_format=workload.Documents.SOURCE_FORMAT_BULK,
                base_url="http://benchmarks.opensearch.org/corpora/unit-test",
                document_file="docs.json",
                number_of_documents=5,
                uncompressed_size_in_bytes=2000,
            ),
            data_root="/tmp",
        )

    expected_message = "Cannot find [/tmp/docs.json]. Please disable offline mode and retry."
    self.assertEqual(expected_message, raised.exception.args[0])

    # Neither the target directory nor the download may have been touched.
    self.assertEqual(0, ensure_dir.call_count)
    self.assertEqual(0, download.call_count)
| |
@mock.patch("solrorbit.utils.net.download")
@mock.patch("solrorbit.utils.io.ensure_dir")
@mock.patch("os.path.isfile")
def test_raise_download_error_if_no_url_provided_and_file_missing(self, is_file, ensure_dir, download):
    """A missing document file without a base URL cannot be downloaded and raises."""
    # The uncompressed file does not exist.
    is_file.return_value = False

    downloader = loader.Downloader(offline=False, test_mode=False)
    decompressor = loader.Decompressor()
    preparator = loader.DocumentSetPreparator(workload_name="unit-test",
                                              downloader=downloader,
                                              decompressor=decompressor)

    with self.assertRaises(exceptions.DataError) as raised:
        preparator.prepare_document_set(
            document_set=workload.Documents(
                source_format=workload.Documents.SOURCE_FORMAT_BULK,
                base_url=None,
                document_file="docs.json",
                document_archive=None,
                number_of_documents=5,
                uncompressed_size_in_bytes=2000,
            ),
            data_root="/tmp",
        )

    expected_message = "Cannot download data because no base URL is provided."
    self.assertEqual(expected_message, raised.exception.args[0])

    # Neither the target directory nor the download may have been touched.
    self.assertEqual(0, ensure_dir.call_count)
    self.assertEqual(0, download.call_count)
| |
@mock.patch("solrorbit.utils.net.download")
@mock.patch("solrorbit.utils.io.ensure_dir")
@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
def test_raise_download_error_if_no_url_provided_and_wrong_file_size(self, is_file, get_size, ensure_dir, download):
    """A present file of the wrong size without a base URL cannot be re-downloaded and raises."""
    # The uncompressed file exists...
    is_file.return_value = True
    # ...but its size is wrong.
    get_size.return_value = 100

    downloader = loader.Downloader(offline=False, test_mode=False)
    decompressor = loader.Decompressor()
    preparator = loader.DocumentSetPreparator(workload_name="unit-test",
                                              downloader=downloader,
                                              decompressor=decompressor)

    with self.assertRaises(exceptions.DataError) as raised:
        preparator.prepare_document_set(
            document_set=workload.Documents(
                source_format=workload.Documents.SOURCE_FORMAT_BULK,
                document_file="docs.json",
                number_of_documents=5,
                uncompressed_size_in_bytes=2000,
            ),
            data_root="/tmp",
        )

    expected_message = (
        "[/tmp/docs.json] is present but does not have the expected size of [2000] bytes and it "
        "cannot be downloaded because no base URL is provided."
    )
    self.assertEqual(expected_message, raised.exception.args[0])

    # Neither the target directory nor the download may have been touched.
    self.assertEqual(0, ensure_dir.call_count)
    self.assertEqual(0, download.call_count)
| |
@mock.patch("solrorbit.utils.net.download")
@mock.patch("solrorbit.utils.io.ensure_dir")
@mock.patch("os.path.isfile")
def test_raise_download_error_no_test_mode_file(self, is_file, ensure_dir, download):
    """An HTTP 404 while downloading in test mode is reported as missing test-mode support."""
    # The uncompressed file does not exist.
    is_file.return_value = False

    # The download attempt fails with "not found".
    not_found = urllib.error.HTTPError(
        "http://benchmarks.opensearch.org.s3.amazonaws.com/corpora/unit-test/docs-1k.json",
        404, "", None, None)
    download.side_effect = not_found

    test_mode_downloader = loader.Downloader(offline=False, test_mode=True)
    decompressor = loader.Decompressor()
    preparator = loader.DocumentSetPreparator(workload_name="unit-test",
                                              downloader=test_mode_downloader,
                                              decompressor=decompressor)

    with self.assertRaises(exceptions.DataError) as raised:
        preparator.prepare_document_set(
            document_set=workload.Documents(
                source_format=workload.Documents.SOURCE_FORMAT_BULK,
                base_url="http://benchmarks.opensearch.org/corpora/unit-test",
                document_file="docs-1k.json",
                number_of_documents=5,
                uncompressed_size_in_bytes=None,
            ),
            data_root="/tmp",
        )

    expected_message = (
        "This workload does not support test mode. Ask the workload author to add it or disable "
        "test mode and retry."
    )
    self.assertEqual(expected_message, raised.exception.args[0])

    ensure_dir.assert_called_with("/tmp")
    download.assert_called_with("http://benchmarks.opensearch.org/corpora/unit-test/docs-1k.json",
                                "/tmp/docs-1k.json", None, progress_indicator=mock.ANY)
| |
@mock.patch("solrorbit.utils.net.download")
@mock.patch("solrorbit.utils.io.ensure_dir")
@mock.patch("os.path.isfile")
def test_raise_download_error_on_connection_problems(self, is_file, ensure_dir, download):
    """An HTTP 500 during download surfaces as a DataError carrying status and reason."""
    # The uncompressed file does not exist.
    is_file.return_value = False

    document_url = "http://benchmarks.opensearch.org/corpora/unit-test/docs.json"
    server_error = urllib.error.HTTPError(document_url, 500, "Internal Server Error", None, None)
    download.side_effect = server_error

    downloader = loader.Downloader(offline=False, test_mode=False)
    decompressor = loader.Decompressor()
    preparator = loader.DocumentSetPreparator(workload_name="unit-test",
                                              downloader=downloader,
                                              decompressor=decompressor)

    with self.assertRaises(exceptions.DataError) as raised:
        preparator.prepare_document_set(
            document_set=workload.Documents(
                source_format=workload.Documents.SOURCE_FORMAT_BULK,
                base_url="http://benchmarks.opensearch.org/corpora/unit-test",
                document_file="docs.json",
                number_of_documents=5,
                uncompressed_size_in_bytes=2000,
            ),
            data_root="/tmp",
        )

    expected_message = (
        "Could not download [http://benchmarks.opensearch.org/corpora/unit-test/docs.json] "
        "to [/tmp/docs.json] (HTTP status: 500, reason: Internal Server Error)"
    )
    self.assertEqual(expected_message, raised.exception.args[0])

    ensure_dir.assert_called_with("/tmp")
    download.assert_called_with(document_url, "/tmp/docs.json", 2000, progress_indicator=mock.ANY)
| |
@mock.patch("solrorbit.utils.io.prepare_file_offset_table")
@mock.patch("solrorbit.utils.io.decompress")
@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
def test_prepare_bundled_document_set_if_document_file_available(self, is_file, get_size, decompress, prepare_file_offset_table):
    """A bundled document file that is present is prepared and the call reports success."""
    is_file.return_value = True
    # Only one size lookup is answered: the uncompressed file.
    get_size.side_effect = [2000]
    prepare_file_offset_table.return_value = 5

    downloader = loader.Downloader(offline=False, test_mode=False)
    decompressor = loader.Decompressor()
    preparator = loader.DocumentSetPreparator(workload_name="unit-test",
                                              downloader=downloader,
                                              decompressor=decompressor)

    docs = workload.Documents(
        source_format=workload.Documents.SOURCE_FORMAT_BULK,
        document_file="docs.json",
        document_archive="docs.json.bz2",
        number_of_documents=5,
        compressed_size_in_bytes=200,
        uncompressed_size_in_bytes=2000,
    )
    prepared = preparator.prepare_bundled_document_set(document_set=docs, data_root=".")
    self.assertTrue(prepared)

    prepare_file_offset_table.assert_called_with(
        "./docs.json", None, None, InstanceOf(loader.Downloader))
| |
@mock.patch("solrorbit.utils.io.prepare_file_offset_table")
@mock.patch("solrorbit.utils.io.decompress")
@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
def test_prepare_bundled_document_set_does_nothing_if_no_document_files(self, is_file, get_size, decompress, prepare_file_offset_table):
    """With no bundled files present, nothing is decompressed or indexed and the call reports failure."""
    # No files are present.
    is_file.return_value = False

    downloader = loader.Downloader(offline=False, test_mode=False)
    decompressor = loader.Decompressor()
    preparator = loader.DocumentSetPreparator(workload_name="unit-test",
                                              downloader=downloader,
                                              decompressor=decompressor)

    docs = workload.Documents(
        source_format=workload.Documents.SOURCE_FORMAT_BULK,
        document_file="docs.json",
        document_archive="docs.json.bz2",
        number_of_documents=5,
        compressed_size_in_bytes=200,
        uncompressed_size_in_bytes=2000,
    )
    prepared = preparator.prepare_bundled_document_set(document_set=docs, data_root=".")
    self.assertFalse(prepared)

    self.assertEqual(0, decompress.call_count)
    self.assertEqual(0, prepare_file_offset_table.call_count)
| |
def test_used_corpora(self):
    """``loader.used_corpora`` reports, per corpus, just the archives the scheduled bulk operations target."""

    def documents(collection, archive, count, compressed, uncompressed):
        # One entry of a corpus' "documents" list.
        return {
            "target-collection": collection,
            "source-file": archive,
            "document-count": count,
            "compressed-bytes": compressed,
            "uncompressed-bytes": uncompressed,
        }

    def bulk(name, corpus, index):
        # A bulk operation restricted to a single corpus and a single index.
        return {
            "name": name,
            "operation-type": "bulk",
            "corpora": [corpus],
            "indices": [index],
            "bulk-size": 500,
        }

    def task(name, operation):
        # A named task that refers to an operation by name.
        return {"name": name, "operation": operation}

    workload_specification = {
        "description": "description for unit test",
        "indices": [
            {"name": "logs-181998"},
            {"name": "logs-191998"},
            {"name": "logs-201998"},
        ],
        "corpora": [
            {
                "name": "http_logs_unparsed",
                "target-type": "type",
                "documents": [
                    documents("logs-181998", "documents-181998.unparsed.json.bz2",
                              2708746, 13064317, 303920342),
                    documents("logs-191998", "documents-191998.unparsed.json.bz2",
                              9697882, 47211781, 1088378738),
                    documents("logs-201998", "documents-201998.unparsed.json.bz2",
                              13053463, 63174979, 1456836090),
                ],
            },
            {
                "name": "http_logs",
                "target-type": "type",
                "documents": [
                    documents("logs-181998", "documents-181998.json.bz2",
                              2708746, 13815456, 363512754),
                    documents("logs-191998", "documents-191998.json.bz2",
                              9697882, 49439633, 1301732149),
                    documents("logs-201998", "documents-201998.json.bz2",
                              13053463, 65623436, 1744012279),
                ],
            },
        ],
        "operations": [
            bulk("bulk-index-1", "http_logs", "logs-181998"),
            bulk("bulk-index-2", "http_logs", "logs-191998"),
            bulk("bulk-index-3", "http_logs_unparsed", "logs-201998"),
            {
                "name": "node-stats",
                "operation-type": "node-stats",
            },
        ],
        "test_procedures": [
            {
                "name": "default-test_procedure",
                "schedule": [
                    {
                        "parallel": {
                            "tasks": [
                                task("index-1", "bulk-index-1"),
                                task("index-2", "bulk-index-2"),
                                task("index-3", "bulk-index-3"),
                            ],
                        },
                    },
                    {
                        "operation": "node-stats",
                    },
                ],
            },
        ],
    }

    reader = loader.WorkloadSpecificationReader(
        selected_test_procedure="default-test_procedure")
    full_workload = reader("unittest", workload_specification, "/mappings")

    # Sort by name so the assertions below do not depend on the reported order.
    used_corpora = sorted(loader.used_corpora(full_workload),
                          key=lambda corpus: corpus.name)
    self.assertEqual(2, len(used_corpora))

    parsed_logs, unparsed_logs = used_corpora[0], used_corpora[1]

    self.assertEqual("http_logs", parsed_logs.name)
    # Each bulk operation requires a different data file, but they should have been merged properly.
    self.assertEqual({"documents-181998.json.bz2", "documents-191998.json.bz2"},
                     {doc.document_archive for doc in parsed_logs.documents})

    self.assertEqual("http_logs_unparsed", unparsed_logs.name)
    self.assertEqual({"documents-201998.unparsed.json.bz2"},
                     {doc.document_archive for doc in unparsed_logs.documents})
| |
@mock.patch("solrorbit.utils.io.prepare_file_offset_table")
@mock.patch("solrorbit.utils.io.decompress")
@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
def test_prepare_bundled_document_set_decompresses_compressed_docs(self, is_file, get_size, decompress, prepare_file_offset_table):
    """Preparing a bundled document set succeeds when the data has to be decompressed first."""
    # Answers handed out by os.path.isfile, in call order:
    #   1. the uncompressed file is missing
    #   2. the archive is present (the earlier note read "decompressed"; presumably the
    #      compressed archive is meant - TODO confirm against the preparator)
    #   3. the uncompressed file is present after decompression
    #   4. final loop iteration - the uncompressed file is present now
    is_file.side_effect = [False, True, True, True]
    # Answers handed out by os.path.getsize, in call order:
    #   1. compressed
    #   2. uncompressed after decompression
    #   3. uncompressed in the final loop iteration
    get_size.side_effect = [200, 2000, 2000]
    prepare_file_offset_table.return_value = 5

    preparator = loader.DocumentSetPreparator(
        workload_name="unit-test",
        downloader=loader.Downloader(offline=False, test_mode=False),
        decompressor=loader.Decompressor())
    bundled_docs = workload.Documents(
        source_format=workload.Documents.SOURCE_FORMAT_BULK,
        document_file="docs.json",
        document_archive="docs.json.bz2",
        number_of_documents=5,
        compressed_size_in_bytes=200,
        uncompressed_size_in_bytes=2000)

    was_prepared = preparator.prepare_bundled_document_set(document_set=bundled_docs, data_root=".")

    self.assertTrue(was_prepared)
    prepare_file_offset_table.assert_called_with(
        "./docs.json", None, None, InstanceOf(loader.Downloader))
| |
@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
def test_prepare_bundled_document_set_error_compressed_docs_wrong_size(self, is_file, get_size):
    """A bundled archive whose size differs from the declared one is rejected with a DataError."""
    # os.path.isfile, in call order: the uncompressed file is missing, the archive is present.
    is_file.side_effect = [False, True]
    # os.path.getsize: the archive reports 150 bytes although 200 are declared below.
    get_size.side_effect = [150]

    preparator = loader.DocumentSetPreparator(
        workload_name="unit-test",
        downloader=loader.Downloader(offline=False, test_mode=False),
        decompressor=loader.Decompressor())
    expected_message = "[./docs.json.bz2] is present but does not have the expected size of [200] bytes."

    with self.assertRaises(exceptions.DataError) as ctx:
        bundled_docs = workload.Documents(
            source_format=workload.Documents.SOURCE_FORMAT_BULK,
            document_file="docs.json",
            document_archive="docs.json.bz2",
            number_of_documents=5,
            compressed_size_in_bytes=200,
            uncompressed_size_in_bytes=2000)
        preparator.prepare_bundled_document_set(document_set=bundled_docs, data_root=".")

    self.assertEqual(expected_message, ctx.exception.args[0])
| |
@mock.patch("solrorbit.utils.io.prepare_file_offset_table")
@mock.patch("solrorbit.utils.io.decompress")
@mock.patch("os.path.getsize")
@mock.patch("os.path.isfile")
def test_prepare_bundled_document_set_uncompressed_docs_wrong_size(self, is_file, get_size, decompress, prepare_file_offset_table):
    """An uncompressed bundled file of unexpected size raises a DataError and is never indexed."""
    # os.path.isfile: the uncompressed file is present.
    is_file.side_effect = [True]
    # os.path.getsize: the uncompressed file reports 1500 bytes although 2000 are declared below.
    get_size.side_effect = [1500]

    preparator = loader.DocumentSetPreparator(
        workload_name="unit-test",
        downloader=loader.Downloader(offline=False, test_mode=False),
        decompressor=loader.Decompressor())
    expected_message = "[./docs.json] is present but does not have the expected size of [2000] bytes."

    with self.assertRaises(exceptions.DataError) as ctx:
        bundled_docs = workload.Documents(
            source_format=workload.Documents.SOURCE_FORMAT_BULK,
            document_file="docs.json",
            document_archive="docs.json.bz2",
            number_of_documents=5,
            compressed_size_in_bytes=200,
            uncompressed_size_in_bytes=2000)
        preparator.prepare_bundled_document_set(document_set=bundled_docs, data_root=".")

    self.assertEqual(expected_message, ctx.exception.args[0])
    # The offset table must not be built for a file that failed the size check.
    self.assertEqual(0, prepare_file_offset_table.call_count)
| |
| |
class WorkloadPreparationTests_1(TestCase):
    """Covers preparing a document set whose data file is published as several part files."""

    @mock.patch("solrorbit.utils.io.prepare_file_offset_table")
    @mock.patch("solrorbit.utils.net.download")
    @mock.patch("solrorbit.utils.io.ensure_dir")
    @mock.patch("os.path.getsize")
    @mock.patch("os.path.isfile")
    @mock.patch("os.remove")
    def test_download_document_file_from_part_files(self, rm_file, is_file, get_size, ensure_dir, download, prepare_file_offset_table):
        """Each part file is downloaded from the base URL and the offset table is built afterwards."""
        # os.path.isfile: the uncompressed file does not exist at first; every later
        # check (after the download, including the main loop) reports it as existing.
        is_file.side_effect = [False, True, True, True, True]
        # os.path.getsize: the last answer is the uncompressed file size of 2000; the
        # first three equal the declared part sizes.
        get_size.side_effect = [1000, 600, 400, 2000]
        prepare_file_offset_table.return_value = 5

        preparator = loader.DocumentSetPreparator(
            workload_name="unit-test",
            downloader=loader.Downloader(offline=False, test_mode=False),
            decompressor=loader.Decompressor())

        opener = mock.mock_open()
        with mock.patch("builtins.open", opener):
            part_files = [
                {"name": "xaa", "size": 1000},
                {"name": "xab", "size": 600},
                {"name": "xac", "size": 400},
            ]
            docs = workload.Documents(
                source_format=workload.Documents.SOURCE_FORMAT_BULK,
                base_url="http://benchmarks.opensearch.org/corpora/unit-test",
                document_file="docs.json",
                document_file_parts=part_files,
                # --> We don't provide a document archive here <--
                document_archive=None,
                number_of_documents=5,
                compressed_size_in_bytes=200,
                uncompressed_size_in_bytes=2000)
            preparator.prepare_document_set(document_set=docs, data_root="/tmp")

        ensure_dir.assert_called_with("/tmp")
        expected_downloads = [
            mock.call('http://benchmarks.opensearch.org/corpora/unit-test/xaa',
                      '/tmp/xaa', 1000, progress_indicator=mock.ANY),
            mock.call('http://benchmarks.opensearch.org/corpora/unit-test/xab',
                      '/tmp/xab', 600, progress_indicator=mock.ANY),
            mock.call('http://benchmarks.opensearch.org/corpora/unit-test/xac',
                      '/tmp/xac', 400, progress_indicator=mock.ANY),
        ]
        download.assert_has_calls(expected_downloads)
        prepare_file_offset_table.assert_called_with(
            "/tmp/docs.json",
            'http://benchmarks.opensearch.org/corpora/unit-test',
            None,
            InstanceOf(loader.Downloader))
| |
class TemplateSource(TestCase):
    """Tests for ``loader.TemplateSource``: include replacement and reading globbed files."""

    @mock.patch("solrorbit.utils.io.dirname")
    @mock.patch.object(loader.TemplateSource, "read_glob_files")
    def test_entrypoint_of_replace_includes(self, patched_read_glob, patched_dirname):
        """Every ``benchmark.collect`` call is swapped for what ``read_glob_files`` returns."""
        # The stub echoes the glob it was asked for, so the expected text below shows
        # which full path each include was resolved to.
        patched_read_glob.side_effect = lambda c: "{{\"replaced {}\": \"true\"}}".format(c)

        base_path = "~/.benchmark/benchmarks/workloads/default/geonames"
        template_file_name = "workload.json"

        workload = textwrap.dedent("""
        {% import "benchmark.helpers" as benchmark with context %}
        {
        "version": 2,
        "description": "unittest workload",
        "data-url": "http://benchmarks.opensearch.org.s3.amazonaws.com/corpora/geonames",
        "indices": [
        {
        "name": "geonames",
        "body": "index.json"
        }
        ],
        "corpora": [
        {
        "name": "geonames",
        "base-url": "http://benchmarks.opensearch.org.s3.amazonaws.com/corpora/geonames",
        "documents": [
        {
        "source-file": "documents-2.json.bz2",
        "document-count": 11396505,
        "compressed-bytes": 264698741,
        "uncompressed-bytes": 3547614383
        }
        ]
        }
        ],
        "operations": [
        {{ benchmark.collect(parts="operations/*.json") }}
        ],
        "test_procedures": [
        {{ benchmark.collect(parts="test_procedures/*.json") }}
        ]
        }
        """)

        # pylint: disable=trailing-whitespace
        expected_response = textwrap.dedent("""
        {% import "benchmark.helpers" as benchmark with context %}
        {
        "version": 2,
        "description": "unittest workload",
        "data-url": "http://benchmarks.opensearch.org.s3.amazonaws.com/corpora/geonames",
        "indices": [
        {
        "name": "geonames",
        "body": "index.json"
        }
        ],
        "corpora": [
        {
        "name": "geonames",
        "base-url": "http://benchmarks.opensearch.org.s3.amazonaws.com/corpora/geonames",
        "documents": [
        {
        "source-file": "documents-2.json.bz2",
        "document-count": 11396505,
        "compressed-bytes": 264698741,
        "uncompressed-bytes": 3547614383
        }
        ]
        }
        ],
        "operations": [
        {"replaced ~/.benchmark/benchmarks/workloads/default/geonames/operations/*.json": "true"}
        ],
        "test_procedures": [
        {"replaced ~/.benchmark/benchmarks/workloads/default/geonames/test_procedures/*.json": "true"}
        ]
        }
        """)

        tmpl_src = loader.TemplateSource(base_path, template_file_name)
        actual_response = tmpl_src.replace_includes(base_path, workload)

        self.assertEqual(expected_response, actual_response)

    def test_read_glob_files(self):
        """Fragments returned by the file globber are read and joined with a comma line."""
        resources_dir = os.path.join(os.path.dirname(__file__), "resources")
        fragment_files = [
            os.path.join(resources_dir, "workload_fragment_1.json"),
            os.path.join(resources_dir, "workload_fragment_2.json"),
        ]
        tmpl_obj = loader.TemplateSource(
            base_path="/some/path/to/a/benchmark/workload",
            template_file_name="workload.json",
            # The glob pattern is ignored; the two fixture fragments are always returned.
            fileglobber=lambda pat: list(fragment_files)
        )

        expected_response = '{\n "item1": "value1"\n}\n,\n{\n "item2": "value2"\n}\n'
        response = tmpl_obj.read_glob_files("*workload_fragment_*.json")

        self.assertEqual(expected_response, response)
| |
| |
class TemplateRenderTests(TestCase):
    """Tests for ``loader.render_template`` using internal template variables bound to StaticClock."""

    # Built once with the fixed clock so that time-dependent filters render reproducibly.
    unittest_template_internal_vars = loader.default_internal_template_vars(
        clock=StaticClock)

    def assertEqualIgnoreWhitespace(self, expected, actual):
        """Assert that both strings are equal once all whitespace has been removed."""
        self.assertEqual(strip_ws(expected), strip_ws(actual))

    def test_render_simple_template(self):
        """The ``days_ago`` filter is evaluated against the static clock."""
        template = textwrap.dedent("""
        {
        "key": {{'01-01-2000' | days_ago(now)}},
        "key2": "static value"
        }
        """)
        expected = textwrap.dedent("""
        {
        "key": 5864,
        "key2": "static value"
        }
        """)

        internal_vars = TemplateRenderTests.unittest_template_internal_vars
        rendered = loader.render_template(template, template_internal_vars=internal_vars)

        self.assertEqual(expected, rendered)

    def test_render_template_with_external_variables(self):
        """A user-supplied variable wins over the default; a missing one falls back to it."""
        template = textwrap.dedent("""
        {
        "greeting": "{{greeting | default("Aloha")}}",
        "name": "{{name | default("stranger")}}"
        }
        """)
        expected = textwrap.dedent("""
        {
        "greeting": "Hi",
        "name": "stranger"
        }
        """)

        rendered = loader.render_template(
            template,
            template_vars={"greeting": "Hi"},
            template_internal_vars=TemplateRenderTests.unittest_template_internal_vars)

        self.assertEqual(expected, rendered)

    def test_render_template_with_globbing(self):
        """``benchmark.collect`` pulls in every part that the file globber reports."""
        dynamic_keys = ["dynamic-key-1", "dynamic-key-2", "dynamic-key-3"]

        def key_globber(e):
            # Only the one known pattern matches; anything else yields no parts.
            if e != "dynamic-key-*":
                return []
            return list(dynamic_keys)

        template = textwrap.dedent("""
        {% import "benchmark.helpers" as benchmark %}
        {
        "key1": "static value",
        {{ benchmark.collect(parts="dynamic-key-*") }}

        }
        """)
        expected = textwrap.dedent("""
        {
        "key1": "static value",
        "dkey1": "value1",
        "dkey2": "value2",
        "dkey3": "value3"

        }
        """)

        source = io.DictStringFileSourceFactory({
            "dynamic-key-1": [textwrap.dedent('"dkey1": "value1"')],
            "dynamic-key-2": [textwrap.dedent('"dkey2": "value2"')],
            "dynamic-key-3": [textwrap.dedent('"dkey3": "value3"')],
        })
        template_source = loader.TemplateSource(
            "", "workload.json", source=source, fileglobber=key_globber)
        template_source.load_template_from_string(template)

        rendered = loader.render_template(
            template_source.assembled_source,
            template_internal_vars=TemplateRenderTests.unittest_template_internal_vars)

        self.assertEqualIgnoreWhitespace(expected, rendered)

    def test_render_template_with_variables(self):
        """Variables set inside the template honour user input and otherwise use their fallback."""
        template = textwrap.dedent("""
        {% set _clients = clients if clients is defined else 16 %}
        {% set _bulk_size = bulk_size if bulk_size is defined else 100 %}
        {% import "benchmark.helpers" as benchmark with context %}
        {
        "key1": "static value",
        "dkey1": {{ _clients }},
        "dkey2": {{ _bulk_size }}
        }
        """)
        expected = textwrap.dedent("""
        {
        "key1": "static value",
        "dkey1": 8,
        "dkey2": 100
        }
        """)

        rendered = loader.render_template(
            template,
            template_vars={"clients": 8},
            template_internal_vars=TemplateRenderTests.unittest_template_internal_vars)

        self.assertEqualIgnoreWhitespace(expected, rendered)
| |
| |
class CompleteWorkloadParamsTests(TestCase):
    """Tests for ``loader.CompleteWorkloadParams`` and ``loader.register_all_params_in_workload``."""

    # Template fragment that refers to two distinct parameters; ``value2`` appears twice.
    assembled_source = textwrap.dedent("""{% import "benchmark.helpers" as benchmark with context %}
"key1": "value1",
"key2": {{ value2 | default(3) }},
"key3": {{ value3 | default("default_value3") }}
"key4": {{ value2 | default(3) }}
""")

    # Parameter names treated as "defined by the workload" in the unused-parameter tests.
    _WORKLOAD_DEFINED_PARAMS = (
        "bulk_indexing_clients",
        "bulk_indexing_iterations",
        "bulk_size",
        "cluster_health",
        "number_of_replicas",
        "number_of_shards",
    )

    def test_check_complete_workload_params_contains_all_workload_params(self):
        """Each parameter used in the source is registered once, in sorted order."""
        params = loader.CompleteWorkloadParams()

        loader.register_all_params_in_workload(
            CompleteWorkloadParamsTests.assembled_source, params)

        self.assertEqual(["value2", "value3"], params.sorted_workload_defined_params)

    def test_check_complete_workload_params_does_not_fail_with_no_workload_params(self):
        """A source without any parameter registers nothing."""
        params = loader.CompleteWorkloadParams()

        loader.register_all_params_in_workload('{}', params)

        self.assertEqual([], params.sorted_workload_defined_params)

    def test_unused_user_defined_workload_params(self):
        """User parameters that the workload does not define are reported as unused."""
        workload_params = {
            "number_of_repliacs": 1,  # deliberate typo
            "enable_source": True,  # unknown parameter
            "number_of_shards": 5
        }
        params = loader.CompleteWorkloadParams(
            user_specified_workload_params=workload_params)
        params.populate_workload_defined_params(
            list_of_workload_params=list(self._WORKLOAD_DEFINED_PARAMS))

        unused = sorted(params.unused_user_defined_workload_params())

        self.assertEqual(["enable_source", "number_of_repliacs"], unused)

    def test_unused_user_defined_workload_params_doesnt_fail_with_detaults(self):
        """Without user parameters there is nothing to report as unused."""
        params = loader.CompleteWorkloadParams()
        params.populate_workload_defined_params(
            list_of_workload_params=list(self._WORKLOAD_DEFINED_PARAMS))

        unused = sorted(params.unused_user_defined_workload_params())

        self.assertEqual([], unused)
| |
| |
class WorkloadPostProcessingTests(TestCase):
    """Tests what ``loader.TestModeWorkloadProcessor`` does to a workload after loading."""

    # Templated text form of the workload; ``bulk_indexing_clients`` is its only template parameter.
    workload_with_params_as_string = textwrap.dedent("""{
"indices": [
{
"name": "test-index",
"body": "test-index-body.json",
"types": ["test-type"]
}
],
"corpora": [
{
"name": "unittest",
"documents": [
{
"source-file": "documents.json.bz2",
"document-count": 10,
"compressed-bytes": 100,
"uncompressed-bytes": 10000
}
]
}
],
"operations": [
{
"name": "index-append",
"operation-type": "bulk",
"bulk-size": 5000
},
{
"name": "search",
"operation-type": "search"
}
],
"test_procedures": [
{
"name": "default-test_procedure",
"description": "Default test_procedure",
"schedule": [
{
"clients": {{ bulk_indexing_clients | default(8) }},
"operation": "index-append",
"warmup-time-period": 100,
"time-period": 240
},
{
"parallel": {
"tasks": [
{
"name": "search #1",
"clients": 4,
"operation": "search",
"warmup-iterations": 1000,
"iterations": 2000,
"target-interval": 30
},
{
"name": "search #2",
"clients": 1,
"operation": "search",
"warmup-iterations": 1000,
"iterations": 2000,
"target-throughput": 200
},
{
"name": "search #3",
"clients": 1,
"operation": "search",
"iterations": 1
}
]
}
}
]
}
]
}""")

    def as_workload(self, workload_specification, workload_params=None, complete_workload_params=None, index_body=None):
        """Read ``workload_specification`` into a workload object.

        The reader is given an in-memory file source that serves ``index_body`` as the
        content of ``/mappings/test-index-body.json``.
        """
        in_memory_files = io.DictStringFileSourceFactory({
            "/mappings/test-index-body.json": [index_body]
        })
        reader = loader.WorkloadSpecificationReader(
            workload_params=workload_params,
            complete_workload_params=complete_workload_params,
            source=in_memory_files
        )
        return reader("unittest", workload_specification, "/mappings")

    def test_post_processes_workload_spec(self):
        """With test mode enabled, the processed workload equals the expected reduced workload."""
        workload_specification = {
            "indices": [
                {"name": "test-index", "body": "test-index-body.json", "types": ["test-type"]}
            ],
            "corpora": [
                {
                    "name": "unittest",
                    "documents": [
                        {
                            "source-file": "documents.json.bz2",
                            "document-count": 10,
                            "compressed-bytes": 100,
                            "uncompressed-bytes": 10000
                        }
                    ]
                }
            ],
            "operations": [
                {"name": "index-append", "operation-type": "bulk", "bulk-size": 5000},
                {"name": "search", "operation-type": "search"}
            ],
            "test_procedures": [
                {
                    "name": "default-test_procedure",
                    "description": "Default test_procedure",
                    "schedule": [
                        {
                            "clients": 8,
                            "operation": "index-append",
                            "warmup-time-period": 100,
                            "time-period": 240,
                        },
                        {
                            "parallel": {
                                "tasks": [
                                    {
                                        "name": "search #1",
                                        "clients": 4,
                                        "operation": "search",
                                        "warmup-iterations": 1000,
                                        "iterations": 2000,
                                        "target-interval": 30
                                    },
                                    {
                                        "name": "search #2",
                                        "clients": 1,
                                        "operation": "search",
                                        "warmup-iterations": 1000,
                                        "iterations": 2000,
                                        "target-throughput": 200
                                    },
                                    {
                                        "name": "search #3",
                                        "clients": 1,
                                        "operation": "search",
                                        "iterations": 1
                                    }
                                ]
                            }
                        }
                    ]
                }
            ]
        }

        # Compared with the input: a "-1k" corpus of 1000 documents without byte sizes,
        # shorter time periods, fewer iterations, and no target interval/throughput.
        expected_post_processed = {
            "indices": [
                {"name": "test-index", "body": "test-index-body.json", "types": ["test-type"]}
            ],
            "corpora": [
                {
                    "name": "unittest",
                    "documents": [
                        {"source-file": "documents-1k.json.bz2", "document-count": 1000}
                    ]
                }
            ],
            "operations": [
                {"name": "index-append", "operation-type": "bulk", "bulk-size": 5000},
                {"name": "search", "operation-type": "search"}
            ],
            "test_procedures": [
                {
                    "name": "default-test_procedure",
                    "description": "Default test_procedure",
                    "schedule": [
                        {
                            "clients": 8,
                            "operation": "index-append",
                            "warmup-time-period": 0,
                            "time-period": 10,
                        },
                        {
                            "parallel": {
                                "tasks": [
                                    {
                                        "name": "search #1",
                                        "clients": 4,
                                        "operation": "search",
                                        "warmup-iterations": 4,
                                        "iterations": 4
                                    },
                                    {
                                        "name": "search #2",
                                        "clients": 1,
                                        "operation": "search",
                                        "warmup-iterations": 1,
                                        "iterations": 1
                                    },
                                    {
                                        "name": "search #3",
                                        "clients": 1,
                                        "operation": "search",
                                        "iterations": 1
                                    }
                                ]
                            }
                        }
                    ]
                }
            ]
        }

        complete_workload_params = loader.CompleteWorkloadParams()
        index_body = (
            '{"settings": {"index.number_of_shards": {{ number_of_shards | default(5) }}, '
            '"index.number_of_replicas": {{ number_of_replicas | default(0)}} }}'
        )

        cfg = config.Config()
        cfg.add(config.Scope.application, "workload", "test.mode.enabled", True)

        # Same evaluation order as a single nested assertion: expected workload first,
        # then the processor, then the workload that gets processed.
        expected_workload = self.as_workload(
            expected_post_processed,
            complete_workload_params=complete_workload_params,
            index_body=index_body)
        processor = loader.TestModeWorkloadProcessor(cfg)
        loaded_workload = self.as_workload(
            workload_specification,
            complete_workload_params=complete_workload_params,
            index_body=index_body)

        self.assertEqual(expected_workload, processor.on_after_load_workload(loaded_workload))
        self.assertEqual([], complete_workload_params.sorted_workload_defined_params)
| |
| |
class WorkloadPathTests(TestCase):
    """Tests for ``loader.set_absolute_data_path``."""

    @mock.patch("os.path.exists")
    def test_sets_absolute_path(self, path_exists):
        """Document file and archive paths are rewritten below ``<cache>/<corpus name>``."""
        path_exists.return_value = True

        cfg = config.Config()
        cfg.add(config.Scope.application, "benchmarks", "local.dataset.cache", "/data")

        bulk_operation = workload.Operation(
            "index", operation_type=workload.OperationType.Bulk)
        default_test_procedure = workload.TestProcedure(
            "default",
            default=True,
            schedule=[workload.Task(name="index", operation=bulk_operation, clients=4)])
        another_test_procedure = workload.TestProcedure("other", default=False)

        relative_documents = workload.Documents(
            source_format=workload.Documents.SOURCE_FORMAT_BULK,
            document_file="docs/documents.json",
            document_archive="docs/documents.json.bz2")
        corpus = workload.DocumentCorpus("unittest", documents=[relative_documents])
        t = workload.Workload(
            name="u",
            test_procedures=[another_test_procedure, default_test_procedure],
            corpora=[corpus],
        )

        loader.set_absolute_data_path(cfg, t)

        rewritten = t.corpora[0].documents[0]
        self.assertEqual("/data/unittest/docs/documents.json", rewritten.document_file)
        self.assertEqual("/data/unittest/docs/documents.json.bz2", rewritten.document_archive)
| |
| |
class WorkloadFilterTests(TestCase):
    """Tests for ``loader.TaskFilterWorkloadProcessor`` with include and exclude task filters.

    All tests share one set of operations and differ only in their schedule, so the
    common part of the specification is built by ``_workload_specification``.
    """

    @staticmethod
    def _workload_specification(schedule):
        """Return a new workload specification whose only test procedure runs ``schedule``.

        A fresh dict is built on every call so that no two tests share mutable fixture data.
        """
        return {
            "description": "description for unit test",
            "indices": [{"name": "test-index", "auto-managed": False}],
            "operations": [
                {
                    "name": "create-index",
                    "operation-type": "create-index"
                },
                {
                    "name": "bulk-index",
                    "operation-type": "bulk"
                },
                {
                    "name": "node-stats",
                    "operation-type": "node-stats"
                },
                {
                    "name": "cluster-stats",
                    "operation-type": "custom-operation-type"
                },
                {
                    "name": "match-all",
                    "operation-type": "search",
                    "body": {
                        "query": {
                            "match_all": {}
                        }
                    }
                },
            ],
            "test_procedures": [
                {
                    "name": "default-test_procedure",
                    "schedule": schedule
                }
            ]
        }

    @staticmethod
    def _serial_schedule():
        """Return a new schedule of five tasks without any parallel element."""
        return [
            {"operation": "create-index"},
            {"operation": "bulk-index"},
            {"operation": "node-stats"},
            {"name": "match-all-serial", "operation": "match-all"},
            {"operation": "cluster-stats"},
        ]

    def _read_workload(self, schedule):
        """Read the shared specification combined with ``schedule`` into a workload object."""
        reader = loader.WorkloadSpecificationReader()
        return reader("unittest", self._workload_specification(schedule), "/mappings")

    def filter(self, workload_specification, include_tasks=None, exclude_tasks=None):
        """Run the task filter processor over ``workload_specification`` and return its result.

        ``include_tasks`` and ``exclude_tasks`` are stored in a new config under
        ``workload/include.tasks`` and ``workload/exclude.tasks``.
        """
        cfg = config.Config()
        cfg.add(config.Scope.application, "workload", "include.tasks", include_tasks)
        cfg.add(config.Scope.application, "workload", "exclude.tasks", exclude_tasks)

        processor = loader.TaskFilterWorkloadProcessor(cfg)
        return processor.on_after_load_workload(workload_specification)

    def test_rejects_invalid_syntax(self):
        """A filter with more than one colon is rejected."""
        with self.assertRaises(exceptions.SystemSetupError) as ctx:
            self.filter(workload_specification=None, include_tasks=["valid", "a:b:c"])
        self.assertEqual(
            "Invalid format for filtered tasks: [a:b:c]", ctx.exception.args[0])

    def test_rejects_unknown_filter_type(self):
        """A filter whose prefix is not a known filter type is rejected."""
        with self.assertRaises(exceptions.SystemSetupError) as ctx:
            self.filter(workload_specification=None, include_tasks=["valid", "op-type:index"])
        self.assertEqual("Invalid format for filtered tasks: [op-type:index]. Expected [type] but got [op-type].",
                         ctx.exception.args[0])

    def test_filters_tasks(self):
        """Include filters select tasks by name, by operation type and by tag."""
        full_workload = self._read_workload([
            {
                "operation": "create-index"
            },
            {
                "parallel": {
                    "tasks": [
                        {
                            "name": "index-1",
                            "operation": "bulk-index",
                        },
                        {
                            "name": "index-2",
                            "operation": "bulk-index",
                        },
                        {
                            "name": "index-3",
                            "operation": "bulk-index",
                        },
                        {
                            "name": "match-all-parallel",
                            "operation": "match-all",
                        },
                    ]
                }
            },
            {
                "operation": "node-stats"
            },
            {
                "name": "match-all-serial",
                "operation": "match-all"
            },
            {
                "operation": "cluster-stats"
            },
            {
                "parallel": {
                    "tasks": [
                        {
                            "name": "query-filtered",
                            "tags": "include-me",
                            "operation": "match-all",
                        },
                        {
                            "name": "index-4",
                            "tags": ["include-me", "bulk-task"],
                            "operation": "bulk-index",
                        },
                        {
                            "name": "index-5",
                            "operation": "bulk-index",
                        }
                    ]
                }
            },
            {
                "name": "final-cluster-stats",
                "operation": "cluster-stats",
                "tags": "include-me"
            }
        ])
        self.assertEqual(7, len(full_workload.test_procedures[0].schedule))

        filtered = self.filter(full_workload, include_tasks=["index-3",
                                                             "type:search",
                                                             # Filtering should also work for non-core operation types.
                                                             "type:custom-operation-type",
                                                             "tag:include-me"])

        schedule = filtered.test_procedures[0].schedule
        self.assertEqual(5, len(schedule))
        self.assertEqual(["index-3", "match-all-parallel"],
                         [t.name for t in schedule[0].tasks])
        self.assertEqual("match-all-serial", schedule[1].name)
        self.assertEqual("cluster-stats", schedule[2].name)
        self.assertEqual(["query-filtered", "index-4"],
                         [t.name for t in schedule[3].tasks])
        self.assertEqual("final-cluster-stats", schedule[4].name)

    def test_filters_exclude_tasks(self):
        """Exclude filters drop tasks matched by name or by operation type."""
        full_workload = self._read_workload([
            {
                "operation": "create-index"
            },
            {
                "parallel": {
                    "tasks": [
                        {
                            "name": "index-1",
                            "operation": "bulk-index",
                        },
                        {
                            "name": "index-2",
                            "operation": "bulk-index",
                        },
                        {
                            "name": "index-3",
                            "operation": "bulk-index",
                        },
                        {
                            "name": "match-all-parallel",
                            "operation": "match-all",
                        },
                    ]
                }
            },
            {
                "operation": "node-stats"
            },
            {
                "name": "match-all-serial",
                "operation": "match-all"
            },
            {
                "operation": "cluster-stats"
            }
        ])
        self.assertEqual(5, len(full_workload.test_procedures[0].schedule))

        filtered = self.filter(full_workload, exclude_tasks=[
            "index-3", "type:search", "create-index"])

        schedule = filtered.test_procedures[0].schedule
        self.assertEqual(3, len(schedule))
        self.assertEqual(["index-1", "index-2"],
                         [t.name for t in schedule[0].tasks])
        self.assertEqual("node-stats", schedule[1].name)
        self.assertEqual("cluster-stats", schedule[2].name)

    def test_unmatched_exclude_runs_everything(self):
        """An exclude filter that matches no task leaves the schedule unchanged."""
        full_workload = self._read_workload(self._serial_schedule())
        self.assertEqual(5, len(full_workload.test_procedures[0].schedule))

        # Copy first: the comparison must not depend on whether the processor
        # modifies the schedule list in place.
        expected_schedule = full_workload.test_procedures[0].schedule.copy()
        filtered = self.filter(full_workload, exclude_tasks=["nothing"])

        schedule = filtered.test_procedures[0].schedule
        self.assertEqual(expected_schedule, schedule)

    def test_unmatched_include_runs_nothing(self):
        """An include filter that matches no task leaves an empty schedule."""
        full_workload = self._read_workload(self._serial_schedule())
        self.assertEqual(5, len(full_workload.test_procedures[0].schedule))

        expected_schedule = []
        filtered = self.filter(full_workload, include_tasks=["nothing"])

        schedule = filtered.test_procedures[0].schedule
        self.assertEqual(expected_schedule, schedule)
| |
| class WorkloadRandomizationTests(TestCase): |
| |
| # Helper class used to set up queries with mock standard values for testing |
| # We want >1 op to ensure logic for giving different ops their own lambdas is working properly |
class StandardValueHelper:
    """Fixture holder: two search operations plus saved and newly generated standard values.

    The saved values deliberately differ from the values produced by the source
    functions, so a test can tell a freshly generated value from a stored one.
    """

    def __init__(self):
        self.op_name_1 = "op-name-1"
        self.op_name_2 = "op-name-2"
        self.field_name_1 = "dummy_field_1"
        self.field_name_2 = "dummy_field_2"
        self.field_name_3 = "dummy_field_3"
        self.index_name = "dummy_index"

        # Values that count as "already saved". In actual usage both these and the new
        # values would come from the same function with some randomness in it.
        self.saved_values = {
            self.op_name_1: {
                self.field_name_1: {
                    "lte": 40,
                    "gte": 30,
                },
                self.field_name_2: {
                    "lte": "06/06/2016",
                    "gte": "05/05/2016",
                    "format": "dd/MM/yyyy",
                },
            },
            self.op_name_2: {
                self.field_name_3: {
                    "top_left": [-9, 9],
                    "bottom_right": [0, 0],
                },
            },
        }

        # Values handed out by the source functions, i.e. the "newly generated" ones.
        self.new_values = {
            self.op_name_1: {
                self.field_name_1: {
                    "lte": 41,
                    "gte": 31,
                },
                self.field_name_2: {
                    "lte": "04/04/2016",
                    "gte": "03/03/2016",
                    "format": "dd/MM/yyyy",
                },
            },
            self.op_name_2: {
                self.field_name_3: {
                    "top_left": [-10, 10],
                    "bottom_right": [0, 0],
                },
            },
        }

        # First operation: range clauses on field 1 and, under "must", on field 2.
        numeric_range = {self.field_name_1: {"lt": 50, "gte": 0}}
        date_range = {
            self.field_name_2: {
                "gte": "01/01/2015",
                "lte": "21/01/2015",
                "format": "dd/MM/yyyy"
            }
        }
        self.op_1_query = {
            "name": self.op_name_1,
            "operation-type": "search",
            "body": {
                "size": 0,
                "query": {
                    "bool": {
                        "filter": {
                            "range": numeric_range,
                            "must": [{"range": date_range}]
                        }
                    }
                }
            }
        }

        # Second operation: a geo bounding box on field 3.
        bounding_box = {
            "top_left": [-0.1, 61.0],
            "bottom_right": [15.0, 48.0]
        }
        self.op_2_query = {
            "name": self.op_name_2,
            "operation-type": "search",
            "body": {
                "size": 0,
                "query": {
                    "geo_bounding_box": {self.field_name_3: bounding_box}
                }
            }
        }

    def get_simple_workload(self):
        """Read and return a workload that schedules create-index followed by both queries."""
        # Modified from test_filters_tasks
        schedule = [
            {"operation": "create-index"},
            {"name": "dummy-task-name-1", "operation": self.op_name_1},
            {"name": "dummy-task-name-2", "operation": self.op_name_2},
        ]
        operations = [
            {"name": "create-index", "operation-type": "create-index"},
            self.op_1_query,
            self.op_2_query,
        ]
        workload_specification = {
            "description": "description for unit test",
            "collections": [{"name": self.index_name}],
            "operations": operations,
            "test_procedures": [
                {"name": "default-test_procedure", "schedule": schedule}
            ]
        }
        reader = loader.WorkloadSpecificationReader()
        return reader("unittest", workload_specification, "/mappings")

    def get_standard_value_source(self, op_name, field_name):
        """Return a zero-argument callable that yields the new value for ``op_name``/``field_name``.

        Passed to the processor so it can find the standard value source of every
        op/field pair. In a real application these sources would be defined in the
        workload's workload.py and involve some randomization.
        """
        def standard_value_source():
            # Looked up on each call, not when the source is created.
            return self.new_values[op_name][field_name]

        return standard_value_source

    def get_standard_value(self, op_name, field_name, index):
        """Return the saved standard value for ``op_name``/``field_name``; ``index`` is not used.

        Passed to the processor so it can retrieve the saved standard values.
        """
        saved_for_operation = self.saved_values[op_name]
        return saved_for_operation[field_name]
| |
| def test_range_finding_function(self): |
| cfg = config.Config() |
| processor = loader.QueryRandomizerWorkloadProcessor(cfg) |
| single_range_query = { |
| "name": "distance_amount_agg", |
| "operation-type": "search", |
| "body": { |
| "size": 0, |
| "query": { |
| "bool": { |
| "filter": { |
| "range": { |
| "trip_distance": { |
| "lt": 50, |
| "gte": 0 |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| single_range_query_result = processor.extract_fields_and_paths( |
| single_range_query, loader.QueryRandomizerWorkloadProcessor.DEFAULT_QUERY_RANDOMIZATION_INFO) |
| single_range_query_expected = [("trip_distance", ["bool", "filter", "range"])] |
| self.assertEqual(single_range_query_result, single_range_query_expected) |
| |
| multiple_nested_range_query = { |
| "name": "date_histogram_agg", |
| "operation-type": "search", |
| "body": { |
| "size": 0, |
| "query": { |
| "range": { |
| "dropoff_datetime": { |
| "gte": "01/01/2015", |
| "lte": "21/01/2015", |
| "format": "dd/MM/yyyy" |
| } |
| }, |
| "bool": { |
| "filter": { |
| "range": { |
| "dummy_field": { |
| "lte": 50, |
| "gt": 0 |
| } |
| } |
| }, |
| "must": [ |
| { |
| "range": { |
| "dummy_field_2": { |
| "gte": "1998-05-01T00:00:00Z", |
| "lt": "1998-05-02T00:00:00Z" |
| } |
| } |
| }, |
| { |
| "match": { |
| "status": "400" |
| } |
| }, |
| { |
| "range": { |
| "dummy_field_3": { |
| "gt": 10, |
| "lt": 11 |
| } |
| } |
| } |
| ] |
| } |
| } |
| } |
| } |
| multiple_nested_range_query_result = processor.extract_fields_and_paths( |
| multiple_nested_range_query, loader.QueryRandomizerWorkloadProcessor.DEFAULT_QUERY_RANDOMIZATION_INFO) |
| print("Multi result: ", multiple_nested_range_query_result) |
| multiple_nested_range_query_expected = [ |
| ("dropoff_datetime", ["range"]), |
| ("dummy_field", ["bool", "filter", "range"]), |
| ("dummy_field_2", ["bool", "must", 0, "range"]), |
| ("dummy_field_3", ["bool", "must", 2, "range"]) |
| ] |
| self.assertEqual(multiple_nested_range_query_result, multiple_nested_range_query_expected) |
| |
| with self.assertRaises(exceptions.SystemSetupError) as ctx: |
| params = {"body":{"contents":["not_a_valid_query"]}} |
| _ = processor.extract_fields_and_paths(params, loader.QueryRandomizerWorkloadProcessor.DEFAULT_QUERY_RANDOMIZATION_INFO) |
| self.assertEqual( |
| f"Cannot extract range query fields from these params: {params}\n, missing params[\"body\"][\"query\"]\n" |
| f"Make sure the operation in operations/default.json is well-formed", |
| ctx.exception.args[0]) |
| |
| # Test a non-default value for query_randomization_info |
| geo_point_query = { |
| "name": "bbox", |
| "operation-type": "search", |
| "body": { |
| "size": 0, |
| "query": { |
| "geo_bounding_box": { |
| "location": { |
| "top_left": [-0.1, 61.0], |
| "bottom_right": [15.0, 48.0] |
| } |
| } |
| } |
| } |
| } |
| geo_point_query_randomization_info = loader.QueryRandomizerWorkloadProcessor.QueryRandomizationInfo( |
| "geo_bounding_box", [["top_left"], ["bottom_right"]], []) |
| geo_point_result = processor.extract_fields_and_paths(geo_point_query, geo_point_query_randomization_info) |
| geo_point_expected = [("location", ["geo_bounding_box"])] |
| self.assertEqual(geo_point_result, geo_point_expected) |
| |
| |
| def test_get_randomized_values(self): |
| helper = self.StandardValueHelper() |
| |
| for rf, expected_values_dict in zip([1.0, 0.0], [helper.saved_values, helper.new_values]): |
| # first test where we always draw a saved value, not a new random one |
| # next test where we always draw a new random value. We've made them distinct, to be able to tell which codepath is taken |
| cfg = config.Config() |
| cfg.add(config.Scope.application, "workload", "randomization.repeat_frequency", rf) |
| processor = loader.QueryRandomizerWorkloadProcessor(cfg) |
| self.assertAlmostEqual(processor.rf, rf) |
| |
| # Test resulting params for operation 1 |
| workload = helper.get_simple_workload() |
| modified_params = processor.get_randomized_values(workload, |
| helper.op_1_query, |
| loader.QueryRandomizerWorkloadProcessor.DEFAULT_QUERY_RANDOMIZATION_INFO, |
| op_name=helper.op_name_1, |
| get_standard_value=helper.get_standard_value, |
| get_standard_value_source=helper.get_standard_value_source) |
| modified_range_1 = modified_params["body"]["query"]["bool"]["filter"]["range"][helper.field_name_1] |
| modified_range_2 = modified_params["body"]["query"]["bool"]["filter"]["must"][0]["range"][helper.field_name_2] |
| self.assertEqual(modified_range_1["lt"], expected_values_dict[helper.op_name_1][helper.field_name_1]["lte"]) |
| # Note it should keep whichever of lt/lte it found in the original query |
| self.assertEqual(modified_range_1["gte"], expected_values_dict[helper.op_name_1][helper.field_name_1]["gte"]) |
| |
| self.assertEqual(modified_range_2["lte"], expected_values_dict[helper.op_name_1][helper.field_name_2]["lte"]) |
| self.assertEqual(modified_range_2["gte"], expected_values_dict[helper.op_name_1][helper.field_name_2]["gte"]) |
| self.assertEqual(modified_range_2["format"], expected_values_dict[helper.op_name_1][helper.field_name_2]["format"]) |
| |
| self.assertEqual(modified_params["index"], helper.index_name) |
| |
| # Test resulting params for operation 2, which uses a non-default query_randomization_info |
| workload = helper.get_simple_workload() |
| geo_point_query_randomization_info = loader.QueryRandomizerWorkloadProcessor.QueryRandomizationInfo( |
| "geo_bounding_box", [["top_left"], ["bottom_right"]], []) |
| modified_params = processor.get_randomized_values(workload, helper.op_2_query, geo_point_query_randomization_info, |
| op_name=helper.op_name_2, |
| get_standard_value=helper.get_standard_value, |
| get_standard_value_source=helper.get_standard_value_source) |
| modified_range_3 = modified_params["body"]["query"]["geo_bounding_box"][helper.field_name_3] |
| |
| self.assertEqual(modified_range_3["top_left"], expected_values_dict[helper.op_name_2][helper.field_name_3]["top_left"]) |
| self.assertEqual(modified_range_3["bottom_right"], expected_values_dict[helper.op_name_2][helper.field_name_3]["bottom_right"]) |
| self.assertEqual(modified_params["index"], helper.index_name) |
| |
| def test_on_after_load_workload(self): |
| cfg = config.Config() |
| processor = loader.QueryRandomizerWorkloadProcessor(cfg) |
| # Do nothing with default config as randomization.enabled is false |
| helper = self.StandardValueHelper() |
| input_workload = helper.get_simple_workload() |
| self.assertEqual( |
| repr(input_workload), |
| repr(processor.on_after_load_workload(input_workload, get_standard_value=helper.get_standard_value, |
| get_standard_value_source=helper.get_standard_value_source))) |
| # It seems that comparing the workloads directly will incorrectly call them equal, even if they have differences, |
| # so compare their string representations instead |
| |
| cfg = config.Config() |
| cfg.add(config.Scope.application, "workload", "randomization.enabled", True) |
| processor = loader.QueryRandomizerWorkloadProcessor(cfg) |
| self.assertEqual(processor.randomization_enabled, True) |
| self.assertEqual(processor.N, loader.QueryRandomizerWorkloadProcessor.DEFAULT_N) |
| self.assertEqual(type(processor.N), int) |
| self.assertEqual(processor.rf, loader.QueryRandomizerWorkloadProcessor.DEFAULT_RF) |
| self.assertEqual(type(processor.rf), float) |
| input_workload = helper.get_simple_workload() |
| self.assertNotEqual( |
| repr(input_workload), |
| repr(processor.on_after_load_workload(input_workload, get_standard_value=helper.get_standard_value, |
| get_standard_value_source=helper.get_standard_value_source, |
| query_randomization_info=None))) |
| for test_procedure in input_workload.test_procedures: |
| for task in test_procedure.schedule: |
| for leaf_task in task: |
| try: |
| op_type = workload.OperationType.from_hyphenated_string(leaf_task.operation.type) |
| except KeyError: |
| op_type = None |
| if op_type == workload.OperationType.Search: |
| self.assertIsNotNone(leaf_task.operation.param_source) |
| |
| |
| # pylint: disable=too-many-public-methods |
| class WorkloadSpecificationReaderTests(TestCase): |
def test_description_is_optional(self):
    # A specification without a "description" key still parses; the description is then "".
    spec_without_description = {
        # no description here
        "test_procedures": [],
    }
    read_workload = loader.WorkloadSpecificationReader()

    parsed = read_workload("unittest", spec_without_description, "/mappings")

    self.assertEqual("unittest", parsed.name)
    self.assertEqual("", parsed.description)
| |
def test_can_read_workload_info(self):
    # Name and description of the parsed workload come from the call and the specification.
    test_index = {"name": "test-index", "types": ["test-type"]}
    spec = {
        "description": "description for unit test",
        "indices": [test_index],
        "data-streams": [],
        "corpora": [],
        "operations": [],
        "test_procedures": [],
    }

    parsed = loader.WorkloadSpecificationReader()("unittest", spec, "/mappings")

    self.assertEqual("unittest", parsed.name)
    self.assertEqual("description for unit test", parsed.description)
| |
def test_document_count_mandatory_if_file_present(self):
    # A corpus document naming a source file but no document-count must be rejected.
    corpus = {
        "name": "test",
        "base-url": "https://localhost/data",
        "documents": [{"source-file": "documents-main.json.bz2"}],
    }
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index", "types": ["docs"]}],
        "corpora": [corpus],
        "test_procedures": [],
    }
    read_workload = loader.WorkloadSpecificationReader()
    expected_message = "Workload 'unittest' is invalid. Mandatory element 'document-count' is missing."

    with self.assertRaises(loader.WorkloadSyntaxError) as raised:
        read_workload("unittest", spec, "/mappings")

    self.assertEqual(expected_message, raised.exception.args[0])
| |
@mock.patch("solrorbit.workload.loader.register_all_params_in_workload")
def test_parse_with_mixed_warmup_iterations_and_measurement(self, mocked_params_checker):
    # warmup-iterations combined with time-period on one task must be rejected.
    # mocked_params_checker is injected by the patch decorator and not inspected here.
    corpus_document = {
        "source-file": "documents-main.json.bz2",
        "document-count": 10,
        "compressed-bytes": 100,
        "uncompressed-bytes": 10000,
    }
    mixed_task = {
        "clients": 8,
        "operation": "index-append",
        "warmup-iterations": 3,
        "time-period": 60,
    }
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index", "body": "index.json", "types": ["docs"]}],
        "corpora": [{"name": "test", "documents": [corpus_document]}],
        "operations": [{"name": "index-append", "operation-type": "bulk", "bulk-size": 5000}],
        "test_procedures": [{"name": "default-test_procedure", "schedule": [mixed_task]}],
    }
    mapping_source = io.DictStringFileSourceFactory({
        "/mappings/index.json": ['{"mappings": {"docs": "empty-for-test"}}'],
    })
    read_workload = loader.WorkloadSpecificationReader(source=mapping_source)
    expected_message = (
        "Workload 'unittest' is invalid. Operation 'index-append' in test_procedure 'default-test_procedure' "
        "defines '3' warmup iterations and a time period of '60' seconds. Please do not mix time periods and iterations."
    )

    with self.assertRaises(loader.WorkloadSyntaxError) as raised:
        read_workload("unittest", spec, "/mappings")

    self.assertEqual(expected_message, raised.exception.args[0])
| |
@mock.patch("solrorbit.workload.loader.register_all_params_in_workload")
def test_parse_missing_test_procedure_or_test_procedures(self, mocked_params_checker):
    # A specification with none of test_procedure / test_procedures / schedule must be rejected.
    # mocked_params_checker is injected by the patch decorator and not inspected here.
    corpus_document = {
        "source-file": "documents-main.json.bz2",
        "document-count": 10,
        "compressed-bytes": 100,
        "uncompressed-bytes": 10000,
    }
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index", "body": "index.json", "types": ["docs"]}],
        "corpora": [{"name": "test", "documents": [corpus_document]}],
        # no test_procedure or test_procedures element
    }
    mapping_source = io.DictStringFileSourceFactory({
        "/mappings/index.json": ['{"mappings": {"docs": "empty-for-test"}}'],
    })
    read_workload = loader.WorkloadSpecificationReader(source=mapping_source)
    expected_message = (
        "Workload 'unittest' is invalid. You must define 'test_procedure', 'test_procedures' or "
        "'schedule' but none is specified."
    )

    with self.assertRaises(loader.WorkloadSyntaxError) as raised:
        read_workload("unittest", spec, "/mappings")

    self.assertEqual(expected_message, raised.exception.args[0])
| |
@mock.patch("solrorbit.workload.loader.register_all_params_in_workload")
def test_parse_iteration_and_ramp_up_period(self, mocked_params_checker):
    # A ramp-up time period combined with iteration counts on one task must be rejected.
    # mocked_params_checker is injected by the patch decorator and not inspected here.
    corpus_document = {
        "source-file": "documents-main.json.bz2",
        "document-count": 10,
        "compressed-bytes": 100,
        "uncompressed-bytes": 10000,
    }
    mixed_task = {
        "clients": 8,
        "operation": "index-append",
        "ramp-up-time-period": 120,
        "warmup-iterations": 3,
        "iterations": 5,
    }
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index", "body": "index.json", "types": ["docs"]}],
        "corpora": [{"name": "test", "documents": [corpus_document]}],
        "operations": [{"name": "index-append", "operation-type": "bulk", "bulk-size": 5000}],
        "test_procedures": [{"name": "default-challenge", "schedule": [mixed_task]}],
    }
    mapping_source = io.DictStringFileSourceFactory({
        "/mappings/index.json": ['{"mappings": {"docs": "empty-for-test"}}'],
    })
    read_workload = loader.WorkloadSpecificationReader(source=mapping_source)
    expected_message = (
        "Workload 'unittest' is invalid. Operation 'index-append' in test_procedure 'default-challenge' defines a ramp-up time period of "
        "120 seconds as well as 3 warmup iterations and 5 iterations but mixing time periods and iterations is not allowed."
    )

    with self.assertRaises(loader.WorkloadSyntaxError) as raised:
        read_workload("unittest", spec, "/mappings")

    self.assertEqual(expected_message, raised.exception.args[0])
| |
@mock.patch("solrorbit.workload.loader.register_all_params_in_workload")
def test_parse_valid_ramp_down_period(self, mocked_params_checker):
    # A purely time-based task with warmup, ramp-up and ramp-down periods parses,
    # and each period is exposed on the resulting task.
    # mocked_params_checker is injected by the patch decorator and not inspected here.
    corpus_document = {
        "source-file": "documents-main.json.bz2",
        "document-count": 10,
        "compressed-bytes": 100,
        "uncompressed-bytes": 10000,
    }
    timed_task = {
        "clients": 8,
        "operation": "index-append",
        "warmup-time-period": 60,
        "time-period": 300,
        "ramp-up-time-period": 60,
        "ramp-down-time-period": 60,
    }
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index", "body": "index.json", "types": ["docs"]}],
        "corpora": [{"name": "test", "documents": [corpus_document]}],
        "operations": [{"name": "index-append", "operation-type": "bulk", "bulk-size": 5000}],
        "test_procedures": [{"name": "default-challenge", "schedule": [timed_task]}],
    }
    mapping_source = io.DictStringFileSourceFactory({
        "/mappings/index.json": ['{"mappings": {"docs": "empty-for-test"}}'],
    })
    read_workload = loader.WorkloadSpecificationReader(source=mapping_source)

    parsed = read_workload("unittest", spec, "/mappings")

    parsed_task = parsed.test_procedures[0].schedule[0]
    self.assertEqual(60, parsed_task.warmup_time_period)
    self.assertEqual(300, parsed_task.time_period)
    self.assertEqual(60, parsed_task.ramp_up_time_period)
    self.assertEqual(60, parsed_task.ramp_down_time_period)
| |
@mock.patch("solrorbit.workload.loader.register_all_params_in_workload")
def test_parse_iteration_and_ramp_down_period_error(self, mocked_params_checker):
    # Iteration counts together with time-period and ramp-down-time-period must be rejected.
    # mocked_params_checker is injected by the patch decorator and not inspected here.
    corpus_document = {
        "source-file": "documents-main.json.bz2",
        "document-count": 10,
        "compressed-bytes": 100,
        "uncompressed-bytes": 10000,
    }
    mixed_task = {
        "clients": 8,
        "operation": "index-append",
        "time-period": 300,
        "ramp-down-time-period": 60,
        "warmup-iterations": 3,
        "iterations": 5,
    }
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index", "body": "index.json", "types": ["docs"]}],
        "corpora": [{"name": "test", "documents": [corpus_document]}],
        "operations": [{"name": "index-append", "operation-type": "bulk", "bulk-size": 5000}],
        "test_procedures": [{"name": "default-challenge", "schedule": [mixed_task]}],
    }
    mapping_source = io.DictStringFileSourceFactory({
        "/mappings/index.json": ['{"mappings": {"docs": "empty-for-test"}}'],
    })
    read_workload = loader.WorkloadSpecificationReader(source=mapping_source)
    # The warmup-iterations + time-period check triggers before the ramp-down-specific check
    expected_message = (
        "Workload 'unittest' is invalid. Operation 'index-append' in test_procedure 'default-challenge' defines '3' warmup iterations and "
        "a time period of '300' seconds. Please do not mix time periods and iterations."
    )

    with self.assertRaises(loader.WorkloadSyntaxError) as raised:
        read_workload("unittest", spec, "/mappings")

    self.assertEqual(expected_message, raised.exception.args[0])
| |
@mock.patch("solrorbit.workload.loader.register_all_params_in_workload")
def test_parse_ramp_down_without_time_period_error(self, mocked_params_checker):
    # ramp-down-time-period without a time-period must be rejected.
    # mocked_params_checker is injected by the patch decorator and not inspected here.
    corpus_document = {
        "source-file": "documents-main.json.bz2",
        "document-count": 10,
        "compressed-bytes": 100,
        "uncompressed-bytes": 10000,
    }
    task_without_time_period = {
        "clients": 8,
        "operation": "index-append",
        "warmup-time-period": 30,
        "ramp-down-time-period": 60,
    }
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index", "body": "index.json", "types": ["docs"]}],
        "corpora": [{"name": "test", "documents": [corpus_document]}],
        "operations": [{"name": "index-append", "operation-type": "bulk", "bulk-size": 5000}],
        "test_procedures": [{"name": "default-challenge", "schedule": [task_without_time_period]}],
    }
    mapping_source = io.DictStringFileSourceFactory({
        "/mappings/index.json": ['{"mappings": {"docs": "empty-for-test"}}'],
    })
    read_workload = loader.WorkloadSpecificationReader(source=mapping_source)
    expected_message = (
        "Workload 'unittest' is invalid. Operation 'index-append' in test_procedure 'default-challenge' defines a ramp-down time period of "
        "60 seconds but no time-period."
    )

    with self.assertRaises(loader.WorkloadSyntaxError) as raised:
        read_workload("unittest", spec, "/mappings")

    self.assertEqual(expected_message, raised.exception.args[0])
| |
@mock.patch("solrorbit.workload.loader.register_all_params_in_workload")
def test_parse_ramp_down_exceeds_time_period_error(self, mocked_params_checker):
    # A ramp-down-time-period longer than the time-period must be rejected.
    # mocked_params_checker is injected by the patch decorator and not inspected here.
    corpus_document = {
        "source-file": "documents-main.json.bz2",
        "document-count": 10,
        "compressed-bytes": 100,
        "uncompressed-bytes": 10000,
    }
    oversized_ramp_down_task = {
        "clients": 8,
        "operation": "index-append",
        "warmup-time-period": 30,
        "time-period": 60,
        "ramp-down-time-period": 120,
    }
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index", "body": "index.json", "types": ["docs"]}],
        "corpora": [{"name": "test", "documents": [corpus_document]}],
        "operations": [{"name": "index-append", "operation-type": "bulk", "bulk-size": 5000}],
        "test_procedures": [{"name": "default-challenge", "schedule": [oversized_ramp_down_task]}],
    }
    mapping_source = io.DictStringFileSourceFactory({
        "/mappings/index.json": ['{"mappings": {"docs": "empty-for-test"}}'],
    })
    read_workload = loader.WorkloadSpecificationReader(source=mapping_source)
    expected_message = (
        "Workload 'unittest' is invalid. The time-period of operation 'index-append' in test_procedure 'default-challenge' is "
        "60 seconds but must be greater than or equal to the ramp-down-time-period of 120 seconds."
    )

    with self.assertRaises(loader.WorkloadSyntaxError) as raised:
        read_workload("unittest", spec, "/mappings")

    self.assertEqual(expected_message, raised.exception.args[0])
| |
| |
@mock.patch("solrorbit.workload.loader.register_all_params_in_workload")
def test_parse_test_procedure_and_test_procedures_are_defined(self, mocked_params_checker):
    # Defining both test_procedure and test_procedures in one specification must be rejected.
    # mocked_params_checker is injected by the patch decorator and not inspected here.
    corpus_document = {
        "source-file": "documents-main.json.bz2",
        "document-count": 10,
        "compressed-bytes": 100,
        "uncompressed-bytes": 10000,
    }
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index", "body": "index.json", "types": ["docs"]}],
        "corpora": [{"name": "test", "documents": [corpus_document]}],
        # We define both. Note that test_procedures without any properties
        # would not pass JSON schema validation but we don't test this here.
        "test_procedure": {},
        "test_procedures": [],
    }
    mapping_source = io.DictStringFileSourceFactory({
        "/mappings/index.json": ['{"mappings": {"docs": "empty-for-test"}}'],
    })
    read_workload = loader.WorkloadSpecificationReader(source=mapping_source)
    expected_message = (
        "Workload 'unittest' is invalid. Multiple out of 'test_procedure', 'test_procedures' or 'schedule' "
        "are defined but only "
        "one of them is allowed."
    )

    with self.assertRaises(loader.WorkloadSyntaxError) as raised:
        read_workload("unittest", spec, "/mappings")

    self.assertEqual(expected_message, raised.exception.args[0])
| |
@mock.patch("solrorbit.workload.loader.register_all_params_in_workload")
def test_parse_with_mixed_warmup_time_period_and_iterations(self, mocked_params_checker):
    # warmup-time-period combined with iterations on one task must be rejected.
    # mocked_params_checker is injected by the patch decorator and not inspected here.
    corpus_document = {
        "source-file": "documents-main.json.bz2",
        "document-count": 10,
        "compressed-bytes": 100,
        "uncompressed-bytes": 10000,
    }
    mixed_task = {
        "clients": 8,
        "operation": "index-append",
        "warmup-time-period": 20,
        "iterations": 1000,
    }
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index", "body": "index.json", "types": ["docs"]}],
        "corpora": [{"name": "test", "documents": [corpus_document]}],
        "operations": [{"name": "index-append", "operation-type": "index", "bulk-size": 5000}],
        "test_procedures": [{"name": "default-test_procedure", "schedule": [mixed_task]}],
    }
    mapping_source = io.DictStringFileSourceFactory({
        "/mappings/index.json": ['{"mappings": {"docs": "empty-for-test"}}'],
    })
    read_workload = loader.WorkloadSpecificationReader(source=mapping_source)
    expected_message = (
        "Workload 'unittest' is invalid. Operation 'index-append' in test_procedure 'default-test_procedure' "
        "defines a warmup time "
        "period of '20' seconds and '1000' iterations. "
        "Please do not mix time periods and iterations."
    )

    with self.assertRaises(loader.WorkloadSyntaxError) as raised:
        read_workload("unittest", spec, "/mappings")

    self.assertEqual(expected_message, raised.exception.args[0])
| |
def test_parse_duplicate_implicit_task_names(self):
    # Two unnamed tasks on the same operation both end up named "search" and must be rejected.
    search_operation = {"name": "search", "operation-type": "search", "index": "_all"}
    spec = {
        "description": "description for unit test",
        "operations": [search_operation],
        "test_procedure": {
            "name": "default-test_procedure",
            "schedule": [
                {"operation": "search", "clients": 1},
                {"operation": "search", "clients": 2},
            ],
        },
    }
    read_workload = loader.WorkloadSpecificationReader()
    expected_message = (
        "Workload 'unittest' is invalid. TestProcedure 'default-test_procedure' contains multiple tasks"
        " with the name 'search'. Please"
        " use the task's name property to assign a unique name for each task."
    )

    with self.assertRaises(loader.WorkloadSyntaxError) as raised:
        read_workload("unittest", spec, "/mappings")

    self.assertEqual(expected_message, raised.exception.args[0])
| |
def test_parse_duplicate_explicit_task_names(self):
    # Two tasks that explicitly carry the same name must be rejected.
    search_operation = {"name": "search", "operation-type": "search", "index": "_all"}
    spec = {
        "description": "description for unit test",
        "operations": [search_operation],
        "test_procedure": {
            "name": "default-test_procedure",
            "schedule": [
                {"name": "duplicate-task-name", "operation": "search", "clients": 1},
                {"name": "duplicate-task-name", "operation": "search", "clients": 2},
            ],
        },
    }
    read_workload = loader.WorkloadSpecificationReader()
    expected_message = (
        "Workload 'unittest' is invalid. TestProcedure 'default-test_procedure' contains multiple tasks with the name "
        "'duplicate-task-name'. Please use the task's name property to assign a unique name for each task."
    )

    with self.assertRaises(loader.WorkloadSyntaxError) as raised:
        read_workload("unittest", spec, "/mappings")

    self.assertEqual(expected_message, raised.exception.args[0])
| |
def test_parse_unique_task_names(self):
    # Two tasks on the same operation parse when each has its own explicit name.
    search_operation = {"name": "search", "operation-type": "search", "index": "_all"}
    spec = {
        "description": "description for unit test",
        "operations": [search_operation],
        "test_procedure": {
            "name": "default-test_procedure",
            "schedule": [
                {"name": "search-one-client", "operation": "search", "clients": 1},
                {"name": "search-two-clients", "operation": "search", "clients": 2},
            ],
        },
    }
    read_workload = loader.WorkloadSpecificationReader(selected_test_procedure="default-test_procedure")

    parsed = read_workload("unittest", spec, "/mappings")

    self.assertEqual("unittest", parsed.name)
    procedure = parsed.test_procedures[0]
    self.assertTrue(procedure.selected)
    tasks = procedure.schedule
    self.assertEqual(2, len(tasks))
    # The length check above guarantees zip() pairs every task with an expected name.
    for parsed_task, expected_name in zip(tasks, ("search-one-client", "search-two-clients")):
        self.assertEqual(expected_name, parsed_task.name)
        self.assertEqual("search", parsed_task.operation.name)
| |
def test_parse_clients_list(self):
    # A task with clients_list [1, 2, 3] yields three schedule entries, one per client count,
    # followed by the remaining task unchanged.
    search_operation = {"name": "search", "operation-type": "search", "index": "_all"}
    expanded_task = {
        "name": "search-one-client",
        "operation": "search",
        "clients": 1,
        "clients_list": [1, 2, 3],
    }
    plain_task = {"name": "search-two-clients", "operation": "search", "clients": 2}
    spec = {
        "description": "description for unit test",
        "operations": [search_operation],
        "test_procedure": {
            "name": "default-test-procedure",
            "schedule": [expanded_task, plain_task],
        },
    }
    read_workload = loader.WorkloadSpecificationReader(selected_test_procedure="default-test-procedure")

    parsed = read_workload("unittest", spec, "/mappings")

    self.assertEqual("unittest", parsed.name)
    procedure = parsed.test_procedures[0]
    self.assertTrue(procedure.selected)
    tasks = procedure.schedule
    self.assertEqual(4, len(tasks))

    expected_names = (
        "default-test-procedure_1_clients",
        "default-test-procedure_2_clients",
        "default-test-procedure_3_clients",
        "search-two-clients",
    )
    # The length check above guarantees zip() pairs every task with an expected name.
    for parsed_task, expected_name in zip(tasks, expected_names):
        self.assertEqual(expected_name, parsed_task.name)
        self.assertEqual("search", parsed_task.operation.name)
| # pylint: disable=W0212 |
| |
def test_naming_with_clients_list(self):
    # _rename_task_based_on_num_clients appends the client count to a task name; the
    # expected results below show the separator used for each kind of name.
    read_workload = loader.WorkloadSpecificationReader(selected_test_procedure="default-test_procedure")
    naming_cases = (
        # (task name, number of clients, expected new name)
        ("test_name-task", 5, "test_name-task_5_clients"),  # both "_" and "-"
        ("test-name", 3, "test-name-3-clients"),            # only "-"
        ("test_name", 2, "test_name_2_clients"),            # only "_"
        ("testname", 1, "testname_1_clients"),              # neither "_" nor "-"
    )
    for task_name, num_clients, expected_name in naming_cases:
        renamed = read_workload._rename_task_based_on_num_clients(task_name, num_clients)
        self.assertEqual(renamed, expected_name)
| |
def test_unique_test_procedure_names(self):
    # Two test procedures sharing one name must be rejected.
    first_procedure = {
        "name": "test-test_procedure",
        "description": "Some test_procedure",
        "default": True,
        "schedule": [{"operation": "index-append"}],
    }
    second_procedure = {
        "name": "test-test_procedure",
        "description": "Another test_procedure with the same name",
        "schedule": [{"operation": "index-append"}],
    }
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index"}],
        "operations": [{"name": "index-append", "operation-type": "bulk"}],
        "test_procedures": [first_procedure, second_procedure],
    }
    read_workload = loader.WorkloadSpecificationReader()
    expected_message = "Workload 'unittest' is invalid. Duplicate test_procedure with name 'test-test_procedure'."

    with self.assertRaises(loader.WorkloadSyntaxError) as raised:
        read_workload("unittest", spec, "/mappings")

    self.assertEqual(expected_message, raised.exception.args[0])
| |
def test_not_more_than_one_default_test_procedure_possible(self):
    # Two test procedures both flagged as default must be rejected.
    first_default = {
        "name": "default-test_procedure",
        "description": "Default test_procedure",
        "default": True,
        "schedule": [{"operation": "index-append"}],
    }
    second_default = {
        "name": "another-test_procedure",
        "description": "See if we can sneek it in as another default",
        "default": True,
        "schedule": [{"operation": "index-append"}],
    }
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index"}],
        "operations": [{"name": "index-append", "operation-type": "bulk"}],
        "test_procedures": [first_default, second_default],
    }
    read_workload = loader.WorkloadSpecificationReader()
    expected_message = (
        "Workload 'unittest' is invalid. Both 'default-test_procedure' and 'another-test_procedure' "
        "are defined as default test_procedures. "
        "Please define only one of them as default."
    )

    with self.assertRaises(loader.WorkloadSyntaxError) as raised:
        read_workload("unittest", spec, "/mappings")

    self.assertEqual(expected_message, raised.exception.args[0])
| |
def test_at_least_one_default_test_procedure(self):
    # Several test procedures with none flagged as default must be rejected.
    first_procedure = {
        "name": "test_procedure",
        "schedule": [{"operation": "index-append"}],
    }
    second_procedure = {
        "name": "another-test_procedure",
        "schedule": [{"operation": "index-append"}],
    }
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index"}],
        "operations": [{"name": "index-append", "operation-type": "bulk"}],
        "test_procedures": [first_procedure, second_procedure],
    }
    read_workload = loader.WorkloadSpecificationReader()
    expected_message = (
        "Workload 'unittest' is invalid. No default test_procedure specified. "
        "Please edit the workload and add \"default\": true "
        "to one of the test_procedures test_procedure, another-test_procedure."
    )

    with self.assertRaises(loader.WorkloadSyntaxError) as raised:
        read_workload("unittest", spec, "/mappings")

    self.assertEqual(expected_message, raised.exception.args[0])
| |
def test_exactly_one_default_test_procedure(self):
    # The first procedure is the declared default, while the reader is told to
    # select the second one; default and selected flags must not be conflated.
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index"}],
        "operations": [
            {"name": "index-append", "operation-type": "bulk"},
        ],
        "test_procedures": [
            {
                "name": "test_procedure",
                "default": True,
                "schedule": [{"operation": "index-append"}],
            },
            {
                "name": "another-test_procedure",
                "schedule": [{"operation": "index-append"}],
            },
        ],
    }

    read_spec = loader.WorkloadSpecificationReader(selected_test_procedure="another-test_procedure")
    loaded = read_spec("unittest", spec, "/mappings")

    self.assertEqual(2, len(loaded.test_procedures))
    self.assertEqual("test_procedure", loaded.test_procedures[0].name)
    self.assertTrue(loaded.test_procedures[0].default)
    self.assertFalse(loaded.test_procedures[1].default)
    self.assertTrue(loaded.test_procedures[1].selected)
| |
def test_selects_sole_test_procedure_implicitly_as_default(self):
    # A single procedure given under the singular "test_procedure" key, without
    # any "default" flag, is expected to end up both default and selected.
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index"}],
        "operations": [
            {"name": "index-append", "operation-type": "bulk"},
        ],
        "test_procedure": {
            "name": "test_procedure",
            "schedule": [{"operation": "index-append"}],
        },
    }

    read_spec = loader.WorkloadSpecificationReader()
    loaded = read_spec("unittest", spec, "/mappings")

    self.assertEqual(1, len(loaded.test_procedures))
    self.assertEqual("test_procedure", loaded.test_procedures[0].name)
    self.assertTrue(loaded.test_procedures[0].default)
    self.assertTrue(loaded.test_procedures[0].selected)
| |
def test_auto_generates_test_procedure_from_schedule(self):
    # The workload only has a top-level "schedule" and no test procedure at all;
    # the reader is expected to synthesize one that is default and selected.
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index"}],
        "operations": [
            {"name": "index-append", "operation-type": "bulk"},
        ],
        "schedule": [
            {"operation": "index-append"},
        ],
    }

    read_spec = loader.WorkloadSpecificationReader()
    loaded = read_spec("unittest", spec, "/mappings")

    self.assertEqual(1, len(loaded.test_procedures))
    self.assertTrue(loaded.test_procedures[0].auto_generated)
    self.assertTrue(loaded.test_procedures[0].default)
    self.assertTrue(loaded.test_procedures[0].selected)
| |
def test_inline_operations(self):
    # Operations are declared directly inside the schedule instead of in a
    # separate top-level "operations" section.
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index"}],
        "test_procedure": {
            "name": "test_procedure",
            "schedule": [
                # an inline operation that carries parameters must still state its type
                {
                    "operation": {
                        "operation-type": "bulk",
                        "bulk-size": 5000,
                    },
                },
                # without parameters, the bare operation type doubles as the operation reference
                {
                    "operation": "sleep",
                },
            ],
        },
    }

    read_spec = loader.WorkloadSpecificationReader()
    loaded = read_spec("unittest", spec, "/mappings")

    procedure = loaded.test_procedures[0]
    self.assertEqual(2, len(procedure.schedule))

    bulk_type = workload.OperationType.Bulk.to_hyphenated_string()
    self.assertEqual(bulk_type, procedure.schedule[0].operation.type)

    sleep_type = workload.OperationType.Sleep.to_hyphenated_string()
    self.assertEqual(sleep_type, procedure.schedule[1].operation.type)
| |
def test_supports_target_throughput(self):
    # Throughput and the warmup / ramp-up periods set on a schedule entry are
    # expected to be readable from the resulting task.
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index"}],
        "operations": [
            {"name": "index-append", "operation-type": "bulk"},
        ],
        "test_procedure": {
            "name": "default-test_procedure",
            "schedule": [
                {
                    "operation": "index-append",
                    "target-throughput": 10,
                    "warmup-time-period": 120,
                    "ramp-up-time-period": 60,
                },
            ],
        },
    }

    read_spec = loader.WorkloadSpecificationReader()
    loaded = read_spec("unittest", spec, "/mappings")

    task = loaded.test_procedures[0].schedule[0]
    self.assertEqual(10, task.params["target-throughput"])
    self.assertEqual(120, task.warmup_time_period)
    self.assertEqual(60, task.ramp_up_time_period)
| |
def test_supports_target_interval(self):
    # "target-interval" on a schedule entry is expected to show up unchanged in
    # the task's params.
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index"}],
        "operations": [
            {"name": "index-append", "operation-type": "bulk"},
        ],
        "test_procedures": [
            {
                "name": "default-test_procedure",
                "schedule": [
                    {
                        "operation": "index-append",
                        "target-interval": 5,
                    },
                ],
            },
        ],
    }

    read_spec = loader.WorkloadSpecificationReader()
    loaded = read_spec("unittest", spec, "/mappings")

    task = loaded.test_procedures[0].schedule[0]
    self.assertEqual(5, task.params["target-interval"])
| |
def test_parallel_tasks_with_default_values(self):
    # The "parallel" element sets warmup-time-period and time-period; each task
    # overrides at most one of them. The assertions expect a task's own value
    # to win and the parallel element's value to apply otherwise.
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index"}],
        "operations": [
            {"name": "index-1", "operation-type": "bulk"},
            {"name": "index-2", "operation-type": "bulk"},
            {"name": "index-3", "operation-type": "bulk"},
        ],
        "test_procedures": [
            {
                "name": "default-test_procedure",
                "schedule": [
                    {
                        "parallel": {
                            "warmup-time-period": 2400,
                            "time-period": 36000,
                            "tasks": [
                                {
                                    "operation": "index-1",
                                    "warmup-time-period": 300,
                                    "clients": 2,
                                },
                                {
                                    "operation": "index-2",
                                    "time-period": 3600,
                                    "clients": 4,
                                },
                                {
                                    "operation": "index-3",
                                    "target-throughput": 10,
                                    "clients": 16,
                                },
                            ],
                        },
                    },
                ],
            },
        ],
    }

    read_spec = loader.WorkloadSpecificationReader()
    loaded = read_spec("unittest", spec, "/mappings")
    parallel = loaded.test_procedures[0].schedule[0]
    tasks = parallel.tasks

    # 2 + 4 + 16 clients across the three tasks
    self.assertEqual(22, parallel.clients)
    self.assertEqual(3, len(tasks))

    # (operation name, warmup period, time period, clients, target throughput or None)
    expectations = [
        ("index-1", 300, 36000, 2, None),
        ("index-2", 2400, 3600, 4, None),
        ("index-3", 2400, 36000, 16, 10),
    ]
    for position, (op_name, warmup, period, clients, throughput) in enumerate(expectations):
        task = tasks[position]
        self.assertEqual(op_name, task.operation.name)
        self.assertEqual(warmup, task.warmup_time_period)
        self.assertEqual(period, task.time_period)
        self.assertEqual(clients, task.clients)
        if throughput is None:
            self.assertFalse("target-throughput" in task.params)
        else:
            self.assertEqual(throughput, task.params["target-throughput"])
| |
def test_parallel_tasks_with_default_clients_does_not_propagate(self):
    # "clients" is set on the parallel element only; the assertions expect it to
    # stay on that element and each of the four tasks to report a single client.
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index"}],
        "operations": [
            {"name": "index-1", "operation-type": "bulk"},
        ],
        "test_procedures": [
            {
                "name": "default-test_procedure",
                "schedule": [
                    {
                        "parallel": {
                            "warmup-time-period": 2400,
                            "time-period": 36000,
                            "clients": 2,
                            "tasks": [
                                {"name": "index-1-1", "operation": "index-1"},
                                {"name": "index-1-2", "operation": "index-1"},
                                {"name": "index-1-3", "operation": "index-1"},
                                {"name": "index-1-4", "operation": "index-1"},
                            ],
                        },
                    },
                ],
            },
        ],
    }

    read_spec = loader.WorkloadSpecificationReader()
    loaded = read_spec("unittest", spec, "/mappings")
    parallel = loaded.test_procedures[0].schedule[0]
    tasks = parallel.tasks

    # the parallel element is limited to two clients *in total*
    self.assertEqual(2, parallel.clients)
    self.assertEqual(4, len(tasks))
    for child in tasks:
        self.assertEqual(1, child.clients)
| |
def test_parallel_tasks_with_completed_by_set(self):
    # "completed-by" refers to an unnamed task via its operation name; only
    # that task is expected to be flagged as completing the parallel element.
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index"}],
        "operations": [
            {"name": "index-1", "operation-type": "bulk"},
            {"name": "index-2", "operation-type": "bulk"},
        ],
        "test_procedures": [
            {
                "name": "default-test_procedure",
                "schedule": [
                    {
                        "parallel": {
                            "warmup-time-period": 2400,
                            "time-period": 36000,
                            "completed-by": "index-2",
                            "tasks": [
                                {"operation": "index-1"},
                                {"operation": "index-2"},
                            ],
                        },
                    },
                ],
            },
        ],
    }

    read_spec = loader.WorkloadSpecificationReader()
    loaded = read_spec("unittest", spec, "/mappings")
    parallel = loaded.test_procedures[0].schedule[0]
    tasks = parallel.tasks

    # no client counts are given anywhere; two clients *in total* are expected
    self.assertEqual(2, parallel.clients)
    self.assertEqual(2, len(tasks))

    first_task = tasks[0]
    self.assertEqual("index-1", first_task.operation.name)
    self.assertFalse(first_task.completes_parent)

    second_task = tasks[1]
    self.assertEqual("index-2", second_task.operation.name)
    self.assertTrue(second_task.completes_parent)
| |
def test_parallel_tasks_with_named_task_completed_by_set(self):
    # Same scenario as the unnamed variant, but "completed-by" points at an
    # explicit task name that differs from the operation name.
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index"}],
        "operations": [
            {"name": "index-1", "operation-type": "bulk"},
            {"name": "index-2", "operation-type": "bulk"},
        ],
        "test_procedures": [
            {
                "name": "default-test_procedure",
                "schedule": [
                    {
                        "parallel": {
                            "warmup-time-period": 2400,
                            "time-period": 36000,
                            "completed-by": "name-index-2",
                            "tasks": [
                                {"name": "name-index-1", "operation": "index-1"},
                                {"name": "name-index-2", "operation": "index-2"},
                            ],
                        },
                    },
                ],
            },
        ],
    }

    read_spec = loader.WorkloadSpecificationReader()
    loaded = read_spec("unittest", spec, "/mappings")
    parallel = loaded.test_procedures[0].schedule[0]
    tasks = parallel.tasks

    # no client counts are given anywhere; two clients *in total* are expected
    self.assertEqual(2, parallel.clients)
    self.assertEqual(2, len(tasks))

    first_task = tasks[0]
    self.assertEqual("index-1", first_task.operation.name)
    self.assertFalse(first_task.completes_parent)

    second_task = tasks[1]
    self.assertEqual("index-2", second_task.operation.name)
    self.assertTrue(second_task.completes_parent)
| |
def test_parallel_tasks_with_completed_by_set_no_task_matches(self):
    # "completed-by" names a task that is not part of the parallel element;
    # the reader is expected to reject the workload.
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index"}],
        "operations": [
            {"name": "index-1", "operation-type": "bulk"},
            {"name": "index-2", "operation-type": "bulk"},
        ],
        "test_procedures": [
            {
                "name": "default-test_procedure",
                "schedule": [
                    {
                        "parallel": {
                            "completed-by": "non-existing-task",
                            "tasks": [
                                {"operation": "index-1"},
                                {"operation": "index-2"},
                            ],
                        },
                    },
                ],
            },
        ],
    }
    expected_message = (
        "Workload 'unittest' is invalid. 'parallel' element for "
        "test_procedure 'default-test_procedure' is marked with 'completed-by' "
        "with task name 'non-existing-task' but no task with this name exists."
    )

    read_spec = loader.WorkloadSpecificationReader()
    with self.assertRaises(loader.WorkloadSyntaxError) as raised:
        read_spec("unittest", spec, "/mappings")

    self.assertEqual(expected_message, raised.exception.args[0])
| |
def test_parallel_tasks_with_completed_by_set_multiple_tasks_match(self):
    # Two unnamed tasks use the same operation, so "completed-by": "index-1"
    # is ambiguous; the reader is expected to reject the workload.
    spec = {
        "description": "description for unit test",
        "indices": [{"name": "test-index"}],
        "operations": [
            {"name": "index-1", "operation-type": "bulk"},
        ],
        "test_procedures": [
            {
                "name": "default-test_procedure",
                "schedule": [
                    {
                        "parallel": {
                            "completed-by": "index-1",
                            "tasks": [
                                {"operation": "index-1"},
                                {"operation": "index-1"},
                            ],
                        },
                    },
                ],
            },
        ],
    }
    expected_message = (
        "Workload 'unittest' is invalid. 'parallel' element for test_procedure "
        "'default-test_procedure' contains multiple tasks with "
        "the name 'index-1' which are marked with 'completed-by' but only task is allowed to match."
    )

    read_spec = loader.WorkloadSpecificationReader()
    with self.assertRaises(loader.WorkloadSyntaxError) as raised:
        read_spec("unittest", spec, "/mappings")

    self.assertEqual(expected_message, raised.exception.args[0])
| |
def test_propagate_parameters_to_test_procedure_level(self):
    # Workload-level "parameters" are expected to reach every test procedure,
    # with a procedure's own "parameters" taking precedence on key clashes.
    spec = {
        "description": "description for unit test",
        "parameters": {
            "level": "workload",
            "value": 7,
        },
        "indices": [{"name": "test-index"}],
        "operations": [
            {"name": "index-append", "operation-type": "bulk"},
        ],
        "test_procedures": [
            {
                "name": "test_procedure",
                "default": True,
                "parameters": {
                    "level": "test_procedure",
                    "another-value": 17,
                },
                "schedule": [{"operation": "index-append"}],
            },
            {
                "name": "another-test_procedure",
                "schedule": [{"operation": "index-append"}],
            },
        ],
    }

    read_spec = loader.WorkloadSpecificationReader(selected_test_procedure="another-test_procedure")
    loaded = read_spec("unittest", spec, "/mappings")

    self.assertEqual(2, len(loaded.test_procedures))
    self.assertEqual("test_procedure", loaded.test_procedures[0].name)
    self.assertTrue(loaded.test_procedures[0].default)
    # procedure-level "level" overrides the workload one; "value" is inherited
    merged_parameters = {
        "level": "test_procedure",
        "value": 7,
        "another-value": 17,
    }
    self.assertDictEqual(merged_parameters, loaded.test_procedures[0].parameters)

    self.assertFalse(loaded.test_procedures[1].default)
    self.assertTrue(loaded.test_procedures[1].selected)
    # no parameters of its own: only the workload-level ones are expected
    inherited_parameters = {
        "level": "workload",
        "value": 7,
    }
    self.assertDictEqual(inherited_parameters, loaded.test_procedures[1].parameters)
| |
| |
class MyMockWorkloadProcessor(loader.WorkloadProcessor):
    """Empty WorkloadProcessor subclass registered as a custom processor by the registry tests."""
| |
| |
class WorkloadProcessorRegistryTests(TestCase):
    # Checks which processor classes WorkloadProcessorRegistry exposes by default
    # and after custom processors have been registered.

    @staticmethod
    def _online_config():
        # Config with system/offline.mode set to False, as every test here uses.
        cfg = config.Config()
        cfg.add(config.Scope.application, "system", "offline.mode", False)
        return cfg

    def _assert_processor_classes(self, expected_classes, registry):
        # Compares the classes of the registry's processors, ignoring order.
        actual_classes = [processor.__class__ for processor in registry.processors]
        self.assertCountEqual(expected_classes, actual_classes)

    def test_default_workload_processors(self):
        registry = loader.WorkloadProcessorRegistry(self._online_config())

        self._assert_processor_classes(
            [
                loader.TaskFilterWorkloadProcessor,
                loader.TestModeWorkloadProcessor,
                loader.QueryRandomizerWorkloadProcessor,
                loader.DefaultWorkloadPreparator,
            ],
            registry,
        )

    def test_override_default_preparator(self):
        registry = loader.WorkloadProcessorRegistry(self._online_config())
        # read the processors once up front so that a later registration cannot
        # be defeated by the default having been "hardened" by an earlier access
        _ = registry.processors
        registry.register_workload_processor(MyMockWorkloadProcessor())

        self._assert_processor_classes(
            [
                loader.TaskFilterWorkloadProcessor,
                loader.TestModeWorkloadProcessor,
                loader.QueryRandomizerWorkloadProcessor,
                MyMockWorkloadProcessor,
            ],
            registry,
        )

    def test_allow_to_specify_default_preparator(self):
        cfg = self._online_config()
        registry = loader.WorkloadProcessorRegistry(cfg)
        registry.register_workload_processor(MyMockWorkloadProcessor())
        # with a custom processor registered, reading the list should be idempotent
        _ = registry.processors
        registry.register_workload_processor(loader.DefaultWorkloadPreparator(cfg))

        self._assert_processor_classes(
            [
                loader.TaskFilterWorkloadProcessor,
                loader.TestModeWorkloadProcessor,
                loader.QueryRandomizerWorkloadProcessor,
                MyMockWorkloadProcessor,
                loader.DefaultWorkloadPreparator,
            ],
            registry,
        )