2 Commits
v1.3.0 ... main

Author SHA1 Message Date
Du Wenbo
56132bae32 chore: add validate_data.py for buyoff data accuracy checks
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 19:05:56 +08:00
Du Wenbo
475313855d feat: add version API and solar KPI endpoint (v1.4.0)
New endpoints:
- GET /api/v1/version — returns VERSIONS.json (no auth required)
  For field engineers to check platform version from login page
- GET /api/v1/kpi/solar — returns PR, self-consumption rate,
  equivalent utilization hours, and daily revenue
  Handles station-level vs device-level data deduplication

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 22:35:08 +08:00
6 changed files with 445 additions and 4 deletions

View File

@@ -1 +1 @@
1.3.0 1.4.0

View File

@@ -1,6 +1,6 @@
{ {
"project": "ems-core", "project": "ems-core",
"project_version": "1.3.0", "project_version": "1.4.0",
"last_updated": "2026-04-06", "last_updated": "2026-04-06",
"notes": "Generic defaults, dashboard energy fallback, PV device type filter fix" "notes": "Version API, solar KPI endpoint (PR, equiv hours, revenue, self-consumption)"
} }

View File

@@ -1,5 +1,5 @@
from fastapi import APIRouter from fastapi import APIRouter
from app.api.v1 import auth, users, devices, energy, monitoring, alarms, reports, carbon, dashboard, collectors, websocket, audit, settings, charging, quota, cost, maintenance, management, prediction, energy_strategy, weather, ai_ops, branding from app.api.v1 import auth, users, devices, energy, monitoring, alarms, reports, carbon, dashboard, collectors, websocket, audit, settings, charging, quota, cost, maintenance, management, prediction, energy_strategy, weather, ai_ops, branding, version, kpi
api_router = APIRouter(prefix="/api/v1") api_router = APIRouter(prefix="/api/v1")
@@ -26,3 +26,5 @@ api_router.include_router(energy_strategy.router)
api_router.include_router(weather.router) api_router.include_router(weather.router)
api_router.include_router(ai_ops.router) api_router.include_router(ai_ops.router)
api_router.include_router(branding.router) api_router.include_router(branding.router)
api_router.include_router(version.router)
api_router.include_router(kpi.router)

93
backend/app/api/v1/kpi.py Normal file
View File

@@ -0,0 +1,93 @@
from datetime import datetime, timezone
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_
from app.core.database import get_db
from app.core.deps import get_current_user
from app.models.device import Device
from app.models.energy import EnergyData
from app.models.user import User
router = APIRouter(prefix="/kpi", tags=["关键指标"])
@router.get("/solar")
async def get_solar_kpis(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
"""Solar performance KPIs - PR, self-consumption, equivalent hours, revenue"""
now = datetime.now(timezone.utc)
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
# Get PV devices and their rated power
pv_q = await db.execute(
select(Device.id, Device.rated_power).where(
Device.device_type.in_(["pv_inverter", "sungrow_inverter"]),
Device.is_active == True,
)
)
pv_devices = pv_q.all()
pv_ids = [d[0] for d in pv_devices]
total_rated_kw = sum(d[1] or 0 for d in pv_devices) # kW
if not pv_ids or total_rated_kw == 0:
return {
"pr": 0, "self_consumption_rate": 0,
"equivalent_hours": 0, "revenue_today": 0,
"total_rated_kw": 0, "daily_generation_kwh": 0,
}
# Get latest daily_energy per PV device for today
daily_gen_q = await db.execute(
select(
EnergyData.device_id,
func.max(EnergyData.value).label("max_energy"),
).where(
and_(
EnergyData.timestamp >= today_start,
EnergyData.data_type == "daily_energy",
EnergyData.device_id.in_(pv_ids),
)
).group_by(EnergyData.device_id)
)
# Check if values are station-level (all identical) or device-level
daily_values = daily_gen_q.all()
if not daily_values:
daily_generation_kwh = 0
else:
values = [row[1] or 0 for row in daily_values]
# If all values are identical, it's station-level data — use max (not sum)
if len(set(values)) == 1 and len(values) > 1:
daily_generation_kwh = values[0]
else:
daily_generation_kwh = sum(values)
# Performance Ratio (PR) = actual output / (rated capacity * peak sun hours)
# Approximate peak sun hours from time of day (simplified)
hours_since_sunrise = max(0, min(12, (now.hour + now.minute / 60) - 6)) # approx 6am sunrise
theoretical_kwh = total_rated_kw * hours_since_sunrise * 0.8 # 0.8 = typical irradiance factor
pr = (daily_generation_kwh / theoretical_kwh * 100) if theoretical_kwh > 0 else 0
pr = min(100, round(pr, 1)) # Cap at 100%
# Self-consumption rate (without grid export meter, assume 100% self-consumed for now)
# TODO: integrate grid export meter data when available
self_consumption_rate = 100.0
# Equivalent utilization hours = daily generation / rated capacity
equivalent_hours = round(daily_generation_kwh / total_rated_kw, 2) if total_rated_kw > 0 else 0
# Revenue = daily generation * electricity price
# TODO: get actual price from electricity_pricing table
# Default industrial TOU average price in Beijing: ~0.65 CNY/kWh
avg_price = 0.65
revenue_today = round(daily_generation_kwh * avg_price, 2)
return {
"pr": pr,
"self_consumption_rate": round(self_consumption_rate, 1),
"equivalent_hours": equivalent_hours,
"revenue_today": revenue_today,
"total_rated_kw": total_rated_kw,
"daily_generation_kwh": round(daily_generation_kwh, 2),
"avg_price_per_kwh": avg_price,
"pv_device_count": len(pv_ids),
}

View File

@@ -0,0 +1,32 @@
import os
import json
from fastapi import APIRouter
router = APIRouter(prefix="/version", tags=["版本信息"])
@router.get("")
async def get_version():
"""Return platform version information for display on login/dashboard"""
# Read VERSIONS.json from project root (2 levels up from backend/)
backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
# Try multiple paths for VERSIONS.json
for path in [
os.path.join(backend_dir, "VERSIONS.json"), # standalone
os.path.join(backend_dir, "..", "VERSIONS.json"), # inside core/ subtree
os.path.join(backend_dir, "..", "..", "VERSIONS.json"), # customer project root
]:
if os.path.exists(path):
with open(path, 'r', encoding='utf-8') as f:
versions = json.load(f)
return versions
# Fallback: read VERSION file
version_file = os.path.join(backend_dir, "VERSION")
version = "unknown"
if os.path.exists(version_file):
with open(version_file, 'r') as f:
version = f.read().strip()
return {"project_version": version, "project": "ems-core"}

314
scripts/validate_data.py Normal file
View File

@@ -0,0 +1,314 @@
"""EMS 数据验证脚本 — 对比平台数据与源API确保偏差在容许范围内
Used during deployment buyoff to verify that EMS dashboard numbers match
the upstream source (e.g. iSolarCloud). Supports both automated source-API
comparison and manual interactive mode.
Exit code 0 = all checks within tolerance
Exit code 1 = one or more checks exceeded tolerance
"""
import argparse
import getpass
import sys
from typing import Optional
import requests
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _pct_diff(ems_val: float, source_val: float) -> Optional[float]:
"""Return absolute percentage difference, or None if source is zero."""
if source_val == 0:
return None if ems_val == 0 else float("inf")
return abs(ems_val - source_val) / abs(source_val)
def _fmt_pct(pct: Optional[float]) -> str:
if pct is None:
return "N/A"
if pct == float("inf"):
return "INF"
return f"{pct * 100:.2f}%"
def _status(pct: Optional[float], tolerance: float) -> str:
if pct is None:
return "SKIP"
if pct <= tolerance:
return "PASS"
return "FAIL"
# ---------------------------------------------------------------------------
# EMS data fetching
# ---------------------------------------------------------------------------
def ems_login(base_url: str, username: str, password: str) -> str:
"""Authenticate against the EMS backend and return a bearer token."""
url = f"{base_url}/api/v1/auth/login"
resp = requests.post(url, json={"username": username, "password": password}, timeout=10)
resp.raise_for_status()
data = resp.json()
token = data.get("access_token") or data.get("token") or data.get("data", {}).get("access_token")
if not token:
raise RuntimeError(f"Login succeeded but no token found in response: {list(data.keys())}")
return token
def fetch_ems_metrics(base_url: str, token: str) -> dict:
"""Pull all relevant metrics from the EMS backend."""
headers = {"Authorization": f"Bearer {token}"}
metrics: dict = {}
# --- /api/v1/dashboard/realtime ---
try:
resp = requests.get(f"{base_url}/api/v1/dashboard/realtime", headers=headers, timeout=10)
resp.raise_for_status()
d = resp.json().get("data", resp.json())
metrics["pv_power_kw"] = float(d.get("pv_power", 0))
except Exception as exc:
print(f" [warn] dashboard/realtime failed: {exc}")
metrics["pv_power_kw"] = None
# --- /api/v1/dashboard/overview ---
try:
resp = requests.get(f"{base_url}/api/v1/dashboard/overview", headers=headers, timeout=10)
resp.raise_for_status()
d = resp.json().get("data", resp.json())
metrics["energy_today_kwh"] = float(d.get("energy_today", 0))
metrics["total_generation_kwh"] = float(d.get("total_generation", 0))
except Exception as exc:
print(f" [warn] dashboard/overview failed: {exc}")
metrics["energy_today_kwh"] = None
metrics["total_generation_kwh"] = None
# --- /api/v1/devices/stats ---
try:
resp = requests.get(f"{base_url}/api/v1/devices/stats", headers=headers, timeout=10)
resp.raise_for_status()
d = resp.json().get("data", resp.json())
metrics["online_count"] = int(d.get("online_count", 0))
metrics["total_count"] = int(d.get("total_count", 0))
except Exception as exc:
print(f" [warn] devices/stats failed: {exc}")
metrics["online_count"] = None
metrics["total_count"] = None
# --- /api/v1/kpi/solar ---
try:
resp = requests.get(f"{base_url}/api/v1/kpi/solar", headers=headers, timeout=10)
resp.raise_for_status()
d = resp.json().get("data", resp.json())
metrics["pr"] = float(d.get("pr", 0))
metrics["equivalent_hours"] = float(d.get("equivalent_hours", 0))
metrics["revenue"] = float(d.get("revenue", 0))
metrics["self_consumption_rate"] = float(d.get("self_consumption_rate", 0))
except Exception as exc:
print(f" [warn] kpi/solar failed: {exc}")
metrics["pr"] = None
metrics["equivalent_hours"] = None
metrics["revenue"] = None
metrics["self_consumption_rate"] = None
return metrics
# ---------------------------------------------------------------------------
# Source data (manual mode)
# ---------------------------------------------------------------------------
METRIC_LABELS = {
"pv_power_kw": "Real-time PV Power (kW)",
"energy_today_kwh": "Today Generation (kWh)",
"total_generation_kwh": "Total Generation (kWh)",
"online_count": "Devices Online",
"total_count": "Devices Total",
"pr": "Performance Ratio",
"equivalent_hours": "Equivalent Hours (h)",
"revenue": "Revenue (CNY)",
"self_consumption_rate": "Self-consumption Rate",
}
def prompt_source_values(ems_metrics: dict) -> dict:
"""Interactively ask the user for source reference values."""
print("\n--- Manual Source Entry ---")
print("Enter the reference value from the source system for each metric.")
print("Press Enter to skip a metric.\n")
source: dict = {}
for key, label in METRIC_LABELS.items():
if ems_metrics.get(key) is None:
continue
raw = input(f" {label} [{key}]: ").strip()
if raw == "":
source[key] = None
else:
try:
source[key] = float(raw)
except ValueError:
print(f" -> invalid number, skipping {key}")
source[key] = None
return source
# ---------------------------------------------------------------------------
# Comparison & reporting
# ---------------------------------------------------------------------------
def compare_and_report(ems: dict, source: dict, tolerance: float) -> bool:
"""Print a comparison table and return True if all checks pass."""
col_metric = 30
col_val = 14
col_pct = 10
col_st = 6
sep = "-" * (col_metric + col_val * 2 + col_pct + col_st + 8)
print("\n" + "=" * len(sep))
print(" EMS Data Validation Report")
print("=" * len(sep))
header = (
f"{'Metric':<{col_metric}} "
f"{'EMS':>{col_val}} "
f"{'Source':>{col_val}} "
f"{'Diff%':>{col_pct}} "
f"{'Status':>{col_st}}"
)
print(header)
print(sep)
all_pass = True
checked = 0
failed = 0
for key, label in METRIC_LABELS.items():
ems_val = ems.get(key)
src_val = source.get(key)
if ems_val is None or src_val is None:
pct = None
st = "SKIP"
ems_str = str(ems_val) if ems_val is not None else "-"
src_str = str(src_val) if src_val is not None else "-"
else:
pct = _pct_diff(ems_val, src_val)
st = _status(pct, tolerance)
ems_str = f"{ems_val:.2f}" if isinstance(ems_val, float) else str(ems_val)
src_str = f"{src_val:.2f}" if isinstance(src_val, float) else str(src_val)
if st == "FAIL":
all_pass = False
failed += 1
if st != "SKIP":
checked += 1
print(
f"{label:<{col_metric}} "
f"{ems_str:>{col_val}} "
f"{src_str:>{col_val}} "
f"{_fmt_pct(pct):>{col_pct}} "
f"{st:>{col_st}}"
)
print(sep)
print(f"\nTolerance: {tolerance * 100:.1f}%")
print(f"Checked: {checked} | Passed: {checked - failed} | Failed: {failed}")
if checked == 0:
print("\n[WARN] No metrics were compared. Provide source values to validate.")
return True
if all_pass:
print("\n[PASS] All metrics within tolerance.")
else:
print("\n[FAIL] One or more metrics exceed tolerance!")
return all_pass
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Validate EMS platform data against source API or manual reference values."
)
parser.add_argument(
"--ems-url",
default="http://localhost:8000",
help="Base URL of the EMS backend (default: http://localhost:8000)",
)
parser.add_argument(
"--source-url",
default=None,
help="Base URL of the source API (e.g. iSolarCloud proxy). Not yet implemented — reserved for future use.",
)
parser.add_argument(
"--tolerance",
type=float,
default=0.05,
help="Maximum allowed fractional difference, e.g. 0.05 = 5%% (default: 0.05)",
)
parser.add_argument(
"--username",
default="admin",
help="EMS login username (default: admin)",
)
parser.add_argument(
"--password",
default=None,
help="EMS login password (will prompt if not provided)",
)
parser.add_argument(
"--manual",
action="store_true",
help="Manually enter source reference values interactively",
)
return parser.parse_args()
def main() -> None:
args = parse_args()
password = args.password or getpass.getpass("EMS password: ")
# ---- Authenticate ----
print(f"Connecting to EMS at {args.ems_url} ...")
try:
token = ems_login(args.ems_url, args.username, password)
except requests.HTTPError as exc:
print(f"[ERROR] Login failed: {exc}")
sys.exit(1)
except requests.ConnectionError:
print(f"[ERROR] Cannot connect to {args.ems_url}")
sys.exit(1)
print(" Authenticated successfully.")
# ---- Fetch EMS metrics ----
print("Fetching EMS metrics ...")
ems_metrics = fetch_ems_metrics(args.ems_url, token)
# ---- Get source metrics ----
if args.manual:
source_metrics = prompt_source_values(ems_metrics)
elif args.source_url:
# Placeholder for automated source-API fetching
print(f"[ERROR] Automated source-API mode (--source-url {args.source_url}) is not yet implemented.")
print(" Use --manual mode to enter values interactively.")
sys.exit(1)
else:
print("[ERROR] No source data provided. Use --manual or --source-url.")
sys.exit(1)
# ---- Compare ----
passed = compare_and_report(ems_metrics, source_metrics, args.tolerance)
sys.exit(0 if passed else 1)
if __name__ == "__main__":
main()