Files
tianpu-ems/backend/app/collectors/manager.py

132 lines
4.5 KiB
Python
Raw Normal View History

"""Collector Manager - orchestrates all device collectors."""
import logging
from typing import Optional
from sqlalchemy import select
from app.core.database import async_session
from app.models.device import Device
from app.collectors.base import BaseCollector
from app.collectors.modbus_tcp import ModbusTcpCollector
from app.collectors.mqtt_collector import MqttCollector
from app.collectors.http_collector import HttpCollector
# Module-level logger; the name is a fixed string rather than __name__.
logger = logging.getLogger("collector.manager")

# Lookup table: protocol name -> collector class that handles that protocol.
# Only protocols listed here can have a collector started for them.
COLLECTOR_REGISTRY: dict[str, type[BaseCollector]] = {
    "modbus_tcp": ModbusTcpCollector,
    "mqtt": MqttCollector,
    "http_api": HttpCollector,
}
class CollectorManager:
    """Manage the lifecycle of all device collectors.

    Running collectors are tracked in an in-memory registry keyed by device
    id. Collector classes are looked up by protocol name in
    ``COLLECTOR_REGISTRY``; device configuration is read from the database.
    """

    # Interval passed to a collector when the device row has no (or a falsy)
    # ``collect_interval`` value.
    DEFAULT_COLLECT_INTERVAL = 15

    def __init__(self) -> None:
        self._collectors: dict[int, BaseCollector] = {}  # device_id -> collector
        self._running = False

    async def start(self) -> None:
        """Load active devices from DB and start their collectors."""
        self._running = True
        await self._load_and_start_collectors()
        logger.info("CollectorManager started with %d collectors", len(self._collectors))

    async def stop(self) -> None:
        """Stop every registered collector and leave the registry empty.

        A failure while stopping one collector is logged and does not prevent
        the remaining collectors from being stopped.
        """
        self._running = False
        # Drain the registry one entry at a time instead of iterating over it:
        # other coroutines may add or remove collectors while we are suspended
        # in ``collector.stop()``, and mutating a dict during iteration raises
        # RuntimeError. Popping before awaiting also guarantees that no
        # collector is stopped twice or discarded without being stopped.
        while self._collectors:
            device_id = next(iter(self._collectors))  # oldest entry first
            collector = self._collectors.pop(device_id)
            try:
                await collector.stop()
            except Exception as e:
                logger.error("Error stopping collector for device %d: %s", device_id, e)
        logger.info("CollectorManager stopped")

    def _collector_args(self, device: Device) -> tuple:
        """Return the positional arguments for ``start_collector`` for *device*.

        Missing connection parameters become an empty dict and a missing
        interval becomes ``DEFAULT_COLLECT_INTERVAL``.
        """
        return (
            device.id,
            device.code,
            device.protocol,
            device.connection_params or {},
            device.collect_interval or self.DEFAULT_COLLECT_INTERVAL,
        )

    async def _load_and_start_collectors(self) -> None:
        """Load active devices with supported protocols and start collectors.

        A device whose collector fails to start is logged and skipped so that
        the remaining devices still get their collectors.
        """
        async with async_session() as session:
            result = await session.execute(
                select(Device).where(
                    Device.is_active == True,  # noqa: E712 - SQL expression, not a Python bool test
                    Device.protocol.in_(list(COLLECTOR_REGISTRY.keys())),
                )
            )
            # Copy the needed fields to plain values while the session is open
            # so the session can be released before collectors are started.
            pending = [self._collector_args(device) for device in result.scalars().all()]

        for args in pending:
            try:
                await self.start_collector(*args)
            except Exception:
                # args[1] is the device code.
                logger.exception("Failed to start collector for device %s", args[1])

    async def start_collector(
        self,
        device_id: int,
        device_code: str,
        protocol: str,
        connection_params: dict,
        collect_interval: int,
    ) -> bool:
        """Start a single collector for a device.

        Returns:
            True if a collector was created and started; False if one is
            already registered for ``device_id`` or ``protocol`` has no entry
            in ``COLLECTOR_REGISTRY``.

        Raises:
            Exception: whatever the collector's ``start()`` raises. The
                collector is removed from the registry before re-raising so
                that a later start or restart is not rejected as a duplicate.
        """
        if device_id in self._collectors:
            logger.warning("Collector already running for device %d", device_id)
            return False

        collector_cls = COLLECTOR_REGISTRY.get(protocol)
        if collector_cls is None:
            logger.warning("No collector for protocol '%s' (device %s)", protocol, device_code)
            return False

        collector = collector_cls(device_id, device_code, connection_params, collect_interval)
        # Register before awaiting so a concurrent call for the same device is
        # rejected by the duplicate check above.
        self._collectors[device_id] = collector
        try:
            await collector.start()
        except Exception:
            # Roll back the registration, but only if the slot still holds the
            # collector created by this call.
            if self._collectors.get(device_id) is collector:
                del self._collectors[device_id]
            raise
        logger.info("Started %s collector for %s", protocol, device_code)
        return True

    async def stop_collector(self, device_id: int) -> bool:
        """Stop collector for a specific device.

        Returns:
            True if a collector was registered for ``device_id`` and has been
            stopped, False if none was registered.
        """
        collector = self._collectors.pop(device_id, None)
        if not collector:
            return False
        await collector.stop()
        return True

    async def restart_collector(self, device_id: int) -> bool:
        """Restart collector for a device by reloading its config from DB.

        Any existing collector for the device is stopped first.

        Returns:
            The result of ``start_collector``, or False if the device does not
            exist or is not active.
        """
        await self.stop_collector(device_id)
        async with async_session() as session:
            result = await session.execute(
                select(Device).where(Device.id == device_id)
            )
            device = result.scalar_one_or_none()
            if device is None or not device.is_active:
                return False
            # Copy the fields to plain values before the session closes.
            args = self._collector_args(device)
        return await self.start_collector(*args)

    def get_collector(self, device_id: int) -> Optional[BaseCollector]:
        """Return the registered collector for ``device_id``, or None."""
        return self._collectors.get(device_id)

    def get_all_status(self) -> list[dict]:
        """Return status of all collectors."""
        return [c.get_status() for c in self._collectors.values()]

    @property
    def collector_count(self) -> int:
        """Number of collectors currently registered."""
        return len(self._collectors)

    @property
    def is_running(self) -> bool:
        """True between a call to ``start()`` and the next call to ``stop()``."""
        return self._running