from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func from pydantic import BaseModel from app.core.database import get_db from app.core.deps import get_current_user, require_roles from app.models.device import Device, DeviceType, DeviceGroup from app.models.user import User from app.services.audit import log_audit router = APIRouter(prefix="/devices", tags=["设备管理"]) class DeviceCreate(BaseModel): name: str code: str device_type: str group_id: int | None = None model: str | None = None manufacturer: str | None = None serial_number: str | None = None rated_power: float | None = None location: str | None = None protocol: str | None = None connection_params: dict | None = None collect_interval: int = 15 class DeviceUpdate(BaseModel): name: str | None = None group_id: int | None = None location: str | None = None protocol: str | None = None connection_params: dict | None = None collect_interval: int | None = None status: str | None = None is_active: bool | None = None @router.get("") async def list_devices( device_type: str | None = None, group_id: int | None = None, status: str | None = None, page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): query = select(Device).where(Device.is_active == True) if device_type: query = query.where(Device.device_type == device_type) if group_id: query = query.where(Device.group_id == group_id) if status: query = query.where(Device.status == status) count_query = select(func.count()).select_from(query.subquery()) total = (await db.execute(count_query)).scalar() query = query.offset((page - 1) * page_size).limit(page_size).order_by(Device.id) result = await db.execute(query) devices = result.scalars().all() return {"total": total, "items": [_device_to_dict(d) for d in devices]} @router.get("/types") async def list_device_types(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): result = await db.execute(select(DeviceType).order_by(DeviceType.id)) return [{"id": t.id, "code": t.code, "name": t.name, "icon": t.icon} for t in result.scalars().all()] @router.get("/groups") async def list_device_groups(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): result = await db.execute(select(DeviceGroup).order_by(DeviceGroup.id)) return [{"id": g.id, "name": g.name, "parent_id": g.parent_id, "location": g.location} for g in result.scalars().all()] @router.get("/stats") async def device_stats(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): result = await db.execute( select(Device.status, func.count(Device.id)).where(Device.is_active == True).group_by(Device.status) ) stats = {row[0]: row[1] for row in result.all()} return {"online": stats.get("online", 0), "offline": stats.get("offline", 0), "alarm": stats.get("alarm", 0), "maintenance": stats.get("maintenance", 0)} @router.get("/{device_id}") async def get_device(device_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): result = await db.execute(select(Device).where(Device.id == device_id)) device = result.scalar_one_or_none() if not device: raise HTTPException(status_code=404, detail="设备不存在") return _device_to_dict(device) @router.post("") async def create_device(data: DeviceCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager"))): device = Device(**data.model_dump()) db.add(device) await db.flush() await log_audit(db, user.id, "create", "device", detail=f"创建设备 {data.name} ({data.code})") return _device_to_dict(device) @router.put("/{device_id}") async def update_device(device_id: int, data: DeviceUpdate, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager"))): result = await db.execute(select(Device).where(Device.id == device_id)) device = result.scalar_one_or_none() if not device: raise HTTPException(status_code=404, detail="设备不存在") updates = data.model_dump(exclude_unset=True) for k, v in updates.items(): setattr(device, k, v) await log_audit(db, user.id, "update", "device", detail=f"更新设备 {device.name}: {', '.join(updates.keys())}") return _device_to_dict(device) @router.get("/topology") async def get_device_topology( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): """设备拓扑树 - Full device tree with counts and status""" # Get all groups group_result = await db.execute(select(DeviceGroup).order_by(DeviceGroup.id)) groups = group_result.scalars().all() # Get device counts and status per group status_query = ( select( Device.group_id, Device.status, func.count(Device.id).label('cnt'), ) .where(Device.is_active == True) .group_by(Device.group_id, Device.status) ) status_result = await db.execute(status_query) status_rows = status_result.all() # Build status map: group_id -> {status: count} group_stats: dict[int | None, dict[str, int]] = {} for group_id, status, cnt in status_rows: if group_id not in group_stats: group_stats[group_id] = {} group_stats[group_id][status] = cnt # Build group nodes group_map: dict[int, dict] = {} for g in groups: stats = group_stats.get(g.id, {}) device_count = sum(stats.values()) group_map[g.id] = { "id": g.id, "name": g.name, "location": g.location, "parent_id": g.parent_id, "children": [], "device_count": device_count, "online_count": stats.get("online", 0), "offline_count": stats.get("offline", 0), "alarm_count": stats.get("alarm", 0), } # Build tree roots = [] for gid, node in group_map.items(): pid = node["parent_id"] if pid and pid in group_map: group_map[pid]["children"].append(node) else: roots.append(node) # Propagate child counts up def propagate(node: dict) -> tuple[int, int, int, int]: total = node["device_count"] online = node["online_count"] offline = node["offline_count"] alarm = node["alarm_count"] for child in node["children"]: ct, co, coff, ca = propagate(child) total += ct online += co offline += coff alarm += ca node["total_device_count"] = total node["total_online"] = online node["total_offline"] = offline node["total_alarm"] = alarm return total, online, offline, alarm for root in roots: propagate(root) return roots def _device_to_dict(d: Device) -> dict: return { "id": d.id, "name": d.name, "code": d.code, "device_type": d.device_type, "group_id": d.group_id, "model": d.model, "manufacturer": d.manufacturer, "serial_number": d.serial_number, "rated_power": d.rated_power, "location": d.location, "protocol": d.protocol, "collect_interval": d.collect_interval, "status": d.status, "is_active": d.is_active, "last_data_time": str(d.last_data_time) if d.last_data_time else None, }