Shared backend + frontend for multi-customer EMS deployments. - 12 enterprise modules: quota, cost, charging, maintenance, analysis, etc. - 120+ API endpoints, 37 database tables - Customer config mechanism (CUSTOMER env var + YAML config) - Collectors: Modbus TCP, MQTT, HTTP API, Sungrow iSolarCloud - Frontend: React 19 + Ant Design + ECharts + Three.js - Infrastructure: Redis cache, rate limiting, aggregation engine Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
764 lines
29 KiB
Python
764 lines
29 KiB
Python
import csv
|
|
import io
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from fastapi import APIRouter, Depends, Query, HTTPException
|
|
from fastapi.responses import StreamingResponse
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select, func, and_, text, Integer
|
|
from sqlalchemy.orm import joinedload
|
|
from pydantic import BaseModel
|
|
from app.core.database import get_db
|
|
from app.core.config import get_settings
|
|
from app.core.deps import get_current_user
|
|
from app.models.energy import EnergyData, EnergyDailySummary, EnergyCategory
|
|
from app.models.device import Device
|
|
from app.models.user import User
|
|
from app.core.deps import require_roles
|
|
|
|
# Router collecting the energy-data endpoints defined below; all routes share the "/energy" prefix.
router = APIRouter(prefix="/energy", tags=["能耗数据"])
|
|
|
|
|
|
def _parse_history_time(value: str | None, field: str) -> datetime | None:
    """Parse an optional ISO-8601 query value into a ``datetime``.

    Returns ``None`` when *value* is empty. Raises ``HTTPException`` (400)
    when the text is not a valid ISO date/datetime, so a malformed filter is
    reported to the client instead of failing inside the database driver.
    """
    if not value:
        return None
    try:
        return datetime.fromisoformat(value)
    except ValueError as exc:
        raise HTTPException(
            status_code=400,
            detail=f"{field} 格式无效, 需为 ISO 8601 日期或时间",
        ) from exc


def _history_time_bucket(granularity: str, is_sqlite: bool):
    """Return the labelled SQL expression that assigns each sample to a time bucket.

    ``granularity`` is ``5min``, ``hour`` or anything else (treated as ``day``).
    SQLite gets ``strftime``-based string buckets; other backends get
    ``to_timestamp`` / ``date_trunc`` expressions.
    """
    ts = EnergyData.timestamp
    if granularity == "5min":
        if is_sqlite:
            bucket = func.strftime('%Y-%m-%d %H:', ts).op('||')(
                func.printf('%02d:00', (func.cast(func.strftime('%M', ts), Integer) / 5) * 5)
            )
        else:
            bucket = func.to_timestamp(func.floor(func.extract('epoch', ts) / 300) * 300)
    elif granularity == "hour":
        if is_sqlite:
            bucket = func.strftime('%Y-%m-%d %H:00:00', ts)
        else:
            bucket = func.date_trunc('hour', ts)
    else:  # day
        if is_sqlite:
            bucket = func.strftime('%Y-%m-%d', ts)
        else:
            bucket = func.date_trunc('day', ts)
    return bucket.label('time_bucket')


@router.get("/history")
async def query_history(
    device_id: int | None = None,
    data_type: str = "power",
    start_time: str | None = None,
    end_time: str | None = None,
    granularity: str = Query("hour", pattern="^(raw|5min|hour|day)$"),
    page: int = Query(1, ge=1),
    page_size: int = Query(100, ge=1, le=1000),
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Query historical energy data.

    With ``granularity="raw"`` the individual samples are returned newest
    first, paginated by ``page`` / ``page_size``. For ``5min``, ``hour`` and
    ``day`` the samples are grouped into time buckets and the average, maximum
    and minimum per bucket are returned in ascending bucket order; pagination
    is not applied to the aggregated form.

    ``start_time`` / ``end_time`` are inclusive ISO-8601 bounds. They are
    parsed into ``datetime`` objects before being bound to the query, the same
    way the export endpoint does.

    Raises:
        HTTPException: 400 when ``start_time`` or ``end_time`` cannot be parsed.
    """
    start_dt = _parse_history_time(start_time, "start_time")
    end_dt = _parse_history_time(end_time, "end_time")

    # Shared by the raw and the aggregated query.
    filters = [EnergyData.data_type == data_type]
    if device_id:
        filters.append(EnergyData.device_id == device_id)
    if start_dt is not None:
        filters.append(EnergyData.timestamp >= start_dt)
    if end_dt is not None:
        filters.append(EnergyData.timestamp <= end_dt)

    if granularity == "raw":
        query = (
            select(EnergyData)
            .where(*filters)
            .order_by(EnergyData.timestamp.desc())
            .offset((page - 1) * page_size)
            .limit(page_size)
        )
        result = await db.execute(query)
        return [
            {"timestamp": str(d.timestamp), "value": d.value, "unit": d.unit, "device_id": d.device_id}
            for d in result.scalars().all()
        ]

    time_bucket = _history_time_bucket(granularity, get_settings().is_sqlite)
    agg_query = (
        select(
            time_bucket,
            func.avg(EnergyData.value).label('avg_value'),
            func.max(EnergyData.value).label('max_value'),
            func.min(EnergyData.value).label('min_value'),
        )
        .where(*filters)
        .group_by(text('time_bucket'))
        .order_by(text('time_bucket'))
    )
    result = await db.execute(agg_query)

    def _rounded(value):
        # An aggregate is NULL when every value in the bucket is NULL.
        return round(value, 2) if value is not None else None

    return [
        {"time": str(r[0]), "avg": _rounded(r[1]), "max": _rounded(r[2]), "min": _rounded(r[3])}
        for r in result.all()
    ]
|
|
|
|
|
|
@router.get("/params")
async def query_electrical_params(
    device_id: int = Query(..., description="设备ID"),
    params: str = Query("power", description="参数列表(逗号分隔): power,voltage,current,power_factor,temperature,frequency,cop"),
    start_time: str | None = None,
    end_time: str | None = None,
    granularity: str = Query("raw", pattern="^(raw|5min|hour|day)$"),
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Query several electrical parameters of one device.

    ``params`` is a comma-separated list of data types; blank entries are
    ignored and repeated names are queried only once. The result maps each
    parameter name to its series:

    * ``granularity="raw"``: up to 5000 samples in ascending time order, each
      with ``timestamp``, ``value`` and ``unit``.
    * ``5min`` / ``hour`` / ``day``: one entry per time bucket with the
      bucket ``timestamp`` and the average ``value`` rounded to two decimals.

    ``start_time`` / ``end_time`` are inclusive ISO-8601 bounds and are parsed
    into ``datetime`` objects before being bound to the query.

    Raises:
        HTTPException: 400 when ``start_time`` or ``end_time`` cannot be parsed.
    """

    def parse_time(value: str | None, field: str) -> datetime | None:
        # Reject malformed bounds up front instead of failing in the DB driver.
        if not value:
            return None
        try:
            return datetime.fromisoformat(value)
        except ValueError as exc:
            raise HTTPException(
                status_code=400,
                detail=f"{field} 格式无效, 需为 ISO 8601 日期或时间",
            ) from exc

    start_dt = parse_time(start_time, "start_time")
    end_dt = parse_time(end_time, "end_time")

    # dict.fromkeys drops duplicates while keeping the requested order.
    param_list = list(dict.fromkeys(p.strip() for p in params.split(",") if p.strip()))

    time_window = []
    if start_dt is not None:
        time_window.append(EnergyData.timestamp >= start_dt)
    if end_dt is not None:
        time_window.append(EnergyData.timestamp <= end_dt)

    # The bucket expression does not depend on the parameter, so build it once.
    time_bucket = None
    if granularity != "raw":
        is_sqlite = get_settings().is_sqlite
        ts = EnergyData.timestamp
        if granularity == "5min":
            if is_sqlite:
                bucket = func.strftime('%Y-%m-%d %H:', ts).op('||')(
                    func.printf('%02d:00', (func.cast(func.strftime('%M', ts), Integer) / 5) * 5)
                )
            else:
                bucket = func.to_timestamp(func.floor(func.extract('epoch', ts) / 300) * 300)
        elif granularity == "hour":
            if is_sqlite:
                bucket = func.strftime('%Y-%m-%d %H:00:00', ts)
            else:
                bucket = func.date_trunc('hour', ts)
        else:  # day
            if is_sqlite:
                bucket = func.strftime('%Y-%m-%d', ts)
            else:
                bucket = func.date_trunc('day', ts)
        time_bucket = bucket.label('time_bucket')

    result_data = {}
    for param in param_list:
        combined = and_(
            EnergyData.device_id == device_id,
            EnergyData.data_type == param,
            *time_window,
        )

        if time_bucket is None:
            query = (
                select(EnergyData.timestamp, EnergyData.value, EnergyData.unit)
                .where(combined)
                .order_by(EnergyData.timestamp)
                .limit(5000)
            )
            rows = await db.execute(query)
            result_data[param] = [
                {"timestamp": str(r[0]), "value": r[1], "unit": r[2]}
                for r in rows.all()
            ]
        else:
            agg_query = (
                select(time_bucket, func.avg(EnergyData.value).label('avg_value'))
                .where(combined)
                .group_by(text('time_bucket'))
                .order_by(text('time_bucket'))
            )
            rows = await db.execute(agg_query)
            result_data[param] = [
                # AVG is NULL when every value in the bucket is NULL.
                {"timestamp": str(r[0]), "value": round(r[1], 2) if r[1] is not None else None}
                for r in rows.all()
            ]

    return result_data
|
|
|
|
|
|
@router.get("/daily-summary")
async def daily_summary(
    start_date: str | None = None,
    end_date: str | None = None,
    energy_type: str | None = None,
    device_id: int | None = None,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Return daily energy summaries, newest first.

    Every filter is optional; ``start_date`` and ``end_date`` are inclusive.
    At most 365 summary rows are returned.
    """
    criteria = []
    if start_date:
        criteria.append(EnergyDailySummary.date >= start_date)
    if end_date:
        criteria.append(EnergyDailySummary.date <= end_date)
    if energy_type:
        criteria.append(EnergyDailySummary.energy_type == energy_type)
    if device_id:
        criteria.append(EnergyDailySummary.device_id == device_id)

    stmt = (
        select(EnergyDailySummary)
        .where(*criteria)
        .order_by(EnergyDailySummary.date.desc())
        .limit(365)
    )
    summaries = (await db.execute(stmt)).scalars().all()

    payload = []
    for row in summaries:
        payload.append({
            "date": str(row.date),
            "device_id": row.device_id,
            "energy_type": row.energy_type,
            "consumption": row.total_consumption,
            "generation": row.total_generation,
            "peak_power": row.peak_power,
            "avg_power": row.avg_power,
            "operating_hours": row.operating_hours,
            "cost": row.cost,
            "carbon_emission": row.carbon_emission,
        })
    return payload
|
|
|
|
|
|
def _comparison_shift_year(moment: datetime, years: int) -> datetime:
    """Return *moment* moved by *years* calendar years, clamping 29 Feb to 28 Feb."""
    try:
        return moment.replace(year=moment.year + years)
    except ValueError:
        # Only 29 February can fail here: the target year is not a leap year.
        return moment.replace(year=moment.year + years, day=28)


@router.get("/comparison")
async def energy_comparison(
    device_id: int | None = None,
    energy_type: str = "electricity",
    period: str = Query("month", pattern="^(day|week|month|year)$"),
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Compare consumption with the previous period and with the same period last year.

    Three totals of ``EnergyDailySummary.total_consumption`` are computed
    (times are UTC):

    * ``current``: from the start of the current period up to now.
    * ``previous``: the whole period immediately before the current one.
    * ``yoy``: the whole matching period one calendar year earlier. For
      ``period="year"`` this is the same window as ``previous``.

    ``mom_change`` and ``yoy_change`` are percentage changes of ``current``
    against ``previous`` and ``yoy``; they are 0 when the baseline is 0.
    """
    now = datetime.now(timezone.utc)
    today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)

    if period == "day":
        current_start = today_start
        prev_start = current_start - timedelta(days=1)
        yoy_start = _comparison_shift_year(current_start, -1)
        yoy_end = yoy_start + timedelta(days=1)
    elif period == "week":
        # Weeks start on Monday (weekday() == 0).
        current_start = today_start - timedelta(days=now.weekday())
        prev_start = current_start - timedelta(weeks=1)
        yoy_start = _comparison_shift_year(current_start, -1)
        yoy_end = yoy_start + timedelta(weeks=1)
    elif period == "month":
        current_start = today_start.replace(day=1)
        prev_start = (current_start - timedelta(days=1)).replace(day=1)
        yoy_start = current_start.replace(year=current_start.year - 1)
        # Jumping 32 days from the 1st always lands in the next month.
        yoy_end = (yoy_start + timedelta(days=32)).replace(day=1)
    else:  # year
        current_start = today_start.replace(month=1, day=1)
        prev_start = current_start.replace(year=current_start.year - 1)
        yoy_start = prev_start
        yoy_end = current_start

    async def sum_consumption(start, end):
        # Half-open window: start inclusive, end exclusive.
        q = select(func.sum(EnergyDailySummary.total_consumption)).where(
            and_(
                EnergyDailySummary.date >= start,
                EnergyDailySummary.date < end,
                EnergyDailySummary.energy_type == energy_type,
            )
        )
        if device_id:
            q = q.where(EnergyDailySummary.device_id == device_id)
        r = await db.execute(q)
        return r.scalar() or 0

    current = await sum_consumption(current_start, now)
    previous = await sum_consumption(prev_start, current_start)
    yoy = await sum_consumption(yoy_start, yoy_end)

    return {
        "current": round(current, 2),
        "previous": round(previous, 2),
        "yoy": round(yoy, 2),
        "mom_change": round((current - previous) / previous * 100, 1) if previous else 0,
        "yoy_change": round((current - yoy) / yoy * 100, 1) if yoy else 0,
    }
|
|
|
|
|
|
@router.get("/export")
async def export_energy_data(
    start_time: str = Query(..., description="开始时间, e.g. 2026-03-01"),
    end_time: str = Query(..., description="结束时间, e.g. 2026-03-31"),
    device_id: int | None = Query(None, description="设备ID (可选)"),
    data_type: str | None = Query(None, description="数据类型 (可选, e.g. power, energy)"),
    format: str = Query("csv", pattern="^(csv|xlsx)$", description="导出格式: csv 或 xlsx"),
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Export energy data as a CSV or Excel download.

    ``start_time`` / ``end_time`` accept ISO-8601 dates or datetimes (with a
    ``%Y-%m-%d`` fallback). An ``end_time`` that resolves to midnight and
    contains no ``T`` is treated as a plain date and extended to 23:59:59 so
    the whole day is included. Rows are ordered by timestamp and carry the
    device name, falling back to ``Device#<id>`` when the device is unknown.

    Raises:
        HTTPException: 400 when ``start_time`` or ``end_time`` cannot be parsed.
    """

    def parse_bound(value: str, field: str) -> datetime:
        # Parse to datetime for proper PostgreSQL comparison.
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            pass
        try:
            return datetime.strptime(value, "%Y-%m-%d")
        except ValueError as exc:
            raise HTTPException(
                status_code=400,
                detail=f"{field} 格式无效, 需为 ISO 8601 日期或时间",
            ) from exc

    start_dt = parse_bound(start_time, "start_time")
    end_dt = parse_bound(end_time, "end_time")

    # If end_time was just a date (no time component), set to end of day.
    if end_dt.hour == 0 and end_dt.minute == 0 and end_dt.second == 0 and "T" not in end_time:
        end_dt = end_dt.replace(hour=23, minute=59, second=59)

    # Query energy data with device names.
    query = (
        select(EnergyData, Device.name.label("device_name"))
        .join(Device, EnergyData.device_id == Device.id, isouter=True)
        .where(
            and_(
                EnergyData.timestamp >= start_dt,
                EnergyData.timestamp <= end_dt,
            )
        )
    )
    if device_id:
        query = query.where(EnergyData.device_id == device_id)
    if data_type:
        query = query.where(EnergyData.data_type == data_type)
    query = query.order_by(EnergyData.timestamp)

    result = await db.execute(query)

    headers = ["timestamp", "device_name", "data_type", "value", "unit"]
    data_rows = [
        [
            str(energy.timestamp) if energy.timestamp else "",
            device_name or f"Device#{energy.device_id}",
            energy.data_type or "",
            energy.value,
            energy.unit or "",
        ]
        for energy, device_name in result.all()
    ]

    # The file name goes into a response header, so keep only ASCII letters,
    # digits and "_" from the user-supplied bounds (plain dates still yield
    # e.g. "20260301_20260331").
    date_suffix = "".join(
        ch for ch in f"{start_time}_{end_time}"
        if ch == "_" or (ch.isascii() and ch.isalnum())
    )
    if format == "xlsx":
        return _export_xlsx(headers, data_rows, f"energy_export_{date_suffix}.xlsx")
    return _export_csv(headers, data_rows, f"energy_export_{date_suffix}.csv")
|
|
|
|
|
|
def _export_csv(headers: list[str], rows: list[list], filename: str) -> StreamingResponse:
    """Build a CSV download response from a header row and data rows.

    The text starts with a UTF-8 byte-order mark so that Excel detects the
    encoding and shows Chinese characters correctly.
    """
    buffer = io.StringIO()
    buffer.write('\ufeff')

    csv_writer = csv.writer(buffer)
    csv_writer.writerows([headers, *rows])

    payload = buffer.getvalue()
    disposition = f'attachment; filename="{filename}"'
    return StreamingResponse(
        iter([payload]),
        media_type="text/csv; charset=utf-8",
        headers={"Content-Disposition": disposition},
    )
|
|
|
|
|
|
def _export_xlsx(headers: list[str], rows: list[list], filename: str) -> StreamingResponse:
    """Build an XLSX download response from a header row and data rows.

    The sheet gets a styled, frozen header row, thin borders on every cell, a
    two-decimal number format on float values, column widths estimated from
    the header and the first 100 data rows (capped at 40), and an auto-filter
    when there is at least one data row.
    """
    from openpyxl import Workbook
    from openpyxl.styles import Font, PatternFill, Alignment, Border, Side

    def grey_edge():
        return Side(style="thin", color="CCCCCC")

    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "能耗数据"

    title_font = Font(bold=True, color="FFFFFF", size=11)
    title_fill = PatternFill(start_color="336699", end_color="336699", fill_type="solid")
    title_align = Alignment(horizontal="center", vertical="center")
    grid = Border(left=grey_edge(), right=grey_edge(), top=grey_edge(), bottom=grey_edge())

    # Header row.
    for column, title in enumerate(headers, start=1):
        head_cell = sheet.cell(row=1, column=column, value=title)
        head_cell.font = title_font
        head_cell.fill = title_fill
        head_cell.alignment = title_align
        head_cell.border = grid

    # Data rows start directly below the header.
    for line, record in enumerate(rows, start=2):
        for column, value in enumerate(record, start=1):
            data_cell = sheet.cell(row=line, column=column, value=value)
            data_cell.border = grid
            if isinstance(value, float):
                data_cell.number_format = "#,##0.00"

    # Column widths: sample rows 2..101 only.
    sample_stop = min(len(rows) + 2, 102)
    for column, title in enumerate(headers, start=1):
        widest = len(str(title))
        for line in range(2, sample_stop):
            content = sheet.cell(row=line, column=column).value
            if content:
                widest = max(widest, len(str(content)))
        letter = sheet.cell(row=1, column=column).column_letter
        sheet.column_dimensions[letter].width = min(widest + 4, 40)

    sheet.freeze_panes = "A2"
    if rows:
        last_letter = sheet.cell(row=1, column=len(headers)).column_letter
        sheet.auto_filter.ref = f"A1:{last_letter}{len(rows) + 1}"

    stream = io.BytesIO()
    workbook.save(stream)
    stream.seek(0)

    return StreamingResponse(
        stream,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
|
|
|
|
|
|
# ── Energy Category (分项能耗) ──────────────────────────────────────
|
|
|
|
# Request body used when creating or updating an energy category.
class CategoryCreate(BaseModel):
    # Required fields.
    name: str
    code: str

    # Id of the parent category; None places the category at the top level of the tree.
    parent_id: int | None = None
    # Categories are listed ordered by sort_order, then id.
    sort_order: int = 0

    # Optional presentation attributes.
    icon: str | None = None
    color: str | None = None
|
|
|
|
|
|
def _category_to_dict(c: EnergyCategory) -> dict:
    """Serialise an energy category into a plain dict.

    ``created_at`` is rendered with ``str()``; a falsy value becomes ``None``.
    """
    copied_fields = ("id", "name", "code", "parent_id", "sort_order", "icon", "color")
    payload = {field: getattr(c, field) for field in copied_fields}

    created = c.created_at
    payload["created_at"] = str(created) if created else None
    return payload
|
|
|
|
|
|
def _build_category_tree(items: list[dict], parent_id: int | None = None) -> list[dict]:
|
|
tree = []
|
|
for item in items:
|
|
if item["parent_id"] == parent_id:
|
|
children = _build_category_tree(items, item["id"])
|
|
if children:
|
|
item["children"] = children
|
|
tree.append(item)
|
|
return tree
|
|
|
|
|
|
@router.get("/categories")
async def list_categories(
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Return all energy categories as a tree, ordered by sort_order then id."""
    stmt = select(EnergyCategory).order_by(EnergyCategory.sort_order, EnergyCategory.id)
    categories = (await db.execute(stmt)).scalars().all()
    flat = list(map(_category_to_dict, categories))
    return _build_category_tree(flat)
|
|
|
|
|
|
@router.post("/categories")
async def create_category(
    data: CategoryCreate,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(require_roles("admin", "energy_manager")),
):
    """Create an energy category and return it as a dict.

    Only the session flush happens here, so that the generated id is available
    for the response. NOTE(review): the commit is presumably done by the
    ``get_db`` dependency; confirm.
    """
    fields = data.model_dump()
    new_category = EnergyCategory(**fields)
    db.add(new_category)
    await db.flush()
    return _category_to_dict(new_category)
|
|
|
|
|
|
@router.put("/categories/{cat_id}")
async def update_category(
    cat_id: int,
    data: CategoryCreate,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(require_roles("admin", "energy_manager")),
):
    """Update an energy category with the fields present in the request body.

    Raises:
        HTTPException: 404 when no category has id ``cat_id``.
    """
    lookup = select(EnergyCategory).where(EnergyCategory.id == cat_id)
    cat = (await db.execute(lookup)).scalar_one_or_none()
    if not cat:
        raise HTTPException(status_code=404, detail="分项类别不存在")

    # Only fields the client actually sent are applied.
    changes = data.model_dump(exclude_unset=True)
    for attribute, new_value in changes.items():
        setattr(cat, attribute, new_value)

    return _category_to_dict(cat)
|
|
|
|
|
|
@router.get("/by-category")
async def energy_by_category(
    start_date: str | None = None,
    end_date: str | None = None,
    energy_type: str = "electricity",
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Total consumption per energy category, with each category's share.

    Every category is returned, including those with no devices or no daily
    summaries in the requested range (their consumption is 0).
    ``start_date`` / ``end_date`` are inclusive. ``percentage`` is the share
    of the summed consumption of all returned categories.
    """
    # The date range belongs in the ON clause of the outer join. As a WHERE
    # condition it would discard the NULL rows of categories without data and
    # silently turn the outer join into an inner join.
    summary_on = [
        EnergyDailySummary.device_id == Device.id,
        EnergyDailySummary.energy_type == energy_type,
    ]
    if start_date:
        summary_on.append(EnergyDailySummary.date >= start_date)
    if end_date:
        summary_on.append(EnergyDailySummary.date <= end_date)

    query = (
        select(
            EnergyCategory.id,
            EnergyCategory.name,
            EnergyCategory.code,
            EnergyCategory.color,
            func.coalesce(func.sum(EnergyDailySummary.total_consumption), 0).label("consumption"),
        )
        .select_from(EnergyCategory)
        .outerjoin(Device, Device.category_id == EnergyCategory.id)
        .outerjoin(EnergyDailySummary, and_(*summary_on))
        .group_by(EnergyCategory.id, EnergyCategory.name, EnergyCategory.code, EnergyCategory.color)
    )
    result = await db.execute(query)
    rows = result.all()

    # Fall back to 1 so an all-zero result yields 0% instead of dividing by zero.
    total = sum(r.consumption for r in rows) or 1
    return [
        {
            "id": r.id, "name": r.name, "code": r.code, "color": r.color,
            "consumption": round(r.consumption, 2),
            "percentage": round(r.consumption / total * 100, 1),
        }
        for r in rows
    ]
|
|
|
|
|
|
@router.get("/category-ranking")
async def category_ranking(
    start_date: str | None = None,
    end_date: str | None = None,
    energy_type: str = "electricity",
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Rank energy categories by total consumption, highest first.

    Every category is returned, including those with no devices or no daily
    summaries in the requested range (their consumption is 0).
    ``start_date`` / ``end_date`` are inclusive.
    """
    # The date range belongs in the ON clause of the outer join. As a WHERE
    # condition it would discard the NULL rows of categories without data and
    # silently turn the outer join into an inner join.
    summary_on = [
        EnergyDailySummary.device_id == Device.id,
        EnergyDailySummary.energy_type == energy_type,
    ]
    if start_date:
        summary_on.append(EnergyDailySummary.date >= start_date)
    if end_date:
        summary_on.append(EnergyDailySummary.date <= end_date)

    query = (
        select(
            EnergyCategory.name,
            EnergyCategory.color,
            func.coalesce(func.sum(EnergyDailySummary.total_consumption), 0).label("consumption"),
        )
        .select_from(EnergyCategory)
        .outerjoin(Device, Device.category_id == EnergyCategory.id)
        .outerjoin(EnergyDailySummary, and_(*summary_on))
        .group_by(EnergyCategory.name, EnergyCategory.color)
        .order_by(text("consumption DESC"))
    )
    result = await db.execute(query)
    return [{"name": r.name, "color": r.color, "consumption": round(r.consumption, 2)} for r in result.all()]
|
|
|
|
|
|
@router.get("/category-trend")
async def category_trend(
    start_date: str | None = None,
    end_date: str | None = None,
    energy_type: str = "electricity",
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Daily consumption per energy category, in ascending date order.

    Only summaries whose device belongs to a category are counted (inner
    joins). ``start_date`` / ``end_date`` are inclusive.
    """
    consumption = func.coalesce(func.sum(EnergyDailySummary.total_consumption), 0).label("consumption")

    stmt = select(
        EnergyDailySummary.date,
        EnergyCategory.name,
        EnergyCategory.color,
        consumption,
    ).select_from(EnergyDailySummary)
    stmt = stmt.join(Device, EnergyDailySummary.device_id == Device.id)
    stmt = stmt.join(EnergyCategory, Device.category_id == EnergyCategory.id)
    stmt = stmt.where(EnergyDailySummary.energy_type == energy_type)

    if start_date:
        stmt = stmt.where(EnergyDailySummary.date >= start_date)
    if end_date:
        stmt = stmt.where(EnergyDailySummary.date <= end_date)

    stmt = stmt.group_by(EnergyDailySummary.date, EnergyCategory.name, EnergyCategory.color)
    stmt = stmt.order_by(EnergyDailySummary.date)

    points = []
    for record in (await db.execute(stmt)).all():
        points.append({
            "date": str(record.date),
            "category": record.name,
            "color": record.color,
            "consumption": round(record.consumption, 2),
        })
    return points
|
|
|
|
|
|
# ── Loss / YoY / MoM Analysis ─────────────────────────────────────
|
|
|
|
from app.models.device import DeviceGroup
|
|
|
|
|
|
@router.get("/loss")
async def get_energy_loss(
    start_date: str,
    end_date: str,
    energy_type: str = "electricity",
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Energy loss analysis: a parent group's devices versus its child groups' devices.

    For every device group that has at least one child group, the consumption
    of devices directly in the group is compared with the summed consumption
    of devices in its direct child groups over the inclusive date range.
    ``loss`` is the difference; ``loss_rate_pct`` is the loss as a percentage
    of the parent consumption (0 when the parent consumption is not positive).
    """
    all_groups = (await db.execute(select(DeviceGroup))).scalars().all()
    group_map = {g.id: g for g in all_groups}
    parent_ids = {g.parent_id for g in all_groups if g.parent_id is not None}

    async def consumption_where(group_condition):
        # Summed consumption of the devices matching group_condition in the range.
        stmt = (
            select(func.coalesce(func.sum(EnergyDailySummary.total_consumption), 0))
            .select_from(EnergyDailySummary)
            .join(Device, EnergyDailySummary.device_id == Device.id)
            .where(
                and_(
                    group_condition,
                    EnergyDailySummary.energy_type == energy_type,
                    EnergyDailySummary.date >= start_date,
                    EnergyDailySummary.date <= end_date,
                )
            )
        )
        return (await db.execute(stmt)).scalar() or 0

    results = []
    for gid in parent_ids:
        group = group_map.get(gid)
        if not group:
            # parent_id points at a group that was not loaded.
            continue

        child_group_ids = [g.id for g in all_groups if g.parent_id == gid]

        parent_consumption = await consumption_where(Device.group_id == gid)
        children_consumption = 0
        if child_group_ids:
            children_consumption = await consumption_where(Device.group_id.in_(child_group_ids))

        loss = parent_consumption - children_consumption
        if parent_consumption > 0:
            loss_rate = loss / parent_consumption * 100
        else:
            loss_rate = 0

        results.append({
            "group_name": group.name,
            "parent_consumption": round(parent_consumption, 2),
            "children_consumption": round(children_consumption, 2),
            "loss": round(loss, 2),
            "loss_rate_pct": round(loss_rate, 1),
        })

    return results
|
|
|
|
|
|
@router.get("/yoy")
async def get_yoy_comparison(
    year: int | None = None,
    energy_type: str = "electricity",
    group_id: int | None = None,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Year-over-year analysis: the chosen year versus the year before, month by month.

    ``year`` defaults to the current UTC year. When ``group_id`` is given only
    devices in that group are counted. The result has one entry per month
    (1-12) with both yearly totals rounded to two decimals and ``change_pct``,
    which is 0 when the previous year's total is not positive.
    """
    current_year = year or datetime.now(timezone.utc).year
    prev_year = current_year - 1

    async def month_total(yr: int, month: int):
        # Half-open range [first of month, first of next month).
        month_start = f"{yr}-{month:02d}-01"
        if month == 12:
            month_end = f"{yr + 1}-01-01"
        else:
            month_end = f"{yr}-{month + 1:02d}-01"

        q = select(func.coalesce(func.sum(EnergyDailySummary.total_consumption), 0)).where(
            and_(
                EnergyDailySummary.energy_type == energy_type,
                EnergyDailySummary.date >= month_start,
                EnergyDailySummary.date < month_end,
            )
        )
        if group_id:
            q = q.select_from(EnergyDailySummary).join(
                Device, EnergyDailySummary.device_id == Device.id
            ).where(Device.group_id == group_id)
        return (await db.execute(q)).scalar() or 0

    results = []
    for month in range(1, 13):
        current_total = round(await month_total(current_year, month), 2)
        previous_total = round(await month_total(prev_year, month), 2)

        change_pct = 0
        if previous_total > 0:
            change_pct = round((current_total - previous_total) / previous_total * 100, 1)

        results.append({
            "month": month,
            "current_year": current_total,
            "previous_year": previous_total,
            "change_pct": change_pct,
        })

    return results
|
|
|
|
|
|
@router.get("/mom")
async def get_mom_comparison(
    period: str = "month",
    energy_type: str = "electricity",
    group_id: int | None = None,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Period-over-period analysis: the current period versus the one before it.

    ``period`` is ``"month"`` or ``"week"``; any other value is treated as
    ``"day"``. Daily summaries of the current period (from its start up to and
    including today, UTC) are compared with those of the whole previous
    period. Entries are paired by their day offset from the start of each
    period, so a missing day shows as 0 instead of shifting later days.

    Returns a dict with per-slot ``items`` (label, both values, percentage
    change) and the period totals with their overall percentage change. A
    percentage is 0 when its baseline is not positive.
    """
    now = datetime.now(timezone.utc)
    today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)

    if period == "month":
        current_start = today_start.replace(day=1)
        prev_start = (current_start - timedelta(days=1)).replace(day=1)
        labels = [f"{i + 1}日" for i in range(31)]
    elif period == "week":
        current_start = today_start - timedelta(days=now.weekday())
        prev_start = current_start - timedelta(weeks=1)
        labels = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
    else:  # day
        current_start = today_start
        prev_start = current_start - timedelta(days=1)
        labels = [f"{i}:00" for i in range(24)]

    prev_end = current_start
    # The end bound is exclusive, so it must be tomorrow for today to count.
    current_end = today_start + timedelta(days=1)

    async def get_period_data(start, end):
        """Map day offset from *start* to rounded consumption for [start, end)."""
        q = select(
            EnergyDailySummary.date,
            func.sum(EnergyDailySummary.total_consumption).label("consumption"),
        ).where(
            and_(
                EnergyDailySummary.energy_type == energy_type,
                EnergyDailySummary.date >= str(start)[:10],
                EnergyDailySummary.date < str(end)[:10],
            )
        )
        if group_id:
            q = q.select_from(EnergyDailySummary).join(
                Device, EnergyDailySummary.device_id == Device.id
            ).where(Device.group_id == group_id)
        q = q.group_by(EnergyDailySummary.date).order_by(EnergyDailySummary.date)
        result = await db.execute(q)

        by_offset = {}
        for position, r in enumerate(result.all()):
            try:
                day = datetime.fromisoformat(str(r.date)[:10]).date()
                offset = (day - start.date()).days
            except ValueError:
                # Unparseable date value: fall back to the row's position.
                offset = position
            # SUM is NULL when every consumption value of the day is NULL.
            by_offset[offset] = round(r.consumption or 0, 2)
        return by_offset

    current_data = await get_period_data(current_start, current_end)
    previous_data = await get_period_data(prev_start, prev_end)

    # Always emit at least one slot, even when both periods are empty.
    slot_count = max(
        max(current_data, default=-1) + 1,
        max(previous_data, default=-1) + 1,
        1,
    )
    items = []
    for i in range(slot_count):
        cur_val = current_data.get(i, 0)
        prev_val = previous_data.get(i, 0)
        change_pct = round((cur_val - prev_val) / prev_val * 100, 1) if prev_val > 0 else 0
        items.append({
            "label": labels[i] if i < len(labels) else str(i + 1),
            "current_period": cur_val,
            "previous_period": prev_val,
            "change_pct": change_pct,
        })

    total_current = sum(current_data.values())
    total_previous = sum(previous_data.values())
    total_change = round((total_current - total_previous) / total_previous * 100, 1) if total_previous > 0 else 0

    return {
        "items": items,
        "total_current": round(total_current, 2),
        "total_previous": round(total_previous, 2),
        "total_change_pct": total_change,
    }
|