Shared backend + frontend for multi-customer EMS deployments. - 12 enterprise modules: quota, cost, charging, maintenance, analysis, etc. - 120+ API endpoints, 37 database tables - Customer config mechanism (CUSTOMER env var + YAML config) - Collectors: Modbus TCP, MQTT, HTTP API, Sungrow iSolarCloud - Frontend: React 19 + Ant Design + ECharts + Three.js - Infrastructure: Redis cache, rate limiting, aggregation engine Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
400 lines
16 KiB
Python
400 lines
16 KiB
Python
"""回填历史模拟能耗数据 - 过去30天逐小时数据,含碳排放记录
|
||
|
||
Uses the shared weather_model for physics-based solar, temperature, and load
|
||
generation. Deterministic seed (42) ensures reproducible output across runs.
|
||
"""
|
||
import asyncio
|
||
import math
|
||
import os
|
||
import random
|
||
import sys
|
||
from datetime import datetime, timedelta, timezone
|
||
|
||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "backend"))
|
||
|
||
DATABASE_URL = os.environ.get(
|
||
"DATABASE_URL",
|
||
"postgresql+asyncpg://tianpu:tianpu2026@localhost:5432/tianpu_ems",
|
||
)
|
||
|
||
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
|
||
from sqlalchemy import text, select
|
||
|
||
from app.services.weather_model import (
|
||
set_seed, reset_cloud_model,
|
||
pv_power, pv_electrical_at, get_pv_orientation,
|
||
heat_pump_data, building_load, indoor_sensor,
|
||
heat_meter_data, outdoor_temperature, outdoor_humidity,
|
||
get_hvac_mode,
|
||
)
|
||
from app.models.device import Device
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Device definitions — will be populated from DB at runtime
|
||
# ---------------------------------------------------------------------------
|
||
PV_IDS = []
|
||
PV_CODES = ["INV-01", "INV-02", "INV-03"]
|
||
HP_IDS = []
|
||
HP_CODES = ["HP-01", "HP-02", "HP-03", "HP-04"]
|
||
METER_IDS = []
|
||
METER_CODES = ["METER-GRID", "METER-PV", "METER-HP", "METER-PUMP"]
|
||
HEAT_METER_ID = None
|
||
SENSOR_IDS = []
|
||
SENSOR_CODES = ["TH-01", "TH-02", "TH-03", "TH-04", "TH-05"]
|
||
|
||
|
||
async def _load_device_ids(session: AsyncSession):
|
||
"""Load actual device IDs from DB by code."""
|
||
global PV_IDS, HP_IDS, METER_IDS, HEAT_METER_ID, SENSOR_IDS
|
||
result = await session.execute(select(Device.id, Device.code).order_by(Device.id))
|
||
code_to_id = {row[1]: row[0] for row in result.all()}
|
||
|
||
PV_IDS = [code_to_id[c] for c in PV_CODES if c in code_to_id]
|
||
HP_IDS = [code_to_id[c] for c in HP_CODES if c in code_to_id]
|
||
METER_IDS = [code_to_id[c] for c in METER_CODES if c in code_to_id]
|
||
HEAT_METER_ID = code_to_id.get("HM-01")
|
||
SENSOR_IDS = [code_to_id[c] for c in SENSOR_CODES if c in code_to_id]
|
||
print(f" Loaded device IDs: PV={PV_IDS}, HP={HP_IDS}, Meters={METER_IDS}, HeatMeter={HEAT_METER_ID}, Sensors={SENSOR_IDS}")
|
||
|
||
EMISSION_FACTOR = 0.8843 # kgCO2/kWh - North China grid
|
||
|
||
DAYS = 30
|
||
HOURS_PER_DAY = 24
|
||
TOTAL_HOURS = DAYS * HOURS_PER_DAY
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main backfill
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def backfill():
|
||
# Set deterministic seed for reproducibility
|
||
set_seed(42)
|
||
|
||
engine = create_async_engine(DATABASE_URL, echo=False, pool_size=5)
|
||
session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
||
|
||
# Load actual device IDs from DB
|
||
async with session_factory() as session:
|
||
await _load_device_ids(session)
|
||
|
||
now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
|
||
start = now - timedelta(days=DAYS)
|
||
|
||
print(f"Backfill range: {start.isoformat()} -> {now.isoformat()}")
|
||
print(f"Total hours: {TOTAL_HOURS}")
|
||
|
||
# ---- Collect rows ----
|
||
energy_rows = []
|
||
carbon_rows = []
|
||
daily_buckets: dict[int, dict[str, dict]] = {}
|
||
|
||
all_power_ids = PV_IDS + HP_IDS + METER_IDS
|
||
for did in all_power_ids:
|
||
daily_buckets[did] = {}
|
||
|
||
print("Generating hourly energy_data rows (realistic models) ...")
|
||
for h_offset in range(TOTAL_HOURS):
|
||
ts = start + timedelta(hours=h_offset)
|
||
beijing_dt = ts + timedelta(hours=8)
|
||
date_str = beijing_dt.strftime("%Y-%m-%d")
|
||
|
||
# Reset cloud model each day for variety
|
||
if h_offset % 24 == 0:
|
||
reset_cloud_model()
|
||
# Re-seed per day for reproducibility but day-to-day variation
|
||
set_seed(42 + h_offset // 24)
|
||
|
||
# --- PV inverters ---
|
||
for i, did in enumerate(PV_IDS):
|
||
code = PV_CODES[i]
|
||
orientation = get_pv_orientation(code)
|
||
val = pv_power(ts, rated_power=110.0, orientation=orientation,
|
||
device_code=code)
|
||
val = round(val, 2)
|
||
|
||
energy_rows.append({
|
||
"device_id": did, "timestamp": ts,
|
||
"data_type": "power", "value": val, "unit": "kW", "quality": 0,
|
||
})
|
||
|
||
# Also generate electrical details for richer data
|
||
elec = pv_electrical_at(val, ts, rated_power=110.0)
|
||
energy_rows.append({
|
||
"device_id": did, "timestamp": ts,
|
||
"data_type": "dc_voltage", "value": elec["dc_voltage"], "unit": "V", "quality": 0,
|
||
})
|
||
energy_rows.append({
|
||
"device_id": did, "timestamp": ts,
|
||
"data_type": "ac_voltage", "value": elec["ac_voltage"], "unit": "V", "quality": 0,
|
||
})
|
||
energy_rows.append({
|
||
"device_id": did, "timestamp": ts,
|
||
"data_type": "temperature", "value": elec["temperature"], "unit": "℃", "quality": 0,
|
||
})
|
||
|
||
daily_buckets[did].setdefault(date_str, {"values": [], "cops": []})
|
||
daily_buckets[did][date_str]["values"].append(val)
|
||
|
||
# --- Heat pumps ---
|
||
hp_total_power = 0.0
|
||
hp_cop_sum = 0.0
|
||
hp_count = 0
|
||
for i, did in enumerate(HP_IDS):
|
||
code = HP_CODES[i]
|
||
data = heat_pump_data(ts, rated_power=35.0, device_code=code)
|
||
val = data["power"]
|
||
cop = data["cop"]
|
||
|
||
hp_total_power += val
|
||
if cop > 0:
|
||
hp_cop_sum += cop
|
||
hp_count += 1
|
||
|
||
energy_rows.append({
|
||
"device_id": did, "timestamp": ts,
|
||
"data_type": "power", "value": val, "unit": "kW", "quality": 0,
|
||
})
|
||
energy_rows.append({
|
||
"device_id": did, "timestamp": ts,
|
||
"data_type": "cop", "value": cop, "unit": "", "quality": 0,
|
||
})
|
||
energy_rows.append({
|
||
"device_id": did, "timestamp": ts,
|
||
"data_type": "inlet_temp", "value": data["inlet_temp"], "unit": "℃", "quality": 0,
|
||
})
|
||
energy_rows.append({
|
||
"device_id": did, "timestamp": ts,
|
||
"data_type": "outlet_temp", "value": data["outlet_temp"], "unit": "℃", "quality": 0,
|
||
})
|
||
energy_rows.append({
|
||
"device_id": did, "timestamp": ts,
|
||
"data_type": "flow_rate", "value": data["flow_rate"], "unit": "m³/h", "quality": 0,
|
||
})
|
||
energy_rows.append({
|
||
"device_id": did, "timestamp": ts,
|
||
"data_type": "outdoor_temp", "value": data["outdoor_temp"], "unit": "℃", "quality": 0,
|
||
})
|
||
|
||
daily_buckets[did].setdefault(date_str, {"values": [], "cops": []})
|
||
daily_buckets[did][date_str]["values"].append(val)
|
||
daily_buckets[did][date_str]["cops"].append(cop)
|
||
|
||
# --- Meters ---
|
||
for i, did in enumerate(METER_IDS):
|
||
code = METER_CODES[i]
|
||
data = building_load(ts, base_power=50.0, meter_code=code)
|
||
val = data["power"]
|
||
|
||
energy_rows.append({
|
||
"device_id": did, "timestamp": ts,
|
||
"data_type": "power", "value": val, "unit": "kW", "quality": 0,
|
||
})
|
||
energy_rows.append({
|
||
"device_id": did, "timestamp": ts,
|
||
"data_type": "voltage", "value": data["voltage"], "unit": "V", "quality": 0,
|
||
})
|
||
energy_rows.append({
|
||
"device_id": did, "timestamp": ts,
|
||
"data_type": "current", "value": data["current"], "unit": "A", "quality": 0,
|
||
})
|
||
energy_rows.append({
|
||
"device_id": did, "timestamp": ts,
|
||
"data_type": "power_factor", "value": data["power_factor"], "unit": "", "quality": 0,
|
||
})
|
||
|
||
daily_buckets[did].setdefault(date_str, {"values": [], "cops": []})
|
||
daily_buckets[did][date_str]["values"].append(val)
|
||
|
||
# --- Heat meter (correlated with heat pump totals) ---
|
||
avg_cop = hp_cop_sum / hp_count if hp_count > 0 else 3.0
|
||
hm_data = heat_meter_data(ts, hp_power=hp_total_power, hp_cop=avg_cop)
|
||
energy_rows.append({
|
||
"device_id": HEAT_METER_ID, "timestamp": ts,
|
||
"data_type": "heat_power", "value": hm_data["heat_power"], "unit": "kW", "quality": 0,
|
||
})
|
||
energy_rows.append({
|
||
"device_id": HEAT_METER_ID, "timestamp": ts,
|
||
"data_type": "flow_rate", "value": hm_data["flow_rate"], "unit": "m³/h", "quality": 0,
|
||
})
|
||
energy_rows.append({
|
||
"device_id": HEAT_METER_ID, "timestamp": ts,
|
||
"data_type": "supply_temp", "value": hm_data["supply_temp"], "unit": "℃", "quality": 0,
|
||
})
|
||
energy_rows.append({
|
||
"device_id": HEAT_METER_ID, "timestamp": ts,
|
||
"data_type": "return_temp", "value": hm_data["return_temp"], "unit": "℃", "quality": 0,
|
||
})
|
||
|
||
# --- Temperature/humidity sensors ---
|
||
for i, sid in enumerate(SENSOR_IDS):
|
||
code = SENSOR_CODES[i]
|
||
is_outdoor = (code == "TH-05")
|
||
data = indoor_sensor(ts, is_outdoor=is_outdoor, device_code=code)
|
||
energy_rows.append({
|
||
"device_id": sid, "timestamp": ts,
|
||
"data_type": "temperature", "value": data["temperature"], "unit": "℃", "quality": 0,
|
||
})
|
||
energy_rows.append({
|
||
"device_id": sid, "timestamp": ts,
|
||
"data_type": "humidity", "value": data["humidity"], "unit": "%", "quality": 0,
|
||
})
|
||
|
||
print(f" Generated {len(energy_rows)} energy_data rows")
|
||
|
||
# ---- Build daily summary rows ----
|
||
print("Computing daily summaries ...")
|
||
summary_rows = []
|
||
for did, dates in daily_buckets.items():
|
||
is_pv = did in PV_IDS
|
||
for date_str, bucket in dates.items():
|
||
values = bucket["values"]
|
||
cops = bucket["cops"]
|
||
total = round(sum(values), 2)
|
||
peak = round(max(values), 2) if values else 0
|
||
min_p = round(min(values), 2) if values else 0
|
||
avg_p = round(sum(values) / len(values), 2) if values else 0
|
||
op_hours = sum(1 for v in values if v > 0.5)
|
||
cost = round(total * 0.85, 2)
|
||
carbon = round(total * EMISSION_FACTOR, 2)
|
||
avg_cop = round(sum(cops) / len(cops), 2) if cops else None
|
||
|
||
summary_rows.append({
|
||
"device_id": did,
|
||
"date": datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc),
|
||
"energy_type": "electricity",
|
||
"total_consumption": 0.0 if is_pv else total,
|
||
"total_generation": total if is_pv else 0.0,
|
||
"peak_power": peak,
|
||
"min_power": min_p,
|
||
"avg_power": avg_p,
|
||
"operating_hours": float(op_hours),
|
||
"avg_cop": avg_cop,
|
||
"cost": cost,
|
||
"carbon_emission": carbon,
|
||
})
|
||
|
||
print(f" Generated {len(summary_rows)} daily summary rows")
|
||
|
||
# ---- Build carbon emission daily rows ----
|
||
print("Computing daily carbon emissions ...")
|
||
daily_consumption: dict[str, float] = {}
|
||
daily_pv_gen: dict[str, float] = {}
|
||
daily_hp_consumption: dict[str, float] = {}
|
||
|
||
for did, dates in daily_buckets.items():
|
||
for date_str, bucket in dates.items():
|
||
total = sum(bucket["values"])
|
||
if did in PV_IDS:
|
||
daily_pv_gen[date_str] = daily_pv_gen.get(date_str, 0) + total
|
||
elif did in HP_IDS:
|
||
daily_hp_consumption[date_str] = daily_hp_consumption.get(date_str, 0) + total
|
||
daily_consumption[date_str] = daily_consumption.get(date_str, 0) + total
|
||
else:
|
||
daily_consumption[date_str] = daily_consumption.get(date_str, 0) + total
|
||
|
||
all_dates = sorted(set(list(daily_consumption.keys()) + list(daily_pv_gen.keys())))
|
||
for date_str in all_dates:
|
||
dt = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
|
||
|
||
# Grid electricity emission (Scope 2)
|
||
grid_kwh = daily_consumption.get(date_str, 0)
|
||
carbon_rows.append({
|
||
"date": dt, "scope": 2, "category": "electricity",
|
||
"emission": round(grid_kwh * EMISSION_FACTOR, 2),
|
||
"reduction": 0.0,
|
||
"energy_consumption": round(grid_kwh, 2),
|
||
"energy_unit": "kWh",
|
||
"note": "园区用电碳排放",
|
||
})
|
||
|
||
# PV generation reduction (Scope 2 avoided)
|
||
pv_kwh = daily_pv_gen.get(date_str, 0)
|
||
if pv_kwh > 0:
|
||
carbon_rows.append({
|
||
"date": dt, "scope": 2, "category": "pv_generation",
|
||
"emission": 0.0,
|
||
"reduction": round(pv_kwh * EMISSION_FACTOR, 2),
|
||
"energy_consumption": round(pv_kwh, 2),
|
||
"energy_unit": "kWh",
|
||
"note": "光伏发电碳减排",
|
||
})
|
||
|
||
# Heat pump saving (COP-based reduction vs electric heating)
|
||
hp_kwh = daily_hp_consumption.get(date_str, 0)
|
||
if hp_kwh > 0:
|
||
avg_cop_day = 3.2
|
||
heat_delivered = hp_kwh * avg_cop_day
|
||
electric_heating_kwh = heat_delivered # COP=1 for electric heating
|
||
saved_kwh = electric_heating_kwh - hp_kwh
|
||
carbon_rows.append({
|
||
"date": dt, "scope": 2, "category": "heat_pump_saving",
|
||
"emission": 0.0,
|
||
"reduction": round(saved_kwh * EMISSION_FACTOR, 2),
|
||
"energy_consumption": round(saved_kwh, 2),
|
||
"energy_unit": "kWh",
|
||
"note": "热泵节能碳减排(相比电加热)",
|
||
})
|
||
|
||
print(f" Generated {len(carbon_rows)} carbon emission rows")
|
||
|
||
# ---- Bulk insert ----
|
||
BATCH = 2000
|
||
|
||
async with session_factory() as session:
|
||
# Insert energy_data
|
||
print("Inserting energy_data ...")
|
||
insert_energy = text("""
|
||
INSERT INTO energy_data (device_id, timestamp, data_type, value, unit, quality)
|
||
VALUES (:device_id, :timestamp, :data_type, :value, :unit, :quality)
|
||
""")
|
||
for i in range(0, len(energy_rows), BATCH):
|
||
batch = energy_rows[i : i + BATCH]
|
||
await session.execute(insert_energy, batch)
|
||
done = min(i + BATCH, len(energy_rows))
|
||
if done % 10000 < BATCH:
|
||
print(f" energy_data: {done}/{len(energy_rows)}")
|
||
await session.commit()
|
||
print(" energy_data done.")
|
||
|
||
# Insert daily summaries
|
||
print("Inserting energy_daily_summary ...")
|
||
insert_summary = text("""
|
||
INSERT INTO energy_daily_summary
|
||
(device_id, date, energy_type, total_consumption, total_generation,
|
||
peak_power, min_power, avg_power, operating_hours, avg_cop, cost, carbon_emission)
|
||
VALUES
|
||
(:device_id, :date, :energy_type, :total_consumption, :total_generation,
|
||
:peak_power, :min_power, :avg_power, :operating_hours, :avg_cop, :cost, :carbon_emission)
|
||
""")
|
||
for i in range(0, len(summary_rows), BATCH):
|
||
batch = summary_rows[i : i + BATCH]
|
||
await session.execute(insert_summary, batch)
|
||
await session.commit()
|
||
print(f" daily_summary done. ({len(summary_rows)} rows)")
|
||
|
||
# Insert carbon emissions
|
||
print("Inserting carbon_emissions ...")
|
||
insert_carbon = text("""
|
||
INSERT INTO carbon_emissions
|
||
(date, scope, category, emission, reduction,
|
||
energy_consumption, energy_unit, note)
|
||
VALUES
|
||
(:date, :scope, :category, :emission, :reduction,
|
||
:energy_consumption, :energy_unit, :note)
|
||
""")
|
||
for i in range(0, len(carbon_rows), BATCH):
|
||
batch = carbon_rows[i : i + BATCH]
|
||
await session.execute(insert_carbon, batch)
|
||
await session.commit()
|
||
print(f" carbon_emissions done. ({len(carbon_rows)} rows)")
|
||
|
||
await engine.dispose()
|
||
print("=" * 60)
|
||
print("Backfill complete!")
|
||
print("=" * 60)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(backfill())
|