5 Commits

Author SHA1 Message Date
Du Wenbo
a05b25bcc2 fix: realtime + KPI power dedup by station prefix (v1.4.3)
Realtime endpoint was summing ALL device power readings, causing
double-counting when multiple devices share the same Sungrow station.
E.g. 10 devices × station-level power = 5x inflated total.

Fix: GROUP BY station prefix (first 3 chars of device name) and
take MAX per station. Same fix applied to KPI daily_generation.

Result: 5,550 kW → 1,931 kW (matches iSolarCloud's 2,049 kW
within the 15-min collection timing window).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 14:18:49 +08:00
Du Wenbo
8e5e52e8ee fix: carbon fallback, energy history parsing, generation dedup (v1.4.2)
- Carbon overview: fallback to compute from energy_data × emission_factors
  when carbon_emissions table is empty
- Energy history: parse start_time/end_time as datetime (was raw string → 500)
- Dashboard generation: dedup by station prefix to prevent inflated totals
  (93K → 14.8K kWh)
- Realtime window: already at 20min to cover 15min collector interval

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 09:55:48 +08:00
Du Wenbo
72f4269cd4 fix(models): add alembic migration 009 for missing tables (v1.4.1)
Migration adds tables that existed in models/ but were never
included in alembic history:
- ai_ops: device_health_scores, anomaly_detections, diagnostic_reports,
  maintenance_predictions, ops_insights
- energy_strategy: tou_pricing, tou_pricing_periods, energy_strategies,
  strategy_executions, monthly_cost_reports
- weather: weather_data, weather_config
- prediction: prediction_tasks, prediction_results, optimization_schedules

Without this migration, fresh deploys would 500 on these endpoints:
- /api/v1/ai-ops/health, /ai-ops/dashboard
- /api/v1/strategy/pricing
- /api/v1/prediction/forecast
- /api/v1/weather/current

Discovered during Z-Park demo deployment on xie_openclaw1.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 09:47:34 +08:00
Du Wenbo
56132bae32 chore: add validate_data.py for buyoff data accuracy checks
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 19:05:56 +08:00
Du Wenbo
475313855d feat: add version API and solar KPI endpoint (v1.4.0)
New endpoints:
- GET /api/v1/version — returns VERSIONS.json (no auth required)
  For field engineers to check platform version from login page
- GET /api/v1/kpi/solar — returns PR, self-consumption rate,
  equivalent utilization hours, and daily revenue
  Handles station-level vs device-level data deduplication

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 22:35:08 +08:00
13 changed files with 1091 additions and 38 deletions

View File

@@ -1 +1 @@
1.3.0
1.4.1

View File

@@ -1,6 +1,6 @@
{
"project": "ems-core",
"project_version": "1.3.0",
"project_version": "1.4.1",
"last_updated": "2026-04-06",
"notes": "Generic defaults, dashboard energy fallback, PV device type filter fix"
"notes": "Add alembic migration 009 for ai_ops/strategy/weather/prediction tables"
}

View File

@@ -0,0 +1,287 @@
"""Add ai_ops, energy_strategy, weather, prediction tables
Revision ID: 009_aiops_strategy
Revises: 008_management
Create Date: 2026-04-10
Adds tables for features that were previously missing from alembic history:
- AI Ops: device_health_scores, anomaly_detections, diagnostic_reports,
maintenance_predictions, ops_insights
- Energy Strategy: tou_pricing, tou_pricing_periods, energy_strategies,
strategy_executions, monthly_cost_reports
- Weather: weather_data, weather_config
- Prediction: prediction_tasks, prediction_results, optimization_schedules
"""
from alembic import op
import sqlalchemy as sa
revision = "009_aiops_strategy"
down_revision = "008_management"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Create all AI-ops, energy-strategy, weather and prediction tables.

    Tables are created parents-first (e.g. tou_pricing before
    tou_pricing_periods) so foreign keys resolve; downgrade() drops them
    in the reverse order.
    """
    # =========================================================================
    # AI Ops tables
    # =========================================================================
    # Periodic per-device health snapshots; indexed for per-device time-series
    # queries on the ai-ops dashboard.
    op.create_table(
        "device_health_scores",
        sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
        sa.Column("device_id", sa.Integer, sa.ForeignKey("devices.id"), nullable=False),
        sa.Column("timestamp", sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column("health_score", sa.Float, nullable=False),
        sa.Column("status", sa.String(20), default="healthy"),
        sa.Column("factors", sa.JSON),
        sa.Column("trend", sa.String(20), default="stable"),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    op.create_index("ix_device_health_scores_device_id", "device_health_scores", ["device_id"])
    op.create_index("ix_device_health_scores_timestamp", "device_health_scores", ["timestamp"])
    # Detected metric anomalies with expected-vs-actual values and resolution state.
    op.create_table(
        "anomaly_detections",
        sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
        sa.Column("device_id", sa.Integer, sa.ForeignKey("devices.id"), nullable=False),
        sa.Column("detected_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column("anomaly_type", sa.String(50), nullable=False),
        sa.Column("severity", sa.String(20), default="warning"),
        sa.Column("description", sa.Text),
        sa.Column("metric_name", sa.String(50)),
        sa.Column("expected_value", sa.Float),
        sa.Column("actual_value", sa.Float),
        sa.Column("deviation_percent", sa.Float),
        sa.Column("status", sa.String(20), default="detected"),
        sa.Column("resolution_notes", sa.Text),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    op.create_index("ix_anomaly_detections_device_id", "anomaly_detections", ["device_id"])
    op.create_index("ix_anomaly_detections_detected_at", "anomaly_detections", ["detected_at"])
    # Generated diagnostic reports (findings/recommendations stored as JSON).
    op.create_table(
        "diagnostic_reports",
        sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
        sa.Column("device_id", sa.Integer, sa.ForeignKey("devices.id"), nullable=False),
        sa.Column("generated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column("report_type", sa.String(20), default="routine"),
        sa.Column("findings", sa.JSON),
        sa.Column("recommendations", sa.JSON),
        sa.Column("estimated_impact", sa.JSON),
        sa.Column("status", sa.String(20), default="generated"),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    op.create_index("ix_diagnostic_reports_device_id", "diagnostic_reports", ["device_id"])
    # Predictive-maintenance forecasts per device component.
    op.create_table(
        "maintenance_predictions",
        sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
        sa.Column("device_id", sa.Integer, sa.ForeignKey("devices.id"), nullable=False),
        sa.Column("predicted_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column("component", sa.String(100)),
        sa.Column("failure_mode", sa.String(200)),
        sa.Column("probability", sa.Float),
        sa.Column("predicted_failure_date", sa.DateTime(timezone=True)),
        sa.Column("recommended_action", sa.Text),
        sa.Column("urgency", sa.String(20), default="medium"),
        sa.Column("estimated_downtime_hours", sa.Float),
        sa.Column("estimated_repair_cost", sa.Float),
        sa.Column("status", sa.String(20), default="predicted"),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    op.create_index("ix_maintenance_predictions_device_id", "maintenance_predictions", ["device_id"])
    # Fleet-level insights; not tied to a single device, hence no device FK.
    op.create_table(
        "ops_insights",
        sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
        sa.Column("insight_type", sa.String(50), nullable=False),
        sa.Column("title", sa.String(200), nullable=False),
        sa.Column("description", sa.Text),
        sa.Column("data", sa.JSON),
        sa.Column("impact_level", sa.String(20), default="medium"),
        sa.Column("actionable", sa.Boolean, default=False),
        sa.Column("recommended_action", sa.Text),
        sa.Column("generated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column("valid_until", sa.DateTime(timezone=True)),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    # =========================================================================
    # Energy Strategy tables
    # =========================================================================
    # Time-of-use pricing scheme header; periods live in tou_pricing_periods.
    op.create_table(
        "tou_pricing",
        sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
        sa.Column("name", sa.String(200), nullable=False),
        sa.Column("region", sa.String(100), default="北京"),
        sa.Column("effective_date", sa.Date),
        sa.Column("end_date", sa.Date),
        sa.Column("is_active", sa.Boolean, default=True),
        sa.Column("created_by", sa.Integer, sa.ForeignKey("users.id")),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    # Per-scheme price periods; CASCADE so deleting a scheme removes its periods.
    # start_time/end_time are stored as "HH:MM" strings, not SQL times.
    op.create_table(
        "tou_pricing_periods",
        sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
        sa.Column("pricing_id", sa.Integer, sa.ForeignKey("tou_pricing.id", ondelete="CASCADE"), nullable=False),
        sa.Column("period_type", sa.String(20), nullable=False),
        sa.Column("start_time", sa.String(10), nullable=False),
        sa.Column("end_time", sa.String(10), nullable=False),
        sa.Column("price_yuan_per_kwh", sa.Float, nullable=False),
        sa.Column("month_range", sa.String(50)),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    # Configurable optimization strategies (parameters stored as JSON).
    op.create_table(
        "energy_strategies",
        sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
        sa.Column("name", sa.String(200), nullable=False),
        sa.Column("strategy_type", sa.String(50), nullable=False),
        sa.Column("description", sa.String(500)),
        sa.Column("parameters", sa.JSON),
        sa.Column("is_enabled", sa.Boolean, default=False),
        sa.Column("priority", sa.Integer, default=0),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    # One row per strategy per day of execution with realized savings.
    op.create_table(
        "strategy_executions",
        sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
        sa.Column("strategy_id", sa.Integer, sa.ForeignKey("energy_strategies.id"), nullable=False),
        sa.Column("date", sa.Date, nullable=False),
        sa.Column("actions_taken", sa.JSON),
        sa.Column("savings_kwh", sa.Float, default=0),
        sa.Column("savings_yuan", sa.Float, default=0),
        sa.Column("status", sa.String(20), default="planned"),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    # Monthly cost rollup keyed by "YYYY-MM" (unique -> one report per month).
    op.create_table(
        "monthly_cost_reports",
        sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
        sa.Column("year_month", sa.String(7), nullable=False, unique=True),
        sa.Column("total_consumption_kwh", sa.Float, default=0),
        sa.Column("total_cost_yuan", sa.Float, default=0),
        sa.Column("peak_consumption", sa.Float, default=0),
        sa.Column("valley_consumption", sa.Float, default=0),
        sa.Column("flat_consumption", sa.Float, default=0),
        sa.Column("sharp_peak_consumption", sa.Float, default=0),
        sa.Column("pv_self_consumption", sa.Float, default=0),
        sa.Column("pv_feed_in", sa.Float, default=0),
        sa.Column("optimized_cost", sa.Float, default=0),
        sa.Column("baseline_cost", sa.Float, default=0),
        sa.Column("savings_yuan", sa.Float, default=0),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    # =========================================================================
    # Weather tables
    # =========================================================================
    # Observed/forecast weather samples; timestamp indexed for range queries.
    op.create_table(
        "weather_data",
        sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
        sa.Column("timestamp", sa.DateTime(timezone=True), nullable=False),
        sa.Column("data_type", sa.String(20), nullable=False),
        sa.Column("temperature", sa.Float),
        sa.Column("humidity", sa.Float),
        sa.Column("solar_radiation", sa.Float),
        sa.Column("cloud_cover", sa.Float),
        sa.Column("wind_speed", sa.Float),
        sa.Column("source", sa.String(20), default="mock"),
        sa.Column("fetched_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    op.create_index("ix_weather_data_timestamp", "weather_data", ["timestamp"])
    # Single-row(ish) weather provider configuration; defaults point at Beijing.
    op.create_table(
        "weather_config",
        sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
        sa.Column("api_provider", sa.String(50), default="mock"),
        sa.Column("api_key", sa.String(200)),
        sa.Column("location_lat", sa.Float, default=39.9),
        sa.Column("location_lon", sa.Float, default=116.4),
        sa.Column("fetch_interval_minutes", sa.Integer, default=30),
        sa.Column("is_enabled", sa.Boolean, default=True),
        sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    # =========================================================================
    # Prediction tables
    # =========================================================================
    # Forecast job records; device_id is nullable for site-wide predictions.
    op.create_table(
        "prediction_tasks",
        sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
        sa.Column("device_id", sa.Integer, sa.ForeignKey("devices.id")),
        sa.Column("prediction_type", sa.String(50), nullable=False),
        sa.Column("horizon_hours", sa.Integer, default=24),
        sa.Column("status", sa.String(20), default="pending"),
        sa.Column("parameters", sa.JSON),
        sa.Column("error_message", sa.Text),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.Column("completed_at", sa.DateTime(timezone=True)),
    )
    # Per-timestep forecast output with confidence bounds and optional actuals.
    op.create_table(
        "prediction_results",
        sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
        sa.Column("task_id", sa.Integer, sa.ForeignKey("prediction_tasks.id"), nullable=False),
        sa.Column("timestamp", sa.DateTime(timezone=True), nullable=False),
        sa.Column("predicted_value", sa.Float, nullable=False),
        sa.Column("confidence_lower", sa.Float),
        sa.Column("confidence_upper", sa.Float),
        sa.Column("actual_value", sa.Float),
        sa.Column("unit", sa.String(20)),
    )
    op.create_index("ix_prediction_results_task_id", "prediction_results", ["task_id"])
    op.create_index("ix_prediction_results_timestamp", "prediction_results", ["timestamp"])
    # Optimization schedules awaiting approval; approved_by references users.
    op.create_table(
        "optimization_schedules",
        sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
        sa.Column("device_id", sa.Integer, sa.ForeignKey("devices.id")),
        sa.Column("date", sa.DateTime(timezone=True), nullable=False),
        sa.Column("schedule_data", sa.JSON),
        sa.Column("expected_savings_kwh", sa.Float, default=0),
        sa.Column("expected_savings_yuan", sa.Float, default=0),
        sa.Column("status", sa.String(20), default="pending"),
        sa.Column("approved_by", sa.Integer, sa.ForeignKey("users.id")),
        sa.Column("approved_at", sa.DateTime(timezone=True)),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    op.create_index("ix_optimization_schedules_date", "optimization_schedules", ["date"])
def downgrade() -> None:
    """Drop everything created by upgrade().

    Order is the exact reverse of creation (children before their FK
    parents, indexes before their tables) so constraints never dangle.
    """
    # Prediction
    op.drop_index("ix_optimization_schedules_date", table_name="optimization_schedules")
    op.drop_table("optimization_schedules")
    op.drop_index("ix_prediction_results_timestamp", table_name="prediction_results")
    op.drop_index("ix_prediction_results_task_id", table_name="prediction_results")
    op.drop_table("prediction_results")
    op.drop_table("prediction_tasks")
    # Weather
    op.drop_table("weather_config")
    op.drop_index("ix_weather_data_timestamp", table_name="weather_data")
    op.drop_table("weather_data")
    # Energy Strategy
    op.drop_table("monthly_cost_reports")
    op.drop_table("strategy_executions")
    op.drop_table("energy_strategies")
    op.drop_table("tou_pricing_periods")
    op.drop_table("tou_pricing")
    # AI Ops
    op.drop_table("ops_insights")
    op.drop_index("ix_maintenance_predictions_device_id", table_name="maintenance_predictions")
    op.drop_table("maintenance_predictions")
    op.drop_index("ix_diagnostic_reports_device_id", table_name="diagnostic_reports")
    op.drop_table("diagnostic_reports")
    op.drop_index("ix_anomaly_detections_detected_at", table_name="anomaly_detections")
    op.drop_index("ix_anomaly_detections_device_id", table_name="anomaly_detections")
    op.drop_table("anomaly_detections")
    op.drop_index("ix_device_health_scores_timestamp", table_name="device_health_scores")
    op.drop_index("ix_device_health_scores_device_id", table_name="device_health_scores")
    op.drop_table("device_health_scores")

View File

@@ -1,5 +1,5 @@
from fastapi import APIRouter
from app.api.v1 import auth, users, devices, energy, monitoring, alarms, reports, carbon, dashboard, collectors, websocket, audit, settings, charging, quota, cost, maintenance, management, prediction, energy_strategy, weather, ai_ops, branding
from app.api.v1 import auth, users, devices, energy, monitoring, alarms, reports, carbon, dashboard, collectors, websocket, audit, settings, charging, quota, cost, maintenance, management, prediction, energy_strategy, weather, ai_ops, branding, version, kpi, meters
api_router = APIRouter(prefix="/api/v1")
@@ -26,3 +26,6 @@ api_router.include_router(energy_strategy.router)
api_router.include_router(weather.router)
api_router.include_router(ai_ops.router)
api_router.include_router(branding.router)
api_router.include_router(version.router)
api_router.include_router(kpi.router)
api_router.include_router(meters.router)

View File

@@ -52,7 +52,10 @@ class ReportGenerate(BaseModel):
@router.get("/overview")
async def carbon_overview(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
"""碳排放总览"""
"""碳排放总览 - 优先从carbon_emissions表读取为空时从energy_data实时计算"""
from app.models.energy import EnergyData
from app.models.device import Device
now = datetime.now(timezone.utc)
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
@@ -70,6 +73,52 @@ async def carbon_overview(db: AsyncSession = Depends(get_db), user: User = Depen
month = await sum_carbon(month_start, now)
year = await sum_carbon(year_start, now)
# Fallback: if carbon_emissions is empty, compute reduction from PV generation
has_carbon_data = (today["emission"] + today["reduction"] +
month["emission"] + month["reduction"] +
year["emission"] + year["reduction"]) > 0
if not has_carbon_data:
# Get grid emission factor (华北电网 0.582 kgCO2/kWh)
factor_q = await db.execute(
select(EmissionFactor.factor).where(
EmissionFactor.energy_type == "electricity"
).order_by(EmissionFactor.id).limit(1)
)
grid_factor = factor_q.scalar() or 0.582 # default fallback
# Compute PV generation from energy_data using latest daily_energy per station
# Device names like AP1xx belong to station 1, AP2xx to station 2
# To avoid double-counting station-level data written to multiple devices,
# we group by station prefix (first 3 chars of device name) and take MAX
async def compute_pv_reduction(start, end):
q = await db.execute(
select(
func.substring(Device.name, text("1"), text("3")).label("station"),
func.max(EnergyData.value).label("max_energy"),
).select_from(EnergyData).join(
Device, EnergyData.device_id == Device.id
).where(
and_(
EnergyData.timestamp >= start,
EnergyData.timestamp < end,
EnergyData.data_type == "daily_energy",
Device.device_type.in_(["pv_inverter", "sungrow_inverter"]),
)
).group_by(text("station"))
)
total_kwh = sum(row[1] or 0 for row in q.all())
# Carbon reduction (kg CO2) = generation (kWh) * grid emission factor
return round(total_kwh * grid_factor / 1000, 4) # convert to tons
today_reduction = await compute_pv_reduction(today_start, now)
month_reduction = await compute_pv_reduction(month_start, now)
year_reduction = await compute_pv_reduction(year_start, now)
today = {"emission": 0, "reduction": today_reduction}
month = {"emission": 0, "reduction": month_reduction}
year = {"emission": 0, "reduction": year_reduction}
# 各scope分布
scope_q = await db.execute(
select(CarbonEmission.scope, func.sum(CarbonEmission.emission))

View File

@@ -40,28 +40,24 @@ async def get_overview(db: AsyncSession = Depends(get_db), user: User = Depends(
# Fallback: if daily summary is empty, compute from raw energy_data
if not energy_summary:
from sqlalchemy import distinct
fallback_q = await db.execute(
select(
func.sum(EnergyData.value),
).where(
and_(
EnergyData.timestamp >= today_start,
EnergyData.data_type == "daily_energy",
)
).group_by(EnergyData.device_id).order_by(EnergyData.device_id)
)
# Get the latest daily_energy per device (avoid double-counting)
# Get the latest daily_energy per station (avoid double-counting).
# The collector writes station-level daily_energy to individual device rows,
# so multiple devices from the same station share the same value.
# Group by station prefix (first 3 chars of device name, e.g. "AP1", "AP2")
# and take MAX per station to deduplicate.
latest_energy_q = await db.execute(
select(
EnergyData.device_id,
func.substring(Device.name, text("1"), text("3")).label("station"),
func.max(EnergyData.value).label("max_energy"),
).select_from(EnergyData).join(
Device, EnergyData.device_id == Device.id
).where(
and_(
EnergyData.timestamp >= today_start,
EnergyData.data_type == "daily_energy",
Device.device_type.in_(["pv_inverter", "sungrow_inverter"]),
)
).group_by(EnergyData.device_id)
).group_by(text("station"))
)
total_gen = sum(row[1] or 0 for row in latest_energy_q.all())
if total_gen > 0:
@@ -109,21 +105,58 @@ async def get_overview(db: AsyncSession = Depends(get_db), user: User = Depends(
@router.get("/realtime")
async def get_realtime_data(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
"""实时功率数据 - 获取最近的采集数据"""
"""实时功率数据 - 获取最近的采集数据,按站去重防止重复计数"""
now = datetime.now(timezone.utc)
five_min_ago = now - timedelta(minutes=5)
latest_q = await db.execute(
select(EnergyData).where(
and_(EnergyData.timestamp >= five_min_ago, EnergyData.data_type == "power")
).order_by(EnergyData.timestamp.desc()).limit(50)
)
data_points = latest_q.scalars().all()
window_start = now - timedelta(minutes=20)
# Get latest power per station (dedup by device name prefix)
# Sungrow collectors report station-level power, so multiple devices
# sharing the same station (AP1xx = Phase 1, AP2xx = Phase 2) report
# identical values. GROUP BY station prefix and take MAX to avoid
# double-counting.
from sqlalchemy import text as sa_text
pv_ids = await _get_pv_device_ids(db)
hp_ids = await _get_hp_device_ids(db)
pv_power = sum(d.value for d in data_points if d.device_id in pv_ids)
heatpump_power = sum(d.value for d in data_points if d.device_id in hp_ids)
# PV power: dedup by station prefix
if pv_ids:
pv_q = await db.execute(
select(
func.substring(Device.name, 1, 3).label("station"),
func.max(EnergyData.value).label("power"),
).select_from(EnergyData).join(
Device, EnergyData.device_id == Device.id
).where(
and_(
EnergyData.timestamp >= window_start,
EnergyData.data_type == "power",
EnergyData.device_id.in_(pv_ids),
)
).group_by(sa_text("1"))
)
pv_power = sum(row[1] or 0 for row in pv_q.all())
else:
pv_power = 0
# Heat pump power: dedup by station prefix
if hp_ids:
hp_q = await db.execute(
select(
func.substring(Device.name, 1, 3).label("station"),
func.max(EnergyData.value).label("power"),
).select_from(EnergyData).join(
Device, EnergyData.device_id == Device.id
).where(
and_(
EnergyData.timestamp >= window_start,
EnergyData.data_type == "power",
EnergyData.device_id.in_(hp_ids),
)
).group_by(sa_text("1"))
)
heatpump_power = sum(row[1] or 0 for row in hp_q.all())
else:
heatpump_power = 0
return {
"timestamp": str(now),

View File

@@ -32,13 +32,27 @@ async def query_history(
user: User = Depends(get_current_user),
):
"""历史数据查询"""
# Parse time strings to datetime for proper PostgreSQL timestamp comparison
start_dt = None
end_dt = None
if start_time:
try:
start_dt = datetime.fromisoformat(start_time)
except ValueError:
start_dt = datetime.strptime(start_time, "%Y-%m-%d")
if end_time:
try:
end_dt = datetime.fromisoformat(end_time)
except ValueError:
end_dt = datetime.strptime(end_time, "%Y-%m-%d").replace(hour=23, minute=59, second=59)
query = select(EnergyData).where(EnergyData.data_type == data_type)
if device_id:
query = query.where(EnergyData.device_id == device_id)
if start_time:
query = query.where(EnergyData.timestamp >= start_time)
if end_time:
query = query.where(EnergyData.timestamp <= end_time)
if start_dt:
query = query.where(EnergyData.timestamp >= start_dt)
if end_dt:
query = query.where(EnergyData.timestamp <= end_dt)
if granularity == "raw":
query = query.order_by(EnergyData.timestamp.desc()).offset((page - 1) * page_size).limit(page_size)
@@ -74,10 +88,10 @@ async def query_history(
).where(EnergyData.data_type == data_type)
if device_id:
agg_query = agg_query.where(EnergyData.device_id == device_id)
if start_time:
agg_query = agg_query.where(EnergyData.timestamp >= start_time)
if end_time:
agg_query = agg_query.where(EnergyData.timestamp <= end_time)
if start_dt:
agg_query = agg_query.where(EnergyData.timestamp >= start_dt)
if end_dt:
agg_query = agg_query.where(EnergyData.timestamp <= end_dt)
agg_query = agg_query.group_by(text('time_bucket')).order_by(text('time_bucket'))
result = await db.execute(agg_query)
return [{"time": str(r[0]), "avg": round(r[1], 2), "max": round(r[2], 2), "min": round(r[3], 2)}

94
backend/app/api/v1/kpi.py Normal file
View File

@@ -0,0 +1,94 @@
from datetime import datetime, timezone
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_
from app.core.database import get_db
from app.core.deps import get_current_user
from app.models.device import Device
from app.models.energy import EnergyData
from app.models.user import User
router = APIRouter(prefix="/kpi", tags=["关键指标"])
@router.get("/solar")
async def get_solar_kpis(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
"""Solar performance KPIs - PR, self-consumption, equivalent hours, revenue"""
now = datetime.now(timezone.utc)
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
# Get PV devices and their rated power
pv_q = await db.execute(
select(Device.id, Device.rated_power).where(
Device.device_type.in_(["pv_inverter", "sungrow_inverter"]),
Device.is_active == True,
)
)
pv_devices = pv_q.all()
pv_ids = [d[0] for d in pv_devices]
total_rated_kw = sum(d[1] or 0 for d in pv_devices) # kW
if not pv_ids or total_rated_kw == 0:
return {
"pr": 0, "self_consumption_rate": 0,
"equivalent_hours": 0, "revenue_today": 0,
"total_rated_kw": 0, "daily_generation_kwh": 0,
}
# Get latest daily_energy per station (dedup by device name prefix)
# Sungrow collectors report station-level data per device, so multiple
# devices sharing the same station report identical values.
# Group by station prefix (first 3 chars of name, e.g. "AP1" vs "AP2")
# and take MAX per station to avoid double-counting.
from sqlalchemy import text as sa_text
daily_gen_q = await db.execute(
select(
func.substring(Device.name, 1, 3).label("station"),
func.max(EnergyData.value).label("max_energy"),
).select_from(EnergyData).join(
Device, EnergyData.device_id == Device.id
).where(
and_(
EnergyData.timestamp >= today_start,
EnergyData.data_type == "daily_energy",
EnergyData.device_id.in_(pv_ids),
)
).group_by(sa_text("1"))
)
daily_values = daily_gen_q.all()
if not daily_values:
daily_generation_kwh = 0
else:
daily_generation_kwh = sum(row[1] or 0 for row in daily_values)
# Performance Ratio (PR) = actual output / (rated capacity * peak sun hours)
# Approximate peak sun hours from time of day (simplified)
hours_since_sunrise = max(0, min(12, (now.hour + now.minute / 60) - 6)) # approx 6am sunrise
theoretical_kwh = total_rated_kw * hours_since_sunrise * 0.8 # 0.8 = typical irradiance factor
pr = (daily_generation_kwh / theoretical_kwh * 100) if theoretical_kwh > 0 else 0
pr = min(100, round(pr, 1)) # Cap at 100%
# Self-consumption rate (without grid export meter, assume 100% self-consumed for now)
# TODO: integrate grid export meter data when available
self_consumption_rate = 100.0
# Equivalent utilization hours = daily generation / rated capacity
equivalent_hours = round(daily_generation_kwh / total_rated_kw, 2) if total_rated_kw > 0 else 0
# Revenue = daily generation * electricity price
# TODO: get actual price from electricity_pricing table
# Default industrial TOU average price in Beijing: ~0.65 CNY/kWh
avg_price = 0.65
revenue_today = round(daily_generation_kwh * avg_price, 2)
return {
"pr": pr,
"self_consumption_rate": round(self_consumption_rate, 1),
"equivalent_hours": equivalent_hours,
"revenue_today": revenue_today,
"total_rated_kw": total_rated_kw,
"daily_generation_kwh": round(daily_generation_kwh, 2),
"avg_price_per_kwh": avg_price,
"pv_device_count": len(pv_ids),
}

View File

@@ -0,0 +1,201 @@
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import select, desc
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import get_settings
from app.core.database import get_db
from app.core.deps import get_current_user
from app.models.meter import MeterReading
from app.models.user import User

router = APIRouter(prefix="/meters", tags=["电表管理"])

settings = get_settings()
# NOTE: the customer config (including the meter list) is loaded once at
# import time — changes to config.yaml require a process restart to apply.
_customer_config = settings.load_customer_config()
# --------------- Pydantic schemas ---------------
class MeterInfo(BaseModel):
    """Static metadata for one meter, sourced from customer config.yaml."""
    id: int
    name: str
    # Optional config fields; None when absent from the config entry.
    meter_no: str | None = None
    modbus_addr: int | None = None
class MeterReadingResponse(BaseModel):
    """One meter reading row, mirroring the MeterReading ORM model field-for-field."""
    time: datetime
    meter_id: int
    meter_name: str | None = None
    # Energy counters and instantaneous electrical quantities; all optional
    # since a meter may not report every register.
    forward_active_energy: float | None = None
    reverse_active_energy: float | None = None
    active_power: float | None = None
    reactive_power: float | None = None
    power_factor: float | None = None
    # Per-phase voltages and currents (three-phase: a/b/c).
    voltage_a: float | None = None
    voltage_b: float | None = None
    voltage_c: float | None = None
    current_a: float | None = None
    current_b: float | None = None
    current_c: float | None = None
class MeterLatestResponse(BaseModel):
    """Response for GET /{meter_id}/latest: meter info plus newest reading (if any)."""
    meter: MeterInfo
    latest: MeterReadingResponse | None = None
class MeterOverviewItem(BaseModel):
    """One entry in the overview list: a meter and its newest reading (if any)."""
    # NOTE(review): structurally identical to MeterLatestResponse; kept as a
    # separate type so the two endpoints' schemas can evolve independently.
    meter: MeterInfo
    latest: MeterReadingResponse | None = None
class MeterOverviewResponse(BaseModel):
    """Response for GET /overview: per-meter latest readings plus fleet totals."""
    meters: list[MeterOverviewItem]
    # Totals are sums over each meter's latest reading (missing values count as 0).
    total_forward_energy: float
    total_reverse_energy: float
    total_active_power: float
class MeterListResponse(BaseModel):
    """Response for GET /meters: all configured meters and their count."""
    items: list[MeterInfo]
    total: int
# --------------- Helpers ---------------
def _get_meter_configs() -> list[dict]:
    """Return the raw meter entries from customer config.yaml ([] when none)."""
    configured = _customer_config.get("meters", [])
    return configured
def _meter_config_to_info(cfg: dict) -> MeterInfo:
    """Convert one raw config dict into a MeterInfo response object."""
    meter_id = cfg["id"]
    display_name = cfg.get("name", f"Meter-{meter_id}")
    return MeterInfo(
        id=meter_id,
        name=display_name,
        meter_no=cfg.get("meter_no"),
        modbus_addr=cfg.get("modbus_addr"),
    )
def _reading_to_response(r: MeterReading) -> MeterReadingResponse:
    """Copy an ORM reading row into its API schema, field by field."""
    # The schema mirrors the ORM model, so a name-driven copy keeps the two in
    # lockstep without fourteen explicit keyword arguments.
    field_names = (
        "time", "meter_id", "meter_name",
        "forward_active_energy", "reverse_active_energy",
        "active_power", "reactive_power", "power_factor",
        "voltage_a", "voltage_b", "voltage_c",
        "current_a", "current_b", "current_c",
    )
    return MeterReadingResponse(**{name: getattr(r, name) for name in field_names})
# --------------- Endpoints ---------------
@router.get("", response_model=MeterListResponse)
async def list_meters(
user: User = Depends(get_current_user),
):
"""列出所有已配置的电表"""
configs = _get_meter_configs()
items = [_meter_config_to_info(c) for c in configs]
return MeterListResponse(items=items, total=len(items))
@router.get("/overview", response_model=MeterOverviewResponse)
async def meter_overview(
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""仪表盘概览 - 所有电表最新值及汇总"""
configs = _get_meter_configs()
overview_items: list[MeterOverviewItem] = []
total_forward = 0.0
total_reverse = 0.0
total_power = 0.0
for cfg in configs:
info = _meter_config_to_info(cfg)
result = await db.execute(
select(MeterReading)
.where(MeterReading.meter_id == cfg["id"])
.order_by(desc(MeterReading.time))
.limit(1)
)
reading = result.scalar_one_or_none()
latest = _reading_to_response(reading) if reading else None
if reading:
total_forward += reading.forward_active_energy or 0
total_reverse += reading.reverse_active_energy or 0
total_power += reading.active_power or 0
overview_items.append(MeterOverviewItem(meter=info, latest=latest))
return MeterOverviewResponse(
meters=overview_items,
total_forward_energy=total_forward,
total_reverse_energy=total_reverse,
total_active_power=total_power,
)
@router.get("/{meter_id}/latest", response_model=MeterLatestResponse)
async def get_meter_latest(
meter_id: int,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""获取指定电表的最新读数"""
configs = _get_meter_configs()
cfg = next((c for c in configs if c["id"] == meter_id), None)
if not cfg:
raise HTTPException(status_code=404, detail="电表不存在")
result = await db.execute(
select(MeterReading)
.where(MeterReading.meter_id == meter_id)
.order_by(desc(MeterReading.time))
.limit(1)
)
reading = result.scalar_one_or_none()
return MeterLatestResponse(
meter=_meter_config_to_info(cfg),
latest=_reading_to_response(reading) if reading else None,
)
@router.get("/{meter_id}/readings", response_model=list[MeterReadingResponse])
async def get_meter_readings(
meter_id: int,
start: Optional[datetime] = Query(None, description="开始时间 ISO8601"),
end: Optional[datetime] = Query(None, description="结束时间 ISO8601"),
limit: int = Query(100, ge=1, le=10000, description="返回条数上限"),
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user),
):
"""查询历史读数 (时间范围 + limit)"""
configs = _get_meter_configs()
cfg = next((c for c in configs if c["id"] == meter_id), None)
if not cfg:
raise HTTPException(status_code=404, detail="电表不存在")
query = select(MeterReading).where(MeterReading.meter_id == meter_id)
if start:
query = query.where(MeterReading.time >= start)
if end:
query = query.where(MeterReading.time <= end)
query = query.order_by(desc(MeterReading.time)).limit(limit)
result = await db.execute(query)
readings = result.scalars().all()
return [_reading_to_response(r) for r in readings]

View File

@@ -0,0 +1,32 @@
import os
import json
from fastapi import APIRouter
# Read-only version-info router (mounted under /version); declares no auth dependency.
router = APIRouter(prefix="/version", tags=["版本信息"])
@router.get("")
async def get_version():
"""Return platform version information for display on login/dashboard"""
# Read VERSIONS.json from project root (2 levels up from backend/)
backend_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
# Try multiple paths for VERSIONS.json
for path in [
os.path.join(backend_dir, "VERSIONS.json"), # standalone
os.path.join(backend_dir, "..", "VERSIONS.json"), # inside core/ subtree
os.path.join(backend_dir, "..", "..", "VERSIONS.json"), # customer project root
]:
if os.path.exists(path):
with open(path, 'r', encoding='utf-8') as f:
versions = json.load(f)
return versions
# Fallback: read VERSION file
version_file = os.path.join(backend_dir, "VERSION")
version = "unknown"
if os.path.exists(version_file):
with open(version_file, 'r') as f:
version = f.read().strip()
return {"project_version": version, "project": "ems-core"}

View File

@@ -20,6 +20,7 @@ from app.models.prediction import PredictionTask, PredictionResult, Optimization
from app.models.energy_strategy import TouPricing, TouPricingPeriod, EnergyStrategy, StrategyExecution, MonthlyCostReport
from app.models.weather import WeatherData, WeatherConfig
from app.models.ai_ops import DeviceHealthScore, AnomalyDetection, DiagnosticReport, MaintenancePrediction, OpsInsight
from app.models.meter import MeterReading
__all__ = [
"User", "Role", "AuditLog",
@@ -40,4 +41,5 @@ __all__ = [
"TouPricing", "TouPricingPeriod", "EnergyStrategy", "StrategyExecution", "MonthlyCostReport",
"WeatherData", "WeatherConfig",
"DeviceHealthScore", "AnomalyDetection", "DiagnosticReport", "MaintenancePrediction", "OpsInsight",
"MeterReading",
]

View File

@@ -0,0 +1,24 @@
from sqlalchemy import Column, Integer, String, Float, DateTime, JSON
from sqlalchemy.sql import func
from app.core.database import Base
class MeterReading(Base):
    """Meter reading time-series data: one row per meter per sample timestamp."""
    __tablename__ = "meter_readings"
    # Composite primary key: sample time + meter id.
    time = Column(DateTime(timezone=True), primary_key=True, nullable=False)
    meter_id = Column(Integer, primary_key=True, nullable=False)
    meter_name = Column(String(100))  # meter display name
    forward_active_energy = Column(Float)  # forward active energy, kWh
    reverse_active_energy = Column(Float)  # reverse active energy, kWh
    active_power = Column(Float)  # active power, kW
    reactive_power = Column(Float)  # reactive power, kvar
    power_factor = Column(Float)  # power factor
    voltage_a = Column(Float)  # phase-A voltage, V
    voltage_b = Column(Float)  # phase-B voltage, V
    voltage_c = Column(Float)  # phase-C voltage, V
    current_a = Column(Float)  # phase-A current, A
    current_b = Column(Float)  # phase-B current, A
    current_c = Column(Float)  # phase-C current, A
    raw_json = Column(JSON)  # original raw data packet as collected

314
scripts/validate_data.py Normal file
View File

@@ -0,0 +1,314 @@
"""EMS 数据验证脚本 — 对比平台数据与源API确保偏差在容许范围内
Used during deployment buyoff to verify that EMS dashboard numbers match
the upstream source (e.g. iSolarCloud). Supports both automated source-API
comparison and manual interactive mode.
Exit code 0 = all checks within tolerance
Exit code 1 = one or more checks exceeded tolerance
"""
import argparse
import getpass
import sys
from typing import Optional
import requests
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _pct_diff(ems_val: float, source_val: float) -> Optional[float]:
"""Return absolute percentage difference, or None if source is zero."""
if source_val == 0:
return None if ems_val == 0 else float("inf")
return abs(ems_val - source_val) / abs(source_val)
def _fmt_pct(pct: Optional[float]) -> str:
if pct is None:
return "N/A"
if pct == float("inf"):
return "INF"
return f"{pct * 100:.2f}%"
def _status(pct: Optional[float], tolerance: float) -> str:
if pct is None:
return "SKIP"
if pct <= tolerance:
return "PASS"
return "FAIL"
# ---------------------------------------------------------------------------
# EMS data fetching
# ---------------------------------------------------------------------------
def ems_login(base_url: str, username: str, password: str) -> str:
    """Log in to the EMS backend and return its bearer token.

    Raises requests.HTTPError on a non-2xx response, and RuntimeError when
    the 2xx response carries no recognizable token field.
    """
    resp = requests.post(
        f"{base_url}/api/v1/auth/login",
        json={"username": username, "password": password},
        timeout=10,
    )
    resp.raise_for_status()
    data = resp.json()
    # Token location varies between response shapes; try each in turn.
    token = (
        data.get("access_token")
        or data.get("token")
        or data.get("data", {}).get("access_token")
    )
    if token:
        return token
    raise RuntimeError(f"Login succeeded but no token found in response: {list(data.keys())}")
def fetch_ems_metrics(base_url: str, token: str) -> dict:
    """Pull all relevant metrics from the EMS backend.

    Each endpoint is fetched independently: on any failure a warning is
    printed and every metric sourced from that endpoint is set to None so
    the report can SKIP it. Replaces four copy-pasted try/except blocks
    with a data-driven table, and parses each response body only once
    (the original called ``resp.json()`` twice per endpoint).
    """
    headers = {"Authorization": f"Bearer {token}"}

    def _get_data(path: str) -> dict:
        # One GET + a single JSON parse; unwrap the optional {"data": ...} envelope.
        resp = requests.get(f"{base_url}{path}", headers=headers, timeout=10)
        resp.raise_for_status()
        body = resp.json()
        return body.get("data", body)

    # (endpoint path, warn label, {metric key: (json field, cast)})
    endpoints = [
        ("/api/v1/dashboard/realtime", "dashboard/realtime", {
            "pv_power_kw": ("pv_power", float),
        }),
        ("/api/v1/dashboard/overview", "dashboard/overview", {
            "energy_today_kwh": ("energy_today", float),
            "total_generation_kwh": ("total_generation", float),
        }),
        ("/api/v1/devices/stats", "devices/stats", {
            "online_count": ("online_count", int),
            "total_count": ("total_count", int),
        }),
        ("/api/v1/kpi/solar", "kpi/solar", {
            "pr": ("pr", float),
            "equivalent_hours": ("equivalent_hours", float),
            "revenue": ("revenue", float),
            "self_consumption_rate": ("self_consumption_rate", float),
        }),
    ]

    metrics: dict = {}
    for path, label, fields in endpoints:
        try:
            d = _get_data(path)
            for key, (field, cast) in fields.items():
                metrics[key] = cast(d.get(field, 0))
        except Exception as exc:
            # Mirrors the original behavior: warn and null out ALL of this
            # endpoint's metrics (including any set before the failure).
            print(f" [warn] {label} failed: {exc}")
            for key in fields:
                metrics[key] = None
    return metrics
# ---------------------------------------------------------------------------
# Source data (manual mode)
# ---------------------------------------------------------------------------
# Metric key -> human-readable report label. Iteration order here drives both
# the comparison-report row order and which metrics manual entry asks about.
METRIC_LABELS = {
    "pv_power_kw": "Real-time PV Power (kW)",
    "energy_today_kwh": "Today Generation (kWh)",
    "total_generation_kwh": "Total Generation (kWh)",
    "online_count": "Devices Online",
    "total_count": "Devices Total",
    "pr": "Performance Ratio",
    "equivalent_hours": "Equivalent Hours (h)",
    "revenue": "Revenue (CNY)",
    "self_consumption_rate": "Self-consumption Rate",
}
def prompt_source_values(ems_metrics: dict) -> dict:
    """Interactively collect source reference values; skipped/invalid entries become None."""
    print("\n--- Manual Source Entry ---")
    print("Enter the reference value from the source system for each metric.")
    print("Press Enter to skip a metric.\n")
    entered: dict = {}
    for key, label in METRIC_LABELS.items():
        # Only ask about metrics the EMS actually returned.
        if ems_metrics.get(key) is None:
            continue
        raw = input(f" {label} [{key}]: ").strip()
        if not raw:
            entered[key] = None
            continue
        try:
            entered[key] = float(raw)
        except ValueError:
            print(f" -> invalid number, skipping {key}")
            entered[key] = None
    return entered
# ---------------------------------------------------------------------------
# Comparison & reporting
# ---------------------------------------------------------------------------
def compare_and_report(ems: dict, source: dict, tolerance: float) -> bool:
    """Print the EMS-vs-source comparison table.

    Returns True when every compared metric is within *tolerance* (rows with
    a missing EMS or source value are SKIPped and do not affect the result).
    With zero comparable rows, warns and still returns True.
    """
    col_metric, col_val, col_pct, col_st = 30, 14, 10, 6
    sep = "-" * (col_metric + col_val * 2 + col_pct + col_st + 8)
    banner = "=" * len(sep)

    print("\n" + banner)
    print(" EMS Data Validation Report")
    print(banner)
    print(
        f"{'Metric':<{col_metric}} "
        f"{'EMS':>{col_val}} "
        f"{'Source':>{col_val}} "
        f"{'Diff%':>{col_pct}} "
        f"{'Status':>{col_st}}"
    )
    print(sep)

    checked = failed = 0
    for key, label in METRIC_LABELS.items():
        ems_val, src_val = ems.get(key), source.get(key)
        if ems_val is None or src_val is None:
            # Nothing to compare: show whatever side we do have, mark SKIP.
            pct, st = None, "SKIP"
            ems_str = "-" if ems_val is None else str(ems_val)
            src_str = "-" if src_val is None else str(src_val)
        else:
            pct = _pct_diff(ems_val, src_val)
            st = _status(pct, tolerance)
            # Floats get two decimals; ints (device counts) print as-is.
            ems_str = f"{ems_val:.2f}" if isinstance(ems_val, float) else str(ems_val)
            src_str = f"{src_val:.2f}" if isinstance(src_val, float) else str(src_val)
            checked += 1
            if st == "FAIL":
                failed += 1
        print(
            f"{label:<{col_metric}} "
            f"{ems_str:>{col_val}} "
            f"{src_str:>{col_val}} "
            f"{_fmt_pct(pct):>{col_pct}} "
            f"{st:>{col_st}}"
        )

    print(sep)
    print(f"\nTolerance: {tolerance * 100:.1f}%")
    print(f"Checked: {checked} | Passed: {checked - failed} | Failed: {failed}")
    if checked == 0:
        print("\n[WARN] No metrics were compared. Provide source values to validate.")
        return True
    if failed:
        print("\n[FAIL] One or more metrics exceed tolerance!")
        return False
    print("\n[PASS] All metrics within tolerance.")
    return True
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
    """Define the validator's command-line interface and parse sys.argv."""
    parser = argparse.ArgumentParser(
        description="Validate EMS platform data against source API or manual reference values."
    )
    parser.add_argument("--ems-url", default="http://localhost:8000",
                        help="Base URL of the EMS backend (default: http://localhost:8000)")
    parser.add_argument("--source-url", default=None,
                        help="Base URL of the source API (e.g. iSolarCloud proxy). Not yet implemented — reserved for future use.")
    parser.add_argument("--tolerance", type=float, default=0.05,
                        help="Maximum allowed fractional difference, e.g. 0.05 = 5%% (default: 0.05)")
    parser.add_argument("--username", default="admin",
                        help="EMS login username (default: admin)")
    parser.add_argument("--password", default=None,
                        help="EMS login password (will prompt if not provided)")
    parser.add_argument("--manual", action="store_true",
                        help="Manually enter source reference values interactively")
    return parser.parse_args()
def main() -> None:
    """Run the end-to-end validation flow.

    Authenticates against the EMS backend, pulls its dashboard/KPI metrics,
    obtains source reference values (interactive --manual mode only for now),
    prints the comparison report, and exits 0 when every compared metric is
    within tolerance — 1 on comparison failure, login/connection error, or
    missing source-data mode.
    """
    args = parse_args()
    # Prompt only when --password was not supplied (avoids echoing it in shell history).
    password = args.password or getpass.getpass("EMS password: ")
    # ---- Authenticate ----
    print(f"Connecting to EMS at {args.ems_url} ...")
    try:
        token = ems_login(args.ems_url, args.username, password)
    except requests.HTTPError as exc:
        print(f"[ERROR] Login failed: {exc}")
        sys.exit(1)
    except requests.ConnectionError:
        print(f"[ERROR] Cannot connect to {args.ems_url}")
        sys.exit(1)
    print(" Authenticated successfully.")
    # ---- Fetch EMS metrics ----
    print("Fetching EMS metrics ...")
    ems_metrics = fetch_ems_metrics(args.ems_url, token)
    # ---- Get source metrics ----
    if args.manual:
        source_metrics = prompt_source_values(ems_metrics)
    elif args.source_url:
        # Placeholder for automated source-API fetching
        print(f"[ERROR] Automated source-API mode (--source-url {args.source_url}) is not yet implemented.")
        print(" Use --manual mode to enter values interactively.")
        sys.exit(1)
    else:
        print("[ERROR] No source data provided. Use --manual or --source-url.")
        sys.exit(1)
    # ---- Compare ----
    passed = compare_and_report(ems_metrics, source_metrics, args.tolerance)
    sys.exit(0 if passed else 1)
if __name__ == "__main__":
main()