Files
zpark-ems/core/scripts/backfill_data.py

400 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""回填历史模拟能耗数据 - 过去30天逐小时数据含碳排放记录
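English summary: backfill simulated historical energy data — hourly readings
for the past 30 days, plus daily summaries and carbon-emission records.

Uses the shared weather_model for physics-based solar, temperature, and load
generation. A fixed seed (42, re-seeded as 42 + day index for each simulated
day) makes the random components reproducible; note that the backfill window
ends at the current UTC hour, so runs made at different times cover different
timestamps and therefore produce different rows.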
"""
import asyncio
import math
import os
import random
import sys
from datetime import datetime, timedelta, timezone
# Make the backend package (``app.*``) importable when this script is run
# directly: it lives in ../backend relative to this file.
_backend_dir = os.path.join(os.path.dirname(__file__), "..", "backend")
sys.path.insert(0, _backend_dir)

# Async SQLAlchemy connection string; the DATABASE_URL environment variable
# takes precedence over the local-development fallback below.
# NOTE(review): the fallback embeds credentials — confirm it is never used
# against a shared or production database.
_DEFAULT_DATABASE_URL = "postgresql+asyncpg://tianpu:tianpu2026@localhost:5432/tianpu_ems"
DATABASE_URL = os.environ.get("DATABASE_URL", _DEFAULT_DATABASE_URL)
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy import text, select
from app.services.weather_model import (
set_seed, reset_cloud_model,
pv_power, pv_electrical_at, get_pv_orientation,
heat_pump_data, building_load, indoor_sensor,
heat_meter_data, outdoor_temperature, outdoor_humidity,
get_hvac_mode,
)
from app.models.device import Device
# ---------------------------------------------------------------------------
# Device definitions. Each *_CODES list names the devices this script
# simulates; the matching *_IDS list (and HEAT_METER_ID) starts empty and is
# filled from the Device table by _load_device_ids() at runtime.
# ---------------------------------------------------------------------------
# PV inverters (simulated with pv_power)
PV_CODES = ["INV-01", "INV-02", "INV-03"]
PV_IDS = []

# Heat pumps (simulated with heat_pump_data)
HP_CODES = ["HP-01", "HP-02", "HP-03", "HP-04"]
HP_IDS = []

# Power meters (simulated with building_load)
METER_CODES = ["METER-GRID", "METER-PV", "METER-HP", "METER-PUMP"]
METER_IDS = []

# Heat meter (single device, looked up by code "HM-01")
HEAT_METER_ID = None

# Temperature/humidity sensors (TH-05 is treated as the outdoor one)
SENSOR_CODES = ["TH-01", "TH-02", "TH-03", "TH-04", "TH-05"]
SENSOR_IDS = []
async def _load_device_ids(session: AsyncSession):
    """Load actual device IDs from the DB by device code.

    Fills the module-level PV_IDS, HP_IDS, METER_IDS, SENSOR_IDS and
    HEAT_METER_ID (``None`` when no device has code "HM-01").

    Callers pair ``*_IDS[i]`` with ``*_CODES[i]`` by index, so any configured
    code without a matching Device row is also dropped from its *_CODES list.
    Otherwise a single missing device would shift every later ID onto the
    wrong code (wrong PV orientation, heat-pump model, sensor type, ...).
    """
    global PV_IDS, HP_IDS, METER_IDS, HEAT_METER_ID, SENSOR_IDS
    global PV_CODES, HP_CODES, METER_CODES, SENSOR_CODES

    result = await session.execute(select(Device.id, Device.code).order_by(Device.id))
    # Rows come back ordered by id, so if a code were duplicated the
    # highest id would win.
    code_to_id = {row[1]: row[0] for row in result.all()}

    def _resolve(codes):
        """Return (ids, codes) restricted to the codes present in the DB, in order."""
        present = [c for c in codes if c in code_to_id]
        missing = [c for c in codes if c not in code_to_id]
        if missing:
            print(f" Warning: device codes not found in DB, skipping: {missing}")
        return [code_to_id[c] for c in present], present

    PV_IDS, PV_CODES = _resolve(PV_CODES)
    HP_IDS, HP_CODES = _resolve(HP_CODES)
    METER_IDS, METER_CODES = _resolve(METER_CODES)
    SENSOR_IDS, SENSOR_CODES = _resolve(SENSOR_CODES)
    HEAT_METER_ID = code_to_id.get("HM-01")
    print(f" Loaded device IDs: PV={PV_IDS}, HP={HP_IDS}, Meters={METER_IDS}, HeatMeter={HEAT_METER_ID}, Sensors={SENSOR_IDS}")
# Grid CO2 emission factor, kgCO2 per kWh (North China grid).
EMISSION_FACTOR = 0.8843

# Backfill window: DAYS whole days of hourly samples.
HOURS_PER_DAY = 24
DAYS = 30
TOTAL_HOURS = HOURS_PER_DAY * DAYS
# ---------------------------------------------------------------------------
# Main backfill
# ---------------------------------------------------------------------------
def _energy_row(device_id, ts, data_type, value, unit):
    """Return one ``energy_data`` insert row (simulated readings use quality 0)."""
    return {
        "device_id": device_id,
        "timestamp": ts,
        "data_type": data_type,
        "value": value,
        "unit": unit,
        "quality": 0,
    }


def _day_bucket(daily_buckets, device_id, date_str):
    """Return the per-day accumulator of one device, creating it on first use."""
    return daily_buckets[device_id].setdefault(date_str, {"values": [], "cops": []})


def _date_key_to_utc(date_str):
    """Turn a ``YYYY-MM-DD`` bucket key into a midnight datetime tagged UTC.

    The key is a Beijing (UTC+8) calendar date, so the stored value is
    midnight UTC *of that Beijing date*, not Beijing midnight.
    NOTE(review): confirm this is the convention readers of
    energy_daily_summary / carbon_emissions expect.
    """
    return datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)


def _generate_hourly_rows(start, total_hours):
    """Simulate every loaded device once per hour, starting at *start*.

    Args:
        start: timezone-aware UTC datetime of the first sample.
        total_hours: number of hourly samples to generate.

    Returns:
        ``(energy_rows, daily_buckets)``: ``energy_rows`` are dicts ready for
        the ``energy_data`` insert; ``daily_buckets`` maps
        device_id -> Beijing date string -> {"values": [...], "cops": [...]}
        for PV, heat-pump and meter devices. "values" holds the hourly power
        readings; "cops" is filled for heat pumps only, index-aligned with
        "values".
    """
    energy_rows = []
    daily_buckets: dict[int, dict[str, dict]] = {
        did: {} for did in PV_IDS + HP_IDS + METER_IDS
    }
    if HEAT_METER_ID is None:
        print(" Warning: heat meter HM-01 not found in DB; heat meter rows will be skipped")

    for h_offset in range(total_hours):
        ts = start + timedelta(hours=h_offset)
        # Daily aggregates are keyed by the Beijing (UTC+8) calendar date.
        date_str = (ts + timedelta(hours=8)).strftime("%Y-%m-%d")

        # Once per simulated day: fresh cloud pattern and a day-specific seed,
        # so days differ from each other while each day stays reproducible.
        if h_offset % HOURS_PER_DAY == 0:
            reset_cloud_model()
            set_seed(42 + h_offset // HOURS_PER_DAY)

        # --- PV inverters ---
        for did, code in zip(PV_IDS, PV_CODES):
            val = round(
                pv_power(ts, rated_power=110.0,
                         orientation=get_pv_orientation(code), device_code=code),
                2,
            )
            # Electrical details derived from the power value, for richer data.
            elec = pv_electrical_at(val, ts, rated_power=110.0)
            energy_rows.extend([
                _energy_row(did, ts, "power", val, "kW"),
                _energy_row(did, ts, "dc_voltage", elec["dc_voltage"], "V"),
                _energy_row(did, ts, "ac_voltage", elec["ac_voltage"], "V"),
                _energy_row(did, ts, "temperature", elec["temperature"], ""),
            ])
            _day_bucket(daily_buckets, did, date_str)["values"].append(val)

        # --- Heat pumps ---
        hp_total_power = 0.0
        hp_cop_sum = 0.0
        hp_count = 0
        for did, code in zip(HP_IDS, HP_CODES):
            data = heat_pump_data(ts, rated_power=35.0, device_code=code)
            val, cop = data["power"], data["cop"]
            hp_total_power += val
            if cop > 0:  # only running units contribute to the average COP
                hp_cop_sum += cop
                hp_count += 1
            energy_rows.extend([
                _energy_row(did, ts, "power", val, "kW"),
                _energy_row(did, ts, "cop", cop, ""),
                _energy_row(did, ts, "inlet_temp", data["inlet_temp"], ""),
                _energy_row(did, ts, "outlet_temp", data["outlet_temp"], ""),
                _energy_row(did, ts, "flow_rate", data["flow_rate"], "m³/h"),
                _energy_row(did, ts, "outdoor_temp", data["outdoor_temp"], ""),
            ])
            bucket = _day_bucket(daily_buckets, did, date_str)
            bucket["values"].append(val)
            bucket["cops"].append(cop)

        # --- Meters ---
        for did, code in zip(METER_IDS, METER_CODES):
            data = building_load(ts, base_power=50.0, meter_code=code)
            val = data["power"]
            energy_rows.extend([
                _energy_row(did, ts, "power", val, "kW"),
                _energy_row(did, ts, "voltage", data["voltage"], "V"),
                _energy_row(did, ts, "current", data["current"], "A"),
                _energy_row(did, ts, "power_factor", data["power_factor"], ""),
            ])
            _day_bucket(daily_buckets, did, date_str)["values"].append(val)

        # --- Heat meter (correlated with heat pump totals) ---
        avg_cop = hp_cop_sum / hp_count if hp_count > 0 else 3.0
        # Always called, even without a heat meter device, so the random
        # sequence consumed by the sensors below does not change.
        hm_data = heat_meter_data(ts, hp_power=hp_total_power, hp_cop=avg_cop)
        if HEAT_METER_ID is not None:
            energy_rows.extend([
                _energy_row(HEAT_METER_ID, ts, "heat_power", hm_data["heat_power"], "kW"),
                _energy_row(HEAT_METER_ID, ts, "flow_rate", hm_data["flow_rate"], "m³/h"),
                _energy_row(HEAT_METER_ID, ts, "supply_temp", hm_data["supply_temp"], ""),
                _energy_row(HEAT_METER_ID, ts, "return_temp", hm_data["return_temp"], ""),
            ])

        # --- Temperature/humidity sensors ---
        for sid, code in zip(SENSOR_IDS, SENSOR_CODES):
            data = indoor_sensor(ts, is_outdoor=(code == "TH-05"), device_code=code)
            energy_rows.extend([
                _energy_row(sid, ts, "temperature", data["temperature"], ""),
                _energy_row(sid, ts, "humidity", data["humidity"], "%"),
            ])

    return energy_rows, daily_buckets


def _build_summary_rows(daily_buckets, pv_ids, emission_factor, price_per_kwh=0.85):
    """Aggregate hourly power buckets into ``energy_daily_summary`` rows.

    Hourly kW samples are summed as kWh. PV devices report the sum as
    generation, all other devices as consumption. Cost (``price_per_kwh``)
    and carbon emission (``emission_factor``) are charged on consumption
    only, so generated PV energy is not reported as cost or as emitted CO2.
    """
    pv_ids = set(pv_ids)
    rows = []
    for did, dates in daily_buckets.items():
        is_pv = did in pv_ids
        for date_str, bucket in dates.items():
            values = bucket["values"]
            cops = bucket["cops"]
            total = round(sum(values), 2)
            consumption = 0.0 if is_pv else total
            generation = total if is_pv else 0.0
            rows.append({
                "device_id": did,
                "date": _date_key_to_utc(date_str),
                "energy_type": "electricity",
                "total_consumption": consumption,
                "total_generation": generation,
                "peak_power": round(max(values), 2) if values else 0,
                "min_power": round(min(values), 2) if values else 0,
                "avg_power": round(sum(values) / len(values), 2) if values else 0,
                # Hours with more than 0.5 kW count as "operating".
                "operating_hours": float(sum(1 for v in values if v > 0.5)),
                "avg_cop": round(sum(cops) / len(cops), 2) if cops else None,
                "cost": round(consumption * price_per_kwh, 2),
                "carbon_emission": round(consumption * emission_factor, 2),
            })
    return rows


def _build_carbon_rows(daily_buckets, pv_ids, hp_ids, emission_factor):
    """Build per-day ``carbon_emissions`` rows from the hourly buckets.

    For every Beijing date: one Scope-2 electricity row (all non-PV
    consumption), a PV avoided-emission row when PV generated energy, and a
    heat-pump saving row when heat pumps consumed energy.
    """
    pv_ids, hp_ids = set(pv_ids), set(hp_ids)
    daily_consumption: dict[str, float] = {}
    daily_pv_gen: dict[str, float] = {}
    daily_hp_consumption: dict[str, float] = {}
    daily_hp_heat: dict[str, float] = {}

    for did, dates in daily_buckets.items():
        for date_str, bucket in dates.items():
            total = sum(bucket["values"])
            if did in pv_ids:
                daily_pv_gen[date_str] = daily_pv_gen.get(date_str, 0) + total
                continue
            # NOTE(review): this sums the heat pumps AND every meter in
            # METER_CODES (incl. METER-HP / METER-PV); if those meters already
            # measure the heat pumps or PV output this double counts — confirm
            # against the building_load meter model.
            daily_consumption[date_str] = daily_consumption.get(date_str, 0) + total
            if did in hp_ids:
                daily_hp_consumption[date_str] = daily_hp_consumption.get(date_str, 0) + total
                # Heat delivered in each hour is that hour's power x COP.
                heat = sum(p * c for p, c in zip(bucket["values"], bucket["cops"]))
                daily_hp_heat[date_str] = daily_hp_heat.get(date_str, 0) + heat

    rows = []
    for date_str in sorted(set(daily_consumption) | set(daily_pv_gen)):
        dt = _date_key_to_utc(date_str)

        # Grid electricity emission (Scope 2)
        grid_kwh = daily_consumption.get(date_str, 0)
        rows.append({
            "date": dt, "scope": 2, "category": "electricity",
            "emission": round(grid_kwh * emission_factor, 2),
            "reduction": 0.0,
            "energy_consumption": round(grid_kwh, 2),
            "energy_unit": "kWh",
            "note": "园区用电碳排放",
        })

        # PV generation reduction (Scope 2 avoided)
        pv_kwh = daily_pv_gen.get(date_str, 0)
        if pv_kwh > 0:
            rows.append({
                "date": dt, "scope": 2, "category": "pv_generation",
                "emission": 0.0,
                "reduction": round(pv_kwh * emission_factor, 2),
                "energy_consumption": round(pv_kwh, 2),
                "energy_unit": "kWh",
                "note": "光伏发电碳减排",
            })

        # Heat pump saving vs. electric resistance heating (COP = 1), which
        # would need as much electricity as the heat the pumps delivered.
        hp_kwh = daily_hp_consumption.get(date_str, 0)
        if hp_kwh > 0:
            saved_kwh = max(daily_hp_heat.get(date_str, 0) - hp_kwh, 0.0)
            rows.append({
                "date": dt, "scope": 2, "category": "heat_pump_saving",
                "emission": 0.0,
                "reduction": round(saved_kwh * emission_factor, 2),
                "energy_consumption": round(saved_kwh, 2),
                "energy_unit": "kWh",
                "note": "热泵节能碳减排(相比电加热)",
            })
    return rows


async def _insert_in_batches(session, statement, rows, batch_size, progress_label=None):
    """Execute *statement* once per chunk of *rows* (executemany), then commit.

    When *progress_label* is given, a progress line is printed roughly every
    10,000 rows and after the final chunk.
    """
    total = len(rows)
    for i in range(0, total, batch_size):
        await session.execute(statement, rows[i:i + batch_size])
        done = min(i + batch_size, total)
        if progress_label and done % 10000 < batch_size:
            print(f" {progress_label}: {done}/{total}")
    await session.commit()


async def backfill():
    """Generate DAYS days of hourly simulated data and insert it into the DB.

    Writes hourly rows to ``energy_data``, per-device daily rows to
    ``energy_daily_summary`` and per-day rows to ``carbon_emissions``. The
    window ends at the current UTC hour.

    NOTE(review): plain INSERTs are used, so re-running the script over an
    overlapping window inserts duplicate rows.
    """
    # Deterministic seed for reproducibility (re-seeded per day during generation).
    set_seed(42)
    engine = create_async_engine(DATABASE_URL, echo=False, pool_size=5)
    try:
        session_factory = async_sessionmaker(
            engine, class_=AsyncSession, expire_on_commit=False
        )
        # Resolve device codes to the IDs actually present in the DB.
        async with session_factory() as session:
            await _load_device_ids(session)

        now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
        start = now - timedelta(days=DAYS)
        print(f"Backfill range: {start.isoformat()} -> {now.isoformat()}")
        print(f"Total hours: {TOTAL_HOURS}")

        print("Generating hourly energy_data rows (realistic models) ...")
        energy_rows, daily_buckets = _generate_hourly_rows(start, TOTAL_HOURS)
        print(f" Generated {len(energy_rows)} energy_data rows")

        print("Computing daily summaries ...")
        summary_rows = _build_summary_rows(daily_buckets, PV_IDS, EMISSION_FACTOR)
        print(f" Generated {len(summary_rows)} daily summary rows")

        print("Computing daily carbon emissions ...")
        carbon_rows = _build_carbon_rows(daily_buckets, PV_IDS, HP_IDS, EMISSION_FACTOR)
        print(f" Generated {len(carbon_rows)} carbon emission rows")

        batch_size = 2000
        async with session_factory() as session:
            print("Inserting energy_data ...")
            insert_energy = text("""
                INSERT INTO energy_data (device_id, timestamp, data_type, value, unit, quality)
                VALUES (:device_id, :timestamp, :data_type, :value, :unit, :quality)
            """)
            await _insert_in_batches(
                session, insert_energy, energy_rows, batch_size, progress_label="energy_data"
            )
            print(" energy_data done.")

            print("Inserting energy_daily_summary ...")
            insert_summary = text("""
                INSERT INTO energy_daily_summary
                (device_id, date, energy_type, total_consumption, total_generation,
                peak_power, min_power, avg_power, operating_hours, avg_cop, cost, carbon_emission)
                VALUES
                (:device_id, :date, :energy_type, :total_consumption, :total_generation,
                :peak_power, :min_power, :avg_power, :operating_hours, :avg_cop, :cost, :carbon_emission)
            """)
            await _insert_in_batches(session, insert_summary, summary_rows, batch_size)
            print(f" daily_summary done. ({len(summary_rows)} rows)")

            print("Inserting carbon_emissions ...")
            insert_carbon = text("""
                INSERT INTO carbon_emissions
                (date, scope, category, emission, reduction,
                energy_consumption, energy_unit, note)
                VALUES
                (:date, :scope, :category, :emission, :reduction,
                :energy_consumption, :energy_unit, :note)
            """)
            await _insert_in_batches(session, insert_carbon, carbon_rows, batch_size)
            print(f" carbon_emissions done. ({len(carbon_rows)} rows)")
    finally:
        # Release pooled connections even if generation or an insert fails.
        await engine.dispose()

    print("=" * 60)
    print("Backfill complete!")
    print("=" * 60)
def _main() -> None:
    """Script entry point: run the async backfill on a fresh event loop."""
    asyncio.run(backfill())


if __name__ == "__main__":
    _main()