ems-core/backend/app/services/aggregation.py
Du Wenbo 92ec910a13 ems-core v1.0.0: Standard EMS platform core
Shared backend + frontend for multi-customer EMS deployments.
- 12 enterprise modules: quota, cost, charging, maintenance, analysis, etc.
- 120+ API endpoints, 37 database tables
- Customer config mechanism (CUSTOMER env var + YAML config)
- Collectors: Modbus TCP, MQTT, HTTP API, Sungrow iSolarCloud
- Frontend: React 19 + Ant Design + ECharts + Three.js
- Infrastructure: Redis cache, rate limiting, aggregation engine

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 18:14:11 +08:00

"""Scheduled aggregation engine for energy data rollups.
Computes hourly, daily, and monthly aggregations from raw EnergyData
and populates EnergyDailySummary. Follows the APScheduler pattern
established in report_scheduler.py.
"""
import logging
from datetime import datetime, timedelta, timezone

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import select, func, and_

from app.core.config import get_settings
from app.core.database import async_session
from app.models.energy import EnergyData, EnergyDailySummary

logger = logging.getLogger("aggregation")

_scheduler: AsyncIOScheduler | None = None


async def aggregate_hourly():
    """Aggregate raw energy_data into hourly avg/min/max per device+data_type.

    Processes data from the previous hour. Results are logged but not
    stored separately — the primary use is for cache warming and monitoring.
    Daily aggregation (which writes to EnergyDailySummary) is the persistent rollup.
    """
    # Window: the previous full hour, [hour_start, hour_end).
    now = datetime.now(timezone.utc)
    hour_start = now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)
    hour_end = hour_start + timedelta(hours=1)
    logger.info("Running hourly aggregation for %s", hour_start.isoformat())

    async with async_session() as session:
        query = (
            select(
                EnergyData.device_id,
                EnergyData.data_type,
                func.avg(EnergyData.value).label("avg_value"),
                func.min(EnergyData.value).label("min_value"),
                func.max(EnergyData.value).label("max_value"),
                func.count(EnergyData.id).label("sample_count"),
            )
            .where(
                and_(
                    EnergyData.timestamp >= hour_start,
                    EnergyData.timestamp < hour_end,
                )
            )
            .group_by(EnergyData.device_id, EnergyData.data_type)
        )
        result = await session.execute(query)
        rows = result.all()

        logger.info(
            "Hourly aggregation complete: %d device/type groups for %s",
            len(rows),
            hour_start.isoformat(),
        )
        return rows


async def aggregate_daily():
    """Compute daily summaries and populate EnergyDailySummary.

    Processes yesterday's data. Groups by device_id and maps data_type
    to energy_type for the summary table.
    """
    # Window: yesterday, [day_start, day_end).
    now = datetime.now(timezone.utc)
    day_start = (now - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
    day_end = day_start + timedelta(days=1)
    logger.info("Running daily aggregation for %s", day_start.date().isoformat())

    # Map data_type -> energy_type for summary grouping
    data_type_to_energy_type = {
        "power": "electricity",
        "energy": "electricity",
        "voltage": "electricity",
        "current": "electricity",
        "heat_power": "heat",
        "heat_energy": "heat",
        "temperature": "heat",
        "water_flow": "water",
        "water_volume": "water",
        "gas_flow": "gas",
        "gas_volume": "gas",
    }

    async with async_session() as session:
        # Fetch per-device aggregated stats for the day
        query = (
            select(
                EnergyData.device_id,
                EnergyData.data_type,
                func.avg(EnergyData.value).label("avg_value"),
                func.min(EnergyData.value).label("min_value"),
                func.max(EnergyData.value).label("max_value"),
                func.sum(EnergyData.value).label("total_value"),
                func.count(EnergyData.id).label("sample_count"),
            )
            .where(
                and_(
                    EnergyData.timestamp >= day_start,
                    EnergyData.timestamp < day_end,
                )
            )
            .group_by(EnergyData.device_id, EnergyData.data_type)
        )
        result = await session.execute(query)
        rows = result.all()

        # Group results by (device_id, energy_type)
        device_summaries: dict[tuple[int, str], dict] = {}
        for row in rows:
            energy_type = data_type_to_energy_type.get(row.data_type, "electricity")
            key = (row.device_id, energy_type)
            if key not in device_summaries:
                device_summaries[key] = {
                    "peak_power": None,
                    "min_power": None,
                    "avg_power": None,
                    "total_consumption": 0.0,
                    "total_generation": 0.0,
                    "avg_temperature": None,
                    "sample_count": 0,
                }
            summary = device_summaries[key]
            summary["sample_count"] += row.sample_count
            # Power-type metrics
            if row.data_type in ("power", "heat_power"):
                summary["peak_power"] = max(
                    summary["peak_power"] or 0, row.max_value or 0
                )
                summary["min_power"] = min(
                    summary["min_power"] if summary["min_power"] is not None else float("inf"),
                    row.min_value or 0,
                )
                summary["avg_power"] = row.avg_value
            # Consumption (energy, volume)
            if row.data_type in ("energy", "heat_energy", "water_volume", "gas_volume"):
                summary["total_consumption"] += row.total_value or 0
            # Temperature
            if row.data_type == "temperature":
                summary["avg_temperature"] = row.avg_value

        # Delete existing summaries for the same date to allow re-runs
        await session.execute(
            EnergyDailySummary.__table__.delete().where(
                EnergyDailySummary.date == day_start
            )
        )

        # Insert new summaries
        summaries = []
        for (device_id, energy_type), stats in device_summaries.items():
            summaries.append(
                EnergyDailySummary(
                    device_id=device_id,
                    date=day_start,
                    energy_type=energy_type,
                    total_consumption=round(stats["total_consumption"], 4),
                    total_generation=0.0,
                    peak_power=round(stats["peak_power"], 4) if stats["peak_power"] is not None else None,
                    min_power=(
                        round(stats["min_power"], 4)
                        if stats["min_power"] is not None and stats["min_power"] != float("inf")
                        else None
                    ),
                    avg_power=round(stats["avg_power"], 4) if stats["avg_power"] is not None else None,
                    avg_temperature=round(stats["avg_temperature"], 2) if stats["avg_temperature"] is not None else None,
                )
            )
        if summaries:
            session.add_all(summaries)
        await session.commit()

        logger.info(
            "Daily aggregation complete: %d summaries for %s",
            len(summaries),
            day_start.date().isoformat(),
        )


async def aggregate_monthly():
    """Compute monthly rollups from EnergyDailySummary.

    Aggregates the previous month's daily summaries. Results are logged
    for monitoring — monthly reports use ReportGenerator for output.
    """
    # Window: the previous calendar month. Step back from the first day of the
    # current month to its last day, then snap to that month's first day.
    now = datetime.now(timezone.utc)
    first_of_current = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    last_month_end = first_of_current - timedelta(days=1)
    month_start = last_month_end.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    logger.info(
        "Running monthly aggregation for %s-%02d",
        month_start.year,
        month_start.month,
    )

    async with async_session() as session:
        query = (
            select(
                EnergyDailySummary.device_id,
                EnergyDailySummary.energy_type,
                func.sum(EnergyDailySummary.total_consumption).label("total_consumption"),
                func.sum(EnergyDailySummary.total_generation).label("total_generation"),
                func.max(EnergyDailySummary.peak_power).label("peak_power"),
                func.min(EnergyDailySummary.min_power).label("min_power"),
                func.avg(EnergyDailySummary.avg_power).label("avg_power"),
                func.sum(EnergyDailySummary.operating_hours).label("total_operating_hours"),
                func.sum(EnergyDailySummary.cost).label("total_cost"),
                func.sum(EnergyDailySummary.carbon_emission).label("total_carbon"),
            )
            .where(
                and_(
                    EnergyDailySummary.date >= month_start,
                    EnergyDailySummary.date < first_of_current,
                )
            )
            .group_by(EnergyDailySummary.device_id, EnergyDailySummary.energy_type)
        )
        result = await session.execute(query)
        rows = result.all()

        logger.info(
            "Monthly aggregation complete: %d device/type groups for %s-%02d",
            len(rows),
            month_start.year,
            month_start.month,
        )
        return rows


async def start_aggregation_scheduler():
    """Start the APScheduler-based aggregation scheduler."""
    global _scheduler
    settings = get_settings()
    if not settings.AGGREGATION_ENABLED:
        logger.info("Aggregation scheduler disabled by config.")
        return
    if _scheduler and _scheduler.running:
        logger.warning("Aggregation scheduler is already running.")
        return

    _scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
    # Hourly aggregation: every hour at :05
    _scheduler.add_job(
        aggregate_hourly,
        CronTrigger(minute=5),
        id="aggregate_hourly",
        replace_existing=True,
        misfire_grace_time=600,
    )
    # Daily aggregation: every day at 00:30
    _scheduler.add_job(
        aggregate_daily,
        CronTrigger(hour=0, minute=30),
        id="aggregate_daily",
        replace_existing=True,
        misfire_grace_time=3600,
    )
    # Monthly aggregation: 1st of each month at 01:00
    _scheduler.add_job(
        aggregate_monthly,
        CronTrigger(day=1, hour=1, minute=0),
        id="aggregate_monthly",
        replace_existing=True,
        misfire_grace_time=7200,
    )
    _scheduler.start()
    logger.info("Aggregation scheduler started (hourly @:05, daily @00:30, monthly @1st 01:00).")


async def stop_aggregation_scheduler():
    """Stop the aggregation scheduler gracefully."""
    global _scheduler
    if _scheduler and _scheduler.running:
        _scheduler.shutdown(wait=False)
        logger.info("Aggregation scheduler stopped.")
    _scheduler = None
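
For context, a minimal sketch of how the start/stop hooks could be wired into the application lifecycle. This file does not show the integration point, so the FastAPI lifespan hook and app object below are assumptions for illustration only, not ems-core's actual startup code.

# Illustrative only: assumes a FastAPI app; the real wiring (e.g. in main.py)
# is not shown in this file.
from contextlib import asynccontextmanager

from fastapi import FastAPI

from app.services.aggregation import (
    start_aggregation_scheduler,
    stop_aggregation_scheduler,
)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the aggregation jobs alongside the API process.
    await start_aggregation_scheduler()
    try:
        yield
    finally:
        # Stop on shutdown so APScheduler jobs are not left running.
        await stop_aggregation_scheduler()


app = FastAPI(lifespan=lifespan)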