Files
ems-core/backend/app/collectors/base.py
Du Wenbo 92ec910a13 ems-core v1.0.0: Standard EMS platform core
Shared backend + frontend for multi-customer EMS deployments.
- 12 enterprise modules: quota, cost, charging, maintenance, analysis, etc.
- 120+ API endpoints, 37 database tables
- Customer config mechanism (CUSTOMER env var + YAML config)
- Collectors: Modbus TCP, MQTT, HTTP API, Sungrow iSolarCloud
- Frontend: React 19 + Ant Design + ECharts + Three.js
- Infrastructure: Redis cache, rate limiting, aggregation engine

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 18:14:11 +08:00

161 lines
5.6 KiB
Python

"""Base collector abstract class for IoT data collection."""
import asyncio
import logging
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import select
from app.core.database import async_session
from app.models.device import Device
from app.models.energy import EnergyData
class BaseCollector(ABC):
"""Abstract base class for all protocol collectors."""
MAX_BACKOFF = 300 # 5 minutes max backoff
def __init__(
self,
device_id: int,
device_code: str,
connection_params: dict,
collect_interval: int = 15,
):
self.device_id = device_id
self.device_code = device_code
self.connection_params = connection_params or {}
self.collect_interval = collect_interval
self.status = "disconnected"
self.last_error: Optional[str] = None
self.last_collect_time: Optional[datetime] = None
self._task: Optional[asyncio.Task] = None
self._running = False
self._backoff = 1
self.logger = logging.getLogger(f"collector.{device_code}")
@abstractmethod
async def connect(self):
"""Establish connection to the device."""
@abstractmethod
async def disconnect(self):
"""Clean up connection resources."""
@abstractmethod
async def collect(self) -> dict:
"""Collect data points from the device.
Returns a dict mapping data_type -> (value, unit), e.g.:
{"power": (105.3, "kW"), "voltage": (220.1, "V")}
"""
async def start(self):
"""Start the collector loop."""
self._running = True
self._task = asyncio.create_task(self._run(), name=f"collector-{self.device_code}")
self.logger.info("Collector started for %s", self.device_code)
async def stop(self):
"""Stop the collector loop and disconnect."""
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
try:
await self.disconnect()
except Exception as e:
self.logger.warning("Error during disconnect: %s", e)
self.status = "disconnected"
self.logger.info("Collector stopped for %s", self.device_code)
async def _run(self):
"""Main loop: connect, collect at interval, save to DB."""
while self._running:
# Connect phase
if self.status != "connected":
try:
await self.connect()
self.status = "connected"
self.last_error = None
self._backoff = 1
self.logger.info("Connected to %s", self.device_code)
except Exception as e:
self.status = "error"
self.last_error = str(e)
self.logger.error("Connection failed for %s: %s", self.device_code, e)
await self._wait_backoff()
continue
# Collect phase
try:
data = await self.collect()
if data:
await self._save_data(data)
self.last_collect_time = datetime.now(timezone.utc)
self._backoff = 1
except Exception as e:
self.status = "error"
self.last_error = str(e)
self.logger.error("Collect error for %s: %s", self.device_code, e)
try:
await self.disconnect()
except Exception:
pass
self.status = "disconnected"
await self._wait_backoff()
continue
await asyncio.sleep(self.collect_interval)
async def _wait_backoff(self):
"""Wait with exponential backoff."""
wait_time = min(self._backoff, self.MAX_BACKOFF)
self.logger.debug("Backing off %ds for %s", wait_time, self.device_code)
await asyncio.sleep(wait_time)
self._backoff = min(self._backoff * 2, self.MAX_BACKOFF)
async def _save_data(self, data: dict):
"""Save collected data points to the database."""
now = datetime.now(timezone.utc)
async with async_session() as session:
points = []
for data_type, (value, unit) in data.items():
points.append(
EnergyData(
device_id=self.device_id,
timestamp=now,
data_type=data_type,
value=float(value),
unit=unit,
)
)
# Update device status
result = await session.execute(
select(Device).where(Device.id == self.device_id)
)
device = result.scalar_one_or_none()
if device:
device.status = "online"
device.last_data_time = now
session.add_all(points)
await session.commit()
self.logger.debug("Saved %d points for %s", len(points), self.device_code)
def get_status(self) -> dict:
"""Return collector status info."""
return {
"device_id": self.device_id,
"device_code": self.device_code,
"status": self.status,
"last_error": self.last_error,
"last_collect_time": self.last_collect_time.isoformat() if self.last_collect_time else None,
"collect_interval": self.collect_interval,
}