blob: 7bc046ade42864ed84b88eb2a3e23c52d3671cb3 [file]
from unittest.mock import Mock
import pytest
from solrorbit import config
from solrorbit.aggregator import Aggregator, AggregatedResults
@pytest.fixture
def mock_config():
mock_cfg = Mock(spec=config.Config)
mock_cfg.opts.side_effect = lambda *args: "test_procedure_name" if args == ("workload", "test_procedure.name") else "/path/to/root"
return mock_cfg
@pytest.fixture
def mock_test_runs():
return {
"test1": Mock(),
"test2": Mock()
}
@pytest.fixture
def mock_args():
return Mock(
results_file="",
test_run_id="",
workload_repository="default"
)
@pytest.fixture
def mock_test_store():
mock_store = Mock()
mock_store.find_by_test_run_id.side_effect = [
Mock(results={"key1": {"nested": 10}}, workload="workload1", test_procedure="test_proc1"),
Mock(results={"key1": {"nested": 20}}, workload="workload1", test_procedure="test_proc1")
]
return mock_store
@pytest.fixture
def aggregator(mock_config, mock_test_runs, mock_args, mock_test_store):
aggregator = Aggregator(mock_config, mock_test_runs, mock_args)
aggregator.test_store = mock_test_store
return aggregator
def test_count_iterations_for_each_op(aggregator):
mock_workload = Mock()
mock_task = Mock(spec=['name', 'iterations'])
mock_task.name = "op1"
mock_task.iterations = 5
mock_schedule = [mock_task]
mock_test_procedure = Mock(spec=['name', 'schedule'])
mock_test_procedure.name = "test_procedure_name"
mock_test_procedure.schedule = mock_schedule
mock_workload.test_procedures = [mock_test_procedure]
mock_workload.find_test_procedure_or_default = Mock(return_value=mock_test_procedure)
mock_test_run = Mock(test_run_id="test1", workload_params={})
aggregator.loaded_workload = mock_workload
aggregator.test_procedure_name = "test_procedure_name"
aggregator.count_iterations_for_each_op(mock_test_run)
assert "test1" in aggregator.accumulated_iterations, "test1 not found in accumulated_iterations"
assert "op1" in aggregator.accumulated_iterations["test1"], "op1 not found in accumulated_iterations for test1"
assert aggregator.accumulated_iterations["test1"]["op1"] == 5
def test_accumulate_results(aggregator):
mock_test_run = Mock()
mock_test_run.results = {
"op_metrics": [
{
"task": "task1",
"throughput": 100,
"latency": {"avg": 10, "unit": "ms"},
"service_time": 5,
"client_processing_time": 2,
"processing_time": 3,
"error_rate": 0.1,
"duration": 60
}
]
}
aggregator.accumulate_results(mock_test_run)
assert "task1" in aggregator.accumulated_results
assert all(metric in aggregator.accumulated_results["task1"] for metric in aggregator.metrics)
def test_test_run_compatibility_check(aggregator):
mock_test_store = Mock()
mock_test_store.find_by_test_run_id.side_effect = [
Mock(workload="workload1", test_procedure="test_proc1"),
Mock(workload="workload1", test_procedure="test_proc1"),
Mock(workload="workload1", test_procedure="test_proc1"), # Add one more mock response
]
aggregator.test_store = mock_test_store
aggregator.test_runs = {"test1": Mock(), "test2": Mock()}
assert aggregator.test_run_compatibility_check()
def test_aggregate_json_by_key(aggregator):
result = aggregator.aggregate_json_by_key("key1.nested")
assert result == 15
def test_calculate_weighted_average(aggregator):
task_metrics = {
"throughput": [100, 200],
"latency": [{"avg": 10, "unit": "ms"}, {"avg": 20, "unit": "ms"}]
}
task_name = "op1"
# set up accumulated_iterations
aggregator.accumulated_iterations = {
"test1": {"op1": 2},
"test2": {"op1": 3}
}
aggregator.test_runs = {"test1": Mock(), "test2": Mock()}
result = aggregator.calculate_weighted_average(task_metrics, task_name)
assert result["throughput"] == 160 # (100*2 + 200*3) / (2+3)
assert result["latency"]["avg"] == 16 # (10*2 + 20*3) / (2+3)
assert result["latency"]["unit"] == "ms"
def test_calculate_rsd(aggregator):
values = [1, 2, 3, 4, 5]
rsd = aggregator.calculate_rsd(values, "test_metric")
assert isinstance(rsd, float)
def test_test_run_compatibility_check_incompatible(aggregator):
mock_test_store = Mock()
mock_test_store.find_by_test_run_id.side_effect = [
Mock(workload="workload1", test_procedure="test_proc1"),
Mock(workload="workload2", test_procedure="test_proc1"),
]
aggregator.test_store = mock_test_store
aggregator.test_runs = {"test1": Mock(), "test2": Mock()}
with pytest.raises(ValueError):
aggregator.test_run_compatibility_check()
def test_aggregated_results():
results = {"key": "value"}
agg_results = AggregatedResults(results)
assert agg_results.as_dict() == results