Files

88 lines
3.0 KiB
Python
Raw Permalink Normal View History

"""Modbus TCP protocol collector."""
import struct
from typing import Optional
from pymodbus.client import AsyncModbusTcpClient
from app.collectors.base import BaseCollector
class ModbusTcpCollector(BaseCollector):
"""Collect data from devices via Modbus TCP.
connection_params example:
{
"host": "192.168.1.100",
"port": 502,
"slave_id": 1,
"registers": [
{"address": 0, "count": 2, "data_type": "active_power", "scale": 0.1, "unit": "kW"},
{"address": 2, "count": 2, "data_type": "voltage", "scale": 0.1, "unit": "V"}
]
}
"""
def __init__(self, device_id, device_code, connection_params, collect_interval=15):
super().__init__(device_id, device_code, connection_params, collect_interval)
self._client: Optional[AsyncModbusTcpClient] = None
self._host = connection_params.get("host", "127.0.0.1")
self._port = connection_params.get("port", 502)
self._slave_id = connection_params.get("slave_id", 1)
self._registers = connection_params.get("registers", [])
async def connect(self):
self._client = AsyncModbusTcpClient(
self._host,
port=self._port,
timeout=5,
)
connected = await self._client.connect()
if not connected:
raise ConnectionError(f"Cannot connect to Modbus TCP {self._host}:{self._port}")
async def disconnect(self):
if self._client:
self._client.close()
self._client = None
async def collect(self) -> dict:
if not self._client or not self._client.connected:
raise ConnectionError("Modbus client not connected")
data = {}
for reg in self._registers:
address = reg["address"]
count = reg.get("count", 1)
data_type = reg["data_type"]
scale = reg.get("scale", 1.0)
unit = reg.get("unit", "")
result = await self._client.read_holding_registers(
address, count=count, slave=self._slave_id
)
if result.isError():
self.logger.warning(
"Modbus read error at address %d for %s: %s",
address, self.device_code, result,
)
continue
raw_value = self._decode_registers(result.registers, count)
value = round(raw_value * scale, 4)
data[data_type] = (value, unit)
return data
@staticmethod
def _decode_registers(registers: list, count: int) -> float:
"""Decode register values to a numeric value."""
if count == 1:
return float(registers[0])
elif count == 2:
# Two 16-bit registers -> 32-bit float (big-endian)
raw = struct.pack(">HH", registers[0], registers[1])
return struct.unpack(">f", raw)[0]
else:
# Fallback: treat as concatenated 16-bit values
return float(registers[0])