"""EMS data validation script.

Compares metrics reported by the EMS platform against reference values from
the upstream source (e.g. iSolarCloud) and checks that every deviation stays
within a tolerance. Used during deployment buyoff.

Only manual interactive entry of source values (``--manual``) is implemented;
``--source-url`` is reserved for a future automated source-API comparison.

Exit code 0 = all checks within tolerance
Exit code 1 = one or more checks exceeded tolerance, or the run failed
"""

import argparse
import getpass
import math
import sys
from typing import Optional, Sequence

try:
    import requests
except ImportError:  # main() reports this with an actionable message
    requests = None


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _pct_diff(ems_val: float, source_val: float) -> float:
    """Return the absolute difference as a fraction of the source value.

    When the source is zero the ratio is undefined, so two zero readings
    count as an exact match (0.0) and any non-zero EMS value yields ``inf``.
    """
    if source_val == 0:
        return 0.0 if ems_val == 0 else float("inf")
    return abs(ems_val - source_val) / abs(source_val)


def _fmt_pct(pct: Optional[float]) -> str:
    """Render a fractional difference as a percentage string for the report."""
    if pct is None:
        return "N/A"
    if math.isinf(pct):
        return "INF"
    return f"{pct * 100:.2f}%"


def _fmt_val(value: object) -> str:
    """Render a metric value for the report table ("-" when it is missing)."""
    if value is None:
        return "-"
    return f"{value:.2f}" if isinstance(value, float) else str(value)


def _status(pct: Optional[float], tolerance: float) -> str:
    """Map a fractional difference to "SKIP", "PASS" or "FAIL"."""
    if pct is None:
        return "SKIP"
    # A NaN difference compares False here and is therefore reported as FAIL.
    if pct <= tolerance:
        return "PASS"
    return "FAIL"


def _tolerance(raw: str) -> float:
    """argparse ``type`` callback: parse a non-negative fractional tolerance."""
    try:
        value = float(raw)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid float value: {raw!r}") from None
    # A negative or NaN tolerance would make every comparison FAIL.
    if math.isnan(value) or value < 0:
        raise argparse.ArgumentTypeError("tolerance must be a non-negative number")
    return value


# ---------------------------------------------------------------------------
# EMS data fetching
# ---------------------------------------------------------------------------

def _extract_token(payload: object) -> Optional[str]:
    """Return the first non-empty token found in a login response body.

    Looks at ``access_token``, then ``token``, then ``data.access_token``.
    Returns None when the body is not a dict or none of them is set.
    """
    if not isinstance(payload, dict):
        return None
    candidates = [payload.get("access_token"), payload.get("token")]
    nested = payload.get("data")
    if isinstance(nested, dict):  # "data" may be null or absent
        candidates.append(nested.get("access_token"))
    return next((candidate for candidate in candidates if candidate), None)


def ems_login(base_url: str, username: str, password: str) -> str:
    """Authenticate against the EMS backend and return a bearer token.

    Raises:
        requests.RequestException: the request failed or returned an HTTP
            error status.
        RuntimeError: the response carried no token.
    """
    url = f"{base_url.rstrip('/')}/api/v1/auth/login"
    resp = requests.post(url, json={"username": username, "password": password}, timeout=10)
    resp.raise_for_status()
    data = resp.json()
    token = _extract_token(data)
    if not token:
        shape = list(data.keys()) if isinstance(data, dict) else type(data).__name__
        raise RuntimeError(f"Login succeeded but no token found in response: {shape}")
    return token


# One entry per EMS endpoint (path below /api/v1/), each listing
# (metric key, response field, converter) for the values read from it.
_EMS_ENDPOINTS = (
    ("dashboard/realtime", (
        ("pv_power_kw", "pv_power", float),
    )),
    ("dashboard/overview", (
        ("energy_today_kwh", "energy_today", float),
        ("total_generation_kwh", "total_generation", float),
    )),
    ("devices/stats", (
        ("online_count", "online_count", int),
        ("total_count", "total_count", int),
    )),
    ("kpi/solar", (
        ("pr", "pr", float),
        ("equivalent_hours", "equivalent_hours", float),
        ("revenue", "revenue", float),
        ("self_consumption_rate", "self_consumption_rate", float),
    )),
)


def fetch_ems_metrics(base_url: str, token: str) -> dict:
    """Pull all relevant metrics from the EMS backend.

    Returns a dict keyed like ``METRIC_LABELS``. A field absent from a
    response is read as 0. If anything goes wrong for an endpoint (request,
    HTTP status, JSON decoding, conversion), a warning is printed and every
    metric of that endpoint is set to None.
    """
    headers = {"Authorization": f"Bearer {token}"}
    root = base_url.rstrip("/")
    metrics: dict = {}

    for endpoint, fields in _EMS_ENDPOINTS:
        try:
            resp = requests.get(f"{root}/api/v1/{endpoint}", headers=headers, timeout=10)
            resp.raise_for_status()
            payload = resp.json()
            # Responses may wrap the values in a "data" envelope or be flat.
            body = payload.get("data", payload)
            values = {key: convert(body.get(field, 0)) for key, field, convert in fields}
        except Exception as exc:
            # Deliberately broad: one broken endpoint must not abort the rest.
            print(f" [warn] {endpoint} failed: {exc}")
            values = {key: None for key, _field, _convert in fields}
        metrics.update(values)

    return metrics


# ---------------------------------------------------------------------------
# Source data (manual mode)
# ---------------------------------------------------------------------------

METRIC_LABELS = {
    "pv_power_kw": "Real-time PV Power (kW)",
    "energy_today_kwh": "Today Generation (kWh)",
    "total_generation_kwh": "Total Generation (kWh)",
    "online_count": "Devices Online",
    "total_count": "Devices Total",
    "pr": "Performance Ratio",
    "equivalent_hours": "Equivalent Hours (h)",
    "revenue": "Revenue (CNY)",
    "self_consumption_rate": "Self-consumption Rate",
}


def prompt_source_values(ems_metrics: dict) -> dict:
    """Interactively ask the user for source reference values.

    Only metrics that the EMS actually returned are prompted for. An empty or
    non-numeric answer stores None for that metric, which the report skips.
    """
    print("\n--- Manual Source Entry ---")
    print("Enter the reference value from the source system for each metric.")
    print("Press Enter to skip a metric.\n")
    source: dict = {}
    for key, label in METRIC_LABELS.items():
        if ems_metrics.get(key) is None:
            continue
        raw = input(f" {label} [{key}]: ").strip()
        if not raw:
            source[key] = None
            continue
        try:
            source[key] = float(raw)
        except ValueError:
            print(f" -> invalid number, skipping {key}")
            source[key] = None
    return source


# ---------------------------------------------------------------------------
# Comparison & reporting
# ---------------------------------------------------------------------------

def compare_and_report(ems: dict, source: dict, tolerance: float) -> bool:
    """Print a comparison table and return True if all checks pass.

    A metric is compared only when both sides have a value; otherwise it is
    listed as SKIP. When nothing could be compared a warning is printed and
    True is returned.
    """
    col_metric = 30
    col_val = 14
    col_pct = 10
    col_st = 6
    sep = "-" * (col_metric + col_val * 2 + col_pct + col_st + 8)

    print("\n" + "=" * len(sep))
    print(" EMS Data Validation Report")
    print("=" * len(sep))
    print(
        f"{'Metric':<{col_metric}} "
        f"{'EMS':>{col_val}} "
        f"{'Source':>{col_val}} "
        f"{'Diff%':>{col_pct}} "
        f"{'Status':>{col_st}}"
    )
    print(sep)

    checked = 0
    failed = 0

    for key, label in METRIC_LABELS.items():
        ems_val = ems.get(key)
        src_val = source.get(key)

        if ems_val is None or src_val is None:
            pct = None
            st = "SKIP"
        else:
            pct = _pct_diff(ems_val, src_val)
            st = _status(pct, tolerance)
            checked += 1
            if st == "FAIL":
                failed += 1

        print(
            f"{label:<{col_metric}} "
            f"{_fmt_val(ems_val):>{col_val}} "
            f"{_fmt_val(src_val):>{col_val}} "
            f"{_fmt_pct(pct):>{col_pct}} "
            f"{st:>{col_st}}"
        )

    print(sep)
    print(f"\nTolerance: {tolerance * 100:.1f}%")
    print(f"Checked: {checked} | Passed: {checked - failed} | Failed: {failed}")

    if checked == 0:
        print("\n[WARN] No metrics were compared. Provide source values to validate.")
        return True

    if failed == 0:
        print("\n[PASS] All metrics within tolerance.")
    else:
        print("\n[FAIL] One or more metrics exceed tolerance!")

    return failed == 0


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse command-line options.

    Args:
        argv: Argument list to parse; None means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(
        description="Validate EMS platform data against source API or manual reference values."
    )
    parser.add_argument(
        "--ems-url",
        default="http://localhost:8000",
        help="Base URL of the EMS backend (default: http://localhost:8000)",
    )
    parser.add_argument(
        "--source-url",
        default=None,
        help="Base URL of the source API (e.g. iSolarCloud proxy). Not yet implemented — reserved for future use.",
    )
    parser.add_argument(
        "--tolerance",
        type=_tolerance,
        default=0.05,
        help="Maximum allowed fractional difference, e.g. 0.05 = 5%% (default: 0.05)",
    )
    parser.add_argument(
        "--username",
        default="admin",
        help="EMS login username (default: admin)",
    )
    parser.add_argument(
        "--password",
        default=None,
        help="EMS login password (will prompt if not provided)",
    )
    parser.add_argument(
        "--manual",
        action="store_true",
        help="Manually enter source reference values interactively",
    )
    return parser.parse_args(argv)


def main(argv: Optional[Sequence[str]] = None) -> None:
    """Run the validation and exit with 0 (pass) or 1 (failure or error).

    Args:
        argv: Argument list to parse; None means ``sys.argv[1:]``.
    """
    args = parse_args(argv)

    # ---- Check the requested mode before asking for credentials ----
    if not args.manual:
        if args.source_url:
            # Placeholder for automated source-API fetching
            print(f"[ERROR] Automated source-API mode (--source-url {args.source_url}) is not yet implemented.")
            print(" Use --manual mode to enter values interactively.")
        else:
            print("[ERROR] No source data provided. Use --manual or --source-url.")
        sys.exit(1)

    if requests is None:
        print("[ERROR] The 'requests' package is required (pip install requests).")
        sys.exit(1)

    password = args.password or getpass.getpass("EMS password: ")

    # ---- Authenticate ----
    print(f"Connecting to EMS at {args.ems_url} ...")
    try:
        token = ems_login(args.ems_url, args.username, password)
    except requests.HTTPError as exc:
        print(f"[ERROR] Login failed: {exc}")
        sys.exit(1)
    except requests.ConnectionError:
        print(f"[ERROR] Cannot connect to {args.ems_url}")
        sys.exit(1)
    except (requests.RequestException, RuntimeError, ValueError) as exc:
        # Timeouts, undecodable bodies and token-less responses end up here.
        print(f"[ERROR] Login failed: {exc}")
        sys.exit(1)

    print(" Authenticated successfully.")

    # ---- Fetch EMS metrics ----
    print("Fetching EMS metrics ...")
    ems_metrics = fetch_ems_metrics(args.ems_url, token)

    # ---- Get source metrics ----
    source_metrics = prompt_source_values(ems_metrics)

    # ---- Compare ----
    passed = compare_and_report(ems_metrics, source_metrics, args.tolerance)
    sys.exit(0 if passed else 1)


if __name__ == "__main__":
    main()