blob: 551ca53724b321f2f7345ae55eb353eda7f98d61 [file] [log] [blame]
"""
Health check and testing API endpoints.
"""
import time
import tempfile
from typing import Dict, Any
from pathlib import Path
from fastapi import APIRouter, HTTPException, Depends
from datetime import datetime
from execute import SimulationRunner
from reasoning import ReasoningAgent
from ..models import HealthResponse, HealthStatus, ComponentHealth, TestRequest, TestResponse
from ..dependencies import get_database
# Router on which this module's endpoints register themselves through the
# @router.get / @router.post decorators.
router = APIRouter()
@router.get("/status", response_model=HealthResponse, summary="System health status")
async def health_status(db = Depends(get_database)):
    """
    Report the health of the system as a whole.

    Probes, in order, the database, the simulation runner and the models
    directory, recording a component entry for each probe outcome. The
    overall status becomes UNHEALTHY when a probe raises, DEGRADED when a
    probe gives an unexpected outcome while everything else was healthy,
    and stays HEALTHY otherwise.
    """
    report = []
    worst = HealthStatus.HEALTHY

    def record(name, status, message):
        # Append one component entry, timestamped at the moment it is recorded.
        report.append(ComponentHealth(
            name=name,
            status=status,
            message=message,
            last_check=datetime.utcnow()
        ))

    # --- Database: a trivial query shows that a connection can be used ---
    try:
        with db.config.get_sqlite_connection() as conn:
            conn.execute("SELECT 1").fetchone()
        record("database", HealthStatus.HEALTHY, "Database connection successful")
    except Exception as exc:
        record("database", HealthStatus.UNHEALTHY, f"Database error: {str(exc)}")
        worst = HealthStatus.UNHEALTHY

    # --- Simulation execution: run a throwaway script with a known answer ---
    try:
        probe_source = '''
def simulate(**params):
    return {"result": params.get("x", 0) * 2}
'''
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as handle:
            handle.write(probe_source)
            script_path = Path(handle.name)
        try:
            outcome = SimulationRunner().run(script_path, {"x": 5})
            ran_correctly = outcome.get("_ok", False) and outcome.get("result") == 10
            if not ran_correctly:
                record(
                    "simulation_execution",
                    HealthStatus.DEGRADED,
                    "Simulation execution returned unexpected results",
                )
                # A degraded probe never masks an earlier unhealthy one.
                if worst == HealthStatus.HEALTHY:
                    worst = HealthStatus.DEGRADED
            else:
                record(
                    "simulation_execution",
                    HealthStatus.HEALTHY,
                    "Simulation execution working",
                )
        finally:
            # delete=False above means the file must be removed by hand.
            script_path.unlink(missing_ok=True)
    except Exception as exc:
        record(
            "simulation_execution",
            HealthStatus.UNHEALTHY,
            f"Simulation execution error: {str(exc)}",
        )
        worst = HealthStatus.UNHEALTHY

    # --- Models directory: must exist; count the simulate.py entry points ---
    try:
        from ..config import settings
        models_root = Path(settings.models_dir)
        if not models_root.exists():
            record("models_directory", HealthStatus.DEGRADED, "Models directory not found")
            if worst == HealthStatus.HEALTHY:
                worst = HealthStatus.DEGRADED
        else:
            found = sum(1 for _ in models_root.glob("*/simulate.py"))
            record(
                "models_directory",
                HealthStatus.HEALTHY,
                f"Models directory accessible, {found} models found",
            )
    except Exception as exc:
        record("models_directory", HealthStatus.UNHEALTHY, f"Models directory error: {str(exc)}")
        worst = HealthStatus.UNHEALTHY

    return HealthResponse(status=worst, components=report)
@router.post("/test", response_model=TestResponse, summary="Run system tests")
async def run_test(request: TestRequest, db = Depends(get_database)):
    """
    Run specific system tests.

    - **test_type**: Type of test to run (simulation, database, reasoning, end_to_end)
    - **parameters**: Optional test parameters

    Returns test results and performance metrics. An unknown test type is
    rejected with HTTP 400; any other error raised while running a test is
    reported as an unsuccessful TestResponse.
    """
    start_time = time.time()
    # The parameters are optional; hand the individual tests an empty dict
    # rather than None so that their `.get(...)` lookups cannot fail.
    parameters = request.parameters or {}
    try:
        if request.test_type == "simulation":
            return await _test_simulation(parameters)
        elif request.test_type == "database":
            return await _test_database(db, parameters)
        elif request.test_type == "reasoning":
            return await _test_reasoning(db, parameters)
        elif request.test_type == "end_to_end":
            return await _test_end_to_end(db, parameters)
        else:
            raise HTTPException(
                status_code=400,
                detail=f"Unknown test type: {request.test_type}"
            )
    except HTTPException:
        # HTTPException is itself an Exception: without this clause the 400
        # above would be swallowed by the generic handler below and sent to
        # the client as a regular (failed) test response.
        raise
    except Exception as e:
        execution_time = time.time() - start_time
        return TestResponse(
            test_type=request.test_type,
            success=False,
            message=f"Test failed: {str(e)}",
            execution_time=execution_time
        )
async def _test_simulation(parameters: Dict[str, Any]) -> TestResponse:
    """
    Test simulation execution with a small generated script.

    The script multiplies its ``x`` and ``y`` arguments; the test passes when
    the runner reports success and the returned ``product`` matches.

    Args:
        parameters: Test options. The optional ``"params"`` entry holds the
            keyword arguments passed to the generated ``simulate`` function
            and defaults to ``{"x": 3, "y": 4}``. ``None`` is treated as
            "no options".

    Returns:
        A TestResponse of type "simulation" whose details list the input
        parameters, the output keys, the reported duration and the lengths
        of the captured stdout/stderr.
    """
    start_time = time.time()
    # Create test script
    test_script = '''
def simulate(x=1, y=2):
    import math
    result = x * y + math.sqrt(x + y)
    return {
        "product": x * y,
        "sum": x + y,
        "result": result
    }
'''
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
        f.write(test_script)
        temp_path = f.name
    try:
        runner = SimulationRunner()
        test_params = (parameters or {}).get("params")
        if test_params is None:
            test_params = {"x": 3, "y": 4}
        result = runner.run(Path(temp_path), test_params)
        execution_time = time.time() - start_time
        success = result.get("_ok", False)
        details = {
            "input_params": test_params,
            "output_keys": list(result.keys()),
            "execution_time_s": result.get("_duration_s", 0),
            # `or ""` also covers a runner that reports None for a stream.
            "stdout_length": len(result.get("_stdout") or ""),
            "stderr_length": len(result.get("_stderr") or "")
        }
        if success:
            # Fall back to the defaults declared by simulate() in the script
            # above, so that a caller supplying only one of x / y is validated
            # against what the script actually computed instead of failing
            # with a KeyError.
            expected_product = test_params.get("x", 1) * test_params.get("y", 2)
            actual_product = result.get("product")
            if actual_product != expected_product:
                success = False
                details["validation_error"] = f"Expected product {expected_product}, got {actual_product}"
        return TestResponse(
            test_type="simulation",
            success=success,
            message="Simulation test completed" if success else "Simulation test failed",
            details=details,
            execution_time=execution_time
        )
    finally:
        # The temp file was created with delete=False, so remove it by hand.
        Path(temp_path).unlink(missing_ok=True)
async def _test_database(db, parameters: Dict[str, Any]) -> TestResponse:
    """
    Test database operations.

    Counts the rows of the ``simulations``, ``results`` and
    ``reasoning_agent`` tables and, when requested, inserts and deletes a
    throwaway row in ``simulations``.

    Args:
        db: Object exposing ``config.get_sqlite_connection()``.
        parameters: Test options. A truthy ``"test_write"`` entry enables
            the insert/delete probe. ``None`` is treated as "no options".

    Returns:
        A TestResponse of type "database". On failure the details gathered
        before the error are still included.
    """
    import uuid  # local import: only the optional write probe needs it

    start_time = time.time()
    details = {}
    options = parameters or {}
    # (details key, query) for each table whose row count is reported.
    count_queries = (
        ("models_count", "SELECT COUNT(*) as count FROM simulations"),
        ("results_count", "SELECT COUNT(*) as count FROM results"),
        ("conversations_count", "SELECT COUNT(*) as count FROM reasoning_agent"),
    )
    try:
        with db.config.get_sqlite_connection() as conn:
            for key, query in count_queries:
                row = conn.execute(query).fetchone()
                # NOTE(review): assumes rows are addressable by column name
                # (e.g. sqlite3.Row) - confirm against get_sqlite_connection.
                details[key] = row["count"] if row else 0
            # Test write operation (if requested)
            if options.get("test_write", False):
                # A random id instead of a one-second-resolution timestamp:
                # two probes started within the same second would otherwise
                # use the same id and collide with each other.
                test_id = f"test_{uuid.uuid4().hex}"
                conn.execute("""
                    INSERT INTO simulations (id, name, metadata, script_path)
                    VALUES (?, ?, ?, ?)
                """, (test_id, "test_model", "{}", "/tmp/test.py"))
                # Clean up
                conn.execute("DELETE FROM simulations WHERE id = ?", (test_id,))
                details["write_test"] = "passed"
        execution_time = time.time() - start_time
        return TestResponse(
            test_type="database",
            success=True,
            message="Database test completed successfully",
            details=details,
            execution_time=execution_time
        )
    except Exception as e:
        execution_time = time.time() - start_time
        return TestResponse(
            test_type="database",
            success=False,
            message=f"Database test failed: {str(e)}",
            details=details,
            execution_time=execution_time
        )
async def _test_reasoning(db, parameters: Dict[str, Any]) -> TestResponse:
    """
    Mock check of the reasoning subsystem.

    No reasoning agent is invoked: the check only asks the simulation
    repository for its stored models and reports how many there are, plus
    the id of the first one when any exist. ``parameters`` is not used.
    """
    started = time.time()
    try:
        summary = {
            "reasoning_agent_class": "ReasoningAgent",
            "test_mode": "mock",
            "note": "Full reasoning test requires valid model with simulation results"
        }
        # A full reasoning test would need a model that already has results;
        # here we merely report what is available.
        known_models = db.simulation_repository.list()
        if not known_models:
            summary["available_models"] = 0
        else:
            summary["available_models"] = len(known_models)
            summary["sample_model"] = known_models[0].get("id")
        elapsed = time.time() - started
        return TestResponse(
            test_type="reasoning",
            success=True,
            message="Reasoning test completed (mock mode)",
            details=summary,
            execution_time=elapsed
        )
    except Exception as exc:
        elapsed = time.time() - started
        return TestResponse(
            test_type="reasoning",
            success=False,
            message=f"Reasoning test failed: {str(exc)}",
            execution_time=elapsed
        )
async def _test_end_to_end(db, parameters: Dict[str, Any]) -> TestResponse:
    """
    Test the complete workflow: store a model, run it, store its results,
    then remove everything the test created.

    Args:
        db: Object exposing ``config.get_sqlite_connection()``; used for the
            cleanup of the rows created by the test.
        parameters: Test options (currently unused).

    Returns:
        A TestResponse of type "end_to_end". It is successful only when the
        model was created, the simulation ran, its results were stored and
        the cleanup completed. ``details`` records each step that was
        reached, and ``cleanup_error`` when the cleanup itself failed.
    """
    start_time = time.time()
    details = {}
    try:
        # 1. Create and store a test model
        test_script = '''
def simulate(amplitude=1.0, frequency=1.0, phase=0.0):
    import math
    import numpy as np
    t = np.linspace(0, 2*math.pi, 100)
    signal = amplitude * np.sin(frequency * t + phase)
    return {
        "max_value": float(np.max(signal)),
        "min_value": float(np.min(signal)),
        "mean_value": float(np.mean(signal)),
        "signal_length": len(signal)
    }
'''
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
            f.write(test_script)
            temp_path = f.name
        # Stays None until the model row exists, so the cleanup below knows
        # whether there is anything to delete.
        model_id = None
        try:
            # Store model
            from db import store_simulation_script
            model_id = store_simulation_script(
                model_name="e2e_test_model",
                metadata={"description": "End-to-end test model", "test": True},
                script_path=temp_path
            )
            details["model_created"] = model_id
            # 2. Run simulation
            runner = SimulationRunner()
            result = runner.run(Path(temp_path), {"amplitude": 2.0, "frequency": 0.5})
            details["simulation_success"] = result.get("_ok", False)
            # Keys starting with "_" are runner bookkeeping, not simulation output.
            details["simulation_results"] = {k: v for k, v in result.items() if not k.startswith("_")}
            # 3. Store results
            if result.get("_ok", False):
                from db import store_simulation_results
                store_simulation_results(model_id, [result], ["amplitude", "frequency"])
                details["results_stored"] = True
            # 4. Test reasoning (mock)
            details["reasoning_available"] = True
        finally:
            # 5. Clean up. This runs even when a step above raised, so a
            # failed run does not leave the test model and its results in
            # the database.
            try:
                if model_id is not None:
                    with db.config.get_sqlite_connection() as conn:
                        conn.execute("DELETE FROM results WHERE model_id = ?", (model_id,))
                        conn.execute("DELETE FROM simulations WHERE id = ?", (model_id,))
                    details["cleanup_completed"] = True
            except Exception as cleanup_error:
                # Record the problem instead of raising, so that it cannot
                # replace the error (if any) that made the test fail.
                details["cleanup_error"] = str(cleanup_error)
            finally:
                Path(temp_path).unlink(missing_ok=True)
        execution_time = time.time() - start_time
        success = all([
            details.get("model_created"),
            details.get("simulation_success", False),
            details.get("results_stored", False),
            details.get("cleanup_completed", False)
        ])
        return TestResponse(
            test_type="end_to_end",
            success=success,
            message="End-to-end test completed" if success else "End-to-end test had failures",
            details=details,
            execution_time=execution_time
        )
    except Exception as e:
        execution_time = time.time() - start_time
        return TestResponse(
            test_type="end_to_end",
            success=False,
            message=f"End-to-end test failed: {str(e)}",
            details=details,
            execution_time=execution_time
        )