"""AI运维智能体 API - 设备健康、异常检测、诊断、预测维护、洞察""" from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func, and_ from datetime import datetime, timezone, timedelta from pydantic import BaseModel from app.core.database import get_db from app.core.deps import get_current_user from app.models.user import User from app.models.device import Device from app.models.ai_ops import ( DeviceHealthScore, AnomalyDetection, DiagnosticReport, MaintenancePrediction, OpsInsight, ) from app.services.ai_ops import ( calculate_device_health, scan_anomalies, run_diagnostics, generate_maintenance_predictions, generate_insights, get_dashboard_data, ) router = APIRouter(prefix="/ai-ops", tags=["AI运维智能体"]) # ── Device Health ─────────────────────────────────────────────────── @router.get("/health") async def get_all_health( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """获取所有设备最新健康评分""" subq = ( select( DeviceHealthScore.device_id, func.max(DeviceHealthScore.timestamp).label("max_ts"), ).group_by(DeviceHealthScore.device_id).subquery() ) result = await db.execute( select(DeviceHealthScore).join( subq, and_( DeviceHealthScore.device_id == subq.c.device_id, DeviceHealthScore.timestamp == subq.c.max_ts, ) ) ) scores = result.scalars().all() # Get device info device_ids = [s.device_id for s in scores] dev_map = {} if device_ids: dev_result = await db.execute( select(Device.id, Device.name, Device.device_type, Device.code) .where(Device.id.in_(device_ids)) ) dev_map = {r.id: {"name": r.name, "type": r.device_type, "code": r.code} for r in dev_result.all()} return [{ "device_id": s.device_id, "device_name": dev_map.get(s.device_id, {}).get("name", f"#{s.device_id}"), "device_type": dev_map.get(s.device_id, {}).get("type", "unknown"), "device_code": dev_map.get(s.device_id, {}).get("code", ""), "health_score": s.health_score, "status": s.status, "trend": s.trend, "factors": s.factors, "timestamp": str(s.timestamp), } for s in scores] @router.get("/health/{device_id}") async def get_device_health( device_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """获取单设备健康详情""" result = await db.execute( select(DeviceHealthScore).where( DeviceHealthScore.device_id == device_id ).order_by(DeviceHealthScore.timestamp.desc()).limit(1) ) score = result.scalar_one_or_none() if not score: raise HTTPException(status_code=404, detail="暂无该设备健康数据") dev_result = await db.execute(select(Device).where(Device.id == device_id)) device = dev_result.scalar_one_or_none() return { "device_id": score.device_id, "device_name": device.name if device else f"#{device_id}", "device_type": device.device_type if device else "unknown", "health_score": score.health_score, "status": score.status, "trend": score.trend, "factors": score.factors, "timestamp": str(score.timestamp), } @router.get("/health/{device_id}/history") async def get_health_history( device_id: int, days: int = Query(7, ge=1, le=90), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """获取设备健康评分历史""" cutoff = datetime.now(timezone.utc) - timedelta(days=days) result = await db.execute( select(DeviceHealthScore).where(and_( DeviceHealthScore.device_id == device_id, DeviceHealthScore.timestamp >= cutoff, )).order_by(DeviceHealthScore.timestamp.asc()) ) scores = result.scalars().all() return [{ "timestamp": str(s.timestamp), "health_score": s.health_score, "status": s.status, "trend": s.trend, "factors": s.factors, } for s in scores] @router.post("/health/calculate") async def trigger_health_calculation( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """触发全部设备健康评分计算""" result = await db.execute(select(Device).where(Device.is_active == True)) devices = result.scalars().all() scores = [] for device in devices: try: score = await calculate_device_health(db, device) scores.append({ "device_id": score.device_id, "health_score": score.health_score, "status": score.status, }) except Exception as e: scores.append({"device_id": device.id, "error": str(e)}) return {"calculated": len(scores), "results": scores} # ── Anomaly Detection ─────────────────────────────────────────────── @router.get("/anomalies") async def list_anomalies( device_id: int | None = None, severity: str | None = None, status: str | None = None, days: int = Query(7, ge=1, le=90), page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """列出异常检测记录""" cutoff = datetime.now(timezone.utc) - timedelta(days=days) query = select(AnomalyDetection).where(AnomalyDetection.detected_at >= cutoff) if device_id: query = query.where(AnomalyDetection.device_id == device_id) if severity: query = query.where(AnomalyDetection.severity == severity) if status: query = query.where(AnomalyDetection.status == status) count_q = select(func.count()).select_from(query.subquery()) total = (await db.execute(count_q)).scalar() query = query.order_by(AnomalyDetection.detected_at.desc()).offset((page - 1) * page_size).limit(page_size) result = await db.execute(query) anomalies = result.scalars().all() # Get device names dev_ids = list(set(a.device_id for a in anomalies)) dev_map = {} if dev_ids: dev_result = await db.execute(select(Device.id, Device.name).where(Device.id.in_(dev_ids))) dev_map = {r.id: r.name for r in dev_result.all()} return { "total": total, "items": [{ "id": a.id, "device_id": a.device_id, "device_name": dev_map.get(a.device_id, f"#{a.device_id}"), "detected_at": str(a.detected_at), "anomaly_type": a.anomaly_type, "severity": a.severity, "description": a.description, "metric_name": a.metric_name, "expected_value": a.expected_value, "actual_value": a.actual_value, "deviation_percent": a.deviation_percent, "status": a.status, "resolution_notes": a.resolution_notes, } for a in anomalies], } @router.get("/anomalies/{device_id}") async def get_device_anomalies( device_id: int, days: int = Query(7, ge=1, le=90), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """获取设备异常记录""" cutoff = datetime.now(timezone.utc) - timedelta(days=days) result = await db.execute( select(AnomalyDetection).where(and_( AnomalyDetection.device_id == device_id, AnomalyDetection.detected_at >= cutoff, )).order_by(AnomalyDetection.detected_at.desc()) ) anomalies = result.scalars().all() return [{ "id": a.id, "detected_at": str(a.detected_at), "anomaly_type": a.anomaly_type, "severity": a.severity, "description": a.description, "metric_name": a.metric_name, "expected_value": a.expected_value, "actual_value": a.actual_value, "deviation_percent": a.deviation_percent, "status": a.status, } for a in anomalies] class AnomalyStatusUpdate(BaseModel): status: str # investigating, resolved, false_positive resolution_notes: str | None = None @router.put("/anomalies/{anomaly_id}/status") async def update_anomaly_status( anomaly_id: int, data: AnomalyStatusUpdate, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """更新异常状态""" result = await db.execute(select(AnomalyDetection).where(AnomalyDetection.id == anomaly_id)) anomaly = result.scalar_one_or_none() if not anomaly: raise HTTPException(status_code=404, detail="异常记录不存在") anomaly.status = data.status if data.resolution_notes: anomaly.resolution_notes = data.resolution_notes return {"message": "已更新", "id": anomaly.id, "status": anomaly.status} @router.post("/anomalies/scan") async def trigger_anomaly_scan( device_id: int | None = None, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """触发异常扫描""" anomalies = await scan_anomalies(db, device_id) return { "scanned_at": str(datetime.now(timezone.utc)), "anomalies_found": len(anomalies), "anomalies": [{ "device_id": a.device_id, "anomaly_type": a.anomaly_type, "severity": a.severity, "description": a.description, } for a in anomalies], } # ── Diagnostics ───────────────────────────────────────────────────── @router.get("/diagnostics") async def list_diagnostics( device_id: int | None = None, page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """列出诊断报告""" query = select(DiagnosticReport) if device_id: query = query.where(DiagnosticReport.device_id == device_id) count_q = select(func.count()).select_from(query.subquery()) total = (await db.execute(count_q)).scalar() query = query.order_by(DiagnosticReport.generated_at.desc()).offset((page - 1) * page_size).limit(page_size) result = await db.execute(query) reports = result.scalars().all() dev_ids = list(set(r.device_id for r in reports)) dev_map = {} if dev_ids: dev_result = await db.execute(select(Device.id, Device.name).where(Device.id.in_(dev_ids))) dev_map = {r.id: r.name for r in dev_result.all()} return { "total": total, "items": [{ "id": r.id, "device_id": r.device_id, "device_name": dev_map.get(r.device_id, f"#{r.device_id}"), "generated_at": str(r.generated_at), "report_type": r.report_type, "findings": r.findings, "recommendations": r.recommendations, "estimated_impact": r.estimated_impact, "status": r.status, } for r in reports], } @router.post("/diagnostics/{device_id}/run") async def trigger_diagnostics( device_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """对指定设备运行诊断""" try: report = await run_diagnostics(db, device_id) return { "id": report.id, "device_id": report.device_id, "report_type": report.report_type, "findings": report.findings, "recommendations": report.recommendations, "estimated_impact": report.estimated_impact, "status": report.status, "generated_at": str(report.generated_at), } except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) @router.get("/diagnostics/{report_id}") async def get_diagnostic_detail( report_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """获取诊断报告详情""" result = await db.execute(select(DiagnosticReport).where(DiagnosticReport.id == report_id)) report = result.scalar_one_or_none() if not report: raise HTTPException(status_code=404, detail="诊断报告不存在") dev_result = await db.execute(select(Device.name).where(Device.id == report.device_id)) device_name = dev_result.scalar() or f"#{report.device_id}" return { "id": report.id, "device_id": report.device_id, "device_name": device_name, "generated_at": str(report.generated_at), "report_type": report.report_type, "findings": report.findings, "recommendations": report.recommendations, "estimated_impact": report.estimated_impact, "status": report.status, } # ── Predictive Maintenance ────────────────────────────────────────── @router.get("/maintenance/predictions") async def list_predictions( status: str | None = None, urgency: str | None = None, page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """列出维护预测""" query = select(MaintenancePrediction) if status: query = query.where(MaintenancePrediction.status == status) if urgency: query = query.where(MaintenancePrediction.urgency == urgency) count_q = select(func.count()).select_from(query.subquery()) total = (await db.execute(count_q)).scalar() query = query.order_by(MaintenancePrediction.predicted_at.desc()).offset((page - 1) * page_size).limit(page_size) result = await db.execute(query) predictions = result.scalars().all() dev_ids = list(set(p.device_id for p in predictions)) dev_map = {} if dev_ids: dev_result = await db.execute(select(Device.id, Device.name).where(Device.id.in_(dev_ids))) dev_map = {r.id: r.name for r in dev_result.all()} return { "total": total, "items": [{ "id": p.id, "device_id": p.device_id, "device_name": dev_map.get(p.device_id, f"#{p.device_id}"), "predicted_at": str(p.predicted_at), "component": p.component, "failure_mode": p.failure_mode, "probability": p.probability, "predicted_failure_date": str(p.predicted_failure_date) if p.predicted_failure_date else None, "recommended_action": p.recommended_action, "urgency": p.urgency, "estimated_downtime_hours": p.estimated_downtime_hours, "estimated_repair_cost": p.estimated_repair_cost, "status": p.status, } for p in predictions], } @router.get("/maintenance/schedule") async def get_maintenance_schedule( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """获取推荐维护计划""" result = await db.execute( select(MaintenancePrediction).where( MaintenancePrediction.status.in_(["predicted", "scheduled"]) ).order_by(MaintenancePrediction.predicted_failure_date.asc()) ) predictions = result.scalars().all() dev_ids = list(set(p.device_id for p in predictions)) dev_map = {} if dev_ids: dev_result = await db.execute(select(Device.id, Device.name).where(Device.id.in_(dev_ids))) dev_map = {r.id: r.name for r in dev_result.all()} return [{ "id": p.id, "device_id": p.device_id, "device_name": dev_map.get(p.device_id, f"#{p.device_id}"), "component": p.component, "failure_mode": p.failure_mode, "probability": p.probability, "predicted_failure_date": str(p.predicted_failure_date) if p.predicted_failure_date else None, "recommended_action": p.recommended_action, "urgency": p.urgency, "estimated_downtime_hours": p.estimated_downtime_hours, "estimated_repair_cost": p.estimated_repair_cost, "status": p.status, } for p in predictions] class PredictionStatusUpdate(BaseModel): status: str # scheduled, completed, false_alarm @router.put("/maintenance/predictions/{prediction_id}") async def update_prediction( prediction_id: int, data: PredictionStatusUpdate, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """更新预测状态""" result = await db.execute(select(MaintenancePrediction).where(MaintenancePrediction.id == prediction_id)) pred = result.scalar_one_or_none() if not pred: raise HTTPException(status_code=404, detail="预测记录不存在") pred.status = data.status return {"message": "已更新", "id": pred.id, "status": pred.status} @router.post("/maintenance/predict") async def trigger_predictions( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """触发维护预测生成""" predictions = await generate_maintenance_predictions(db) return { "generated": len(predictions), "predictions": [{ "device_id": p.device_id, "component": p.component, "urgency": p.urgency, "probability": p.probability, } for p in predictions], } # ── Insights ──────────────────────────────────────────────────────── @router.get("/insights") async def list_insights( insight_type: str | None = None, impact_level: str | None = None, page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """列出运营洞察""" query = select(OpsInsight) if insight_type: query = query.where(OpsInsight.insight_type == insight_type) if impact_level: query = query.where(OpsInsight.impact_level == impact_level) count_q = select(func.count()).select_from(query.subquery()) total = (await db.execute(count_q)).scalar() query = query.order_by(OpsInsight.generated_at.desc()).offset((page - 1) * page_size).limit(page_size) result = await db.execute(query) insights = result.scalars().all() return { "total": total, "items": [{ "id": i.id, "insight_type": i.insight_type, "title": i.title, "description": i.description, "data": i.data, "impact_level": i.impact_level, "actionable": i.actionable, "recommended_action": i.recommended_action, "generated_at": str(i.generated_at), "valid_until": str(i.valid_until) if i.valid_until else None, } for i in insights], } @router.get("/insights/latest") async def get_latest_insights( limit: int = Query(5, ge=1, le=20), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """获取最新洞察""" now = datetime.now(timezone.utc) result = await db.execute( select(OpsInsight).where( OpsInsight.valid_until >= now ).order_by(OpsInsight.generated_at.desc()).limit(limit) ) insights = result.scalars().all() return [{ "id": i.id, "insight_type": i.insight_type, "title": i.title, "description": i.description, "impact_level": i.impact_level, "actionable": i.actionable, "recommended_action": i.recommended_action, "generated_at": str(i.generated_at), } for i in insights] @router.post("/insights/generate") async def trigger_insights( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """触发洞察生成""" insights = await generate_insights(db) return { "generated": len(insights), "insights": [{ "title": i.title, "insight_type": i.insight_type, "impact_level": i.impact_level, } for i in insights], } # ── Dashboard ─────────────────────────────────────────────────────── @router.get("/dashboard") async def ai_ops_dashboard( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """AI运维总览仪表盘""" return await get_dashboard_data(db)