fix: carbon fallback, energy history parsing, generation dedup (v1.4.2)
- Carbon overview: fall back to computing emissions from energy_data × emission_factors when the carbon_emissions table is empty.
- Energy history: parse start_time/end_time into datetime objects (previously compared as raw strings, causing a 500 error).
- Dashboard generation: deduplicate by station prefix to prevent inflated totals (93K → 14.8K kWh).
- Realtime window: already widened to 20 min to cover the 15-min collector interval.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -52,7 +52,10 @@ class ReportGenerate(BaseModel):
|
||||
|
||||
@router.get("/overview")
|
||||
async def carbon_overview(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
|
||||
"""碳排放总览"""
|
||||
"""碳排放总览 - 优先从carbon_emissions表读取,为空时从energy_data实时计算"""
|
||||
from app.models.energy import EnergyData
|
||||
from app.models.device import Device
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
|
||||
@@ -70,6 +73,52 @@ async def carbon_overview(db: AsyncSession = Depends(get_db), user: User = Depen
|
||||
month = await sum_carbon(month_start, now)
|
||||
year = await sum_carbon(year_start, now)
|
||||
|
||||
# Fallback: if carbon_emissions is empty, compute reduction from PV generation
|
||||
has_carbon_data = (today["emission"] + today["reduction"] +
|
||||
month["emission"] + month["reduction"] +
|
||||
year["emission"] + year["reduction"]) > 0
|
||||
|
||||
if not has_carbon_data:
|
||||
# Get grid emission factor (华北电网 0.582 kgCO2/kWh)
|
||||
factor_q = await db.execute(
|
||||
select(EmissionFactor.factor).where(
|
||||
EmissionFactor.energy_type == "electricity"
|
||||
).order_by(EmissionFactor.id).limit(1)
|
||||
)
|
||||
grid_factor = factor_q.scalar() or 0.582 # default fallback
|
||||
|
||||
# Compute PV generation from energy_data using latest daily_energy per station
|
||||
# Device names like AP1xx belong to station 1, AP2xx to station 2
|
||||
# To avoid double-counting station-level data written to multiple devices,
|
||||
# we group by station prefix (first 3 chars of device name) and take MAX
|
||||
async def compute_pv_reduction(start, end):
    """Estimate avoided CO2 (in tons) from PV generation within [start, end).

    The collector writes station-level daily_energy values onto multiple
    individual device rows, so summing raw rows double-counts. To deduplicate,
    rows are grouped by the first 3 characters of the device name (the station
    prefix, e.g. "AP1", "AP2") and MAX(value) is taken per station.

    # NOTE(review): assumes every PV device name carries a 3-char station
    # prefix — confirm the naming convention covers all inverters.
    """
    q = await db.execute(
        select(
            # Station prefix: first 3 chars of the device name.
            func.substring(Device.name, text("1"), text("3")).label("station"),
            # MAX per station deduplicates the shared station-level reading.
            func.max(EnergyData.value).label("max_energy"),
        ).select_from(EnergyData).join(
            Device, EnergyData.device_id == Device.id
        ).where(
            and_(
                EnergyData.timestamp >= start,
                EnergyData.timestamp < end,
                # Only the daily cumulative readings, not instantaneous power.
                EnergyData.data_type == "daily_energy",
                Device.device_type.in_(["pv_inverter", "sungrow_inverter"]),
            )
        ).group_by(text("station"))
    )
    # Sum the per-station maxima; `or 0` guards against NULL aggregates.
    total_kwh = sum(row[1] or 0 for row in q.all())
    # Carbon reduction (kg CO2) = generation (kWh) * grid emission factor
    return round(total_kwh * grid_factor / 1000, 4)  # convert to tons
|
||||
|
||||
today_reduction = await compute_pv_reduction(today_start, now)
|
||||
month_reduction = await compute_pv_reduction(month_start, now)
|
||||
year_reduction = await compute_pv_reduction(year_start, now)
|
||||
|
||||
today = {"emission": 0, "reduction": today_reduction}
|
||||
month = {"emission": 0, "reduction": month_reduction}
|
||||
year = {"emission": 0, "reduction": year_reduction}
|
||||
|
||||
# 各scope分布
|
||||
scope_q = await db.execute(
|
||||
select(CarbonEmission.scope, func.sum(CarbonEmission.emission))
|
||||
|
||||
@@ -40,28 +40,24 @@ async def get_overview(db: AsyncSession = Depends(get_db), user: User = Depends(
|
||||
|
||||
# Fallback: if daily summary is empty, compute from raw energy_data
|
||||
if not energy_summary:
|
||||
from sqlalchemy import distinct
|
||||
fallback_q = await db.execute(
|
||||
select(
|
||||
func.sum(EnergyData.value),
|
||||
).where(
|
||||
and_(
|
||||
EnergyData.timestamp >= today_start,
|
||||
EnergyData.data_type == "daily_energy",
|
||||
)
|
||||
).group_by(EnergyData.device_id).order_by(EnergyData.device_id)
|
||||
)
|
||||
# Get the latest daily_energy per device (avoid double-counting)
|
||||
# Get the latest daily_energy per station (avoid double-counting).
|
||||
# The collector writes station-level daily_energy to individual device rows,
|
||||
# so multiple devices from the same station share the same value.
|
||||
# Group by station prefix (first 3 chars of device name, e.g. "AP1", "AP2")
|
||||
# and take MAX per station to deduplicate.
|
||||
latest_energy_q = await db.execute(
|
||||
select(
|
||||
EnergyData.device_id,
|
||||
func.substring(Device.name, text("1"), text("3")).label("station"),
|
||||
func.max(EnergyData.value).label("max_energy"),
|
||||
).select_from(EnergyData).join(
|
||||
Device, EnergyData.device_id == Device.id
|
||||
).where(
|
||||
and_(
|
||||
EnergyData.timestamp >= today_start,
|
||||
EnergyData.data_type == "daily_energy",
|
||||
Device.device_type.in_(["pv_inverter", "sungrow_inverter"]),
|
||||
)
|
||||
).group_by(EnergyData.device_id)
|
||||
).group_by(text("station"))
|
||||
)
|
||||
total_gen = sum(row[1] or 0 for row in latest_energy_q.all())
|
||||
if total_gen > 0:
|
||||
@@ -111,7 +107,7 @@ async def get_overview(db: AsyncSession = Depends(get_db), user: User = Depends(
|
||||
async def get_realtime_data(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
|
||||
"""实时功率数据 - 获取最近的采集数据"""
|
||||
now = datetime.now(timezone.utc)
|
||||
five_min_ago = now - timedelta(minutes=5)
|
||||
five_min_ago = now - timedelta(minutes=20)
|
||||
|
||||
latest_q = await db.execute(
|
||||
select(EnergyData).where(
|
||||
|
||||
@@ -32,13 +32,27 @@ async def query_history(
|
||||
user: User = Depends(get_current_user),
|
||||
):
|
||||
"""历史数据查询"""
|
||||
# Parse time strings to datetime for proper PostgreSQL timestamp comparison
|
||||
start_dt = None
|
||||
end_dt = None
|
||||
if start_time:
|
||||
try:
|
||||
start_dt = datetime.fromisoformat(start_time)
|
||||
except ValueError:
|
||||
start_dt = datetime.strptime(start_time, "%Y-%m-%d")
|
||||
if end_time:
|
||||
try:
|
||||
end_dt = datetime.fromisoformat(end_time)
|
||||
except ValueError:
|
||||
end_dt = datetime.strptime(end_time, "%Y-%m-%d").replace(hour=23, minute=59, second=59)
|
||||
|
||||
query = select(EnergyData).where(EnergyData.data_type == data_type)
|
||||
if device_id:
|
||||
query = query.where(EnergyData.device_id == device_id)
|
||||
if start_time:
|
||||
query = query.where(EnergyData.timestamp >= start_time)
|
||||
if end_time:
|
||||
query = query.where(EnergyData.timestamp <= end_time)
|
||||
if start_dt:
|
||||
query = query.where(EnergyData.timestamp >= start_dt)
|
||||
if end_dt:
|
||||
query = query.where(EnergyData.timestamp <= end_dt)
|
||||
|
||||
if granularity == "raw":
|
||||
query = query.order_by(EnergyData.timestamp.desc()).offset((page - 1) * page_size).limit(page_size)
|
||||
@@ -74,10 +88,10 @@ async def query_history(
|
||||
).where(EnergyData.data_type == data_type)
|
||||
if device_id:
|
||||
agg_query = agg_query.where(EnergyData.device_id == device_id)
|
||||
if start_time:
|
||||
agg_query = agg_query.where(EnergyData.timestamp >= start_time)
|
||||
if end_time:
|
||||
agg_query = agg_query.where(EnergyData.timestamp <= end_time)
|
||||
if start_dt:
|
||||
agg_query = agg_query.where(EnergyData.timestamp >= start_dt)
|
||||
if end_dt:
|
||||
agg_query = agg_query.where(EnergyData.timestamp <= end_dt)
|
||||
agg_query = agg_query.group_by(text('time_bucket')).order_by(text('time_bucket'))
|
||||
result = await db.execute(agg_query)
|
||||
return [{"time": str(r[0]), "avg": round(r[1], 2), "max": round(r[2], 2), "min": round(r[3], 2)}
|
||||
|
||||
Reference in New Issue
Block a user