"""EMS data validation script.

Compares EMS platform data against reference values from the upstream
source system (e.g. iSolarCloud) and checks that every deviation stays
within the allowed tolerance.

Used during deployment buyoff to verify that EMS dashboard numbers match
the upstream source.  Reference values are currently entered interactively
(``--manual``); automated source-API comparison (``--source-url``) is
reserved on the command line but not yet implemented.

Exit code 0 = all compared metrics within tolerance (a warning is printed
              when no metric could be compared at all)
Exit code 1 = one or more checks exceeded tolerance, or a setup error
              (login failure, no usable source mode)
Exit code 2 = invalid command-line arguments (reported by argparse)
"""

import argparse
import getpass
import math
import sys
from typing import Optional, Sequence

import requests

# Timeout, in seconds, applied to every HTTP request sent to the EMS backend.
_REQUEST_TIMEOUT = 10

# Gap printed between two columns of the report table.
_COL_GAP = "  "


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _pct_diff(ems_val: float, source_val: float) -> Optional[float]:
    """Return the absolute difference of *ems_val* relative to *source_val*.

    The result is a fraction (0.05 means 5 %).  When the source value is
    zero the ratio is undefined: ``None`` is returned if the EMS value is
    zero as well, and ``inf`` otherwise.
    """
    if source_val == 0:
        return None if ems_val == 0 else math.inf
    return abs(ems_val - source_val) / abs(source_val)


def _fmt_pct(pct: Optional[float]) -> str:
    """Render a fractional difference as a percentage string for the report."""
    if pct is None:
        return "N/A"
    if pct == math.inf:
        return "INF"
    return f"{pct * 100:.2f}%"


def _fmt_val(value: object) -> str:
    """Render a metric value: ``-`` when missing, two decimals for floats."""
    if value is None:
        return "-"
    if isinstance(value, float):
        return f"{value:.2f}"
    return str(value)


def _status(pct: Optional[float], tolerance: float) -> str:
    """Map a fractional difference to ``SKIP`` / ``PASS`` / ``FAIL``."""
    if pct is None:
        return "SKIP"
    if pct <= tolerance:
        return "PASS"
    return "FAIL"


def _api_url(base_url: str, path: str) -> str:
    """Join *base_url* and an absolute *path* without doubling the slash."""
    return f"{base_url.rstrip('/')}{path}"


# ---------------------------------------------------------------------------
# EMS data fetching
# ---------------------------------------------------------------------------

# Endpoint (relative to /api/v1/) -> fields read from its payload, each as
# (metric key in the result, field name in the response, converter).
_EMS_ENDPOINTS = (
    ("dashboard/realtime", (
        ("pv_power_kw", "pv_power", float),
    )),
    ("dashboard/overview", (
        ("energy_today_kwh", "energy_today", float),
        ("total_generation_kwh", "total_generation", float),
    )),
    ("devices/stats", (
        ("online_count", "online_count", int),
        ("total_count", "total_count", int),
    )),
    ("kpi/solar", (
        ("pr", "pr", float),
        ("equivalent_hours", "equivalent_hours", float),
        ("revenue", "revenue", float),
        ("self_consumption_rate", "self_consumption_rate", float),
    )),
)

# Failures that mark an endpoint as unavailable instead of aborting the run:
# transport/HTTP errors, an undecodable body or unparsable number
# (ValueError), a null value (TypeError), a payload that is not a JSON
# object (AttributeError).
_FETCH_ERRORS = (requests.RequestException, ValueError, TypeError, AttributeError)


def ems_login(base_url: str, username: str, password: str) -> str:
    """Authenticate against the EMS backend and return a bearer token.

    The token is looked up under ``access_token``, then ``token``, then
    ``data.access_token`` of the JSON response.

    Raises:
        requests.RequestException: the request failed or returned an HTTP
            error status.
        RuntimeError: the response is not a JSON object or holds no token.
    """
    resp = requests.post(
        _api_url(base_url, "/api/v1/auth/login"),
        json={"username": username, "password": password},
        timeout=_REQUEST_TIMEOUT,
    )
    resp.raise_for_status()

    try:
        data = resp.json()
    except ValueError as exc:
        raise RuntimeError("Login succeeded but the response body is not valid JSON") from exc
    if not isinstance(data, dict):
        raise RuntimeError(
            f"Login succeeded but the response is not a JSON object: {type(data).__name__}"
        )

    # "data" may be absent, null, or not an object; only a dict can hold a token.
    nested = data.get("data")
    if not isinstance(nested, dict):
        nested = {}

    token = data.get("access_token") or data.get("token") or nested.get("access_token")
    if not token:
        raise RuntimeError(f"Login succeeded but no token found in response: {list(data.keys())}")
    return token


def _fetch_payload(base_url: str, endpoint: str, headers: dict) -> dict:
    """GET ``/api/v1/<endpoint>`` and return its ``data`` member.

    Falls back to the whole JSON body when there is no ``data`` key.
    """
    resp = requests.get(
        _api_url(base_url, f"/api/v1/{endpoint}"),
        headers=headers,
        timeout=_REQUEST_TIMEOUT,
    )
    resp.raise_for_status()
    body = resp.json()
    return body.get("data", body)


def fetch_ems_metrics(base_url: str, token: str) -> dict:
    """Pull all relevant metrics from the EMS backend.

    Each endpoint is fetched independently on a best-effort basis.  A field
    missing from a payload counts as 0.  When an endpoint fails, or any of
    its fields cannot be converted, a warning is printed and every metric
    of that endpoint is set to ``None``.

    Returns:
        A dict holding every key of ``_EMS_ENDPOINTS``; values are numbers,
        or ``None`` for metrics that could not be read.
    """
    headers = {"Authorization": f"Bearer {token}"}
    metrics: dict = {}

    for endpoint, fields in _EMS_ENDPOINTS:
        try:
            payload = _fetch_payload(base_url, endpoint, headers)
            values = {
                key: convert(payload.get(field, 0))
                for key, field, convert in fields
            }
        except _FETCH_ERRORS as exc:
            print(f" [warn] {endpoint} failed: {exc}")
            values = {key: None for key, _field, _convert in fields}
        metrics.update(values)

    return metrics


# ---------------------------------------------------------------------------
# Source data (manual mode)
# ---------------------------------------------------------------------------

METRIC_LABELS = {
    "pv_power_kw": "Real-time PV Power (kW)",
    "energy_today_kwh": "Today Generation (kWh)",
    "total_generation_kwh": "Total Generation (kWh)",
    "online_count": "Devices Online",
    "total_count": "Devices Total",
    "pr": "Performance Ratio",
    "equivalent_hours": "Equivalent Hours (h)",
    "revenue": "Revenue (CNY)",
    "self_consumption_rate": "Self-consumption Rate",
}


def prompt_source_values(ems_metrics: dict) -> dict:
    """Interactively ask the user for source reference values.

    Only metrics that have an EMS value are prompted for.  An empty answer
    skips the metric; an answer that is not a finite number is reported and
    skipped as well.

    Returns:
        A dict mapping each prompted metric key to a float, or to ``None``
        when the metric was skipped.
    """
    print("\n--- Manual Source Entry ---")
    print("Enter the reference value from the source system for each metric.")
    print("Press Enter to skip a metric.\n")

    source: dict = {}
    for key, label in METRIC_LABELS.items():
        if ems_metrics.get(key) is None:
            continue

        raw = input(f" {label} [{key}]: ").strip()
        if not raw:
            source[key] = None
            continue

        try:
            value: Optional[float] = float(raw)
        except ValueError:
            value = None
        # float() also accepts "nan" and "inf", which cannot serve as a reference.
        if value is None or not math.isfinite(value):
            print(f" -> invalid number, skipping {key}")
            value = None
        source[key] = value

    return source


# ---------------------------------------------------------------------------
# Comparison & reporting
# ---------------------------------------------------------------------------

def compare_and_report(ems: dict, source: dict, tolerance: float) -> bool:
    """Print a comparison table and return True if all checks pass.

    A metric is compared only when both its EMS and source values are
    present; otherwise its row is shown as ``SKIP``.  A metric whose EMS
    and source values are both zero is also shown as ``SKIP``.

    Args:
        ems: metric key -> EMS value (``None`` when unavailable).
        source: metric key -> reference value (``None``/absent when skipped).
        tolerance: maximum allowed fractional difference (0.05 means 5 %).

    Returns:
        ``False`` if at least one compared metric exceeds *tolerance*;
        ``True`` otherwise, including when no metric was compared.
    """
    col_metric, col_val, col_pct, col_st = 30, 14, 10, 6

    def row(metric: str, ems_txt: str, src_txt: str, pct_txt: str, status: str) -> str:
        return _COL_GAP.join((
            f"{metric:<{col_metric}}",
            f"{ems_txt:>{col_val}}",
            f"{src_txt:>{col_val}}",
            f"{pct_txt:>{col_pct}}",
            f"{status:>{col_st}}",
        ))

    header = row("Metric", "EMS", "Source", "Diff%", "Status")
    # Derived from the header so the rule always matches the table width.
    sep = "-" * len(header)

    print("\n" + "=" * len(sep))
    print(" EMS Data Validation Report")
    print("=" * len(sep))
    print(header)
    print(sep)

    checked = 0
    failed = 0
    for key, label in METRIC_LABELS.items():
        ems_val = ems.get(key)
        src_val = source.get(key)

        if ems_val is None or src_val is None:
            pct = None
            st = "SKIP"
        else:
            pct = _pct_diff(ems_val, src_val)
            st = _status(pct, tolerance)

        if st == "FAIL":
            failed += 1
        if st != "SKIP":
            checked += 1

        print(row(label, _fmt_val(ems_val), _fmt_val(src_val), _fmt_pct(pct), st))

    print(sep)
    print(f"\nTolerance: {tolerance * 100:.1f}%")
    print(f"Checked: {checked} | Passed: {checked - failed} | Failed: {failed}")

    if checked == 0:
        print("\n[WARN] No metrics were compared. Provide source values to validate.")
        return True

    all_pass = failed == 0
    if all_pass:
        print("\n[PASS] All metrics within tolerance.")
    else:
        print("\n[FAIL] One or more metrics exceed tolerance!")
    return all_pass


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def _tolerance_arg(value: str) -> float:
    """argparse ``type=`` callable: accept only a finite, non-negative float."""
    try:
        tolerance = float(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid float value: {value!r}") from None
    # A negative or NaN tolerance would make every comparison fail.
    if not math.isfinite(tolerance) or tolerance < 0:
        raise argparse.ArgumentTypeError(
            f"must be a finite, non-negative number: {value!r}"
        )
    return tolerance


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the command line.

    Args:
        argv: argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(
        description="Validate EMS platform data against source API or manual reference values."
    )
    parser.add_argument(
        "--ems-url",
        default="http://localhost:8000",
        help="Base URL of the EMS backend (default: http://localhost:8000)",
    )
    parser.add_argument(
        "--source-url",
        default=None,
        help="Base URL of the source API (e.g. iSolarCloud proxy). Not yet implemented — reserved for future use.",
    )
    parser.add_argument(
        "--tolerance",
        type=_tolerance_arg,
        default=0.05,
        help="Maximum allowed fractional difference, e.g. 0.05 = 5%% (default: 0.05)",
    )
    parser.add_argument(
        "--username",
        default="admin",
        help="EMS login username (default: admin)",
    )
    parser.add_argument(
        "--password",
        default=None,
        help="EMS login password (will prompt if not provided)",
    )
    parser.add_argument(
        "--manual",
        action="store_true",
        help="Manually enter source reference values interactively",
    )
    return parser.parse_args(argv)


def main() -> None:
    """Run the validation and exit with status 0 (pass) or 1 (fail/error)."""
    args = parse_args()

    # ---- Check the source mode first ----
    # Fail before prompting for a password or touching the network when no
    # usable source of reference values was selected.
    if not args.manual:
        if args.source_url:
            # Placeholder for automated source-API fetching
            print(f"[ERROR] Automated source-API mode (--source-url {args.source_url}) is not yet implemented.")
            print(" Use --manual mode to enter values interactively.")
        else:
            print("[ERROR] No source data provided. Use --manual or --source-url.")
        sys.exit(1)

    password = args.password or getpass.getpass("EMS password: ")

    # ---- Authenticate ----
    print(f"Connecting to EMS at {args.ems_url} ...")
    try:
        token = ems_login(args.ems_url, args.username, password)
    except requests.ConnectionError:
        print(f"[ERROR] Cannot connect to {args.ems_url}")
        sys.exit(1)
    except (requests.RequestException, RuntimeError) as exc:
        # HTTP error status, timeout, or a response without a usable token.
        print(f"[ERROR] Login failed: {exc}")
        sys.exit(1)
    print(" Authenticated successfully.")

    # ---- Fetch EMS metrics ----
    print("Fetching EMS metrics ...")
    ems_metrics = fetch_ems_metrics(args.ems_url, token)

    # ---- Get source metrics ----
    source_metrics = prompt_source_values(ems_metrics)

    # ---- Compare ----
    passed = compare_and_report(ems_metrics, source_metrics, args.tolerance)
    sys.exit(0 if passed else 1)


if __name__ == "__main__":
    main()