from datetime import date, datetime from pathlib import Path import logging from fastapi import APIRouter, Depends, HTTPException, Query from fastapi.responses import FileResponse from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from pydantic import BaseModel from app.core.database import get_db from app.core.deps import get_current_user from app.models.report import ReportTemplate, ReportTask from app.models.user import User from app.services.report_generator import ReportGenerator, REPORTS_DIR from app.services.audit import log_audit logger = logging.getLogger(__name__) router = APIRouter(prefix="/reports", tags=["报表管理"]) class TemplateCreate(BaseModel): name: str report_type: str description: str | None = None fields: list[dict] filters: dict | None = None aggregation: str = "sum" time_granularity: str = "hour" class TaskCreate(BaseModel): template_id: int name: str | None = None schedule: str | None = None recipients: list[str] | None = None export_format: str = "xlsx" class QuickReportRequest(BaseModel): report_type: str # daily, monthly, device_status, alarm, carbon export_format: str = "xlsx" start_date: date | None = None end_date: date | None = None month: int | None = None year: int | None = None device_ids: list[int] | None = None # ------------------------------------------------------------------ # # Templates # ------------------------------------------------------------------ # @router.get("/templates") async def list_templates(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): result = await db.execute(select(ReportTemplate).order_by(ReportTemplate.id)) return [{ "id": t.id, "name": t.name, "report_type": t.report_type, "description": t.description, "fields": t.fields, "is_system": t.is_system, "aggregation": t.aggregation, "time_granularity": t.time_granularity, } for t in result.scalars().all()] @router.post("/templates") async def create_template(data: TemplateCreate, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): template = ReportTemplate(**data.model_dump(), created_by=user.id) db.add(template) await db.flush() return {"id": template.id, "name": template.name} # ------------------------------------------------------------------ # # Tasks CRUD # ------------------------------------------------------------------ # @router.get("/tasks") async def list_tasks(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): result = await db.execute(select(ReportTask).order_by(ReportTask.id.desc())) return [{ "id": t.id, "template_id": t.template_id, "name": t.name, "schedule": t.schedule, "status": t.status, "export_format": t.export_format, "file_path": t.file_path, "last_run": str(t.last_run) if t.last_run else None, } for t in result.scalars().all()] @router.post("/tasks") async def create_task(data: TaskCreate, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): task = ReportTask(**data.model_dump(), created_by=user.id) db.add(task) await db.flush() return {"id": task.id} # ------------------------------------------------------------------ # # Run / Status / Download # ------------------------------------------------------------------ # REPORT_TYPE_METHODS = { "daily": "generate_energy_daily_report", "monthly": "generate_monthly_summary", "device_status": "generate_device_status_report", "alarm": "generate_alarm_report", "carbon": "generate_carbon_report", } def _parse_date(val, default: date) -> date: if not val: return default if isinstance(val, date): return val try: return date.fromisoformat(str(val)) except (ValueError, TypeError): return default @router.post("/tasks/{task_id}/run") async def run_task( task_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): result = await db.execute(select(ReportTask).where(ReportTask.id == task_id)) task = result.scalar_one_or_none() if not task: raise HTTPException(status_code=404, detail="任务不存在") # Try Celery first try: from app.core.config import get_settings from app.tasks.report_tasks import CELERY_AVAILABLE if CELERY_AVAILABLE and get_settings().CELERY_ENABLED: task.status = "running" await db.flush() from app.tasks.report_tasks import generate_report_task generate_report_task.delay(task_id) return {"message": "报表生成任务已提交(异步)", "task_id": task.id, "mode": "async"} except Exception: pass # Inline async generation (avoids event loop issues with BackgroundTasks) task.status = "running" await db.flush() template = (await db.execute( select(ReportTemplate).where(ReportTemplate.id == task.template_id) )).scalar_one_or_none() if not template: task.status = "failed" await db.flush() raise HTTPException(status_code=400, detail=f"模板 {task.template_id} 不存在") filters = template.filters or {} today = date.today() start_date = _parse_date(filters.get("start_date"), default=today.replace(day=1)) end_date = _parse_date(filters.get("end_date"), default=today) device_ids = filters.get("device_ids") export_format = task.export_format or "xlsx" report_type = template.report_type method_name = REPORT_TYPE_METHODS.get(report_type) if not method_name: task.status = "failed" await db.flush() raise HTTPException(status_code=400, detail=f"未知报表类型: {report_type}") try: gen = ReportGenerator(db) method = getattr(gen, method_name) if report_type == "monthly": month = filters.get("month", today.month) year = filters.get("year", today.year) filepath = await method(month=month, year=year, export_format=export_format) elif report_type == "device_status": filepath = await method(export_format=export_format) else: kwargs = {"start_date": start_date, "end_date": end_date, "export_format": export_format} if device_ids and report_type == "daily": kwargs["device_ids"] = device_ids filepath = await method(**kwargs) task.status = "completed" task.file_path = filepath task.last_run = datetime.now() await db.flush() await log_audit(db, user.id, "export", "report", detail=f"运行报表任务 #{task_id}") logger.info(f"Report task {task_id} completed: {filepath}") return {"message": "报表生成完成", "task_id": task.id, "mode": "sync", "status": "completed"} except Exception as e: logger.error(f"Report task {task_id} failed: {e}") task.status = "failed" await db.flush() raise HTTPException(status_code=500, detail=f"报表生成失败: {str(e)}") @router.get("/tasks/{task_id}/status") async def task_status( task_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): result = await db.execute(select(ReportTask).where(ReportTask.id == task_id)) task = result.scalar_one_or_none() if not task: raise HTTPException(status_code=404, detail="任务不存在") return { "id": task.id, "status": task.status, "file_path": task.file_path, "last_run": str(task.last_run) if task.last_run else None, } @router.get("/tasks/{task_id}/download") async def download_report( task_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): result = await db.execute(select(ReportTask).where(ReportTask.id == task_id)) task = result.scalar_one_or_none() if not task: raise HTTPException(status_code=404, detail="任务不存在") if task.status != "completed" or not task.file_path: raise HTTPException(status_code=400, detail="报表尚未生成完成") if not Path(task.file_path).exists(): raise HTTPException(status_code=404, detail="报表文件不存在") filename = Path(task.file_path).name media_type = ( "application/pdf" if filename.endswith(".pdf") else "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ) return FileResponse(task.file_path, filename=filename, media_type=media_type) # ------------------------------------------------------------------ # # Quick report (synchronous, no task record needed) # ------------------------------------------------------------------ # @router.post("/generate") async def generate_quick_report( req: QuickReportRequest, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """Generate a report synchronously and return the download URL. Useful for demo and quick one-off reports without creating a task record. """ gen = ReportGenerator(db) today = date.today() try: if req.report_type == "daily": filepath = await gen.generate_energy_daily_report( start_date=req.start_date or today.replace(day=1), end_date=req.end_date or today, device_ids=req.device_ids, export_format=req.export_format, ) elif req.report_type == "monthly": filepath = await gen.generate_monthly_summary( month=req.month or today.month, year=req.year or today.year, export_format=req.export_format, ) elif req.report_type == "device_status": filepath = await gen.generate_device_status_report( export_format=req.export_format, ) elif req.report_type == "alarm": filepath = await gen.generate_alarm_report( start_date=req.start_date or today.replace(day=1), end_date=req.end_date or today, export_format=req.export_format, ) elif req.report_type == "carbon": filepath = await gen.generate_carbon_report( start_date=req.start_date or today.replace(day=1), end_date=req.end_date or today, export_format=req.export_format, ) else: raise HTTPException(status_code=400, detail=f"未知的报表类型: {req.report_type}") except Exception as e: raise HTTPException(status_code=500, detail=f"报表生成失败: {str(e)}") filename = Path(filepath).name await log_audit(db, user.id, "export", "report", detail=f"生成报表: {req.report_type} ({req.export_format})") return { "message": "报表生成成功", "filename": filename, "download_url": f"/api/v1/reports/download/{filename}", } @router.get("/download/{filename}") async def download_by_filename( filename: str, user: User = Depends(get_current_user), ): """Download a generated report file by filename.""" filepath = REPORTS_DIR / filename if not filepath.exists(): raise HTTPException(status_code=404, detail="文件不存在") # Prevent path traversal if not filepath.resolve().parent == REPORTS_DIR.resolve(): raise HTTPException(status_code=400, detail="非法文件路径") media_type = ( "application/pdf" if filename.endswith(".pdf") else "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ) return FileResponse(str(filepath), filename=filename, media_type=media_type)