import csv import io from datetime import datetime, timedelta, timezone from fastapi import APIRouter, Depends, Query, HTTPException from fastapi.responses import StreamingResponse from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func, and_, text, Integer from sqlalchemy.orm import joinedload from pydantic import BaseModel from app.core.database import get_db from app.core.config import get_settings from app.core.deps import get_current_user from app.models.energy import EnergyData, EnergyDailySummary, EnergyCategory from app.models.device import Device from app.models.user import User from app.core.deps import require_roles router = APIRouter(prefix="/energy", tags=["能耗数据"]) @router.get("/history") async def query_history( device_id: int | None = None, data_type: str = "power", start_time: str | None = None, end_time: str | None = None, granularity: str = Query("hour", pattern="^(raw|5min|hour|day)$"), page: int = Query(1, ge=1), page_size: int = Query(100, ge=1, le=1000), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """历史数据查询""" query = select(EnergyData).where(EnergyData.data_type == data_type) if device_id: query = query.where(EnergyData.device_id == device_id) if start_time: query = query.where(EnergyData.timestamp >= start_time) if end_time: query = query.where(EnergyData.timestamp <= end_time) if granularity == "raw": query = query.order_by(EnergyData.timestamp.desc()).offset((page - 1) * page_size).limit(page_size) result = await db.execute(query) return [{"timestamp": str(d.timestamp), "value": d.value, "unit": d.unit, "device_id": d.device_id} for d in result.scalars().all()] else: settings = get_settings() if granularity == "5min": if settings.is_sqlite: time_bucket = func.strftime('%Y-%m-%d %H:', EnergyData.timestamp).op('||')( func.printf('%02d:00', (func.cast(func.strftime('%M', EnergyData.timestamp), Integer) / 5) * 5) ).label('time_bucket') else: time_bucket = func.to_timestamp( func.floor(func.extract('epoch', EnergyData.timestamp) / 300) * 300 ).label('time_bucket') elif granularity == "hour": if settings.is_sqlite: time_bucket = func.strftime('%Y-%m-%d %H:00:00', EnergyData.timestamp).label('time_bucket') else: time_bucket = func.date_trunc('hour', EnergyData.timestamp).label('time_bucket') else: # day if settings.is_sqlite: time_bucket = func.strftime('%Y-%m-%d', EnergyData.timestamp).label('time_bucket') else: time_bucket = func.date_trunc('day', EnergyData.timestamp).label('time_bucket') agg_query = select( time_bucket, func.avg(EnergyData.value).label('avg_value'), func.max(EnergyData.value).label('max_value'), func.min(EnergyData.value).label('min_value'), ).where(EnergyData.data_type == data_type) if device_id: agg_query = agg_query.where(EnergyData.device_id == device_id) if start_time: agg_query = agg_query.where(EnergyData.timestamp >= start_time) if end_time: agg_query = agg_query.where(EnergyData.timestamp <= end_time) agg_query = agg_query.group_by(text('time_bucket')).order_by(text('time_bucket')) result = await db.execute(agg_query) return [{"time": str(r[0]), "avg": round(r[1], 2), "max": round(r[2], 2), "min": round(r[3], 2)} for r in result.all()] @router.get("/params") async def query_electrical_params( device_id: int = Query(..., description="设备ID"), params: str = Query("power", description="参数列表(逗号分隔): power,voltage,current,power_factor,temperature,frequency,cop"), start_time: str | None = None, end_time: str | None = None, granularity: str = Query("raw", pattern="^(raw|5min|hour|day)$"), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """电参量查询 - Query multiple electrical parameters for a device""" param_list = [p.strip() for p in params.split(",") if p.strip()] result_data = {} settings = get_settings() for param in param_list: base_filter = and_( EnergyData.device_id == device_id, EnergyData.data_type == param, ) conditions = [base_filter] if start_time: conditions.append(EnergyData.timestamp >= start_time) if end_time: conditions.append(EnergyData.timestamp <= end_time) combined = and_(*conditions) if granularity == "raw": query = ( select(EnergyData.timestamp, EnergyData.value, EnergyData.unit) .where(combined) .order_by(EnergyData.timestamp) .limit(5000) ) rows = await db.execute(query) result_data[param] = [ {"timestamp": str(r[0]), "value": r[1], "unit": r[2]} for r in rows.all() ] else: if granularity == "5min": if settings.is_sqlite: time_bucket = func.strftime('%Y-%m-%d %H:', EnergyData.timestamp).op('||')( func.printf('%02d:00', (func.cast(func.strftime('%M', EnergyData.timestamp), Integer) / 5) * 5) ).label('time_bucket') else: time_bucket = func.to_timestamp( func.floor(func.extract('epoch', EnergyData.timestamp) / 300) * 300 ).label('time_bucket') elif granularity == "hour": if settings.is_sqlite: time_bucket = func.strftime('%Y-%m-%d %H:00:00', EnergyData.timestamp).label('time_bucket') else: time_bucket = func.date_trunc('hour', EnergyData.timestamp).label('time_bucket') else: # day if settings.is_sqlite: time_bucket = func.strftime('%Y-%m-%d', EnergyData.timestamp).label('time_bucket') else: time_bucket = func.date_trunc('day', EnergyData.timestamp).label('time_bucket') agg_query = ( select(time_bucket, func.avg(EnergyData.value).label('avg_value')) .where(combined) .group_by(text('time_bucket')) .order_by(text('time_bucket')) ) rows = await db.execute(agg_query) result_data[param] = [ {"timestamp": str(r[0]), "value": round(r[1], 2)} for r in rows.all() ] return result_data @router.get("/daily-summary") async def daily_summary( start_date: str | None = None, end_date: str | None = None, energy_type: str | None = None, device_id: int | None = None, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """每日能耗汇总""" query = select(EnergyDailySummary) if start_date: query = query.where(EnergyDailySummary.date >= start_date) if end_date: query = query.where(EnergyDailySummary.date <= end_date) if energy_type: query = query.where(EnergyDailySummary.energy_type == energy_type) if device_id: query = query.where(EnergyDailySummary.device_id == device_id) query = query.order_by(EnergyDailySummary.date.desc()).limit(365) result = await db.execute(query) return [{ "date": str(s.date), "device_id": s.device_id, "energy_type": s.energy_type, "consumption": s.total_consumption, "generation": s.total_generation, "peak_power": s.peak_power, "avg_power": s.avg_power, "operating_hours": s.operating_hours, "cost": s.cost, "carbon_emission": s.carbon_emission, } for s in result.scalars().all()] @router.get("/comparison") async def energy_comparison( device_id: int | None = None, energy_type: str = "electricity", period: str = Query("month", pattern="^(day|week|month|year)$"), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """能耗同比环比分析""" now = datetime.now(timezone.utc) if period == "day": current_start = now.replace(hour=0, minute=0, second=0, microsecond=0) prev_start = current_start - timedelta(days=1) yoy_start = current_start.replace(year=current_start.year - 1) elif period == "week": current_start = now - timedelta(days=now.weekday()) current_start = current_start.replace(hour=0, minute=0, second=0, microsecond=0) prev_start = current_start - timedelta(weeks=1) yoy_start = current_start.replace(year=current_start.year - 1) elif period == "month": current_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) prev_start = (current_start - timedelta(days=1)).replace(day=1) yoy_start = current_start.replace(year=current_start.year - 1) else: current_start = now.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0) prev_start = current_start.replace(year=current_start.year - 1) yoy_start = prev_start async def sum_consumption(start, end): q = select(func.sum(EnergyDailySummary.total_consumption)).where( and_(EnergyDailySummary.date >= start, EnergyDailySummary.date < end, EnergyDailySummary.energy_type == energy_type) ) if device_id: q = q.where(EnergyDailySummary.device_id == device_id) r = await db.execute(q) return r.scalar() or 0 current = await sum_consumption(current_start, now) previous = await sum_consumption(prev_start, current_start) yoy = await sum_consumption(yoy_start, yoy_start.replace(year=yoy_start.year + 1)) return { "current": round(current, 2), "previous": round(previous, 2), "yoy": round(yoy, 2), "mom_change": round((current - previous) / previous * 100, 1) if previous else 0, "yoy_change": round((current - yoy) / yoy * 100, 1) if yoy else 0, } @router.get("/export") async def export_energy_data( start_time: str = Query(..., description="开始时间, e.g. 2026-03-01"), end_time: str = Query(..., description="结束时间, e.g. 2026-03-31"), device_id: int | None = Query(None, description="设备ID (可选)"), data_type: str | None = Query(None, description="数据类型 (可选, e.g. power, energy)"), format: str = Query("csv", pattern="^(csv|xlsx)$", description="导出格式: csv 或 xlsx"), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """导出能耗数据为CSV或Excel文件""" # Parse date strings to datetime for proper PostgreSQL comparison try: start_dt = datetime.fromisoformat(start_time) except ValueError: start_dt = datetime.strptime(start_time, "%Y-%m-%d") try: end_dt = datetime.fromisoformat(end_time) except ValueError: end_dt = datetime.strptime(end_time, "%Y-%m-%d").replace(hour=23, minute=59, second=59) # If end_time was just a date (no time component), set to end of day if end_dt.hour == 0 and end_dt.minute == 0 and end_dt.second == 0 and "T" not in end_time: end_dt = end_dt.replace(hour=23, minute=59, second=59) # Query energy data with device names query = ( select(EnergyData, Device.name.label("device_name")) .join(Device, EnergyData.device_id == Device.id, isouter=True) .where( and_( EnergyData.timestamp >= start_dt, EnergyData.timestamp <= end_dt, ) ) ) if device_id: query = query.where(EnergyData.device_id == device_id) if data_type: query = query.where(EnergyData.data_type == data_type) query = query.order_by(EnergyData.timestamp) result = await db.execute(query) rows = result.all() headers = ["timestamp", "device_name", "data_type", "value", "unit"] data_rows = [] for row in rows: energy = row[0] # EnergyData object device_name = row[1] or f"Device#{energy.device_id}" data_rows.append([ str(energy.timestamp) if energy.timestamp else "", device_name, energy.data_type or "", energy.value, energy.unit or "", ]) date_suffix = f"{start_time}_{end_time}".replace("-", "") if format == "xlsx": return _export_xlsx(headers, data_rows, f"energy_export_{date_suffix}.xlsx") else: return _export_csv(headers, data_rows, f"energy_export_{date_suffix}.csv") def _export_csv(headers: list[str], rows: list[list], filename: str) -> StreamingResponse: """Generate CSV streaming response.""" output = io.StringIO() # Add BOM for Excel compatibility with Chinese characters output.write('\ufeff') writer = csv.writer(output) writer.writerow(headers) writer.writerows(rows) output.seek(0) return StreamingResponse( iter([output.getvalue()]), media_type="text/csv; charset=utf-8", headers={"Content-Disposition": f'attachment; filename="{filename}"'}, ) def _export_xlsx(headers: list[str], rows: list[list], filename: str) -> StreamingResponse: """Generate XLSX streaming response.""" from openpyxl import Workbook from openpyxl.styles import Font, PatternFill, Alignment, Border, Side wb = Workbook() ws = wb.active ws.title = "能耗数据" header_font = Font(bold=True, color="FFFFFF", size=11) header_fill = PatternFill(start_color="336699", end_color="336699", fill_type="solid") header_align = Alignment(horizontal="center", vertical="center") thin_border = Border( left=Side(style="thin", color="CCCCCC"), right=Side(style="thin", color="CCCCCC"), top=Side(style="thin", color="CCCCCC"), bottom=Side(style="thin", color="CCCCCC"), ) # Write headers for col_idx, h in enumerate(headers, 1): cell = ws.cell(row=1, column=col_idx, value=h) cell.font = header_font cell.fill = header_fill cell.alignment = header_align cell.border = thin_border # Write data for row_idx, row_data in enumerate(rows, 2): for col_idx, val in enumerate(row_data, 1): cell = ws.cell(row=row_idx, column=col_idx, value=val) cell.border = thin_border if isinstance(val, float): cell.number_format = "#,##0.00" # Auto-width for col_idx in range(1, len(headers) + 1): max_len = len(str(headers[col_idx - 1])) for row_idx in range(2, min(len(rows) + 2, 102)): val = ws.cell(row=row_idx, column=col_idx).value if val: max_len = max(max_len, len(str(val))) ws.column_dimensions[ws.cell(row=1, column=col_idx).column_letter].width = min(max_len + 4, 40) ws.freeze_panes = "A2" if rows: ws.auto_filter.ref = f"A1:{ws.cell(row=1, column=len(headers)).column_letter}{len(rows) + 1}" output = io.BytesIO() wb.save(output) output.seek(0) return StreamingResponse( output, media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", headers={"Content-Disposition": f'attachment; filename="{filename}"'}, ) # ── Energy Category (分项能耗) ────────────────────────────────────── class CategoryCreate(BaseModel): name: str code: str parent_id: int | None = None sort_order: int = 0 icon: str | None = None color: str | None = None def _category_to_dict(c: EnergyCategory) -> dict: return { "id": c.id, "name": c.name, "code": c.code, "parent_id": c.parent_id, "sort_order": c.sort_order, "icon": c.icon, "color": c.color, "created_at": str(c.created_at) if c.created_at else None, } def _build_category_tree(items: list[dict], parent_id: int | None = None) -> list[dict]: tree = [] for item in items: if item["parent_id"] == parent_id: children = _build_category_tree(items, item["id"]) if children: item["children"] = children tree.append(item) return tree @router.get("/categories") async def list_categories( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """获取能耗分项类别(树结构)""" result = await db.execute( select(EnergyCategory).order_by(EnergyCategory.sort_order, EnergyCategory.id) ) items = [_category_to_dict(c) for c in result.scalars().all()] return _build_category_tree(items) @router.post("/categories") async def create_category( data: CategoryCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager")), ): """创建能耗分项类别""" cat = EnergyCategory(**data.model_dump()) db.add(cat) await db.flush() return _category_to_dict(cat) @router.put("/categories/{cat_id}") async def update_category( cat_id: int, data: CategoryCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager")), ): """更新能耗分项类别""" result = await db.execute(select(EnergyCategory).where(EnergyCategory.id == cat_id)) cat = result.scalar_one_or_none() if not cat: raise HTTPException(status_code=404, detail="分项类别不存在") for k, v in data.model_dump(exclude_unset=True).items(): setattr(cat, k, v) return _category_to_dict(cat) @router.get("/by-category") async def energy_by_category( start_date: str | None = None, end_date: str | None = None, energy_type: str = "electricity", db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """按分项类别统计能耗""" query = ( select( EnergyCategory.id, EnergyCategory.name, EnergyCategory.code, EnergyCategory.color, func.coalesce(func.sum(EnergyDailySummary.total_consumption), 0).label("consumption"), ) .select_from(EnergyCategory) .outerjoin(Device, Device.category_id == EnergyCategory.id) .outerjoin( EnergyDailySummary, and_( EnergyDailySummary.device_id == Device.id, EnergyDailySummary.energy_type == energy_type, ), ) ) if start_date: query = query.where(EnergyDailySummary.date >= start_date) if end_date: query = query.where(EnergyDailySummary.date <= end_date) query = query.group_by(EnergyCategory.id, EnergyCategory.name, EnergyCategory.code, EnergyCategory.color) result = await db.execute(query) rows = result.all() total = sum(r.consumption for r in rows) or 1 return [ { "id": r.id, "name": r.name, "code": r.code, "color": r.color, "consumption": round(r.consumption, 2), "percentage": round(r.consumption / total * 100, 1), } for r in rows ] @router.get("/category-ranking") async def category_ranking( start_date: str | None = None, end_date: str | None = None, energy_type: str = "electricity", db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """分项能耗排名""" query = ( select( EnergyCategory.name, EnergyCategory.color, func.coalesce(func.sum(EnergyDailySummary.total_consumption), 0).label("consumption"), ) .select_from(EnergyCategory) .outerjoin(Device, Device.category_id == EnergyCategory.id) .outerjoin( EnergyDailySummary, and_( EnergyDailySummary.device_id == Device.id, EnergyDailySummary.energy_type == energy_type, ), ) ) if start_date: query = query.where(EnergyDailySummary.date >= start_date) if end_date: query = query.where(EnergyDailySummary.date <= end_date) query = query.group_by(EnergyCategory.name, EnergyCategory.color).order_by(text("consumption DESC")) result = await db.execute(query) return [{"name": r.name, "color": r.color, "consumption": round(r.consumption, 2)} for r in result.all()] @router.get("/category-trend") async def category_trend( start_date: str | None = None, end_date: str | None = None, energy_type: str = "electricity", db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """分项能耗每日趋势""" query = ( select( EnergyDailySummary.date, EnergyCategory.name, EnergyCategory.color, func.coalesce(func.sum(EnergyDailySummary.total_consumption), 0).label("consumption"), ) .select_from(EnergyDailySummary) .join(Device, EnergyDailySummary.device_id == Device.id) .join(EnergyCategory, Device.category_id == EnergyCategory.id) .where(EnergyDailySummary.energy_type == energy_type) ) if start_date: query = query.where(EnergyDailySummary.date >= start_date) if end_date: query = query.where(EnergyDailySummary.date <= end_date) query = query.group_by(EnergyDailySummary.date, EnergyCategory.name, EnergyCategory.color) query = query.order_by(EnergyDailySummary.date) result = await db.execute(query) return [ {"date": str(r.date), "category": r.name, "color": r.color, "consumption": round(r.consumption, 2)} for r in result.all() ] # ── Loss / YoY / MoM Analysis ───────────────────────────────────── from app.models.device import DeviceGroup @router.get("/loss") async def get_energy_loss( start_date: str, end_date: str, energy_type: str = "electricity", db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """能耗损耗分析 - Compare parent meter vs sum of sub-meters""" # Get all groups that have children groups_result = await db.execute(select(DeviceGroup)) all_groups = groups_result.scalars().all() group_map = {g.id: g for g in all_groups} parent_ids = {g.parent_id for g in all_groups if g.parent_id is not None} results = [] for gid in parent_ids: group = group_map.get(gid) if not group: continue child_group_ids = [g.id for g in all_groups if g.parent_id == gid] # Parent consumption: devices directly in this group parent_q = select(func.coalesce(func.sum(EnergyDailySummary.total_consumption), 0)).select_from( EnergyDailySummary ).join(Device, EnergyDailySummary.device_id == Device.id).where( and_( Device.group_id == gid, EnergyDailySummary.energy_type == energy_type, EnergyDailySummary.date >= start_date, EnergyDailySummary.date <= end_date, ) ) parent_consumption = (await db.execute(parent_q)).scalar() or 0 # Children consumption: devices in child groups if child_group_ids: children_q = select(func.coalesce(func.sum(EnergyDailySummary.total_consumption), 0)).select_from( EnergyDailySummary ).join(Device, EnergyDailySummary.device_id == Device.id).where( and_( Device.group_id.in_(child_group_ids), EnergyDailySummary.energy_type == energy_type, EnergyDailySummary.date >= start_date, EnergyDailySummary.date <= end_date, ) ) children_consumption = (await db.execute(children_q)).scalar() or 0 else: children_consumption = 0 loss = parent_consumption - children_consumption loss_rate = (loss / parent_consumption * 100) if parent_consumption > 0 else 0 results.append({ "group_name": group.name, "parent_consumption": round(parent_consumption, 2), "children_consumption": round(children_consumption, 2), "loss": round(loss, 2), "loss_rate_pct": round(loss_rate, 1), }) return results @router.get("/yoy") async def get_yoy_comparison( year: int | None = None, energy_type: str = "electricity", group_id: int | None = None, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """同比分析 - Current year vs previous year, month by month""" current_year = year or datetime.now(timezone.utc).year prev_year = current_year - 1 settings = get_settings() results = [] for month in range(1, 13): for yr, label in [(current_year, "current_year"), (prev_year, "previous_year")]: month_start = f"{yr}-{month:02d}-01" if month == 12: month_end = f"{yr + 1}-01-01" else: month_end = f"{yr}-{month + 1:02d}-01" q = select(func.coalesce(func.sum(EnergyDailySummary.total_consumption), 0)).where( and_( EnergyDailySummary.energy_type == energy_type, EnergyDailySummary.date >= month_start, EnergyDailySummary.date < month_end, ) ) if group_id: q = q.select_from(EnergyDailySummary).join( Device, EnergyDailySummary.device_id == Device.id ).where(Device.group_id == group_id) val = (await db.execute(q)).scalar() or 0 # Find or create month entry existing = next((r for r in results if r["month"] == month), None) if not existing: existing = {"month": month, "current_year": 0, "previous_year": 0, "change_pct": 0} results.append(existing) existing[label] = round(val, 2) # Calculate change percentages for r in results: if r["previous_year"] > 0: r["change_pct"] = round((r["current_year"] - r["previous_year"]) / r["previous_year"] * 100, 1) return results @router.get("/mom") async def get_mom_comparison( period: str = "month", energy_type: str = "electricity", group_id: int | None = None, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """环比分析 - Current period vs previous period""" now = datetime.now(timezone.utc) if period == "month": current_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) prev_start = (current_start - timedelta(days=1)).replace(day=1) # Generate daily labels days_in_month = (now - current_start).days + 1 prev_end = current_start labels = [f"{i + 1}日" for i in range(31)] elif period == "week": current_start = (now - timedelta(days=now.weekday())).replace(hour=0, minute=0, second=0, microsecond=0) prev_start = current_start - timedelta(weeks=1) prev_end = current_start labels = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"] else: # day current_start = now.replace(hour=0, minute=0, second=0, microsecond=0) prev_start = current_start - timedelta(days=1) prev_end = current_start labels = [f"{i}:00" for i in range(24)] async def get_period_data(start, end): q = select( EnergyDailySummary.date, func.sum(EnergyDailySummary.total_consumption).label("consumption"), ).where( and_( EnergyDailySummary.energy_type == energy_type, EnergyDailySummary.date >= str(start)[:10], EnergyDailySummary.date < str(end)[:10], ) ) if group_id: q = q.select_from(EnergyDailySummary).join( Device, EnergyDailySummary.device_id == Device.id ).where(Device.group_id == group_id) q = q.group_by(EnergyDailySummary.date).order_by(EnergyDailySummary.date) result = await db.execute(q) return [{"date": str(r.date), "consumption": round(r.consumption, 2)} for r in result.all()] current_data = await get_period_data(current_start, now) previous_data = await get_period_data(prev_start, prev_end) # Build comparison items max_len = max(len(current_data), len(previous_data), 1) items = [] for i in range(max_len): cur_val = current_data[i]["consumption"] if i < len(current_data) else 0 prev_val = previous_data[i]["consumption"] if i < len(previous_data) else 0 change_pct = round((cur_val - prev_val) / prev_val * 100, 1) if prev_val > 0 else 0 items.append({ "label": labels[i] if i < len(labels) else str(i + 1), "current_period": cur_val, "previous_period": prev_val, "change_pct": change_pct, }) total_current = sum(d["consumption"] for d in current_data) total_previous = sum(d["consumption"] for d in previous_data) total_change = round((total_current - total_previous) / total_previous * 100, 1) if total_previous > 0 else 0 return { "items": items, "total_current": round(total_current, 2), "total_previous": round(total_previous, 2), "total_change_pct": total_change, }