3 Commits
v1.2.0 ... main

Author SHA1 Message Date
Du Wenbo
56132bae32 chore: add validate_data.py for buyoff data accuracy checks
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 19:05:56 +08:00
Du Wenbo
475313855d feat: add version API and solar KPI endpoint (v1.4.0)
New endpoints:
- GET /api/v1/version — returns VERSIONS.json (no auth required)
  For field engineers to check platform version from login page
- GET /api/v1/kpi/solar — returns PR, self-consumption rate,
  equivalent utilization hours, and daily revenue
  Handles station-level vs device-level data deduplication

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 22:35:08 +08:00
Du Wenbo
60e7f08d7e feat!: generic defaults, dashboard fallback, PV filter fix (v1.3.0)
- Replace all hardcoded Tianpu/天普 defaults with generic "EMS Platform"
- Add energy_today fallback: query raw energy_data when daily summary empty
- Fix PV device filter to include sungrow_inverter device type
- Update APP_NAME, CUSTOMER default, SECRET_KEY, SMTP, Celery, email templates

BREAKING: CUSTOMER default changed from "tianpu" to "default"
Existing deployments with CUSTOMER=tianpu in .env are unaffected.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 22:02:38 +08:00
19 changed files with 503 additions and 30 deletions

View File

@@ -1 +1 @@
1.2.0 1.4.0

View File

@@ -1,6 +1,6 @@
{ {
"project": "ems-core", "project": "ems-core",
"project_version": "1.2.0", "project_version": "1.4.0",
"last_updated": "2026-04-06", "last_updated": "2026-04-06",
"notes": "Backend-only architecture, frontend removed to customer repos" "notes": "Version API, solar KPI endpoint (PR, equiv hours, revenue, self-consumption)"
} }

View File

@@ -1,6 +1,6 @@
[alembic] [alembic]
script_location = alembic script_location = alembic
sqlalchemy.url = postgresql://tianpu:tianpu2026@localhost:5432/tianpu_ems sqlalchemy.url = postgresql://ems:ems2026@localhost:5432/ems
[loggers] [loggers]
keys = root,sqlalchemy,alembic keys = root,sqlalchemy,alembic

View File

@@ -26,7 +26,7 @@ def upgrade() -> None:
# Seed default settings # Seed default settings
op.execute(""" op.execute("""
INSERT INTO system_settings (key, value, description) VALUES INSERT INTO system_settings (key, value, description) VALUES
('platform_name', '天普零碳园区智慧能源管理平台', '平台名称'), ('platform_name', 'Smart Energy Management Platform', '平台名称'),
('data_retention_days', '365', '数据保留天数'), ('data_retention_days', '365', '数据保留天数'),
('alarm_auto_resolve_minutes', '30', '告警自动解除时间(分钟)'), ('alarm_auto_resolve_minutes', '30', '告警自动解除时间(分钟)'),
('simulator_interval_seconds', '15', '模拟器采集间隔(秒)'), ('simulator_interval_seconds', '15', '模拟器采集间隔(秒)'),

View File

@@ -1,5 +1,5 @@
from fastapi import APIRouter from fastapi import APIRouter
from app.api.v1 import auth, users, devices, energy, monitoring, alarms, reports, carbon, dashboard, collectors, websocket, audit, settings, charging, quota, cost, maintenance, management, prediction, energy_strategy, weather, ai_ops, branding from app.api.v1 import auth, users, devices, energy, monitoring, alarms, reports, carbon, dashboard, collectors, websocket, audit, settings, charging, quota, cost, maintenance, management, prediction, energy_strategy, weather, ai_ops, branding, version, kpi
api_router = APIRouter(prefix="/api/v1") api_router = APIRouter(prefix="/api/v1")
@@ -26,3 +26,5 @@ api_router.include_router(energy_strategy.router)
api_router.include_router(weather.router) api_router.include_router(weather.router)
api_router.include_router(ai_ops.router) api_router.include_router(ai_ops.router)
api_router.include_router(branding.router) api_router.include_router(branding.router)
api_router.include_router(version.router)
api_router.include_router(kpi.router)

View File

@@ -26,7 +26,7 @@ async def get_overview(db: AsyncSession = Depends(get_db), user: User = Depends(
) )
device_stats = {row[0]: row[1] for row in device_stats_q.all()} device_stats = {row[0]: row[1] for row in device_stats_q.all()}
# 今日能耗汇总 # 今日能耗汇总 (from daily summary table)
daily_q = await db.execute( daily_q = await db.execute(
select( select(
EnergyDailySummary.energy_type, EnergyDailySummary.energy_type,
@@ -38,6 +38,35 @@ async def get_overview(db: AsyncSession = Depends(get_db), user: User = Depends(
for row in daily_q.all(): for row in daily_q.all():
energy_summary[row[0]] = {"consumption": row[1] or 0, "generation": row[2] or 0} energy_summary[row[0]] = {"consumption": row[1] or 0, "generation": row[2] or 0}
# Fallback: if daily summary is empty, compute from raw energy_data
if not energy_summary:
from sqlalchemy import distinct
fallback_q = await db.execute(
select(
func.sum(EnergyData.value),
).where(
and_(
EnergyData.timestamp >= today_start,
EnergyData.data_type == "daily_energy",
)
).group_by(EnergyData.device_id).order_by(EnergyData.device_id)
)
# Get the latest daily_energy per device (avoid double-counting)
latest_energy_q = await db.execute(
select(
EnergyData.device_id,
func.max(EnergyData.value).label("max_energy"),
).where(
and_(
EnergyData.timestamp >= today_start,
EnergyData.data_type == "daily_energy",
)
).group_by(EnergyData.device_id)
)
total_gen = sum(row[1] or 0 for row in latest_energy_q.all())
if total_gen > 0:
energy_summary["electricity"] = {"consumption": 0, "generation": round(total_gen, 2)}
# 今日碳排放 # 今日碳排放
carbon_q = await db.execute( carbon_q = await db.execute(
select(func.sum(CarbonEmission.emission), func.sum(CarbonEmission.reduction)) select(func.sum(CarbonEmission.emission), func.sum(CarbonEmission.reduction))
@@ -134,7 +163,10 @@ async def get_load_curve(
async def _get_pv_device_ids(db: AsyncSession): async def _get_pv_device_ids(db: AsyncSession):
result = await db.execute( result = await db.execute(
select(Device.id).where(Device.device_type == "pv_inverter", Device.is_active == True) select(Device.id).where(
Device.device_type.in_(["pv_inverter", "sungrow_inverter"]),
Device.is_active == True,
)
) )
return [r[0] for r in result.fetchall()] return [r[0] for r in result.fetchall()]

93
backend/app/api/v1/kpi.py Normal file
View File

@@ -0,0 +1,93 @@
from datetime import datetime, timezone
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_
from app.core.database import get_db
from app.core.deps import get_current_user
from app.models.device import Device
from app.models.energy import EnergyData
from app.models.user import User
router = APIRouter(prefix="/kpi", tags=["关键指标"])
@router.get("/solar")
async def get_solar_kpis(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
"""Solar performance KPIs - PR, self-consumption, equivalent hours, revenue"""
now = datetime.now(timezone.utc)
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
# Get PV devices and their rated power
pv_q = await db.execute(
select(Device.id, Device.rated_power).where(
Device.device_type.in_(["pv_inverter", "sungrow_inverter"]),
Device.is_active == True,
)
)
pv_devices = pv_q.all()
pv_ids = [d[0] for d in pv_devices]
total_rated_kw = sum(d[1] or 0 for d in pv_devices) # kW
if not pv_ids or total_rated_kw == 0:
return {
"pr": 0, "self_consumption_rate": 0,
"equivalent_hours": 0, "revenue_today": 0,
"total_rated_kw": 0, "daily_generation_kwh": 0,
}
# Get latest daily_energy per PV device for today
daily_gen_q = await db.execute(
select(
EnergyData.device_id,
func.max(EnergyData.value).label("max_energy"),
).where(
and_(
EnergyData.timestamp >= today_start,
EnergyData.data_type == "daily_energy",
EnergyData.device_id.in_(pv_ids),
)
).group_by(EnergyData.device_id)
)
# Check if values are station-level (all identical) or device-level
daily_values = daily_gen_q.all()
if not daily_values:
daily_generation_kwh = 0
else:
values = [row[1] or 0 for row in daily_values]
# If all values are identical, it's station-level data — use max (not sum)
if len(set(values)) == 1 and len(values) > 1:
daily_generation_kwh = values[0]
else:
daily_generation_kwh = sum(values)
# Performance Ratio (PR) = actual output / (rated capacity * peak sun hours)
# Approximate peak sun hours from time of day (simplified)
hours_since_sunrise = max(0, min(12, (now.hour + now.minute / 60) - 6)) # approx 6am sunrise
theoretical_kwh = total_rated_kw * hours_since_sunrise * 0.8 # 0.8 = typical irradiance factor
pr = (daily_generation_kwh / theoretical_kwh * 100) if theoretical_kwh > 0 else 0
pr = min(100, round(pr, 1)) # Cap at 100%
# Self-consumption rate (without grid export meter, assume 100% self-consumed for now)
# TODO: integrate grid export meter data when available
self_consumption_rate = 100.0
# Equivalent utilization hours = daily generation / rated capacity
equivalent_hours = round(daily_generation_kwh / total_rated_kw, 2) if total_rated_kw > 0 else 0
# Revenue = daily generation * electricity price
# TODO: get actual price from electricity_pricing table
# Default industrial TOU average price in Beijing: ~0.65 CNY/kWh
avg_price = 0.65
revenue_today = round(daily_generation_kwh * avg_price, 2)
return {
"pr": pr,
"self_consumption_rate": round(self_consumption_rate, 1),
"equivalent_hours": equivalent_hours,
"revenue_today": revenue_today,
"total_rated_kw": total_rated_kw,
"daily_generation_kwh": round(daily_generation_kwh, 2),
"avg_price_per_kwh": avg_price,
"pv_device_count": len(pv_ids),
}

View File

@@ -12,7 +12,7 @@ router = APIRouter(prefix="/settings", tags=["系统设置"])
# Default settings — used when keys are missing from DB # Default settings — used when keys are missing from DB
DEFAULTS: dict[str, str] = { DEFAULTS: dict[str, str] = {
"platform_name": "天普零碳园区智慧能源管理平台", "platform_name": "Smart Energy Management Platform",
"data_retention_days": "365", "data_retention_days": "365",
"alarm_auto_resolve_minutes": "30", "alarm_auto_resolve_minutes": "30",
"simulator_interval_seconds": "15", "simulator_interval_seconds": "15",

View File

@@ -0,0 +1,32 @@
import os
import json
from fastapi import APIRouter
router = APIRouter(prefix="/version", tags=["版本信息"])
@router.get("")
async def get_version():
"""Return platform version information for display on login/dashboard"""
# Read VERSIONS.json from project root (2 levels up from backend/)
backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
# Try multiple paths for VERSIONS.json
for path in [
os.path.join(backend_dir, "VERSIONS.json"), # standalone
os.path.join(backend_dir, "..", "VERSIONS.json"), # inside core/ subtree
os.path.join(backend_dir, "..", "..", "VERSIONS.json"), # customer project root
]:
if os.path.exists(path):
with open(path, 'r', encoding='utf-8') as f:
versions = json.load(f)
return versions
# Fallback: read VERSION file
version_file = os.path.join(backend_dir, "VERSION")
version = "unknown"
if os.path.exists(version_file):
with open(version_file, 'r') as f:
version = f.read().strip()
return {"project_version": version, "project": "ems-core"}

View File

@@ -6,23 +6,23 @@ import yaml
class Settings(BaseSettings): class Settings(BaseSettings):
APP_NAME: str = "TianpuEMS" APP_NAME: str = "EMS Platform"
DEBUG: bool = True DEBUG: bool = True
API_V1_PREFIX: str = "/api/v1" API_V1_PREFIX: str = "/api/v1"
# Customer configuration # Customer configuration
CUSTOMER: str = "tianpu" # tianpu, zpark, etc. CUSTOMER: str = "default" # tianpu, zpark, etc.
CUSTOMER_DISPLAY_NAME: str = "" # Loaded from customer config CUSTOMER_DISPLAY_NAME: str = "" # Loaded from customer config
# Database: set DATABASE_URL in .env to override. # Database: set DATABASE_URL in .env to override.
# Default: SQLite for local dev. Docker sets PostgreSQL via env var. # Default: SQLite for local dev. Docker sets PostgreSQL via env var.
# Examples: # Examples:
# SQLite: sqlite+aiosqlite:///./tianpu_ems.db # SQLite: sqlite+aiosqlite:///./ems.db
# PostgreSQL: postgresql+asyncpg://tianpu:tianpu2026@localhost:5432/tianpu_ems # PostgreSQL: postgresql+asyncpg://ems:ems2026@localhost:5432/ems
DATABASE_URL: str = "sqlite+aiosqlite:///./tianpu_ems.db" DATABASE_URL: str = "sqlite+aiosqlite:///./ems.db"
REDIS_URL: str = "redis://localhost:6379/0" REDIS_URL: str = "redis://localhost:6379/0"
SECRET_KEY: str = "tianpu-ems-secret-key-change-in-production-2026" SECRET_KEY: str = "ems-secret-key-change-in-production-2026"
ALGORITHM: str = "HS256" ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 480 ACCESS_TOKEN_EXPIRE_MINUTES: int = 480
@@ -40,7 +40,7 @@ class Settings(BaseSettings):
SMTP_PORT: int = 587 SMTP_PORT: int = 587
SMTP_USER: str = "" SMTP_USER: str = ""
SMTP_PASSWORD: str = "" SMTP_PASSWORD: str = ""
SMTP_FROM: str = "noreply@tianpu-ems.com" SMTP_FROM: str = "noreply@ems-platform.com"
SMTP_ENABLED: bool = False SMTP_ENABLED: bool = False
# Platform URL for links in emails # Platform URL for links in emails

View File

@@ -84,8 +84,8 @@ async def lifespan(app: FastAPI):
app = FastAPI( app = FastAPI(
title=customer_config.get("platform_name", "天普零碳园区智慧能源管理平台"), title=customer_config.get("platform_name", "Smart Energy Management Platform"),
description=customer_config.get("platform_name_en", "Tianpu Zero-Carbon Park Smart Energy Management System"), description=customer_config.get("platform_name_en", "Smart Energy Management System"),
version="1.0.0", version="1.0.0",
lifespan=lifespan, lifespan=lifespan,
) )

View File

@@ -104,7 +104,7 @@ async def _send_alarm_email(
platform_url=settings.PLATFORM_URL, platform_url=settings.PLATFORM_URL,
) )
subject = f"[{severity_cfg['label']}] {event.title} - 天普EMS告警通知" subject = f"[{severity_cfg['label']}] {event.title} - EMS Alarm Notification"
asyncio.create_task(send_email(to=emails, subject=subject, body_html=body_html)) asyncio.create_task(send_email(to=emails, subject=subject, body_html=body_html))
# Rate limit: don't create duplicate events for the same rule+device within this window # Rate limit: don't create duplicate events for the same rule+device within this window

View File

@@ -1,5 +1,5 @@
""" """
报表生成服务 - PDF/Excel report generation for Tianpu EMS. 报表生成服务 - PDF/Excel report generation for EMS Platform.
""" """
import os import os
import io import io
@@ -18,7 +18,7 @@ from app.models.carbon import CarbonEmission
REPORTS_DIR = Path(__file__).resolve().parent.parent.parent / "reports" REPORTS_DIR = Path(__file__).resolve().parent.parent.parent / "reports"
REPORTS_DIR.mkdir(exist_ok=True) REPORTS_DIR.mkdir(exist_ok=True)
PLATFORM_TITLE = "天普零碳园区智慧能源管理平台" PLATFORM_TITLE = "Smart Energy Management Platform"
ENERGY_TYPE_LABELS = { ENERGY_TYPE_LABELS = {
"electricity": "电力", "electricity": "电力",

View File

@@ -103,10 +103,10 @@ async def _run_report_task(task_id: int):
recipients = task.recipients or [] recipients = task.recipients or []
if isinstance(recipients, list) and recipients: if isinstance(recipients, list) and recipients:
report_name = task.name or template.name report_name = task.name or template.name
subject = f"{report_name} - 天普EMS自动报表" subject = f"{report_name} - EMS Auto Report"
body_html = f""" body_html = f"""
<div style="font-family: 'Microsoft YaHei', sans-serif; padding: 20px;"> <div style="font-family: 'Microsoft YaHei', sans-serif; padding: 20px;">
<h2 style="color: #1a73e8;">天普零碳园区智慧能源管理平台</h2> <h2 style="color: #1a73e8;">Smart Energy Management Platform</h2>
<p>您好,</p> <p>您好,</p>
<p>系统已自动生成 <strong>{report_name}</strong>,请查收附件。</p> <p>系统已自动生成 <strong>{report_name}</strong>,请查收附件。</p>
<p style="color: #666; font-size: 13px;"> <p style="color: #666; font-size: 13px;">

View File

@@ -1,4 +1,4 @@
"""模拟数据生成器 - 为天普园区设备生成真实感的模拟数据 """模拟数据生成器 - 为园区设备生成真实感的模拟数据
Uses physics-based solar position, Beijing weather models, cloud transients, Uses physics-based solar position, Beijing weather models, cloud transients,
temperature derating, and realistic building load patterns to produce data temperature derating, and realistic building load patterns to produce data

View File

@@ -3,7 +3,7 @@
Shared by both the real-time simulator and the backfill script. Shared by both the real-time simulator and the backfill script.
Deterministic when given a seed — call set_seed() for reproducible backfills. Deterministic when given a seed — call set_seed() for reproducible backfills.
Tianpu campus: 39.9N, 116.4E (Beijing / Daxing district) Default campus: 39.9N, 116.4E (Beijing / Daxing district)
""" """
import math import math
@@ -87,7 +87,7 @@ def solar_altitude(dt: datetime) -> float:
# Local solar time # Local solar time
beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt beijing_dt = dt + timedelta(hours=BEIJING_TZ_OFFSET) if dt.tzinfo else dt
# Standard meridian for UTC+8 is 120E; Tianpu is at 116.4E # Standard meridian for UTC+8 is 120E; default campus is at 116.4E
time_offset = _equation_of_time(doy) + 4 * (BEIJING_LON - 120.0) time_offset = _equation_of_time(doy) + 4 * (BEIJING_LON - 120.0)
solar_hour = beijing_dt.hour + beijing_dt.minute / 60.0 + beijing_dt.second / 3600.0 solar_hour = beijing_dt.hour + beijing_dt.minute / 60.0 + beijing_dt.second / 3600.0
solar_hour += time_offset / 60.0 solar_hour += time_offset / 60.0

View File

@@ -4,7 +4,7 @@ from app.core.config import get_settings
settings = get_settings() settings = get_settings()
celery_app = Celery( celery_app = Celery(
"tianpu_ems", "ems_platform",
broker=settings.REDIS_URL, broker=settings.REDIS_URL,
backend=settings.REDIS_URL, backend=settings.REDIS_URL,
) )

View File

@@ -13,8 +13,8 @@
<!-- Header --> <!-- Header -->
<tr> <tr>
<td style="background: linear-gradient(135deg, #1a73e8, #0d47a1); padding:24px 32px; text-align:center;"> <td style="background: linear-gradient(135deg, #1a73e8, #0d47a1); padding:24px 32px; text-align:center;">
<div style="font-size:12px; color:rgba(255,255,255,0.8); margin-bottom:4px;">TIANPU EMS</div> <div style="font-size:12px; color:rgba(255,255,255,0.8); margin-bottom:4px;">EMS PLATFORM</div>
<div style="font-size:20px; font-weight:bold; color:#ffffff; letter-spacing:1px;">天普零碳园区智慧能源管理平台</div> <div style="font-size:20px; font-weight:bold; color:#ffffff; letter-spacing:1px;">Smart Energy Management Platform</div>
</td> </td>
</tr> </tr>
@@ -85,7 +85,7 @@
<td style="background-color:#f8f9fa; padding:16px 32px; border-top:1px solid #e8e8e8;"> <td style="background-color:#f8f9fa; padding:16px 32px; border-top:1px solid #e8e8e8;">
<div style="font-size:12px; color:#999; text-align:center; line-height:1.6;"> <div style="font-size:12px; color:#999; text-align:center; line-height:1.6;">
此为系统自动发送,请勿回复。<br> 此为系统自动发送,请勿回复。<br>
天普零碳园区智慧能源管理平台 &copy; 2026 Smart Energy Management Platform &copy; 2026
</div> </div>
</td> </td>
</tr> </tr>

314
scripts/validate_data.py Normal file
View File

@@ -0,0 +1,314 @@
"""EMS 数据验证脚本 — 对比平台数据与源API确保偏差在容许范围内
Used during deployment buyoff to verify that EMS dashboard numbers match
the upstream source (e.g. iSolarCloud). Supports both automated source-API
comparison and manual interactive mode.
Exit code 0 = all checks within tolerance
Exit code 1 = one or more checks exceeded tolerance
"""
import argparse
import getpass
import sys
from typing import Optional
import requests
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _pct_diff(ems_val: float, source_val: float) -> Optional[float]:
"""Return absolute percentage difference, or None if source is zero."""
if source_val == 0:
return None if ems_val == 0 else float("inf")
return abs(ems_val - source_val) / abs(source_val)
def _fmt_pct(pct: Optional[float]) -> str:
if pct is None:
return "N/A"
if pct == float("inf"):
return "INF"
return f"{pct * 100:.2f}%"
def _status(pct: Optional[float], tolerance: float) -> str:
if pct is None:
return "SKIP"
if pct <= tolerance:
return "PASS"
return "FAIL"
# ---------------------------------------------------------------------------
# EMS data fetching
# ---------------------------------------------------------------------------
def ems_login(base_url: str, username: str, password: str) -> str:
"""Authenticate against the EMS backend and return a bearer token."""
url = f"{base_url}/api/v1/auth/login"
resp = requests.post(url, json={"username": username, "password": password}, timeout=10)
resp.raise_for_status()
data = resp.json()
token = data.get("access_token") or data.get("token") or data.get("data", {}).get("access_token")
if not token:
raise RuntimeError(f"Login succeeded but no token found in response: {list(data.keys())}")
return token
def fetch_ems_metrics(base_url: str, token: str) -> dict:
"""Pull all relevant metrics from the EMS backend."""
headers = {"Authorization": f"Bearer {token}"}
metrics: dict = {}
# --- /api/v1/dashboard/realtime ---
try:
resp = requests.get(f"{base_url}/api/v1/dashboard/realtime", headers=headers, timeout=10)
resp.raise_for_status()
d = resp.json().get("data", resp.json())
metrics["pv_power_kw"] = float(d.get("pv_power", 0))
except Exception as exc:
print(f" [warn] dashboard/realtime failed: {exc}")
metrics["pv_power_kw"] = None
# --- /api/v1/dashboard/overview ---
try:
resp = requests.get(f"{base_url}/api/v1/dashboard/overview", headers=headers, timeout=10)
resp.raise_for_status()
d = resp.json().get("data", resp.json())
metrics["energy_today_kwh"] = float(d.get("energy_today", 0))
metrics["total_generation_kwh"] = float(d.get("total_generation", 0))
except Exception as exc:
print(f" [warn] dashboard/overview failed: {exc}")
metrics["energy_today_kwh"] = None
metrics["total_generation_kwh"] = None
# --- /api/v1/devices/stats ---
try:
resp = requests.get(f"{base_url}/api/v1/devices/stats", headers=headers, timeout=10)
resp.raise_for_status()
d = resp.json().get("data", resp.json())
metrics["online_count"] = int(d.get("online_count", 0))
metrics["total_count"] = int(d.get("total_count", 0))
except Exception as exc:
print(f" [warn] devices/stats failed: {exc}")
metrics["online_count"] = None
metrics["total_count"] = None
# --- /api/v1/kpi/solar ---
try:
resp = requests.get(f"{base_url}/api/v1/kpi/solar", headers=headers, timeout=10)
resp.raise_for_status()
d = resp.json().get("data", resp.json())
metrics["pr"] = float(d.get("pr", 0))
metrics["equivalent_hours"] = float(d.get("equivalent_hours", 0))
metrics["revenue"] = float(d.get("revenue", 0))
metrics["self_consumption_rate"] = float(d.get("self_consumption_rate", 0))
except Exception as exc:
print(f" [warn] kpi/solar failed: {exc}")
metrics["pr"] = None
metrics["equivalent_hours"] = None
metrics["revenue"] = None
metrics["self_consumption_rate"] = None
return metrics
# ---------------------------------------------------------------------------
# Source data (manual mode)
# ---------------------------------------------------------------------------
METRIC_LABELS = {
"pv_power_kw": "Real-time PV Power (kW)",
"energy_today_kwh": "Today Generation (kWh)",
"total_generation_kwh": "Total Generation (kWh)",
"online_count": "Devices Online",
"total_count": "Devices Total",
"pr": "Performance Ratio",
"equivalent_hours": "Equivalent Hours (h)",
"revenue": "Revenue (CNY)",
"self_consumption_rate": "Self-consumption Rate",
}
def prompt_source_values(ems_metrics: dict) -> dict:
"""Interactively ask the user for source reference values."""
print("\n--- Manual Source Entry ---")
print("Enter the reference value from the source system for each metric.")
print("Press Enter to skip a metric.\n")
source: dict = {}
for key, label in METRIC_LABELS.items():
if ems_metrics.get(key) is None:
continue
raw = input(f" {label} [{key}]: ").strip()
if raw == "":
source[key] = None
else:
try:
source[key] = float(raw)
except ValueError:
print(f" -> invalid number, skipping {key}")
source[key] = None
return source
# ---------------------------------------------------------------------------
# Comparison & reporting
# ---------------------------------------------------------------------------
def compare_and_report(ems: dict, source: dict, tolerance: float) -> bool:
"""Print a comparison table and return True if all checks pass."""
col_metric = 30
col_val = 14
col_pct = 10
col_st = 6
sep = "-" * (col_metric + col_val * 2 + col_pct + col_st + 8)
print("\n" + "=" * len(sep))
print(" EMS Data Validation Report")
print("=" * len(sep))
header = (
f"{'Metric':<{col_metric}} "
f"{'EMS':>{col_val}} "
f"{'Source':>{col_val}} "
f"{'Diff%':>{col_pct}} "
f"{'Status':>{col_st}}"
)
print(header)
print(sep)
all_pass = True
checked = 0
failed = 0
for key, label in METRIC_LABELS.items():
ems_val = ems.get(key)
src_val = source.get(key)
if ems_val is None or src_val is None:
pct = None
st = "SKIP"
ems_str = str(ems_val) if ems_val is not None else "-"
src_str = str(src_val) if src_val is not None else "-"
else:
pct = _pct_diff(ems_val, src_val)
st = _status(pct, tolerance)
ems_str = f"{ems_val:.2f}" if isinstance(ems_val, float) else str(ems_val)
src_str = f"{src_val:.2f}" if isinstance(src_val, float) else str(src_val)
if st == "FAIL":
all_pass = False
failed += 1
if st != "SKIP":
checked += 1
print(
f"{label:<{col_metric}} "
f"{ems_str:>{col_val}} "
f"{src_str:>{col_val}} "
f"{_fmt_pct(pct):>{col_pct}} "
f"{st:>{col_st}}"
)
print(sep)
print(f"\nTolerance: {tolerance * 100:.1f}%")
print(f"Checked: {checked} | Passed: {checked - failed} | Failed: {failed}")
if checked == 0:
print("\n[WARN] No metrics were compared. Provide source values to validate.")
return True
if all_pass:
print("\n[PASS] All metrics within tolerance.")
else:
print("\n[FAIL] One or more metrics exceed tolerance!")
return all_pass
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Validate EMS platform data against source API or manual reference values."
)
parser.add_argument(
"--ems-url",
default="http://localhost:8000",
help="Base URL of the EMS backend (default: http://localhost:8000)",
)
parser.add_argument(
"--source-url",
default=None,
help="Base URL of the source API (e.g. iSolarCloud proxy). Not yet implemented — reserved for future use.",
)
parser.add_argument(
"--tolerance",
type=float,
default=0.05,
help="Maximum allowed fractional difference, e.g. 0.05 = 5%% (default: 0.05)",
)
parser.add_argument(
"--username",
default="admin",
help="EMS login username (default: admin)",
)
parser.add_argument(
"--password",
default=None,
help="EMS login password (will prompt if not provided)",
)
parser.add_argument(
"--manual",
action="store_true",
help="Manually enter source reference values interactively",
)
return parser.parse_args()
def main() -> None:
args = parse_args()
password = args.password or getpass.getpass("EMS password: ")
# ---- Authenticate ----
print(f"Connecting to EMS at {args.ems_url} ...")
try:
token = ems_login(args.ems_url, args.username, password)
except requests.HTTPError as exc:
print(f"[ERROR] Login failed: {exc}")
sys.exit(1)
except requests.ConnectionError:
print(f"[ERROR] Cannot connect to {args.ems_url}")
sys.exit(1)
print(" Authenticated successfully.")
# ---- Fetch EMS metrics ----
print("Fetching EMS metrics ...")
ems_metrics = fetch_ems_metrics(args.ems_url, token)
# ---- Get source metrics ----
if args.manual:
source_metrics = prompt_source_values(ems_metrics)
elif args.source_url:
# Placeholder for automated source-API fetching
print(f"[ERROR] Automated source-API mode (--source-url {args.source_url}) is not yet implemented.")
print(" Use --manual mode to enter values interactively.")
sys.exit(1)
else:
print("[ERROR] No source data provided. Use --manual or --source-url.")
sys.exit(1)
# ---- Compare ----
passed = compare_and_report(ems_metrics, source_metrics, args.tolerance)
sys.exit(0 if passed else 1)
if __name__ == "__main__":
main()