- FastAPI 应用框架 - SQLAlchemy 异步数据库 - 事件采集和分析 API - Alembic 数据库迁移 - Docker 部署配置 - 完整的项目文档
53 lines
1.2 KiB
Python
53 lines
1.2 KiB
Python
"""
|
|
健康检查接口
|
|
"""
|
|
|
|
from datetime import datetime
|
|
from fastapi import APIRouter, Depends
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import text
|
|
|
|
from src.config.database import get_db
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.get("/health")
|
|
async def health_check():
|
|
"""健康检查"""
|
|
return {
|
|
"status": "healthy",
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"version": "0.1.0",
|
|
}
|
|
|
|
|
|
@router.get("/health/db")
|
|
async def database_health(db: AsyncSession = Depends(get_db)):
|
|
"""数据库健康检查"""
|
|
try:
|
|
await db.execute(text("SELECT 1"))
|
|
return {
|
|
"status": "healthy",
|
|
"database": "connected",
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
"status": "unhealthy",
|
|
"database": "disconnected",
|
|
"error": str(e),
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
}
|
|
|
|
|
|
@router.get("/health/ready")
|
|
async def readiness_check(db: AsyncSession = Depends(get_db)):
|
|
"""就绪检查"""
|
|
try:
|
|
await db.execute(text("SELECT 1"))
|
|
return {"status": "ready"}
|
|
except Exception:
|
|
return {"status": "not_ready"}
|
|
|