Files
zpark-ems/backend/app/services/report_generator.py
Du Wenbo 026c837b91 Squashed 'core/' content from commit 92ec910
git-subtree-dir: core
git-subtree-split: 92ec910a132e379a3a6e442a75bcb07cac0f0010
2026-04-04 18:17:10 +08:00

524 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
报表生成服务 - PDF/Excel report generation for Tianpu EMS.
"""
import os
import io
from datetime import datetime, date, timedelta
from pathlib import Path
from typing import Any
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.device import Device
from app.models.energy import EnergyDailySummary
from app.models.alarm import AlarmEvent
from app.models.carbon import CarbonEmission
# Generated report files are written to a "reports" directory located three
# levels above this module; the directory is created at import time if missing.
REPORTS_DIR = Path(__file__).resolve().parents[2] / "reports"
REPORTS_DIR.mkdir(exist_ok=True)

# Platform name shown in the header of every generated report.
PLATFORM_TITLE = "天普零碳园区智慧能源管理平台"
# Display labels (Chinese) for raw energy-type codes. Codes that are not listed
# here are shown unchanged by the lookups that use this map.
ENERGY_TYPE_LABELS = {
    "electricity": "电力",
    "heat": "热能",
    # Was an empty string, which rendered water rows with a blank type cell.
    "water": "水",
    "gas": "天然气",
}

# Display labels (Chinese) for raw device status codes.
DEVICE_STATUS_LABELS = {
    "online": "在线",
    "offline": "离线",
    "alarm": "告警",
    "maintenance": "维护中",
}

# Display labels (Chinese) for raw alarm severity codes.
SEVERITY_LABELS = {
    "critical": "紧急",
    "major": "重要",
    "warning": "一般",
}
def _register_chinese_font():
    """Register a Chinese font for ReportLab PDF generation.

    Probes a fixed list of well-known font locations (Windows, Linux, macOS)
    and registers the first one ReportLab can load under the name
    ``"ChineseFont"``.

    Returns:
        ``"ChineseFont"`` when a font is (or already was) registered,
        otherwise ``"Helvetica"`` as a last-resort fallback.
    """
    import logging

    from reportlab.pdfbase import pdfmetrics
    from reportlab.pdfbase.ttfonts import TTFont

    font_name = "ChineseFont"

    # ReportLab keeps registered fonts in a module-level registry, so a font
    # registered by an earlier call can be reused instead of re-parsing the
    # font file for every PDF that is built.
    if font_name in pdfmetrics.getRegisteredFontNames():
        return font_name

    font_paths = [
        "C:/Windows/Fonts/simsun.ttc",
        "C:/Windows/Fonts/simhei.ttf",
        "C:/Windows/Fonts/msyh.ttc",
        "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
        "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
        "/System/Library/Fonts/PingFang.ttc",
    ]
    for fp in font_paths:
        if not os.path.exists(fp):
            continue
        try:
            pdfmetrics.registerFont(TTFont(font_name, fp))
        except Exception:
            # Best effort: a font file that exists but cannot be loaded must
            # not abort report generation, so move on to the next candidate.
            continue
        return font_name

    # Helvetica has no CJK glyphs, so Chinese text will not render properly;
    # make that visible in the logs instead of failing silently.
    logging.getLogger(__name__).warning(
        "No usable Chinese font found; falling back to Helvetica for PDF reports"
    )
    return "Helvetica"
class ReportGenerator:
"""Generates PDF and Excel reports from EMS data."""
def __init__(self, db: AsyncSession):
self.db = db
# ------------------------------------------------------------------ #
# Data fetching helpers
# ------------------------------------------------------------------ #
async def _fetch_energy_daily(
self, start_date: date, end_date: date, device_ids: list[int] | None = None
) -> list[dict]:
q = select(EnergyDailySummary).where(
and_(
func.date(EnergyDailySummary.date) >= start_date,
func.date(EnergyDailySummary.date) <= end_date,
)
)
if device_ids:
q = q.where(EnergyDailySummary.device_id.in_(device_ids))
q = q.order_by(EnergyDailySummary.date)
result = await self.db.execute(q)
rows = result.scalars().all()
return [
{
"date": str(r.date.date()) if r.date else "",
"device_id": r.device_id,
"energy_type": ENERGY_TYPE_LABELS.get(r.energy_type, r.energy_type),
"total_consumption": round(r.total_consumption or 0, 2),
"total_generation": round(r.total_generation or 0, 2),
"peak_power": round(r.peak_power or 0, 2),
"avg_power": round(r.avg_power or 0, 2),
"operating_hours": round(r.operating_hours or 0, 1),
"cost": round(r.cost or 0, 2),
"carbon_emission": round(r.carbon_emission or 0, 2),
}
for r in rows
]
async def _fetch_devices(self) -> list[dict]:
result = await self.db.execute(
select(Device).where(Device.is_active == True).order_by(Device.id)
)
return [
{
"id": d.id,
"name": d.name,
"code": d.code,
"device_type": d.device_type,
"status": DEVICE_STATUS_LABELS.get(d.status, d.status),
"rated_power": d.rated_power or 0,
"location": d.location or "",
"last_data_time": str(d.last_data_time) if d.last_data_time else "N/A",
}
for d in result.scalars().all()
]
async def _fetch_alarms(self, start_date: date, end_date: date) -> list[dict]:
q = select(AlarmEvent).where(
and_(
func.date(AlarmEvent.triggered_at) >= start_date,
func.date(AlarmEvent.triggered_at) <= end_date,
)
).order_by(AlarmEvent.triggered_at.desc())
result = await self.db.execute(q)
return [
{
"id": a.id,
"device_id": a.device_id,
"severity": SEVERITY_LABELS.get(a.severity, a.severity),
"title": a.title,
"description": a.description or "",
"value": a.value,
"threshold": a.threshold,
"status": a.status,
"triggered_at": str(a.triggered_at) if a.triggered_at else "",
"resolved_at": str(a.resolved_at) if a.resolved_at else "",
}
for a in result.scalars().all()
]
async def _fetch_carbon(self, start_date: date, end_date: date) -> list[dict]:
q = select(CarbonEmission).where(
and_(
func.date(CarbonEmission.date) >= start_date,
func.date(CarbonEmission.date) <= end_date,
)
).order_by(CarbonEmission.date)
result = await self.db.execute(q)
return [
{
"date": str(c.date.date()) if c.date else "",
"scope": c.scope,
"category": c.category,
"emission": round(c.emission or 0, 2),
"reduction": round(c.reduction or 0, 2),
"energy_consumption": round(c.energy_consumption or 0, 2),
"energy_unit": c.energy_unit or "",
}
for c in result.scalars().all()
]
# ------------------------------------------------------------------ #
# Public generation methods
# ------------------------------------------------------------------ #
async def generate_energy_daily_report(
self,
start_date: date,
end_date: date,
device_ids: list[int] | None = None,
export_format: str = "xlsx",
) -> str:
data = await self._fetch_energy_daily(start_date, end_date, device_ids)
title = "每日能耗报表"
date_range_str = f"{start_date} ~ {end_date}"
summary = self._compute_energy_summary(data)
headers = ["日期", "设备ID", "能源类型", "消耗量", "产出量", "峰值功率(kW)", "平均功率(kW)", "运行时长(h)", "费用(元)", "碳排放(kgCO₂)"]
table_keys = ["date", "device_id", "energy_type", "total_consumption", "total_generation", "peak_power", "avg_power", "operating_hours", "cost", "carbon_emission"]
filename = f"energy_daily_{start_date}_{end_date}_{datetime.now().strftime('%H%M%S')}"
if export_format == "pdf":
return self._generate_pdf(title, date_range_str, summary, headers, table_keys, data, filename)
else:
return self._generate_excel(title, date_range_str, summary, headers, table_keys, data, filename)
async def generate_monthly_summary(
self, month: int, year: int, export_format: str = "xlsx"
) -> str:
start = date(year, month, 1)
if month == 12:
end = date(year + 1, 1, 1) - timedelta(days=1)
else:
end = date(year, month + 1, 1) - timedelta(days=1)
data = await self._fetch_energy_daily(start, end)
title = f"{year}{month}月能耗月报"
date_range_str = f"{start} ~ {end}"
summary = self._compute_energy_summary(data)
headers = ["日期", "设备ID", "能源类型", "消耗量", "产出量", "峰值功率(kW)", "平均功率(kW)", "运行时长(h)", "费用(元)", "碳排放(kgCO₂)"]
table_keys = ["date", "device_id", "energy_type", "total_consumption", "total_generation", "peak_power", "avg_power", "operating_hours", "cost", "carbon_emission"]
filename = f"monthly_summary_{year}_{month:02d}"
if export_format == "pdf":
return self._generate_pdf(title, date_range_str, summary, headers, table_keys, data, filename)
else:
return self._generate_excel(title, date_range_str, summary, headers, table_keys, data, filename)
async def generate_device_status_report(self, export_format: str = "xlsx") -> str:
data = await self._fetch_devices()
title = "设备状态报表"
date_range_str = f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}"
summary = self._compute_device_summary(data)
headers = ["设备ID", "设备名称", "设备编号", "设备类型", "状态", "额定功率(kW)", "位置", "最近数据时间"]
table_keys = ["id", "name", "code", "device_type", "status", "rated_power", "location", "last_data_time"]
filename = f"device_status_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
if export_format == "pdf":
return self._generate_pdf(title, date_range_str, summary, headers, table_keys, data, filename)
else:
return self._generate_excel(title, date_range_str, summary, headers, table_keys, data, filename)
async def generate_alarm_report(
self, start_date: date, end_date: date, export_format: str = "xlsx"
) -> str:
data = await self._fetch_alarms(start_date, end_date)
title = "告警分析报表"
date_range_str = f"{start_date} ~ {end_date}"
summary = self._compute_alarm_summary(data)
headers = ["告警ID", "设备ID", "严重程度", "标题", "描述", "触发值", "阈值", "状态", "触发时间", "解决时间"]
table_keys = ["id", "device_id", "severity", "title", "description", "value", "threshold", "status", "triggered_at", "resolved_at"]
filename = f"alarm_report_{start_date}_{end_date}"
if export_format == "pdf":
return self._generate_pdf(title, date_range_str, summary, headers, table_keys, data, filename)
else:
return self._generate_excel(title, date_range_str, summary, headers, table_keys, data, filename)
async def generate_carbon_report(
self, start_date: date, end_date: date, export_format: str = "xlsx"
) -> str:
data = await self._fetch_carbon(start_date, end_date)
title = "碳排放分析报表"
date_range_str = f"{start_date} ~ {end_date}"
summary = self._compute_carbon_summary(data)
headers = ["日期", "范围", "类别", "排放量(kgCO₂e)", "减排量(kgCO₂e)", "能耗", "单位"]
table_keys = ["date", "scope", "category", "emission", "reduction", "energy_consumption", "energy_unit"]
filename = f"carbon_report_{start_date}_{end_date}"
if export_format == "pdf":
return self._generate_pdf(title, date_range_str, summary, headers, table_keys, data, filename)
else:
return self._generate_excel(title, date_range_str, summary, headers, table_keys, data, filename)
# ------------------------------------------------------------------ #
# Summary computation helpers
# ------------------------------------------------------------------ #
@staticmethod
def _compute_energy_summary(data: list[dict]) -> list[tuple[str, str]]:
total_consumption = sum(r["total_consumption"] for r in data)
total_generation = sum(r["total_generation"] for r in data)
total_cost = sum(r["cost"] for r in data)
total_carbon = sum(r["carbon_emission"] for r in data)
return [
("数据条数", str(len(data))),
("总消耗量", f"{total_consumption:,.2f}"),
("总产出量", f"{total_generation:,.2f}"),
("总费用(元)", f"{total_cost:,.2f}"),
("总碳排放(kgCO₂)", f"{total_carbon:,.2f}"),
]
@staticmethod
def _compute_device_summary(data: list[dict]) -> list[tuple[str, str]]:
total = len(data)
online = sum(1 for d in data if d["status"] == "在线")
offline = sum(1 for d in data if d["status"] == "离线")
alarm = sum(1 for d in data if d["status"] == "告警")
return [
("设备总数", str(total)),
("在线", str(online)),
("离线", str(offline)),
("告警", str(alarm)),
]
@staticmethod
def _compute_alarm_summary(data: list[dict]) -> list[tuple[str, str]]:
total = len(data)
critical = sum(1 for a in data if a["severity"] == "紧急")
major = sum(1 for a in data if a["severity"] == "重要")
resolved = sum(1 for a in data if a["status"] == "resolved")
return [
("告警总数", str(total)),
("紧急", str(critical)),
("重要", str(major)),
("已解决", str(resolved)),
]
@staticmethod
def _compute_carbon_summary(data: list[dict]) -> list[tuple[str, str]]:
total_emission = sum(r["emission"] for r in data)
total_reduction = sum(r["reduction"] for r in data)
net = total_emission - total_reduction
return [
("数据条数", str(len(data))),
("总排放(kgCO₂e)", f"{total_emission:,.2f}"),
("总减排(kgCO₂e)", f"{total_reduction:,.2f}"),
("净排放(kgCO₂e)", f"{net:,.2f}"),
]
# ------------------------------------------------------------------ #
# PDF generation (ReportLab)
# ------------------------------------------------------------------ #
def _generate_pdf(
self,
title: str,
date_range_str: str,
summary: list[tuple[str, str]],
headers: list[str],
table_keys: list[str],
data: list[dict],
filename: str,
) -> str:
from reportlab.lib.pagesizes import A4
from reportlab.lib import colors
from reportlab.lib.styles import ParagraphStyle
from reportlab.lib.units import mm
from reportlab.platypus import (
SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
)
font_name = _register_chinese_font()
filepath = str(REPORTS_DIR / f"{filename}.pdf")
doc = SimpleDocTemplate(
filepath, pagesize=A4,
topMargin=20 * mm, bottomMargin=20 * mm,
leftMargin=15 * mm, rightMargin=15 * mm,
)
title_style = ParagraphStyle(
"Title", fontName=font_name, fontSize=16, alignment=1, spaceAfter=6,
)
subtitle_style = ParagraphStyle(
"Subtitle", fontName=font_name, fontSize=10, alignment=1,
textColor=colors.grey, spaceAfter=4,
)
section_style = ParagraphStyle(
"Section", fontName=font_name, fontSize=12, spaceBefore=12, spaceAfter=6,
)
normal_style = ParagraphStyle(
"Normal", fontName=font_name, fontSize=9,
)
elements: list[Any] = []
# Header
elements.append(Paragraph(PLATFORM_TITLE, subtitle_style))
elements.append(Paragraph(title, title_style))
elements.append(Paragraph(date_range_str, subtitle_style))
elements.append(Spacer(1, 8 * mm))
# Summary section
elements.append(Paragraph("概要", section_style))
summary_data = [[Paragraph(k, normal_style), Paragraph(v, normal_style)] for k, v in summary]
summary_table = Table(summary_data, colWidths=[120, 200])
summary_table.setStyle(TableStyle([
("BACKGROUND", (0, 0), (0, -1), colors.Color(0.94, 0.94, 0.94)),
("GRID", (0, 0), (-1, -1), 0.5, colors.grey),
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
("LEFTPADDING", (0, 0), (-1, -1), 6),
("RIGHTPADDING", (0, 0), (-1, -1), 6),
("TOPPADDING", (0, 0), (-1, -1), 4),
("BOTTOMPADDING", (0, 0), (-1, -1), 4),
]))
elements.append(summary_table)
elements.append(Spacer(1, 8 * mm))
# Detail table
elements.append(Paragraph("明细数据", section_style))
if data:
page_width = A4[0] - 30 * mm
col_width = page_width / len(headers)
header_row = [Paragraph(h, ParagraphStyle("H", fontName=font_name, fontSize=7, alignment=1)) for h in headers]
table_data = [header_row]
cell_style = ParagraphStyle("Cell", fontName=font_name, fontSize=7)
for row in data[:500]: # limit rows for PDF
table_data.append([Paragraph(str(row.get(k, "")), cell_style) for k in table_keys])
detail_table = Table(table_data, colWidths=[col_width] * len(headers), repeatRows=1)
detail_table.setStyle(TableStyle([
("BACKGROUND", (0, 0), (-1, 0), colors.Color(0.2, 0.4, 0.7)),
("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
("GRID", (0, 0), (-1, -1), 0.5, colors.lightgrey),
("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.Color(0.96, 0.96, 0.96)]),
("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
("LEFTPADDING", (0, 0), (-1, -1), 3),
("RIGHTPADDING", (0, 0), (-1, -1), 3),
("TOPPADDING", (0, 0), (-1, -1), 3),
("BOTTOMPADDING", (0, 0), (-1, -1), 3),
]))
elements.append(detail_table)
if len(data) > 500:
elements.append(Spacer(1, 4 * mm))
elements.append(Paragraph(f"(共 {len(data)} 条记录PDF 仅显示前500条)", normal_style))
else:
elements.append(Paragraph("暂无数据", normal_style))
# Footer
elements.append(Spacer(1, 10 * mm))
footer_style = ParagraphStyle("Footer", fontName=font_name, fontSize=8, textColor=colors.grey, alignment=2)
elements.append(Paragraph(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", footer_style))
doc.build(elements)
return filepath
# ------------------------------------------------------------------ #
# Excel generation (OpenPyXL)
# ------------------------------------------------------------------ #
def _generate_excel(
self,
title: str,
date_range_str: str,
summary: list[tuple[str, str]],
headers: list[str],
table_keys: list[str],
data: list[dict],
filename: str,
) -> str:
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
filepath = str(REPORTS_DIR / f"{filename}.xlsx")
wb = Workbook()
header_font = Font(bold=True, color="FFFFFF", size=11)
header_fill = PatternFill(start_color="336699", end_color="336699", fill_type="solid")
header_align = Alignment(horizontal="center", vertical="center", wrap_text=True)
thin_border = Border(
left=Side(style="thin", color="CCCCCC"),
right=Side(style="thin", color="CCCCCC"),
top=Side(style="thin", color="CCCCCC"),
bottom=Side(style="thin", color="CCCCCC"),
)
title_font = Font(bold=True, size=14)
subtitle_font = Font(size=10, color="666666")
summary_key_fill = PatternFill(start_color="F0F0F0", end_color="F0F0F0", fill_type="solid")
# --- Summary sheet ---
ws_summary = wb.active
ws_summary.title = "概要"
ws_summary.append([PLATFORM_TITLE])
ws_summary.merge_cells("A1:D1")
ws_summary["A1"].font = title_font
ws_summary.append([title])
ws_summary.merge_cells("A2:D2")
ws_summary["A2"].font = Font(bold=True, size=12)
ws_summary.append([date_range_str])
ws_summary.merge_cells("A3:D3")
ws_summary["A3"].font = subtitle_font
ws_summary.append([])
ws_summary.append(["指标", ""])
ws_summary["A5"].font = Font(bold=True)
ws_summary["B5"].font = Font(bold=True)
for label, value in summary:
row = ws_summary.max_row + 1
ws_summary.append([label, value])
ws_summary.cell(row=row, column=1).fill = summary_key_fill
ws_summary.column_dimensions["A"].width = 25
ws_summary.column_dimensions["B"].width = 30
# --- Detail sheet ---
ws_detail = wb.create_sheet("明细数据")
# Header row
for col_idx, h in enumerate(headers, 1):
cell = ws_detail.cell(row=1, column=col_idx, value=h)
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_align
cell.border = thin_border
# Data rows
for row_idx, row_data in enumerate(data, 2):
for col_idx, key in enumerate(table_keys, 1):
val = row_data.get(key, "")
cell = ws_detail.cell(row=row_idx, column=col_idx, value=val)
cell.border = thin_border
if isinstance(val, float):
cell.number_format = "#,##0.00"
cell.alignment = Alignment(vertical="center")
# Auto-width columns
for col_idx in range(1, len(headers) + 1):
max_len = len(str(headers[col_idx - 1]))
for row_idx in range(2, min(len(data) + 2, 102)):
val = ws_detail.cell(row=row_idx, column=col_idx).value
if val:
max_len = max(max_len, len(str(val)))
ws_detail.column_dimensions[ws_detail.cell(row=1, column=col_idx).column_letter].width = min(max_len + 4, 40)
# Auto-filter
if data:
ws_detail.auto_filter.ref = f"A1:{ws_detail.cell(row=1, column=len(headers)).column_letter}{len(data) + 1}"
# Freeze header
ws_detail.freeze_panes = "A2"
wb.save(filepath)
return filepath