Files

1017 lines
38 KiB
Python
Raw Permalink Normal View History

"""AI运维智能体服务 - 设备健康评分、异常检测、诊断智能、预测性维护、运营洞察
Inspired by Envision's "构网智能体" concept: self-sensing, self-adapting, self-evolving
intelligent agents for energy asset management.
"""
import logging
import math
import random
from datetime import datetime, timedelta, timezone

from sqlalchemy import and_, case, desc, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.ai_ops import (
    AnomalyDetection,
    DeviceHealthScore,
    DiagnosticReport,
    MaintenancePrediction,
    OpsInsight,
)
from app.models.alarm import AlarmEvent
from app.models.device import Device
from app.models.energy import EnergyData, EnergyDailySummary
logger = logging.getLogger("ai_ops")
# ── Device type configurations ──────────────────────────────────────
# Per-device-type reference settings, keyed by ``Device.device_type``.
#   metric     - data-type name associated with the device type
#   rated_cop  - nominal COP; only set for heat pumps
#   temp_range - (low, high) band that _calc_temperature_score treats as
#                normal; None means the temperature factor is not applicable
# NOTE(review): in the code visible here only "temp_range" is read; "metric"
# and "rated_cop" may be used elsewhere - confirm before removing.
# NOTE(review): temp_range values are presumably degrees Celsius - confirm.
DEVICE_RATED_EFFICIENCY = {
"pv_inverter": {"metric": "power", "rated_cop": None, "temp_range": (20, 60)},
"heat_pump": {"metric": "cop", "rated_cop": 3.5, "temp_range": (30, 55)},
"meter": {"metric": "power_factor", "rated_cop": None, "temp_range": None},
"sensor": {"metric": "temperature", "rated_cop": None, "temp_range": (18, 28)},
"heat_meter": {"metric": "heat_power", "rated_cop": None, "temp_range": None},
}
# Weight of each factor in the overall health score computed by
# calculate_device_health; the five weights add up to 1.0.
HEALTH_WEIGHTS = {
"power_stability": 0.20,
"efficiency": 0.25,
"alarm_frequency": 0.20,
"uptime": 0.20,
"temperature": 0.15,
}
# ── Health Score Calculation ────────────────────────────────────────
async def calculate_device_health(
    session: AsyncSession, device: Device, now: datetime | None = None
) -> DeviceHealthScore:
    """Score one device's health on a 0-100 scale and stage the result.

    Five factor scores are collected from the ``_calc_*`` helpers, blended
    with ``HEALTH_WEIGHTS``, rounded to one decimal and clamped to [0, 100].

    Args:
        session: Async database session used for every lookup.
        device: Device being scored.
        now: Reference time; the current UTC time is used when falsy.

    Returns:
        The new ``DeviceHealthScore``. It is added to ``session``; this
        function itself does not flush or commit.
    """
    reference_time = now or datetime.now(timezone.utc)

    # The dict literal evaluates top to bottom, so the helpers run in this order.
    factors = {
        # Variation of power readings over the last 24 hours.
        "power_stability": await _calc_power_stability(session, device.id, reference_time),
        # Efficiency / COP relative to a rated value.
        "efficiency": await _calc_efficiency_score(session, device, reference_time),
        # Number of alarms in the last 7 days.
        "alarm_frequency": await _calc_alarm_frequency_score(session, device.id, reference_time),
        # Data availability over the last 7 days plus current status.
        "uptime": await _calc_uptime_score(session, device, reference_time),
        # Average temperature against the configured normal band.
        "temperature": await _calc_temperature_score(session, device, reference_time),
    }

    weighted_total = sum(
        factors[name] * HEALTH_WEIGHTS[name] for name in HEALTH_WEIGHTS
    )
    health_score = max(0, min(100, round(weighted_total, 1)))

    status = (
        "healthy" if health_score >= 80
        else "warning" if health_score >= 60
        else "critical"
    )

    # The trend is derived from scores already stored, before this one is added.
    trend = await _calc_trend(session, device.id, health_score)

    record = DeviceHealthScore(
        device_id=device.id,
        timestamp=reference_time,
        health_score=health_score,
        status=status,
        factors=factors,
        trend=trend,
    )
    session.add(record)
    return record
async def _calc_power_stability(session: AsyncSession, device_id: int, now: datetime) -> float:
    """Rate how steady the device's "power" readings were over the last 24 hours.

    The score is driven by the coefficient of variation (std-dev / mean):
    a value of 0.1 or less maps to 100 and 0.5 or more maps to the floor of 40.
    When there are no readings, or the mean is exactly zero, 85.0 is returned.
    """
    window_start = now - timedelta(hours=24)
    stats_query = select(
        func.avg(EnergyData.value).label("avg_val"),
        func.stddev(EnergyData.value).label("std_val"),
    ).where(and_(
        EnergyData.device_id == device_id,
        EnergyData.data_type == "power",
        EnergyData.timestamp >= window_start,
    ))
    row = (await session.execute(stats_query)).one_or_none()

    # Nothing to measure (or a zero mean): assume the device is fine.
    if not row or row.avg_val is None or row.avg_val == 0:
        return 85.0

    mean = float(row.avg_val)
    spread = float(row.std_val or 0)
    # NOTE(review): a negative mean yields cv = 0 and therefore a score of 100
    # regardless of the spread - confirm this is intended.
    if mean > 0:
        cv = spread / mean
    else:
        cv = 0

    raw_score = 100 - (cv - 0.1) * 150
    return round(max(40, min(100, raw_score)), 1)
async def _calc_efficiency_score(session: AsyncSession, device: Device, now: datetime) -> float:
    """Score efficiency over the last 24 hours using a rule per device type.

    * ``heat_pump``   - average "cop" divided by 3.5, as a percentage (floor 30).
    * ``pv_inverter`` - peak "power" divided by the rated power (110.0 when the
      device has none), multiplied by 110 (floor 30).
    * ``meter``       - average "power_factor" multiplied by 105 (floor 40).

    Every result is capped at 100. Other device types, and supported types
    with no data in the window, score 85.0.
    """
    window_start = now - timedelta(hours=24)

    async def _aggregate(aggregate, data_type):
        # Run one aggregate over this device's readings of ``data_type``.
        outcome = await session.execute(
            select(aggregate(EnergyData.value)).where(and_(
                EnergyData.device_id == device.id,
                EnergyData.data_type == data_type,
                EnergyData.timestamp >= window_start,
            ))
        )
        return outcome.scalar()

    if device.device_type == "heat_pump":
        mean_cop = await _aggregate(func.avg, "cop")
        if mean_cop is None:
            return 85.0
        cop_ratio = float(mean_cop) / 3.5
        return round(max(30, min(100, cop_ratio * 100)), 1)

    if device.device_type == "pv_inverter":
        rated_power = device.rated_power or 110.0
        peak_power = await _aggregate(func.max, "power")
        if peak_power is None:
            return 85.0
        output_ratio = float(peak_power) / rated_power
        return round(max(30, min(100, output_ratio * 110)), 1)

    if device.device_type == "meter":
        mean_pf = await _aggregate(func.avg, "power_factor")
        if mean_pf is None:
            return 85.0
        return round(max(40, min(100, float(mean_pf) * 105)), 1)

    return 85.0
async def _calc_alarm_frequency_score(session: AsyncSession, device_id: int, now: datetime) -> float:
    """Score alarm activity over the last 7 days.

    Starts at 100 and loses 7 points per alarm, never dropping below 30
    (reached at 10 alarms).
    """
    window_start = now - timedelta(days=7)
    count_query = select(func.count(AlarmEvent.id)).where(and_(
        AlarmEvent.device_id == device_id,
        AlarmEvent.triggered_at >= window_start,
    ))
    alarm_count = (await session.execute(count_query)).scalar() or 0
    return float(max(30, 100 - alarm_count * 7))
async def _calc_uptime_score(session: AsyncSession, device: Device, now: datetime) -> float:
    """Score availability from 7 days of "power" readings and current status.

    The number of readings is compared with the count expected at four
    readings per minute; the resulting percentage is reduced by 15 for an
    "offline" device or 5 for one in "alarm", with a floor of 30.
    """
    # NOTE(review): only "power" rows are counted, so a device type that never
    # reports "power" would always land on the floor of 30 - confirm which
    # data types each device type actually emits.
    window_start = now - timedelta(days=7)
    count_query = select(func.count(EnergyData.id)).where(and_(
        EnergyData.device_id == device.id,
        EnergyData.data_type == "power",
        EnergyData.timestamp >= window_start,
    ))
    readings = (await session.execute(count_query)).scalar() or 0

    # 7 days * 24 h * 60 min * 4 readings/min (one every 15 s) = 40320.
    expected_readings = 7 * 24 * 60 * 4
    availability = min(1.0, readings / max(1, expected_readings))

    penalty_by_status = {"offline": 15, "alarm": 5}
    penalty = penalty_by_status.get(device.status, 0)

    return round(max(30, availability * 100 - penalty), 1)
async def _calc_temperature_score(session: AsyncSession, device: Device, now: datetime) -> float:
    """Score the 6-hour average "temperature" against the type's normal band.

    Returns 90.0 when the device type has no configured ``temp_range``,
    85.0 when no readings exist, 100.0 inside the band, and otherwise
    100 minus 5 points per unit outside the band (floor 20).
    """
    type_config = DEVICE_RATED_EFFICIENCY.get(device.device_type, {})
    band = type_config.get("temp_range")
    if not band:
        # Temperature is not a meaningful factor for this device type.
        return 90.0

    window_start = now - timedelta(hours=6)
    mean_query = select(func.avg(EnergyData.value)).where(and_(
        EnergyData.device_id == device.id,
        EnergyData.data_type == "temperature",
        EnergyData.timestamp >= window_start,
    ))
    mean_temp = (await session.execute(mean_query)).scalar()
    if mean_temp is None:
        return 85.0

    mean_temp = float(mean_temp)
    lower, upper = band
    if lower <= mean_temp <= upper:
        return 100.0

    # Distance to the nearest edge of the band.
    excess = max(0, mean_temp - upper, lower - mean_temp)
    return round(max(20, 100 - excess * 5), 1)
async def _calc_trend(session: AsyncSession, device_id: int, current_score: float) -> str:
    """Classify the new score against the mean of up to five stored scores.

    Returns "stable" when fewer than two earlier scores exist. Otherwise
    returns "improving" when the new score exceeds the mean by more than 3,
    "degrading" when it is below by more than 3, and "stable" in between.
    """
    history_query = (
        select(DeviceHealthScore.health_score)
        .where(DeviceHealthScore.device_id == device_id)
        .order_by(DeviceHealthScore.timestamp.desc())
        .limit(5)
    )
    history = await session.execute(history_query)
    previous = [float(value) for value in history.scalars().all()]

    if len(previous) < 2:
        return "stable"

    delta = current_score - sum(previous) / len(previous)
    if delta > 3:
        return "improving"
    if delta < -3:
        return "degrading"
    return "stable"
# ── Anomaly Detection ───────────────────────────────────────────────
async def scan_anomalies(session: AsyncSession, device_id: int | None = None) -> list[AnomalyDetection]:
    """Run Z-score and pattern-based anomaly detection over active devices.

    Args:
        session: Async database session used for all queries.
        device_id: Restrict the scan to this device. ``None`` scans every
            active device.

    Returns:
        The detected anomalies. Each one is added to ``session``; this
        function itself does not flush or commit.
    """
    now = datetime.now(timezone.utc)

    query = select(Device).where(Device.is_active == True)  # noqa: E712
    # Compare against None so that an id of 0 still narrows the scan instead
    # of silently scanning every device.
    if device_id is not None:
        query = query.where(Device.id == device_id)
    devices = (await session.execute(query)).scalars().all()

    anomalies: list[AnomalyDetection] = []
    for device in devices:
        # Statistical outliers first, then the rule-based checks.
        anomalies.extend(await _zscore_detection(session, device, now))
        anomalies.extend(await _pattern_detection(session, device, now))

    for anomaly in anomalies:
        session.add(anomaly)
    return anomalies
async def _zscore_detection(
    session: AsyncSession, device: Device, now: datetime
) -> list[AnomalyDetection]:
    """Flag the latest reading of each metric when it is more than 3 sigma out.

    Metrics checked: "temperature" for sensors; otherwise "power", plus "cop"
    for heat pumps. The mean and standard deviation come from the last
    24 hours; a metric is skipped when either is missing or the deviation is 0.

    Args:
        session: Async database session.
        device: Device to inspect.
        now: Reference time for the statistics window and ``detected_at``.

    Returns:
        Unsaved ``AnomalyDetection`` objects; severity is "critical" above
        5 sigma and "warning" otherwise.
    """
    if device.device_type == "sensor":
        metrics = ["temperature"]
    else:
        metrics = ["power"]
        if device.device_type == "heat_pump":
            metrics.append("cop")

    stats_cutoff = now - timedelta(hours=24)
    anomalies: list[AnomalyDetection] = []

    for metric in metrics:
        stats_result = await session.execute(
            select(
                func.avg(EnergyData.value).label("avg_val"),
                func.stddev(EnergyData.value).label("std_val"),
            ).where(and_(
                EnergyData.device_id == device.id,
                EnergyData.data_type == metric,
                EnergyData.timestamp >= stats_cutoff,
            ))
        )
        stats = stats_result.one_or_none()
        if (
            not stats
            or stats.avg_val is None
            or stats.std_val is None
            or float(stats.std_val) == 0
        ):
            continue
        avg_val = float(stats.avg_val)
        std_val = float(stats.std_val)

        # Most recent reading for the metric.
        latest_result = await session.execute(
            select(EnergyData.value).where(and_(
                EnergyData.device_id == device.id,
                EnergyData.data_type == metric,
            )).order_by(EnergyData.timestamp.desc()).limit(1)
        )
        latest = latest_result.scalar()
        if latest is None:
            continue
        latest = float(latest)

        distance = abs(latest - avg_val)
        z_score = distance / std_val
        if z_score <= 3:
            continue

        # Divide by the magnitude of the mean so the percentage stays
        # non-negative even when the mean itself is negative.
        deviation_pct = round(distance / abs(avg_val) * 100, 1) if avg_val != 0 else 0
        anomalies.append(AnomalyDetection(
            device_id=device.id,
            detected_at=now,
            anomaly_type=_classify_anomaly(device.device_type, metric, latest, avg_val),
            severity="critical" if z_score > 5 else "warning",
            description=f"{metric} 异常: 当前值 {latest:.2f}, 均值 {avg_val:.2f}, Z-score {z_score:.1f}",
            metric_name=metric,
            expected_value=round(avg_val, 2),
            actual_value=round(latest, 2),
            deviation_percent=deviation_pct,
            status="detected",
        ))
    return anomalies
def _classify_anomaly(device_type: str, metric: str, actual: float, expected: float) -> str:
"""Classify anomaly type based on device type and metric."""
if metric == "power" and actual < expected:
return "power_drop"
if metric == "cop" and actual < expected:
return "efficiency_loss"
if metric == "temperature":
return "abnormal_temperature"
return "pattern_deviation"
async def _pattern_detection(
    session: AsyncSession, device: Device, now: datetime
) -> list[AnomalyDetection]:
    """Apply rule-based anomaly checks to one device.

    Rules:
      * Communication loss - an "online" device with no readings of any type
        in the last 5 minutes.
      * PV power drop - for ``pv_inverter`` devices, when the Beijing-time
        hour (``now`` + 8 h) is 9 through 16, the latest "power" reading is
        below 5 % of rated power (110.0 when the device has none).
      * Heat-pump efficiency loss - latest "cop" reading below 2.0
        ("critical" below 1.5).

    Args:
        session: Async database session.
        device: Device to inspect.
        now: Reference time; the +8 h shift treats it as UTC.

    Returns:
        Unsaved ``AnomalyDetection`` objects, possibly empty.
    """
    anomalies: list[AnomalyDetection] = []

    # --- Communication loss -------------------------------------------
    recent_cutoff = now - timedelta(minutes=5)
    recent_result = await session.execute(
        select(func.count(EnergyData.id)).where(and_(
            EnergyData.device_id == device.id,
            EnergyData.timestamp >= recent_cutoff,
        ))
    )
    recent_count = recent_result.scalar() or 0
    if recent_count == 0 and device.status == "online":
        anomalies.append(AnomalyDetection(
            device_id=device.id,
            detected_at=now,
            anomaly_type="communication_loss",
            severity="warning",
            description=f"设备 {device.name} 超过5分钟无数据上报",
            metric_name="data_availability",
            expected_value=1.0,
            actual_value=0.0,
            deviation_percent=100.0,
            status="detected",
        ))

    # --- PV: low output during daylight -------------------------------
    if device.device_type == "pv_inverter":
        beijing_hour = (now + timedelta(hours=8)).hour
        # A single 9..16 gate: the rule never fired outside these hours, so
        # the power lookup is skipped when its result could not be used.
        if 9 <= beijing_hour <= 16:
            latest_result = await session.execute(
                select(EnergyData.value).where(and_(
                    EnergyData.device_id == device.id,
                    EnergyData.data_type == "power",
                )).order_by(EnergyData.timestamp.desc()).limit(1)
            )
            power = latest_result.scalar()
            rated = device.rated_power or 110.0
            if power is not None and float(power) < rated * 0.05:
                # 30 % of rated power is the reference value for the report.
                reference_power = rated * 0.3
                anomalies.append(AnomalyDetection(
                    device_id=device.id,
                    detected_at=now,
                    anomaly_type="power_drop",
                    severity="warning",
                    description=f"光伏 {device.name} 在日照时段功率异常偏低: {power:.1f} kW",
                    metric_name="power",
                    expected_value=round(reference_power, 2),
                    actual_value=round(float(power), 2),
                    deviation_percent=round((1 - float(power) / reference_power) * 100, 1),
                    status="detected",
                ))

    # --- Heat pump: COP degradation -----------------------------------
    if device.device_type == "heat_pump":
        latest_result = await session.execute(
            select(EnergyData.value).where(and_(
                EnergyData.device_id == device.id,
                EnergyData.data_type == "cop",
            )).order_by(EnergyData.timestamp.desc()).limit(1)
        )
        cop = latest_result.scalar()
        if cop is not None and float(cop) < 2.0:
            anomalies.append(AnomalyDetection(
                device_id=device.id,
                detected_at=now,
                anomaly_type="efficiency_loss",
                severity="warning" if float(cop) >= 1.5 else "critical",
                description=f"热泵 {device.name} COP降至 {cop:.2f},低于正常水平",
                metric_name="cop",
                expected_value=3.5,
                actual_value=round(float(cop), 2),
                deviation_percent=round((1 - float(cop) / 3.5) * 100, 1),
                status="detected",
            ))

    return anomalies
# ── Diagnostic Intelligence ─────────────────────────────────────────
async def run_diagnostics(
    session: AsyncSession, device_id: int, report_type: str = "triggered"
) -> DiagnosticReport:
    """Build a diagnostic report for one device from the last 7 days of data.

    Loads the device, its recent anomalies and alarms and its latest health
    score, then delegates to the diagnosis routine for the device type.

    Args:
        session: Async database session.
        device_id: Primary key of the device to diagnose.
        report_type: Stored on the report as-is.

    Returns:
        The new ``DiagnosticReport``, added to ``session`` but not flushed
        or committed here.

    Raises:
        ValueError: If no device with ``device_id`` exists.
    """
    now = datetime.now(timezone.utc)

    device_lookup = await session.execute(select(Device).where(Device.id == device_id))
    device = device_lookup.scalar_one_or_none()
    if not device:
        raise ValueError(f"Device {device_id} not found")

    window_start = now - timedelta(days=7)

    # Anomalies from the last week, newest first.
    anomaly_rows = await session.execute(
        select(AnomalyDetection).where(and_(
            AnomalyDetection.device_id == device_id,
            AnomalyDetection.detected_at >= window_start,
        )).order_by(AnomalyDetection.detected_at.desc())
    )
    anomalies = anomaly_rows.scalars().all()

    # Alarms from the same window.
    alarm_rows = await session.execute(
        select(AlarmEvent).where(and_(
            AlarmEvent.device_id == device_id,
            AlarmEvent.triggered_at >= window_start,
        ))
    )
    alarms = alarm_rows.scalars().all()

    # Most recent health score, if any.
    health_rows = await session.execute(
        select(DeviceHealthScore).where(
            DeviceHealthScore.device_id == device_id
        ).order_by(DeviceHealthScore.timestamp.desc()).limit(1)
    )
    health = health_rows.scalar_one_or_none()

    if device.device_type == "pv_inverter":
        outcome = await _diagnose_pv(session, device, anomalies, alarms, health, now)
    elif device.device_type == "heat_pump":
        outcome = await _diagnose_heat_pump(session, device, anomalies, alarms, health, now)
    else:
        outcome = _diagnose_general(device, anomalies, alarms, health)
    findings, recommendations, energy_loss_kwh, cost_impact_yuan = outcome

    report = DiagnosticReport(
        device_id=device_id,
        generated_at=now,
        report_type=report_type,
        findings=findings,
        recommendations=recommendations,
        estimated_impact={"energy_loss_kwh": round(energy_loss_kwh, 2), "cost_impact_yuan": round(cost_impact_yuan, 2)},
        status="generated",
    )
    session.add(report)
    return report
async def _diagnose_pv(session, device, anomalies, alarms, health, now):
    """Produce findings and recommendations for a PV inverter.

    Combines "power_drop" anomalies with the device's 6-hour average
    temperature, the alarm count and the latest health score.

    Returns:
        ``(findings, recommendations, energy_loss, cost_impact)``. The loss is
        estimated as 5.0 per power-drop anomaly and the cost as 0.6 per unit
        of loss, and only when the drop coincides with an average
        temperature above 55.
    """
    findings = []
    recommendations = []
    energy_loss = 0.0
    cost_impact = 0.0

    # Average device temperature over the last six hours.
    window_start = now - timedelta(hours=6)
    temp_rows = await session.execute(
        select(func.avg(EnergyData.value)).where(and_(
            EnergyData.device_id == device.id,
            EnergyData.data_type == "temperature",
            EnergyData.timestamp >= window_start,
        ))
    )
    avg_temp = temp_rows.scalar()

    power_anomalies = [item for item in anomalies if item.anomaly_type == "power_drop"]
    running_hot = bool(avg_temp and float(avg_temp) > 55)

    if power_anomalies and running_hot:
        findings.append({
            "finding": "光伏逆变器功率下降伴随高温",
            "severity": "warning",
            "detail": f"平均温度 {float(avg_temp):.1f}°C高于正常范围。功率异常 {len(power_anomalies)}",
        })
        recommendations.append({
            "action": "清洁光伏面板并检查通风散热",
            "priority": "high",
            "detail": "高温导致逆变器降额运行,建议清洁面板、检查散热风扇",
        })
        energy_loss = len(power_anomalies) * 5.0
        cost_impact = energy_loss * 0.6
    elif power_anomalies:
        findings.append({
            "finding": "光伏功率间歇性下降",
            "severity": "info",
            "detail": f"近7天检测到 {len(power_anomalies)} 次功率下降",
        })
        recommendations.append({
            "action": "检查光伏面板遮挡和接线",
            "priority": "medium",
            "detail": "排除树木遮挡、面板污染或接线松动",
        })

    if alarms:
        alarm_severity = "warning" if len(alarms) > 3 else "info"
        findings.append({
            "finding": f"近7天触发 {len(alarms)} 条告警",
            "severity": alarm_severity,
            "detail": "频繁告警可能指示设备劣化",
        })

    if health and health.health_score < 70:
        findings.append({
            "finding": f"设备健康评分偏低: {health.health_score}",
            "severity": "warning",
            "detail": f"当前趋势: {health.trend}",
        })
        recommendations.append({
            "action": "安排专项巡检",
            "priority": "high",
            "detail": "建议安排技术人员进行全面检查",
        })

    # Always report something, even when nothing looks wrong.
    if not findings:
        findings.append({
            "finding": "设备运行状态良好",
            "severity": "info",
            "detail": "未发现明显异常",
        })

    return findings, recommendations, energy_loss, cost_impact
async def _diagnose_heat_pump(session, device, anomalies, alarms, health, now):
    """Produce findings and recommendations for a heat pump.

    "efficiency_loss" anomalies are read against the 6-hour average
    "outdoor_temp": below -5 the COP drop is reported as weather-related,
    otherwise it is treated as a fault. Communication-loss anomalies and a
    low health score add further findings. ``alarms`` is accepted but not
    used here.

    Returns:
        ``(findings, recommendations, energy_loss, cost_impact)``. The loss is
        8.0 per COP anomaly and the cost 0.6 per unit of loss, only in the
        fault (non-weather) case.
    """
    findings = []
    recommendations = []
    energy_loss = 0.0
    cost_impact = 0.0

    window_start = now - timedelta(hours=6)
    cop_anomalies = [item for item in anomalies if item.anomaly_type == "efficiency_loss"]

    # NOTE(review): this average is not filtered by device, so it spans every
    # "outdoor_temp" series in the table - confirm that is intended.
    outdoor_rows = await session.execute(
        select(func.avg(EnergyData.value)).where(and_(
            EnergyData.data_type == "outdoor_temp",
            EnergyData.timestamp >= window_start,
        ))
    )
    outdoor_temp = outdoor_rows.scalar()
    cold_weather = bool(outdoor_temp and float(outdoor_temp) < -5)

    if cop_anomalies and cold_weather:
        findings.append({
            "finding": "热泵COP下降与低温天气相关",
            "severity": "info",
            "detail": f"室外温度 {float(outdoor_temp):.1f}°CCOP降额属正常现象",
        })
        recommendations.append({
            "action": "调整运行策略以适应低温",
            "priority": "low",
            "detail": "低温环境下COP降低属正常特性可适当调整运行时段",
        })
    elif cop_anomalies:
        findings.append({
            "finding": "热泵能效异常下降",
            "severity": "warning",
            "detail": f"检测到 {len(cop_anomalies)} 次COP异常",
        })
        recommendations.append({
            "action": "检查冷媒充注量和换热器",
            "priority": "high",
            "detail": "COP异常降低可能由冷媒泄漏或换热器结垢导致",
        })
        energy_loss = len(cop_anomalies) * 8.0
        cost_impact = energy_loss * 0.6

    comm_anomalies = [item for item in anomalies if item.anomaly_type == "communication_loss"]
    if comm_anomalies:
        findings.append({
            "finding": f"通讯中断 {len(comm_anomalies)}",
            "severity": "warning",
            "detail": "频繁通讯丢失需检查网络和DTU设备",
        })
        recommendations.append({
            "action": "检查通讯网络和DTU",
            "priority": "medium",
            "detail": "检查DTU设备状态、网线连接和信号强度",
        })

    if health and health.health_score < 70:
        findings.append({
            "finding": f"设备健康评分偏低: {health.health_score}",
            "severity": "warning",
            "detail": f"当前趋势: {health.trend}",
        })

    # Always report something, even when nothing looks wrong.
    if not findings:
        findings.append({
            "finding": "设备运行状态良好",
            "severity": "info",
            "detail": "未发现明显异常",
        })

    return findings, recommendations, energy_loss, cost_impact
def _diagnose_general(device, anomalies, alarms, health):
"""Generic diagnostic for other device types."""
findings = []
recommendations = []
energy_loss = 0.0
cost_impact = 0.0
if anomalies:
by_type = {}
for a in anomalies:
by_type.setdefault(a.anomaly_type, []).append(a)
for atype, items in by_type.items():
findings.append({
"finding": f"检测到 {len(items)}{atype} 异常",
"severity": "warning" if len(items) > 2 else "info",
"detail": items[0].description if items else "",
})
if alarms:
findings.append({
"finding": f"近7天触发 {len(alarms)} 条告警",
"severity": "warning" if len(alarms) > 3 else "info",
"detail": "频繁告警需要关注",
})
if health and health.health_score < 70:
recommendations.append({
"action": "安排设备巡检",
"priority": "high",
"detail": f"健康评分 {health.health_score},趋势 {health.trend}",
})
if not findings:
findings.append({
"finding": "设备运行正常",
"severity": "info",
"detail": "未发现异常",
})
return findings, recommendations, energy_loss, cost_impact
# ── Predictive Maintenance ──────────────────────────────────────────
async def generate_maintenance_predictions(session: AsyncSession) -> list[MaintenancePrediction]:
    """Create maintenance predictions from each active device's latest score.

    Rules applied to the most recent ``DeviceHealthScore`` of every device:
      * score < 60 and trend "degrading"       -> failure expected within
        ``max(3, (score - 20) / 5)`` days; urgency "critical" below 40,
        otherwise "high".
      * 60 <= score < 75 and trend "degrading" -> preventive maintenance
        within 14 days, urgency "medium", half the estimated repair cost.
      * alarm_frequency factor < 50            -> additional "medium"
        prediction within 7 days (evaluated independently of the other two).

    Devices without any stored health score are skipped.

    Args:
        session: Async database session.

    Returns:
        The new predictions, each added to ``session`` but not flushed or
        committed here.
    """
    now = datetime.now(timezone.utc)
    predictions: list[MaintenancePrediction] = []

    device_rows = await session.execute(
        select(Device).where(Device.is_active == True)  # noqa: E712
    )

    for device in device_rows.scalars().all():
        # Only the newest score drives the rules, so fetch just that row.
        latest_rows = await session.execute(
            select(DeviceHealthScore).where(
                DeviceHealthScore.device_id == device.id
            ).order_by(DeviceHealthScore.timestamp.desc()).limit(1)
        )
        latest = latest_rows.scalars().first()
        if latest is None:
            continue

        degrading = latest.trend == "degrading"

        if latest.health_score < 60 and degrading:
            # Lower scores shorten the horizon, but never below three days.
            days_to_failure = max(3, int((latest.health_score - 20) / 5))
            predictions.append(MaintenancePrediction(
                device_id=device.id,
                predicted_at=now,
                component=_get_weak_component(latest.factors),
                failure_mode="设备性能持续下降,可能导致故障停机",
                probability=round(min(0.9, (100 - latest.health_score) / 100 + 0.2), 2),
                predicted_failure_date=now + timedelta(days=days_to_failure),
                recommended_action="安排全面检修,重点检查薄弱环节",
                urgency="critical" if latest.health_score < 40 else "high",
                estimated_downtime_hours=4.0 if device.device_type in ("heat_pump", "pv_inverter") else 2.0,
                estimated_repair_cost=_estimate_repair_cost(device.device_type),
                status="predicted",
            ))
        elif 60 <= latest.health_score < 75 and degrading:
            predictions.append(MaintenancePrediction(
                device_id=device.id,
                predicted_at=now,
                component=_get_weak_component(latest.factors),
                failure_mode="性能下降趋势,需预防性维护",
                probability=round(min(0.6, (80 - latest.health_score) / 100 + 0.1), 2),
                predicted_failure_date=now + timedelta(days=14),
                recommended_action="安排预防性巡检和维护",
                urgency="medium",
                estimated_downtime_hours=2.0,
                estimated_repair_cost=_estimate_repair_cost(device.device_type) * 0.5,
                status="predicted",
            ))

        # Independent rule: a poor alarm-frequency factor on its own.
        alarm_score = latest.factors.get("alarm_frequency", 100) if latest.factors else 100
        if alarm_score < 50:
            predictions.append(MaintenancePrediction(
                device_id=device.id,
                predicted_at=now,
                component="告警系统/传感器",
                failure_mode="频繁告警可能预示设备故障",
                probability=0.4,
                predicted_failure_date=now + timedelta(days=7),
                recommended_action="排查告警根因,检查传感器和控制系统",
                urgency="medium",
                estimated_downtime_hours=1.0,
                estimated_repair_cost=500.0,
                status="predicted",
            ))

    for prediction in predictions:
        session.add(prediction)
    return predictions
def _get_weak_component(factors: dict | None) -> str:
"""Identify the weakest factor as the component needing attention."""
if not factors:
return "综合"
component_map = {
"power_stability": "电力输出系统",
"efficiency": "能效/换热系统",
"alarm_frequency": "监控传感器",
"uptime": "通讯/控制系统",
"temperature": "散热/温控系统",
}
weakest = min(factors, key=lambda k: factors.get(k, 100))
return component_map.get(weakest, "综合")
def _estimate_repair_cost(device_type: str) -> float:
"""Estimate repair cost by device type."""
costs = {
"pv_inverter": 3000.0,
"heat_pump": 5000.0,
"meter": 800.0,
"sensor": 300.0,
"heat_meter": 1500.0,
}
return costs.get(device_type, 1000.0)
# ── Operational Insights ────────────────────────────────────────────
async def generate_insights(session: AsyncSession) -> list[OpsInsight]:
    """Derive operational insights from recent health scores and anomalies.

    Up to three insights are produced:
      1. Devices whose 7-day average health score is more than 10 points
         below the average across devices (only when such devices exist).
      2. A week-over-week rise in anomaly count (this week more than 1.5x
         last week and more than 3 in total).
      3. A summary insight that is always emitted.

    Returns:
        The new insights, each added to ``session`` but not flushed or
        committed here.
    """
    now = datetime.now(timezone.utc)
    week_ago = now - timedelta(days=7)
    two_weeks_ago = now - timedelta(days=14)
    insights = []
    avg_health = 0  # reported by the summary when no scores exist

    # ── Insight 1: devices lagging behind the average ──
    score_query = (
        select(
            DeviceHealthScore.device_id,
            func.avg(DeviceHealthScore.health_score).label("avg_score"),
        )
        .where(DeviceHealthScore.timestamp >= week_ago)
        .group_by(DeviceHealthScore.device_id)
    )
    scores_by_device = (await session.execute(score_query)).all()

    if scores_by_device:
        per_device = [float(row.avg_score) for row in scores_by_device]
        avg_all = sum(per_device) / len(per_device) if per_device else 0
        avg_health = round(avg_all, 1)

        threshold = avg_all - 10
        low_performers = [row for row in scores_by_device if float(row.avg_score) < threshold]
        if low_performers:
            device_ids = [row.device_id for row in low_performers]
            name_rows = await session.execute(
                select(Device.id, Device.name).where(Device.id.in_(device_ids))
            )
            device_names = {row.id: row.name for row in name_rows.all()}
            listed_names = ", ".join(device_names.get(d, f"#{d}") for d in device_ids)
            insights.append(OpsInsight(
                insight_type="performance_comparison",
                title="部分设备健康评分低于平均水平",
                description=f"以下设备健康评分低于园区平均值({avg_all:.0f})超过10分: "
                + listed_names,
                data={"avg_score": round(avg_all, 1), "low_performers": [
                    {"device_id": row.device_id, "score": round(float(row.avg_score), 1),
                     "name": device_names.get(row.device_id, f"#{row.device_id}")}
                    for row in low_performers
                ]},
                impact_level="medium" if len(low_performers) <= 2 else "high",
                actionable=True,
                recommended_action="重点关注低评分设备,安排巡检和维护",
                generated_at=now,
                valid_until=now + timedelta(days=7),
            ))

    # ── Insight 2: week-over-week anomaly trend ──
    this_week = await session.execute(
        select(func.count(AnomalyDetection.id)).where(
            AnomalyDetection.detected_at >= week_ago
        )
    )
    last_week = await session.execute(
        select(func.count(AnomalyDetection.id)).where(and_(
            AnomalyDetection.detected_at >= two_weeks_ago,
            AnomalyDetection.detected_at < week_ago,
        ))
    )
    this_count = this_week.scalar() or 0
    last_count = last_week.scalar() or 0

    if this_count > last_count * 1.5 and this_count > 3:
        # max(1, ...) avoids dividing by zero when last week had no anomalies.
        growth_pct = ((this_count / max(1, last_count)) - 1) * 100
        insights.append(OpsInsight(
            insight_type="efficiency_trend",
            title="异常检测数量环比上升",
            description=f"本周检测到 {this_count} 次异常,上周 {last_count} 次,增长 {growth_pct:.0f}%",
            data={"this_week": this_count, "last_week": last_count},
            impact_level="high",
            actionable=True,
            recommended_action="建议全面排查设备状态,加强巡检频次",
            generated_at=now,
            valid_until=now + timedelta(days=3),
        ))

    # ── Insight 3: summary, always emitted ──
    insights.append(OpsInsight(
        insight_type="cost_anomaly",
        title="能源使用效率周报",
        description="园区整体运行状况评估",
        data={
            "total_devices": len(scores_by_device) if scores_by_device else 0,
            "avg_health": avg_health,
            "anomaly_count": this_count,
        },
        impact_level="low",
        actionable=False,
        generated_at=now,
        valid_until=now + timedelta(days=7),
    ))

    for insight in insights:
        session.add(insight)
    return insights
# ── Dashboard Aggregation ───────────────────────────────────────────
async def get_dashboard_data(session: AsyncSession) -> dict:
    """Assemble the AI Ops dashboard overview.

    Args:
        session: Async database session (read-only use).

    Returns:
        A dict with four keys:
          * ``health`` - status counts, average score and the latest score
            of every device that has one.
          * ``anomalies`` - ``stats`` (counts by severity and type over the
            20 most recent anomalies of the last 7 days) and ``recent``
            (the 10 newest of those).
          * ``predictions`` - up to 10 open predictions, most urgent first.
          * ``insights`` - up to 5 still-valid insights, newest first.
    """
    now = datetime.now(timezone.utc)

    # Latest health score per device: join each device's max timestamp back
    # onto the score table.
    latest_ts = (
        select(
            DeviceHealthScore.device_id,
            func.max(DeviceHealthScore.timestamp).label("max_ts"),
        ).group_by(DeviceHealthScore.device_id).subquery()
    )
    health_result = await session.execute(
        select(DeviceHealthScore).join(
            latest_ts, and_(
                DeviceHealthScore.device_id == latest_ts.c.device_id,
                DeviceHealthScore.timestamp == latest_ts.c.max_ts,
            )
        )
    )
    health_scores = health_result.scalars().all()

    # Recent anomalies. NOTE: the statistics below cover at most these 20 rows.
    anomaly_result = await session.execute(
        select(AnomalyDetection)
        .where(AnomalyDetection.detected_at >= now - timedelta(days=7))
        .order_by(AnomalyDetection.detected_at.desc())
        .limit(20)
    )
    recent_anomalies = anomaly_result.scalars().all()

    # Open maintenance predictions, most urgent first. Urgency is stored as
    # text, so sorting the column directly would order it alphabetically
    # ("medium" before "critical"); rank the levels explicitly instead.
    # NOTE(review): "low" is not produced by the code visible here and is
    # ranked defensively; unknown values sort last.
    urgency_rank = case(
        {"critical": 0, "high": 1, "medium": 2, "low": 3},
        value=MaintenancePrediction.urgency,
        else_=4,
    )
    pred_result = await session.execute(
        select(MaintenancePrediction).where(
            MaintenancePrediction.status == "predicted"
        ).order_by(urgency_rank).limit(10)
    )
    predictions = pred_result.scalars().all()

    # Insights that have not expired yet.
    insight_result = await session.execute(
        select(OpsInsight).where(
            OpsInsight.valid_until >= now
        ).order_by(OpsInsight.generated_at.desc()).limit(5)
    )
    latest_insights = insight_result.scalars().all()

    # Resolve names for every device referenced anywhere in the payload, so
    # anomalies and predictions for devices without a health score still get
    # a proper name instead of the "#id" placeholder.
    device_ids = (
        {h.device_id for h in health_scores}
        | {a.device_id for a in recent_anomalies}
        | {p.device_id for p in predictions}
    )
    device_map = {}
    if device_ids:
        dev_result = await session.execute(
            select(Device.id, Device.name, Device.device_type).where(Device.id.in_(device_ids))
        )
        device_map = {r.id: {"name": r.name, "type": r.device_type} for r in dev_result.all()}

    def _device_name(dev_id):
        # Fall back to "#<id>" when the device row cannot be found.
        return device_map.get(dev_id, {}).get("name", f"#{dev_id}")

    health_summary = {
        "healthy": sum(1 for h in health_scores if h.status == "healthy"),
        "warning": sum(1 for h in health_scores if h.status == "warning"),
        "critical": sum(1 for h in health_scores if h.status == "critical"),
        # max(1, ...) keeps the average at 0 when there are no scores.
        "avg_score": round(sum(h.health_score for h in health_scores) / max(1, len(health_scores)), 1),
        "devices": [{
            "device_id": h.device_id,
            "device_name": _device_name(h.device_id),
            "device_type": device_map.get(h.device_id, {}).get("type", "unknown"),
            "health_score": h.health_score,
            "status": h.status,
            "trend": h.trend,
            "factors": h.factors,
        } for h in health_scores],
    }

    anomaly_stats = {
        "total": len(recent_anomalies),
        "by_severity": {},
        "by_type": {},
    }
    for a in recent_anomalies:
        anomaly_stats["by_severity"][a.severity] = anomaly_stats["by_severity"].get(a.severity, 0) + 1
        anomaly_stats["by_type"][a.anomaly_type] = anomaly_stats["by_type"].get(a.anomaly_type, 0) + 1

    return {
        "health": health_summary,
        "anomalies": {
            "stats": anomaly_stats,
            "recent": [{
                "id": a.id,
                "device_id": a.device_id,
                "device_name": _device_name(a.device_id),
                "anomaly_type": a.anomaly_type,
                "severity": a.severity,
                "description": a.description,
                "detected_at": str(a.detected_at),
                "status": a.status,
            } for a in recent_anomalies[:10]],
        },
        "predictions": [{
            "id": p.id,
            "device_id": p.device_id,
            "device_name": _device_name(p.device_id),
            "component": p.component,
            "failure_mode": p.failure_mode,
            "probability": p.probability,
            "predicted_failure_date": str(p.predicted_failure_date) if p.predicted_failure_date else None,
            "urgency": p.urgency,
            "recommended_action": p.recommended_action,
        } for p in predictions],
        "insights": [{
            "id": i.id,
            "insight_type": i.insight_type,
            "title": i.title,
            "description": i.description,
            "impact_level": i.impact_level,
            "actionable": i.actionable,
            "recommended_action": i.recommended_action,
            "generated_at": str(i.generated_at),
        } for i in latest_insights],
    }