Files
zpark-ems/customers/zpark/hooks/zpark_hooks.py
Du Wenbo ed30ac31e4 feat: add Z-Park customer hooks + update core to v1.1.0
- core/ updated to ems-core v1.1.0 (hooks plugin system)
- customers/zpark/hooks/zpark_hooks.py: Z-Park custom logic
  - on_alarm_created: 逆变器严重告警 → 自动创建维修工单
  - on_alarm_resolved: 告警恢复日志
  - on_device_status_changed: 逆变器离线告警
  - calculate_custom_kpis: 光伏发电效率、自发自用率

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 18:32:33 +08:00

113 lines
4.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Z-Park 客户钩子实现
业务逻辑:
1. 逆变器严重告警 → 自动创建维修工单
2. 设备离线 → 发送通知
3. 自定义KPI → 光伏发电效率、自发自用率
"""
import logging
from datetime import datetime, timezone
logger = logging.getLogger("hooks.zpark")
# Import path works because loader adds customer project root to sys.path
from app.hooks.base import CustomerHooks
class ZParkHooks(CustomerHooks):
"""中关村医疗器械园自定义钩子"""
async def on_alarm_created(self, alarm_event, device, rule, db):
"""逆变器严重告警 → 自动创建维修工单"""
if not device or not alarm_event:
return
# 只对严重告警且是逆变器设备创建工单
if alarm_event.severity != "critical":
return
if device.device_type not in ("sungrow_inverter", "dc_combiner"):
return
from app.models.maintenance import RepairOrder
# 检查是否已有关联工单
from sqlalchemy import select, and_
existing = await db.execute(
select(RepairOrder).where(
and_(
RepairOrder.alarm_event_id == alarm_event.id,
RepairOrder.status.in_(["open", "assigned", "in_progress"]),
)
)
)
if existing.scalar_one_or_none():
return # 已有工单,不重复创建
# 生成工单编号
now = datetime.now(timezone.utc)
order_no = f"WO-ZP-{now.strftime('%Y%m%d%H%M')}-{alarm_event.id}"
order = RepairOrder(
title=f"[自动] {alarm_event.title} - {device.name}",
order_no=order_no,
type="emergency",
priority="high",
device_id=device.id,
alarm_event_id=alarm_event.id,
description=(
f"由告警系统自动创建\n"
f"设备: {device.name} ({device.code})\n"
f"告警: {alarm_event.description}\n"
f"当前值: {alarm_event.value},阈值: {alarm_event.threshold}"
),
status="open",
created_by=1, # system user
)
db.add(order)
logger.info(f"Z-Park: Auto-created repair order {order_no} for alarm #{alarm_event.id}")
async def on_alarm_resolved(self, alarm_event, device, db):
"""告警恢复时,记录日志(工单由运维人员手动关闭)"""
if device and device.device_type in ("sungrow_inverter", "dc_combiner"):
logger.info(
f"Z-Park: Alarm resolved for {device.name} - "
f"maintenance team should verify and close related work orders"
)
async def on_device_status_changed(self, device, old_status, new_status, db):
"""设备离线时记录日志(可扩展为微信/短信通知)"""
if new_status == "offline" and device.device_type == "sungrow_inverter":
logger.warning(
f"Z-Park: Inverter {device.name} ({device.code}) went offline! "
f"Check Sungrow iSolarCloud connection."
)
async def calculate_custom_kpis(self, period, db):
"""Z-Park自定义KPI光伏相关指标"""
from sqlalchemy import select, func
from app.models.energy import EnergyDailySummary
from app.models.device import Device
# 计算光伏设备的总发电量
# 简化示例 — 实际应按period过滤日期
result = await db.execute(
select(
func.sum(EnergyDailySummary.total_generation),
func.sum(EnergyDailySummary.total_consumption),
)
)
row = result.first()
generation = row[0] or 0
consumption = row[1] or 0
self_use_rate = (generation / consumption * 100) if consumption > 0 else 0
return {
"total_pv_generation_kwh": round(generation, 1),
"total_consumption_kwh": round(consumption, 1),
"self_sufficiency_rate": round(self_use_rate, 1),
"pv_panel_count": 4561, # Z-Park has 4,561 panels
"inverter_count": 10,
}