"""模拟数据生成器 - 为天普园区设备生成真实感的模拟数据""" import asyncio import random import math from datetime import datetime, timezone from sqlalchemy import select from app.core.database import async_session from app.models.device import Device from app.models.energy import EnergyData from app.models.alarm import AlarmEvent class DataSimulator: def __init__(self): self._task = None self._running = False async def start(self): self._running = True self._task = asyncio.create_task(self._run_loop()) async def stop(self): self._running = False if self._task: self._task.cancel() async def _run_loop(self): while self._running: try: await self._generate_data() except Exception as e: print(f"Simulator error: {e}") await asyncio.sleep(15) # 每15秒生成一次 async def _generate_data(self): now = datetime.now(timezone.utc) hour = (now.hour + 8) % 24 # 北京时间 async with async_session() as session: result = await session.execute(select(Device).where(Device.is_active == True)) devices = result.scalars().all() data_points = [] for device in devices: points = self._generate_device_data(device, now, hour) data_points.extend(points) device.status = "online" device.last_data_time = now if data_points: session.add_all(data_points) await session.commit() def _generate_device_data(self, device: Device, now: datetime, hour: int) -> list[EnergyData]: points = [] if device.device_type == "pv_inverter": points = self._gen_pv_data(device, now, hour) elif device.device_type == "heat_pump": points = self._gen_heatpump_data(device, now, hour) elif device.device_type == "meter": points = self._gen_meter_data(device, now, hour) elif device.device_type == "sensor": points = self._gen_sensor_data(device, now, hour) return points def _gen_pv_data(self, device: Device, now: datetime, hour: int) -> list[EnergyData]: """光伏逆变器数据 - 基于日照模型""" rated = device.rated_power or 110 # kW # 日照模型: 6-18点有发电, 12点最大 if 6 <= hour <= 18: solar_factor = math.sin(math.pi * (hour - 6) / 12) weather_factor = random.uniform(0.6, 1.0) # 天气影响 power = rated * solar_factor * weather_factor * random.uniform(0.85, 0.95) else: power = 0 daily_energy = rated * 4.5 * random.uniform(0.8, 1.1) # 日发电量约4.5等效小时 cumulative_energy = 170 + random.uniform(0, 5) # 累计发电MWh return [ EnergyData(device_id=device.id, timestamp=now, data_type="power", value=round(power, 2), unit="kW"), EnergyData(device_id=device.id, timestamp=now, data_type="daily_energy", value=round(daily_energy * hour / 24, 2), unit="kWh"), EnergyData(device_id=device.id, timestamp=now, data_type="total_energy", value=round(cumulative_energy * 1000, 1), unit="kWh"), EnergyData(device_id=device.id, timestamp=now, data_type="dc_voltage", value=round(random.uniform(250, 800), 1), unit="V"), EnergyData(device_id=device.id, timestamp=now, data_type="ac_voltage", value=round(random.uniform(218, 222), 1), unit="V"), EnergyData(device_id=device.id, timestamp=now, data_type="temperature", value=round(random.uniform(25, 45), 1), unit="℃"), ] def _gen_heatpump_data(self, device: Device, now: datetime, hour: int) -> list[EnergyData]: """热泵机组数据""" # 冬季供暖模式 is_heating_season = now.month in [1, 2, 3, 10, 11, 12] if is_heating_season: outdoor_temp = random.uniform(-5, 10) cop = random.uniform(2.5, 3.8) inlet_temp = random.uniform(35, 42) outlet_temp = inlet_temp + random.uniform(5, 10) power = random.uniform(20, 35) flow_rate = random.uniform(8, 15) else: outdoor_temp = random.uniform(15, 35) cop = random.uniform(3.5, 5.0) inlet_temp = random.uniform(8, 12) outlet_temp = inlet_temp - random.uniform(3, 6) power = random.uniform(15, 28) flow_rate = random.uniform(8, 15) return [ EnergyData(device_id=device.id, timestamp=now, data_type="power", value=round(power, 2), unit="kW"), EnergyData(device_id=device.id, timestamp=now, data_type="cop", value=round(cop, 2), unit=""), EnergyData(device_id=device.id, timestamp=now, data_type="inlet_temp", value=round(inlet_temp, 1), unit="℃"), EnergyData(device_id=device.id, timestamp=now, data_type="outlet_temp", value=round(outlet_temp, 1), unit="℃"), EnergyData(device_id=device.id, timestamp=now, data_type="flow_rate", value=round(flow_rate, 1), unit="m³/h"), EnergyData(device_id=device.id, timestamp=now, data_type="outdoor_temp", value=round(outdoor_temp, 1), unit="℃"), ] def _gen_meter_data(self, device: Device, now: datetime, hour: int) -> list[EnergyData]: """电表数据""" # 负荷曲线: 白天高, 夜间低 base_load = 50 # 基础负荷kW if 8 <= hour <= 18: load_factor = random.uniform(1.2, 2.0) elif 18 <= hour <= 22: load_factor = random.uniform(0.8, 1.3) else: load_factor = random.uniform(0.3, 0.6) power = base_load * load_factor + random.uniform(-5, 5) return [ EnergyData(device_id=device.id, timestamp=now, data_type="power", value=round(power, 2), unit="kW"), EnergyData(device_id=device.id, timestamp=now, data_type="voltage", value=round(random.uniform(218, 225), 1), unit="V"), EnergyData(device_id=device.id, timestamp=now, data_type="current", value=round(power / 0.38 / random.uniform(0.85, 0.95), 1), unit="A"), EnergyData(device_id=device.id, timestamp=now, data_type="power_factor", value=round(random.uniform(0.88, 0.98), 3), unit=""), ] def _gen_sensor_data(self, device: Device, now: datetime, hour: int) -> list[EnergyData]: """温湿度传感器数据""" # 室内温度根据供暖状态 indoor_temp = random.uniform(20, 24) if now.month in [1, 2, 3, 10, 11, 12] else random.uniform(22, 28) humidity = random.uniform(35, 65) return [ EnergyData(device_id=device.id, timestamp=now, data_type="temperature", value=round(indoor_temp, 1), unit="℃"), EnergyData(device_id=device.id, timestamp=now, data_type="humidity", value=round(humidity, 1), unit="%"), ]