diff --git a/backend/app/api/v1/carbon.py b/backend/app/api/v1/carbon.py index 1a780fb..d89b40a 100644 --- a/backend/app/api/v1/carbon.py +++ b/backend/app/api/v1/carbon.py @@ -52,7 +52,10 @@ class ReportGenerate(BaseModel): @router.get("/overview") async def carbon_overview(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): - """碳排放总览""" + """碳排放总览 - 优先从carbon_emissions表读取,为空时从energy_data实时计算""" + from app.models.energy import EnergyData + from app.models.device import Device + now = datetime.now(timezone.utc) today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) @@ -70,6 +73,52 @@ async def carbon_overview(db: AsyncSession = Depends(get_db), user: User = Depen month = await sum_carbon(month_start, now) year = await sum_carbon(year_start, now) + # Fallback: if carbon_emissions is empty, compute reduction from PV generation + has_carbon_data = (today["emission"] + today["reduction"] + + month["emission"] + month["reduction"] + + year["emission"] + year["reduction"]) > 0 + + if not has_carbon_data: + # Get grid emission factor (华北电网 0.582 kgCO2/kWh) + factor_q = await db.execute( + select(EmissionFactor.factor).where( + EmissionFactor.energy_type == "electricity" + ).order_by(EmissionFactor.id).limit(1) + ) + grid_factor = factor_q.scalar() or 0.582 # default fallback + + # Compute PV generation from energy_data using latest daily_energy per station + # Device names like AP1xx belong to station 1, AP2xx to station 2 + # To avoid double-counting station-level data written to multiple devices, + # we group by station prefix (first 3 chars of device name) and take MAX + async def compute_pv_reduction(start, end): + q = await db.execute( + select( + func.substring(Device.name, text("1"), text("3")).label("station"), + func.max(EnergyData.value).label("max_energy"), + ).select_from(EnergyData).join( + Device, EnergyData.device_id == Device.id + ).where( + and_( + EnergyData.timestamp >= start, + EnergyData.timestamp < end, + EnergyData.data_type == "daily_energy", + Device.device_type.in_(["pv_inverter", "sungrow_inverter"]), + ) + ).group_by(text("station")) + ) + total_kwh = sum(row[1] or 0 for row in q.all()) + # Carbon reduction (kg CO2) = generation (kWh) * grid emission factor + return round(total_kwh * grid_factor / 1000, 4) # convert to tons + + today_reduction = await compute_pv_reduction(today_start, now) + month_reduction = await compute_pv_reduction(month_start, now) + year_reduction = await compute_pv_reduction(year_start, now) + + today = {"emission": 0, "reduction": today_reduction} + month = {"emission": 0, "reduction": month_reduction} + year = {"emission": 0, "reduction": year_reduction} + # 各scope分布 scope_q = await db.execute( select(CarbonEmission.scope, func.sum(CarbonEmission.emission)) diff --git a/backend/app/api/v1/dashboard.py b/backend/app/api/v1/dashboard.py index 0df1e3d..350a686 100644 --- a/backend/app/api/v1/dashboard.py +++ b/backend/app/api/v1/dashboard.py @@ -40,28 +40,24 @@ async def get_overview(db: AsyncSession = Depends(get_db), user: User = Depends( # Fallback: if daily summary is empty, compute from raw energy_data if not energy_summary: - from sqlalchemy import distinct - fallback_q = await db.execute( - select( - func.sum(EnergyData.value), - ).where( - and_( - EnergyData.timestamp >= today_start, - EnergyData.data_type == "daily_energy", - ) - ).group_by(EnergyData.device_id).order_by(EnergyData.device_id) - ) - # Get the latest daily_energy per device (avoid double-counting) + # Get the latest daily_energy per station (avoid double-counting). + # The collector writes station-level daily_energy to individual device rows, + # so multiple devices from the same station share the same value. + # Group by station prefix (first 3 chars of device name, e.g. "AP1", "AP2") + # and take MAX per station to deduplicate. latest_energy_q = await db.execute( select( - EnergyData.device_id, + func.substring(Device.name, text("1"), text("3")).label("station"), func.max(EnergyData.value).label("max_energy"), + ).select_from(EnergyData).join( + Device, EnergyData.device_id == Device.id ).where( and_( EnergyData.timestamp >= today_start, EnergyData.data_type == "daily_energy", + Device.device_type.in_(["pv_inverter", "sungrow_inverter"]), ) - ).group_by(EnergyData.device_id) + ).group_by(text("station")) ) total_gen = sum(row[1] or 0 for row in latest_energy_q.all()) if total_gen > 0: @@ -111,7 +107,7 @@ async def get_overview(db: AsyncSession = Depends(get_db), user: User = Depends( async def get_realtime_data(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): """实时功率数据 - 获取最近的采集数据""" now = datetime.now(timezone.utc) - five_min_ago = now - timedelta(minutes=5) + five_min_ago = now - timedelta(minutes=20) latest_q = await db.execute( select(EnergyData).where( diff --git a/backend/app/api/v1/energy.py b/backend/app/api/v1/energy.py index 21d345d..1f4592b 100644 --- a/backend/app/api/v1/energy.py +++ b/backend/app/api/v1/energy.py @@ -32,13 +32,27 @@ async def query_history( user: User = Depends(get_current_user), ): """历史数据查询""" + # Parse time strings to datetime for proper PostgreSQL timestamp comparison + start_dt = None + end_dt = None + if start_time: + try: + start_dt = datetime.fromisoformat(start_time) + except ValueError: + start_dt = datetime.strptime(start_time, "%Y-%m-%d") + if end_time: + try: + end_dt = datetime.fromisoformat(end_time) + except ValueError: + end_dt = datetime.strptime(end_time, "%Y-%m-%d").replace(hour=23, minute=59, second=59) + query = select(EnergyData).where(EnergyData.data_type == data_type) if device_id: query = query.where(EnergyData.device_id == device_id) - if start_time: - query = query.where(EnergyData.timestamp >= start_time) - if end_time: - query = query.where(EnergyData.timestamp <= end_time) + if start_dt: + query = query.where(EnergyData.timestamp >= start_dt) + if end_dt: + query = query.where(EnergyData.timestamp <= end_dt) if granularity == "raw": query = query.order_by(EnergyData.timestamp.desc()).offset((page - 1) * page_size).limit(page_size) @@ -74,10 +88,10 @@ async def query_history( ).where(EnergyData.data_type == data_type) if device_id: agg_query = agg_query.where(EnergyData.device_id == device_id) - if start_time: - agg_query = agg_query.where(EnergyData.timestamp >= start_time) - if end_time: - agg_query = agg_query.where(EnergyData.timestamp <= end_time) + if start_dt: + agg_query = agg_query.where(EnergyData.timestamp >= start_dt) + if end_dt: + agg_query = agg_query.where(EnergyData.timestamp <= end_dt) agg_query = agg_query.group_by(text('time_bucket')).order_by(text('time_bucket')) result = await db.execute(agg_query) return [{"time": str(r[0]), "avg": round(r[1], 2), "max": round(r[2], 2), "min": round(r[3], 2)}