System Management: - System Settings page with 8 configurable parameters (admin only) - Audit Log page with filterable table (user, action, resource, date range) - Audit logging wired into auth, devices, users, alarms, reports API handlers - SystemSetting model + migration (002) Device Detail: - Dedicated /devices/:id page with 4 tabs (realtime, historical trends, alarm history, device info) - ECharts historical charts with granularity/time range selectors - Device name clickable in Devices and Monitoring tables → navigates to detail Email & Scheduling: - Email service with SMTP support (STARTTLS/SSL/plain) - Alarm email notification with professional HTML template - Report scheduler using APScheduler for cron-based auto-generation - Scheduled report task seeded (daily at 8am) UI Enhancements: - Dark mode toggle (persisted to localStorage, Ant Design darkAlgorithm) - Data comparison view in Analysis page (dual date range, side-by-side metrics) - i18n framework (i18next) with zh/en translations for menu and common UI - Language switcher in header (中文/English) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
193 lines
7.3 KiB
Python
193 lines
7.3 KiB
Python
"""报表定时调度服务 - Schedule report tasks via APScheduler and send results by email."""
|
|
import logging
|
|
from datetime import date, timedelta, datetime, timezone
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from sqlalchemy import select
|
|
|
|
from app.core.database import async_session
|
|
from app.models.report import ReportTask, ReportTemplate
|
|
from app.services.report_generator import ReportGenerator
|
|
from app.services.email_service import send_email
|
|
|
|
# Module-level logger; records are emitted under the fixed name "report_scheduler".
logger = logging.getLogger("report_scheduler")
|
|
|
|
# Shared scheduler instance: None until start_scheduler() creates it, and reset
# to None again by stop_scheduler().
_scheduler: AsyncIOScheduler | None = None
|
|
|
|
|
|
def _parse_cron(cron_expr: str) -> dict:
|
|
"""Parse a 5-field cron expression into APScheduler CronTrigger kwargs."""
|
|
parts = cron_expr.strip().split()
|
|
if len(parts) != 5:
|
|
raise ValueError(f"Invalid cron expression (need 5 fields): {cron_expr}")
|
|
return {
|
|
"minute": parts[0],
|
|
"hour": parts[1],
|
|
"day": parts[2],
|
|
"month": parts[3],
|
|
"day_of_week": parts[4],
|
|
}
|
|
|
|
|
|
def _build_report_email_html(
    report_name: str,
    report_type: str,
    export_format: str,
    generated_at: datetime,
) -> str:
    """Render the HTML body of the email that accompanies a generated report.

    Args:
        report_name: Display name of the report (task name or template name).
        report_type: The template's ``report_type`` value.
        export_format: File format of the attachment, e.g. ``"xlsx"``.
        generated_at: Timestamp shown as the generation time.

    Returns:
        An HTML fragment in which every interpolated value is HTML-escaped.
    """
    from html import escape  # function-scope import keeps this block self-contained

    # The names are read from database rows, so escape them before embedding
    # them in markup.
    safe_name = escape(str(report_name))
    safe_type = escape(str(report_type))
    safe_format = escape(str(export_format).upper())
    stamp = generated_at.strftime("%Y-%m-%d %H:%M:%S")
    return f"""
    <div style="font-family: 'Microsoft YaHei', sans-serif; padding: 20px;">
        <h2 style="color: #1a73e8;">天普零碳园区智慧能源管理平台</h2>
        <p>您好,</p>
        <p>系统已自动生成 <strong>{safe_name}</strong>,请查收附件。</p>
        <p style="color: #666; font-size: 13px;">
            生成时间: {stamp}<br>
            报表类型: {safe_type}<br>
            格式: {safe_format}
        </p>
        <hr style="border: none; border-top: 1px solid #e8e8e8; margin: 20px 0;">
        <p style="font-size: 12px; color: #999;">此为系统自动发送,请勿回复。</p>
    </div>
    """


async def _run_report_task(task_id: int):
    """Execute a single report task: generate the report and email it to recipients.

    Loads the task and its template, marks the task as running, generates the
    file that matches the template's report type, stores the resulting path and
    status on the task, and emails the file when recipients are configured.
    Any exception raised while generating or emailing is logged and recorded as
    status ``"failed"``; it is not re-raised.

    Args:
        task_id: Primary key of the ``ReportTask`` to run.
    """
    logger.info("Running scheduled report task id=%s", task_id)

    async with async_session() as session:
        # Load task
        task_result = await session.execute(
            select(ReportTask).where(ReportTask.id == task_id)
        )
        task = task_result.scalar_one_or_none()
        if not task:
            logger.warning("Report task id=%s not found, skipping.", task_id)
            return
        if not task.is_active:
            logger.info("Report task id=%s is inactive, skipping.", task_id)
            return

        # Mark as running. This is only flushed, not committed, so the
        # intermediate status lives inside this transaction until the final
        # commit below.
        task.status = "running"
        task.last_run = datetime.now(timezone.utc)
        await session.flush()

        # Load template to determine report type
        tmpl_result = await session.execute(
            select(ReportTemplate).where(ReportTemplate.id == task.template_id)
        )
        template = tmpl_result.scalar_one_or_none()
        if not template:
            logger.error(
                "Template id=%s not found for task id=%s", task.template_id, task_id
            )
            task.status = "failed"
            await session.commit()
            return

        try:
            generator = ReportGenerator(session)
            export_format = task.export_format or "xlsx"

            # Evaluate "now" in the scheduler's timezone so that "yesterday" and
            # "last month" line up with the cron schedule even when the host
            # clock uses a different zone. Falls back to host-local time when no
            # scheduler instance is available.
            now = datetime.now(getattr(_scheduler, "timezone", None))
            today = now.date()

            # Choose generation method based on template report_type
            if template.report_type == "monthly":
                # Previous month = the month containing the day before the 1st.
                last_month_end = today.replace(day=1) - timedelta(days=1)
                filepath = await generator.generate_monthly_summary(
                    month=last_month_end.month,
                    year=last_month_end.year,
                    export_format=export_format,
                )
            elif template.report_type == "custom" and "device" in template.name.lower():
                filepath = await generator.generate_device_status_report(
                    export_format=export_format
                )
            else:
                # "daily" and every unrecognised type: daily report for yesterday.
                yesterday = today - timedelta(days=1)
                filepath = await generator.generate_energy_daily_report(
                    start_date=yesterday, end_date=yesterday, export_format=export_format
                )

            task.file_path = filepath
            task.status = "completed"
            logger.info("Report task id=%s completed: %s", task_id, filepath)

            # Send email with attachment if recipients configured. A failure here
            # falls into the except below and flips the status to "failed" even
            # though file_path has already been recorded.
            recipients = task.recipients or []
            if isinstance(recipients, list) and recipients:
                report_name = task.name or template.name
                await send_email(
                    to=recipients,
                    subject=f"{report_name} - 天普EMS自动报表",
                    body_html=_build_report_email_html(
                        report_name, template.report_type, export_format, now
                    ),
                    attachments=[filepath],
                )

        except Exception as e:
            # Top-level boundary for a scheduled job: log with traceback and
            # record the failure instead of propagating it.
            logger.exception("Report task id=%s failed: %s", task_id, e)
            task.status = "failed"

        await session.commit()
|
|
|
|
|
|
async def _load_and_schedule_tasks():
    """Register every active report task that has a schedule as a cron job.

    Returns immediately when the module-level scheduler has not been created.
    A task whose cron expression cannot be parsed or registered is logged and
    skipped, so the remaining tasks are still scheduled.
    """
    if _scheduler is None:
        return

    # SQLAlchemy column comparisons: `==` / `!=` build SQL expressions here, so
    # they must not be rewritten as `is` / `is not`.
    query = select(ReportTask).where(
        ReportTask.is_active == True,  # noqa: E712
        ReportTask.schedule != None,  # noqa: E711
        ReportTask.schedule != "",
    )

    async with async_session() as session:
        rows = (await session.execute(query)).scalars().all()

        for row in rows:
            try:
                trigger = CronTrigger(**_parse_cron(row.schedule))
                # A stable job id plus replace_existing means re-registering the
                # same task overwrites its previous job.
                _scheduler.add_job(
                    _run_report_task,
                    trigger,
                    args=[row.id],
                    id=f"report_task_{row.id}",
                    replace_existing=True,
                    misfire_grace_time=3600,
                )
                logger.info(
                    f"Scheduled report task id={row.id} name='{row.name}' cron='{row.schedule}'"
                )
            except Exception as e:
                logger.error(f"Failed to schedule report task id={row.id}: {e}")

        # Counts every task returned by the query, including any that failed
        # to schedule above.
        logger.info(f"Report scheduler loaded {len(rows)} task(s).")
|
|
|
|
|
|
async def start_scheduler():
    """Create and start the module-level scheduler, then register report tasks.

    If a scheduler is already running, only a warning is logged and nothing
    else happens.
    """
    global _scheduler

    already_running = _scheduler is not None and _scheduler.running
    if already_running:
        logger.warning("Report scheduler is already running.")
        return

    # Cron expressions of the registered jobs are evaluated in this timezone.
    _scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
    _scheduler.start()
    logger.info("Report scheduler started.")

    await _load_and_schedule_tasks()
|
|
|
|
|
|
async def stop_scheduler():
    """Shut down the report scheduler if it is running and drop the instance.

    The shutdown is requested with ``wait=False``. The module-level reference
    is reset to ``None`` afterwards.
    """
    global _scheduler

    current = _scheduler
    is_running = current is not None and current.running
    if is_running:
        current.shutdown(wait=False)
        logger.info("Report scheduler stopped.")

    _scheduler = None
|