From e306b705bc718934a4000f148f15a6a1a43c68f1 Mon Sep 17 00:00:00 2001 From: Du Wenbo Date: Sat, 4 Apr 2026 18:32:11 +0800 Subject: [PATCH] Squashed 'core/' changes from 92ec910..2b9797d 2b9797d feat: add customer hooks plugin system (v1.1.0) 26d2731 chore: add VERSION file (1.0.0) git-subtree-dir: core git-subtree-split: 2b9797d61b501ecbaa73253f6f4001769917a24f --- VERSION | 1 + backend/app/hooks/__init__.py | 3 + backend/app/hooks/base.py | 142 ++++++++++++++++++++++++++ backend/app/hooks/loader.py | 92 +++++++++++++++++ backend/app/services/alarm_checker.py | 19 ++++ backend/app/services/quota_checker.py | 8 ++ 6 files changed, 265 insertions(+) create mode 100644 VERSION create mode 100644 backend/app/hooks/__init__.py create mode 100644 backend/app/hooks/base.py create mode 100644 backend/app/hooks/loader.py diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..9084fa2 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +1.1.0 diff --git a/backend/app/hooks/__init__.py b/backend/app/hooks/__init__.py new file mode 100644 index 0000000..ca239e8 --- /dev/null +++ b/backend/app/hooks/__init__.py @@ -0,0 +1,3 @@ +from app.hooks.loader import get_hooks + +__all__ = ["get_hooks"] diff --git a/backend/app/hooks/base.py b/backend/app/hooks/base.py new file mode 100644 index 0000000..dc90b13 --- /dev/null +++ b/backend/app/hooks/base.py @@ -0,0 +1,142 @@ +"""客户钩子基类 — 定义所有可扩展的业务事件钩子 + +每个客户项目可以继承 CustomerHooks 并重写需要的方法。 +核心代码在关键业务节点调用这些钩子,客户无需修改核心代码即可注入自定义逻辑。 + +使用方式: +1. 在客户项目中创建 customers/{customer}/hooks/__init__.py +2. 继承 CustomerHooks,重写需要的方法 +3. 导出 hooks = YourCustomerHooks() 实例 +""" +import logging +from typing import Any, Optional + +logger = logging.getLogger("hooks") + + +class CustomerHooks: + """客户钩子基类 — 所有方法默认为空操作,客户按需重写。""" + + # ===================================================================== + # 告警相关钩子 + # ===================================================================== + + async def on_alarm_created(self, alarm_event, device, rule, db) -> None: + """告警创建后触发。可用于:自动创建工单、发送微信通知、触发联动控制等。 + + Args: + alarm_event: AlarmEvent 实例(已持久化,有id) + device: Device 实例(触发告警的设备) + rule: AlarmRule 实例(匹配的告警规则) + db: AsyncSession 数据库会话 + """ + pass + + async def on_alarm_resolved(self, alarm_event, device, db) -> None: + """告警恢复后触发。可用于:关闭关联工单、发送恢复通知等。 + + Args: + alarm_event: AlarmEvent 实例(已更新为resolved状态) + device: Device 实例 + db: AsyncSession + """ + pass + + # ===================================================================== + # 能耗数据钩子 + # ===================================================================== + + async def on_energy_data_received(self, device, data_point, db) -> None: + """新能耗数据入库后触发。可用于:自定义数据校验、派生计算、异常检测等。 + + Args: + device: Device 实例 + data_point: EnergyData 实例 + db: AsyncSession + """ + pass + + # ===================================================================== + # 设备状态钩子 + # ===================================================================== + + async def on_device_status_changed(self, device, old_status: str, new_status: str, db) -> None: + """设备状态变更后触发。可用于:离线告警、自动重启、通知运维人员等。 + + Args: + device: Device 实例 + old_status: 旧状态 (online/offline/alarm/maintenance) + new_status: 新状态 + db: AsyncSession + """ + pass + + # ===================================================================== + # 定额钩子 + # ===================================================================== + + async def on_quota_exceeded(self, quota, usage, db) -> None: + """定额超限后触发。可用于:邮件通知楼宇负责人、限电策略、上报管理层等。 + + Args: + quota: EnergyQuota 实例 + usage: QuotaUsage 实例(包含使用率) + db: AsyncSession + """ + pass + + # ===================================================================== + # 工单钩子 + # ===================================================================== + + async def on_work_order_created(self, order, db) -> None: + """维修工单创建后触发。可用于:通知维修人员、同步到外部系统等。""" + pass + + async def on_work_order_completed(self, order, db) -> None: + """维修工单完成后触发。可用于:生成维修报告、更新设备状态等。""" + pass + + # ===================================================================== + # 巡检钩子 + # ===================================================================== + + async def on_inspection_completed(self, record, db) -> None: + """巡检完成后触发。可用于:生成合规报告、发现问题自动创建工单等。""" + pass + + # ===================================================================== + # 报表钩子 + # ===================================================================== + + async def on_report_generated(self, report_task, file_path: str, db) -> None: + """报表生成后触发。可用于:自动邮件发送、上传到FTP/OSS、自定义格式转换等。""" + pass + + # ===================================================================== + # 自定义KPI钩子 + # ===================================================================== + + async def calculate_custom_kpis(self, period: str, db) -> dict: + """Dashboard加载时调用,返回客户自定义KPI。 + + Args: + period: 统计周期 ("today", "month", "year") + db: AsyncSession + + Returns: + dict: 自定义KPI字典,如 {"solar_efficiency": 0.85, "grid_independence": 0.72} + """ + return {} + + # ===================================================================== + # 充电桩钩子 + # ===================================================================== + + async def on_charging_order_created(self, order, db) -> None: + """充电订单创建时触发。可用于:自定义计费规则、用户验证等。""" + pass + + async def on_charging_order_completed(self, order, db) -> None: + """充电订单完成时触发。可用于:结算通知、积分奖励等。""" + pass diff --git a/backend/app/hooks/loader.py b/backend/app/hooks/loader.py new file mode 100644 index 0000000..d33907c --- /dev/null +++ b/backend/app/hooks/loader.py @@ -0,0 +1,92 @@ +"""客户钩子加载器 — 从 customers/{CUSTOMER}/hooks/ 动态加载客户自定义钩子 + +加载逻辑: +1. 读取 CUSTOMER 环境变量 +2. 查找 customers/{CUSTOMER}/hooks/__init__.py +3. 导入并返回 hooks 实例 +4. 找不到则返回默认空钩子(所有方法为空操作) +""" +import importlib +import importlib.util +import logging +import os +import sys +from functools import lru_cache +from typing import Optional + +from app.hooks.base import CustomerHooks +from app.core.config import get_settings + +logger = logging.getLogger("hooks.loader") + +_hooks_instance: Optional[CustomerHooks] = None + + +def _find_hooks_dir() -> Optional[str]: + """Find the customer hooks directory, searching multiple locations.""" + settings = get_settings() + customer = settings.CUSTOMER + config_path = settings.customer_config_path + + hooks_dir = os.path.join(config_path, "hooks") + if os.path.isdir(hooks_dir) and os.path.exists(os.path.join(hooks_dir, "__init__.py")): + return hooks_dir + return None + + +def _load_hooks_from_dir(hooks_dir: str) -> Optional[CustomerHooks]: + """Load hooks module from a directory path.""" + try: + init_path = os.path.join(hooks_dir, "__init__.py") + spec = importlib.util.spec_from_file_location("customer_hooks", init_path) + if spec and spec.loader: + # Add parent dir to sys.path so relative imports work + parent_dir = os.path.dirname(os.path.dirname(hooks_dir)) + if parent_dir not in sys.path: + sys.path.insert(0, parent_dir) + + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + hooks = getattr(module, "hooks", None) + if isinstance(hooks, CustomerHooks): + return hooks + else: + logger.warning( + f"Customer hooks module loaded but 'hooks' attribute is not a CustomerHooks instance" + ) + except Exception as e: + logger.error(f"Failed to load customer hooks: {e}", exc_info=True) + return None + + +def get_hooks() -> CustomerHooks: + """获取当前客户的钩子实例。线程安全,全局单例。 + + Returns: + CustomerHooks: 客户钩子实例,如果客户没有自定义钩子则返回默认空钩子 + """ + global _hooks_instance + if _hooks_instance is not None: + return _hooks_instance + + settings = get_settings() + hooks_dir = _find_hooks_dir() + + if hooks_dir: + loaded = _load_hooks_from_dir(hooks_dir) + if loaded: + logger.info(f"Loaded customer hooks for '{settings.CUSTOMER}' from {hooks_dir}") + _hooks_instance = loaded + return _hooks_instance + + logger.info(f"No custom hooks for '{settings.CUSTOMER}', using defaults") + _hooks_instance = CustomerHooks() + return _hooks_instance + + +def reload_hooks() -> CustomerHooks: + """重新加载客户钩子(开发时热重载用)""" + global _hooks_instance + _hooks_instance = None + return get_hooks() diff --git a/backend/app/services/alarm_checker.py b/backend/app/services/alarm_checker.py index fdb7678..b66b3f7 100644 --- a/backend/app/services/alarm_checker.py +++ b/backend/app/services/alarm_checker.py @@ -8,6 +8,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.models.alarm import AlarmRule, AlarmEvent from app.models.energy import EnergyData from app.models.device import Device +from app.hooks import get_hooks logger = logging.getLogger("alarm_checker") @@ -233,6 +234,16 @@ async def check_alarms(session: AsyncSession): triggered_at=now, ) session.add(event) + await session.flush() # Ensure event has id + + # Customer hook: on_alarm_created + try: + _dev = await session.execute(select(Device).where(Device.id == device_id)) + _device = _dev.scalar_one_or_none() + await get_hooks().on_alarm_created(event, _device, rule, session) + except Exception as _he: + logger.error(f"Hook on_alarm_created error: {_he}") + logger.info( f"Alarm triggered: {rule.name} | device={device_id} | " f"value={dp.value} threshold={threshold_str}" @@ -246,6 +257,14 @@ async def check_alarms(session: AsyncSession): active_event.status = "resolved" active_event.resolved_at = now active_event.resolve_note = "自动恢复" + + # Customer hook: on_alarm_resolved + try: + _dev2 = await session.execute(select(Device).where(Device.id == device_id)) + _device2 = _dev2.scalar_one_or_none() + await get_hooks().on_alarm_resolved(active_event, _device2, session) + except Exception as _he2: + logger.error(f"Hook on_alarm_resolved error: {_he2}") logger.info( f"Alarm auto-resolved: {rule.name} | device={device_id}" ) diff --git a/backend/app/services/quota_checker.py b/backend/app/services/quota_checker.py index 2ff287e..06e2259 100644 --- a/backend/app/services/quota_checker.py +++ b/backend/app/services/quota_checker.py @@ -6,6 +6,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.models.quota import EnergyQuota, QuotaUsage from app.models.alarm import AlarmEvent from app.models.energy import EnergyDailySummary +from app.hooks import get_hooks logger = logging.getLogger("quota_checker") @@ -34,6 +35,7 @@ async def check_quotas(session: AsyncSession): ) quotas = result.scalars().all() + hooks = get_hooks() for quota in quotas: period_start, period_end = _get_period_range(quota.period, now) @@ -121,4 +123,10 @@ async def check_quotas(session: AsyncSession): f"quota={quota.quota_value:.1f} rate={usage_rate_pct:.1f}%" ) + # Customer hook: on_quota_exceeded + try: + await hooks.on_quota_exceeded(quota, usage_record, session) + except Exception as _he: + logger.error(f"Hook on_quota_exceeded error: {_he}") + await session.flush()