from datetime import date, datetime

from fastapi import APIRouter, Depends, Query, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from pydantic import BaseModel

from app.core.database import get_db
from app.core.deps import get_current_user
from app.models.energy_strategy import (
    TouPricing,
    TouPricingPeriod,
    EnergyStrategy,
    StrategyExecution,
)
from app.models.user import User
from app.services.energy_strategy import (
    get_active_tou_pricing,
    get_tou_periods,
    calculate_monthly_cost_breakdown,
    get_recommendations,
    get_savings_report,
    simulate_strategy_impact,
    DEFAULT_PERIODS,
    PERIOD_LABELS,
)

router = APIRouter(prefix="/strategy", tags=["策略优化"])


# ---- Schemas ----

class TouPricingCreate(BaseModel):
    """Request body for creating or updating a time-of-use (TOU) pricing config."""

    name: str
    region: str = "北京"
    # ISO-format date strings; empty or missing means "no date".
    effective_date: str | None = None
    end_date: str | None = None


class TouPricingPeriodCreate(BaseModel):
    """One priced time window belonging to a TOU pricing config."""

    period_type: str  # sharp_peak, peak, flat, valley
    start_time: str  # HH:MM
    end_time: str  # HH:MM
    price_yuan_per_kwh: float
    month_range: str | None = None


class TouPricingPeriodsSet(BaseModel):
    """Request body carrying the full replacement list of periods."""

    periods: list[TouPricingPeriodCreate]


class EnergyStrategyCreate(BaseModel):
    """Request body for creating an optimization strategy."""

    name: str
    strategy_type: str  # heat_storage, load_shift, pv_priority
    description: str | None = None
    parameters: dict | None = None
    priority: int = 0


class EnergyStrategyUpdate(BaseModel):
    """Partial update for a strategy; fields left as ``None`` are not changed."""

    name: str | None = None
    description: str | None = None
    parameters: dict | None = None
    is_enabled: bool | None = None
    priority: int | None = None


class SimulateRequest(BaseModel):
    """Inputs for the strategy-impact simulation."""

    daily_consumption_kwh: float = 2000
    pv_daily_kwh: float = 800
    strategies: list[str] = ["heat_storage", "pv_priority", "load_shift"]


# ---- Internal helpers ----

def _parse_iso_date(value: str | None, field_name: str) -> date | None:
    """Convert an optional ISO date string from a request body into a ``date``.

    Args:
        value: Raw string from the client; ``None`` or ``""`` yields ``None``.
        field_name: Name of the request field, used in the error message.

    Returns:
        The parsed date, or ``None`` when no value was supplied.

    Raises:
        HTTPException: 422 when the string is not a valid ISO date. Without
            this, the ``ValueError`` would escape the endpoint as a 500.
    """
    if not value:
        return None
    try:
        return date.fromisoformat(value)
    except ValueError as exc:
        raise HTTPException(
            status_code=422,
            detail=f"{field_name} 不是有效的 ISO 格式日期,例如 2026-04-01",
        ) from exc


def _serialize_period(period: TouPricingPeriod) -> dict:
    """Build the JSON-ready dict for one pricing period.

    ``period_label`` falls back to the raw ``period_type`` when the type has
    no entry in ``PERIOD_LABELS``.
    """
    return {
        "id": period.id,
        "period_type": period.period_type,
        "period_label": PERIOD_LABELS.get(period.period_type, period.period_type),
        "start_time": period.start_time,
        "end_time": period.end_time,
        "price_yuan_per_kwh": period.price_yuan_per_kwh,
        "month_range": period.month_range,
    }


async def _get_pricing_or_404(db: AsyncSession, pricing_id: int) -> TouPricing:
    """Load a ``TouPricing`` row by id or raise a 404 ``HTTPException``."""
    result = await db.execute(select(TouPricing).where(TouPricing.id == pricing_id))
    pricing = result.scalar_one_or_none()
    if not pricing:
        raise HTTPException(status_code=404, detail="电价配置不存在")
    return pricing


async def _get_strategy_or_404(db: AsyncSession, strategy_id: int) -> EnergyStrategy:
    """Load an ``EnergyStrategy`` row by id or raise a 404 ``HTTPException``."""
    result = await db.execute(
        select(EnergyStrategy).where(EnergyStrategy.id == strategy_id)
    )
    strategy = result.scalar_one_or_none()
    if not strategy:
        raise HTTPException(status_code=404, detail="策略不存在")
    return strategy


def _default_strategies() -> list[dict]:
    """Return the built-in strategy descriptions shown when none are stored.

    These entries have ``id`` set to ``None`` because they are not database rows.
    """
    return [
        {
            "id": None,
            "name": "谷电蓄热",
            "strategy_type": "heat_storage",
            "description": "在低谷电价时段(23:00-7:00)预热水箱,减少尖峰时段热泵运行",
            "is_enabled": False,
            "priority": 3,
            "parameters": {
                "shift_ratio": 0.3,
                "valley_start": "23:00",
                "valley_end": "07:00",
            },
        },
        {
            "id": None,
            "name": "光伏自消纳优先",
            "strategy_type": "pv_priority",
            "description": "优先使用光伏发电供给园区负荷,减少向电网购电",
            "is_enabled": True,
            "priority": 2,
            "parameters": {"min_self_consumption_ratio": 0.7},
        },
        {
            "id": None,
            "name": "负荷转移",
            "strategy_type": "load_shift",
            "description": "将可调负荷从尖峰时段转移至平段或低谷时段",
            "is_enabled": False,
            "priority": 1,
            "parameters": {
                "max_shift_ratio": 0.15,
                "target_periods": ["flat", "valley"],
            },
        },
    ]


# ---- TOU Pricing ----

@router.get("/pricing")
async def list_tou_pricing(
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """List all TOU pricing configs, newest first, each with its periods."""
    result = await db.execute(
        select(TouPricing).order_by(TouPricing.created_at.desc())
    )
    pricings = result.scalars().all()

    items = []
    for pricing in pricings:
        # NOTE(review): one get_tou_periods call per pricing; consider a
        # batched lookup if the number of configs grows.
        periods = await get_tou_periods(db, pricing.id)
        items.append({
            "id": pricing.id,
            "name": pricing.name,
            "region": pricing.region,
            "effective_date": (
                str(pricing.effective_date) if pricing.effective_date else None
            ),
            "end_date": str(pricing.end_date) if pricing.end_date else None,
            "is_active": pricing.is_active,
            "created_at": str(pricing.created_at),
            "periods": [_serialize_period(period) for period in periods],
        })
    return items


@router.post("/pricing")
async def create_tou_pricing(
    data: TouPricingCreate,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Create a TOU pricing config owned by the current user.

    Raises:
        HTTPException: 422 when ``effective_date`` or ``end_date`` is not a
            valid ISO date string.
    """
    # Validate both dates before constructing the row.
    effective_date = _parse_iso_date(data.effective_date, "effective_date")
    end_date = _parse_iso_date(data.end_date, "end_date")

    pricing = TouPricing(
        name=data.name,
        region=data.region,
        effective_date=effective_date,
        end_date=end_date,
        created_by=user.id,
    )
    db.add(pricing)
    # Flush so the generated primary key is available for the response.
    await db.flush()
    return {"id": pricing.id, "message": "分时电价配置创建成功"}


@router.put("/pricing/{pricing_id}")
async def update_tou_pricing(
    pricing_id: int,
    data: TouPricingCreate,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Overwrite name, region and dates of an existing TOU pricing config.

    Raises:
        HTTPException: 404 when the config does not exist; 422 when a date
            string is not a valid ISO date.
    """
    pricing = await _get_pricing_or_404(db, pricing_id)

    # Parse before mutating so a bad date cannot leave a half-updated object
    # in the session.
    effective_date = _parse_iso_date(data.effective_date, "effective_date")
    end_date = _parse_iso_date(data.end_date, "end_date")

    pricing.name = data.name
    pricing.region = data.region
    pricing.effective_date = effective_date
    pricing.end_date = end_date
    return {"message": "电价配置更新成功"}


@router.get("/pricing/{pricing_id}/periods")
async def get_pricing_periods(
    pricing_id: int,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Return the periods of one pricing config as a list of dicts."""
    periods = await get_tou_periods(db, pricing_id)
    return [_serialize_period(period) for period in periods]


@router.post("/pricing/{pricing_id}/periods")
async def set_pricing_periods(
    pricing_id: int,
    data: TouPricingPeriodsSet,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Replace all periods of a pricing config with the supplied list.

    Raises:
        HTTPException: 404 when the pricing config does not exist.
    """
    await _get_pricing_or_404(db, pricing_id)

    # Remove the existing periods first.
    existing = await db.execute(
        select(TouPricingPeriod).where(TouPricingPeriod.pricing_id == pricing_id)
    )
    for old_period in existing.scalars().all():
        await db.delete(old_period)

    # Then add the replacement periods.
    for period in data.periods:
        db.add(
            TouPricingPeriod(
                pricing_id=pricing_id,
                period_type=period.period_type,
                start_time=period.start_time,
                end_time=period.end_time,
                price_yuan_per_kwh=period.price_yuan_per_kwh,
                month_range=period.month_range,
            )
        )
    return {"message": f"已设置{len(data.periods)}个时段"}


# ---- Strategies ----

@router.get("/strategies")
async def list_strategies(
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """List stored strategies by descending priority.

    When no strategy is stored, the built-in defaults are returned instead.
    """
    result = await db.execute(
        select(EnergyStrategy).order_by(EnergyStrategy.priority.desc())
    )
    strategies = result.scalars().all()
    if not strategies:
        return _default_strategies()

    return [
        {
            "id": s.id,
            "name": s.name,
            "strategy_type": s.strategy_type,
            "description": s.description,
            "is_enabled": s.is_enabled,
            "priority": s.priority,
            "parameters": s.parameters or {},
        }
        for s in strategies
    ]


@router.post("/strategies")
async def create_strategy(
    data: EnergyStrategyCreate,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Create an optimization strategy and return its id."""
    strategy = EnergyStrategy(
        name=data.name,
        strategy_type=data.strategy_type,
        description=data.description,
        parameters=data.parameters or {},
        priority=data.priority,
    )
    db.add(strategy)
    # Flush so the generated primary key is available for the response.
    await db.flush()
    return {"id": strategy.id, "message": "策略创建成功"}


@router.put("/strategies/{strategy_id}")
async def update_strategy(
    strategy_id: int,
    data: EnergyStrategyUpdate,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Apply a partial update to a strategy.

    Only fields whose value is not ``None`` are written, so a field cannot be
    reset to ``None`` through this endpoint.

    Raises:
        HTTPException: 404 when the strategy does not exist.
    """
    strategy = await _get_strategy_or_404(db, strategy_id)

    for field in ("name", "description", "parameters", "is_enabled", "priority"):
        value = getattr(data, field)
        if value is not None:
            setattr(strategy, field, value)
    return {"message": "策略更新成功"}


@router.put("/strategies/{strategy_id}/toggle")
async def toggle_strategy(
    strategy_id: int,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Flip a strategy's ``is_enabled`` flag and report the new state.

    Raises:
        HTTPException: 404 when the strategy does not exist.
    """
    strategy = await _get_strategy_or_404(db, strategy_id)
    strategy.is_enabled = not strategy.is_enabled
    return {
        "is_enabled": strategy.is_enabled,
        "message": f"策略已{'启用' if strategy.is_enabled else '停用'}",
    }


# ---- Analysis ----

@router.get("/cost-analysis")
async def cost_analysis(
    year: int = Query(default=2026),
    month: int = Query(default=4, ge=1, le=12),
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Return the monthly cost breakdown for the given year and month."""
    # NOTE(review): the defaults are hard-coded to 2026-04; confirm whether
    # they should follow the current date instead.
    return await calculate_monthly_cost_breakdown(db, year, month)


@router.get("/savings-report")
async def savings_report(
    year: int = Query(default=2026),
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Return the savings report for the given year."""
    return await get_savings_report(db, year)


@router.get("/recommendations")
async def strategy_recommendations(
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Return the currently recommended strategies."""
    return await get_recommendations(db)


@router.post("/simulate")
async def simulate(
    data: SimulateRequest,
    db: AsyncSession = Depends(get_db),
    user: User = Depends(get_current_user),
):
    """Simulate the impact of the requested strategies.

    Uses the periods of the active TOU pricing when one exists; otherwise
    builds periods from ``DEFAULT_PERIODS``.
    """
    pricing = await get_active_tou_pricing(db)
    if pricing:
        periods = await get_tou_periods(db, pricing.id)
    else:
        # No active pricing: build period objects from the defaults. They are
        # never added to the session.
        periods = [
            TouPricingPeriod(
                period_type=p["period_type"],
                start_time=p["start_time"],
                end_time=p["end_time"],
                price_yuan_per_kwh=p["price"],
            )
            for p in DEFAULT_PERIODS
        ]

    return simulate_strategy_impact(
        daily_consumption_kwh=data.daily_consumption_kwh,
        pv_daily_kwh=data.pv_daily_kwh,
        periods=periods,
        strategies=data.strategies,
    )


@router.get("/default-pricing")
async def get_default_pricing(
    user: User = Depends(get_current_user),
):
    """Return the default pricing periods, each with a ``period_label`` added."""
    return {
        "region": "北京",
        "type": "工业用电",
        "periods": [
            {**p, "period_label": PERIOD_LABELS.get(p["period_type"], p["period_type"])}
            for p in DEFAULT_PERIODS
        ],
    }