blob: 05fab3ff5e8988366103b567645d8311f8011cdb [file]
# SPDX-License-Identifier: Apache-2.0
#
# Modifications by Apache Solr contributors; see git log for details.
# Licensed under the Apache License, Version 2.0.
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import collections
import numbers
import re
from enum import Enum, unique, auto
from solrorbit import exceptions
class Collection:
"""
Defines a Solr collection (Solr-native equivalent of Index).
Attributes:
name: Collection name.
configset: Configset name registered on the cluster.
configset_path: Local path to the configset directory (containing conf/).
num_shards: Number of shards (default: 1).
replication_factor: NRT replicas per shard — alias for ``nrtReplicas`` in Solr V2 API (default: 1).
pull_replicas: Pull (read-only) replicas per shard (default: 0).
tlog_replicas: TLOG replicas per shard (default: 0).
"""
def __init__(self, name: str, configset: str = None,
configset_path: str = None,
num_shards: int = 1, replication_factor: int = 1,
pull_replicas: int = 0, tlog_replicas: int = 0):
self.name = name
self.configset = configset or name
self.configset_path = configset_path
self.num_shards = num_shards
self.replication_factor = replication_factor
self.pull_replicas = pull_replicas
self.tlog_replicas = tlog_replicas
def matches(self, pattern):
if pattern is None:
return True
elif pattern in ["_all", "*"]:
return True
elif self.name == pattern:
return True
else:
return False
def __str__(self):
return self.name
def __repr__(self):
r = []
for prop, value in vars(self).items():
r.append("%s = [%s]" % (prop, repr(value)))
return ", ".join(r)
def __hash__(self):
return hash(self.name)
def __eq__(self, other):
return self.name == other.name
class Documents:
SOURCE_FORMAT_BULK = "bulk"
SOURCE_FORMAT_HDF5 = "hdf5"
SOURCE_FORMAT_BIG_ANN = "big-ann"
SUPPORTED_SOURCE_FORMAT = [SOURCE_FORMAT_BULK, SOURCE_FORMAT_HDF5, SOURCE_FORMAT_BIG_ANN]
def __init__(self, source_format, document_file=None, document_file_parts=None, document_archive=None, base_url=None, source_url=None,
includes_action_and_meta_data=False,
number_of_documents=0, compressed_size_in_bytes=0, uncompressed_size_in_bytes=0, target_collection=None,
target_type=None, meta_data=None):
"""
:param source_format: The format of these documents. Mandatory.
:param document_file: The file name of benchmark documents after decompression. Optional (e.g. for percolation we
just need a mapping but no documents)
:param document_file_parts: If the document file is provided as parts, a list of dicts, each holding the filename and file size.
:param document_archive: The file name of the compressed benchmark document name on the remote server. Optional (e.g. for
percolation we just need a mapping but no documents)
:param base_url: The URL from which to load data if they are not available locally. Excludes the file or object name. Optional.
:param source_url: The full URL to the file or object from which to load data if not available locally. Optional.
:param includes_action_and_meta_data: True, if the source file already includes the action and meta-data line. False, if it only
contains documents.
:param number_of_documents: The number of documents
in the benchmark document. Needed for proper progress reporting. Only needed if
a document_archive is given.
:param compressed_size_in_bytes: The compressed size in bytes of
the benchmark document. Needed for verification of the download and
user reporting. Only useful if a document_archive is given (optional but recommended to be set).
:param uncompressed_size_in_bytes: The size in bytes of the benchmark document after decompressing it.
Only useful if a document_archive is given (optional but recommended to be set).
:param target_collection: The Solr collection to target for bulk operations. May be ``None`` if
``includes_action_and_meta_data`` is ``False``.
:param target_type: The document type to target for bulk operations. May be ``None`` if ``includes_action_and_meta_data``
is ``False``.
:param meta_data: A dict containing key-value pairs with additional meta-data describing documents. Optional.
"""
self.source_format = source_format
self.document_file = document_file
self.document_file_parts = document_file_parts
self.document_archive = document_archive
self.base_url = base_url
self.source_url = source_url
self.includes_action_and_meta_data = includes_action_and_meta_data
self._number_of_documents = number_of_documents
self._compressed_size_in_bytes = compressed_size_in_bytes
self._uncompressed_size_in_bytes = uncompressed_size_in_bytes
self.target_collection = target_collection
self.target_type = target_type
self.meta_data = meta_data or {}
def has_compressed_corpus(self):
return self.document_archive is not None
def has_uncompressed_corpus(self):
return self.document_file is not None
@property
def number_of_documents(self):
return self._number_of_documents
@number_of_documents.setter
def number_of_documents(self, value):
self._number_of_documents = value
@property
def uncompressed_size_in_bytes(self):
return self._uncompressed_size_in_bytes
@uncompressed_size_in_bytes.setter
def uncompressed_size_in_bytes(self, value):
self._uncompressed_size_in_bytes = value
@property
def compressed_size_in_bytes(self):
return self._compressed_size_in_bytes
@compressed_size_in_bytes.setter
def compressed_size_in_bytes(self, value):
self._compressed_size_in_bytes = value
@property
def number_of_lines(self):
if self.includes_action_and_meta_data:
return self.number_of_documents * 2
else:
return self.number_of_documents
@property
def is_bulk(self):
return self.source_format == Documents.SOURCE_FORMAT_BULK
@property
def support_file_offset_table(self):
# Will support create file offset table only for bulk source formats. In future we can move it to
# a list instead of checking bulk directly
return self.source_format == Documents.SOURCE_FORMAT_BULK
@property
def is_supported_source_format(self):
return self.source_format in Documents.SUPPORTED_SOURCE_FORMAT
def __str__(self):
return "%s documents from %s" % (self.source_format, self.document_file)
def __repr__(self):
r = []
for prop, value in vars(self).items():
r.append("%s = [%s]" % (prop, repr(value)))
return ", ".join(r)
def __hash__(self):
return hash(self.source_format) ^ hash(self.document_file) ^ hash(self.document_archive) ^ hash(self.base_url) ^ \
hash(self.source_url) ^ hash(self.includes_action_and_meta_data) ^ hash(self.number_of_documents) ^ \
hash(self.compressed_size_in_bytes) ^ hash(self.uncompressed_size_in_bytes) ^ hash(self.target_collection) ^ \
hash(self.target_type) ^ hash(frozenset(self.meta_data.items()))
def __eq__(self, othr):
return (isinstance(othr, type(self)) and
(self.source_format, self.document_file, self.document_archive, self.base_url, self.source_url,
self.includes_action_and_meta_data, self.number_of_documents, self.compressed_size_in_bytes,
self.uncompressed_size_in_bytes, self.target_collection, self.target_type, self.meta_data) ==
(othr.source_format, othr.document_file, othr.document_archive, othr.base_url, othr.source_url,
othr.includes_action_and_meta_data, othr.number_of_documents, othr.compressed_size_in_bytes,
othr.uncompressed_size_in_bytes, othr.target_collection, othr.target_type, othr.meta_data))
class DocumentCorpus:
def __init__(self, name, documents=None, streaming_ingestion=False, meta_data=None):
"""
:param name: The name of this document corpus. Mandatory.
:param documents: A list of ``Documents`` instances that belong to this corpus.
:param meta_data: A dict containing key-value pairs with additional meta-data describing this corpus. Optional.
"""
self.name = name
self.documents = documents or []
self.streaming_ingestion = streaming_ingestion
self.meta_data = meta_data or {}
def number_of_documents(self, source_format):
num = 0
for doc in self.documents:
if doc.source_format == source_format:
num += doc.number_of_documents
return num
def compressed_size_in_bytes(self, source_format):
num = 0
for doc in self.documents:
if doc.source_format == source_format and doc.compressed_size_in_bytes is not None:
num += doc.compressed_size_in_bytes
else:
return None
return num
def uncompressed_size_in_bytes(self, source_format):
num = 0
for doc in self.documents:
if doc.source_format == source_format and doc.uncompressed_size_in_bytes is not None:
num += doc.uncompressed_size_in_bytes
else:
return None
return num
def filter(self, source_format=None, target_collections=None):
filtered = []
for d in self.documents:
# skip if source format or target collection does not match
if source_format and d.source_format != source_format:
continue
if target_collections and d.target_collection not in target_collections:
continue
filtered.append(d)
return DocumentCorpus(self.name, filtered, streaming_ingestion=self.streaming_ingestion,
meta_data=dict(self.meta_data))
def union(self, other):
"""
Creates a new corpus based on the current and the provided other corpus. This is not meant as a generic union
of two arbitrary corpora but rather to unify the documents referenced by two instances of the same corpus. This
is useful when two tasks reference different subsets of a corpus and a unified view (e.g. for downloading the
appropriate document files) is required.
:param other: The other corpus to unify with this one. Must have the same name and meta-data.
:return: A document corpus instance with the same and meta-data but with documents from both corpora.
"""
if self.name != other.name:
raise exceptions.BenchmarkAssertionError(f"Corpora names differ: [{self.name}] and [{other.name}].")
if self.meta_data != other.meta_data:
raise exceptions.BenchmarkAssertionError(f"Corpora meta-data differ: [{self.meta_data}] and [{other.meta_data}].")
if self is other:
return self
else:
return DocumentCorpus(name=self.name,
documents=list(set(self.documents).union(other.documents)),
streaming_ingestion=self.streaming_ingestion,
meta_data=dict(self.meta_data))
def __str__(self):
return self.name
def __repr__(self):
r = []
for prop, value in vars(self).items():
r.append("%s = [%s]" % (prop, repr(value)))
return ", ".join(r)
def __hash__(self):
return hash(self.name) ^ hash(self.documents) ^ hash(frozenset(self.meta_data.items()))
def __eq__(self, othr):
return (isinstance(othr, type(self)) and
(self.name, self.documents, self.meta_data) ==
(othr.name, othr.documents, othr.meta_data))
class Workload:
"""
A workload defines the data set that is used. It corresponds loosely to a use case (e.g. logging, event processing, analytics, ...)
"""
def __init__(self, name, description=None, meta_data=None, test_procedures=None,
corpora=None, has_plugins=False, collections=None):
"""
Creates a new workload.
:param name: A short, descriptive name for this workload. As per convention, this name should be in lower-case without spaces.
:param description: A description for this workload (should be less than 80 characters).
:param meta_data: An optional dict of meta-data elements to attach to each metrics record. Default: {}.
:param test_procedures: A list of one or more test_procedures to use.
Precondition: If the list is non-empty it contains exactly one element
with its ``default`` property set to ``True``.
:param corpora: A list of document corpus definitions for this workload. May be None.
:param has_plugins: True iff the workload also defines plugins (e.g. custom runners or parameter sources).
:param collections: A list of Solr collections for this workload. May be None.
"""
self.name = name
self.meta_data = meta_data if meta_data else {}
self.description = description if description is not None else ""
self.test_procedures = test_procedures if test_procedures else []
self.collections = collections if collections else []
self.corpora = corpora if corpora else []
self.has_plugins = has_plugins
@property
def default_test_procedure(self):
for test_procedure in self.test_procedures:
if test_procedure.default:
return test_procedure
# This should only happen if we don't have any test_procedures
return None
@property
def selected_test_procedure(self):
for test_procedure in self.test_procedures:
if test_procedure.selected:
return test_procedure
return None
@property
def selected_test_procedure_or_default(self):
selected = self.selected_test_procedure
return selected if selected else self.default_test_procedure
def find_test_procedure_or_default(self, name):
"""
:param name: The name of the test_procedure to find.
:return: The test_procedure with the given name. The default test_procedure, if the name is "" or ``None``.
"""
if name in [None, ""]:
return self.default_test_procedure
else:
return self.find_test_procedure(name)
def find_test_procedure(self, name):
for test_procedure in self.test_procedures:
if test_procedure.name == name:
return test_procedure
raise exceptions.InvalidName("Unknown test_procedure [%s] for workload [%s]" % (name, self.name))
@property
def number_of_documents(self):
num_docs = 0
for corpus in self.corpora:
# TODO #341: Improve API to let users define what they want (everything, just specific types, ...)
num_docs += corpus.number_of_documents(Documents.SOURCE_FORMAT_BULK)
return num_docs
@property
def compressed_size_in_bytes(self):
size = 0
for corpus in self.corpora:
# TODO #341: Improve API to let users define what they want (everything, just specific types, ...)
curr_size = corpus.compressed_size_in_bytes(Documents.SOURCE_FORMAT_BULK)
if curr_size is not None:
size += curr_size
else:
return None
return size
@property
def uncompressed_size_in_bytes(self):
size = 0
for corpus in self.corpora:
# TODO #341: Improve API to let users define what they want (everything, just specific types, ...)
curr_size = corpus.uncompressed_size_in_bytes(Documents.SOURCE_FORMAT_BULK)
if curr_size is not None:
size += curr_size
else:
return None
return size
def __str__(self):
return self.name
def __repr__(self):
r = []
for prop, value in vars(self).items():
r.append("%s = [%s]" % (prop, repr(value)))
return ", ".join(r)
def __hash__(self):
return hash(self.name) ^ hash(self.meta_data) ^ hash(self.description) ^ hash(self.test_procedures) ^ \
hash(self.corpora)
def __eq__(self, othr):
return (isinstance(othr, type(self)) and
(self.name, self.meta_data, self.description, self.test_procedures, self.collections, self.corpora) ==
(othr.name, othr.meta_data, othr.description, othr.test_procedures, othr.collections, othr.corpora))
class TestProcedure:
"""
A test procedure defines the concrete operations that will be done.
"""
#Pytest throws a collection warning if the following line is removed
__test__ = False
def __init__(self,
name,
description=None,
user_info=None,
default=False,
selected=False,
auto_generated=False,
parameters=None,
meta_data=None,
schedule=None):
self.name = name
self.parameters = parameters if parameters else {}
self.meta_data = meta_data if meta_data else {}
self.description = description
self.user_info = user_info
self.default = default
self.selected = selected
self.auto_generated = auto_generated
self.schedule = schedule if schedule else []
def prepend_tasks(self, tasks):
self.schedule = tasks + self.schedule
def remove_task(self, task):
self.schedule.remove(task)
def __str__(self):
return self.name
def __repr__(self):
r = []
for prop, value in vars(self).items():
r.append("%s = [%s]" % (prop, repr(value)))
return ", ".join(r)
def __hash__(self):
return hash(self.name) ^ hash(self.description) ^ hash(self.default) ^ \
hash(self.selected) ^ hash(self.auto_generated) ^ hash(self.parameters) ^ hash(self.meta_data) ^ \
hash(self.schedule)
def __eq__(self, othr):
return (isinstance(othr, type(self)) and
(self.name, self.description, self.default, self.selected, self.auto_generated,
self.parameters, self.meta_data, self.schedule) ==
(othr.name, othr.description, othr.default, othr.selected, othr.auto_generated,
othr.parameters, othr.meta_data, othr.schedule))
@unique
class AdminStatus(Enum):
# We can't use True/False as they are keywords
Yes = auto()
No = auto()
@unique
class OperationType(Enum):
# for the time being we are not considering this action as administrative
Search = (3, AdminStatus.No)
Bulk = (4, AdminStatus.No)
RawRequest = (5, AdminStatus.No)
WaitForBackupCreate = (7, AdminStatus.No)
Composite = (8, AdminStatus.No)
# administrative actions
Sleep = (32, AdminStatus.Yes)
DeleteBackupRepository = (33, AdminStatus.Yes)
CreateBackupRepository = (34, AdminStatus.Yes)
CreateBackup = (35, AdminStatus.Yes)
RestoreBackup = (36, AdminStatus.Yes)
def __init__(self, op_id: int, admin_status: AdminStatus):
self.op_id = op_id
self.admin_status = admin_status
@property
def admin_op(self):
# pylint: disable=comparison-with-callable
return self.admin_status == AdminStatus.Yes
def to_hyphenated_string(self):
"""
Turns enum constants into hyphenated names, e.g. ``WaitForTransform`` becomes ``wait-for-transform``.
"""
# Pylint complains that self.name is not iterable
# pylint: disable=not-an-iterable
return "".join(["-" + c.lower() if c.isupper() else c for c in self.name]).lstrip("-")
# pylint: disable=too-many-return-statements
@classmethod
def from_hyphenated_string(cls, v):
if v == "search":
return OperationType.Search
elif v == "bulk":
return OperationType.Bulk
elif v == "raw-request":
return OperationType.RawRequest
elif v == "sleep":
return OperationType.Sleep
elif v == "delete-backup-repository":
return OperationType.DeleteBackupRepository
elif v == "create-backup-repository":
return OperationType.CreateBackupRepository
elif v == "create-backup":
return OperationType.CreateBackup
elif v == "wait-for-backup-create":
return OperationType.WaitForBackupCreate
elif v == "restore-backup":
return OperationType.RestoreBackup
elif v == "composite":
return OperationType.Composite
else:
raise KeyError(f"No enum value for [{v}]")
class TaskNameFilter:
def __init__(self, name):
self.name = name
def matches(self, task):
return self.name == task.name
def __hash__(self):
return hash(self.name)
def __eq__(self, other):
return isinstance(other, type(self)) and self.name == other.name
def __str__(self, *args, **kwargs):
return f"filter for task name [{self.name}]"
class TaskOpTypeFilter:
def __init__(self, op_type_name):
self.op_type = op_type_name
def matches(self, task):
return self.op_type == task.operation.type
def __hash__(self):
return hash(self.op_type)
def __eq__(self, other):
return isinstance(other, type(self)) and self.op_type == other.op_type
def __str__(self, *args, **kwargs):
return f"filter for operation type [{self.op_type}]"
class TaskTagFilter:
def __init__(self, tag_name):
self.tag_name = tag_name
def matches(self, task):
return self.tag_name in task.tags
def __hash__(self):
return hash(self.tag_name)
def __eq__(self, other):
return isinstance(other, type(self)) and self.tag_name == other.tag_name
def __str__(self, *args, **kwargs):
return f"filter for tasks tagged [{self.tag_name}]"
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
# Schedule elements
class Parallel:
def __init__(self, tasks, clients=None):
self.tasks = tasks
self._clients = clients
self.nested = True
@property
def clients(self):
if self._clients is not None:
return self._clients
else:
num_clients = 0
for task in self.tasks:
num_clients += task.clients
return num_clients
def matches(self, task_filter):
# a parallel element matches if any of its elements match
for task in self.tasks:
if task.matches(task_filter):
return True
return False
def remove_task(self, task):
self.tasks.remove(task)
def __iter__(self):
return iter(self.tasks)
def __str__(self, *args, **kwargs):
return "%d parallel tasks" % len(self.tasks)
def __repr__(self):
r = []
for prop, value in vars(self).items():
r.append("%s = [%s]" % (prop, repr(value)))
return ", ".join(r)
def __hash__(self):
return hash(self.tasks)
def __eq__(self, other):
return isinstance(other, type(self)) and self.tasks == other.tasks
Throughput = collections.namedtuple("Throughput", ["value", "unit"])
class Task:
THROUGHPUT_PATTERN = re.compile(r"(?P<value>(\d*\.)?\d+)\s(?P<unit>\w+/s)")
IGNORE_RESPONSE_ERROR_LEVEL_WHITELIST = ["non-fatal"]
def __init__(self, name, operation, tags=None, meta_data=None, warmup_iterations=None, iterations=None,
warmup_time_period=None, time_period=None, ramp_up_time_period=None, ramp_down_time_period=None,
clients=1, completes_parent=False,
schedule=None, params=None):
self.name = name
self.operation = operation
if isinstance(tags, str):
self.tags = [tags]
elif tags:
self.tags = tags
else:
self.tags = []
self.meta_data = meta_data if meta_data else {}
self.warmup_iterations = warmup_iterations
self.iterations = iterations
self.warmup_time_period = warmup_time_period
self.time_period = time_period
self.ramp_up_time_period = ramp_up_time_period
self.ramp_down_time_period = ramp_down_time_period
self.clients = clients
self.completes_parent = completes_parent
self.schedule = schedule
self.params = params if params else {}
self.nested = False
def matches(self, task_filter):
return task_filter.matches(self)
@property
def target_throughput(self):
def numeric(v):
# While booleans can be converted to a number (False -> 0, True -> 1), we don't want to allow that here
return isinstance(v, numbers.Number) and not isinstance(v, bool)
target_throughput = self.params.get("target-throughput")
target_interval = self.params.get("target-interval")
if target_interval is not None and target_throughput is not None:
raise exceptions.InvalidSyntax(f"Task [{self}] specifies target-interval [{target_interval}] and "
f"target-throughput [{target_throughput}] but only one of them is allowed.")
value = None
unit = "ops/s"
if target_interval:
if not numeric(target_interval):
raise exceptions.InvalidSyntax(f"Target interval [{target_interval}] for task [{self}] must be numeric.")
value = 1 / float(target_interval)
elif target_throughput:
if isinstance(target_throughput, str):
matches = re.match(Task.THROUGHPUT_PATTERN, target_throughput)
if matches:
value = float(matches.group("value"))
unit = matches.group("unit")
else:
raise exceptions.InvalidSyntax(f"Task [{self}] specifies invalid target throughput [{target_throughput}].")
elif numeric(target_throughput):
value = float(target_throughput)
else:
raise exceptions.InvalidSyntax(f"Target throughput [{target_throughput}] for task [{self}] "
f"must be string or numeric.")
if value:
return Throughput(value, unit)
else:
return None
@property
def ignore_response_error_level(self):
ignore_response_error_level = self.params.get("ignore-response-error-level")
if ignore_response_error_level and \
ignore_response_error_level not in Task.IGNORE_RESPONSE_ERROR_LEVEL_WHITELIST:
raise exceptions.InvalidSyntax(
f"Task [{self}] specifies ignore-response-error-level to [{ignore_response_error_level}] but "
f"the only allowed values are [{','.join(Task.IGNORE_RESPONSE_ERROR_LEVEL_WHITELIST)}].")
return ignore_response_error_level
def error_behavior(self, default_error_behavior):
"""
Returns the desired behavior when encountering errors during task execution.
:param default_error_behavior: (str) the default error behavior for the benchmark
:return: (str) prescribing error handling when a non-fatal error occurs:
"abort": will fail when any error gets encountered
"continue": will continue for non fatal errors
"""
behavior = "continue"
if default_error_behavior == "abort":
if self.ignore_response_error_level != "non-fatal":
behavior = "abort"
return behavior
def __hash__(self):
# Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task)
return hash(self.name) ^ hash(self.operation) ^ hash(self.warmup_iterations) ^ hash(self.iterations) ^ \
hash(self.warmup_time_period) ^ hash(self.time_period) ^ hash(self.ramp_up_time_period) ^ \
hash(self.ramp_down_time_period) ^ hash(self.clients) ^ hash(self.schedule) ^ hash(self.completes_parent)
def __eq__(self, other):
# Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task)
return isinstance(other, type(self)) and (self.name, self.operation, self.warmup_iterations, self.iterations,
self.warmup_time_period, self.time_period, self.ramp_up_time_period,
self.ramp_down_time_period, self.clients, self.schedule,
self.completes_parent) == (other.name, other.operation, other.warmup_iterations,
other.iterations, other.warmup_time_period, other.time_period,
other.ramp_up_time_period, other.ramp_down_time_period,
other.clients, other.schedule,
other.completes_parent)
def __iter__(self):
return iter([self])
def __str__(self, *args, **kwargs):
return self.name
def __repr__(self):
r = []
for prop, value in vars(self).items():
r.append("%s = [%s]" % (prop, repr(value)))
return ", ".join(r)
class Operation:
def __init__(self, name, operation_type, meta_data=None, params=None, param_source=None):
if params is None:
params = {}
self.name = name
self.meta_data = meta_data if meta_data else {}
self.type = operation_type
self.params = params
self.param_source = param_source
@property
def include_in_reporting(self):
return self.params.get("include-in-reporting", True)
def __hash__(self):
return hash(self.name)
def __eq__(self, other):
return isinstance(other, type(self)) and self.name == other.name
def __str__(self, *args, **kwargs):
return self.name
def __repr__(self):
r = []
for prop, value in vars(self).items():
r.append("%s = [%s]" % (prop, repr(value)))
return ", ".join(r)