feat: v2.0 — maintenance module, AI analysis, station power fix

- Add full 检修维护中心 (6.4): 3-type work orders (消缺/巡检/抄表),
  asset management, warehouse, work plans, billing settlement
- Add AI智能分析 tab with LLM-powered diagnostics (StepFun + ZhipuAI)
- Add AI模型配置 settings page (provider, temperature, prompts)
- Fix station power accuracy: use API station total (station_power)
  instead of inverter-level computation — eliminates timing gaps
- Add 7 new DB models, 4 new API routers, 5 new frontend pages
- Migrations: 009 (maintenance expansion) + 010 (AI analysis)
- Version bump: 1.6.1 → 2.0.0

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Du Wenbo
2026-04-12 21:16:03 +08:00
parent 7947a230c4
commit f0f13faf00
30 changed files with 3325 additions and 52 deletions

View File

@@ -0,0 +1,167 @@
"""Add maintenance expansion tables and order_type
Revision ID: 009_maintenance_expansion
Revises: 008_management
Create Date: 2026-04-11
"""
from alembic import op
import sqlalchemy as sa
revision = "009_maintenance_expansion"
down_revision = "008_management"
branch_labels = None
depends_on = None
def upgrade() -> None:
# --- Add order_type, station_name, due_date to repair_orders ---
op.add_column("repair_orders", sa.Column("order_type", sa.String(20), server_default="repair"))
op.add_column("repair_orders", sa.Column("station_name", sa.String(200)))
op.add_column("repair_orders", sa.Column("due_date", sa.DateTime(timezone=True)))
# --- asset_categories ---
op.create_table(
"asset_categories",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("name", sa.String(100), nullable=False),
sa.Column("parent_id", sa.Integer, sa.ForeignKey("asset_categories.id"), nullable=True),
sa.Column("description", sa.Text),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
)
# --- assets ---
op.create_table(
"assets",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("name", sa.String(200), nullable=False),
sa.Column("asset_code", sa.String(50), unique=True),
sa.Column("category_id", sa.Integer, sa.ForeignKey("asset_categories.id")),
sa.Column("device_id", sa.Integer, sa.ForeignKey("devices.id"), nullable=True),
sa.Column("station_name", sa.String(200)),
sa.Column("manufacturer", sa.String(200)),
sa.Column("model_number", sa.String(100)),
sa.Column("serial_number", sa.String(100)),
sa.Column("purchase_date", sa.DateTime(timezone=True)),
sa.Column("warranty_expiry", sa.DateTime(timezone=True)),
sa.Column("purchase_price", sa.Float),
sa.Column("status", sa.String(20), server_default="active"),
sa.Column("location", sa.String(200)),
sa.Column("responsible_dept", sa.String(100)),
sa.Column("custodian", sa.String(100)),
sa.Column("supplier", sa.String(200)),
sa.Column("notes", sa.Text),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
)
# --- asset_changes ---
op.create_table(
"asset_changes",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("asset_id", sa.Integer, sa.ForeignKey("assets.id"), nullable=False),
sa.Column("change_type", sa.String(20)),
sa.Column("change_date", sa.DateTime(timezone=True)),
sa.Column("description", sa.Text),
sa.Column("old_value", sa.JSON),
sa.Column("new_value", sa.JSON),
sa.Column("operator_id", sa.Integer, sa.ForeignKey("users.id")),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
)
# --- spare_parts ---
op.create_table(
"spare_parts",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("name", sa.String(200), nullable=False),
sa.Column("part_code", sa.String(50), unique=True),
sa.Column("category", sa.String(100)),
sa.Column("specification", sa.String(200)),
sa.Column("unit", sa.String(20)),
sa.Column("current_stock", sa.Integer, server_default="0"),
sa.Column("min_stock", sa.Integer, server_default="0"),
sa.Column("max_stock", sa.Integer),
sa.Column("warehouse_location", sa.String(100)),
sa.Column("unit_price", sa.Float),
sa.Column("supplier", sa.String(200)),
sa.Column("notes", sa.Text),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
)
# --- warehouse_transactions ---
op.create_table(
"warehouse_transactions",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("spare_part_id", sa.Integer, sa.ForeignKey("spare_parts.id"), nullable=False),
sa.Column("transaction_type", sa.String(20)),
sa.Column("quantity", sa.Integer, nullable=False),
sa.Column("unit_price", sa.Float),
sa.Column("total_price", sa.Float),
sa.Column("transaction_date", sa.DateTime(timezone=True)),
sa.Column("work_order_id", sa.Integer, sa.ForeignKey("repair_orders.id"), nullable=True),
sa.Column("reason", sa.String(200)),
sa.Column("operator_id", sa.Integer, sa.ForeignKey("users.id")),
sa.Column("notes", sa.Text),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
)
# --- maintenance_work_plans ---
op.create_table(
"maintenance_work_plans",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("name", sa.String(200), nullable=False),
sa.Column("plan_type", sa.String(20)),
sa.Column("station_name", sa.String(200)),
sa.Column("device_ids", sa.JSON),
sa.Column("cycle_period", sa.String(20)),
sa.Column("execution_days", sa.Integer),
sa.Column("effective_start", sa.DateTime(timezone=True)),
sa.Column("effective_end", sa.DateTime(timezone=True)),
sa.Column("description", sa.Text),
sa.Column("workflow_config", sa.JSON),
sa.Column("is_active", sa.Boolean, server_default="true"),
sa.Column("created_by", sa.Integer, sa.ForeignKey("users.id")),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
)
# --- billing_records ---
op.create_table(
"billing_records",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("station_name", sa.String(200)),
sa.Column("billing_type", sa.String(20)),
sa.Column("billing_period_start", sa.DateTime(timezone=True)),
sa.Column("billing_period_end", sa.DateTime(timezone=True)),
sa.Column("generation_kwh", sa.Float),
sa.Column("self_use_kwh", sa.Float),
sa.Column("grid_feed_kwh", sa.Float),
sa.Column("electricity_price", sa.Float),
sa.Column("self_use_price", sa.Float),
sa.Column("feed_in_tariff", sa.Float),
sa.Column("total_amount", sa.Float),
sa.Column("self_use_amount", sa.Float),
sa.Column("feed_in_amount", sa.Float),
sa.Column("subsidy_amount", sa.Float),
sa.Column("status", sa.String(20), server_default="pending"),
sa.Column("invoice_number", sa.String(100)),
sa.Column("invoice_date", sa.DateTime(timezone=True)),
sa.Column("attachment_url", sa.String(500)),
sa.Column("notes", sa.Text),
sa.Column("created_by", sa.Integer, sa.ForeignKey("users.id")),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
)
def downgrade() -> None:
op.drop_table("billing_records")
op.drop_table("maintenance_work_plans")
op.drop_table("warehouse_transactions")
op.drop_table("spare_parts")
op.drop_table("asset_changes")
op.drop_table("assets")
op.drop_table("asset_categories")
op.drop_column("repair_orders", "due_date")
op.drop_column("repair_orders", "station_name")
op.drop_column("repair_orders", "order_type")

View File

@@ -0,0 +1,35 @@
"""Add AI analysis results table
Revision ID: 010_ai_analysis
Revises: 009_maintenance_expansion
Create Date: 2026-04-11
"""
from alembic import op
import sqlalchemy as sa
revision = "010_ai_analysis"
down_revision = "009_maintenance_expansion"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table(
"ai_analysis_results",
sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
sa.Column("scope", sa.String(20), server_default="station"),
sa.Column("device_id", sa.Integer, sa.ForeignKey("devices.id"), nullable=True),
sa.Column("analysis_type", sa.String(20), server_default="diagnostic"),
sa.Column("prompt_used", sa.Text),
sa.Column("result_text", sa.Text),
sa.Column("model_used", sa.String(100)),
sa.Column("provider_used", sa.String(20)),
sa.Column("tokens_used", sa.Integer),
sa.Column("duration_ms", sa.Integer),
sa.Column("created_by", sa.Integer, sa.ForeignKey("users.id")),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
)
def downgrade() -> None:
op.drop_table("ai_analysis_results")

View File

@@ -1,5 +1,5 @@
from fastapi import APIRouter
from app.api.v1 import auth, users, devices, energy, monitoring, alarms, reports, carbon, dashboard, collectors, websocket, audit, settings, charging, quota, cost, maintenance, management, prediction, energy_strategy, weather, ai_ops, branding, version, kpi
from app.api.v1 import auth, users, devices, energy, monitoring, alarms, reports, carbon, dashboard, collectors, websocket, audit, settings, charging, quota, cost, maintenance, management, prediction, energy_strategy, weather, ai_ops, branding, version, kpi, assets, warehouse, work_plans, billing
api_router = APIRouter(prefix="/api/v1")
@@ -28,3 +28,7 @@ api_router.include_router(ai_ops.router)
api_router.include_router(branding.router)
api_router.include_router(version.router)
api_router.include_router(kpi.router)
api_router.include_router(assets.router)
api_router.include_router(warehouse.router)
api_router.include_router(work_plans.router)
api_router.include_router(billing.router)

View File

@@ -588,3 +588,148 @@ async def ai_ops_dashboard(
):
"""AI运维总览仪表盘"""
return await get_dashboard_data(db)
# ── AI Analysis (LLM-powered) ──────────────────────────────────────
@router.post("/analyze")
async def ai_analyze(
scope: str = Query("station", description="station, device, all"),
device_id: int | None = Query(None),
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Run AI analysis on station or specific device."""
import time
from app.models.setting import SystemSetting
from app.services.llm_service import chat_completion
from app.models.ai_ops import AIAnalysisResult
# Load raw settings (unmasked)
result = await db.execute(select(SystemSetting))
db_settings = {s.key: s.value for s in result.scalars().all()}
from app.api.v1.settings import DEFAULTS
settings = {**DEFAULTS, **db_settings}
if settings.get("ai_enabled") != "true":
raise HTTPException(status_code=400, detail="AI功能未启用请在系统设置中开启")
# Gather context data
context_parts = []
if scope == "device" and device_id:
dev_result = await db.execute(select(Device).where(Device.id == device_id))
device = dev_result.scalar_one_or_none()
if not device:
raise HTTPException(status_code=404, detail="设备不存在")
context_parts.append(f"设备名称: {device.name}, 类型: {device.device_type}, 状态: {device.status}")
# Get recent health scores
health_q = select(DeviceHealthScore).where(
DeviceHealthScore.device_id == device_id
).order_by(DeviceHealthScore.timestamp.desc()).limit(5)
health_scores = (await db.execute(health_q)).scalars().all()
if health_scores:
scores_text = ", ".join([f"{h.health_score:.0f}分({h.status})" for h in health_scores])
context_parts.append(f"近期健康评分: {scores_text}")
# Get recent anomalies
anom_q = select(AnomalyDetection).where(
AnomalyDetection.device_id == device_id
).order_by(AnomalyDetection.detected_at.desc()).limit(10)
anomalies = (await db.execute(anom_q)).scalars().all()
anom_text = ""
if anomalies:
anom_text = "; ".join([f"{a.anomaly_type}({a.severity}): {a.description}" for a in anomalies[:5]])
context_parts.append(f"近期异常: {anom_text}")
prompt_template = settings.get("ai_diagnostic_prompt", "")
user_prompt = prompt_template.replace("{device_info}", context_parts[0] if context_parts else "").replace("{metrics}", "\n".join(context_parts[1:])).replace("{alarms}", anom_text if anomalies else "")
else:
# Station-level analysis
dev_result = await db.execute(select(Device))
devices = dev_result.scalars().all()
context_parts.append(f"设备总数: {len(devices)}")
online = sum(1 for d in devices if d.status == "online")
context_parts.append(f"在线设备: {online}, 离线: {len(devices) - online}")
# Recent insights
insight_q = select(OpsInsight).order_by(OpsInsight.generated_at.desc()).limit(5)
insights = (await db.execute(insight_q)).scalars().all()
if insights:
insight_text = "; ".join([f"{i.title}: {i.description}" for i in insights[:3]])
context_parts.append(f"近期洞察: {insight_text}")
prompt_template = settings.get("ai_insight_prompt", "")
user_prompt = prompt_template.replace("{station_info}", context_parts[0]).replace("{kpis}", "\n".join(context_parts[1:])).replace("{recent_alarms}", "")
system_prompt = settings.get("ai_system_prompt", "你是光伏电站智能运维助手。")
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
start_time = time.time()
try:
result_text = await chat_completion(messages, settings)
except Exception as e:
raise HTTPException(status_code=500, detail=f"AI分析失败: {str(e)}")
duration_ms = int((time.time() - start_time) * 1000)
# Save result
analysis = AIAnalysisResult(
scope=scope,
device_id=device_id,
analysis_type="diagnostic" if scope == "device" else "insight",
prompt_used=user_prompt[:2000],
result_text=result_text,
model_used=settings.get("ai_model_name", ""),
provider_used=settings.get("ai_provider", "stepfun"),
tokens_used=0,
duration_ms=duration_ms,
created_by=user.id,
)
db.add(analysis)
await db.flush()
return {
"id": analysis.id,
"scope": scope,
"device_id": device_id,
"result": result_text,
"model": settings.get("ai_model_name"),
"duration_ms": duration_ms,
}
@router.get("/analysis-history")
async def get_analysis_history(
scope: str | None = None,
device_id: int | None = None,
page: int = Query(1, ge=1),
page_size: int = Query(10, ge=1, le=50),
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""Get AI analysis history."""
from app.models.ai_ops import AIAnalysisResult
query = select(AIAnalysisResult)
if scope:
query = query.where(AIAnalysisResult.scope == scope)
if device_id:
query = query.where(AIAnalysisResult.device_id == device_id)
count_q = select(func.count()).select_from(query.subquery())
total = (await db.execute(count_q)).scalar() or 0
query = query.order_by(AIAnalysisResult.created_at.desc()).offset((page - 1) * page_size).limit(page_size)
result = await db.execute(query)
items = [{
"id": r.id, "scope": r.scope, "device_id": r.device_id,
"analysis_type": r.analysis_type, "result_text": r.result_text[:500],
"model_used": r.model_used, "provider_used": r.provider_used,
"duration_ms": r.duration_ms,
"created_at": str(r.created_at) if r.created_at else None,
} for r in result.scalars().all()]
return {"total": total, "items": items}

View File

@@ -0,0 +1,288 @@
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from pydantic import BaseModel
from app.core.database import get_db
from app.core.deps import get_current_user, require_roles
from app.models.maintenance import Asset, AssetCategory, AssetChange
from app.models.user import User
router = APIRouter(prefix="/assets", tags=["资产管理"])
# ── Pydantic Schemas ────────────────────────────────────────────────
class AssetCreate(BaseModel):
name: str
code: str | None = None
category_id: int | None = None
station_name: str | None = None
location: str | None = None
manufacturer: str | None = None
model: str | None = None
serial_number: str | None = None
rated_power: float | None = None
install_date: str | None = None
warranty_until: str | None = None
status: str = "active"
specs: dict | None = None
notes: str | None = None
class AssetUpdate(BaseModel):
name: str | None = None
code: str | None = None
category_id: int | None = None
station_name: str | None = None
location: str | None = None
manufacturer: str | None = None
model: str | None = None
serial_number: str | None = None
rated_power: float | None = None
install_date: str | None = None
warranty_until: str | None = None
status: str | None = None
specs: dict | None = None
notes: str | None = None
class CategoryCreate(BaseModel):
name: str
description: str | None = None
parent_id: int | None = None
class ChangeCreate(BaseModel):
asset_id: int
change_type: str
description: str | None = None
changed_by: int | None = None
change_date: str | None = None
# ── Helpers ─────────────────────────────────────────────────────────
def _asset_to_dict(a: Asset) -> dict:
return {
"id": a.id, "name": a.name, "code": a.code,
"category_id": a.category_id, "station_name": a.station_name,
"location": a.location, "manufacturer": a.manufacturer,
"model": a.model, "serial_number": a.serial_number,
"rated_power": a.rated_power,
"install_date": str(a.install_date) if a.install_date else None,
"warranty_until": str(a.warranty_until) if a.warranty_until else None,
"status": a.status, "specs": a.specs, "notes": a.notes,
"created_at": str(a.created_at) if a.created_at else None,
"updated_at": str(a.updated_at) if a.updated_at else None,
}
def _category_to_dict(c: AssetCategory) -> dict:
return {
"id": c.id, "name": c.name, "description": c.description,
"parent_id": c.parent_id,
"created_at": str(c.created_at) if c.created_at else None,
}
def _change_to_dict(ch: AssetChange) -> dict:
return {
"id": ch.id, "asset_id": ch.asset_id, "change_type": ch.change_type,
"description": ch.description, "changed_by": ch.changed_by,
"change_date": str(ch.change_date) if ch.change_date else None,
"created_at": str(ch.created_at) if ch.created_at else None,
}
# ── Asset Categories ───────────────────────────────────────────────
@router.get("/categories")
async def list_categories(
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
result = await db.execute(select(AssetCategory).order_by(AssetCategory.id))
return [_category_to_dict(c) for c in result.scalars().all()]
@router.post("/categories")
async def create_category(
data: CategoryCreate,
db: AsyncSession = Depends(get_db),
user: User = Depends(require_roles("admin", "energy_manager")),
):
cat = AssetCategory(**data.model_dump())
db.add(cat)
await db.flush()
return _category_to_dict(cat)
# ── Asset Statistics ───────────────────────────────────────────────
@router.get("/stats")
async def asset_stats(
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
# Count by status
status_q = select(Asset.status, func.count()).group_by(Asset.status)
status_result = await db.execute(status_q)
by_status = {row[0]: row[1] for row in status_result.all()}
# Count by category
cat_q = select(Asset.category_id, func.count()).group_by(Asset.category_id)
cat_result = await db.execute(cat_q)
by_category = {str(row[0]): row[1] for row in cat_result.all()}
total = sum(by_status.values())
return {
"total": total,
"by_status": by_status,
"by_category": by_category,
}
# ── Asset Change Records ──────────────────────────────────────────
@router.get("/changes")
async def list_changes(
asset_id: int | None = None,
change_type: str | None = None,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
query = select(AssetChange)
if asset_id:
query = query.where(AssetChange.asset_id == asset_id)
if change_type:
query = query.where(AssetChange.change_type == change_type)
count_q = select(func.count()).select_from(query.subquery())
total = (await db.execute(count_q)).scalar()
query = query.order_by(AssetChange.id.desc()).offset((page - 1) * page_size).limit(page_size)
result = await db.execute(query)
return {
"total": total,
"items": [_change_to_dict(ch) for ch in result.scalars().all()],
}
@router.post("/changes")
async def create_change(
data: ChangeCreate,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
ch = AssetChange(**data.model_dump(exclude={"change_date"}))
if data.change_date:
ch.change_date = datetime.fromisoformat(data.change_date)
else:
ch.change_date = datetime.now(timezone.utc)
if not ch.changed_by:
ch.changed_by = user.id
db.add(ch)
await db.flush()
return _change_to_dict(ch)
# ── Assets CRUD ────────────────────────────────────────────────────
@router.get("")
async def list_assets(
station_name: str | None = None,
category_id: int | None = None,
status: str | None = None,
search: str | None = None,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
query = select(Asset)
if station_name:
query = query.where(Asset.station_name == station_name)
if category_id:
query = query.where(Asset.category_id == category_id)
if status:
query = query.where(Asset.status == status)
if search:
query = query.where(Asset.name.ilike(f"%{search}%"))
count_q = select(func.count()).select_from(query.subquery())
total = (await db.execute(count_q)).scalar()
query = query.order_by(Asset.id.desc()).offset((page - 1) * page_size).limit(page_size)
result = await db.execute(query)
return {
"total": total,
"items": [_asset_to_dict(a) for a in result.scalars().all()],
}
@router.get("/{asset_id}")
async def get_asset(
asset_id: int,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
result = await db.execute(select(Asset).where(Asset.id == asset_id))
asset = result.scalar_one_or_none()
if not asset:
raise HTTPException(status_code=404, detail="资产不存在")
return _asset_to_dict(asset)
@router.post("")
async def create_asset(
data: AssetCreate,
db: AsyncSession = Depends(get_db),
user: User = Depends(require_roles("admin", "energy_manager")),
):
asset = Asset(**data.model_dump(exclude={"install_date", "warranty_until"}))
if data.install_date:
asset.install_date = datetime.fromisoformat(data.install_date)
if data.warranty_until:
asset.warranty_until = datetime.fromisoformat(data.warranty_until)
db.add(asset)
await db.flush()
return _asset_to_dict(asset)
@router.put("/{asset_id}")
async def update_asset(
asset_id: int,
data: AssetUpdate,
db: AsyncSession = Depends(get_db),
user: User = Depends(require_roles("admin", "energy_manager")),
):
result = await db.execute(select(Asset).where(Asset.id == asset_id))
asset = result.scalar_one_or_none()
if not asset:
raise HTTPException(status_code=404, detail="资产不存在")
for k, v in data.model_dump(exclude_unset=True, exclude={"install_date", "warranty_until"}).items():
setattr(asset, k, v)
if data.install_date:
asset.install_date = datetime.fromisoformat(data.install_date)
if data.warranty_until:
asset.warranty_until = datetime.fromisoformat(data.warranty_until)
return _asset_to_dict(asset)
@router.delete("/{asset_id}")
async def delete_asset(
asset_id: int,
db: AsyncSession = Depends(get_db),
user: User = Depends(require_roles("admin", "energy_manager")),
):
result = await db.execute(select(Asset).where(Asset.id == asset_id))
asset = result.scalar_one_or_none()
if not asset:
raise HTTPException(status_code=404, detail="资产不存在")
asset.status = "inactive"
return {"message": "已删除"}

View File

@@ -0,0 +1,244 @@
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, extract
from pydantic import BaseModel
from app.core.database import get_db
from app.core.deps import get_current_user, require_roles
from app.models.maintenance import BillingRecord
from app.models.user import User
router = APIRouter(prefix="/billing", tags=["电费结算"])
# ── Pydantic Schemas ────────────────────────────────────────────────
class BillingCreate(BaseModel):
station_name: str
billing_type: str # "generation", "consumption", "grid_feed"
year: int
month: int
generation_kwh: float | None = None
consumption_kwh: float | None = None
grid_feed_kwh: float | None = None
unit_price: float | None = None
total_amount: float | None = None
status: str = "draft"
invoice_number: str | None = None
invoice_date: str | None = None
payment_date: str | None = None
notes: str | None = None
class BillingUpdate(BaseModel):
station_name: str | None = None
billing_type: str | None = None
year: int | None = None
month: int | None = None
generation_kwh: float | None = None
consumption_kwh: float | None = None
grid_feed_kwh: float | None = None
unit_price: float | None = None
total_amount: float | None = None
status: str | None = None
invoice_number: str | None = None
invoice_date: str | None = None
payment_date: str | None = None
notes: str | None = None
# ── Helpers ─────────────────────────────────────────────────────────
def _billing_to_dict(b: BillingRecord) -> dict:
return {
"id": b.id, "station_name": b.station_name,
"billing_type": b.billing_type,
"year": b.year, "month": b.month,
"generation_kwh": b.generation_kwh,
"consumption_kwh": b.consumption_kwh,
"grid_feed_kwh": b.grid_feed_kwh,
"unit_price": b.unit_price,
"total_amount": b.total_amount,
"status": b.status,
"invoice_number": b.invoice_number,
"invoice_date": str(b.invoice_date) if b.invoice_date else None,
"payment_date": str(b.payment_date) if b.payment_date else None,
"notes": b.notes,
"created_by": b.created_by,
"created_at": str(b.created_at) if b.created_at else None,
"updated_at": str(b.updated_at) if b.updated_at else None,
}
# ── Billing CRUD ───────────────────────────────────────────────────
@router.get("")
async def list_billing(
station_name: str | None = None,
billing_type: str | None = None,
status: str | None = None,
year: int | None = None,
month: int | None = None,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
query = select(BillingRecord)
if station_name:
query = query.where(BillingRecord.station_name == station_name)
if billing_type:
query = query.where(BillingRecord.billing_type == billing_type)
if status:
query = query.where(BillingRecord.status == status)
if year:
query = query.where(BillingRecord.year == year)
if month:
query = query.where(BillingRecord.month == month)
count_q = select(func.count()).select_from(query.subquery())
total = (await db.execute(count_q)).scalar()
query = query.order_by(BillingRecord.year.desc(), BillingRecord.month.desc(), BillingRecord.id.desc())
query = query.offset((page - 1) * page_size).limit(page_size)
result = await db.execute(query)
return {
"total": total,
"items": [_billing_to_dict(b) for b in result.scalars().all()],
}
@router.get("/stats")
async def billing_stats(
year: int | None = None,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
query = select(BillingRecord)
if year:
query = query.where(BillingRecord.year == year)
# Total generation
gen_q = select(func.sum(BillingRecord.generation_kwh))
if year:
gen_q = gen_q.where(BillingRecord.year == year)
total_generation = (await db.execute(gen_q)).scalar() or 0
# Total amount
amt_q = select(func.sum(BillingRecord.total_amount))
if year:
amt_q = amt_q.where(BillingRecord.year == year)
total_amount = (await db.execute(amt_q)).scalar() or 0
# By month
month_q = select(
BillingRecord.month,
func.sum(BillingRecord.generation_kwh).label("generation"),
func.sum(BillingRecord.total_amount).label("amount"),
).group_by(BillingRecord.month).order_by(BillingRecord.month)
if year:
month_q = month_q.where(BillingRecord.year == year)
month_result = await db.execute(month_q)
by_month = [
{"month": row[0], "generation_kwh": float(row[1] or 0), "total_amount": float(row[2] or 0)}
for row in month_result.all()
]
# By type
type_q = select(
BillingRecord.billing_type,
func.sum(BillingRecord.total_amount).label("amount"),
).group_by(BillingRecord.billing_type)
if year:
type_q = type_q.where(BillingRecord.year == year)
type_result = await db.execute(type_q)
by_type = {row[0]: float(row[1] or 0) for row in type_result.all()}
return {
"total_generation_kwh": float(total_generation),
"total_amount": float(total_amount),
"by_month": by_month,
"by_type": by_type,
}
@router.get("/export")
async def export_billing(
station_name: str | None = None,
year: int | None = None,
month: int | None = None,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
query = select(BillingRecord)
if station_name:
query = query.where(BillingRecord.station_name == station_name)
if year:
query = query.where(BillingRecord.year == year)
if month:
query = query.where(BillingRecord.month == month)
query = query.order_by(BillingRecord.year.desc(), BillingRecord.month.desc())
result = await db.execute(query)
records = [_billing_to_dict(b) for b in result.scalars().all()]
return {
"columns": [
"station_name", "billing_type", "year", "month",
"generation_kwh", "consumption_kwh", "grid_feed_kwh",
"unit_price", "total_amount", "status",
"invoice_number", "invoice_date", "payment_date",
],
"data": records,
}
@router.get("/{billing_id}")
async def get_billing(
billing_id: int,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
result = await db.execute(select(BillingRecord).where(BillingRecord.id == billing_id))
record = result.scalar_one_or_none()
if not record:
raise HTTPException(status_code=404, detail="结算记录不存在")
return _billing_to_dict(record)
@router.post("")
async def create_billing(
data: BillingCreate,
db: AsyncSession = Depends(get_db),
user: User = Depends(require_roles("admin", "energy_manager")),
):
record = BillingRecord(
**data.model_dump(exclude={"invoice_date", "payment_date"}),
created_by=user.id,
)
if data.invoice_date:
record.invoice_date = datetime.fromisoformat(data.invoice_date)
if data.payment_date:
record.payment_date = datetime.fromisoformat(data.payment_date)
db.add(record)
await db.flush()
return _billing_to_dict(record)
@router.put("/{billing_id}")
async def update_billing(
billing_id: int,
data: BillingUpdate,
db: AsyncSession = Depends(get_db),
user: User = Depends(require_roles("admin", "energy_manager")),
):
result = await db.execute(select(BillingRecord).where(BillingRecord.id == billing_id))
record = result.scalar_one_or_none()
if not record:
raise HTTPException(status_code=404, detail="结算记录不存在")
for k, v in data.model_dump(exclude_unset=True, exclude={"invoice_date", "payment_date"}).items():
setattr(record, k, v)
if data.invoice_date:
record.invoice_date = datetime.fromisoformat(data.invoice_date)
if data.payment_date:
record.payment_date = datetime.fromisoformat(data.payment_date)
return _billing_to_dict(record)

View File

@@ -105,58 +105,107 @@ async def get_overview(db: AsyncSession = Depends(get_db), user: User = Depends(
@router.get("/realtime")
async def get_realtime_data(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
"""实时功率数据 - 获取最近的采集数据,按站去重防止重复计数"""
"""实时功率数据 - 优先使用API电站汇总数据(station_power),回退到按站去重计算"""
now = datetime.now(timezone.utc)
window_start = now - timedelta(minutes=20)
# Get latest power per station (dedup by device name prefix)
# Sungrow collectors report station-level power, so multiple devices
# sharing the same station (AP1xx = Phase 1, AP2xx = Phase 2) report
# identical values. GROUP BY station prefix and take MAX to avoid
# double-counting.
from sqlalchemy import text as sa_text
pv_ids = await _get_pv_device_ids(db)
hp_ids = await _get_hp_device_ids(db)
# PV power: dedup by station prefix
# ── PV power ──
# Strategy: Use station_power (direct from API station summary) if available.
# This matches iSolarCloud's own total exactly, avoiding any timing/grouping
# discrepancies from computing it ourselves.
# Fallback: group by device name prefix and take MAX (legacy method).
pv_power = 0
if pv_ids:
pv_q = await db.execute(
# Try station_power first (API station total, stored by collector)
station_q = await db.execute(
select(
func.substring(Device.name, 1, 3).label("station"),
EnergyData.device_id,
func.max(EnergyData.value).label("power"),
).select_from(EnergyData).join(
Device, EnergyData.device_id == Device.id
).where(
and_(
EnergyData.timestamp >= window_start,
EnergyData.data_type == "power",
EnergyData.data_type == "station_power",
EnergyData.device_id.in_(pv_ids),
)
).group_by(sa_text("1"))
).group_by(EnergyData.device_id)
)
pv_power = sum(row[1] or 0 for row in pv_q.all())
else:
pv_power = 0
station_rows = station_q.all()
# Heat pump power: dedup by station prefix
if station_rows:
# station_power is per-station total. Multiple devices sharing
# the same ps_id will have identical values, so we dedup by value
# to avoid double-counting. In practice, only ONE device per ps_id
# stores station_power (collector-level dedup), but we guard here
# too by taking distinct values.
seen_powers = set()
for row in station_rows:
val = round(row[1] or 0, 1)
if val > 0:
seen_powers.add(val)
pv_power = sum(seen_powers)
else:
# Fallback: legacy method — group by device name prefix, MAX per group
from sqlalchemy import text as sa_text
pv_q = await db.execute(
select(
func.substring(Device.name, 1, 3).label("station"),
func.max(EnergyData.value).label("power"),
).select_from(EnergyData).join(
Device, EnergyData.device_id == Device.id
).where(
and_(
EnergyData.timestamp >= window_start,
EnergyData.data_type == "power",
EnergyData.device_id.in_(pv_ids),
)
).group_by(sa_text("1"))
)
pv_power = sum(row[1] or 0 for row in pv_q.all())
# ── Heat pump power (same logic) ──
heatpump_power = 0
if hp_ids:
hp_q = await db.execute(
station_q = await db.execute(
select(
func.substring(Device.name, 1, 3).label("station"),
EnergyData.device_id,
func.max(EnergyData.value).label("power"),
).select_from(EnergyData).join(
Device, EnergyData.device_id == Device.id
).where(
and_(
EnergyData.timestamp >= window_start,
EnergyData.data_type == "power",
EnergyData.data_type == "station_power",
EnergyData.device_id.in_(hp_ids),
)
).group_by(sa_text("1"))
).group_by(EnergyData.device_id)
)
heatpump_power = sum(row[1] or 0 for row in hp_q.all())
else:
heatpump_power = 0
station_rows = station_q.all()
if station_rows:
seen_powers = set()
for row in station_rows:
val = round(row[1] or 0, 1)
if val > 0:
seen_powers.add(val)
heatpump_power = sum(seen_powers)
else:
from sqlalchemy import text as sa_text
hp_q = await db.execute(
select(
func.substring(Device.name, 1, 3).label("station"),
func.max(EnergyData.value).label("power"),
).select_from(EnergyData).join(
Device, EnergyData.device_id == Device.id
).where(
and_(
EnergyData.timestamp >= window_start,
EnergyData.data_type == "power",
EnergyData.device_id.in_(hp_ids),
)
).group_by(sa_text("1"))
)
heatpump_power = sum(row[1] or 0 for row in hp_q.all())
return {
"timestamp": str(now),

View File

@@ -48,6 +48,9 @@ class OrderCreate(BaseModel):
alarm_event_id: int | None = None
priority: str = "medium"
cost_estimate: float | None = None
order_type: str = "repair" # repair, inspection, meter_reading
station_name: str | None = None
due_date: str | None = None
class OrderUpdate(BaseModel):
@@ -57,6 +60,8 @@ class OrderUpdate(BaseModel):
status: str | None = None
resolution: str | None = None
actual_cost: float | None = None
order_type: str | None = None
station_name: str | None = None
class DutyCreate(BaseModel):
@@ -104,6 +109,9 @@ def _order_to_dict(o: RepairOrder) -> dict:
"assigned_at": str(o.assigned_at) if o.assigned_at else None,
"completed_at": str(o.completed_at) if o.completed_at else None,
"closed_at": str(o.closed_at) if o.closed_at else None,
"order_type": o.order_type,
"station_name": o.station_name,
"due_date": str(o.due_date) if o.due_date else None,
}
@@ -116,9 +124,11 @@ def _duty_to_dict(d: DutySchedule) -> dict:
}
def _generate_order_code() -> str:
def _generate_order_code(order_type: str = "repair") -> str:
now = datetime.now(timezone.utc)
return f"WO-{now.strftime('%Y%m%d')}-{now.strftime('%H%M%S')}"
prefix_map = {"repair": "XQ", "inspection": "XJ", "meter_reading": "CB"}
prefix = prefix_map.get(order_type, "WO")
return f"{prefix}-{now.strftime('%Y%m%d')}-{now.strftime('%H%M%S')}"
# ── Inspection Plans ────────────────────────────────────────────────
@@ -271,6 +281,7 @@ async def update_record(
async def list_orders(
status: str | None = None,
priority: str | None = None,
order_type: str | None = None,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
db: AsyncSession = Depends(get_db),
@@ -281,6 +292,8 @@ async def list_orders(
query = query.where(RepairOrder.status == status)
if priority:
query = query.where(RepairOrder.priority == priority)
if order_type:
query = query.where(RepairOrder.order_type == order_type)
count_q = select(func.count()).select_from(query.subquery())
total = (await db.execute(count_q)).scalar()
@@ -300,10 +313,12 @@ async def create_order(
user: User = Depends(get_current_user),
):
order = RepairOrder(
**data.model_dump(),
code=_generate_order_code(),
**data.model_dump(exclude={"due_date"}),
code=_generate_order_code(data.order_type),
created_by=user.id,
)
if data.due_date:
order.due_date = datetime.fromisoformat(data.due_date)
db.add(order)
await db.flush()
return _order_to_dict(order)

View File

@@ -20,6 +20,23 @@ DEFAULTS: dict[str, str] = {
"notification_email_smtp": "",
"report_auto_schedule_enabled": "false",
"timezone": "Asia/Shanghai",
# AI Model Settings
"ai_enabled": "false",
"ai_provider": "stepfun", # stepfun, zhipu
"ai_api_base_url": "https://api.stepfun.com/step_plan/v1",
"ai_api_key": "1UVGFlMG9zaGrRvRATpBNdjKotLio6x9t6lKRKdxwYD3mEkLU2Itb30yb1rvzWRGs",
"ai_model_name": "step-2-16k",
"ai_temperature": "0.7",
"ai_max_tokens": "2000",
"ai_context_length": "8000",
"ai_fallback_enabled": "true",
"ai_fallback_provider": "zhipu",
"ai_fallback_api_base_url": "https://open.bigmodel.cn/api/coding/paas/v4",
"ai_fallback_api_key": "0b5fe625dfd64836bfd42cc9608aed42.wnQngOvi7EkAWjyn",
"ai_fallback_model_name": "codegeex-4",
"ai_system_prompt": "你是一个专业的光伏电站智能运维助手。你的任务是分析光伏电站的设备运行数据、告警信息和历史趋势,提供专业的诊断分析和运维建议。请用中文回答,结构清晰,重点突出。",
"ai_diagnostic_prompt": "请分析以下光伏设备的运行数据,给出诊断报告:\n\n设备信息:{device_info}\n运行数据:{metrics}\n告警记录:{alarms}\n\n请按以下结构输出:\n## 运行概况\n## 问题诊断\n## 建议措施\n## 风险预警",
"ai_insight_prompt": "请根据以下电站运行数据,生成运营洞察报告:\n\n电站概况:{station_info}\n关键指标:{kpis}\n近期告警:{recent_alarms}\n\n请给出3-5条关键洞察和建议。",
}
@@ -32,6 +49,35 @@ class SettingsUpdate(BaseModel):
notification_email_smtp: str | None = None
report_auto_schedule_enabled: bool | None = None
timezone: str | None = None
ai_enabled: bool | None = None
ai_provider: str | None = None
ai_api_base_url: str | None = None
ai_api_key: str | None = None
ai_model_name: str | None = None
ai_temperature: float | None = None
ai_max_tokens: int | None = None
ai_context_length: int | None = None
ai_fallback_enabled: bool | None = None
ai_fallback_provider: str | None = None
ai_fallback_api_base_url: str | None = None
ai_fallback_api_key: str | None = None
ai_fallback_model_name: str | None = None
ai_system_prompt: str | None = None
ai_diagnostic_prompt: str | None = None
ai_insight_prompt: str | None = None
def _mask_key(key: str) -> str:
if not key or len(key) < 8:
return "****"
return "*" * (len(key) - 4) + key[-4:]
async def _get_raw_settings(db: AsyncSession) -> dict[str, str]:
"""Return merged settings dict WITHOUT masking (for internal use)."""
result = await db.execute(select(SystemSetting))
db_settings = {s.key: s.value for s in result.scalars().all()}
return {**DEFAULTS, **db_settings}
@router.get("")
@@ -40,10 +86,7 @@ async def get_settings(
user: User = Depends(get_current_user),
):
"""Return all platform settings as a flat dict."""
result = await db.execute(select(SystemSetting))
db_settings = {s.key: s.value for s in result.scalars().all()}
# Merge defaults with DB values
merged = {**DEFAULTS, **db_settings}
merged = await _get_raw_settings(db)
# Cast types for frontend
return {
"platform_name": merged["platform_name"],
@@ -54,6 +97,23 @@ async def get_settings(
"notification_email_smtp": merged["notification_email_smtp"],
"report_auto_schedule_enabled": merged["report_auto_schedule_enabled"] == "true",
"timezone": merged["timezone"],
# AI settings
"ai_enabled": merged["ai_enabled"] == "true",
"ai_provider": merged["ai_provider"],
"ai_api_base_url": merged["ai_api_base_url"],
"ai_api_key": _mask_key(merged["ai_api_key"]),
"ai_model_name": merged["ai_model_name"],
"ai_temperature": float(merged["ai_temperature"]),
"ai_max_tokens": int(merged["ai_max_tokens"]),
"ai_context_length": int(merged["ai_context_length"]),
"ai_fallback_enabled": merged["ai_fallback_enabled"] == "true",
"ai_fallback_provider": merged["ai_fallback_provider"],
"ai_fallback_api_base_url": merged["ai_fallback_api_base_url"],
"ai_fallback_api_key": _mask_key(merged["ai_fallback_api_key"]),
"ai_fallback_model_name": merged["ai_fallback_model_name"],
"ai_system_prompt": merged["ai_system_prompt"],
"ai_diagnostic_prompt": merged["ai_diagnostic_prompt"],
"ai_insight_prompt": merged["ai_insight_prompt"],
}
@@ -82,3 +142,15 @@ async def update_settings(
)
return {"message": "设置已更新"}
@router.post("/test-ai")
async def test_ai_connection(
db: AsyncSession = Depends(get_db),
user: User = Depends(require_roles("admin")),
):
"""Test AI model connection."""
from app.services.llm_service import test_connection
settings = await _get_raw_settings(db)
result = await test_connection(settings)
return result

View File

@@ -0,0 +1,245 @@
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from pydantic import BaseModel
from app.core.database import get_db
from app.core.deps import get_current_user, require_roles
from app.models.maintenance import SparePart, WarehouseTransaction
from app.models.user import User
router = APIRouter(prefix="/warehouse", tags=["仓库管理"])
# ── Pydantic Schemas ────────────────────────────────────────────────
class PartCreate(BaseModel):
name: str
code: str | None = None
category: str | None = None
unit: str | None = None
current_stock: int = 0
min_stock: int = 0
location: str | None = None
supplier: str | None = None
unit_price: float | None = None
specs: dict | None = None
notes: str | None = None
class PartUpdate(BaseModel):
name: str | None = None
code: str | None = None
category: str | None = None
unit: str | None = None
min_stock: int | None = None
location: str | None = None
supplier: str | None = None
unit_price: float | None = None
specs: dict | None = None
notes: str | None = None
class TransactionCreate(BaseModel):
spare_part_id: int
type: str # "in" or "out"
quantity: int
reason: str | None = None
related_order_id: int | None = None
operator_id: int | None = None
notes: str | None = None
# ── Helpers ─────────────────────────────────────────────────────────
def _part_to_dict(p: SparePart) -> dict:
return {
"id": p.id, "name": p.name, "code": p.code,
"category": p.category, "unit": p.unit,
"current_stock": p.current_stock, "min_stock": p.min_stock,
"location": p.location, "supplier": p.supplier,
"unit_price": p.unit_price, "specs": p.specs, "notes": p.notes,
"created_at": str(p.created_at) if p.created_at else None,
"updated_at": str(p.updated_at) if p.updated_at else None,
}
def _transaction_to_dict(t: WarehouseTransaction) -> dict:
return {
"id": t.id, "spare_part_id": t.spare_part_id,
"type": t.type, "quantity": t.quantity,
"reason": t.reason, "related_order_id": t.related_order_id,
"operator_id": t.operator_id, "notes": t.notes,
"created_at": str(t.created_at) if t.created_at else None,
}
# ── Spare Parts ────────────────────────────────────────────────────
@router.get("/parts")
async def list_parts(
category: str | None = None,
search: str | None = None,
low_stock_only: bool = False,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
query = select(SparePart)
if category:
query = query.where(SparePart.category == category)
if search:
query = query.where(SparePart.name.ilike(f"%{search}%"))
if low_stock_only:
query = query.where(SparePart.current_stock <= SparePart.min_stock)
count_q = select(func.count()).select_from(query.subquery())
total = (await db.execute(count_q)).scalar()
query = query.order_by(SparePart.id.desc()).offset((page - 1) * page_size).limit(page_size)
result = await db.execute(query)
return {
"total": total,
"items": [_part_to_dict(p) for p in result.scalars().all()],
}
@router.get("/parts/{part_id}")
async def get_part(
part_id: int,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
result = await db.execute(select(SparePart).where(SparePart.id == part_id))
part = result.scalar_one_or_none()
if not part:
raise HTTPException(status_code=404, detail="备件不存在")
return _part_to_dict(part)
@router.post("/parts")
async def create_part(
data: PartCreate,
db: AsyncSession = Depends(get_db),
user: User = Depends(require_roles("admin", "energy_manager")),
):
part = SparePart(**data.model_dump())
db.add(part)
await db.flush()
return _part_to_dict(part)
@router.put("/parts/{part_id}")
async def update_part(
part_id: int,
data: PartUpdate,
db: AsyncSession = Depends(get_db),
user: User = Depends(require_roles("admin", "energy_manager")),
):
result = await db.execute(select(SparePart).where(SparePart.id == part_id))
part = result.scalar_one_or_none()
if not part:
raise HTTPException(status_code=404, detail="备件不存在")
for k, v in data.model_dump(exclude_unset=True).items():
setattr(part, k, v)
return _part_to_dict(part)
# ── Warehouse Transactions ─────────────────────────────────────────
@router.get("/transactions")
async def list_transactions(
spare_part_id: int | None = None,
type: str | None = None,
start_date: str | None = None,
end_date: str | None = None,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
query = select(WarehouseTransaction)
if spare_part_id:
query = query.where(WarehouseTransaction.spare_part_id == spare_part_id)
if type:
query = query.where(WarehouseTransaction.type == type)
if start_date:
query = query.where(WarehouseTransaction.created_at >= datetime.fromisoformat(start_date))
if end_date:
query = query.where(WarehouseTransaction.created_at <= datetime.fromisoformat(end_date))
count_q = select(func.count()).select_from(query.subquery())
total = (await db.execute(count_q)).scalar()
query = query.order_by(WarehouseTransaction.id.desc()).offset((page - 1) * page_size).limit(page_size)
result = await db.execute(query)
return {
"total": total,
"items": [_transaction_to_dict(t) for t in result.scalars().all()],
}
@router.post("/transactions")
async def create_transaction(
data: TransactionCreate,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
# Fetch the spare part
result = await db.execute(select(SparePart).where(SparePart.id == data.spare_part_id))
part = result.scalar_one_or_none()
if not part:
raise HTTPException(status_code=404, detail="备件不存在")
# Validate stock for outbound
if data.type == "out" and part.current_stock < data.quantity:
raise HTTPException(status_code=400, detail=f"库存不足,当前库存: {part.current_stock}")
# Update stock
if data.type == "in":
part.current_stock += data.quantity
elif data.type == "out":
part.current_stock -= data.quantity
else:
raise HTTPException(status_code=400, detail="类型必须为 'in''out'")
txn = WarehouseTransaction(**data.model_dump())
if not txn.operator_id:
txn.operator_id = user.id
db.add(txn)
await db.flush()
return _transaction_to_dict(txn)
# ── Statistics ─────────────────────────────────────────────────────
@router.get("/stats")
async def warehouse_stats(
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
# Total parts count
total_q = select(func.count()).select_from(SparePart)
total_parts = (await db.execute(total_q)).scalar() or 0
# Low stock alerts
low_q = select(func.count()).select_from(SparePart).where(
SparePart.current_stock <= SparePart.min_stock
)
low_stock_count = (await db.execute(low_q)).scalar() or 0
# Recent transactions (last 30 days)
from datetime import timedelta
cutoff = datetime.now(timezone.utc) - timedelta(days=30)
recent_q = select(func.count()).select_from(WarehouseTransaction).where(
WarehouseTransaction.created_at >= cutoff
)
recent_transactions = (await db.execute(recent_q)).scalar() or 0
return {
"total_parts": total_parts,
"low_stock_count": low_stock_count,
"recent_transactions": recent_transactions,
}

View File

@@ -0,0 +1,193 @@
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from pydantic import BaseModel
from app.core.database import get_db
from app.core.deps import get_current_user, require_roles
from app.models.maintenance import MaintenanceWorkPlan, InspectionRecord, RepairOrder
from app.models.user import User
router = APIRouter(prefix="/work-plans", tags=["工作计划"])
# ── Pydantic Schemas ────────────────────────────────────────────────
class PlanCreate(BaseModel):
name: str
plan_type: str # "inspection", "maintenance", "repair"
description: str | None = None
station_name: str | None = None
schedule_type: str | None = None
schedule_cron: str | None = None
assigned_to: int | None = None
device_ids: list[int] | None = None
checklist: list[dict] | None = None
is_active: bool = True
next_run_at: str | None = None
notes: str | None = None
class PlanUpdate(BaseModel):
name: str | None = None
plan_type: str | None = None
description: str | None = None
station_name: str | None = None
schedule_type: str | None = None
schedule_cron: str | None = None
assigned_to: int | None = None
device_ids: list[int] | None = None
checklist: list[dict] | None = None
is_active: bool | None = None
next_run_at: str | None = None
notes: str | None = None
# ── Helpers ─────────────────────────────────────────────────────────
def _plan_to_dict(p: MaintenanceWorkPlan) -> dict:
return {
"id": p.id, "name": p.name, "plan_type": p.plan_type,
"description": p.description, "station_name": p.station_name,
"schedule_type": p.schedule_type, "schedule_cron": p.schedule_cron,
"assigned_to": p.assigned_to, "device_ids": p.device_ids,
"checklist": p.checklist, "is_active": p.is_active,
"next_run_at": str(p.next_run_at) if p.next_run_at else None,
"notes": p.notes, "created_by": p.created_by,
"created_at": str(p.created_at) if p.created_at else None,
"updated_at": str(p.updated_at) if p.updated_at else None,
}
def _generate_order_code() -> str:
now = datetime.now(timezone.utc)
return f"WO-{now.strftime('%Y%m%d')}-{now.strftime('%H%M%S')}"
# ── Work Plans CRUD ────────────────────────────────────────────────
@router.get("")
async def list_plans(
plan_type: str | None = None,
station_name: str | None = None,
is_active: bool | None = None,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
query = select(MaintenanceWorkPlan)
if plan_type:
query = query.where(MaintenanceWorkPlan.plan_type == plan_type)
if station_name:
query = query.where(MaintenanceWorkPlan.station_name == station_name)
if is_active is not None:
query = query.where(MaintenanceWorkPlan.is_active == is_active)
count_q = select(func.count()).select_from(query.subquery())
total = (await db.execute(count_q)).scalar()
query = query.order_by(MaintenanceWorkPlan.id.desc()).offset((page - 1) * page_size).limit(page_size)
result = await db.execute(query)
return {
"total": total,
"items": [_plan_to_dict(p) for p in result.scalars().all()],
}
@router.get("/{plan_id}")
async def get_plan(
plan_id: int,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
result = await db.execute(select(MaintenanceWorkPlan).where(MaintenanceWorkPlan.id == plan_id))
plan = result.scalar_one_or_none()
if not plan:
raise HTTPException(status_code=404, detail="工作计划不存在")
return _plan_to_dict(plan)
@router.post("")
async def create_plan(
data: PlanCreate,
db: AsyncSession = Depends(get_db),
user: User = Depends(require_roles("admin", "energy_manager")),
):
plan = MaintenanceWorkPlan(
**data.model_dump(exclude={"next_run_at"}),
created_by=user.id,
)
if data.next_run_at:
plan.next_run_at = datetime.fromisoformat(data.next_run_at)
db.add(plan)
await db.flush()
return _plan_to_dict(plan)
@router.put("/{plan_id}")
async def update_plan(
plan_id: int,
data: PlanUpdate,
db: AsyncSession = Depends(get_db),
user: User = Depends(require_roles("admin", "energy_manager")),
):
result = await db.execute(select(MaintenanceWorkPlan).where(MaintenanceWorkPlan.id == plan_id))
plan = result.scalar_one_or_none()
if not plan:
raise HTTPException(status_code=404, detail="工作计划不存在")
for k, v in data.model_dump(exclude_unset=True, exclude={"next_run_at"}).items():
setattr(plan, k, v)
if data.next_run_at:
plan.next_run_at = datetime.fromisoformat(data.next_run_at)
return _plan_to_dict(plan)
@router.delete("/{plan_id}")
async def delete_plan(
plan_id: int,
db: AsyncSession = Depends(get_db),
user: User = Depends(require_roles("admin", "energy_manager")),
):
result = await db.execute(select(MaintenanceWorkPlan).where(MaintenanceWorkPlan.id == plan_id))
plan = result.scalar_one_or_none()
if not plan:
raise HTTPException(status_code=404, detail="工作计划不存在")
plan.is_active = False
return {"message": "已删除"}
@router.post("/{plan_id}/trigger")
async def trigger_plan(
plan_id: int,
db: AsyncSession = Depends(get_db),
user: User = Depends(require_roles("admin", "energy_manager")),
):
"""手动触发工作计划,根据计划类型生成对应工单"""
result = await db.execute(select(MaintenanceWorkPlan).where(MaintenanceWorkPlan.id == plan_id))
plan = result.scalar_one_or_none()
if not plan:
raise HTTPException(status_code=404, detail="工作计划不存在")
if plan.plan_type == "inspection":
record = InspectionRecord(
plan_id=plan.id,
inspector_id=plan.assigned_to or user.id,
status="pending",
)
db.add(record)
await db.flush()
return {"type": "inspection", "id": record.id, "message": "已生成巡检记录"}
else:
order = RepairOrder(
title=f"[{plan.plan_type}] {plan.name}",
description=plan.description,
code=_generate_order_code(),
created_by=user.id,
assigned_to=plan.assigned_to,
status="open",
)
db.add(order)
await db.flush()
return {"type": "repair_order", "id": order.id, "message": "已生成工单"}

View File

@@ -87,6 +87,13 @@ class SungrowCollector(BaseCollector):
ps_data = await self._get_station_data()
if ps_data:
data.update(ps_data)
# Also store station-level power with a distinct data_type
# so dashboard can read the API total directly instead of
# computing from individual device readings.
if "power" in ps_data:
data["station_power"] = ps_data["power"]
if "daily_energy" in ps_data:
data["station_daily_energy"] = ps_data["daily_energy"]
self.logger.info(
"Station %s data: power=%.1f kW, daily=%.1f kWh",
self._ps_id,

View File

@@ -14,12 +14,16 @@ from app.models.charging import (
)
from app.models.quota import EnergyQuota, QuotaUsage
from app.models.pricing import ElectricityPricing, PricingPeriod
from app.models.maintenance import InspectionPlan, InspectionRecord, RepairOrder, DutySchedule
from app.models.maintenance import (
InspectionPlan, InspectionRecord, RepairOrder, DutySchedule,
Asset, AssetCategory, AssetChange, SparePart, WarehouseTransaction,
MaintenanceWorkPlan, BillingRecord,
)
from app.models.management import Regulation, Standard, ProcessDoc, EmergencyPlan
from app.models.prediction import PredictionTask, PredictionResult, OptimizationSchedule
from app.models.energy_strategy import TouPricing, TouPricingPeriod, EnergyStrategy, StrategyExecution, MonthlyCostReport
from app.models.weather import WeatherData, WeatherConfig
from app.models.ai_ops import DeviceHealthScore, AnomalyDetection, DiagnosticReport, MaintenancePrediction, OpsInsight
from app.models.ai_ops import DeviceHealthScore, AnomalyDetection, DiagnosticReport, MaintenancePrediction, OpsInsight, AIAnalysisResult
__all__ = [
"User", "Role", "AuditLog",
@@ -35,9 +39,12 @@ __all__ = [
"EnergyQuota", "QuotaUsage",
"ElectricityPricing", "PricingPeriod",
"InspectionPlan", "InspectionRecord", "RepairOrder", "DutySchedule",
"Asset", "AssetCategory", "AssetChange", "SparePart", "WarehouseTransaction",
"MaintenanceWorkPlan", "BillingRecord",
"Regulation", "Standard", "ProcessDoc", "EmergencyPlan",
"PredictionTask", "PredictionResult", "OptimizationSchedule",
"TouPricing", "TouPricingPeriod", "EnergyStrategy", "StrategyExecution", "MonthlyCostReport",
"WeatherData", "WeatherConfig",
"DeviceHealthScore", "AnomalyDetection", "DiagnosticReport", "MaintenancePrediction", "OpsInsight",
"AIAnalysisResult",
]

View File

@@ -86,3 +86,21 @@ class OpsInsight(Base):
generated_at = Column(DateTime(timezone=True), server_default=func.now())
valid_until = Column(DateTime(timezone=True))
created_at = Column(DateTime(timezone=True), server_default=func.now())
class AIAnalysisResult(Base):
"""AI智能分析结果"""
__tablename__ = "ai_analysis_results"
id = Column(Integer, primary_key=True, autoincrement=True)
scope = Column(String(20), default="station") # device, station, all
device_id = Column(Integer, ForeignKey("devices.id"), nullable=True)
analysis_type = Column(String(20), default="diagnostic") # diagnostic, insight, custom
prompt_used = Column(Text)
result_text = Column(Text)
model_used = Column(String(100))
provider_used = Column(String(20)) # stepfun, zhipu
tokens_used = Column(Integer)
duration_ms = Column(Integer)
created_by = Column(Integer, ForeignKey("users.id"))
created_at = Column(DateTime(timezone=True), server_default=func.now())

View File

@@ -40,9 +40,11 @@ class RepairOrder(Base):
id = Column(Integer, primary_key=True, autoincrement=True)
code = Column(String(50), unique=True, nullable=False) # WO-20260402-001
order_type = Column(String(20), default="repair") # repair(消缺), inspection(巡检), meter_reading(抄表)
title = Column(String(200), nullable=False)
description = Column(Text)
device_id = Column(Integer, ForeignKey("devices.id"))
station_name = Column(String(200))
alarm_event_id = Column(Integer, ForeignKey("alarm_events.id"))
priority = Column(String(20), default="medium") # critical, high, medium, low
status = Column(String(20), default="open") # open, assigned, in_progress, completed, verified, closed
@@ -55,6 +57,7 @@ class RepairOrder(Base):
assigned_at = Column(DateTime(timezone=True))
completed_at = Column(DateTime(timezone=True))
closed_at = Column(DateTime(timezone=True))
due_date = Column(DateTime(timezone=True)) # 要求完成时间
class DutySchedule(Base):
@@ -67,3 +70,137 @@ class DutySchedule(Base):
area_id = Column(Integer, ForeignKey("device_groups.id"))
notes = Column(Text)
created_at = Column(DateTime(timezone=True), server_default=func.now())
class AssetCategory(Base):
__tablename__ = "asset_categories"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(100), nullable=False)
parent_id = Column(Integer, ForeignKey("asset_categories.id"), nullable=True)
description = Column(Text)
created_at = Column(DateTime(timezone=True), server_default=func.now())
class Asset(Base):
__tablename__ = "assets"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(200), nullable=False)
asset_code = Column(String(50), unique=True)
category_id = Column(Integer, ForeignKey("asset_categories.id"))
device_id = Column(Integer, ForeignKey("devices.id"), nullable=True)
station_name = Column(String(200))
manufacturer = Column(String(200))
model_number = Column(String(100))
serial_number = Column(String(100))
purchase_date = Column(DateTime(timezone=True))
warranty_expiry = Column(DateTime(timezone=True))
purchase_price = Column(Float)
status = Column(String(20), default="active")
location = Column(String(200))
responsible_dept = Column(String(100))
custodian = Column(String(100))
supplier = Column(String(200))
notes = Column(Text)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
class AssetChange(Base):
__tablename__ = "asset_changes"
id = Column(Integer, primary_key=True, autoincrement=True)
asset_id = Column(Integer, ForeignKey("assets.id"), nullable=False)
change_type = Column(String(20))
change_date = Column(DateTime(timezone=True))
description = Column(Text)
old_value = Column(JSON)
new_value = Column(JSON)
operator_id = Column(Integer, ForeignKey("users.id"))
created_at = Column(DateTime(timezone=True), server_default=func.now())
class SparePart(Base):
__tablename__ = "spare_parts"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(200), nullable=False)
part_code = Column(String(50), unique=True)
category = Column(String(100))
specification = Column(String(200))
unit = Column(String(20))
current_stock = Column(Integer, default=0)
min_stock = Column(Integer, default=0)
max_stock = Column(Integer)
warehouse_location = Column(String(100))
unit_price = Column(Float)
supplier = Column(String(200))
notes = Column(Text)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
class WarehouseTransaction(Base):
__tablename__ = "warehouse_transactions"
id = Column(Integer, primary_key=True, autoincrement=True)
spare_part_id = Column(Integer, ForeignKey("spare_parts.id"), nullable=False)
transaction_type = Column(String(20))
quantity = Column(Integer, nullable=False)
unit_price = Column(Float)
total_price = Column(Float)
transaction_date = Column(DateTime(timezone=True))
work_order_id = Column(Integer, ForeignKey("repair_orders.id"), nullable=True)
reason = Column(String(200))
operator_id = Column(Integer, ForeignKey("users.id"))
notes = Column(Text)
created_at = Column(DateTime(timezone=True), server_default=func.now())
class MaintenanceWorkPlan(Base):
__tablename__ = "maintenance_work_plans"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(200), nullable=False)
plan_type = Column(String(20))
station_name = Column(String(200))
device_ids = Column(JSON)
cycle_period = Column(String(20))
execution_days = Column(Integer)
effective_start = Column(DateTime(timezone=True))
effective_end = Column(DateTime(timezone=True))
description = Column(Text)
workflow_config = Column(JSON)
is_active = Column(Boolean, default=True)
created_by = Column(Integer, ForeignKey("users.id"))
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
class BillingRecord(Base):
__tablename__ = "billing_records"
id = Column(Integer, primary_key=True, autoincrement=True)
station_name = Column(String(200))
billing_type = Column(String(20))
billing_period_start = Column(DateTime(timezone=True))
billing_period_end = Column(DateTime(timezone=True))
generation_kwh = Column(Float)
self_use_kwh = Column(Float)
grid_feed_kwh = Column(Float)
electricity_price = Column(Float)
self_use_price = Column(Float)
feed_in_tariff = Column(Float)
total_amount = Column(Float)
self_use_amount = Column(Float)
feed_in_amount = Column(Float)
subsidy_amount = Column(Float)
status = Column(String(20), default="pending")
invoice_number = Column(String(100))
invoice_date = Column(DateTime(timezone=True))
attachment_url = Column(String(500))
notes = Column(Text)
created_by = Column(Integer, ForeignKey("users.id"))
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

View File

@@ -0,0 +1,107 @@
"""LLM service for AI-powered analysis using OpenAI-compatible APIs (StepFun, ZhipuAI)."""
import logging
from openai import AsyncOpenAI
logger = logging.getLogger(__name__)
async def get_llm_client(settings: dict, use_fallback: bool = False) -> tuple[AsyncOpenAI, str]:
"""Create an OpenAI-compatible client based on settings.
Returns (client, model_name) tuple.
"""
if use_fallback:
base_url = settings.get("ai_fallback_api_base_url", "")
api_key = settings.get("ai_fallback_api_key", "")
model = settings.get("ai_fallback_model_name", "codegeex-4")
else:
base_url = settings.get("ai_api_base_url", "")
api_key = settings.get("ai_api_key", "")
model = settings.get("ai_model_name", "step-2-16k")
client = AsyncOpenAI(base_url=base_url, api_key=api_key, timeout=30.0)
return client, model
async def chat_completion(
messages: list[dict],
settings: dict,
temperature: float | None = None,
max_tokens: int | None = None,
) -> str:
"""Send a chat completion request with automatic fallback."""
temp = temperature or float(settings.get("ai_temperature", "0.7"))
tokens = max_tokens or int(settings.get("ai_max_tokens", "2000"))
# Try primary
try:
client, model = await get_llm_client(settings, use_fallback=False)
response = await client.chat.completions.create(
model=model,
messages=messages,
temperature=temp,
max_tokens=tokens,
)
return response.choices[0].message.content or ""
except Exception as e:
logger.warning(f"Primary LLM failed: {e}")
# Try fallback if enabled
if settings.get("ai_fallback_enabled") == "true":
try:
client, model = await get_llm_client(settings, use_fallback=True)
response = await client.chat.completions.create(
model=model,
messages=messages,
temperature=temp,
max_tokens=tokens,
)
return response.choices[0].message.content or ""
except Exception as e2:
logger.error(f"Fallback LLM also failed: {e2}")
raise Exception(f"All LLM providers failed. Primary: {e}, Fallback: {e2}")
else:
raise
async def test_connection(settings: dict) -> dict:
"""Test connection to the configured LLM provider."""
results = {"primary": {"status": "unknown"}, "fallback": {"status": "unknown"}}
# Test primary
try:
client, model = await get_llm_client(settings, use_fallback=False)
response = await client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": "你好,请回复'连接成功'"}],
max_tokens=20,
temperature=0,
)
reply = response.choices[0].message.content or ""
results["primary"] = {
"status": "success",
"model": model,
"reply": reply[:100],
}
except Exception as e:
results["primary"] = {"status": "error", "error": str(e)[:200]}
# Test fallback
if settings.get("ai_fallback_enabled") == "true":
try:
client, model = await get_llm_client(settings, use_fallback=True)
response = await client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": "你好,请回复'连接成功'"}],
max_tokens=20,
temperature=0,
)
reply = response.choices[0].message.content or ""
results["fallback"] = {
"status": "success",
"model": model,
"reply": reply[:100],
}
except Exception as e:
results["fallback"] = {"status": "error", "error": str(e)[:200]}
return results