Files
ems-core/backend/app/services/cost_calculator.py
Du Wenbo 92ec910a13 ems-core v1.0.0: Standard EMS platform core
Shared backend + frontend for multi-customer EMS deployments.
- 12 enterprise modules: quota, cost, charging, maintenance, analysis, etc.
- 120+ API endpoints, 37 database tables
- Customer config mechanism (CUSTOMER env var + YAML config)
- Collectors: Modbus TCP, MQTT, HTTP API, Sungrow iSolarCloud
- Frontend: React 19 + Ant Design + ECharts + Three.js
- Infrastructure: Redis cache, rate limiting, aggregation engine

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 18:14:11 +08:00

262 lines
9.9 KiB
Python

from datetime import datetime, timedelta, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_
from app.models.pricing import ElectricityPricing, PricingPeriod
from app.models.energy import EnergyData, EnergyDailySummary
from app.models.device import Device
async def get_active_pricing(db: AsyncSession, energy_type: str = "electricity", date: datetime | None = None):
"""获取当前生效的电价配置"""
q = select(ElectricityPricing).where(
and_(
ElectricityPricing.energy_type == energy_type,
ElectricityPricing.is_active == True,
)
)
if date:
q = q.where(
and_(
(ElectricityPricing.effective_from == None) | (ElectricityPricing.effective_from <= date),
(ElectricityPricing.effective_to == None) | (ElectricityPricing.effective_to >= date),
)
)
q = q.order_by(ElectricityPricing.created_at.desc()).limit(1)
result = await db.execute(q)
return result.scalar_one_or_none()
async def get_pricing_periods(db: AsyncSession, pricing_id: int, month: int | None = None):
"""获取电价时段配置"""
q = select(PricingPeriod).where(PricingPeriod.pricing_id == pricing_id)
result = await db.execute(q)
periods = result.scalars().all()
if month is not None:
periods = [p for p in periods if p.applicable_months is None or month in p.applicable_months]
return periods
def get_period_for_hour(periods: list, hour: int) -> PricingPeriod | None:
"""根据小时确定所属时段"""
hour_str = f"{hour:02d}:00"
for p in periods:
start = p.start_time
end = p.end_time
if start <= end:
if start <= hour_str < end:
return p
else: # crosses midnight, e.g. 23:00 - 07:00
if hour_str >= start or hour_str < end:
return p
return periods[0] if periods else None
async def calculate_daily_cost(db: AsyncSession, date: datetime, device_id: int | None = None):
"""计算某天的用电费用"""
pricing = await get_active_pricing(db, "electricity", date)
if not pricing:
return 0.0
if pricing.pricing_type == "flat":
# 平价: 直接查日汇总
q = select(func.sum(EnergyDailySummary.total_consumption)).where(
and_(
EnergyDailySummary.date >= date.replace(hour=0, minute=0, second=0),
EnergyDailySummary.date < date.replace(hour=0, minute=0, second=0) + timedelta(days=1),
EnergyDailySummary.energy_type == "electricity",
)
)
if device_id:
q = q.where(EnergyDailySummary.device_id == device_id)
result = await db.execute(q)
total_energy = result.scalar() or 0.0
periods = await get_pricing_periods(db, pricing.id)
flat_price = periods[0].price_per_unit if periods else 0.0
cost = total_energy * flat_price
else:
# TOU分时: 按小时计算
periods = await get_pricing_periods(db, pricing.id, month=date.month)
if not periods:
return 0.0
cost = 0.0
day_start = date.replace(hour=0, minute=0, second=0, microsecond=0)
for hour in range(24):
hour_start = day_start + timedelta(hours=hour)
hour_end = hour_start + timedelta(hours=1)
q = select(func.sum(EnergyData.value)).where(
and_(
EnergyData.timestamp >= hour_start,
EnergyData.timestamp < hour_end,
EnergyData.data_type == "energy",
)
)
if device_id:
q = q.where(EnergyData.device_id == device_id)
result = await db.execute(q)
hour_energy = result.scalar() or 0.0
period = get_period_for_hour(periods, hour)
if period:
cost += hour_energy * period.price_per_unit
# Update daily summary cost
q = select(EnergyDailySummary).where(
and_(
EnergyDailySummary.date >= date.replace(hour=0, minute=0, second=0),
EnergyDailySummary.date < date.replace(hour=0, minute=0, second=0) + timedelta(days=1),
EnergyDailySummary.energy_type == "electricity",
)
)
if device_id:
q = q.where(EnergyDailySummary.device_id == device_id)
result = await db.execute(q)
for summary in result.scalars().all():
summary.cost = cost
return round(cost, 2)
async def get_cost_summary(
db: AsyncSession, start_date: datetime, end_date: datetime,
group_by: str = "day", energy_type: str = "electricity",
):
"""获取费用汇总"""
q = select(
EnergyDailySummary.date,
func.sum(EnergyDailySummary.total_consumption).label("consumption"),
func.sum(EnergyDailySummary.cost).label("cost"),
).where(
and_(
EnergyDailySummary.date >= start_date,
EnergyDailySummary.date <= end_date,
EnergyDailySummary.energy_type == energy_type,
)
)
if group_by == "device":
q = select(
EnergyDailySummary.device_id,
Device.name.label("device_name"),
func.sum(EnergyDailySummary.total_consumption).label("consumption"),
func.sum(EnergyDailySummary.cost).label("cost"),
).join(Device, EnergyDailySummary.device_id == Device.id, isouter=True).where(
and_(
EnergyDailySummary.date >= start_date,
EnergyDailySummary.date <= end_date,
EnergyDailySummary.energy_type == energy_type,
)
).group_by(EnergyDailySummary.device_id, Device.name)
result = await db.execute(q)
return [
{"device_id": r[0], "device_name": r[1] or f"Device#{r[0]}",
"consumption": round(r[2] or 0, 2), "cost": round(r[3] or 0, 2)}
for r in result.all()
]
elif group_by == "month":
from app.core.config import get_settings
settings = get_settings()
if settings.is_sqlite:
group_expr = func.strftime('%Y-%m', EnergyDailySummary.date).label('period')
else:
group_expr = func.to_char(EnergyDailySummary.date, 'YYYY-MM').label('period')
q = select(
group_expr,
func.sum(EnergyDailySummary.total_consumption).label("consumption"),
func.sum(EnergyDailySummary.cost).label("cost"),
).where(
and_(
EnergyDailySummary.date >= start_date,
EnergyDailySummary.date <= end_date,
EnergyDailySummary.energy_type == energy_type,
)
).group_by(group_expr).order_by(group_expr)
result = await db.execute(q)
return [
{"period": str(r[0]), "consumption": round(r[1] or 0, 2), "cost": round(r[2] or 0, 2)}
for r in result.all()
]
else: # day
q = q.group_by(EnergyDailySummary.date).order_by(EnergyDailySummary.date)
result = await db.execute(q)
return [
{"date": str(r[0]), "consumption": round(r[1] or 0, 2), "cost": round(r[2] or 0, 2)}
for r in result.all()
]
async def get_cost_breakdown(db: AsyncSession, start_date: datetime, end_date: datetime, energy_type: str = "electricity"):
"""获取峰谷平费用分布"""
pricing = await get_active_pricing(db, energy_type, start_date)
if not pricing:
return {"periods": [], "total_cost": 0, "total_consumption": 0}
periods = await get_pricing_periods(db, pricing.id)
if not periods:
return {"periods": [], "total_cost": 0, "total_consumption": 0}
# For each period, calculate the total energy consumption in those hours
breakdown = []
total_cost = 0.0
total_consumption = 0.0
for period in periods:
start_hour = int(period.start_time.split(":")[0])
end_hour = int(period.end_time.split(":")[0])
if start_hour < end_hour:
hours = list(range(start_hour, end_hour))
else: # crosses midnight
hours = list(range(start_hour, 24)) + list(range(0, end_hour))
period_energy = 0.0
current = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
end = end_date.replace(hour=23, minute=59, second=59, microsecond=0)
# Sum energy for all matching hours in date range using daily summary approximation
q = select(func.sum(EnergyDailySummary.total_consumption)).where(
and_(
EnergyDailySummary.date >= start_date,
EnergyDailySummary.date <= end_date,
EnergyDailySummary.energy_type == energy_type,
)
)
result = await db.execute(q)
total_daily = result.scalar() or 0.0
# Approximate proportion based on hours in period vs 24h
proportion = len(hours) / 24.0
period_energy = total_daily * proportion
period_cost = period_energy * period.price_per_unit
total_cost += period_cost
total_consumption += period_energy
period_name_map = {
"peak": "尖峰", "sharp": "尖峰",
"high": "高峰", "shoulder": "高峰",
"flat": "平段",
"valley": "低谷", "off_peak": "低谷",
}
breakdown.append({
"period_name": period.period_name,
"period_label": period_name_map.get(period.period_name, period.period_name),
"start_time": period.start_time,
"end_time": period.end_time,
"price_per_unit": period.price_per_unit,
"consumption": round(period_energy, 2),
"cost": round(period_cost, 2),
"proportion": round(proportion * 100, 1),
})
return {
"periods": breakdown,
"total_cost": round(total_cost, 2),
"total_consumption": round(total_consumption, 2),
"pricing_name": pricing.name,
"pricing_type": pricing.pricing_type,
}