Merge commit '026c837b919ab4380e8a6e6c052364bbf9bbe8a3' as 'core'
This commit is contained in:
154
core/backend/app/collectors/manager.py
Normal file
154
core/backend/app/collectors/manager.py
Normal file
@@ -0,0 +1,154 @@
|
||||
"""Collector Manager - orchestrates all device collectors."""
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy import select
|
||||
|
||||
from app.core.config import get_settings
|
||||
from app.core.database import async_session
|
||||
from app.models.device import Device
|
||||
from app.collectors.base import BaseCollector
|
||||
from app.collectors.modbus_tcp import ModbusTcpCollector
|
||||
from app.collectors.mqtt_collector import MqttCollector
|
||||
from app.collectors.http_collector import HttpCollector
|
||||
|
||||
logger = logging.getLogger("collector.manager")
|
||||
|
||||
# Full registry mapping protocol names to collector classes
|
||||
COLLECTOR_REGISTRY: dict[str, type[BaseCollector]] = {
|
||||
"modbus_tcp": ModbusTcpCollector,
|
||||
"mqtt": MqttCollector,
|
||||
"http_api": HttpCollector,
|
||||
}
|
||||
|
||||
|
||||
def get_enabled_collectors() -> dict[str, type[BaseCollector]]:
|
||||
"""Return collector registry filtered by customer config.
|
||||
|
||||
If the customer config specifies a 'collectors' list, only those
|
||||
protocols are enabled. Otherwise fall back to the full registry.
|
||||
"""
|
||||
settings = get_settings()
|
||||
customer_config = settings.load_customer_config()
|
||||
enabled_list = customer_config.get("collectors")
|
||||
if enabled_list is None:
|
||||
return COLLECTOR_REGISTRY
|
||||
enabled = {}
|
||||
for name in enabled_list:
|
||||
if name in COLLECTOR_REGISTRY:
|
||||
enabled[name] = COLLECTOR_REGISTRY[name]
|
||||
else:
|
||||
logger.warning("Customer config references unknown collector '%s', skipping", name)
|
||||
return enabled
|
||||
|
||||
|
||||
class CollectorManager:
|
||||
"""Manages lifecycle of all device collectors."""
|
||||
|
||||
def __init__(self):
|
||||
self._collectors: dict[int, BaseCollector] = {} # device_id -> collector
|
||||
self._running = False
|
||||
|
||||
async def start(self):
|
||||
"""Load active devices from DB and start their collectors."""
|
||||
self._running = True
|
||||
await self._load_and_start_collectors()
|
||||
logger.info("CollectorManager started with %d collectors", len(self._collectors))
|
||||
|
||||
async def stop(self):
|
||||
"""Stop all collectors."""
|
||||
self._running = False
|
||||
for device_id, collector in self._collectors.items():
|
||||
try:
|
||||
await collector.stop()
|
||||
except Exception as e:
|
||||
logger.error("Error stopping collector for device %d: %s", device_id, e)
|
||||
self._collectors.clear()
|
||||
logger.info("CollectorManager stopped")
|
||||
|
||||
async def _load_and_start_collectors(self):
|
||||
"""Load active devices with supported protocols and start collectors."""
|
||||
enabled = get_enabled_collectors()
|
||||
logger.info("Enabled collectors: %s", list(enabled.keys()))
|
||||
async with async_session() as session:
|
||||
result = await session.execute(
|
||||
select(Device).where(
|
||||
Device.is_active == True,
|
||||
Device.protocol.in_(list(enabled.keys())),
|
||||
)
|
||||
)
|
||||
devices = result.scalars().all()
|
||||
|
||||
for device in devices:
|
||||
await self.start_collector(
|
||||
device.id,
|
||||
device.code,
|
||||
device.protocol,
|
||||
device.connection_params or {},
|
||||
device.collect_interval or 15,
|
||||
)
|
||||
|
||||
async def start_collector(
|
||||
self,
|
||||
device_id: int,
|
||||
device_code: str,
|
||||
protocol: str,
|
||||
connection_params: dict,
|
||||
collect_interval: int,
|
||||
) -> bool:
|
||||
"""Start a single collector for a device."""
|
||||
if device_id in self._collectors:
|
||||
logger.warning("Collector already running for device %d", device_id)
|
||||
return False
|
||||
|
||||
collector_cls = COLLECTOR_REGISTRY.get(protocol)
|
||||
if not collector_cls:
|
||||
logger.warning("No collector for protocol '%s' (device %s)", protocol, device_code)
|
||||
return False
|
||||
|
||||
collector = collector_cls(device_id, device_code, connection_params, collect_interval)
|
||||
self._collectors[device_id] = collector
|
||||
await collector.start()
|
||||
logger.info("Started %s collector for %s", protocol, device_code)
|
||||
return True
|
||||
|
||||
async def stop_collector(self, device_id: int) -> bool:
|
||||
"""Stop collector for a specific device."""
|
||||
collector = self._collectors.pop(device_id, None)
|
||||
if not collector:
|
||||
return False
|
||||
await collector.stop()
|
||||
return True
|
||||
|
||||
async def restart_collector(self, device_id: int) -> bool:
|
||||
"""Restart collector for a device by reloading its config from DB."""
|
||||
await self.stop_collector(device_id)
|
||||
async with async_session() as session:
|
||||
result = await session.execute(
|
||||
select(Device).where(Device.id == device_id)
|
||||
)
|
||||
device = result.scalar_one_or_none()
|
||||
if not device or not device.is_active:
|
||||
return False
|
||||
return await self.start_collector(
|
||||
device.id,
|
||||
device.code,
|
||||
device.protocol,
|
||||
device.connection_params or {},
|
||||
device.collect_interval or 15,
|
||||
)
|
||||
|
||||
def get_collector(self, device_id: int) -> Optional[BaseCollector]:
|
||||
return self._collectors.get(device_id)
|
||||
|
||||
def get_all_status(self) -> list[dict]:
|
||||
"""Return status of all collectors."""
|
||||
return [c.get_status() for c in self._collectors.values()]
|
||||
|
||||
@property
|
||||
def collector_count(self) -> int:
|
||||
return len(self._collectors)
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
return self._running
|
||||
Reference in New Issue
Block a user