Files
tianpu-ems/backend/app/collectors/manager.py
Du Wenbo 02c4698b59 feat: multi-customer architecture + Z-Park support + Gitea migration scripts
Multi-customer config system:
- CUSTOMER env var selects customer (tianpu/zpark)
- customers/tianpu/config.yaml — Tianpu branding, collectors, features
- customers/zpark/config.yaml — Z-Park branding, Sungrow collector
- GET /api/v1/branding endpoint for customer-specific branding
- main.py loads customer config for app title, CORS, logging
- Collector manager filters by customer's enabled collectors

Z-Park (中关村医疗器械园) support:
- Sungrow iSolarCloud API collector (sungrow_collector.py)
- Z-Park device definitions (10 inverters, 8 combiner boxes, 22+ buildings)
- Z-Park TOU pricing config (Beijing 2026 rates)
- Z-Park seed script (seed_zpark.py)

Gitea migration scripts (Mac Studio → labmac3):
- 5 migration scripts + README in scripts/gitea-migration/
- Creates 3-repo structure: ems-core, tp-ems, zpark-ems

Version: v1.0.0

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 16:23:33 +08:00

155 lines
5.4 KiB
Python

"""Collector Manager - orchestrates all device collectors."""
import logging
from typing import Optional
from sqlalchemy import select
from app.core.config import get_settings
from app.core.database import async_session
from app.models.device import Device
from app.collectors.base import BaseCollector
from app.collectors.modbus_tcp import ModbusTcpCollector
from app.collectors.mqtt_collector import MqttCollector
from app.collectors.http_collector import HttpCollector
# Module-wide logger; the channel name is fixed rather than derived from __name__.
logger = logging.getLogger("collector.manager")

# Complete table of supported protocols: protocol name -> collector class.
# Customer-specific filtering is applied on top of this table, not inside it.
COLLECTOR_REGISTRY: dict[str, type[BaseCollector]] = {
    "modbus_tcp": ModbusTcpCollector,
    "mqtt": MqttCollector,
    "http_api": HttpCollector,
}
def get_enabled_collectors() -> dict[str, type[BaseCollector]]:
    """Return the collector registry filtered by the customer config.

    The customer config is obtained from ``get_settings().load_customer_config()``.
    If it contains a ``collectors`` entry, only the protocols named there are
    enabled; names that are not in ``COLLECTOR_REGISTRY`` are skipped with a
    warning. If the entry is absent (or ``None``), every registered collector
    is enabled.

    Returns:
        A new dict mapping protocol name to collector class. The result is
        always a fresh object, so callers may mutate it without affecting the
        module-level registry.
    """
    settings = get_settings()
    # Tolerate a loader that yields None instead of a mapping.
    # NOTE(review): presumably possible for an empty config file - confirm.
    customer_config = settings.load_customer_config() or {}
    enabled_list = customer_config.get("collectors")

    if enabled_list is None:
        # Copy instead of handing out the shared module-level dict.
        return dict(COLLECTOR_REGISTRY)

    if isinstance(enabled_list, str):
        # A scalar such as ``collectors: mqtt`` means one protocol; iterating
        # the string would otherwise yield single characters.
        enabled_list = [enabled_list]

    enabled: dict[str, type[BaseCollector]] = {}
    for name in enabled_list:
        collector_cls = COLLECTOR_REGISTRY.get(name)
        if collector_cls is None:
            logger.warning("Customer config references unknown collector '%s', skipping", name)
            continue
        enabled[name] = collector_cls
    return enabled
class CollectorManager:
    """Manages lifecycle of all device collectors.

    Holds at most one collector per device, keyed by device id. Collectors are
    created from the customer-enabled subset of the collector registry, both
    when loading all devices at start-up and when a single collector is
    started or restarted later.
    """

    # Fallback used when a device has no (or a falsy) collect_interval.
    # NOTE(review): unit is presumably seconds - confirm against BaseCollector.
    DEFAULT_COLLECT_INTERVAL = 15

    def __init__(self):
        self._collectors: dict[int, BaseCollector] = {}  # device_id -> collector
        # Customer-enabled registry; refreshed on each bulk load, otherwise
        # loaded lazily on first use.
        self._enabled_collectors: Optional[dict[str, type[BaseCollector]]] = None
        self._running = False

    async def start(self):
        """Load active devices from DB and start their collectors."""
        self._running = True
        await self._load_and_start_collectors()
        logger.info("CollectorManager started with %d collectors", len(self._collectors))

    async def stop(self):
        """Stop all collectors and forget them.

        A failure while stopping one collector is logged and does not prevent
        the remaining collectors from being stopped.
        """
        self._running = False
        # Iterate over a snapshot: every await yields to the event loop, and a
        # concurrent start_collector()/stop_collector() call would otherwise
        # change the dict's size mid-iteration and raise RuntimeError.
        for device_id, collector in list(self._collectors.items()):
            try:
                await collector.stop()
            except Exception as e:
                logger.error("Error stopping collector for device %d: %s", device_id, e)
        self._collectors.clear()
        logger.info("CollectorManager stopped")

    def _collector_class_for(self, protocol: str) -> Optional[type[BaseCollector]]:
        """Return the customer-enabled collector class for *protocol*, if any."""
        if self._enabled_collectors is None:
            self._enabled_collectors = get_enabled_collectors()
        return self._enabled_collectors.get(protocol)

    @classmethod
    def _start_args(cls, device: Device) -> tuple:
        """Extract the positional arguments for start_collector() from a device row."""
        return (
            device.id,
            device.code,
            device.protocol,
            device.connection_params or {},
            device.collect_interval or cls.DEFAULT_COLLECT_INTERVAL,
        )

    async def _load_and_start_collectors(self):
        """Load active devices with enabled protocols and start their collectors.

        A device whose collector fails to start is logged and skipped so that
        one misconfigured device cannot keep the others from being collected.
        """
        # Re-read the customer filter on every bulk load.
        enabled = get_enabled_collectors()
        self._enabled_collectors = enabled
        logger.info("Enabled collectors: %s", list(enabled.keys()))

        async with async_session() as session:
            result = await session.execute(
                select(Device).where(
                    Device.is_active == True,  # noqa: E712 - SQL column expression
                    Device.protocol.in_(list(enabled.keys())),
                )
            )
            # Copy the plain values out while the session is still open, so
            # starting collectors does not depend on ORM instance state.
            pending = [self._start_args(device) for device in result.scalars().all()]

        for start_args in pending:
            try:
                await self.start_collector(*start_args)
            except Exception as e:
                logger.error("Error starting collector for device %d: %s", start_args[0], e)

    async def start_collector(
        self,
        device_id: int,
        device_code: str,
        protocol: str,
        connection_params: dict,
        collect_interval: int,
    ) -> bool:
        """Start a single collector for a device.

        Returns:
            True if a collector was created and started; False if one is
            already registered for *device_id* or if *protocol* has no
            collector enabled for the current customer.

        Raises:
            Whatever the collector's ``start()`` raises. In that case the
            collector is removed again, so a later retry is possible.
        """
        if device_id in self._collectors:
            logger.warning("Collector already running for device %d", device_id)
            return False

        # Use the customer-filtered registry so that single starts and
        # restarts obey the same filter as the bulk load.
        collector_cls = self._collector_class_for(protocol)
        if not collector_cls:
            logger.warning("No collector for protocol '%s' (device %s)", protocol, device_code)
            return False

        collector = collector_cls(device_id, device_code, connection_params, collect_interval)
        # Register before awaiting so a concurrent call for the same device
        # hits the "already running" guard above.
        self._collectors[device_id] = collector
        try:
            await collector.start()
        except Exception:
            # Do not leave a collector that never started in the registry.
            if self._collectors.get(device_id) is collector:
                del self._collectors[device_id]
            raise
        logger.info("Started %s collector for %s", protocol, device_code)
        return True

    async def stop_collector(self, device_id: int) -> bool:
        """Stop collector for a specific device.

        Returns:
            True if a collector was registered for *device_id* and has been
            stopped, False if there was none.
        """
        collector = self._collectors.pop(device_id, None)
        if not collector:
            return False
        await collector.stop()
        return True

    async def restart_collector(self, device_id: int) -> bool:
        """Restart collector for a device by reloading its config from DB.

        The running collector (if any) is stopped first. Returns False when
        the device does not exist, is inactive, or cannot be started by
        start_collector(); otherwise returns start_collector()'s result.
        """
        await self.stop_collector(device_id)
        async with async_session() as session:
            result = await session.execute(
                select(Device).where(Device.id == device_id)
            )
            device = result.scalar_one_or_none()
            if not device or not device.is_active:
                return False
            start_args = self._start_args(device)
        return await self.start_collector(*start_args)

    def get_collector(self, device_id: int) -> Optional[BaseCollector]:
        """Return the collector registered for *device_id*, or None."""
        return self._collectors.get(device_id)

    def get_all_status(self) -> list[dict]:
        """Return status of all collectors."""
        return [c.get_status() for c in self._collectors.values()]

    @property
    def collector_count(self) -> int:
        """Number of collectors currently registered."""
        return len(self._collectors)

    @property
    def is_running(self) -> bool:
        """True between a call to start() and the next call to stop()."""
        return self._running