""" 报表生成服务 - PDF/Excel report generation for Tianpu EMS. """ import os import io from datetime import datetime, date, timedelta from pathlib import Path from typing import Any from sqlalchemy import select, func, and_ from sqlalchemy.ext.asyncio import AsyncSession from app.models.device import Device from app.models.energy import EnergyDailySummary from app.models.alarm import AlarmEvent from app.models.carbon import CarbonEmission REPORTS_DIR = Path(__file__).resolve().parent.parent.parent / "reports" REPORTS_DIR.mkdir(exist_ok=True) PLATFORM_TITLE = "天普零碳园区智慧能源管理平台" ENERGY_TYPE_LABELS = { "electricity": "电力", "heat": "热能", "water": "水", "gas": "天然气", } DEVICE_STATUS_LABELS = { "online": "在线", "offline": "离线", "alarm": "告警", "maintenance": "维护中", } SEVERITY_LABELS = { "critical": "紧急", "major": "重要", "warning": "一般", } def _register_chinese_font(): """Register a Chinese font for ReportLab PDF generation.""" from reportlab.pdfbase import pdfmetrics from reportlab.pdfbase.ttfonts import TTFont font_paths = [ "C:/Windows/Fonts/simsun.ttc", "C:/Windows/Fonts/simhei.ttf", "C:/Windows/Fonts/msyh.ttc", "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc", "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc", "/System/Library/Fonts/PingFang.ttc", ] for fp in font_paths: if os.path.exists(fp): try: pdfmetrics.registerFont(TTFont("ChineseFont", fp)) return "ChineseFont" except Exception: continue return "Helvetica" class ReportGenerator: """Generates PDF and Excel reports from EMS data.""" def __init__(self, db: AsyncSession): self.db = db # ------------------------------------------------------------------ # # Data fetching helpers # ------------------------------------------------------------------ # async def _fetch_energy_daily( self, start_date: date, end_date: date, device_ids: list[int] | None = None ) -> list[dict]: q = select(EnergyDailySummary).where( and_( func.date(EnergyDailySummary.date) >= start_date, func.date(EnergyDailySummary.date) <= end_date, ) ) if device_ids: q = q.where(EnergyDailySummary.device_id.in_(device_ids)) q = q.order_by(EnergyDailySummary.date) result = await self.db.execute(q) rows = result.scalars().all() return [ { "date": str(r.date.date()) if r.date else "", "device_id": r.device_id, "energy_type": ENERGY_TYPE_LABELS.get(r.energy_type, r.energy_type), "total_consumption": round(r.total_consumption or 0, 2), "total_generation": round(r.total_generation or 0, 2), "peak_power": round(r.peak_power or 0, 2), "avg_power": round(r.avg_power or 0, 2), "operating_hours": round(r.operating_hours or 0, 1), "cost": round(r.cost or 0, 2), "carbon_emission": round(r.carbon_emission or 0, 2), } for r in rows ] async def _fetch_devices(self) -> list[dict]: result = await self.db.execute( select(Device).where(Device.is_active == True).order_by(Device.id) ) return [ { "id": d.id, "name": d.name, "code": d.code, "device_type": d.device_type, "status": DEVICE_STATUS_LABELS.get(d.status, d.status), "rated_power": d.rated_power or 0, "location": d.location or "", "last_data_time": str(d.last_data_time) if d.last_data_time else "N/A", } for d in result.scalars().all() ] async def _fetch_alarms(self, start_date: date, end_date: date) -> list[dict]: q = select(AlarmEvent).where( and_( func.date(AlarmEvent.triggered_at) >= start_date, func.date(AlarmEvent.triggered_at) <= end_date, ) ).order_by(AlarmEvent.triggered_at.desc()) result = await self.db.execute(q) return [ { "id": a.id, "device_id": a.device_id, "severity": SEVERITY_LABELS.get(a.severity, a.severity), "title": a.title, "description": a.description or "", "value": a.value, "threshold": a.threshold, "status": a.status, "triggered_at": str(a.triggered_at) if a.triggered_at else "", "resolved_at": str(a.resolved_at) if a.resolved_at else "", } for a in result.scalars().all() ] async def _fetch_carbon(self, start_date: date, end_date: date) -> list[dict]: q = select(CarbonEmission).where( and_( func.date(CarbonEmission.date) >= start_date, func.date(CarbonEmission.date) <= end_date, ) ).order_by(CarbonEmission.date) result = await self.db.execute(q) return [ { "date": str(c.date.date()) if c.date else "", "scope": c.scope, "category": c.category, "emission": round(c.emission or 0, 2), "reduction": round(c.reduction or 0, 2), "energy_consumption": round(c.energy_consumption or 0, 2), "energy_unit": c.energy_unit or "", } for c in result.scalars().all() ] # ------------------------------------------------------------------ # # Public generation methods # ------------------------------------------------------------------ # async def generate_energy_daily_report( self, start_date: date, end_date: date, device_ids: list[int] | None = None, export_format: str = "xlsx", ) -> str: data = await self._fetch_energy_daily(start_date, end_date, device_ids) title = "每日能耗报表" date_range_str = f"{start_date} ~ {end_date}" summary = self._compute_energy_summary(data) headers = ["日期", "设备ID", "能源类型", "消耗量", "产出量", "峰值功率(kW)", "平均功率(kW)", "运行时长(h)", "费用(元)", "碳排放(kgCO₂)"] table_keys = ["date", "device_id", "energy_type", "total_consumption", "total_generation", "peak_power", "avg_power", "operating_hours", "cost", "carbon_emission"] filename = f"energy_daily_{start_date}_{end_date}_{datetime.now().strftime('%H%M%S')}" if export_format == "pdf": return self._generate_pdf(title, date_range_str, summary, headers, table_keys, data, filename) else: return self._generate_excel(title, date_range_str, summary, headers, table_keys, data, filename) async def generate_monthly_summary( self, month: int, year: int, export_format: str = "xlsx" ) -> str: start = date(year, month, 1) if month == 12: end = date(year + 1, 1, 1) - timedelta(days=1) else: end = date(year, month + 1, 1) - timedelta(days=1) data = await self._fetch_energy_daily(start, end) title = f"{year}年{month}月能耗月报" date_range_str = f"{start} ~ {end}" summary = self._compute_energy_summary(data) headers = ["日期", "设备ID", "能源类型", "消耗量", "产出量", "峰值功率(kW)", "平均功率(kW)", "运行时长(h)", "费用(元)", "碳排放(kgCO₂)"] table_keys = ["date", "device_id", "energy_type", "total_consumption", "total_generation", "peak_power", "avg_power", "operating_hours", "cost", "carbon_emission"] filename = f"monthly_summary_{year}_{month:02d}" if export_format == "pdf": return self._generate_pdf(title, date_range_str, summary, headers, table_keys, data, filename) else: return self._generate_excel(title, date_range_str, summary, headers, table_keys, data, filename) async def generate_device_status_report(self, export_format: str = "xlsx") -> str: data = await self._fetch_devices() title = "设备状态报表" date_range_str = f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}" summary = self._compute_device_summary(data) headers = ["设备ID", "设备名称", "设备编号", "设备类型", "状态", "额定功率(kW)", "位置", "最近数据时间"] table_keys = ["id", "name", "code", "device_type", "status", "rated_power", "location", "last_data_time"] filename = f"device_status_{datetime.now().strftime('%Y%m%d_%H%M%S')}" if export_format == "pdf": return self._generate_pdf(title, date_range_str, summary, headers, table_keys, data, filename) else: return self._generate_excel(title, date_range_str, summary, headers, table_keys, data, filename) async def generate_alarm_report( self, start_date: date, end_date: date, export_format: str = "xlsx" ) -> str: data = await self._fetch_alarms(start_date, end_date) title = "告警分析报表" date_range_str = f"{start_date} ~ {end_date}" summary = self._compute_alarm_summary(data) headers = ["告警ID", "设备ID", "严重程度", "标题", "描述", "触发值", "阈值", "状态", "触发时间", "解决时间"] table_keys = ["id", "device_id", "severity", "title", "description", "value", "threshold", "status", "triggered_at", "resolved_at"] filename = f"alarm_report_{start_date}_{end_date}" if export_format == "pdf": return self._generate_pdf(title, date_range_str, summary, headers, table_keys, data, filename) else: return self._generate_excel(title, date_range_str, summary, headers, table_keys, data, filename) async def generate_carbon_report( self, start_date: date, end_date: date, export_format: str = "xlsx" ) -> str: data = await self._fetch_carbon(start_date, end_date) title = "碳排放分析报表" date_range_str = f"{start_date} ~ {end_date}" summary = self._compute_carbon_summary(data) headers = ["日期", "范围", "类别", "排放量(kgCO₂e)", "减排量(kgCO₂e)", "能耗", "单位"] table_keys = ["date", "scope", "category", "emission", "reduction", "energy_consumption", "energy_unit"] filename = f"carbon_report_{start_date}_{end_date}" if export_format == "pdf": return self._generate_pdf(title, date_range_str, summary, headers, table_keys, data, filename) else: return self._generate_excel(title, date_range_str, summary, headers, table_keys, data, filename) # ------------------------------------------------------------------ # # Summary computation helpers # ------------------------------------------------------------------ # @staticmethod def _compute_energy_summary(data: list[dict]) -> list[tuple[str, str]]: total_consumption = sum(r["total_consumption"] for r in data) total_generation = sum(r["total_generation"] for r in data) total_cost = sum(r["cost"] for r in data) total_carbon = sum(r["carbon_emission"] for r in data) return [ ("数据条数", str(len(data))), ("总消耗量", f"{total_consumption:,.2f}"), ("总产出量", f"{total_generation:,.2f}"), ("总费用(元)", f"{total_cost:,.2f}"), ("总碳排放(kgCO₂)", f"{total_carbon:,.2f}"), ] @staticmethod def _compute_device_summary(data: list[dict]) -> list[tuple[str, str]]: total = len(data) online = sum(1 for d in data if d["status"] == "在线") offline = sum(1 for d in data if d["status"] == "离线") alarm = sum(1 for d in data if d["status"] == "告警") return [ ("设备总数", str(total)), ("在线", str(online)), ("离线", str(offline)), ("告警", str(alarm)), ] @staticmethod def _compute_alarm_summary(data: list[dict]) -> list[tuple[str, str]]: total = len(data) critical = sum(1 for a in data if a["severity"] == "紧急") major = sum(1 for a in data if a["severity"] == "重要") resolved = sum(1 for a in data if a["status"] == "resolved") return [ ("告警总数", str(total)), ("紧急", str(critical)), ("重要", str(major)), ("已解决", str(resolved)), ] @staticmethod def _compute_carbon_summary(data: list[dict]) -> list[tuple[str, str]]: total_emission = sum(r["emission"] for r in data) total_reduction = sum(r["reduction"] for r in data) net = total_emission - total_reduction return [ ("数据条数", str(len(data))), ("总排放(kgCO₂e)", f"{total_emission:,.2f}"), ("总减排(kgCO₂e)", f"{total_reduction:,.2f}"), ("净排放(kgCO₂e)", f"{net:,.2f}"), ] # ------------------------------------------------------------------ # # PDF generation (ReportLab) # ------------------------------------------------------------------ # def _generate_pdf( self, title: str, date_range_str: str, summary: list[tuple[str, str]], headers: list[str], table_keys: list[str], data: list[dict], filename: str, ) -> str: from reportlab.lib.pagesizes import A4 from reportlab.lib import colors from reportlab.lib.styles import ParagraphStyle from reportlab.lib.units import mm from reportlab.platypus import ( SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer ) font_name = _register_chinese_font() filepath = str(REPORTS_DIR / f"{filename}.pdf") doc = SimpleDocTemplate( filepath, pagesize=A4, topMargin=20 * mm, bottomMargin=20 * mm, leftMargin=15 * mm, rightMargin=15 * mm, ) title_style = ParagraphStyle( "Title", fontName=font_name, fontSize=16, alignment=1, spaceAfter=6, ) subtitle_style = ParagraphStyle( "Subtitle", fontName=font_name, fontSize=10, alignment=1, textColor=colors.grey, spaceAfter=4, ) section_style = ParagraphStyle( "Section", fontName=font_name, fontSize=12, spaceBefore=12, spaceAfter=6, ) normal_style = ParagraphStyle( "Normal", fontName=font_name, fontSize=9, ) elements: list[Any] = [] # Header elements.append(Paragraph(PLATFORM_TITLE, subtitle_style)) elements.append(Paragraph(title, title_style)) elements.append(Paragraph(date_range_str, subtitle_style)) elements.append(Spacer(1, 8 * mm)) # Summary section elements.append(Paragraph("概要", section_style)) summary_data = [[Paragraph(k, normal_style), Paragraph(v, normal_style)] for k, v in summary] summary_table = Table(summary_data, colWidths=[120, 200]) summary_table.setStyle(TableStyle([ ("BACKGROUND", (0, 0), (0, -1), colors.Color(0.94, 0.94, 0.94)), ("GRID", (0, 0), (-1, -1), 0.5, colors.grey), ("VALIGN", (0, 0), (-1, -1), "MIDDLE"), ("LEFTPADDING", (0, 0), (-1, -1), 6), ("RIGHTPADDING", (0, 0), (-1, -1), 6), ("TOPPADDING", (0, 0), (-1, -1), 4), ("BOTTOMPADDING", (0, 0), (-1, -1), 4), ])) elements.append(summary_table) elements.append(Spacer(1, 8 * mm)) # Detail table elements.append(Paragraph("明细数据", section_style)) if data: page_width = A4[0] - 30 * mm col_width = page_width / len(headers) header_row = [Paragraph(h, ParagraphStyle("H", fontName=font_name, fontSize=7, alignment=1)) for h in headers] table_data = [header_row] cell_style = ParagraphStyle("Cell", fontName=font_name, fontSize=7) for row in data[:500]: # limit rows for PDF table_data.append([Paragraph(str(row.get(k, "")), cell_style) for k in table_keys]) detail_table = Table(table_data, colWidths=[col_width] * len(headers), repeatRows=1) detail_table.setStyle(TableStyle([ ("BACKGROUND", (0, 0), (-1, 0), colors.Color(0.2, 0.4, 0.7)), ("TEXTCOLOR", (0, 0), (-1, 0), colors.white), ("GRID", (0, 0), (-1, -1), 0.5, colors.lightgrey), ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.Color(0.96, 0.96, 0.96)]), ("VALIGN", (0, 0), (-1, -1), "MIDDLE"), ("LEFTPADDING", (0, 0), (-1, -1), 3), ("RIGHTPADDING", (0, 0), (-1, -1), 3), ("TOPPADDING", (0, 0), (-1, -1), 3), ("BOTTOMPADDING", (0, 0), (-1, -1), 3), ])) elements.append(detail_table) if len(data) > 500: elements.append(Spacer(1, 4 * mm)) elements.append(Paragraph(f"(共 {len(data)} 条记录,PDF 仅显示前500条)", normal_style)) else: elements.append(Paragraph("暂无数据", normal_style)) # Footer elements.append(Spacer(1, 10 * mm)) footer_style = ParagraphStyle("Footer", fontName=font_name, fontSize=8, textColor=colors.grey, alignment=2) elements.append(Paragraph(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", footer_style)) doc.build(elements) return filepath # ------------------------------------------------------------------ # # Excel generation (OpenPyXL) # ------------------------------------------------------------------ # def _generate_excel( self, title: str, date_range_str: str, summary: list[tuple[str, str]], headers: list[str], table_keys: list[str], data: list[dict], filename: str, ) -> str: from openpyxl import Workbook from openpyxl.styles import Font, Alignment, PatternFill, Border, Side filepath = str(REPORTS_DIR / f"{filename}.xlsx") wb = Workbook() header_font = Font(bold=True, color="FFFFFF", size=11) header_fill = PatternFill(start_color="336699", end_color="336699", fill_type="solid") header_align = Alignment(horizontal="center", vertical="center", wrap_text=True) thin_border = Border( left=Side(style="thin", color="CCCCCC"), right=Side(style="thin", color="CCCCCC"), top=Side(style="thin", color="CCCCCC"), bottom=Side(style="thin", color="CCCCCC"), ) title_font = Font(bold=True, size=14) subtitle_font = Font(size=10, color="666666") summary_key_fill = PatternFill(start_color="F0F0F0", end_color="F0F0F0", fill_type="solid") # --- Summary sheet --- ws_summary = wb.active ws_summary.title = "概要" ws_summary.append([PLATFORM_TITLE]) ws_summary.merge_cells("A1:D1") ws_summary["A1"].font = title_font ws_summary.append([title]) ws_summary.merge_cells("A2:D2") ws_summary["A2"].font = Font(bold=True, size=12) ws_summary.append([date_range_str]) ws_summary.merge_cells("A3:D3") ws_summary["A3"].font = subtitle_font ws_summary.append([]) ws_summary.append(["指标", "值"]) ws_summary["A5"].font = Font(bold=True) ws_summary["B5"].font = Font(bold=True) for label, value in summary: row = ws_summary.max_row + 1 ws_summary.append([label, value]) ws_summary.cell(row=row, column=1).fill = summary_key_fill ws_summary.column_dimensions["A"].width = 25 ws_summary.column_dimensions["B"].width = 30 # --- Detail sheet --- ws_detail = wb.create_sheet("明细数据") # Header row for col_idx, h in enumerate(headers, 1): cell = ws_detail.cell(row=1, column=col_idx, value=h) cell.font = header_font cell.fill = header_fill cell.alignment = header_align cell.border = thin_border # Data rows for row_idx, row_data in enumerate(data, 2): for col_idx, key in enumerate(table_keys, 1): val = row_data.get(key, "") cell = ws_detail.cell(row=row_idx, column=col_idx, value=val) cell.border = thin_border if isinstance(val, float): cell.number_format = "#,##0.00" cell.alignment = Alignment(vertical="center") # Auto-width columns for col_idx in range(1, len(headers) + 1): max_len = len(str(headers[col_idx - 1])) for row_idx in range(2, min(len(data) + 2, 102)): val = ws_detail.cell(row=row_idx, column=col_idx).value if val: max_len = max(max_len, len(str(val))) ws_detail.column_dimensions[ws_detail.cell(row=1, column=col_idx).column_letter].width = min(max_len + 4, 40) # Auto-filter if data: ws_detail.auto_filter.ref = f"A1:{ws_detail.cell(row=1, column=len(headers)).column_letter}{len(data) + 1}" # Freeze header ws_detail.freeze_panes = "A2" wb.save(filepath) return filepath