blob: b2c15ec00e4e8553118d1be17dfff170bd5ae795 [file] [log] [blame]
from typing import Callable, NamedTuple
QUEUE_SETTINGS_CALCULATORS = []
class QueueSettingsCalculator(NamedTuple):
id: str
name: str
func: Callable
def queue_settings_calculator(_func=None, *, id=None, name=None, **kwargs):
"""Decorator for registering queue settings calculator functions."""
def decorator(func):
# Register decorator
name_ = name
if name_ is None:
name_ = func.__name__
id_ = id
if id_ is None:
id_ = func.__module__ + ":" + func.__name__
if exists(id_):
raise Exception(f"Duplicate queue settings calculator id: {id_}")
QUEUE_SETTINGS_CALCULATORS.append(QueueSettingsCalculator(id_, name_, func))
return func
if _func is None:
return decorator
else:
return decorator(_func)
def calculate_queue_settings(calculator_id, request, experiment_model):
"""Invoke a queue settings calculator by id."""
calcs = [calc for calc in QUEUE_SETTINGS_CALCULATORS if calc.id == calculator_id]
if len(calcs) == 0:
raise LookupError(f"Could not find queue settings calculator for {calculator_id}")
calc = calcs[0]
try:
return calc.func(request, experiment_model)
except Exception as e:
raise Exception(f"Failed to calculate queue settings for {calculator_id}") from e
def get_all():
"""Return a list of all registered queue settings calculators."""
return QUEUE_SETTINGS_CALCULATORS.copy()
def exists(calculator_id):
calcs = [calc for calc in QUEUE_SETTINGS_CALCULATORS if calc.id == calculator_id]
return len(calcs) == 1
def reset_registry():
"""Reset registry, used for testing."""
global QUEUE_SETTINGS_CALCULATORS
QUEUE_SETTINGS_CALCULATORS = []