Files
tianpu-ems/scripts/seed_zpark.py
Du Wenbo 02c4698b59 feat: multi-customer architecture + Z-Park support + Gitea migration scripts
Multi-customer config system:
- CUSTOMER env var selects customer (tianpu/zpark)
- customers/tianpu/config.yaml — Tianpu branding, collectors, features
- customers/zpark/config.yaml — Z-Park branding, Sungrow collector
- GET /api/v1/branding endpoint for customer-specific branding
- main.py loads customer config for app title, CORS, logging
- Collector manager filters by customer's enabled collectors

Z-Park (中关村医疗器械园) support:
- Sungrow iSolarCloud API collector (sungrow_collector.py)
- Z-Park device definitions (10 inverters, 8 combiner boxes, 22+ buildings)
- Z-Park TOU pricing config (Beijing 2026 rates)
- Z-Park seed script (seed_zpark.py)

Gitea migration scripts (Mac Studio → labmac3):
- 5 migration scripts + README in scripts/gitea-migration/
- Creates 3-repo structure: ems-core, tp-ems, zpark-ems

Version: v1.0.0

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 16:23:33 +08:00

190 lines
7.2 KiB
Python

"""种子数据 - 中关村医疗器械园光伏设备、告警规则、碳排放因子、电价配置"""
import asyncio
import json
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "backend"))
from sqlalchemy import select
from app.core.database import async_session, engine
from app.models.device import Device, DeviceType, DeviceGroup
from app.models.alarm import AlarmRule
from app.models.carbon import EmissionFactor
from app.models.pricing import ElectricityPricing, PricingPeriod
# Directory holding the Z-Park customer definition files, resolved relative to this script
_ZPARK_DIR = os.path.join(os.path.dirname(__file__), "..", "customers", "zpark")
# Device definitions: seed() reads device_types, device_groups and devices from this file
DEVICES_JSON = os.path.join(_ZPARK_DIR, "devices.json")
# Pricing configuration: seed() only loads it when the file exists
PRICING_JSON = os.path.join(_ZPARK_DIR, "pricing.json")
async def _seed_device_types(session, device_types):
    """Add every configured device type whose code is not yet in the database."""
    for dt in device_types:
        # Types may overlap with the ones created by the tianpu seed, so look
        # the code up first instead of inserting blindly.
        existing = await session.execute(
            select(DeviceType).where(DeviceType.code == dt["code"])
        )
        if existing.scalar_one_or_none() is None:
            session.add(DeviceType(
                code=dt["code"],
                name=dt["name"],
                icon=dt.get("icon"),
                data_fields=dt.get("data_fields"),
            ))
    await session.flush()


async def _seed_device_groups(session, groups, group_name_to_id, parent_id=None):
    """Recursively insert the device-group tree, recording each group's id by name."""
    for g in groups:
        grp = DeviceGroup(
            name=g["name"],
            parent_id=parent_id,
            location=g.get("location"),
            description=g.get("description"),
        )
        session.add(grp)
        # Flush before reading grp.id: it is stored in the lookup table and
        # passed down as parent_id for the children.
        await session.flush()
        # NOTE: the lookup is keyed by name only, so a later group with the
        # same name overwrites an earlier one.
        group_name_to_id[g["name"]] = grp.id
        if "children" in g:
            await _seed_device_groups(
                session, g["children"], group_name_to_id, parent_id=grp.id
            )


async def _seed_devices(session, device_defs, group_name_to_id):
    """Insert one Device per definition, resolving its group name to a group id."""
    devices = []
    for d in device_defs:
        group_name = d.get("group")
        group_id = group_name_to_id.get(group_name)
        if group_name and group_id is None:
            # Previously a misspelled group name silently produced an
            # ungrouped device; make the mismatch visible.
            print(
                f"Warning: device {d['code']} references unknown group "
                f"{group_name!r}; leaving it ungrouped",
                file=sys.stderr,
            )
        devices.append(Device(
            name=d["name"],
            code=d["code"],
            device_type=d["device_type"],
            group_id=group_id,
            model=d.get("model"),
            manufacturer=d.get("manufacturer"),
            rated_power=d.get("rated_power"),
            location=d.get("location", ""),
            protocol=d.get("protocol", "http_api"),
            connection_params=d.get("connection_params"),
            collect_interval=d.get("collect_interval", 900),
            status="offline",
            is_active=True,
        ))
    session.add_all(devices)
    await session.flush()


async def _seed_emission_factor(session):
    """Add the PV-generation emission factor unless one already exists."""
    existing_factor = await session.execute(
        select(EmissionFactor).where(EmissionFactor.energy_type == "pv_generation")
    )
    if existing_factor.scalar_one_or_none() is None:
        session.add(EmissionFactor(
            name="华北电网光伏减排因子",
            energy_type="pv_generation",
            factor=0.8843,
            unit="kWh",
            scope=2,
            region="north_china",
            source="等量替代电网电力",
            year=2023,
        ))
    await session.flush()


def _build_alarm_rules():
    """Return the inverter monitoring alarm rules (not yet attached to a session)."""
    return [
        AlarmRule(
            name="逆变器功率过低告警",
            device_type="sungrow_inverter",
            data_type="power",
            condition="lt",
            threshold=1.0,
            duration=1800,
            severity="warning",
            notify_channels=["app", "wechat"],
            is_active=True,
        ),
        AlarmRule(
            name="逆变器通信中断告警",
            device_type="sungrow_inverter",
            data_type="power",
            condition="eq",
            threshold=0.0,
            duration=3600,
            severity="critical",
            notify_channels=["app", "sms", "wechat"],
            is_active=True,
        ),
        AlarmRule(
            name="逆变器过温告警",
            device_type="sungrow_inverter",
            data_type="temperature",
            condition="gt",
            threshold=70.0,
            duration=120,
            severity="major",
            notify_channels=["app", "sms"],
            is_active=True,
        ),
    ]


async def _seed_pricing(session):
    """Insert the pricing plan and its periods from PRICING_JSON.

    Returns the number of pricing periods added, or 0 when the pricing file
    does not exist.
    """
    if not os.path.exists(PRICING_JSON):
        return 0
    with open(PRICING_JSON, "r", encoding="utf-8") as f:
        pricing_config = json.load(f)
    pricing = ElectricityPricing(
        name=pricing_config["name"],
        energy_type=pricing_config.get("energy_type", "electricity"),
        pricing_type=pricing_config.get("pricing_type", "tou"),
        is_active=True,
    )
    session.add(pricing)
    # Flush before reading pricing.id, which the periods reference.
    await session.flush()
    periods = pricing_config.get("periods", [])
    for period in periods:
        session.add(PricingPeriod(
            pricing_id=pricing.id,
            period_name=period["name"],
            start_time=period["start"],
            end_time=period["end"],
            price_per_unit=period["price"],
        ))
    return len(periods)


async def seed():
    """Seed the database with the Z-Park PV devices, alarm rules, emission factor and pricing.

    Reads device types, device groups and devices from DEVICES_JSON and, when
    the file exists, the pricing plan from PRICING_JSON. Everything is written
    in one session and committed once at the end, then a summary is printed.

    Only device types and the PV emission factor are checked for existing
    rows; groups, devices, alarm rules and pricing are inserted
    unconditionally.

    Raises:
        KeyError: if a required key is missing from the JSON configuration.
        OSError: if DEVICES_JSON cannot be opened.
    """
    with open(DEVICES_JSON, "r", encoding="utf-8") as f:
        config = json.load(f)

    async with async_session() as session:
        # 1. Device types
        await _seed_device_types(session, config["device_types"])

        # 2. Device groups (hierarchical)
        group_name_to_id = {}
        await _seed_device_groups(session, config["device_groups"], group_name_to_id)

        # 3. Devices
        await _seed_devices(session, config["devices"], group_name_to_id)

        # 4. Emission factor (PV generation offset)
        await _seed_emission_factor(session)

        # 5. Alarm rules (inverter monitoring)
        alarm_rules = _build_alarm_rules()
        session.add_all(alarm_rules)

        # 6. Electricity pricing. The period count is returned so the summary
        # below no longer reads a variable that is unbound when the pricing
        # file is missing (previously a NameError after the commit).
        pricing_period_count = await _seed_pricing(session)

        await session.commit()
        print("Z-Park seed data created successfully!")

        # Print summary
        dev_count = len(config["devices"])
        group_count = len(group_name_to_id)
        print(f" - Device types: {len(config['device_types'])}")
        print(f" - Device groups: {group_count}")
        print(f" - Devices: {dev_count}")
        print(f" - Alarm rules: {len(alarm_rules)}")
        print(f" - Pricing periods: {pricing_period_count}")
if __name__ == "__main__":
    # Script entry point: run the async seeding coroutine to completion.
    asyncio.run(seed())