Files
zpark-ems/core/backend/app/api/v1/assets.py

289 lines
9.7 KiB
Python
Raw Normal View History

from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from pydantic import BaseModel
from app.core.database import get_db
from app.core.deps import get_current_user, require_roles
from app.models.maintenance import Asset, AssetCategory, AssetChange
from app.models.user import User
# Router for the asset endpoints in this module; every route path declared
# below is relative to the "/assets" prefix.
router = APIRouter(prefix="/assets", tags=["资产管理"])
# ── Pydantic Schemas ────────────────────────────────────────────────
# Request payload accepted by create_asset.
# `name` is the only required field; `status` falls back to "active" and
# every other field falls back to None.
class AssetCreate(BaseModel):
    # Identification
    name: str
    code: str | None = None
    category_id: int | None = None

    # Placement
    station_name: str | None = None
    location: str | None = None

    # Equipment details
    manufacturer: str | None = None
    model: str | None = None
    serial_number: str | None = None
    rated_power: float | None = None

    # Dates are accepted as strings here; create_asset converts them with
    # datetime.fromisoformat before storing them on the Asset.
    install_date: str | None = None
    warranty_until: str | None = None

    # Lifecycle and free-form data
    status: str = "active"
    specs: dict | None = None
    notes: str | None = None
# Request payload accepted by update_asset.
# Every field is optional and defaults to None; update_asset dumps the model
# with exclude_unset=True, so only fields the client actually sent are applied.
class AssetUpdate(BaseModel):
    # Identification
    name: str | None = None
    code: str | None = None
    category_id: int | None = None

    # Placement
    station_name: str | None = None
    location: str | None = None

    # Equipment details
    manufacturer: str | None = None
    model: str | None = None
    serial_number: str | None = None
    rated_power: float | None = None

    # Dates are accepted as strings here; update_asset converts them with
    # datetime.fromisoformat before storing them on the Asset.
    install_date: str | None = None
    warranty_until: str | None = None

    # Lifecycle and free-form data
    status: str | None = None
    specs: dict | None = None
    notes: str | None = None
# Request payload accepted by create_category. The dumped fields are passed
# straight to the AssetCategory constructor as keyword arguments.
class CategoryCreate(BaseModel):
    name: str  # required
    description: str | None = None
    parent_id: int | None = None  # presumably the id of a parent category; confirm against the model
# Request payload accepted by create_change.
class ChangeCreate(BaseModel):
    # Required fields
    asset_id: int
    change_type: str

    # Optional fields
    description: str | None = None
    # When falsy, create_change substitutes the id of the requesting user.
    changed_by: int | None = None
    # ISO-format string; when falsy, create_change uses the current UTC time.
    change_date: str | None = None
# ── Helpers ─────────────────────────────────────────────────────────
def _asset_to_dict(a: Asset) -> dict:
    """Convert an Asset into a plain dict for use as a response body.

    The four date/time attributes are rendered with ``str()`` and become
    None when falsy; every other attribute is copied through unchanged.
    """
    # Output keys, in the order they appear in the response.
    field_order = (
        "id", "name", "code",
        "category_id", "station_name",
        "location", "manufacturer",
        "model", "serial_number",
        "rated_power",
        "install_date",
        "warranty_until",
        "status", "specs", "notes",
        "created_at",
        "updated_at",
    )
    # Attributes that are emitted as strings rather than raw values.
    rendered_as_text = {"install_date", "warranty_until", "created_at", "updated_at"}

    record = {}
    for field in field_order:
        value = getattr(a, field)
        if field in rendered_as_text:
            value = str(value) if value else None
        record[field] = value
    return record
def _category_to_dict(c: AssetCategory) -> dict:
    """Convert an AssetCategory into a plain dict for use as a response body.

    ``created_at`` is rendered with ``str()`` and becomes None when falsy.
    """
    created = c.created_at
    created_text = str(created) if created else None
    return {
        "id": c.id,
        "name": c.name,
        "description": c.description,
        "parent_id": c.parent_id,
        "created_at": created_text,
    }
def _change_to_dict(ch: AssetChange) -> dict:
    """Convert an AssetChange into a plain dict for use as a response body.

    ``change_date`` and ``created_at`` are rendered with ``str()`` and
    become None when falsy.
    """
    def _as_text(moment):
        # Falsy (unset) timestamps are reported as None, not as a string.
        return str(moment) if moment else None

    return {
        "id": ch.id,
        "asset_id": ch.asset_id,
        "change_type": ch.change_type,
        "description": ch.description,
        "changed_by": ch.changed_by,
        "change_date": _as_text(ch.change_date),
        "created_at": _as_text(ch.created_at),
    }
# ── Asset Categories ───────────────────────────────────────────────
# GET /assets/categories
# Returns every AssetCategory as a list of dicts, ordered by id.
# `user` is resolved through get_current_user but is not otherwise used here.
@router.get("/categories")
async def list_categories(
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    stmt = select(AssetCategory).order_by(AssetCategory.id)
    rows = await db.execute(stmt)
    categories = rows.scalars().all()
    return [_category_to_dict(category) for category in categories]
# POST /assets/categories
# Creates an AssetCategory from the request body and returns it as a dict.
# `user` is resolved through require_roles("admin", "energy_manager").
@router.post("/categories")
async def create_category(
    data: CategoryCreate,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(require_roles("admin", "energy_manager")),
):
    fields = data.model_dump()
    category = AssetCategory(**fields)
    db.add(category)
    # Flush before serializing so the pending row is sent to the database;
    # no commit is issued here (presumably get_db handles that - confirm).
    await db.flush()
    return _category_to_dict(category)
# ── Asset Statistics ───────────────────────────────────────────────
# GET /assets/stats
# Returns the total asset count plus counts grouped by status and by
# category_id. Category keys are stringified with str(), so a NULL
# category_id appears under the key "None".
@router.get("/stats")
async def asset_stats(
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    # Assets per status value.
    per_status_stmt = select(Asset.status, func.count()).group_by(Asset.status)
    per_status_rows = (await db.execute(per_status_stmt)).all()
    by_status = {}
    for row in per_status_rows:
        by_status[row[0]] = row[1]

    # Assets per category id.
    per_category_stmt = select(Asset.category_id, func.count()).group_by(Asset.category_id)
    per_category_rows = (await db.execute(per_category_stmt)).all()
    by_category = {}
    for row in per_category_rows:
        by_category[str(row[0])] = row[1]

    # The overall total is derived from the per-status counts.
    return {
        "total": sum(by_status.values()),
        "by_status": by_status,
        "by_category": by_category,
    }
# ── Asset Change Records ──────────────────────────────────────────
# GET /assets/changes
# Paginated list of AssetChange records, newest id first, optionally filtered
# by asset_id and/or change_type. A filter is applied only when its value is
# truthy. Returns {"total": <matching rows>, "items": [<dict>, ...]}.
@router.get("/changes")
async def list_changes(
    asset_id: int | None = None,
    change_type: str | None = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    # Collect the requested filters, then apply them in order.
    conditions = []
    if asset_id:
        conditions.append(AssetChange.asset_id == asset_id)
    if change_type:
        conditions.append(AssetChange.change_type == change_type)

    filtered = select(AssetChange)
    for condition in conditions:
        filtered = filtered.where(condition)

    # Count the filtered rows before pagination is applied.
    count_stmt = select(func.count()).select_from(filtered.subquery())
    count_result = await db.execute(count_stmt)
    total = count_result.scalar()

    skip = (page - 1) * page_size
    page_stmt = filtered.order_by(AssetChange.id.desc()).offset(skip).limit(page_size)
    page_result = await db.execute(page_stmt)
    items = [_change_to_dict(change) for change in page_result.scalars().all()]

    return {"total": total, "items": items}
# POST /assets/changes
# Records an AssetChange from the request body and returns it as a dict.
#   - change_date: parsed with datetime.fromisoformat when supplied; otherwise
#     the current UTC time is used.
#   - changed_by: falls back to the requesting user's id when falsy.
# Raises HTTPException(422) when change_date is not a valid ISO-format string
# (previously the ValueError escaped and surfaced as a 500).
@router.post("/changes")
async def create_change(
    data: ChangeCreate,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    # change_date is excluded because it arrives as a string and must be
    # converted before being assigned to the model.
    ch = AssetChange(**data.model_dump(exclude={"change_date"}))
    if data.change_date:
        try:
            ch.change_date = datetime.fromisoformat(data.change_date)
        except ValueError:
            # Malformed client input is a request error, not a server fault.
            raise HTTPException(
                status_code=422,
                detail="change_date 格式无效，需为 ISO 8601 格式",
            ) from None
    else:
        ch.change_date = datetime.now(timezone.utc)
    if not ch.changed_by:
        ch.changed_by = user.id
    db.add(ch)
    # Flush before serializing so the pending row is sent to the database;
    # no commit is issued here (presumably get_db handles that - confirm).
    await db.flush()
    return _change_to_dict(ch)
# ── Assets CRUD ────────────────────────────────────────────────────
# GET /assets
# Paginated list of assets, newest id first, with optional filters:
#   - station_name, category_id, status: exact match, applied when truthy
#   - search: case-insensitive substring match on Asset.name
# Returns {"total": <matching rows>, "items": [<dict>, ...]}.
@router.get("")
async def list_assets(
    station_name: str | None = None,
    category_id: int | None = None,
    status: str | None = None,
    search: str | None = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    query = select(Asset)
    if station_name:
        query = query.where(Asset.station_name == station_name)
    if category_id:
        query = query.where(Asset.category_id == category_id)
    if status:
        query = query.where(Asset.status == status)
    if search:
        # Escape LIKE metacharacters so "%" and "_" typed by the user are
        # matched literally instead of acting as wildcards. The escape
        # character itself is doubled first so it cannot be misread.
        escaped = (
            search.replace("/", "//").replace("%", "/%").replace("_", "/_")
        )
        query = query.where(Asset.name.ilike(f"%{escaped}%", escape="/"))
    # Count the filtered rows before pagination is applied.
    count_q = select(func.count()).select_from(query.subquery())
    total = (await db.execute(count_q)).scalar()
    query = query.order_by(Asset.id.desc()).offset((page - 1) * page_size).limit(page_size)
    result = await db.execute(query)
    return {
        "total": total,
        "items": [_asset_to_dict(a) for a in result.scalars().all()],
    }
# GET /assets/{asset_id}
# Returns the asset with the given id as a dict.
# Raises HTTPException(404) when no such asset exists.
@router.get("/{asset_id}")
async def get_asset(
    asset_id: int,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    lookup = select(Asset).where(Asset.id == asset_id)
    found = (await db.execute(lookup)).scalar_one_or_none()
    if found:
        return _asset_to_dict(found)
    raise HTTPException(status_code=404, detail="资产不存在")
# POST /assets
# Creates an Asset from the request body and returns it as a dict.
# install_date and warranty_until arrive as strings and are parsed with
# datetime.fromisoformat when non-empty.
# `user` is resolved through require_roles("admin", "energy_manager").
# Raises HTTPException(422) when either date is not a valid ISO-format string
# (previously the ValueError escaped and surfaced as a 500).
@router.post("")
async def create_asset(
    data: AssetCreate,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(require_roles("admin", "energy_manager")),
):
    date_fields = ("install_date", "warranty_until")

    # Validate the dates up front so nothing is added to the session when the
    # request is going to be rejected.
    parsed_dates = {}
    for field in date_fields:
        raw = getattr(data, field)
        if not raw:
            continue
        try:
            parsed_dates[field] = datetime.fromisoformat(raw)
        except ValueError:
            raise HTTPException(
                status_code=422,
                detail=f"{field} 格式无效，需为 ISO 8601 格式",
            ) from None

    asset = Asset(**data.model_dump(exclude=set(date_fields)))
    for field, value in parsed_dates.items():
        setattr(asset, field, value)
    db.add(asset)
    # Flush before serializing so the pending row is sent to the database;
    # no commit is issued here (presumably get_db handles that - confirm).
    await db.flush()
    return _asset_to_dict(asset)
# PUT /assets/{asset_id}
# Applies a partial update: only fields present in the request body are
# written to the asset. Returns the updated asset as a dict.
#   - install_date / warranty_until: a non-empty string is parsed with
#     datetime.fromisoformat; an explicit null clears the column (matching how
#     explicit nulls are applied to every other field); an empty string is
#     ignored, as before.
# `user` is resolved through require_roles("admin", "energy_manager").
# Raises HTTPException(404) when the asset does not exist, and
# HTTPException(422) when a date is not a valid ISO-format string
# (previously the ValueError escaped and surfaced as a 500).
@router.put("/{asset_id}")
async def update_asset(
    asset_id: int,
    data: AssetUpdate,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(require_roles("admin", "energy_manager")),
):
    result = await db.execute(select(Asset).where(Asset.id == asset_id))
    asset = result.scalar_one_or_none()
    if not asset:
        raise HTTPException(status_code=404, detail="资产不存在")

    date_fields = ("install_date", "warranty_until")

    # Parse the dates before touching the asset so a rejected request leaves
    # the loaded row completely unmodified.
    date_changes = {}
    for field in date_fields:
        if field not in data.model_fields_set:
            continue  # field was not sent; leave the stored value alone
        raw = getattr(data, field)
        if raw is None:
            date_changes[field] = None
        elif raw:
            try:
                date_changes[field] = datetime.fromisoformat(raw)
            except ValueError:
                raise HTTPException(
                    status_code=422,
                    detail=f"{field} 格式无效，需为 ISO 8601 格式",
                ) from None

    for k, v in data.model_dump(exclude_unset=True, exclude=set(date_fields)).items():
        setattr(asset, k, v)
    for field, value in date_changes.items():
        setattr(asset, field, value)
    return _asset_to_dict(asset)
# DELETE /assets/{asset_id}
# Soft delete: the row is kept and its status is set to "inactive".
# `user` is resolved through require_roles("admin", "energy_manager").
# Raises HTTPException(404) when the asset does not exist.
@router.delete("/{asset_id}")
async def delete_asset(
    asset_id: int,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(require_roles("admin", "energy_manager")),
):
    lookup = select(Asset).where(Asset.id == asset_id)
    target = (await db.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="资产不存在")

    # Mark the asset inactive rather than removing the row.
    target.status = "inactive"
    confirmation = {"message": "已删除"}
    return confirmation