Files
zpark-ems/backend/app/services/weather_model.py
Du Wenbo 026c837b91 Squashed 'core/' content from commit 92ec910
git-subtree-dir: core
git-subtree-split: 92ec910a132e379a3a6e442a75bcb07cac0f0010
2026-04-04 18:17:10 +08:00

740 lines
25 KiB
Python

"""Beijing weather and solar position models for realistic data simulation.
Shared by both the real-time simulator and the backfill script.
Deterministic when given a seed — call set_seed() for reproducible backfills.
Tianpu campus: 39.9N, 116.4E (Beijing / Daxing district)
"""
import math
import random
from datetime import datetime, timezone, timedelta
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
BEIJING_LAT = 39.9 # degrees N
BEIJING_LON = 116.4 # degrees E
BEIJING_TZ_OFFSET = 8 # UTC+8
# Monthly average temperatures for Beijing (index 0 = Jan)
# Source: typical climatological data
MONTHLY_AVG_TEMP = [-3.0, 0.0, 7.0, 14.0, 21.0, 26.0, 27.5, 26.0, 21.0, 13.0, 4.0, -1.5]
# Diurnal temperature swing amplitude by month (half-range)
MONTHLY_DIURNAL_SWING = [6.0, 7.0, 7.5, 8.0, 7.5, 7.0, 6.0, 6.0, 7.0, 7.5, 7.0, 6.0]
# Monthly average relative humidity (%)
MONTHLY_AVG_HUMIDITY = [44, 42, 38, 38, 45, 58, 72, 75, 62, 55, 50, 46]
# Sunrise/sunset hours (approximate, Beijing local time) by month
MONTHLY_SUNRISE = [7.5, 7.1, 6.4, 5.7, 5.2, 5.0, 5.1, 5.5, 6.0, 6.3, 6.8, 7.3]
MONTHLY_SUNSET = [17.1, 17.6, 18.2, 18.7, 19.2, 19.5, 19.4, 19.0, 18.3, 17.6, 17.0, 16.9]
# Solar declination approximation (degrees) for day-of-year
# and equation of time are computed analytically below
_rng = random.Random()
def set_seed(seed: int):
"""Set the random seed for reproducible data generation."""
global _rng
_rng = random.Random(seed)
def _gauss(mu: float, sigma: float) -> float:
return _rng.gauss(mu, sigma)
def _uniform(a: float, b: float) -> float:
return _rng.uniform(a, b)
def _random() -> float:
return _rng.random()
# ---------------------------------------------------------------------------
# Solar position (simplified but accurate enough for simulation)
# ---------------------------------------------------------------------------
def _day_of_year(dt: datetime) -> int:
"""Day of year 1-366."""
beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt
return beijing_dt.timetuple().tm_yday
def solar_declination(day_of_year: int) -> float:
"""Solar declination in degrees."""
return 23.45 * math.sin(math.radians((360 / 365) * (day_of_year - 81)))
def _equation_of_time(day_of_year: int) -> float:
"""Equation of time in minutes."""
b = math.radians((360 / 365) * (day_of_year - 81))
return 9.87 * math.sin(2 * b) - 7.53 * math.cos(b) - 1.5 * math.sin(b)
def solar_altitude(dt: datetime) -> float:
"""Solar altitude angle in degrees for Beijing at given UTC datetime.
Returns negative values when sun is below horizon.
"""
doy = _day_of_year(dt)
decl = math.radians(solar_declination(doy))
lat = math.radians(BEIJING_LAT)
# Local solar time
beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt
# Standard meridian for UTC+8 is 120E; Tianpu is at 116.4E
time_offset = _equation_of_time(doy) + 4 * (BEIJING_LON - 120.0)
solar_hour = beijing_dt.hour + beijing_dt.minute / 60.0 + beijing_dt.second / 3600.0
solar_hour += time_offset / 60.0
hour_angle = math.radians(15 * (solar_hour - 12))
sin_alt = (math.sin(lat) * math.sin(decl) +
math.cos(lat) * math.cos(decl) * math.cos(hour_angle))
return math.degrees(math.asin(max(-1, min(1, sin_alt))))
def solar_azimuth(dt: datetime) -> float:
"""Solar azimuth in degrees (0=N, 90=E, 180=S, 270=W)."""
doy = _day_of_year(dt)
decl = math.radians(solar_declination(doy))
lat = math.radians(BEIJING_LAT)
alt = math.radians(solar_altitude(dt))
if math.cos(alt) < 1e-6:
return 180.0
cos_az = (math.sin(decl) - math.sin(lat) * math.sin(alt)) / (math.cos(lat) * math.cos(alt))
cos_az = max(-1, min(1, cos_az))
az = math.degrees(math.acos(cos_az))
beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt
if beijing_dt.hour + beijing_dt.minute / 60.0 > 12:
az = 360 - az
return az
# ---------------------------------------------------------------------------
# Cloud transient model
# ---------------------------------------------------------------------------
class CloudModel:
"""Simulates random cloud events that reduce PV output."""
def __init__(self):
self._events: list[dict] = [] # list of {start_minute, duration_minutes, opacity}
self._last_day: int = -1
def _generate_day_events(self, doy: int, month: int):
"""Generate cloud events for a day. More clouds in summer monsoon."""
self._events.clear()
self._last_day = doy
# Number of cloud events varies by season
if month in (7, 8): # monsoon
n_events = int(_uniform(3, 8))
elif month in (6, 9):
n_events = int(_uniform(2, 5))
elif month in (3, 4, 5, 10):
n_events = int(_uniform(1, 4))
else: # winter: clearer skies in Beijing
n_events = int(_uniform(0, 3))
for _ in range(n_events):
start = _uniform(6 * 60, 18 * 60) # minutes from midnight
duration = _uniform(2, 15)
opacity = _uniform(0.3, 0.7) # how much output drops
self._events.append({
"start": start,
"duration": duration,
"opacity": opacity,
})
def get_cloud_factor(self, dt: datetime) -> float:
"""Returns multiplier 0.3-1.0 (1.0 = clear sky)."""
beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt
doy = beijing_dt.timetuple().tm_yday
month = beijing_dt.month
if doy != self._last_day:
self._generate_day_events(doy, month)
minute_of_day = beijing_dt.hour * 60 + beijing_dt.minute
factor = 1.0
for ev in self._events:
if ev["start"] <= minute_of_day <= ev["start"] + ev["duration"]:
factor = min(factor, 1.0 - ev["opacity"])
return factor
# Global cloud model instance (shared across inverters for correlated weather)
_cloud_model = CloudModel()
def get_cloud_factor(dt: datetime) -> float:
return _cloud_model.get_cloud_factor(dt)
def reset_cloud_model():
"""Reset cloud model (useful for backfill where each day is independent)."""
global _cloud_model
_cloud_model = CloudModel()
# ---------------------------------------------------------------------------
# Outdoor temperature model
# ---------------------------------------------------------------------------
def outdoor_temperature(dt: datetime) -> float:
"""Realistic outdoor temperature for Beijing based on month, hour, and noise.
Uses sinusoidal diurnal pattern with peak at ~15:00 and minimum at ~06:00.
"""
beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt
month_idx = beijing_dt.month - 1
hour = beijing_dt.hour + beijing_dt.minute / 60.0
avg = MONTHLY_AVG_TEMP[month_idx]
swing = MONTHLY_DIURNAL_SWING[month_idx]
# Sinusoidal with peak at 15:00, minimum at 06:00 (shifted cosine)
diurnal = -swing * math.cos(2 * math.pi * (hour - 15) / 24)
# Day-to-day variation (slow drift)
doy = beijing_dt.timetuple().tm_yday
day_drift = 3.0 * math.sin(doy * 0.7) + 2.0 * math.cos(doy * 1.3)
noise = _gauss(0, 0.5)
return avg + diurnal + day_drift + noise
def outdoor_humidity(dt: datetime) -> float:
"""Outdoor humidity correlated with temperature and season."""
beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt
month_idx = beijing_dt.month - 1
hour = beijing_dt.hour
base = MONTHLY_AVG_HUMIDITY[month_idx]
# Higher humidity at night, lower during day
diurnal = 8.0 * math.cos(2 * math.pi * (hour - 4) / 24)
noise = _gauss(0, 3)
return max(15, min(95, base + diurnal + noise))
# ---------------------------------------------------------------------------
# PV power model
# ---------------------------------------------------------------------------
def pv_power(dt: datetime, rated_power: float = 110.0,
orientation: str = "south", device_code: str = "") -> float:
"""Calculate realistic PV inverter output.
Args:
dt: UTC datetime
rated_power: Inverter rated power in kW
orientation: 'east', 'west', or 'south' - affects morning/afternoon bias
device_code: Device code for per-inverter variation
Returns:
Power in kW (0 at night, clipped at rated_power)
"""
alt = solar_altitude(dt)
# Night: exactly 0
if alt <= 0:
return 0.0
# Dawn/dusk ramp: gradual startup below 5 degrees altitude
if alt < 5:
ramp_factor = alt / 5.0
else:
ramp_factor = 1.0
# Base clear-sky irradiance (simplified: proportional to sin(altitude))
# With atmosphere correction (air mass)
air_mass = 1.0 / max(math.sin(math.radians(alt)), 0.01)
air_mass = min(air_mass, 40) # cap for very low sun
atmospheric_transmission = 0.7 ** (air_mass ** 0.678) # Meinel model simplified
clear_sky_factor = math.sin(math.radians(alt)) * atmospheric_transmission
# Seasonal factor: panels at fixed tilt (~30 degrees in Beijing)
# Summer sun is higher -> slightly less optimal for tilted panels at noon
# but longer days compensate
doy = _day_of_year(dt)
decl = solar_declination(doy)
# Approximate panel tilt correction
panel_tilt = 30 # degrees
tilt_factor = max(0.5, math.cos(math.radians(abs(alt - (90 - BEIJING_LAT + decl)) * 0.3)))
# Orientation bias
azimuth = solar_azimuth(dt)
if orientation == "east":
# East-facing gets more morning sun
orient_factor = 1.0 + 0.1 * math.cos(math.radians(azimuth - 120))
elif orientation == "west":
# West-facing gets more afternoon sun
orient_factor = 1.0 + 0.1 * math.cos(math.radians(azimuth - 240))
else:
orient_factor = 1.0
# Cloud effect (correlated across all inverters)
cloud = get_cloud_factor(dt)
# Temperature derating
temp = outdoor_temperature(dt)
# Panel temperature is ~20-30C above ambient when producing
panel_temp = temp + 20 + 10 * clear_sky_factor
temp_derate = 1.0 + (-0.004) * max(0, panel_temp - 25) # -0.4%/C above 25C
temp_derate = max(0.75, temp_derate)
# Per-inverter variation (use device code hash for deterministic offset)
if device_code:
inv_hash = hash(device_code) % 1000 / 1000.0
inv_variation = 0.97 + 0.06 * inv_hash # 0.97 to 1.03
else:
inv_variation = 1.0
# Gaussian noise (1-3%)
noise = 1.0 + _gauss(0, 0.015)
# Final power
power = (rated_power * clear_sky_factor * tilt_factor * orient_factor *
cloud * temp_derate * ramp_factor * inv_variation * noise)
# Inverter clipping
power = min(power, rated_power)
power = max(0.0, power)
return round(power, 2)
def get_pv_orientation(device_code: str) -> str:
"""Determine inverter orientation based on device code.
INV-01, INV-02 are east building, INV-03 is west building.
"""
if device_code in ("INV-01", "INV-02"):
return "east"
elif device_code == "INV-03":
return "west"
return "south"
# ---------------------------------------------------------------------------
# Heat pump model
# ---------------------------------------------------------------------------
def get_hvac_mode(month: int) -> str:
"""Determine HVAC operating mode by month."""
if month in (11, 12, 1, 2, 3):
return "heating"
elif month in (6, 7, 8, 9):
return "cooling"
elif month in (4, 5):
return "transition_spring"
else: # Oct
return "transition_fall"
def heat_pump_data(dt: datetime, rated_power: float = 35.0,
device_code: str = "") -> dict:
"""Generate realistic heat pump operating data.
Returns dict with: power, cop, inlet_temp, outlet_temp, flow_rate, outdoor_temp, mode
"""
beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt
hour = beijing_dt.hour + beijing_dt.minute / 60.0
month = beijing_dt.month
is_weekend = beijing_dt.weekday() >= 5
mode = get_hvac_mode(month)
out_temp = outdoor_temperature(dt)
# COP model: varies with outdoor temperature
cop = 3.0 + 0.05 * (out_temp - 7)
cop = max(2.0, min(5.5, cop))
cop += _gauss(0, 0.1)
cop = max(2.0, min(5.5, cop))
# Operating pattern depends on mode
if mode == "heating":
# Higher demand at night/morning (cold), lower during warmest part of day
if 6 <= hour <= 9:
load_factor = _uniform(0.75, 0.95)
elif 9 <= hour <= 16:
load_factor = _uniform(0.45, 0.65)
elif 16 <= hour <= 22:
load_factor = _uniform(0.65, 0.85)
else: # night
load_factor = _uniform(0.55, 0.75)
if is_weekend:
load_factor *= 0.7
inlet_temp = 35 + _gauss(0, 1.5) # return water
delta_t = _uniform(5, 8)
outlet_temp = inlet_temp + delta_t
elif mode == "cooling":
# Higher demand in afternoon (hot)
if 8 <= hour <= 11:
load_factor = _uniform(0.45, 0.65)
elif 11 <= hour <= 16:
load_factor = _uniform(0.75, 0.95)
elif 16 <= hour <= 19:
load_factor = _uniform(0.60, 0.80)
elif 19 <= hour <= 22:
load_factor = _uniform(0.35, 0.55)
else:
load_factor = _uniform(0.15, 0.30)
if is_weekend:
load_factor *= 0.7
inlet_temp = 12 + _gauss(0, 1.0) # return water (chilled)
delta_t = _uniform(3, 5)
outlet_temp = inlet_temp - delta_t
else: # transition
# Intermittent operation
if _random() < 0.4:
# Off period
return {
"power": 0.0, "cop": 0.0,
"inlet_temp": round(out_temp + _gauss(5, 1), 1),
"outlet_temp": round(out_temp + _gauss(5, 1), 1),
"flow_rate": 0.0, "outdoor_temp": round(out_temp, 1),
"mode": "standby",
}
load_factor = _uniform(0.25, 0.55)
# Could be either heating or cooling depending on temp
if out_temp < 15:
inlet_temp = 32 + _gauss(0, 1.5)
delta_t = _uniform(4, 6)
outlet_temp = inlet_temp + delta_t
else:
inlet_temp = 14 + _gauss(0, 1.0)
delta_t = _uniform(3, 4)
outlet_temp = inlet_temp - delta_t
power = rated_power * load_factor
power += _gauss(0, power * 0.02) # noise
power = max(0, min(rated_power, power))
# Flow rate correlates with power (not random!)
# Higher power -> higher flow for heat transfer
flow_rate = 8 + (power / rated_power) * 7 # 8-15 m3/h range
flow_rate += _gauss(0, 0.3)
flow_rate = max(5, min(18, flow_rate))
# Per-unit variation
if device_code:
unit_offset = (hash(device_code) % 100 - 50) / 500.0 # +/- 10%
power *= (1 + unit_offset)
return {
"power": round(max(0, power), 2),
"cop": round(cop, 2),
"inlet_temp": round(inlet_temp, 1),
"outlet_temp": round(outlet_temp, 1),
"flow_rate": round(flow_rate, 1),
"outdoor_temp": round(out_temp, 1),
"mode": mode,
}
# ---------------------------------------------------------------------------
# Building load (meter) model
# ---------------------------------------------------------------------------
def building_load(dt: datetime, base_power: float = 50.0,
meter_code: str = "") -> dict:
"""Generate realistic building electrical load.
Returns dict with: power, voltage, current, power_factor
"""
beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt
hour = beijing_dt.hour + beijing_dt.minute / 60.0
month = beijing_dt.month
is_weekend = beijing_dt.weekday() >= 5
# Base load profile
if is_weekend:
# Weekend: much lower, no office activity
if 8 <= hour <= 18:
load_factor = _uniform(0.35, 0.50)
else:
load_factor = _uniform(0.25, 0.35)
else:
# Weekday office pattern
if hour < 6:
load_factor = _uniform(0.25, 0.35) # night minimum (security, servers)
elif 6 <= hour < 8:
# Morning ramp-up
ramp = (hour - 6) / 2.0
load_factor = _uniform(0.35, 0.50) + ramp * 0.3
elif 8 <= hour < 12:
load_factor = _uniform(0.75, 0.95) # morning work
elif 12 <= hour < 13:
load_factor = _uniform(0.55, 0.70) # lunch dip
elif 13 <= hour < 18:
load_factor = _uniform(0.80, 1.0) # afternoon peak
elif 18 <= hour < 19:
# Evening ramp-down
ramp = (19 - hour)
load_factor = _uniform(0.50, 0.65) + ramp * 0.2
elif 19 <= hour < 22:
load_factor = _uniform(0.35, 0.50) # evening
else:
load_factor = _uniform(0.25, 0.35) # night
# HVAC seasonal contribution
hvac_mode = get_hvac_mode(month)
if hvac_mode == "heating":
hvac_add = _uniform(0.10, 0.20)
elif hvac_mode == "cooling":
hvac_add = _uniform(0.15, 0.25)
else:
hvac_add = _uniform(0.03, 0.08)
# Random load events (elevator, kitchen, EV charging)
spike = 0.0
if _random() < 0.08: # ~8% chance per reading
spike = _uniform(5, 25) # kW spike
power = base_power * (load_factor + hvac_add) + spike
# Minimum night base load (security, servers, emergency lighting)
min_load = 15 + _gauss(0, 1)
power = max(min_load, power)
# Noise
power += _gauss(0, power * 0.015)
power = max(0, power)
# Voltage (realistic grid: 220V +/- 5%)
voltage = 220 + _gauss(0, 2.0)
voltage = max(209, min(231, voltage))
# Power factor
pf = _uniform(0.88, 0.96)
if 8 <= hour <= 18 and not is_weekend:
pf = _uniform(0.90, 0.97) # better during office hours (capacitor bank)
# Current derived from power
current = power / (voltage * math.sqrt(3) * pf / 1000) # 3-phase
# Per-meter variation
if meter_code == "METER-GRID":
pass # main meter, use as-is
elif meter_code == "METER-PV":
# PV meter shows generation, not load — handled separately
pass
elif meter_code == "METER-HP":
power *= _uniform(0.2, 0.35) # heat pump subset of total
elif meter_code == "METER-PUMP":
power *= _uniform(0.05, 0.12) # circulation pumps
return {
"power": round(power, 2),
"voltage": round(voltage, 1),
"current": round(current, 1),
"power_factor": round(pf, 3),
}
# ---------------------------------------------------------------------------
# Sensor model
# ---------------------------------------------------------------------------
def indoor_sensor(dt: datetime, is_outdoor: bool = False,
device_code: str = "") -> dict:
"""Generate realistic temperature and humidity sensor data.
Returns dict with: temperature, humidity
"""
beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt
hour = beijing_dt.hour + beijing_dt.minute / 60.0
month = beijing_dt.month
is_weekend = beijing_dt.weekday() >= 5
if is_outdoor:
temp = outdoor_temperature(dt)
hum = outdoor_humidity(dt)
return {"temperature": round(temp, 1), "humidity": round(hum, 1)}
# Indoor: HVAC controlled during office hours
hvac_mode = get_hvac_mode(month)
if not is_weekend and 7 <= hour <= 19:
# HVAC on: well-controlled
if hvac_mode == "heating":
temp = _uniform(20.5, 23.5)
elif hvac_mode == "cooling":
temp = _uniform(23.0, 25.5)
else:
temp = _uniform(21.0, 25.0)
hum = _uniform(40, 55)
else:
# HVAC off or weekend: drifts toward outdoor
out_temp = outdoor_temperature(dt)
if hvac_mode == "heating":
# Indoor cools slowly without heating
temp = max(16, min(22, 22 - (22 - out_temp) * 0.15))
elif hvac_mode == "cooling":
# Indoor warms slowly without cooling
temp = min(30, max(24, 24 + (out_temp - 24) * 0.15))
else:
temp = 20 + (out_temp - 15) * 0.2
hum = _uniform(35, 65)
# Summer monsoon: higher indoor humidity without dehumidification
if month in (7, 8) and is_weekend:
hum = _uniform(55, 75)
# Per-sensor variation (different rooms have slightly different temps)
if device_code:
room_offset = (hash(device_code) % 100 - 50) / 100.0 # +/- 0.5C
temp += room_offset
temp += _gauss(0, 0.2)
hum += _gauss(0, 1.5)
return {
"temperature": round(temp, 1),
"humidity": round(max(15, min(95, hum)), 1),
}
# ---------------------------------------------------------------------------
# Heat meter model
# ---------------------------------------------------------------------------
def heat_meter_data(dt: datetime, hp_power: float = 0, hp_cop: float = 3.0) -> dict:
"""Generate heat meter readings correlated with heat pump operation.
Args:
hp_power: Total heat pump electrical power (sum of all units) in kW
hp_cop: Average COP of operating heat pumps
Returns dict with: heat_power, flow_rate, supply_temp, return_temp
"""
# Heat output = electrical input * COP * efficiency loss
heat_power = hp_power * hp_cop * _uniform(0.88, 0.95)
if heat_power < 1:
return {
"heat_power": 0.0,
"flow_rate": 0.0,
"supply_temp": round(outdoor_temperature(dt) + _gauss(5, 1), 1),
"return_temp": round(outdoor_temperature(dt) + _gauss(5, 1), 1),
}
beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt
mode = get_hvac_mode(beijing_dt.month)
if mode == "heating":
supply_temp = 42 + _gauss(0, 1.5)
return_temp = supply_temp - _uniform(5, 8)
elif mode == "cooling":
supply_temp = 7 + _gauss(0, 0.8)
return_temp = supply_temp + _uniform(3, 5)
else:
supply_temp = 30 + _gauss(0, 2)
return_temp = supply_temp - _uniform(3, 5)
# Flow rate derived from heat and delta-T
delta_t = abs(supply_temp - return_temp)
if delta_t > 0.5:
# Q = m * cp * dT => m = Q / (cp * dT)
# cp of water ~4.186 kJ/kgK, 1 m3 = 1000 kg
flow_rate = heat_power / (4.186 * delta_t) * 3.6 # m3/h
else:
flow_rate = _uniform(5, 10)
flow_rate += _gauss(0, 0.2)
return {
"heat_power": round(max(0, heat_power), 2),
"flow_rate": round(max(0, flow_rate), 1),
"supply_temp": round(supply_temp, 1),
"return_temp": round(return_temp, 1),
}
# ---------------------------------------------------------------------------
# Communication glitch model
# ---------------------------------------------------------------------------
def should_skip_reading(cycle_count: int = 0) -> bool:
"""Simulate occasional communication glitches.
~1% chance of skipping a reading.
"""
return _random() < 0.01
def should_go_offline() -> bool:
"""Simulate brief device offline events.
~0.1% chance per cycle (roughly once every few hours at 15s intervals).
"""
return _random() < 0.001
# ---------------------------------------------------------------------------
# PV electrical details
# ---------------------------------------------------------------------------
def pv_electrical(power: float, rated_power: float = 110.0) -> dict:
"""Generate realistic PV electrical measurements."""
if power <= 0:
return {
"dc_voltage": 0.0,
"ac_voltage": round(220 + _gauss(0, 1), 1),
"temperature": round(outdoor_temperature(datetime.now(timezone.utc)) + _gauss(0, 2), 1),
}
load_ratio = power / rated_power
# DC voltage: MPPT tracking range 200-850V, higher at higher power
dc_voltage = 450 + 200 * load_ratio + _gauss(0, 15)
dc_voltage = max(200, min(850, dc_voltage))
# AC voltage: grid-tied, very stable
ac_voltage = 220 + _gauss(0, 1.5)
# Inverter temperature: ambient + load-dependent heating
inv_temp = outdoor_temperature(datetime.now(timezone.utc)) + 15 + 20 * load_ratio
inv_temp += _gauss(0, 1.5)
return {
"dc_voltage": round(dc_voltage, 1),
"ac_voltage": round(ac_voltage, 1),
"temperature": round(inv_temp, 1),
}
def pv_electrical_at(power: float, dt: datetime, rated_power: float = 110.0) -> dict:
"""Generate PV electrical measurements for a specific time (backfill)."""
if power <= 0:
return {
"dc_voltage": 0.0,
"ac_voltage": round(220 + _gauss(0, 1), 1),
"temperature": round(outdoor_temperature(dt) + _gauss(0, 2), 1),
}
load_ratio = power / rated_power
dc_voltage = 450 + 200 * load_ratio + _gauss(0, 15)
dc_voltage = max(200, min(850, dc_voltage))
ac_voltage = 220 + _gauss(0, 1.5)
inv_temp = outdoor_temperature(dt) + 15 + 20 * load_ratio + _gauss(0, 1.5)
return {
"dc_voltage": round(dc_voltage, 1),
"ac_voltage": round(ac_voltage, 1),
"temperature": round(inv_temp, 1),
}