Files
tianpu-ems/scripts/backfill_data.py
Du Wenbo 6a59f9af76 feat: add 3D interactive dashboard and 2D BigScreen pages
- New /bigscreen-3d route: React Three Fiber 3D campus with buildings,
  PV panels, heat pumps, meters, and sensors — all procedural geometry
- Interactive: hover highlight, click to select, camera fly-in to
  device detail views (PV inverter, heat pump, meter, heat meter, sensor)
- Real-time data: 15s polling for overview, 5s for selected device
- Energy flow particles along PV→Building, Grid→Building, Building→HP paths
- HUD overlay with date/clock, bottom metrics bar, device list panel
- New /bigscreen route: 2D dashboard with energy flow diagram, charts
- New /devices route: device management page
- Vite config: optimizeDeps.force for R3F dep consistency
- Data backfill script for testing

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 22:43:48 +08:00

214 lines
7.6 KiB
Python

"""回填历史模拟能耗数据 - 过去30天逐小时数据"""
import asyncio
import math
import os
import random
import sys
from datetime import datetime, timedelta, timezone
# Prepend ../backend to the module search path.
# NOTE(review): the path is relative to the current working directory, not to
# this file, so this presumably expects the script to be launched from the
# scripts/ directory; confirm.
sys.path.insert(0, "../backend")
# Allow override via env var (same pattern as other scripts)
# When DATABASE_URL is not set, the fallback below points at a PostgreSQL
# database on localhost:5432 through the asyncpg driver.
DATABASE_URL = os.environ.get(
"DATABASE_URL",
"postgresql+asyncpg://tianpu:tianpu2026@localhost:5432/tianpu_ems",
)
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy import text
# ---------------------------------------------------------------------------
# Device definitions
# ---------------------------------------------------------------------------
# NOTE(review): device ids are hard-coded; presumably they match existing
# device rows in the target database; confirm before running.
PV_IDS = [1, 2, 3] # 110 kW rated each
HP_IDS = [4, 5, 6, 7] # 35 kW rated each
# Devices whose readings are produced by meter_power().
METER_IDS = [8, 9, 10, 11]
# Length of the backfill window in days, counted back from the current hour.
DAYS = 30
HOURS_PER_DAY = 24
# Number of hourly samples generated per device.
TOTAL_HOURS = DAYS * HOURS_PER_DAY # 720
# ---------------------------------------------------------------------------
# Curve generators (return kW for a given hour-of-day 0-23)
# ---------------------------------------------------------------------------
def pv_power(hour: int, rated: float = 110.0) -> float:
    """Return a simulated PV output in kW for the given hour of day.

    The curve is a half sine between 06:00 and 18:00 that peaks at noon,
    scaled by a random factor of 72-92 % of ``rated`` and then perturbed by
    up to +/-15 % random noise.

    Args:
        hour: Hour of day, 0-23.
        rated: Rated power of the array in kW.

    Returns:
        Power in kW, always within ``[0.0, rated]``; exactly 0.0 before
        06:00 and from 18:00 onwards.
    """
    if hour < 6 or hour >= 18:
        return 0.0
    # Half sine over the 12 daylight hours: 0 at 06:00, 1 at 12:00.
    x = (hour - 6) / 12.0 * math.pi
    base = math.sin(x) * rated * random.uniform(0.72, 0.92)
    noise = base * random.uniform(-0.15, 0.15)
    # The scale factor and the noise can compound to about 1.06 x rated
    # around noon, so clip the result to the rated power.
    return min(rated, max(0.0, base + noise))
def heat_pump_power(hour: int, rated: float = 35.0) -> float:
    """Return a simulated heat-pump load in kW for the given hour of day.

    A random base load of 15-25 kW is raised by 5-10 kW during the morning
    (07-09) and evening (17-20) peaks and lowered by 3-8 kW overnight
    (00-05). The base is capped at ``rated``, then perturbed by up to
    +/-20 % random noise.

    Args:
        hour: Hour of day, 0-23.
        rated: Rated power of the unit in kW.

    Returns:
        Power in kW, always within ``[0.0, rated]``.
    """
    base = random.uniform(15, 25)
    if 7 <= hour <= 9 or 17 <= hour <= 20:
        # Morning and evening peaks share the same uplift.
        base += random.uniform(5, 10)
    elif 0 <= hour <= 5:
        base -= random.uniform(3, 8)
    base = min(base, rated)
    noise = base * random.uniform(-0.20, 0.20)
    # Positive noise on a base already at the cap would push the result up to
    # 1.2 x rated, so clamp again after adding it.
    return min(rated, max(0.0, base + noise))
def meter_power(hour: int) -> float:
    """Return a simulated building load in kW for the given hour of day.

    The base level is drawn uniformly from a band chosen by the hour:
    85-115 kW for 08-18, 65-85 kW for 06-07 and 19-21, and 45-70 kW
    otherwise. Up to +/-15 % random noise is then added and the result is
    floored at 0.0.
    """
    business_hours = 8 <= hour <= 18
    shoulder_hours = 6 <= hour <= 7 or 19 <= hour <= 21

    if business_hours:
        low, high = 85, 115
    elif shoulder_hours:
        low, high = 65, 85
    else:
        low, high = 45, 70

    level = random.uniform(low, high)
    jitter = level * random.uniform(-0.15, 0.15)
    return max(0.0, level + jitter)
# ---------------------------------------------------------------------------
# Main backfill
# ---------------------------------------------------------------------------
async def backfill() -> None:
    """Generate simulated hourly power readings and insert them into the database.

    The window covers the ``DAYS`` days that end at the current UTC hour.
    For every hour and every device in ``PV_IDS``, ``HP_IDS`` and
    ``METER_IDS`` one ``energy_data`` row is produced. The hourly values are
    then grouped by calendar date into ``energy_daily_summary`` rows. Both
    tables are filled with batched inserts and committed separately.

    The engine is disposed even when generation or an insert raises.

    NOTE(review): the script only inserts, so running it twice presumably
    duplicates rows unless the tables enforce uniqueness; confirm.
    """
    engine = create_async_engine(DATABASE_URL, echo=False, pool_size=5)
    try:
        session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
        now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
        start = now - timedelta(days=DAYS)
        print(f"Backfill range: {start.isoformat()} -> {now.isoformat()}")
        print(f"Total hours: {TOTAL_HOURS}")

        # ---- Collect hourly energy_data rows and per-device daily buckets ----
        energy_rows = []  # list of dicts for bulk insert
        all_device_ids = PV_IDS + HP_IDS + METER_IDS
        # daily_buckets[device_id][date_str] = list of hourly values
        daily_buckets: dict[int, dict[str, list[float]]] = {
            did: {} for did in all_device_ids
        }
        # Each device class paired with its curve generator. The order here
        # fixes the row order within every hour.
        generators = (
            (PV_IDS, pv_power),
            (HP_IDS, heat_pump_power),
            (METER_IDS, meter_power),
        )

        print("Generating hourly energy_data rows ...")
        for h_offset in range(TOTAL_HOURS):
            ts = start + timedelta(hours=h_offset)
            hour = ts.hour
            date_str = ts.strftime("%Y-%m-%d")
            for device_ids, power_fn in generators:
                for did in device_ids:
                    val = round(power_fn(hour), 2)
                    energy_rows.append({
                        "device_id": did,
                        "timestamp": ts,
                        "data_type": "power",
                        "value": val,
                        "unit": "kW",
                        "quality": 100,
                    })
                    daily_buckets[did].setdefault(date_str, []).append(val)
        print(f" Generated {len(energy_rows)} energy_data rows")

        # ---- Build daily summary rows ----
        # The window starts at the current hour, so unless the script runs at
        # 00:00 UTC the first and last dates hold fewer than 24 samples.
        print("Computing daily summaries ...")
        pv_ids = set(PV_IDS)
        summary_rows = []
        for did, dates in daily_buckets.items():
            is_pv = did in pv_ids
            for date_str, values in dates.items():
                total = round(sum(values), 2)  # kWh (hourly power * 1h)
                peak = round(max(values), 2)
                min_p = round(min(values), 2)
                avg_p = round(sum(values) / len(values), 2)
                op_hours = sum(1 for v in values if v > 0)
                # NOTE(review): the 0.85 and 0.8843 factors are applied to PV
                # generation as well as to consumption; confirm this is
                # intended and which units they represent.
                cost = round(total * 0.85, 2)
                carbon = round(total * 0.8843, 2)
                summary_rows.append({
                    "device_id": did,
                    "date": datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc),
                    "energy_type": "electricity",
                    "total_consumption": 0.0 if is_pv else total,
                    "total_generation": total if is_pv else 0.0,
                    "peak_power": peak,
                    "min_power": min_p,
                    "avg_power": avg_p,
                    "operating_hours": float(op_hours),
                    "cost": cost,
                    "carbon_emission": carbon,
                })
        print(f" Generated {len(summary_rows)} daily summary rows")

        # ---- Bulk insert ----
        BATCH = 2000
        async with session_factory() as session:
            # Insert energy_data
            print("Inserting energy_data ...")
            insert_energy = text("""
                INSERT INTO energy_data (device_id, timestamp, data_type, value, unit, quality)
                VALUES (:device_id, :timestamp, :data_type, :value, :unit, :quality)
            """)
            for i in range(0, len(energy_rows), BATCH):
                batch = energy_rows[i : i + BATCH]
                # A list of parameter dicts runs the statement once per row.
                await session.execute(insert_energy, batch)
                done = min(i + BATCH, len(energy_rows))
                print(f" energy_data: {done}/{len(energy_rows)}")
            await session.commit()
            print(" energy_data done.")

            # Insert daily summaries
            print("Inserting energy_daily_summary ...")
            insert_summary = text("""
                INSERT INTO energy_daily_summary
                (device_id, date, energy_type, total_consumption, total_generation,
                peak_power, min_power, avg_power, operating_hours, cost, carbon_emission)
                VALUES
                (:device_id, :date, :energy_type, :total_consumption, :total_generation,
                :peak_power, :min_power, :avg_power, :operating_hours, :cost, :carbon_emission)
            """)
            for i in range(0, len(summary_rows), BATCH):
                batch = summary_rows[i : i + BATCH]
                await session.execute(insert_summary, batch)
                done = min(i + BATCH, len(summary_rows))
                print(f" daily_summary: {done}/{len(summary_rows)}")
            await session.commit()
            print(" daily_summary done.")
    finally:
        # Release pooled connections even if generation or an insert failed.
        await engine.dispose()
    print("Backfill complete!")
# Run the backfill only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(backfill())