Files
tp-ems/backend/app/services/aggregation.py

292 lines
10 KiB
Python
Raw Normal View History

"""Scheduled aggregation engine for energy data rollups.
Computes hourly, daily, and monthly aggregations from raw EnergyData
and populates EnergyDailySummary. Follows the APScheduler pattern
established in report_scheduler.py.
"""
import logging
from datetime import datetime, timedelta, timezone
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import select, func, and_, text, Integer
from app.core.config import get_settings
from app.core.database import async_session
from app.models.energy import EnergyData, EnergyDailySummary
# Module-level logger shared by all aggregation jobs in this file.
logger = logging.getLogger("aggregation")
# Singleton APScheduler instance; created by start_aggregation_scheduler()
# and torn down by stop_aggregation_scheduler(). None while not running.
_scheduler: AsyncIOScheduler | None = None
async def aggregate_hourly():
    """Aggregate raw energy_data into hourly avg/min/max per device+data_type.

    Processes data from the previous full hour (UTC). Results are logged but
    not stored separately -- the primary use is cache warming and monitoring.
    Daily aggregation (which writes to EnergyDailySummary) is the persistent
    rollup.

    Returns:
        List of result rows, one per (device_id, data_type) group, each
        carrying avg_value, min_value, max_value and sample_count.
    """
    now = datetime.now(timezone.utc)
    # Half-open window [hour_start, hour_end) covering the previous full hour.
    hour_start = now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)
    hour_end = hour_start + timedelta(hours=1)
    logger.info("Running hourly aggregation for %s", hour_start.isoformat())
    async with async_session() as session:
        # NOTE(fix): removed a dead `settings = get_settings()` assignment that
        # was never read in this function.
        query = (
            select(
                EnergyData.device_id,
                EnergyData.data_type,
                func.avg(EnergyData.value).label("avg_value"),
                func.min(EnergyData.value).label("min_value"),
                func.max(EnergyData.value).label("max_value"),
                func.count(EnergyData.id).label("sample_count"),
            )
            .where(
                and_(
                    EnergyData.timestamp >= hour_start,
                    EnergyData.timestamp < hour_end,
                )
            )
            .group_by(EnergyData.device_id, EnergyData.data_type)
        )
        result = await session.execute(query)
        rows = result.all()
        logger.info(
            "Hourly aggregation complete: %d device/type groups for %s",
            len(rows),
            hour_start.isoformat(),
        )
        return rows
async def aggregate_daily():
    """Compute daily summaries and populate EnergyDailySummary.

    Processes yesterday's data (UTC). Raw readings are first aggregated per
    (device_id, data_type) in SQL, then folded in Python into one summary
    per (device_id, energy_type). Existing summaries for the target date are
    deleted first so the job is safe to re-run.
    """
    now = datetime.now(timezone.utc)
    # Half-open window [day_start, day_end) covering yesterday.
    day_start = (now - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
    day_end = day_start + timedelta(days=1)
    logger.info("Running daily aggregation for %s", day_start.date().isoformat())
    # Map data_type -> energy_type for summary grouping
    data_type_to_energy_type = {
        "power": "electricity",
        "energy": "electricity",
        "voltage": "electricity",
        "current": "electricity",
        "heat_power": "heat",
        "heat_energy": "heat",
        "temperature": "heat",
        "water_flow": "water",
        "water_volume": "water",
        "gas_flow": "gas",
        "gas_volume": "gas",
    }
    async with async_session() as session:
        # Fetch per-device aggregated stats for the day
        query = (
            select(
                EnergyData.device_id,
                EnergyData.data_type,
                func.avg(EnergyData.value).label("avg_value"),
                func.min(EnergyData.value).label("min_value"),
                func.max(EnergyData.value).label("max_value"),
                func.sum(EnergyData.value).label("total_value"),
                func.count(EnergyData.id).label("sample_count"),
            )
            .where(
                and_(
                    EnergyData.timestamp >= day_start,
                    EnergyData.timestamp < day_end,
                )
            )
            .group_by(EnergyData.device_id, EnergyData.data_type)
        )
        result = await session.execute(query)
        rows = result.all()
        # Group results by (device_id, energy_type)
        device_summaries: dict[tuple[int, str], dict] = {}
        for row in rows:
            # Unknown data_types default to electricity rather than being dropped.
            energy_type = data_type_to_energy_type.get(row.data_type, "electricity")
            key = (row.device_id, energy_type)
            if key not in device_summaries:
                device_summaries[key] = {
                    "peak_power": None,
                    "min_power": None,
                    "avg_power": None,
                    "total_consumption": 0.0,
                    "total_generation": 0.0,
                    "avg_temperature": None,
                    "sample_count": 0,
                }
            summary = device_summaries[key]
            summary["sample_count"] += row.sample_count
            # Power-type metrics
            if row.data_type in ("power", "heat_power"):
                summary["peak_power"] = max(
                    summary["peak_power"] or 0, row.max_value or 0
                )
                summary["min_power"] = min(
                    summary["min_power"] if summary["min_power"] is not None else float("inf"),
                    row.min_value or 0,
                )
                # NOTE: if both "power" and "heat_power" rows exist for one
                # device, the later row's average overwrites the earlier one.
                summary["avg_power"] = row.avg_value
            # Consumption (energy, volume)
            if row.data_type in ("energy", "heat_energy", "water_volume", "gas_volume"):
                summary["total_consumption"] += row.total_value or 0
            # Temperature
            if row.data_type == "temperature":
                summary["avg_temperature"] = row.avg_value
        # Delete existing summaries for the same date to allow re-runs
        await session.execute(
            EnergyDailySummary.__table__.delete().where(
                EnergyDailySummary.date == day_start
            )
        )
        # Insert new summaries
        summaries = []
        for (device_id, energy_type), stats in device_summaries.items():
            summaries.append(
                EnergyDailySummary(
                    device_id=device_id,
                    date=day_start,
                    energy_type=energy_type,
                    total_consumption=round(stats["total_consumption"], 4),
                    total_generation=0.0,
                    # Fix: use explicit `is not None` checks so a legitimate
                    # value of 0.0 (idle device, zero-degree reading) is stored
                    # instead of being coerced to NULL by a falsy check.
                    peak_power=round(stats["peak_power"], 4) if stats["peak_power"] is not None else None,
                    min_power=round(stats["min_power"], 4) if stats["min_power"] is not None and stats["min_power"] != float("inf") else None,
                    avg_power=round(stats["avg_power"], 4) if stats["avg_power"] is not None else None,
                    avg_temperature=round(stats["avg_temperature"], 2) if stats["avg_temperature"] is not None else None,
                )
            )
        if summaries:
            session.add_all(summaries)
        await session.commit()
        logger.info(
            "Daily aggregation complete: %d summaries for %s",
            len(summaries),
            day_start.date().isoformat(),
        )
async def aggregate_monthly():
    """Compute monthly rollups from EnergyDailySummary.

    Aggregates the previous month's daily summaries per
    (device_id, energy_type). Results are logged for monitoring; monthly
    reports use ReportGenerator for output.

    Returns:
        List of result rows with summed/min/max/avg monthly statistics.
    """
    today = datetime.now(timezone.utc)
    current_month_start = today.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    # Step back one day from the first of this month to land inside the
    # previous month, then snap to that month's first day.
    prev_month_last_day = current_month_start - timedelta(days=1)
    prev_month_start = prev_month_last_day.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    logger.info(
        "Running monthly aggregation for %s-%02d",
        prev_month_start.year,
        prev_month_start.month,
    )
    async with async_session() as session:
        stmt = (
            select(
                EnergyDailySummary.device_id,
                EnergyDailySummary.energy_type,
                func.sum(EnergyDailySummary.total_consumption).label("total_consumption"),
                func.sum(EnergyDailySummary.total_generation).label("total_generation"),
                func.max(EnergyDailySummary.peak_power).label("peak_power"),
                func.min(EnergyDailySummary.min_power).label("min_power"),
                func.avg(EnergyDailySummary.avg_power).label("avg_power"),
                func.sum(EnergyDailySummary.operating_hours).label("total_operating_hours"),
                func.sum(EnergyDailySummary.cost).label("total_cost"),
                func.sum(EnergyDailySummary.carbon_emission).label("total_carbon"),
            )
            .where(
                and_(
                    EnergyDailySummary.date >= prev_month_start,
                    EnergyDailySummary.date < current_month_start,
                )
            )
            .group_by(EnergyDailySummary.device_id, EnergyDailySummary.energy_type)
        )
        grouped = (await session.execute(stmt)).all()
        logger.info(
            "Monthly aggregation complete: %d device/type groups for %s-%02d",
            len(grouped),
            prev_month_start.year,
            prev_month_start.month,
        )
        return grouped
async def start_aggregation_scheduler():
    """Start the APScheduler-based aggregation scheduler.

    No-op when AGGREGATION_ENABLED is false in config or when a scheduler
    is already running. Registers three cron jobs: hourly at :05, daily at
    00:30, and monthly on the 1st at 01:00 (Asia/Shanghai time).
    """
    global _scheduler
    settings = get_settings()
    if not settings.AGGREGATION_ENABLED:
        logger.info("Aggregation scheduler disabled by config.")
        return
    if _scheduler and _scheduler.running:
        logger.warning("Aggregation scheduler is already running.")
        return
    _scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
    # (coroutine, cron trigger, job id, misfire grace period in seconds)
    job_table = [
        (aggregate_hourly, CronTrigger(minute=5), "aggregate_hourly", 600),
        (aggregate_daily, CronTrigger(hour=0, minute=30), "aggregate_daily", 3600),
        (aggregate_monthly, CronTrigger(day=1, hour=1, minute=0), "aggregate_monthly", 7200),
    ]
    for job_fn, trigger, job_id, grace_seconds in job_table:
        _scheduler.add_job(
            job_fn,
            trigger,
            id=job_id,
            replace_existing=True,
            misfire_grace_time=grace_seconds,
        )
    _scheduler.start()
    logger.info("Aggregation scheduler started (hourly @:05, daily @00:30, monthly @1st 01:00).")
async def stop_aggregation_scheduler():
    """Stop the aggregation scheduler gracefully."""
    global _scheduler
    if not (_scheduler and _scheduler.running):
        return
    # wait=False: do not block on jobs that are mid-flight.
    _scheduler.shutdown(wait=False)
    logger.info("Aggregation scheduler stopped.")
    _scheduler = None