Compare commits

...

3 Commits

Author SHA1 Message Date
Du Wenbo
ed30ac31e4 feat: add Z-Park customer hooks + update core to v1.1.0
- core/ updated to ems-core v1.1.0 (hooks plugin system)
- customers/zpark/hooks/zpark_hooks.py: Z-Park custom logic
  - on_alarm_created: 逆变器严重告警 → 自动创建维修工单
  - on_alarm_resolved: 告警恢复日志
  - on_device_status_changed: 逆变器离线告警
  - calculate_custom_kpis: 光伏发电效率、自发自用率

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 18:32:33 +08:00
Du Wenbo
67cf2b13da chore: update core to v1.1.0 (hooks plugin system) 2026-04-04 18:32:11 +08:00
Du Wenbo
e306b705bc Squashed 'core/' changes from 92ec910..2b9797d
2b9797d feat: add customer hooks plugin system (v1.1.0)
26d2731 chore: add VERSION file (1.0.0)

git-subtree-dir: core
git-subtree-split: 2b9797d61b501ecbaa73253f6f4001769917a24f
2026-04-04 18:32:11 +08:00
8 changed files with 381 additions and 0 deletions

1
core/VERSION Normal file
View File

@@ -0,0 +1 @@
1.1.0

View File

@@ -0,0 +1,3 @@
from app.hooks.loader import get_hooks
__all__ = ["get_hooks"]

View File

@@ -0,0 +1,142 @@
"""客户钩子基类 — 定义所有可扩展的业务事件钩子
每个客户项目可以继承 CustomerHooks 并重写需要的方法。
核心代码在关键业务节点调用这些钩子,客户无需修改核心代码即可注入自定义逻辑。
使用方式:
1. 在客户项目中创建 customers/{customer}/hooks/__init__.py
2. 继承 CustomerHooks重写需要的方法
3. 导出 hooks = YourCustomerHooks() 实例
"""
import logging
from typing import Any, Optional
logger = logging.getLogger("hooks")
class CustomerHooks:
"""客户钩子基类 — 所有方法默认为空操作,客户按需重写。"""
# =====================================================================
# 告警相关钩子
# =====================================================================
async def on_alarm_created(self, alarm_event, device, rule, db) -> None:
"""告警创建后触发。可用于:自动创建工单、发送微信通知、触发联动控制等。
Args:
alarm_event: AlarmEvent 实例已持久化有id
device: Device 实例(触发告警的设备)
rule: AlarmRule 实例(匹配的告警规则)
db: AsyncSession 数据库会话
"""
pass
async def on_alarm_resolved(self, alarm_event, device, db) -> None:
"""告警恢复后触发。可用于:关闭关联工单、发送恢复通知等。
Args:
alarm_event: AlarmEvent 实例已更新为resolved状态
device: Device 实例
db: AsyncSession
"""
pass
# =====================================================================
# 能耗数据钩子
# =====================================================================
async def on_energy_data_received(self, device, data_point, db) -> None:
"""新能耗数据入库后触发。可用于:自定义数据校验、派生计算、异常检测等。
Args:
device: Device 实例
data_point: EnergyData 实例
db: AsyncSession
"""
pass
# =====================================================================
# 设备状态钩子
# =====================================================================
async def on_device_status_changed(self, device, old_status: str, new_status: str, db) -> None:
"""设备状态变更后触发。可用于:离线告警、自动重启、通知运维人员等。
Args:
device: Device 实例
old_status: 旧状态 (online/offline/alarm/maintenance)
new_status: 新状态
db: AsyncSession
"""
pass
# =====================================================================
# 定额钩子
# =====================================================================
async def on_quota_exceeded(self, quota, usage, db) -> None:
"""定额超限后触发。可用于:邮件通知楼宇负责人、限电策略、上报管理层等。
Args:
quota: EnergyQuota 实例
usage: QuotaUsage 实例(包含使用率)
db: AsyncSession
"""
pass
# =====================================================================
# 工单钩子
# =====================================================================
async def on_work_order_created(self, order, db) -> None:
"""维修工单创建后触发。可用于:通知维修人员、同步到外部系统等。"""
pass
async def on_work_order_completed(self, order, db) -> None:
"""维修工单完成后触发。可用于:生成维修报告、更新设备状态等。"""
pass
# =====================================================================
# 巡检钩子
# =====================================================================
async def on_inspection_completed(self, record, db) -> None:
"""巡检完成后触发。可用于:生成合规报告、发现问题自动创建工单等。"""
pass
# =====================================================================
# 报表钩子
# =====================================================================
async def on_report_generated(self, report_task, file_path: str, db) -> None:
"""报表生成后触发。可用于自动邮件发送、上传到FTP/OSS、自定义格式转换等。"""
pass
# =====================================================================
# 自定义KPI钩子
# =====================================================================
async def calculate_custom_kpis(self, period: str, db) -> dict:
"""Dashboard加载时调用返回客户自定义KPI。
Args:
period: 统计周期 ("today", "month", "year")
db: AsyncSession
Returns:
dict: 自定义KPI字典{"solar_efficiency": 0.85, "grid_independence": 0.72}
"""
return {}
# =====================================================================
# 充电桩钩子
# =====================================================================
async def on_charging_order_created(self, order, db) -> None:
"""充电订单创建时触发。可用于:自定义计费规则、用户验证等。"""
pass
async def on_charging_order_completed(self, order, db) -> None:
"""充电订单完成时触发。可用于:结算通知、积分奖励等。"""
pass

View File

@@ -0,0 +1,92 @@
"""客户钩子加载器 — 从 customers/{CUSTOMER}/hooks/ 动态加载客户自定义钩子
加载逻辑:
1. 读取 CUSTOMER 环境变量
2. 查找 customers/{CUSTOMER}/hooks/__init__.py
3. 导入并返回 hooks 实例
4. 找不到则返回默认空钩子(所有方法为空操作)
"""
import importlib
import importlib.util
import logging
import os
import sys
from functools import lru_cache
from typing import Optional
from app.hooks.base import CustomerHooks
from app.core.config import get_settings
logger = logging.getLogger("hooks.loader")
_hooks_instance: Optional[CustomerHooks] = None
def _find_hooks_dir() -> Optional[str]:
"""Find the customer hooks directory, searching multiple locations."""
settings = get_settings()
customer = settings.CUSTOMER
config_path = settings.customer_config_path
hooks_dir = os.path.join(config_path, "hooks")
if os.path.isdir(hooks_dir) and os.path.exists(os.path.join(hooks_dir, "__init__.py")):
return hooks_dir
return None
def _load_hooks_from_dir(hooks_dir: str) -> Optional[CustomerHooks]:
"""Load hooks module from a directory path."""
try:
init_path = os.path.join(hooks_dir, "__init__.py")
spec = importlib.util.spec_from_file_location("customer_hooks", init_path)
if spec and spec.loader:
# Add parent dir to sys.path so relative imports work
parent_dir = os.path.dirname(os.path.dirname(hooks_dir))
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
hooks = getattr(module, "hooks", None)
if isinstance(hooks, CustomerHooks):
return hooks
else:
logger.warning(
f"Customer hooks module loaded but 'hooks' attribute is not a CustomerHooks instance"
)
except Exception as e:
logger.error(f"Failed to load customer hooks: {e}", exc_info=True)
return None
def get_hooks() -> CustomerHooks:
"""获取当前客户的钩子实例。线程安全,全局单例。
Returns:
CustomerHooks: 客户钩子实例,如果客户没有自定义钩子则返回默认空钩子
"""
global _hooks_instance
if _hooks_instance is not None:
return _hooks_instance
settings = get_settings()
hooks_dir = _find_hooks_dir()
if hooks_dir:
loaded = _load_hooks_from_dir(hooks_dir)
if loaded:
logger.info(f"Loaded customer hooks for '{settings.CUSTOMER}' from {hooks_dir}")
_hooks_instance = loaded
return _hooks_instance
logger.info(f"No custom hooks for '{settings.CUSTOMER}', using defaults")
_hooks_instance = CustomerHooks()
return _hooks_instance
def reload_hooks() -> CustomerHooks:
"""重新加载客户钩子(开发时热重载用)"""
global _hooks_instance
_hooks_instance = None
return get_hooks()

View File

@@ -8,6 +8,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.models.alarm import AlarmRule, AlarmEvent
from app.models.energy import EnergyData
from app.models.device import Device
from app.hooks import get_hooks
logger = logging.getLogger("alarm_checker")
@@ -233,6 +234,16 @@ async def check_alarms(session: AsyncSession):
triggered_at=now,
)
session.add(event)
await session.flush() # Ensure event has id
# Customer hook: on_alarm_created
try:
_dev = await session.execute(select(Device).where(Device.id == device_id))
_device = _dev.scalar_one_or_none()
await get_hooks().on_alarm_created(event, _device, rule, session)
except Exception as _he:
logger.error(f"Hook on_alarm_created error: {_he}")
logger.info(
f"Alarm triggered: {rule.name} | device={device_id} | "
f"value={dp.value} threshold={threshold_str}"
@@ -246,6 +257,14 @@ async def check_alarms(session: AsyncSession):
active_event.status = "resolved"
active_event.resolved_at = now
active_event.resolve_note = "自动恢复"
# Customer hook: on_alarm_resolved
try:
_dev2 = await session.execute(select(Device).where(Device.id == device_id))
_device2 = _dev2.scalar_one_or_none()
await get_hooks().on_alarm_resolved(active_event, _device2, session)
except Exception as _he2:
logger.error(f"Hook on_alarm_resolved error: {_he2}")
logger.info(
f"Alarm auto-resolved: {rule.name} | device={device_id}"
)

View File

@@ -6,6 +6,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.models.quota import EnergyQuota, QuotaUsage
from app.models.alarm import AlarmEvent
from app.models.energy import EnergyDailySummary
from app.hooks import get_hooks
logger = logging.getLogger("quota_checker")
@@ -34,6 +35,7 @@ async def check_quotas(session: AsyncSession):
)
quotas = result.scalars().all()
hooks = get_hooks()
for quota in quotas:
period_start, period_end = _get_period_range(quota.period, now)
@@ -121,4 +123,10 @@ async def check_quotas(session: AsyncSession):
f"quota={quota.quota_value:.1f} rate={usage_rate_pct:.1f}%"
)
# Customer hook: on_quota_exceeded
try:
await hooks.on_quota_exceeded(quota, usage_record, session)
except Exception as _he:
logger.error(f"Hook on_quota_exceeded error: {_he}")
await session.flush()

View File

@@ -0,0 +1,4 @@
"""中关村医疗器械园 — 客户自定义钩子"""
from customers.zpark.hooks.zpark_hooks import ZParkHooks
hooks = ZParkHooks()

View File

@@ -0,0 +1,112 @@
"""Z-Park 客户钩子实现
业务逻辑:
1. 逆变器严重告警 → 自动创建维修工单
2. 设备离线 → 发送通知
3. 自定义KPI → 光伏发电效率、自发自用率
"""
import logging
from datetime import datetime, timezone
logger = logging.getLogger("hooks.zpark")
# Import path works because loader adds customer project root to sys.path
from app.hooks.base import CustomerHooks
class ZParkHooks(CustomerHooks):
"""中关村医疗器械园自定义钩子"""
async def on_alarm_created(self, alarm_event, device, rule, db):
"""逆变器严重告警 → 自动创建维修工单"""
if not device or not alarm_event:
return
# 只对严重告警且是逆变器设备创建工单
if alarm_event.severity != "critical":
return
if device.device_type not in ("sungrow_inverter", "dc_combiner"):
return
from app.models.maintenance import RepairOrder
# 检查是否已有关联工单
from sqlalchemy import select, and_
existing = await db.execute(
select(RepairOrder).where(
and_(
RepairOrder.alarm_event_id == alarm_event.id,
RepairOrder.status.in_(["open", "assigned", "in_progress"]),
)
)
)
if existing.scalar_one_or_none():
return # 已有工单,不重复创建
# 生成工单编号
now = datetime.now(timezone.utc)
order_no = f"WO-ZP-{now.strftime('%Y%m%d%H%M')}-{alarm_event.id}"
order = RepairOrder(
title=f"[自动] {alarm_event.title} - {device.name}",
order_no=order_no,
type="emergency",
priority="high",
device_id=device.id,
alarm_event_id=alarm_event.id,
description=(
f"由告警系统自动创建\n"
f"设备: {device.name} ({device.code})\n"
f"告警: {alarm_event.description}\n"
f"当前值: {alarm_event.value},阈值: {alarm_event.threshold}"
),
status="open",
created_by=1, # system user
)
db.add(order)
logger.info(f"Z-Park: Auto-created repair order {order_no} for alarm #{alarm_event.id}")
async def on_alarm_resolved(self, alarm_event, device, db):
"""告警恢复时,记录日志(工单由运维人员手动关闭)"""
if device and device.device_type in ("sungrow_inverter", "dc_combiner"):
logger.info(
f"Z-Park: Alarm resolved for {device.name} - "
f"maintenance team should verify and close related work orders"
)
async def on_device_status_changed(self, device, old_status, new_status, db):
"""设备离线时记录日志(可扩展为微信/短信通知)"""
if new_status == "offline" and device.device_type == "sungrow_inverter":
logger.warning(
f"Z-Park: Inverter {device.name} ({device.code}) went offline! "
f"Check Sungrow iSolarCloud connection."
)
async def calculate_custom_kpis(self, period, db):
"""Z-Park自定义KPI光伏相关指标"""
from sqlalchemy import select, func
from app.models.energy import EnergyDailySummary
from app.models.device import Device
# 计算光伏设备的总发电量
# 简化示例 — 实际应按period过滤日期
result = await db.execute(
select(
func.sum(EnergyDailySummary.total_generation),
func.sum(EnergyDailySummary.total_consumption),
)
)
row = result.first()
generation = row[0] or 0
consumption = row[1] or 0
self_use_rate = (generation / consumption * 100) if consumption > 0 else 0
return {
"total_pv_generation_kwh": round(generation, 1),
"total_consumption_kwh": round(consumption, 1),
"self_sufficiency_rate": round(self_use_rate, 1),
"pv_panel_count": 4561, # Z-Park has 4,561 panels
"inverter_count": 10,
}