"""Beijing weather and solar position models for realistic data simulation. Shared by both the real-time simulator and the backfill script. Deterministic when given a seed — call set_seed() for reproducible backfills. Tianpu campus: 39.9N, 116.4E (Beijing / Daxing district) """ import math import random from datetime import datetime, timezone, timedelta # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- BEIJING_LAT = 39.9 # degrees N BEIJING_LON = 116.4 # degrees E BEIJING_TZ_OFFSET = 8 # UTC+8 # Monthly average temperatures for Beijing (index 0 = Jan) # Source: typical climatological data MONTHLY_AVG_TEMP = [-3.0, 0.0, 7.0, 14.0, 21.0, 26.0, 27.5, 26.0, 21.0, 13.0, 4.0, -1.5] # Diurnal temperature swing amplitude by month (half-range) MONTHLY_DIURNAL_SWING = [6.0, 7.0, 7.5, 8.0, 7.5, 7.0, 6.0, 6.0, 7.0, 7.5, 7.0, 6.0] # Monthly average relative humidity (%) MONTHLY_AVG_HUMIDITY = [44, 42, 38, 38, 45, 58, 72, 75, 62, 55, 50, 46] # Sunrise/sunset hours (approximate, Beijing local time) by month MONTHLY_SUNRISE = [7.5, 7.1, 6.4, 5.7, 5.2, 5.0, 5.1, 5.5, 6.0, 6.3, 6.8, 7.3] MONTHLY_SUNSET = [17.1, 17.6, 18.2, 18.7, 19.2, 19.5, 19.4, 19.0, 18.3, 17.6, 17.0, 16.9] # Solar declination approximation (degrees) for day-of-year # and equation of time are computed analytically below _rng = random.Random() def set_seed(seed: int): """Set the random seed for reproducible data generation.""" global _rng _rng = random.Random(seed) def _gauss(mu: float, sigma: float) -> float: return _rng.gauss(mu, sigma) def _uniform(a: float, b: float) -> float: return _rng.uniform(a, b) def _random() -> float: return _rng.random() # --------------------------------------------------------------------------- # Solar position (simplified but accurate enough for simulation) # --------------------------------------------------------------------------- def _day_of_year(dt: datetime) -> int: """Day of year 1-366.""" beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt return beijing_dt.timetuple().tm_yday def solar_declination(day_of_year: int) -> float: """Solar declination in degrees.""" return 23.45 * math.sin(math.radians((360 / 365) * (day_of_year - 81))) def _equation_of_time(day_of_year: int) -> float: """Equation of time in minutes.""" b = math.radians((360 / 365) * (day_of_year - 81)) return 9.87 * math.sin(2 * b) - 7.53 * math.cos(b) - 1.5 * math.sin(b) def solar_altitude(dt: datetime) -> float: """Solar altitude angle in degrees for Beijing at given UTC datetime. Returns negative values when sun is below horizon. """ doy = _day_of_year(dt) decl = math.radians(solar_declination(doy)) lat = math.radians(BEIJING_LAT) # Local solar time beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt # Standard meridian for UTC+8 is 120E; Tianpu is at 116.4E time_offset = _equation_of_time(doy) + 4 * (BEIJING_LON - 120.0) solar_hour = beijing_dt.hour + beijing_dt.minute / 60.0 + beijing_dt.second / 3600.0 solar_hour += time_offset / 60.0 hour_angle = math.radians(15 * (solar_hour - 12)) sin_alt = (math.sin(lat) * math.sin(decl) + math.cos(lat) * math.cos(decl) * math.cos(hour_angle)) return math.degrees(math.asin(max(-1, min(1, sin_alt)))) def solar_azimuth(dt: datetime) -> float: """Solar azimuth in degrees (0=N, 90=E, 180=S, 270=W).""" doy = _day_of_year(dt) decl = math.radians(solar_declination(doy)) lat = math.radians(BEIJING_LAT) alt = math.radians(solar_altitude(dt)) if math.cos(alt) < 1e-6: return 180.0 cos_az = (math.sin(decl) - math.sin(lat) * math.sin(alt)) / (math.cos(lat) * math.cos(alt)) cos_az = max(-1, min(1, cos_az)) az = math.degrees(math.acos(cos_az)) beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt if beijing_dt.hour + beijing_dt.minute / 60.0 > 12: az = 360 - az return az # --------------------------------------------------------------------------- # Cloud transient model # --------------------------------------------------------------------------- class CloudModel: """Simulates random cloud events that reduce PV output.""" def __init__(self): self._events: list[dict] = [] # list of {start_minute, duration_minutes, opacity} self._last_day: int = -1 def _generate_day_events(self, doy: int, month: int): """Generate cloud events for a day. More clouds in summer monsoon.""" self._events.clear() self._last_day = doy # Number of cloud events varies by season if month in (7, 8): # monsoon n_events = int(_uniform(3, 8)) elif month in (6, 9): n_events = int(_uniform(2, 5)) elif month in (3, 4, 5, 10): n_events = int(_uniform(1, 4)) else: # winter: clearer skies in Beijing n_events = int(_uniform(0, 3)) for _ in range(n_events): start = _uniform(6 * 60, 18 * 60) # minutes from midnight duration = _uniform(2, 15) opacity = _uniform(0.3, 0.7) # how much output drops self._events.append({ "start": start, "duration": duration, "opacity": opacity, }) def get_cloud_factor(self, dt: datetime) -> float: """Returns multiplier 0.3-1.0 (1.0 = clear sky).""" beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt doy = beijing_dt.timetuple().tm_yday month = beijing_dt.month if doy != self._last_day: self._generate_day_events(doy, month) minute_of_day = beijing_dt.hour * 60 + beijing_dt.minute factor = 1.0 for ev in self._events: if ev["start"] <= minute_of_day <= ev["start"] + ev["duration"]: factor = min(factor, 1.0 - ev["opacity"]) return factor # Global cloud model instance (shared across inverters for correlated weather) _cloud_model = CloudModel() def get_cloud_factor(dt: datetime) -> float: return _cloud_model.get_cloud_factor(dt) def reset_cloud_model(): """Reset cloud model (useful for backfill where each day is independent).""" global _cloud_model _cloud_model = CloudModel() # --------------------------------------------------------------------------- # Outdoor temperature model # --------------------------------------------------------------------------- def outdoor_temperature(dt: datetime) -> float: """Realistic outdoor temperature for Beijing based on month, hour, and noise. Uses sinusoidal diurnal pattern with peak at ~15:00 and minimum at ~06:00. """ beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt month_idx = beijing_dt.month - 1 hour = beijing_dt.hour + beijing_dt.minute / 60.0 avg = MONTHLY_AVG_TEMP[month_idx] swing = MONTHLY_DIURNAL_SWING[month_idx] # Sinusoidal with peak at 15:00, minimum at 06:00 (shifted cosine) diurnal = -swing * math.cos(2 * math.pi * (hour - 15) / 24) # Day-to-day variation (slow drift) doy = beijing_dt.timetuple().tm_yday day_drift = 3.0 * math.sin(doy * 0.7) + 2.0 * math.cos(doy * 1.3) noise = _gauss(0, 0.5) return avg + diurnal + day_drift + noise def outdoor_humidity(dt: datetime) -> float: """Outdoor humidity correlated with temperature and season.""" beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt month_idx = beijing_dt.month - 1 hour = beijing_dt.hour base = MONTHLY_AVG_HUMIDITY[month_idx] # Higher humidity at night, lower during day diurnal = 8.0 * math.cos(2 * math.pi * (hour - 4) / 24) noise = _gauss(0, 3) return max(15, min(95, base + diurnal + noise)) # --------------------------------------------------------------------------- # PV power model # --------------------------------------------------------------------------- def pv_power(dt: datetime, rated_power: float = 110.0, orientation: str = "south", device_code: str = "") -> float: """Calculate realistic PV inverter output. Args: dt: UTC datetime rated_power: Inverter rated power in kW orientation: 'east', 'west', or 'south' - affects morning/afternoon bias device_code: Device code for per-inverter variation Returns: Power in kW (0 at night, clipped at rated_power) """ alt = solar_altitude(dt) # Night: exactly 0 if alt <= 0: return 0.0 # Dawn/dusk ramp: gradual startup below 5 degrees altitude if alt < 5: ramp_factor = alt / 5.0 else: ramp_factor = 1.0 # Base clear-sky irradiance (simplified: proportional to sin(altitude)) # With atmosphere correction (air mass) air_mass = 1.0 / max(math.sin(math.radians(alt)), 0.01) air_mass = min(air_mass, 40) # cap for very low sun atmospheric_transmission = 0.7 ** (air_mass ** 0.678) # Meinel model simplified clear_sky_factor = math.sin(math.radians(alt)) * atmospheric_transmission # Seasonal factor: panels at fixed tilt (~30 degrees in Beijing) # Summer sun is higher -> slightly less optimal for tilted panels at noon # but longer days compensate doy = _day_of_year(dt) decl = solar_declination(doy) # Approximate panel tilt correction panel_tilt = 30 # degrees tilt_factor = max(0.5, math.cos(math.radians(abs(alt - (90 - BEIJING_LAT + decl)) * 0.3))) # Orientation bias azimuth = solar_azimuth(dt) if orientation == "east": # East-facing gets more morning sun orient_factor = 1.0 + 0.1 * math.cos(math.radians(azimuth - 120)) elif orientation == "west": # West-facing gets more afternoon sun orient_factor = 1.0 + 0.1 * math.cos(math.radians(azimuth - 240)) else: orient_factor = 1.0 # Cloud effect (correlated across all inverters) cloud = get_cloud_factor(dt) # Temperature derating temp = outdoor_temperature(dt) # Panel temperature is ~20-30C above ambient when producing panel_temp = temp + 20 + 10 * clear_sky_factor temp_derate = 1.0 + (-0.004) * max(0, panel_temp - 25) # -0.4%/C above 25C temp_derate = max(0.75, temp_derate) # Per-inverter variation (use device code hash for deterministic offset) if device_code: inv_hash = hash(device_code) % 1000 / 1000.0 inv_variation = 0.97 + 0.06 * inv_hash # 0.97 to 1.03 else: inv_variation = 1.0 # Gaussian noise (1-3%) noise = 1.0 + _gauss(0, 0.015) # Final power power = (rated_power * clear_sky_factor * tilt_factor * orient_factor * cloud * temp_derate * ramp_factor * inv_variation * noise) # Inverter clipping power = min(power, rated_power) power = max(0.0, power) return round(power, 2) def get_pv_orientation(device_code: str) -> str: """Determine inverter orientation based on device code. INV-01, INV-02 are east building, INV-03 is west building. """ if device_code in ("INV-01", "INV-02"): return "east" elif device_code == "INV-03": return "west" return "south" # --------------------------------------------------------------------------- # Heat pump model # --------------------------------------------------------------------------- def get_hvac_mode(month: int) -> str: """Determine HVAC operating mode by month.""" if month in (11, 12, 1, 2, 3): return "heating" elif month in (6, 7, 8, 9): return "cooling" elif month in (4, 5): return "transition_spring" else: # Oct return "transition_fall" def heat_pump_data(dt: datetime, rated_power: float = 35.0, device_code: str = "") -> dict: """Generate realistic heat pump operating data. Returns dict with: power, cop, inlet_temp, outlet_temp, flow_rate, outdoor_temp, mode """ beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt hour = beijing_dt.hour + beijing_dt.minute / 60.0 month = beijing_dt.month is_weekend = beijing_dt.weekday() >= 5 mode = get_hvac_mode(month) out_temp = outdoor_temperature(dt) # COP model: varies with outdoor temperature cop = 3.0 + 0.05 * (out_temp - 7) cop = max(2.0, min(5.5, cop)) cop += _gauss(0, 0.1) cop = max(2.0, min(5.5, cop)) # Operating pattern depends on mode if mode == "heating": # Higher demand at night/morning (cold), lower during warmest part of day if 6 <= hour <= 9: load_factor = _uniform(0.75, 0.95) elif 9 <= hour <= 16: load_factor = _uniform(0.45, 0.65) elif 16 <= hour <= 22: load_factor = _uniform(0.65, 0.85) else: # night load_factor = _uniform(0.55, 0.75) if is_weekend: load_factor *= 0.7 inlet_temp = 35 + _gauss(0, 1.5) # return water delta_t = _uniform(5, 8) outlet_temp = inlet_temp + delta_t elif mode == "cooling": # Higher demand in afternoon (hot) if 8 <= hour <= 11: load_factor = _uniform(0.45, 0.65) elif 11 <= hour <= 16: load_factor = _uniform(0.75, 0.95) elif 16 <= hour <= 19: load_factor = _uniform(0.60, 0.80) elif 19 <= hour <= 22: load_factor = _uniform(0.35, 0.55) else: load_factor = _uniform(0.15, 0.30) if is_weekend: load_factor *= 0.7 inlet_temp = 12 + _gauss(0, 1.0) # return water (chilled) delta_t = _uniform(3, 5) outlet_temp = inlet_temp - delta_t else: # transition # Intermittent operation if _random() < 0.4: # Off period return { "power": 0.0, "cop": 0.0, "inlet_temp": round(out_temp + _gauss(5, 1), 1), "outlet_temp": round(out_temp + _gauss(5, 1), 1), "flow_rate": 0.0, "outdoor_temp": round(out_temp, 1), "mode": "standby", } load_factor = _uniform(0.25, 0.55) # Could be either heating or cooling depending on temp if out_temp < 15: inlet_temp = 32 + _gauss(0, 1.5) delta_t = _uniform(4, 6) outlet_temp = inlet_temp + delta_t else: inlet_temp = 14 + _gauss(0, 1.0) delta_t = _uniform(3, 4) outlet_temp = inlet_temp - delta_t power = rated_power * load_factor power += _gauss(0, power * 0.02) # noise power = max(0, min(rated_power, power)) # Flow rate correlates with power (not random!) # Higher power -> higher flow for heat transfer flow_rate = 8 + (power / rated_power) * 7 # 8-15 m3/h range flow_rate += _gauss(0, 0.3) flow_rate = max(5, min(18, flow_rate)) # Per-unit variation if device_code: unit_offset = (hash(device_code) % 100 - 50) / 500.0 # +/- 10% power *= (1 + unit_offset) return { "power": round(max(0, power), 2), "cop": round(cop, 2), "inlet_temp": round(inlet_temp, 1), "outlet_temp": round(outlet_temp, 1), "flow_rate": round(flow_rate, 1), "outdoor_temp": round(out_temp, 1), "mode": mode, } # --------------------------------------------------------------------------- # Building load (meter) model # --------------------------------------------------------------------------- def building_load(dt: datetime, base_power: float = 50.0, meter_code: str = "") -> dict: """Generate realistic building electrical load. Returns dict with: power, voltage, current, power_factor """ beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt hour = beijing_dt.hour + beijing_dt.minute / 60.0 month = beijing_dt.month is_weekend = beijing_dt.weekday() >= 5 # Base load profile if is_weekend: # Weekend: much lower, no office activity if 8 <= hour <= 18: load_factor = _uniform(0.35, 0.50) else: load_factor = _uniform(0.25, 0.35) else: # Weekday office pattern if hour < 6: load_factor = _uniform(0.25, 0.35) # night minimum (security, servers) elif 6 <= hour < 8: # Morning ramp-up ramp = (hour - 6) / 2.0 load_factor = _uniform(0.35, 0.50) + ramp * 0.3 elif 8 <= hour < 12: load_factor = _uniform(0.75, 0.95) # morning work elif 12 <= hour < 13: load_factor = _uniform(0.55, 0.70) # lunch dip elif 13 <= hour < 18: load_factor = _uniform(0.80, 1.0) # afternoon peak elif 18 <= hour < 19: # Evening ramp-down ramp = (19 - hour) load_factor = _uniform(0.50, 0.65) + ramp * 0.2 elif 19 <= hour < 22: load_factor = _uniform(0.35, 0.50) # evening else: load_factor = _uniform(0.25, 0.35) # night # HVAC seasonal contribution hvac_mode = get_hvac_mode(month) if hvac_mode == "heating": hvac_add = _uniform(0.10, 0.20) elif hvac_mode == "cooling": hvac_add = _uniform(0.15, 0.25) else: hvac_add = _uniform(0.03, 0.08) # Random load events (elevator, kitchen, EV charging) spike = 0.0 if _random() < 0.08: # ~8% chance per reading spike = _uniform(5, 25) # kW spike power = base_power * (load_factor + hvac_add) + spike # Minimum night base load (security, servers, emergency lighting) min_load = 15 + _gauss(0, 1) power = max(min_load, power) # Noise power += _gauss(0, power * 0.015) power = max(0, power) # Voltage (realistic grid: 220V +/- 5%) voltage = 220 + _gauss(0, 2.0) voltage = max(209, min(231, voltage)) # Power factor pf = _uniform(0.88, 0.96) if 8 <= hour <= 18 and not is_weekend: pf = _uniform(0.90, 0.97) # better during office hours (capacitor bank) # Current derived from power current = power / (voltage * math.sqrt(3) * pf / 1000) # 3-phase # Per-meter variation if meter_code == "METER-GRID": pass # main meter, use as-is elif meter_code == "METER-PV": # PV meter shows generation, not load — handled separately pass elif meter_code == "METER-HP": power *= _uniform(0.2, 0.35) # heat pump subset of total elif meter_code == "METER-PUMP": power *= _uniform(0.05, 0.12) # circulation pumps return { "power": round(power, 2), "voltage": round(voltage, 1), "current": round(current, 1), "power_factor": round(pf, 3), } # --------------------------------------------------------------------------- # Sensor model # --------------------------------------------------------------------------- def indoor_sensor(dt: datetime, is_outdoor: bool = False, device_code: str = "") -> dict: """Generate realistic temperature and humidity sensor data. Returns dict with: temperature, humidity """ beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt hour = beijing_dt.hour + beijing_dt.minute / 60.0 month = beijing_dt.month is_weekend = beijing_dt.weekday() >= 5 if is_outdoor: temp = outdoor_temperature(dt) hum = outdoor_humidity(dt) return {"temperature": round(temp, 1), "humidity": round(hum, 1)} # Indoor: HVAC controlled during office hours hvac_mode = get_hvac_mode(month) if not is_weekend and 7 <= hour <= 19: # HVAC on: well-controlled if hvac_mode == "heating": temp = _uniform(20.5, 23.5) elif hvac_mode == "cooling": temp = _uniform(23.0, 25.5) else: temp = _uniform(21.0, 25.0) hum = _uniform(40, 55) else: # HVAC off or weekend: drifts toward outdoor out_temp = outdoor_temperature(dt) if hvac_mode == "heating": # Indoor cools slowly without heating temp = max(16, min(22, 22 - (22 - out_temp) * 0.15)) elif hvac_mode == "cooling": # Indoor warms slowly without cooling temp = min(30, max(24, 24 + (out_temp - 24) * 0.15)) else: temp = 20 + (out_temp - 15) * 0.2 hum = _uniform(35, 65) # Summer monsoon: higher indoor humidity without dehumidification if month in (7, 8) and is_weekend: hum = _uniform(55, 75) # Per-sensor variation (different rooms have slightly different temps) if device_code: room_offset = (hash(device_code) % 100 - 50) / 100.0 # +/- 0.5C temp += room_offset temp += _gauss(0, 0.2) hum += _gauss(0, 1.5) return { "temperature": round(temp, 1), "humidity": round(max(15, min(95, hum)), 1), } # --------------------------------------------------------------------------- # Heat meter model # --------------------------------------------------------------------------- def heat_meter_data(dt: datetime, hp_power: float = 0, hp_cop: float = 3.0) -> dict: """Generate heat meter readings correlated with heat pump operation. Args: hp_power: Total heat pump electrical power (sum of all units) in kW hp_cop: Average COP of operating heat pumps Returns dict with: heat_power, flow_rate, supply_temp, return_temp """ # Heat output = electrical input * COP * efficiency loss heat_power = hp_power * hp_cop * _uniform(0.88, 0.95) if heat_power < 1: return { "heat_power": 0.0, "flow_rate": 0.0, "supply_temp": round(outdoor_temperature(dt) + _gauss(5, 1), 1), "return_temp": round(outdoor_temperature(dt) + _gauss(5, 1), 1), } beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt mode = get_hvac_mode(beijing_dt.month) if mode == "heating": supply_temp = 42 + _gauss(0, 1.5) return_temp = supply_temp - _uniform(5, 8) elif mode == "cooling": supply_temp = 7 + _gauss(0, 0.8) return_temp = supply_temp + _uniform(3, 5) else: supply_temp = 30 + _gauss(0, 2) return_temp = supply_temp - _uniform(3, 5) # Flow rate derived from heat and delta-T delta_t = abs(supply_temp - return_temp) if delta_t > 0.5: # Q = m * cp * dT => m = Q / (cp * dT) # cp of water ~4.186 kJ/kgK, 1 m3 = 1000 kg flow_rate = heat_power / (4.186 * delta_t) * 3.6 # m3/h else: flow_rate = _uniform(5, 10) flow_rate += _gauss(0, 0.2) return { "heat_power": round(max(0, heat_power), 2), "flow_rate": round(max(0, flow_rate), 1), "supply_temp": round(supply_temp, 1), "return_temp": round(return_temp, 1), } # --------------------------------------------------------------------------- # Communication glitch model # --------------------------------------------------------------------------- def should_skip_reading(cycle_count: int = 0) -> bool: """Simulate occasional communication glitches. ~1% chance of skipping a reading. """ return _random() < 0.01 def should_go_offline() -> bool: """Simulate brief device offline events. ~0.1% chance per cycle (roughly once every few hours at 15s intervals). """ return _random() < 0.001 # --------------------------------------------------------------------------- # PV electrical details # --------------------------------------------------------------------------- def pv_electrical(power: float, rated_power: float = 110.0) -> dict: """Generate realistic PV electrical measurements.""" if power <= 0: return { "dc_voltage": 0.0, "ac_voltage": round(220 + _gauss(0, 1), 1), "temperature": round(outdoor_temperature(datetime.now(timezone.utc)) + _gauss(0, 2), 1), } load_ratio = power / rated_power # DC voltage: MPPT tracking range 200-850V, higher at higher power dc_voltage = 450 + 200 * load_ratio + _gauss(0, 15) dc_voltage = max(200, min(850, dc_voltage)) # AC voltage: grid-tied, very stable ac_voltage = 220 + _gauss(0, 1.5) # Inverter temperature: ambient + load-dependent heating inv_temp = outdoor_temperature(datetime.now(timezone.utc)) + 15 + 20 * load_ratio inv_temp += _gauss(0, 1.5) return { "dc_voltage": round(dc_voltage, 1), "ac_voltage": round(ac_voltage, 1), "temperature": round(inv_temp, 1), } def pv_electrical_at(power: float, dt: datetime, rated_power: float = 110.0) -> dict: """Generate PV electrical measurements for a specific time (backfill).""" if power <= 0: return { "dc_voltage": 0.0, "ac_voltage": round(220 + _gauss(0, 1), 1), "temperature": round(outdoor_temperature(dt) + _gauss(0, 2), 1), } load_ratio = power / rated_power dc_voltage = 450 + 200 * load_ratio + _gauss(0, 15) dc_voltage = max(200, min(850, dc_voltage)) ac_voltage = 220 + _gauss(0, 1.5) inv_temp = outdoor_temperature(dt) + 15 + 20 * load_ratio + _gauss(0, 1.5) return { "dc_voltage": round(dc_voltage, 1), "ac_voltage": round(ac_voltage, 1), "temperature": round(inv_temp, 1), }