from datetime import datetime

from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db
from app.core.deps import get_current_user, require_roles
from app.models.user import AuditLog, User

# The tag string is emitted at runtime (it means "audit logs"); keep it as-is.
router = APIRouter(prefix="/audit", tags=["审计日志"])


def _parse_iso_datetime(value: str | None) -> datetime | None:
    """Return *value* parsed with ``datetime.fromisoformat``, or None.

    None is returned both when *value* is empty/None and when it is not a
    valid ISO-8601 string, so callers can treat "no usable bound" uniformly.
    """
    if not value:
        return None
    try:
        return datetime.fromisoformat(value)
    except ValueError:
        return None


@router.get("/logs")
async def list_audit_logs(
    user_id: int | None = None,
    action: str | None = None,
    resource: str | None = None,
    start_time: str | None = None,
    end_time: str | None = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_roles("admin", "energy_manager")),
):
    """Return paginated audit logs with optional filters.

    Args:
        user_id: Keep only entries whose ``AuditLog.user_id`` equals this value.
        action: Keep only entries whose ``action`` matches exactly.
        resource: Keep only entries whose ``resource`` matches exactly.
        start_time: ISO-8601 lower bound (inclusive) on ``created_at``.
        end_time: ISO-8601 upper bound (inclusive) on ``created_at``.
        page: 1-based page number.
        page_size: Rows per page (1-100).
        db: Async database session.
        current_user: Not read in the body; it is declared so the
            ``require_roles("admin", "energy_manager")`` dependency runs
            (presumably an authorization check -- confirm in app.core.deps).

    Returns:
        A dict ``{"total": <matching row count>, "items": [<row dict>, ...]}``.
        Entries whose user row is missing (outer join) report ``"-"`` as the
        username; ``created_at`` is rendered with ``str()`` or left as None.

    Note:
        An unparsable ``start_time`` / ``end_time`` is ignored rather than
        rejected, i.e. the corresponding bound is simply not applied.
    """
    query = select(
        AuditLog.id,
        AuditLog.user_id,
        User.username,
        AuditLog.action,
        AuditLog.resource,
        AuditLog.detail,
        AuditLog.ip_address,
        AuditLog.created_at,
    ).outerjoin(User, AuditLog.user_id == User.id)

    if user_id is not None:
        query = query.where(AuditLog.user_id == user_id)
    if action:
        query = query.where(AuditLog.action == action)
    if resource:
        query = query.where(AuditLog.resource == resource)

    # Best-effort time bounds: invalid input yields None and is skipped.
    range_start = _parse_iso_datetime(start_time)
    if range_start is not None:
        query = query.where(AuditLog.created_at >= range_start)
    range_end = _parse_iso_datetime(end_time)
    if range_end is not None:
        query = query.where(AuditLog.created_at <= range_end)

    # Count the filtered rows before ordering/pagination is applied.
    count_q = select(func.count()).select_from(query.subquery())
    total = (await db.execute(count_q)).scalar() or 0

    # Newest first. The id tie-breaker keeps the order deterministic when
    # several entries share a timestamp, so rows cannot repeat or go missing
    # between consecutive pages.
    query = (
        query.order_by(AuditLog.created_at.desc(), AuditLog.id.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    result = await db.execute(query)

    items = [
        {
            "id": row.id,
            "user_id": row.user_id,
            "username": row.username or "-",
            "action": row.action,
            "resource": row.resource,
            "detail": row.detail,
            "ip_address": row.ip_address,
            "created_at": str(row.created_at) if row.created_at else None,
        }
        for row in result.all()
    ]
    return {"total": total, "items": items}