Files
zpark-ems/core/backend/app/api/v1/dashboard.py
Du Wenbo 7947a230c4 fix: realtime + KPI power dedup by station prefix (v1.6.3)
Sync core fixes: realtime and KPI endpoints now GROUP BY station
prefix to prevent double-counting. Matches iSolarCloud within 5%.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 14:19:00 +08:00

212 lines
7.9 KiB
Python

from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, text, case, literal_column
from app.core.database import get_db
from app.core.config import get_settings
from app.core.deps import get_current_user
from app.models.device import Device
from app.models.energy import EnergyData, EnergyDailySummary
from app.models.alarm import AlarmEvent
from app.models.carbon import CarbonEmission
from app.models.user import User
router = APIRouter(prefix="/dashboard", tags=["大屏数据"])
@router.get("/overview")
async def get_overview(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
"""能源总览大屏核心数据"""
now = datetime.now(timezone.utc)
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
# 设备状态统计
device_stats_q = await db.execute(
select(Device.status, func.count(Device.id)).where(Device.is_active == True).group_by(Device.status)
)
device_stats = {row[0]: row[1] for row in device_stats_q.all()}
# 今日能耗汇总 (from daily summary table)
daily_q = await db.execute(
select(
EnergyDailySummary.energy_type,
func.sum(EnergyDailySummary.total_consumption),
func.sum(EnergyDailySummary.total_generation),
).where(EnergyDailySummary.date >= today_start).group_by(EnergyDailySummary.energy_type)
)
energy_summary = {}
for row in daily_q.all():
energy_summary[row[0]] = {"consumption": row[1] or 0, "generation": row[2] or 0}
# Fallback: if daily summary is empty, compute from raw energy_data
if not energy_summary:
# Get the latest daily_energy per station (avoid double-counting).
# The collector writes station-level daily_energy to individual device rows,
# so multiple devices from the same station share the same value.
# Group by station prefix (first 3 chars of device name, e.g. "AP1", "AP2")
# and take MAX per station to deduplicate.
latest_energy_q = await db.execute(
select(
func.substring(Device.name, text("1"), text("3")).label("station"),
func.max(EnergyData.value).label("max_energy"),
).select_from(EnergyData).join(
Device, EnergyData.device_id == Device.id
).where(
and_(
EnergyData.timestamp >= today_start,
EnergyData.data_type == "daily_energy",
Device.device_type.in_(["pv_inverter", "sungrow_inverter"]),
)
).group_by(text("station"))
)
total_gen = sum(row[1] or 0 for row in latest_energy_q.all())
if total_gen > 0:
energy_summary["electricity"] = {"consumption": 0, "generation": round(total_gen, 2)}
# 今日碳排放
carbon_q = await db.execute(
select(func.sum(CarbonEmission.emission), func.sum(CarbonEmission.reduction))
.where(CarbonEmission.date >= today_start)
)
carbon_row = carbon_q.first()
# 活跃告警数
alarm_count_q = await db.execute(
select(func.count(AlarmEvent.id)).where(AlarmEvent.status == "active")
)
active_alarms = alarm_count_q.scalar() or 0
# 最近告警
recent_alarms_q = await db.execute(
select(AlarmEvent).where(AlarmEvent.status == "active").order_by(AlarmEvent.triggered_at.desc()).limit(10)
)
recent_alarms = [
{"id": a.id, "title": a.title, "severity": a.severity, "device_id": a.device_id,
"triggered_at": str(a.triggered_at)}
for a in recent_alarms_q.scalars().all()
]
return {
"device_stats": {
"online": device_stats.get("online", 0),
"offline": device_stats.get("offline", 0),
"alarm": device_stats.get("alarm", 0),
"total": sum(device_stats.values()),
},
"energy_today": energy_summary,
"carbon": {
"emission": carbon_row[0] or 0 if carbon_row else 0,
"reduction": carbon_row[1] or 0 if carbon_row else 0,
},
"active_alarms": active_alarms,
"recent_alarms": recent_alarms,
}
@router.get("/realtime")
async def get_realtime_data(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
"""实时功率数据 - 获取最近的采集数据,按站去重防止重复计数"""
now = datetime.now(timezone.utc)
window_start = now - timedelta(minutes=20)
# Get latest power per station (dedup by device name prefix)
# Sungrow collectors report station-level power, so multiple devices
# sharing the same station (AP1xx = Phase 1, AP2xx = Phase 2) report
# identical values. GROUP BY station prefix and take MAX to avoid
# double-counting.
from sqlalchemy import text as sa_text
pv_ids = await _get_pv_device_ids(db)
hp_ids = await _get_hp_device_ids(db)
# PV power: dedup by station prefix
if pv_ids:
pv_q = await db.execute(
select(
func.substring(Device.name, 1, 3).label("station"),
func.max(EnergyData.value).label("power"),
).select_from(EnergyData).join(
Device, EnergyData.device_id == Device.id
).where(
and_(
EnergyData.timestamp >= window_start,
EnergyData.data_type == "power",
EnergyData.device_id.in_(pv_ids),
)
).group_by(sa_text("1"))
)
pv_power = sum(row[1] or 0 for row in pv_q.all())
else:
pv_power = 0
# Heat pump power: dedup by station prefix
if hp_ids:
hp_q = await db.execute(
select(
func.substring(Device.name, 1, 3).label("station"),
func.max(EnergyData.value).label("power"),
).select_from(EnergyData).join(
Device, EnergyData.device_id == Device.id
).where(
and_(
EnergyData.timestamp >= window_start,
EnergyData.data_type == "power",
EnergyData.device_id.in_(hp_ids),
)
).group_by(sa_text("1"))
)
heatpump_power = sum(row[1] or 0 for row in hp_q.all())
else:
heatpump_power = 0
return {
"timestamp": str(now),
"pv_power": round(pv_power, 2),
"heatpump_power": round(heatpump_power, 2),
"total_load": round(pv_power + heatpump_power, 2),
"grid_power": round(max(0, heatpump_power - pv_power), 2),
}
@router.get("/load-curve")
async def get_load_curve(
hours: int = 24,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""负荷曲线数据"""
now = datetime.now(timezone.utc)
start = now - timedelta(hours=hours)
settings = get_settings()
if settings.is_sqlite:
hour_expr = func.strftime('%Y-%m-%d %H:00:00', EnergyData.timestamp).label('hour')
else:
hour_expr = func.date_trunc('hour', EnergyData.timestamp).label('hour')
result = await db.execute(
select(
hour_expr,
func.avg(EnergyData.value).label('avg_power'),
).where(
and_(EnergyData.timestamp >= start, EnergyData.data_type == "power")
).group_by(text('hour')).order_by(text('hour'))
)
return [{"time": str(row[0]), "power": round(row[1], 2)} for row in result.all()]
async def _get_pv_device_ids(db: AsyncSession):
result = await db.execute(
select(Device.id).where(
Device.device_type.in_(["pv_inverter", "sungrow_inverter"]),
Device.is_active == True,
)
)
return [r[0] for r in result.fetchall()]
async def _get_hp_device_ids(db: AsyncSession):
result = await db.execute(
select(Device.id).where(Device.device_type == "heat_pump", Device.is_active == True)
)
return [r[0] for r in result.fetchall()]