blob: 0d7262d6d80154064580de99db8a2b49f776ac64 [file]
import asyncio
import threading
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from common.rocketmq.rocketmq_utils import logger
from supervisor_agent.utils.session.session_manager import session_manager
from supervisor_agent.utils.stream.stream_manager import stream_queue_manager
from supervisor_agent.utils.rocketmq.mq_service import init_rocketmq
from supervisor_agent.api.routes import router
def session_cleanup_worker(interval: int = 300):
"""Background thread to periodically clean up expired sessions"""
logger.info(f"Session cleanup worker started (interval: {interval}s)")
while True:
try:
time.sleep(interval)
cleaned_count = session_manager.cleanup_expired_sessions()
if cleaned_count > 0:
logger.info(f"Cleaned up {cleaned_count} expired sessions")
except Exception as e:
logger.error(f"Session cleanup error: {e}", exc_info=True)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifecycle and configure event loop for stream queue manager"""
# Set event loop for stream queue manager
loop = asyncio.get_running_loop()
stream_queue_manager.set_loop(loop)
logger.info("Event loop configured for stream queue manager")
# Start background session cleanup thread
cleanup_thread = threading.Thread(
target=session_cleanup_worker,
kwargs={"interval": 300}, # Check every 5 minutes
daemon=True # Thread will exit when main process exits
)
cleanup_thread.start()
logger.info("Session cleanup background thread started")
yield
# Cleanup on shutdown (optional)
logger.info("Application shutting down...")
app = FastAPI(lifespan=lifespan)
# Configure CORS middleware to allow all origins, methods, and headers
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"]
)
# Include API routes from router module
app.include_router(router)
if __name__ == "__main__":
import uvicorn
# Initialize RocketMQ consumer and producer clients
init_rocketmq()
logger.info("Start supervisor agent successfully")
# Start FastAPI server with uvicorn
uvicorn.run(app, host="localhost", port=8000)