Files
ems-core/backend/app/api/v1/meters.py

202 lines
6.2 KiB
Python
Raw Normal View History

from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import get_settings
from app.core.database import get_db
from app.core.deps import get_current_user
from app.models.meter import MeterReading
from app.models.user import User
router = APIRouter(prefix="/meters", tags=["电表管理"])
settings = get_settings()
_customer_config = settings.load_customer_config()
# --------------- Pydantic schemas ---------------
class MeterInfo(BaseModel):
id: int
name: str
meter_no: str | None = None
modbus_addr: int | None = None
class MeterReadingResponse(BaseModel):
time: datetime
meter_id: int
meter_name: str | None = None
forward_active_energy: float | None = None
reverse_active_energy: float | None = None
active_power: float | None = None
reactive_power: float | None = None
power_factor: float | None = None
voltage_a: float | None = None
voltage_b: float | None = None
voltage_c: float | None = None
current_a: float | None = None
current_b: float | None = None
current_c: float | None = None
class MeterLatestResponse(BaseModel):
meter: MeterInfo
latest: MeterReadingResponse | None = None
class MeterOverviewItem(BaseModel):
meter: MeterInfo
latest: MeterReadingResponse | None = None
class MeterOverviewResponse(BaseModel):
meters: list[MeterOverviewItem]
total_forward_energy: float
total_reverse_energy: float
total_active_power: float
class MeterListResponse(BaseModel):
items: list[MeterInfo]
total: int
# --------------- Helpers ---------------
def _get_meter_configs() -> list[dict]:
"""Load meter list from customer config.yaml."""
return _customer_config.get("meters", [])
def _meter_config_to_info(cfg: dict) -> MeterInfo:
return MeterInfo(
id=cfg["id"],
name=cfg.get("name", f"Meter-{cfg['id']}"),
meter_no=cfg.get("meter_no"),
modbus_addr=cfg.get("modbus_addr"),
)
def _reading_to_response(r: MeterReading) -> MeterReadingResponse:
return MeterReadingResponse(
time=r.time,
meter_id=r.meter_id,
meter_name=r.meter_name,
forward_active_energy=r.forward_active_energy,
reverse_active_energy=r.reverse_active_energy,
active_power=r.active_power,
reactive_power=r.reactive_power,
power_factor=r.power_factor,
voltage_a=r.voltage_a,
voltage_b=r.voltage_b,
voltage_c=r.voltage_c,
current_a=r.current_a,
current_b=r.current_b,
current_c=r.current_c,
)
# --------------- Endpoints ---------------
@router.get("", response_model=MeterListResponse)
async def list_meters(
user: User = Depends(get_current_user),
):
"""列出所有已配置的电表"""
configs = _get_meter_configs()
items = [_meter_config_to_info(c) for c in configs]
return MeterListResponse(items=items, total=len(items))
@router.get("/overview", response_model=MeterOverviewResponse)
async def meter_overview(
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""仪表盘概览 - 所有电表最新值及汇总"""
configs = _get_meter_configs()
overview_items: list[MeterOverviewItem] = []
total_forward = 0.0
total_reverse = 0.0
total_power = 0.0
for cfg in configs:
info = _meter_config_to_info(cfg)
result = await db.execute(
select(MeterReading)
.where(MeterReading.meter_id == cfg["id"])
.order_by(desc(MeterReading.time))
.limit(1)
)
reading = result.scalar_one_or_none()
latest = _reading_to_response(reading) if reading else None
if reading:
total_forward += reading.forward_active_energy or 0
total_reverse += reading.reverse_active_energy or 0
total_power += reading.active_power or 0
overview_items.append(MeterOverviewItem(meter=info, latest=latest))
return MeterOverviewResponse(
meters=overview_items,
total_forward_energy=total_forward,
total_reverse_energy=total_reverse,
total_active_power=total_power,
)
@router.get("/{meter_id}/latest", response_model=MeterLatestResponse)
async def get_meter_latest(
meter_id: int,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""获取指定电表的最新读数"""
configs = _get_meter_configs()
cfg = next((c for c in configs if c["id"] == meter_id), None)
if not cfg:
raise HTTPException(status_code=404, detail="电表不存在")
result = await db.execute(
select(MeterReading)
.where(MeterReading.meter_id == meter_id)
.order_by(desc(MeterReading.time))
.limit(1)
)
reading = result.scalar_one_or_none()
return MeterLatestResponse(
meter=_meter_config_to_info(cfg),
latest=_reading_to_response(reading) if reading else None,
)
@router.get("/{meter_id}/readings", response_model=list[MeterReadingResponse])
async def get_meter_readings(
meter_id: int,
start: Optional[datetime] = Query(None, description="开始时间 ISO8601"),
end: Optional[datetime] = Query(None, description="结束时间 ISO8601"),
limit: int = Query(100, ge=1, le=10000, description="返回条数上限"),
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""查询历史读数 (时间范围 + limit)"""
configs = _get_meter_configs()
cfg = next((c for c in configs if c["id"] == meter_id), None)
if not cfg:
raise HTTPException(status_code=404, detail="电表不存在")
query = select(MeterReading).where(MeterReading.meter_id == meter_id)
if start:
query = query.where(MeterReading.time >= start)
if end:
query = query.where(MeterReading.time <= end)
query = query.order_by(desc(MeterReading.time)).limit(limit)
result = await db.execute(query)
readings = result.scalars().all()
return [_reading_to_response(r) for r in readings]