# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import json
import os
import socket
import subprocess
import tempfile
import time
import urllib.error
import urllib.request
from pathlib import Path
import pytest
from e2e_test_utils import _remove_trailing_whitespaces, container
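
# Ports inside the Graphite container; the container() helper maps them to
# ephemeral host ports, which are looked up through port_map.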
CARBON_PORT = 2003
HTTP_PORT = 80


def test_analyze_graphite():
"""
End-to-end test for the Graphite example from docs/GRAPHITE.md.
    Starts a Graphite Docker container, writes sample data, runs otava analyze,
    and verifies that the output contains the expected change points.
"""
with container(
"graphiteapp/graphite-statsd",
ports=[HTTP_PORT, CARBON_PORT],
readiness_check=_graphite_readiness_check,
) as (container_id, port_map):
# Seed data into Graphite using the same pattern as datagen.sh
data_points = _seed_graphite_data(port_map[CARBON_PORT])
# Wait for data to be written and available
_wait_for_graphite_data(
http_port=port_map[HTTP_PORT],
metric_path="performance-tests.daily.my-product.client.throughput",
expected_points=data_points,
)
# Run the Otava analysis
proc = subprocess.run(
["uv", "run", "otava", "analyze", "my-product.test", "--since=-10m"],
capture_output=True,
text=True,
timeout=600,
env=dict(
os.environ,
OTAVA_CONFIG=str(Path("examples/graphite/config/otava.yaml")),
GRAPHITE_ADDRESS=f"http://localhost:{port_map[HTTP_PORT]}/",
GRAFANA_ADDRESS="http://localhost:3000/",
GRAFANA_USER="admin",
GRAFANA_PASSWORD="admin",
),
)
if proc.returncode != 0:
pytest.fail(
"Command returned non-zero exit code.\n\n"
f"Command: {proc.args!r}\n"
f"Exit code: {proc.returncode}\n\n"
f"Stdout:\n{proc.stdout}\n\n"
f"Stderr:\n{proc.stderr}\n"
)
# Verify output contains expected columns and change point indicators
output = _remove_trailing_whitespaces(proc.stdout)
# Check that the header contains expected column names
assert "throughput" in output
assert "response_time" in output
assert "cpu_usage" in output
# Data shows throughput dropped from ~61k to ~57k (-5.6%) and cpu increased from 0.2 to 0.8 (+300%)
assert "-5.6%" in output # throughput change
assert "+300.0%" in output # cpu_usage change


def _graphite_readiness_check(container_id: str, port_map: dict[int, int]) -> bool:
"""
Check if Graphite is fully ready by writing a canary metric and verifying it's queryable.
This ensures both Carbon (write path) and Graphite-web (read path) are operational.
"""
carbon_port = port_map[CARBON_PORT]
http_port = port_map[HTTP_PORT]
# Send a canary metric to Carbon
timestamp = int(time.time())
canary_metrics = "test.canary.readiness"
message = f"{canary_metrics} 1 {timestamp}\n"
try:
with socket.create_connection(("localhost", carbon_port), timeout=5) as sock:
sock.sendall(message.encode("utf-8"))
except OSError:
return False
# Check if the canary metric is queryable via Graphite-web
url = f"http://localhost:{http_port}/render?target={canary_metrics}&format=json&from=-1min"
try:
with urllib.request.urlopen(url, timeout=5) as response:
data = json.loads(response.read().decode("utf-8"))
            if data:
datapoints = data[0].get("datapoints", [])
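                # The render API returns datapoints as [value, timestamp]
                # pairs; value is None for buckets with no data yet.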
# Check if we have at least one non-null data point
if any(dp[0] is not None for dp in datapoints):
return True
except (urllib.error.URLError, json.JSONDecodeError, OSError):
pass
return False


def _seed_graphite_data(
carbon_port: int,
prefix: str = "performance-tests.daily.my-product",
) -> int:
"""
Seed Graphite with test data matching the pattern from examples/graphite/datagen/datagen.sh.
Data pattern (from newest to oldest, matching datagen.sh array order):
- throughput: 56950, 57980, 57123, 60960, 60160, 61160 (index 0 is newest)
- response_time (p50): 85, 87, 88, 89, 85, 87
- cpu_usage: 0.7, 0.9, 0.8, 0.1, 0.3, 0.2
When displayed chronologically (oldest to newest), this shows:
- throughput dropped from ~61k to ~57k (-5.6% regression)
- cpu increased from 0.2 to 0.8 (+300% regression)
"""
throughput_path = f"{prefix}.client.throughput"
throughput_values = [56950, 57980, 57123, 60960, 60160, 61160]
p50_path = f"{prefix}.client.p50"
p50_values = [85, 87, 88, 89, 85, 87]
cpu_path = f"{prefix}.server.cpu"
cpu_values = [0.7, 0.9, 0.8, 0.1, 0.3, 0.2]
start_timestamp = int(time.time())
num_points = len(throughput_values)
for i in range(num_points):
# Data is sent from newest to oldest (same as datagen.sh)
timestamp = start_timestamp - (i * 60)
_send_to_graphite(carbon_port, throughput_path, throughput_values[i], timestamp)
_send_to_graphite(carbon_port, p50_path, p50_values[i], timestamp)
_send_to_graphite(carbon_port, cpu_path, cpu_values[i], timestamp)
return num_points


def _send_to_graphite(carbon_port: int, path: str, value: float, timestamp: int):
"""
Send a single metric to Graphite via the Carbon plaintext protocol.
"""
message = f"{path} {value} {timestamp}\n"
try:
with socket.create_connection(("localhost", carbon_port), timeout=5) as sock:
sock.sendall(message.encode("utf-8"))
except OSError as e:
pytest.fail(f"Failed to send metric to Graphite: {e}")


def _wait_for_graphite_data(
http_port: int,
metric_path: str,
expected_points: int,
timeout: float = 120,
poll_interval: float = 0.5,
) -> None:
"""
Wait for Graphite to have the expected data points available.
Polls the Graphite render API until the specified metric has at least
the expected number of non-null data points, or until the timeout expires.
"""
url = f"http://localhost:{http_port}/render?target={metric_path}&format=json&from=-10min"
deadline = time.time() + timeout
last_observed_count = 0
while time.time() < deadline:
try:
with urllib.request.urlopen(url, timeout=5) as response:
data = json.loads(response.read().decode("utf-8"))
                if data:
datapoints = data[0].get("datapoints", [])
# Count non-null values
non_null_count = sum(1 for dp in datapoints if dp[0] is not None)
last_observed_count = non_null_count
if non_null_count >= expected_points:
return
except (urllib.error.URLError, json.JSONDecodeError, OSError):
pass # Retry on connection errors
time.sleep(poll_interval)
pytest.fail(
f"Timeout waiting for Graphite data. "
f"Expected {expected_points} points for metric '{metric_path}' within {timeout}s, got {last_observed_count}"
)


def test_analyze_graphite_with_branch():
"""
End-to-end test for Graphite with %{BRANCH} substitution.
Verifies that using --branch correctly substitutes %{BRANCH} in the prefix
to fetch data from a branch-specific Graphite path.
"""
with container(
"graphiteapp/graphite-statsd",
ports=[HTTP_PORT, CARBON_PORT],
readiness_check=_graphite_readiness_check,
) as (container_id, port_map):
# Seed data into a branch-specific path
branch_name = "feature-xyz"
prefix = f"performance-tests.{branch_name}.my-product"
data_points = _seed_graphite_data(port_map[CARBON_PORT], prefix=prefix)
# Wait for data to be written and available
_wait_for_graphite_data(
http_port=port_map[HTTP_PORT],
metric_path=f"performance-tests.{branch_name}.my-product.client.throughput",
expected_points=data_points,
)
# Create a temporary config file with %{BRANCH} in the prefix
config_content = """
tests:
branch-test:
type: graphite
prefix: performance-tests.%{BRANCH}.my-product
tags: [perf-test, branch]
metrics:
throughput:
suffix: client.throughput
direction: 1
scale: 1
response_time:
suffix: client.p50
direction: -1
scale: 1
cpu_usage:
suffix: server.cpu
direction: -1
scale: 1
"""
with tempfile.NamedTemporaryFile(
mode="w", suffix=".yaml", delete=False
) as config_file:
config_file.write(config_content)
config_file_path = config_file.name
try:
# Run the Otava analysis with --branch
proc = subprocess.run(
[
"uv",
"run",
"otava",
"analyze",
"branch-test",
"--branch",
branch_name,
"--since=-10m",
],
capture_output=True,
text=True,
timeout=600,
env=dict(
os.environ,
OTAVA_CONFIG=config_file_path,
GRAPHITE_ADDRESS=f"http://localhost:{port_map[HTTP_PORT]}/",
),
)
if proc.returncode != 0:
pytest.fail(
"Command returned non-zero exit code.\n\n"
f"Command: {proc.args!r}\n"
f"Exit code: {proc.returncode}\n\n"
f"Stdout:\n{proc.stdout}\n\n"
f"Stderr:\n{proc.stderr}\n"
)
# Verify output contains expected columns
output = _remove_trailing_whitespaces(proc.stdout)
# Check that the header contains expected column names
assert "throughput" in output
assert "response_time" in output
assert "cpu_usage" in output
# Data shows throughput dropped from ~61k to ~57k (-5.6%) and cpu increased +300%
assert "-5.6%" in output # throughput change
assert "+300.0%" in output # cpu_usage change
finally:
os.unlink(config_file_path)