"""Base collector abstract class for IoT data collection.""" import asyncio import logging from abc import ABC, abstractmethod from datetime import datetime, timezone from typing import Optional from sqlalchemy import select from app.core.database import async_session from app.models.device import Device from app.models.energy import EnergyData class BaseCollector(ABC): """Abstract base class for all protocol collectors.""" MAX_BACKOFF = 300 # 5 minutes max backoff def __init__( self, device_id: int, device_code: str, connection_params: dict, collect_interval: int = 15, ): self.device_id = device_id self.device_code = device_code self.connection_params = connection_params or {} self.collect_interval = collect_interval self.status = "disconnected" self.last_error: Optional[str] = None self.last_collect_time: Optional[datetime] = None self._task: Optional[asyncio.Task] = None self._running = False self._backoff = 1 self.logger = logging.getLogger(f"collector.{device_code}") @abstractmethod async def connect(self): """Establish connection to the device.""" @abstractmethod async def disconnect(self): """Clean up connection resources.""" @abstractmethod async def collect(self) -> dict: """Collect data points from the device. Returns a dict mapping data_type -> (value, unit), e.g.: {"power": (105.3, "kW"), "voltage": (220.1, "V")} """ async def start(self): """Start the collector loop.""" self._running = True self._task = asyncio.create_task(self._run(), name=f"collector-{self.device_code}") self.logger.info("Collector started for %s", self.device_code) async def stop(self): """Stop the collector loop and disconnect.""" self._running = False if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass try: await self.disconnect() except Exception as e: self.logger.warning("Error during disconnect: %s", e) self.status = "disconnected" self.logger.info("Collector stopped for %s", self.device_code) async def _run(self): """Main loop: connect, collect at interval, save to DB.""" while self._running: # Connect phase if self.status != "connected": try: await self.connect() self.status = "connected" self.last_error = None self._backoff = 1 self.logger.info("Connected to %s", self.device_code) except Exception as e: self.status = "error" self.last_error = str(e) self.logger.error("Connection failed for %s: %s", self.device_code, e) await self._wait_backoff() continue # Collect phase try: data = await self.collect() if data: await self._save_data(data) self.last_collect_time = datetime.now(timezone.utc) self._backoff = 1 except Exception as e: self.status = "error" self.last_error = str(e) self.logger.error("Collect error for %s: %s", self.device_code, e) try: await self.disconnect() except Exception: pass self.status = "disconnected" await self._wait_backoff() continue await asyncio.sleep(self.collect_interval) async def _wait_backoff(self): """Wait with exponential backoff.""" wait_time = min(self._backoff, self.MAX_BACKOFF) self.logger.debug("Backing off %ds for %s", wait_time, self.device_code) await asyncio.sleep(wait_time) self._backoff = min(self._backoff * 2, self.MAX_BACKOFF) async def _save_data(self, data: dict): """Save collected data points to the database.""" now = datetime.now(timezone.utc) async with async_session() as session: points = [] for data_type, (value, unit) in data.items(): points.append( EnergyData( device_id=self.device_id, timestamp=now, data_type=data_type, value=float(value), unit=unit, ) ) # Update device status result = await session.execute( select(Device).where(Device.id == self.device_id) ) device = result.scalar_one_or_none() if device: device.status = "online" device.last_data_time = now session.add_all(points) await session.commit() self.logger.debug("Saved %d points for %s", len(points), self.device_code) def get_status(self) -> dict: """Return collector status info.""" return { "device_id": self.device_id, "device_code": self.device_code, "status": self.status, "last_error": self.last_error, "last_collect_time": self.last_collect_time.isoformat() if self.last_collect_time else None, "collect_interval": self.collect_interval, }