Files
tp-ems/core/backend/app/services/ai_ops.py

1017 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""AI运维智能体服务 - 设备健康评分、异常检测、诊断智能、预测性维护、运营洞察
Inspired by Envision's "构网智能体" concept: self-sensing, self-adapting, self-evolving
intelligent agents for energy asset management.
"""
import logging
import random
import math
from datetime import datetime, timezone, timedelta
from sqlalchemy import select, func, and_, desc
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.device import Device
from app.models.energy import EnergyData, EnergyDailySummary
from app.models.alarm import AlarmEvent
from app.models.ai_ops import (
DeviceHealthScore, AnomalyDetection, DiagnosticReport,
MaintenancePrediction, OpsInsight,
)
logger = logging.getLogger("ai_ops")
# ── Device type configurations ──────────────────────────────────────
# Per-device-type settings, keyed by Device.device_type.
#   metric     - a data-type name associated with the device type
#                (NOTE(review): not read anywhere in the visible code - confirm usage)
#   rated_cop  - 3.5 for heat pumps, None for every other type
#                (NOTE(review): the scoring code hard-codes 3.5 instead of reading this)
#   temp_range - (low, high) bounds consumed by _calc_temperature_score; None means
#                temperature is not scored for that type (presumably in °C - confirm)
DEVICE_RATED_EFFICIENCY = {
"pv_inverter": {"metric": "power", "rated_cop": None, "temp_range": (20, 60)},
"heat_pump": {"metric": "cop", "rated_cop": 3.5, "temp_range": (30, 55)},
"meter": {"metric": "power_factor", "rated_cop": None, "temp_range": None},
"sensor": {"metric": "temperature", "rated_cop": None, "temp_range": (18, 28)},
"heat_meter": {"metric": "heat_power", "rated_cop": None, "temp_range": None},
}
# Weight of each factor in the overall health score computed by
# calculate_device_health. The five weights add up to 1.0, so the weighted sum
# of 0-100 factor scores is itself on a 0-100 scale.
HEALTH_WEIGHTS = {
"power_stability": 0.20,
"efficiency": 0.25,
"alarm_frequency": 0.20,
"uptime": 0.20,
"temperature": 0.15,
}
# ── Health Score Calculation ────────────────────────────────────────
async def calculate_device_health(
    session: AsyncSession, device: Device, now: datetime | None = None
) -> DeviceHealthScore:
    """Compute a weighted 0-100 health score for one device and stage it.

    Args:
        session: Async database session used for every lookup; the new score
            row is added to it but not committed here.
        device: The device being scored.
        now: Reference time for all look-back windows; defaults to the
            current UTC time.

    Returns:
        The newly created ``DeviceHealthScore`` carrying the overall score,
        the status label, the per-factor scores and the trend.
    """
    now = now or datetime.now(timezone.utc)

    # Factor scores are awaited one after another, never concurrently.
    factors = {
        # Spread of power readings over the last 24 hours
        "power_stability": await _calc_power_stability(session, device.id, now),
        # Type-specific efficiency / COP over the last 24 hours
        "efficiency": await _calc_efficiency_score(session, device, now),
        # Number of alarms over the last 7 days
        "alarm_frequency": await _calc_alarm_frequency_score(session, device.id, now),
        # Data availability over the last 7 days plus current status
        "uptime": await _calc_uptime_score(session, device, now),
        # Average temperature over the last 6 hours
        "temperature": await _calc_temperature_score(session, device, now),
    }

    # Weighted sum of the factor scores, clamped to the 0-100 range
    weighted_total = 0
    for factor_name, weight in HEALTH_WEIGHTS.items():
        weighted_total += factors[factor_name] * weight
    health_score = max(0, min(100, round(weighted_total, 1)))

    status = (
        "healthy" if health_score >= 80
        else "warning" if health_score >= 60
        else "critical"
    )

    # The trend is derived from previously stored scores, so it has to be
    # computed before the new record is added to the session.
    trend = await _calc_trend(session, device.id, health_score)

    record = DeviceHealthScore(
        device_id=device.id,
        timestamp=now,
        health_score=health_score,
        status=status,
        factors=factors,
        trend=trend,
    )
    session.add(record)
    return record
async def _calc_power_stability(session: AsyncSession, device_id: int, now: datetime) -> float:
    """Score power stability from the coefficient of variation over 24 hours.

    Args:
        session: Async database session.
        device_id: Id of the device whose "power" readings are analysed.
        now: End of the 24-hour look-back window.

    Returns:
        A score between 40 and 100. A coefficient of variation of 0.1 or less
        gives 100 and 0.5 or more gives 40. Returns 85.0 when there are no
        readings or the mean is exactly zero.
    """
    cutoff = now - timedelta(hours=24)
    result = await session.execute(
        select(
            func.avg(EnergyData.value).label("avg_val"),
            func.stddev(EnergyData.value).label("std_val"),
        ).where(and_(
            EnergyData.device_id == device_id,
            EnergyData.data_type == "power",
            EnergyData.timestamp >= cutoff,
        ))
    )
    row = result.one_or_none()
    if not row or row.avg_val is None or row.avg_val == 0:
        return 85.0  # no usable data: assume the device is OK

    mean_power = float(row.avg_val)
    std_power = float(row.std_val or 0)
    # Use the magnitude of the mean so that a negative average (for example
    # readings recorded with a negative sign) is still judged on its spread
    # instead of always collapsing to cv = 0 and a perfect score.
    cv = std_power / abs(mean_power)
    # Linear mapping: cv 0.1 -> 100, cv 0.5 -> 40, clamped at both ends
    score = max(40, min(100, 100 - (cv - 0.1) * 150))
    return round(score, 1)
async def _calc_efficiency_score(session: AsyncSession, device: Device, now: datetime) -> float:
    """Score efficiency over the last 24 hours using a type-specific metric.

    * heat_pump: average "cop" relative to a rated COP of 3.5 (floor 30).
    * pv_inverter: peak "power" relative to the rated power, which falls back
      to 110.0 when the device has none set (scaled by 110, floor 30).
    * meter: average "power_factor" scaled by 105 (floor 40).

    Every other device type, and any supported type without readings in the
    window, gets the neutral score 85.0. Results are capped at 100.
    """
    kind = device.device_type
    if kind not in ("heat_pump", "pv_inverter", "meter"):
        return 85.0

    window_start = now - timedelta(hours=24)
    if kind == "heat_pump":
        data_type, aggregate = "cop", func.avg
    elif kind == "pv_inverter":
        data_type, aggregate = "power", func.max
    else:
        data_type, aggregate = "power_factor", func.avg

    stmt = select(aggregate(EnergyData.value)).where(and_(
        EnergyData.device_id == device.id,
        EnergyData.data_type == data_type,
        EnergyData.timestamp >= window_start,
    ))
    observed = (await session.execute(stmt)).scalar()
    if observed is None:
        return 85.0
    observed = float(observed)

    if kind == "heat_pump":
        floor = 30
        raw_score = observed / 3.5 * 100
    elif kind == "pv_inverter":
        floor = 30
        nominal_power = device.rated_power or 110.0
        raw_score = observed / nominal_power * 110
    else:
        floor = 40
        raw_score = observed * 105
    return round(max(floor, min(100, raw_score)), 1)
async def _calc_alarm_frequency_score(session: AsyncSession, device_id: int, now: datetime) -> float:
    """Score a device by how many alarms it raised in the last 7 days.

    Each alarm costs 7 points, starting from 100 and bottoming out at 30
    (reached at 10 alarms).
    """
    window_start = now - timedelta(days=7)
    stmt = select(func.count(AlarmEvent.id)).where(and_(
        AlarmEvent.device_id == device_id,
        AlarmEvent.triggered_at >= window_start,
    ))
    alarm_count = (await session.execute(stmt)).scalar() or 0
    return float(max(30, 100 - alarm_count * 7))
async def _calc_uptime_score(session: AsyncSession, device: Device, now: datetime) -> float:
    """Score uptime from 7-day data availability and the current status.

    The number of "power" readings in the last 7 days is compared with the
    count expected at one reading every 15 seconds. The resulting percentage
    is reduced by 15 for an "offline" device or 5 for one in "alarm", and
    never drops below 30.

    NOTE(review): only "power" rows are counted. Device types that do not
    report "power" (sensors are treated as temperature-only in
    _zscore_detection) would always land on the floor - confirm intent.
    """
    window_start = now - timedelta(days=7)
    stmt = select(func.count(EnergyData.id)).where(and_(
        EnergyData.device_id == device.id,
        EnergyData.data_type == "power",
        EnergyData.timestamp >= window_start,
    ))
    reading_count = (await session.execute(stmt)).scalar() or 0

    # 4 readings per minute (15 s interval) for 7 days = 40320 readings
    expected_readings = 7 * 24 * 60 * 4
    availability = min(1.0, reading_count / expected_readings)

    # Deduct points when the device is currently not in a normal state
    penalty = {"offline": 15, "alarm": 5}.get(device.status, 0)
    return round(max(30, availability * 100 - penalty), 1)
async def _calc_temperature_score(session: AsyncSession, device: Device, now: datetime) -> float:
    """Score how well the 6-hour average temperature fits the allowed range.

    Returns 90.0 when the device type has no configured ``temp_range``, 85.0
    when there are no temperature readings in the window, 100.0 when the
    average lies inside the range, and otherwise 100 minus 5 points per unit
    of deviation from the nearest bound, with a floor of 20.
    """
    type_cfg = DEVICE_RATED_EFFICIENCY.get(device.device_type, {})
    bounds = type_cfg.get("temp_range")
    if not bounds:
        return 90.0  # temperature is not applicable to this device type

    window_start = now - timedelta(hours=6)
    stmt = select(func.avg(EnergyData.value)).where(and_(
        EnergyData.device_id == device.id,
        EnergyData.data_type == "temperature",
        EnergyData.timestamp >= window_start,
    ))
    mean_temp = (await session.execute(stmt)).scalar()
    if mean_temp is None:
        return 85.0

    mean_temp = float(mean_temp)
    lower, upper = bounds
    if lower <= mean_temp <= upper:
        return 100.0

    # Distance to whichever bound was crossed
    overshoot = max(0, mean_temp - upper, lower - mean_temp)
    return round(max(20, 100 - overshoot * 5), 1)
async def _calc_trend(session: AsyncSession, device_id: int, current_score: float) -> str:
    """Classify the score trend against the device's most recent stored scores.

    The current score is compared with the mean of up to five previously
    stored scores. A difference above +3 is "improving", below -3 is
    "degrading", and anything else is "stable". With fewer than two stored
    scores the trend is reported as "stable".
    """
    stmt = (
        select(DeviceHealthScore.health_score)
        .where(DeviceHealthScore.device_id == device_id)
        .order_by(DeviceHealthScore.timestamp.desc())
        .limit(5)
    )
    rows = (await session.execute(stmt)).scalars().all()
    history = [float(value) for value in rows]
    if len(history) < 2:
        return "stable"

    delta = current_score - sum(history) / len(history)
    if delta > 3:
        return "improving"
    if delta < -3:
        return "degrading"
    return "stable"
# ── Anomaly Detection ───────────────────────────────────────────────
async def scan_anomalies(session: AsyncSession, device_id: int | None = None) -> list[AnomalyDetection]:
    """Scan active devices for anomalies and stage the detections.

    Args:
        session: Async database session; detected anomalies are added to it
            but not committed here.
        device_id: When given, restricts the scan to that single device;
            ``None`` scans every active device.

    Returns:
        All anomalies found, Z-score detections followed by pattern-based
        detections for each device in turn.
    """
    now = datetime.now(timezone.utc)
    query = select(Device).where(Device.is_active == True)  # noqa: E712 - SQL expression
    # Compare against None explicitly so that an id of 0 is treated as a
    # filter rather than silently scanning every device.
    if device_id is not None:
        query = query.where(Device.id == device_id)
    devices = (await session.execute(query)).scalars().all()

    anomalies = []
    for device in devices:
        # Statistical outliers on the latest reading
        anomalies.extend(await _zscore_detection(session, device, now))
        # Rule-based checks (communication loss, PV output, heat pump COP)
        anomalies.extend(await _pattern_detection(session, device, now))

    session.add_all(anomalies)
    return anomalies
async def _zscore_detection(
    session: AsyncSession, device: Device, now: datetime
) -> list[AnomalyDetection]:
    """Flag the latest reading of each metric when it is over 3 sigma from the mean.

    The mean and standard deviation come from the last 24 hours of readings.
    Sensors are checked on "temperature" only; every other device is checked
    on "power", and heat pumps additionally on "cop". A Z-score above 5 is
    "critical", otherwise "warning".

    Args:
        session: Async database session.
        device: The device to check.
        now: Reference time; also stored as ``detected_at``.

    Returns:
        The new, not yet persisted ``AnomalyDetection`` objects.
    """
    anomalies = []
    metrics = ["temperature"] if device.device_type == "sensor" else ["power"]
    if device.device_type == "heat_pump":
        metrics.append("cop")

    cutoff_stats = now - timedelta(hours=24)
    for metric in metrics:
        # Rolling statistics over the last 24 hours
        stats_result = await session.execute(
            select(
                func.avg(EnergyData.value).label("avg_val"),
                func.stddev(EnergyData.value).label("std_val"),
            ).where(and_(
                EnergyData.device_id == device.id,
                EnergyData.data_type == metric,
                EnergyData.timestamp >= cutoff_stats,
            ))
        )
        stats = stats_result.one_or_none()
        # Without a non-zero standard deviation a Z-score is undefined
        if not stats or stats.avg_val is None or stats.std_val is None or float(stats.std_val) == 0:
            continue
        avg_val = float(stats.avg_val)
        std_val = float(stats.std_val)

        # Most recent reading for this metric
        latest_result = await session.execute(
            select(EnergyData.value).where(and_(
                EnergyData.device_id == device.id,
                EnergyData.data_type == metric,
            )).order_by(EnergyData.timestamp.desc()).limit(1)
        )
        latest = latest_result.scalar()
        if latest is None:
            continue
        latest = float(latest)

        z_score = abs(latest - avg_val) / std_val
        if z_score <= 3:
            continue

        # Divide by the magnitude of the mean so the percentage stays
        # non-negative even when the mean itself is negative.
        if avg_val != 0:
            deviation_pct = round(abs(latest - avg_val) / abs(avg_val) * 100, 1)
        else:
            deviation_pct = 0
        anomalies.append(AnomalyDetection(
            device_id=device.id,
            detected_at=now,
            anomaly_type=_classify_anomaly(device.device_type, metric, latest, avg_val),
            severity="critical" if z_score > 5 else "warning",
            description=f"{metric} 异常: 当前值 {latest:.2f}, 均值 {avg_val:.2f}, Z-score {z_score:.1f}",
            metric_name=metric,
            expected_value=round(avg_val, 2),
            actual_value=round(latest, 2),
            deviation_percent=deviation_pct,
            status="detected",
        ))
    return anomalies
def _classify_anomaly(device_type: str, metric: str, actual: float, expected: float) -> str:
    """Map a metric deviation to an anomaly type label.

    A "power" or "cop" reading below its expected value becomes "power_drop"
    or "efficiency_loss" respectively. Any "temperature" deviation is
    "abnormal_temperature". Everything else is "pattern_deviation".
    ``device_type`` is accepted but does not influence the result.
    """
    if metric == "temperature":
        return "abnormal_temperature"

    drop_labels = {"power": "power_drop", "cop": "efficiency_loss"}
    label = drop_labels.get(metric)
    if label is not None and actual < expected:
        return label
    return "pattern_deviation"
async def _pattern_detection(
    session: AsyncSession, device: Device, now: datetime
) -> list[AnomalyDetection]:
    """Run rule-based anomaly checks for a device.

    Rules:
        * Any device: no readings in the last 5 minutes while the status is
          "online" -> "communication_loss".
        * pv_inverter: latest power below 5% of rated power (110.0 when no
          rated power is set) between 09:00 and 16:59 Beijing time ->
          "power_drop".
        * heat_pump: latest COP below 2.0 -> "efficiency_loss", escalated to
          "critical" below 1.5.

    Args:
        session: Async database session.
        device: The device to check.
        now: Reference time; also stored as ``detected_at``.

    Returns:
        The new, not yet persisted ``AnomalyDetection`` objects.
    """
    anomalies = []

    # Communication loss: nothing reported within the last 5 minutes
    recent_cutoff = now - timedelta(minutes=5)
    result = await session.execute(
        select(func.count(EnergyData.id)).where(and_(
            EnergyData.device_id == device.id,
            EnergyData.timestamp >= recent_cutoff,
        ))
    )
    recent_count = result.scalar() or 0
    if recent_count == 0 and device.status == "online":
        anomalies.append(AnomalyDetection(
            device_id=device.id,
            detected_at=now,
            anomaly_type="communication_loss",
            severity="warning",
            description=f"设备 {device.name} 超过5分钟无数据上报",
            metric_name="data_availability",
            expected_value=1.0,
            actual_value=0.0,
            deviation_percent=100.0,
            status="detected",
        ))

    # PV: very low output during daylight hours (hours 9-16, Beijing time)
    if device.device_type == "pv_inverter":
        # Adding 8 hours assumes `now` is expressed in UTC, as it is when
        # created by scan_anomalies.
        beijing_hour = (now + timedelta(hours=8)).hour
        # Only hours 9-16 can ever raise this anomaly, so the latest reading
        # is not queried outside that window.
        if 9 <= beijing_hour <= 16:
            latest_result = await session.execute(
                select(EnergyData.value).where(and_(
                    EnergyData.device_id == device.id,
                    EnergyData.data_type == "power",
                )).order_by(EnergyData.timestamp.desc()).limit(1)
            )
            power = latest_result.scalar()
            rated = device.rated_power or 110.0
            if power is not None and float(power) < rated * 0.05:
                power_kw = float(power)
                # 30% of rated power is the reference "expected" output
                baseline = rated * 0.3
                anomalies.append(AnomalyDetection(
                    device_id=device.id,
                    detected_at=now,
                    anomaly_type="power_drop",
                    severity="warning",
                    description=f"光伏 {device.name} 在日照时段功率异常偏低: {power:.1f} kW",
                    metric_name="power",
                    expected_value=round(baseline, 2),
                    actual_value=round(power_kw, 2),
                    deviation_percent=round((1 - power_kw / baseline) * 100, 1),
                    status="detected",
                ))

    # Heat pump: COP degradation, measured against a reference COP of 3.5
    if device.device_type == "heat_pump":
        latest_result = await session.execute(
            select(EnergyData.value).where(and_(
                EnergyData.device_id == device.id,
                EnergyData.data_type == "cop",
            )).order_by(EnergyData.timestamp.desc()).limit(1)
        )
        cop = latest_result.scalar()
        if cop is not None and float(cop) < 2.0:
            cop_value = float(cop)
            anomalies.append(AnomalyDetection(
                device_id=device.id,
                detected_at=now,
                anomaly_type="efficiency_loss",
                severity="warning" if cop_value >= 1.5 else "critical",
                description=f"热泵 {device.name} COP降至 {cop:.2f},低于正常水平",
                metric_name="cop",
                expected_value=3.5,
                actual_value=round(cop_value, 2),
                deviation_percent=round((1 - cop_value / 3.5) * 100, 1),
                status="detected",
            ))
    return anomalies
# ── Diagnostic Intelligence ─────────────────────────────────────────
async def run_diagnostics(
    session: AsyncSession, device_id: int, report_type: str = "triggered"
) -> DiagnosticReport:
    """Build and stage a diagnostic report for one device.

    Gathers the device's anomalies and alarms from the last 7 days plus its
    most recent health score, then hands them to the diagnostic routine that
    matches the device type (PV inverter, heat pump, or generic).

    Args:
        session: Async database session; the report is added to it but not
            committed here.
        device_id: Id of the device to diagnose.
        report_type: Label stored on the report.

    Returns:
        The newly created ``DiagnosticReport``.

    Raises:
        ValueError: If no device with ``device_id`` exists.
    """
    now = datetime.now(timezone.utc)

    device_lookup = await session.execute(select(Device).where(Device.id == device_id))
    device = device_lookup.scalar_one_or_none()
    if not device:
        raise ValueError(f"Device {device_id} not found")

    week_ago = now - timedelta(days=7)

    # Anomalies of the last 7 days, newest first
    anomaly_stmt = (
        select(AnomalyDetection)
        .where(and_(
            AnomalyDetection.device_id == device_id,
            AnomalyDetection.detected_at >= week_ago,
        ))
        .order_by(AnomalyDetection.detected_at.desc())
    )
    anomalies = (await session.execute(anomaly_stmt)).scalars().all()

    # Alarms of the last 7 days
    alarm_stmt = select(AlarmEvent).where(and_(
        AlarmEvent.device_id == device_id,
        AlarmEvent.triggered_at >= week_ago,
    ))
    alarms = (await session.execute(alarm_stmt)).scalars().all()

    # Most recent health score, if any
    health_stmt = (
        select(DeviceHealthScore)
        .where(DeviceHealthScore.device_id == device_id)
        .order_by(DeviceHealthScore.timestamp.desc())
        .limit(1)
    )
    health = (await session.execute(health_stmt)).scalar_one_or_none()

    # Pick the diagnostic routine by device type
    kind = device.device_type
    if kind == "pv_inverter":
        outcome = await _diagnose_pv(session, device, anomalies, alarms, health, now)
    elif kind == "heat_pump":
        outcome = await _diagnose_heat_pump(session, device, anomalies, alarms, health, now)
    else:
        outcome = _diagnose_general(device, anomalies, alarms, health)
    findings, recommendations, energy_loss_kwh, cost_impact_yuan = outcome

    report = DiagnosticReport(
        device_id=device_id,
        generated_at=now,
        report_type=report_type,
        findings=findings,
        recommendations=recommendations,
        estimated_impact={
            "energy_loss_kwh": round(energy_loss_kwh, 2),
            "cost_impact_yuan": round(cost_impact_yuan, 2),
        },
        status="generated",
    )
    session.add(report)
    return report
async def _diagnose_pv(session, device, anomalies, alarms, health, now):
    """Produce findings and recommendations for a PV inverter.

    Args:
        session: Async database session used to read recent temperatures.
        device: The PV inverter being diagnosed.
        anomalies: Recent anomaly records for the device.
        alarms: Recent alarm records for the device.
        health: Latest health score record, or ``None``.
        now: Reference time for the 6-hour temperature window.

    Returns:
        A tuple ``(findings, recommendations, energy_loss, cost_impact)``.
        Energy loss is estimated as 5.0 per power-drop anomaly and cost as
        0.6 per unit of energy loss, and only when power drops coincide with
        an average temperature above 55; otherwise both are 0.0.
    """
    findings = []
    recommendations = []
    energy_loss = 0.0
    cost_impact = 0.0

    # Average temperature over the last 6 hours
    cutoff = now - timedelta(hours=6)
    temp_result = await session.execute(
        select(func.avg(EnergyData.value)).where(and_(
            EnergyData.device_id == device.id,
            EnergyData.data_type == "temperature",
            EnergyData.timestamp >= cutoff,
        ))
    )
    avg_temp = temp_result.scalar()
    power_anomalies = [a for a in anomalies if a.anomaly_type == "power_drop"]
    overheated = avg_temp is not None and float(avg_temp) > 55

    if power_anomalies and overheated:
        # Power drops together with high temperature
        findings.append({
            "finding": "光伏逆变器功率下降伴随高温",
            "severity": "warning",
            "detail": f"平均温度 {float(avg_temp):.1f}°C高于正常范围。功率异常 {len(power_anomalies)}",
        })
        recommendations.append({
            "action": "清洁光伏面板并检查通风散热",
            "priority": "high",
            "detail": "高温导致逆变器降额运行,建议清洁面板、检查散热风扇",
        })
        energy_loss = len(power_anomalies) * 5.0
        cost_impact = energy_loss * 0.6
    elif power_anomalies:
        # Power drops without a temperature explanation
        findings.append({
            "finding": "光伏功率间歇性下降",
            "severity": "info",
            "detail": f"近7天检测到 {len(power_anomalies)} 次功率下降",
        })
        recommendations.append({
            "action": "检查光伏面板遮挡和接线",
            "priority": "medium",
            "detail": "排除树木遮挡、面板污染或接线松动",
        })

    if alarms:
        findings.append({
            "finding": f"近7天触发 {len(alarms)} 条告警",
            "severity": "warning" if len(alarms) > 3 else "info",
            "detail": "频繁告警可能指示设备劣化",
        })

    if health and health.health_score < 70:
        findings.append({
            "finding": f"设备健康评分偏低: {health.health_score}",
            "severity": "warning",
            "detail": f"当前趋势: {health.trend}",
        })
        recommendations.append({
            "action": "安排专项巡检",
            "priority": "high",
            "detail": "建议安排技术人员进行全面检查",
        })

    # Always report something, even when nothing was found
    if not findings:
        findings.append({
            "finding": "设备运行状态良好",
            "severity": "info",
            "detail": "未发现明显异常",
        })
    return findings, recommendations, energy_loss, cost_impact
async def _diagnose_heat_pump(session, device, anomalies, alarms, health, now):
    """Produce findings and recommendations for a heat pump.

    COP anomalies are attributed to cold weather when the average
    "outdoor_temp" reading of the last 6 hours is below -5; otherwise they
    are reported as an efficiency problem with an estimated energy loss of
    8.0 per anomaly and a cost of 0.6 per unit of energy loss.
    Communication-loss anomalies and a health score below 70 add further
    findings.

    Returns:
        A tuple ``(findings, recommendations, energy_loss, cost_impact)``.
    """
    findings, recommendations = [], []
    energy_loss = cost_impact = 0.0

    window_start = now - timedelta(hours=6)
    cop_anomalies = [item for item in anomalies if item.anomaly_type == "efficiency_loss"]

    # Outdoor temperature for context; the query is not restricted to a
    # particular device, so it averages every "outdoor_temp" reading.
    outdoor_stmt = select(func.avg(EnergyData.value)).where(and_(
        EnergyData.data_type == "outdoor_temp",
        EnergyData.timestamp >= window_start,
    ))
    outdoor_temp = (await session.execute(outdoor_stmt)).scalar()

    if cop_anomalies:
        if outdoor_temp and float(outdoor_temp) < -5:
            # Low COP is expected in cold weather
            findings.append({
                "finding": "热泵COP下降与低温天气相关",
                "severity": "info",
                "detail": f"室外温度 {float(outdoor_temp):.1f}°CCOP降额属正常现象",
            })
            recommendations.append({
                "action": "调整运行策略以适应低温",
                "priority": "low",
                "detail": "低温环境下COP降低属正常特性可适当调整运行时段",
            })
        else:
            # Low COP without a weather explanation
            findings.append({
                "finding": "热泵能效异常下降",
                "severity": "warning",
                "detail": f"检测到 {len(cop_anomalies)} 次COP异常",
            })
            recommendations.append({
                "action": "检查冷媒充注量和换热器",
                "priority": "high",
                "detail": "COP异常降低可能由冷媒泄漏或换热器结垢导致",
            })
            energy_loss = len(cop_anomalies) * 8.0
            cost_impact = energy_loss * 0.6

    comm_anomalies = [item for item in anomalies if item.anomaly_type == "communication_loss"]
    if comm_anomalies:
        findings.append({
            "finding": f"通讯中断 {len(comm_anomalies)}",
            "severity": "warning",
            "detail": "频繁通讯丢失需检查网络和DTU设备",
        })
        recommendations.append({
            "action": "检查通讯网络和DTU",
            "priority": "medium",
            "detail": "检查DTU设备状态、网线连接和信号强度",
        })

    if health and health.health_score < 70:
        findings.append({
            "finding": f"设备健康评分偏低: {health.health_score}",
            "severity": "warning",
            "detail": f"当前趋势: {health.trend}",
        })

    # Always report something, even when nothing was found
    if not findings:
        findings.append({
            "finding": "设备运行状态良好",
            "severity": "info",
            "detail": "未发现明显异常",
        })
    return findings, recommendations, energy_loss, cost_impact
def _diagnose_general(device, anomalies, alarms, health):
    """Produce generic findings for device types without a dedicated routine.

    Args:
        device: The device being diagnosed (not inspected by this routine).
        anomalies: Recent anomaly records; one finding is emitted per
            anomaly type, in order of first appearance.
        alarms: Recent alarm records.
        health: Latest health score record, or ``None``.

    Returns:
        A tuple ``(findings, recommendations, energy_loss, cost_impact)``.
        The two impact figures are always 0.0 for generic devices.
    """
    findings = []
    recommendations = []

    # Group anomalies by type, preserving first-seen order
    by_type = {}
    for anomaly in anomalies or ():
        by_type.setdefault(anomaly.anomaly_type, []).append(anomaly)
    for anomaly_type, items in by_type.items():
        findings.append({
            # Keep the count and the type name apart; without a separator
            # they are rendered as a single run such as "3power_drop".
            "finding": f"检测到 {len(items)} 次 {anomaly_type} 异常",
            "severity": "warning" if len(items) > 2 else "info",
            # Every group has at least one member by construction
            "detail": items[0].description,
        })

    if alarms:
        findings.append({
            "finding": f"近7天触发 {len(alarms)} 条告警",
            "severity": "warning" if len(alarms) > 3 else "info",
            "detail": "频繁告警需要关注",
        })

    if health and health.health_score < 70:
        recommendations.append({
            "action": "安排设备巡检",
            "priority": "high",
            "detail": f"健康评分 {health.health_score},趋势 {health.trend}",
        })

    # Always report something, even when nothing was found
    if not findings:
        findings.append({
            "finding": "设备运行正常",
            "severity": "info",
            "detail": "未发现异常",
        })
    return findings, recommendations, 0.0, 0.0
# ── Predictive Maintenance ──────────────────────────────────────────
async def generate_maintenance_predictions(session: AsyncSession) -> list[MaintenancePrediction]:
    """Derive maintenance predictions from each active device's latest health score.

    Rules, evaluated per device:
        * score below 60 and trend "degrading": failure prediction with
          urgency "critical" (score below 40) or "high".
        * score from 60 up to (not including) 75 and trend "degrading":
          preventive-maintenance prediction with urgency "medium".
        * independently of the above, an "alarm_frequency" factor below 50
          adds an alarm-related prediction.

    Devices without any stored health score are skipped.

    Args:
        session: Async database session; predictions are added to it but not
            committed here.

    Returns:
        The newly created ``MaintenancePrediction`` objects.
    """
    now = datetime.now(timezone.utc)
    predictions = []

    result = await session.execute(
        select(Device).where(Device.is_active == True)  # noqa: E712 - SQL expression
    )
    devices = result.scalars().all()

    for device in devices:
        # Only the newest score is used by the rules, so fetch just that row
        health_result = await session.execute(
            select(DeviceHealthScore)
            .where(DeviceHealthScore.device_id == device.id)
            .order_by(DeviceHealthScore.timestamp.desc())
            .limit(1)
        )
        latest = health_result.scalars().first()
        if latest is None:
            continue

        score = latest.health_score
        degrading = latest.trend == "degrading"

        if degrading and score < 60:
            # Lower scores bring the predicted failure date closer, with a
            # minimum of 3 days.
            days_to_failure = max(3, int((score - 20) / 5))
            predictions.append(MaintenancePrediction(
                device_id=device.id,
                predicted_at=now,
                component=_get_weak_component(latest.factors),
                failure_mode="设备性能持续下降,可能导致故障停机",
                probability=round(min(0.9, (100 - score) / 100 + 0.2), 2),
                predicted_failure_date=now + timedelta(days=days_to_failure),
                recommended_action="安排全面检修,重点检查薄弱环节",
                urgency="critical" if score < 40 else "high",
                estimated_downtime_hours=4.0 if device.device_type in ("heat_pump", "pv_inverter") else 2.0,
                estimated_repair_cost=_estimate_repair_cost(device.device_type),
                status="predicted",
            ))
        elif degrading and 60 <= score < 75:
            predictions.append(MaintenancePrediction(
                device_id=device.id,
                predicted_at=now,
                component=_get_weak_component(latest.factors),
                failure_mode="性能下降趋势,需预防性维护",
                probability=round(min(0.6, (80 - score) / 100 + 0.1), 2),
                predicted_failure_date=now + timedelta(days=14),
                recommended_action="安排预防性巡检和维护",
                urgency="medium",
                estimated_downtime_hours=2.0,
                estimated_repair_cost=_estimate_repair_cost(device.device_type) * 0.5,
                status="predicted",
            ))

        # Alarm frequency is checked regardless of the overall score
        alarm_score = latest.factors.get("alarm_frequency", 100) if latest.factors else 100
        if alarm_score < 50:
            predictions.append(MaintenancePrediction(
                device_id=device.id,
                predicted_at=now,
                component="告警系统/传感器",
                failure_mode="频繁告警可能预示设备故障",
                probability=0.4,
                predicted_failure_date=now + timedelta(days=7),
                recommended_action="排查告警根因,检查传感器和控制系统",
                urgency="medium",
                estimated_downtime_hours=1.0,
                estimated_repair_cost=500.0,
                status="predicted",
            ))

    session.add_all(predictions)
    return predictions
def _get_weak_component(factors: dict | None) -> str:
"""Identify the weakest factor as the component needing attention."""
if not factors:
return "综合"
component_map = {
"power_stability": "电力输出系统",
"efficiency": "能效/换热系统",
"alarm_frequency": "监控传感器",
"uptime": "通讯/控制系统",
"temperature": "散热/温控系统",
}
weakest = min(factors, key=lambda k: factors.get(k, 100))
return component_map.get(weakest, "综合")
def _estimate_repair_cost(device_type: str) -> float:
    """Return the flat repair-cost estimate for a device type.

    Device types without a dedicated entry are estimated at 1000.0.
    """
    cost_table = dict(
        pv_inverter=3000.0,
        heat_pump=5000.0,
        meter=800.0,
        sensor=300.0,
        heat_meter=1500.0,
    )
    try:
        return cost_table[device_type]
    except KeyError:
        return 1000.0
# ── Operational Insights ────────────────────────────────────────────
async def generate_insights(session: AsyncSession) -> list[OpsInsight]:
    """Generate and stage operational insights.

    Up to three insights are produced:
        1. Devices whose 7-day average health score is more than 10 points
           below the average across devices.
        2. A rise in anomaly count: this week has more than 3 anomalies and
           more than 1.5 times last week's count.
        3. A summary insight that is always emitted.

    Args:
        session: Async database session; insights are added to it but not
            committed here.

    Returns:
        The newly created ``OpsInsight`` objects.
    """
    now = datetime.now(timezone.utc)
    week_ago = now - timedelta(days=7)
    two_weeks_ago = now - timedelta(days=14)
    insights = []

    # Insight 1: devices lagging behind the average health score
    per_device_stmt = (
        select(
            DeviceHealthScore.device_id,
            func.avg(DeviceHealthScore.health_score).label("avg_score"),
        )
        .where(DeviceHealthScore.timestamp >= week_ago)
        .group_by(DeviceHealthScore.device_id)
    )
    scores_by_device = (await session.execute(per_device_stmt)).all()
    if scores_by_device:
        device_averages = [float(row.avg_score) for row in scores_by_device]
        avg_all = sum(device_averages) / len(device_averages)
        low_performers = [
            row for row in scores_by_device if float(row.avg_score) < avg_all - 10
        ]
        if low_performers:
            device_ids = [row.device_id for row in low_performers]
            name_rows = await session.execute(
                select(Device.id, Device.name).where(Device.id.in_(device_ids))
            )
            device_names = {row.id: row.name for row in name_rows.all()}
            listed_names = ", ".join(device_names.get(d, f"#{d}") for d in device_ids)
            performer_details = [
                {
                    "device_id": row.device_id,
                    "score": round(float(row.avg_score), 1),
                    "name": device_names.get(row.device_id, f"#{row.device_id}"),
                }
                for row in low_performers
            ]
            insights.append(OpsInsight(
                insight_type="performance_comparison",
                title="部分设备健康评分低于平均水平",
                description=f"以下设备健康评分低于园区平均值({avg_all:.0f})超过10分: " + listed_names,
                data={"avg_score": round(avg_all, 1), "low_performers": performer_details},
                impact_level="medium" if len(low_performers) <= 2 else "high",
                actionable=True,
                recommended_action="重点关注低评分设备,安排巡检和维护",
                generated_at=now,
                valid_until=now + timedelta(days=7),
            ))

    # Insight 2: week-over-week anomaly trend
    this_week = await session.execute(
        select(func.count(AnomalyDetection.id)).where(
            AnomalyDetection.detected_at >= week_ago
        )
    )
    last_week = await session.execute(
        select(func.count(AnomalyDetection.id)).where(and_(
            AnomalyDetection.detected_at >= two_weeks_ago,
            AnomalyDetection.detected_at < week_ago,
        ))
    )
    this_count = this_week.scalar() or 0
    last_count = last_week.scalar() or 0
    if this_count > last_count * 1.5 and this_count > 3:
        # max(1, ...) avoids dividing by zero when last week had no anomalies
        growth_pct = ((this_count / max(1, last_count)) - 1) * 100
        insights.append(OpsInsight(
            insight_type="efficiency_trend",
            title="异常检测数量环比上升",
            description=f"本周检测到 {this_count} 次异常,上周 {last_count} 次,增长 {growth_pct:.0f}%",
            data={"this_week": this_count, "last_week": last_count},
            impact_level="high",
            actionable=True,
            recommended_action="建议全面排查设备状态,加强巡检频次",
            generated_at=now,
            valid_until=now + timedelta(days=3),
        ))

    # Insight 3: summary, always emitted
    insights.append(OpsInsight(
        insight_type="cost_anomaly",
        title="能源使用效率周报",
        description="园区整体运行状况评估",
        data={
            "total_devices": len(scores_by_device) if scores_by_device else 0,
            "avg_health": round(avg_all, 1) if scores_by_device else 0,
            "anomaly_count": this_count,
        },
        impact_level="low",
        actionable=False,
        generated_at=now,
        valid_until=now + timedelta(days=7),
    ))

    for insight in insights:
        session.add(insight)
    return insights
# ── Dashboard Aggregation ───────────────────────────────────────────
async def get_dashboard_data(session: AsyncSession) -> dict:
    """Assemble the AI Ops dashboard overview.

    Args:
        session: Async database session; this function only reads.

    Returns:
        A dict with four keys:
            * ``health``: status counts, average score and per-device details
              based on each device's most recent health score.
            * ``anomalies``: ``stats`` computed over at most the 20 most
              recent anomalies of the last 7 days, and ``recent`` with the
              10 newest of them.
            * ``predictions``: up to 10 open predictions, most urgent first.
            * ``insights``: up to 5 still-valid insights, newest first.
    """
    # Function-scope import: `case` is not among this module's top-level imports.
    from sqlalchemy import case

    now = datetime.now(timezone.utc)

    # Latest health score per device
    latest_ts = (
        select(
            DeviceHealthScore.device_id,
            func.max(DeviceHealthScore.timestamp).label("max_ts"),
        ).group_by(DeviceHealthScore.device_id).subquery()
    )
    health_result = await session.execute(
        select(DeviceHealthScore).join(
            latest_ts, and_(
                DeviceHealthScore.device_id == latest_ts.c.device_id,
                DeviceHealthScore.timestamp == latest_ts.c.max_ts,
            )
        )
    )
    health_scores = health_result.scalars().all()

    # Recent anomalies (newest first, capped at 20)
    anomaly_result = await session.execute(
        select(AnomalyDetection)
        .where(AnomalyDetection.detected_at >= now - timedelta(days=7))
        .order_by(AnomalyDetection.detected_at.desc())
        .limit(20)
    )
    recent_anomalies = anomaly_result.scalars().all()

    # Open maintenance predictions, most urgent first. Urgency is stored as a
    # text label, so ordering by the raw column would sort alphabetically
    # ("medium" before "critical" in descending order); rank it explicitly.
    # "low" is ranked for completeness; unknown labels sort last.
    urgency_rank = case(
        {"critical": 0, "high": 1, "medium": 2, "low": 3},
        value=MaintenancePrediction.urgency,
        else_=4,
    )
    pred_result = await session.execute(
        select(MaintenancePrediction)
        .where(MaintenancePrediction.status == "predicted")
        .order_by(urgency_rank)
        .limit(10)
    )
    predictions = pred_result.scalars().all()

    # Insights that have not expired yet
    insight_result = await session.execute(
        select(OpsInsight)
        .where(OpsInsight.valid_until >= now)
        .order_by(OpsInsight.generated_at.desc())
        .limit(5)
    )
    latest_insights = insight_result.scalars().all()

    # Resolve names for every device referenced anywhere in the payload, so
    # anomalies and predictions of devices without a health score still get
    # a proper name instead of the "#<id>" fallback.
    device_ids = {h.device_id for h in health_scores}
    device_ids.update(a.device_id for a in recent_anomalies)
    device_ids.update(p.device_id for p in predictions)
    device_map = {}
    if device_ids:
        dev_result = await session.execute(
            select(Device.id, Device.name, Device.device_type)
            .where(Device.id.in_(list(device_ids)))
        )
        device_map = {
            row.id: {"name": row.name, "type": row.device_type}
            for row in dev_result.all()
        }

    def device_name(dev_id):
        """Return the device's name, or "#<id>" when it is unknown."""
        return device_map.get(dev_id, {}).get("name", f"#{dev_id}")

    health_summary = {
        "healthy": sum(1 for h in health_scores if h.status == "healthy"),
        "warning": sum(1 for h in health_scores if h.status == "warning"),
        "critical": sum(1 for h in health_scores if h.status == "critical"),
        # max(1, ...) keeps the average at 0 when there are no scores
        "avg_score": round(sum(h.health_score for h in health_scores) / max(1, len(health_scores)), 1),
        "devices": [{
            "device_id": h.device_id,
            "device_name": device_name(h.device_id),
            "device_type": device_map.get(h.device_id, {}).get("type", "unknown"),
            "health_score": h.health_score,
            "status": h.status,
            "trend": h.trend,
            "factors": h.factors,
        } for h in health_scores],
    }

    anomaly_stats = {
        "total": len(recent_anomalies),
        "by_severity": {},
        "by_type": {},
    }
    for a in recent_anomalies:
        anomaly_stats["by_severity"][a.severity] = anomaly_stats["by_severity"].get(a.severity, 0) + 1
        anomaly_stats["by_type"][a.anomaly_type] = anomaly_stats["by_type"].get(a.anomaly_type, 0) + 1

    return {
        "health": health_summary,
        "anomalies": {
            "stats": anomaly_stats,
            "recent": [{
                "id": a.id,
                "device_id": a.device_id,
                "device_name": device_name(a.device_id),
                "anomaly_type": a.anomaly_type,
                "severity": a.severity,
                "description": a.description,
                "detected_at": str(a.detected_at),
                "status": a.status,
            } for a in recent_anomalies[:10]],
        },
        "predictions": [{
            "id": p.id,
            "device_id": p.device_id,
            "device_name": device_name(p.device_id),
            "component": p.component,
            "failure_mode": p.failure_mode,
            "probability": p.probability,
            "predicted_failure_date": str(p.predicted_failure_date) if p.predicted_failure_date else None,
            "urgency": p.urgency,
            "recommended_action": p.recommended_action,
        } for p in predictions],
        "insights": [{
            "id": i.id,
            "insight_type": i.insight_type,
            "title": i.title,
            "description": i.description,
            "impact_level": i.impact_level,
            "actionable": i.actionable,
            "recommended_action": i.recommended_action,
            "generated_at": str(i.generated_at),
        } for i in latest_insights],
    }