"""种子数据 - 中关村医疗器械园光伏设备、告警规则、碳排放因子、电价配置""" import asyncio import json import sys import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "core", "backend")) from sqlalchemy import select from app.core.database import async_session, engine from app.models.device import Device, DeviceType, DeviceGroup from app.models.alarm import AlarmRule from app.models.carbon import EmissionFactor from app.models.pricing import ElectricityPricing, PricingPeriod # Path to device definitions DEVICES_JSON = os.path.join(os.path.dirname(__file__), "..", "customers", "zpark", "devices.json") PRICING_JSON = os.path.join(os.path.dirname(__file__), "..", "customers", "zpark", "pricing.json") async def seed(): with open(DEVICES_JSON, "r", encoding="utf-8") as f: config = json.load(f) async with async_session() as session: # ================================================================= # 1. 设备类型 # ================================================================= for dt in config["device_types"]: # Check if type already exists (may overlap with tianpu seed) existing = await session.execute( select(DeviceType).where(DeviceType.code == dt["code"]) ) if existing.scalar_one_or_none() is None: session.add(DeviceType( code=dt["code"], name=dt["name"], icon=dt.get("icon"), data_fields=dt.get("data_fields"), )) await session.flush() # ================================================================= # 2. 设备分组 (hierarchical) # ================================================================= group_name_to_id = {} async def create_groups(groups, parent_id=None): for g in groups: grp = DeviceGroup( name=g["name"], parent_id=parent_id, location=g.get("location"), description=g.get("description"), ) session.add(grp) await session.flush() group_name_to_id[g["name"]] = grp.id if "children" in g: await create_groups(g["children"], parent_id=grp.id) await create_groups(config["device_groups"]) # ================================================================= # 3. 设备 # ================================================================= devices = [] for d in config["devices"]: group_id = group_name_to_id.get(d.get("group")) device = Device( name=d["name"], code=d["code"], device_type=d["device_type"], group_id=group_id, model=d.get("model"), manufacturer=d.get("manufacturer"), rated_power=d.get("rated_power"), location=d.get("location", ""), protocol=d.get("protocol", "http_api"), connection_params=d.get("connection_params"), collect_interval=d.get("collect_interval", 900), status="offline", is_active=True, ) devices.append(device) session.add_all(devices) await session.flush() # ================================================================= # 4. 碳排放因子 (光伏减排) # ================================================================= # Check if PV generation factor already exists existing_factor = await session.execute( select(EmissionFactor).where(EmissionFactor.energy_type == "pv_generation") ) if existing_factor.scalar_one_or_none() is None: session.add(EmissionFactor( name="华北电网光伏减排因子", energy_type="pv_generation", factor=0.8843, unit="kWh", scope=2, region="north_china", source="等量替代电网电力", year=2023, )) await session.flush() # ================================================================= # 5. 告警规则 (逆变器监控) # ================================================================= alarm_rules = [ AlarmRule( name="逆变器功率过低告警", device_type="sungrow_inverter", data_type="power", condition="lt", threshold=1.0, duration=1800, severity="warning", notify_channels=["app", "wechat"], is_active=True, ), AlarmRule( name="逆变器通信中断告警", device_type="sungrow_inverter", data_type="power", condition="eq", threshold=0.0, duration=3600, severity="critical", notify_channels=["app", "sms", "wechat"], is_active=True, ), AlarmRule( name="逆变器过温告警", device_type="sungrow_inverter", data_type="temperature", condition="gt", threshold=70.0, duration=120, severity="major", notify_channels=["app", "sms"], is_active=True, ), ] session.add_all(alarm_rules) # ================================================================= # 6. 电价配置 # ================================================================= if os.path.exists(PRICING_JSON): with open(PRICING_JSON, "r", encoding="utf-8") as f: pricing_config = json.load(f) pricing = ElectricityPricing( name=pricing_config["name"], energy_type=pricing_config.get("energy_type", "electricity"), pricing_type=pricing_config.get("pricing_type", "tou"), is_active=True, ) session.add(pricing) await session.flush() for period in pricing_config.get("periods", []): session.add(PricingPeriod( pricing_id=pricing.id, period_name=period["name"], start_time=period["start"], end_time=period["end"], price_per_unit=period["price"], )) await session.commit() print("Z-Park seed data created successfully!") # Print summary dev_count = len(config["devices"]) group_count = len(group_name_to_id) print(f" - Device types: {len(config['device_types'])}") print(f" - Device groups: {group_count}") print(f" - Devices: {dev_count}") print(f" - Alarm rules: {len(alarm_rules)}") print(f" - Pricing periods: {len(pricing_config.get('periods', []))}") if __name__ == "__main__": asyncio.run(seed())