tianpu-ems/backend/tests/test_monitoring.py
Du Wenbo 36c53e0e7c feat: complete platform build-out to 95% benchmark-ready
Major additions across backend, frontend, and infrastructure:

Backend:
- IoT collector framework (Modbus TCP, MQTT, HTTP) with manager
- Realistic Beijing solar/weather simulator with cloud transients
- Alarm auto-checker with demo anomaly injection (3-4 events/hour)
- Report generation (PDF/Excel) with sync fallback and E2E testing
- Energy data CSV/XLSX export endpoint
- WebSocket real-time broadcast at /ws/realtime
- Alembic initial migration for all 14 tables
- 77 pytest tests across 9 API routers

Frontend:
- Live notification badge with alarm count (was hardcoded 0)
- Sankey energy flow diagram on dashboard
- Device photos (SVG illustrations) on all device pages
- Report download with status icons
- Energy data export buttons (CSV/Excel)
- WebSocket hook with auto-reconnect and polling fallback
- BigScreen 2D responsive CSS (tablet/mobile)
- Error handling improvements across pages

Infrastructure:
- PostgreSQL + TimescaleDB as primary database
- Production docker-compose with nginx reverse proxy
- Comprehensive Chinese README
- .env.example with documentation
- quick-start.sh deployment script
- nginx config with gzip, caching, security headers

Data:
- 30-day realistic backfill (47K rows, weather-correlated)
- 18 devices, 6 alarm rules, 15 historical alarm events
- Beijing solar position model with seasonal variation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 18:46:42 +08:00

import pytest

from conftest import auth_header


class TestDeviceRealtime:
    async def test_get_device_realtime(self, client, admin_user, admin_token, seed_devices, seed_energy_data):
        # A seeded device should return its metadata together with a data payload.
        device = seed_devices[0]
        resp = await client.get(
            f"/api/v1/monitoring/devices/{device.id}/realtime",
            headers=auth_header(admin_token),
        )
        assert resp.status_code == 200
        body = resp.json()
        assert "device" in body
        assert "data" in body
        assert body["device"]["id"] == device.id

    async def test_get_device_realtime_no_device(self, client, admin_user, admin_token):
        # An unknown device ID is expected to return 200 with a null device, not 404.
        resp = await client.get(
            "/api/v1/monitoring/devices/99999/realtime",
            headers=auth_header(admin_token),
        )
        assert resp.status_code == 200
        body = resp.json()
        assert body["device"] is None

    async def test_get_device_realtime_unauthenticated(self, client):
        # Requests without an Authorization header must be rejected.
        resp = await client.get("/api/v1/monitoring/devices/1/realtime")
        assert resp.status_code == 401


class TestEnergyFlow:
    async def test_get_energy_flow(self, client, admin_user, admin_token, seed_devices, seed_energy_data):
        # With seeded data, the energy-flow graph is expected to have 4 nodes and 4 links.
        resp = await client.get("/api/v1/monitoring/energy-flow", headers=auth_header(admin_token))
        assert resp.status_code == 200
        body = resp.json()
        assert "nodes" in body
        assert "links" in body
        assert len(body["nodes"]) == 4
        assert len(body["links"]) == 4

    async def test_get_energy_flow_unauthenticated(self, client):
        # Requests without an Authorization header must be rejected.
        resp = await client.get("/api/v1/monitoring/energy-flow")
        assert resp.status_code == 401
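
The tests above import `auth_header` from `conftest`, which is not shown on this page. A minimal sketch of what such a helper might look like follows, assuming the API expects a standard `Authorization: Bearer <token>` header. The actual implementation in this repository's `conftest.py` may differ.

```python
# Hypothetical sketch of the conftest helper used by the tests above.
# Assumption: the backend authenticates requests with a Bearer token.


def auth_header(token: str) -> dict[str, str]:
    """Build the request headers for an authenticated API call."""
    return {"Authorization": f"Bearer {token}"}
```

Under that assumption, `auth_header(admin_token)` returns a plain dict that can be passed directly as the `headers=` argument of `client.get(...)`, as the tests do.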