"""Scheduled aggregation engine for energy data rollups.

Computes hourly, daily, and monthly aggregations from raw EnergyData and
populates EnergyDailySummary. Follows the APScheduler pattern established in
report_scheduler.py.
"""

import logging
from datetime import datetime, timedelta, timezone

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy import select, func, and_, text, Integer

from app.core.config import get_settings
from app.core.database import async_session
from app.models.energy import EnergyData, EnergyDailySummary

logger = logging.getLogger("aggregation")

_scheduler: AsyncIOScheduler | None = None

# Maps a raw EnergyData.data_type to the energy_type used for summary grouping.
# Unknown data types fall back to "electricity" (see _summarize_daily_rows).
_DATA_TYPE_TO_ENERGY_TYPE = {
    "power": "electricity",
    "energy": "electricity",
    "voltage": "electricity",
    "current": "electricity",
    "heat_power": "heat",
    "heat_energy": "heat",
    "temperature": "heat",
    "water_flow": "water",
    "water_volume": "water",
    "gas_flow": "gas",
    "gas_volume": "gas",
}

# Data types whose avg/min/max feed the *_power summary columns.
_POWER_DATA_TYPES = ("power", "heat_power")

# Data types whose daily sum is accumulated into total_consumption.
_CONSUMPTION_DATA_TYPES = ("energy", "heat_energy", "water_volume", "gas_volume")


def _round_or_none(value, ndigits):
    """Round *value* to *ndigits* places, passing ``None`` through unchanged.

    A plain truthiness test would also turn a legitimate ``0`` into ``None``,
    so the check is explicitly against ``None``.
    """
    return None if value is None else round(value, ndigits)


def _merge_extreme(current, candidate, pick):
    """Combine two optional numbers with *pick* (``max`` or ``min``).

    ``None`` means "no value yet" and is ignored instead of being coerced to
    0, so negative readings are neither clamped nor replaced.
    """
    if candidate is None:
        return current
    if current is None:
        return candidate
    return pick(current, candidate)


def _summarize_daily_rows(rows):
    """Fold per-(device, data_type) aggregates into per-(device, energy_type) stats.

    Each row must expose ``device_id``, ``data_type``, ``avg_value``,
    ``min_value``, ``max_value``, ``total_value`` and ``sample_count``.

    Returns a dict keyed by ``(device_id, energy_type)`` whose values hold
    ``peak_power``, ``min_power``, ``avg_power``, ``total_consumption``,
    ``avg_temperature`` and ``sample_count``.
    """
    device_summaries: dict[tuple[int, str], dict] = {}

    for row in rows:
        energy_type = _DATA_TYPE_TO_ENERGY_TYPE.get(row.data_type, "electricity")
        key = (row.device_id, energy_type)
        if key not in device_summaries:
            device_summaries[key] = {
                "peak_power": None,
                "min_power": None,
                "avg_power": None,
                "total_consumption": 0.0,
                "avg_temperature": None,
                "sample_count": 0,
            }
        summary = device_summaries[key]
        summary["sample_count"] += row.sample_count

        # Power-type metrics
        if row.data_type in _POWER_DATA_TYPES:
            summary["peak_power"] = _merge_extreme(
                summary["peak_power"], row.max_value, max
            )
            summary["min_power"] = _merge_extreme(
                summary["min_power"], row.min_value, min
            )
            # Each energy_type has exactly one power data_type in the mapping,
            # so this row's average is the group's average.
            summary["avg_power"] = row.avg_value

        # Consumption (energy, volume)
        if row.data_type in _CONSUMPTION_DATA_TYPES:
            summary["total_consumption"] += row.total_value or 0

        # Temperature
        if row.data_type == "temperature":
            summary["avg_temperature"] = row.avg_value

    return device_summaries


async def aggregate_hourly():
    """Aggregate raw energy_data into hourly avg/min/max per device+data_type.

    Processes data from the previous full hour (UTC clock). Results are logged
    but not stored separately — the primary use is for cache warming and
    monitoring. Daily aggregation (which writes to EnergyDailySummary) is the
    persistent rollup.

    Returns:
        The result rows, one per (device_id, data_type) group, with
        ``avg_value``, ``min_value``, ``max_value`` and ``sample_count``.
    """
    now = datetime.now(timezone.utc)
    hour_start = now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)
    hour_end = hour_start + timedelta(hours=1)

    logger.info("Running hourly aggregation for %s", hour_start.isoformat())

    async with async_session() as session:
        query = (
            select(
                EnergyData.device_id,
                EnergyData.data_type,
                func.avg(EnergyData.value).label("avg_value"),
                func.min(EnergyData.value).label("min_value"),
                func.max(EnergyData.value).label("max_value"),
                func.count(EnergyData.id).label("sample_count"),
            )
            .where(
                and_(
                    EnergyData.timestamp >= hour_start,
                    EnergyData.timestamp < hour_end,
                )
            )
            .group_by(EnergyData.device_id, EnergyData.data_type)
        )
        result = await session.execute(query)
        rows = result.all()

    logger.info(
        "Hourly aggregation complete: %d device/type groups for %s",
        len(rows),
        hour_start.isoformat(),
    )
    return rows


async def aggregate_daily() -> None:
    """Compute daily summaries and populate EnergyDailySummary.

    Processes yesterday's data (UTC clock). Groups by device_id and maps
    data_type to energy_type for the summary table. Existing summaries for the
    same date are deleted first so the job can be re-run safely.
    """
    now = datetime.now(timezone.utc)
    day_start = (now - timedelta(days=1)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    day_end = day_start + timedelta(days=1)

    logger.info("Running daily aggregation for %s", day_start.date().isoformat())

    async with async_session() as session:
        # Fetch per-device aggregated stats for the day
        query = (
            select(
                EnergyData.device_id,
                EnergyData.data_type,
                func.avg(EnergyData.value).label("avg_value"),
                func.min(EnergyData.value).label("min_value"),
                func.max(EnergyData.value).label("max_value"),
                func.sum(EnergyData.value).label("total_value"),
                func.count(EnergyData.id).label("sample_count"),
            )
            .where(
                and_(
                    EnergyData.timestamp >= day_start,
                    EnergyData.timestamp < day_end,
                )
            )
            .group_by(EnergyData.device_id, EnergyData.data_type)
        )
        result = await session.execute(query)

        # Group results by (device_id, energy_type)
        device_summaries = _summarize_daily_rows(result.all())

        # Delete existing summaries for the same date to allow re-runs
        await session.execute(
            EnergyDailySummary.__table__.delete().where(
                EnergyDailySummary.date == day_start
            )
        )

        # Build the replacement summaries. Zero is a valid measurement, so only
        # a genuinely missing value (None) is stored as NULL.
        summaries = [
            EnergyDailySummary(
                device_id=device_id,
                date=day_start,
                energy_type=energy_type,
                total_consumption=round(stats["total_consumption"], 4),
                # No data_type is treated as generation here; always written as 0.
                total_generation=0.0,
                peak_power=_round_or_none(stats["peak_power"], 4),
                min_power=_round_or_none(stats["min_power"], 4),
                avg_power=_round_or_none(stats["avg_power"], 4),
                avg_temperature=_round_or_none(stats["avg_temperature"], 2),
            )
            for (device_id, energy_type), stats in device_summaries.items()
        ]

        if summaries:
            session.add_all(summaries)
        # Commit even when there is nothing to insert so the delete takes effect.
        await session.commit()

    logger.info(
        "Daily aggregation complete: %d summaries for %s",
        len(summaries),
        day_start.date().isoformat(),
    )


async def aggregate_monthly():
    """Compute monthly rollups from EnergyDailySummary.

    Aggregates the previous month's daily summaries. Results are logged for
    monitoring — monthly reports use ReportGenerator for output.

    Returns:
        The result rows, one per (device_id, energy_type) group, with summed
        consumption/generation/operating hours/cost/carbon, max peak power,
        min power and average power.
    """
    # NOTE(review): "previous month" is resolved against the UTC date. If this
    # job fires in a zone ahead of UTC (the scheduler is configured with
    # Asia/Shanghai), 01:00 local on the 1st is still the last day of the
    # previous month in UTC, which would select the month before the intended
    # one — confirm the effective trigger timezone.
    now = datetime.now(timezone.utc)
    first_of_current = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    last_month_end = first_of_current - timedelta(days=1)
    month_start = last_month_end.replace(
        day=1, hour=0, minute=0, second=0, microsecond=0
    )

    logger.info(
        "Running monthly aggregation for %s-%02d",
        month_start.year,
        month_start.month,
    )

    async with async_session() as session:
        query = (
            select(
                EnergyDailySummary.device_id,
                EnergyDailySummary.energy_type,
                func.sum(EnergyDailySummary.total_consumption).label("total_consumption"),
                func.sum(EnergyDailySummary.total_generation).label("total_generation"),
                func.max(EnergyDailySummary.peak_power).label("peak_power"),
                func.min(EnergyDailySummary.min_power).label("min_power"),
                func.avg(EnergyDailySummary.avg_power).label("avg_power"),
                func.sum(EnergyDailySummary.operating_hours).label("total_operating_hours"),
                func.sum(EnergyDailySummary.cost).label("total_cost"),
                func.sum(EnergyDailySummary.carbon_emission).label("total_carbon"),
            )
            .where(
                and_(
                    EnergyDailySummary.date >= month_start,
                    EnergyDailySummary.date < first_of_current,
                )
            )
            .group_by(EnergyDailySummary.device_id, EnergyDailySummary.energy_type)
        )
        result = await session.execute(query)
        rows = result.all()

    logger.info(
        "Monthly aggregation complete: %d device/type groups for %s-%02d",
        len(rows),
        month_start.year,
        month_start.month,
    )
    return rows


async def start_aggregation_scheduler() -> None:
    """Start the APScheduler-based aggregation scheduler.

    Does nothing when ``AGGREGATION_ENABLED`` is false in settings or when a
    scheduler created by this module is already running.
    """
    global _scheduler

    settings = get_settings()
    if not settings.AGGREGATION_ENABLED:
        logger.info("Aggregation scheduler disabled by config.")
        return

    if _scheduler and _scheduler.running:
        logger.warning("Aggregation scheduler is already running.")
        return

    # NOTE(review): the CronTrigger instances below are built without an
    # explicit timezone; verify whether they follow the scheduler's
    # "Asia/Shanghai" zone or the host's local zone, since the jobs themselves
    # compute their windows from the UTC clock.
    _scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")

    # Hourly aggregation: every hour at :05
    _scheduler.add_job(
        aggregate_hourly,
        CronTrigger(minute=5),
        id="aggregate_hourly",
        replace_existing=True,
        misfire_grace_time=600,
    )

    # Daily aggregation: every day at 00:30
    _scheduler.add_job(
        aggregate_daily,
        CronTrigger(hour=0, minute=30),
        id="aggregate_daily",
        replace_existing=True,
        misfire_grace_time=3600,
    )

    # Monthly aggregation: 1st of each month at 01:00
    _scheduler.add_job(
        aggregate_monthly,
        CronTrigger(day=1, hour=1, minute=0),
        id="aggregate_monthly",
        replace_existing=True,
        misfire_grace_time=7200,
    )

    _scheduler.start()
    logger.info(
        "Aggregation scheduler started (hourly @:05, daily @00:30, monthly @1st 01:00)."
    )


async def stop_aggregation_scheduler() -> None:
    """Stop the aggregation scheduler gracefully.

    Shuts the scheduler down without waiting for running jobs, then clears the
    module-level reference so a later start creates a fresh instance.
    """
    global _scheduler

    if _scheduler and _scheduler.running:
        _scheduler.shutdown(wait=False)
        logger.info("Aggregation scheduler stopped.")
    _scheduler = None