from datetime import datetime, timedelta, timezone from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func, and_ from pydantic import BaseModel from app.core.database import get_db from app.core.deps import get_current_user, require_roles from app.models.charging import ( ChargingStation, ChargingPile, ChargingPriceStrategy, ChargingPriceParam, ChargingOrder, OccupancyOrder, ChargingBrand, ChargingMerchant, ) from app.models.user import User from app.services.audit import log_audit router = APIRouter(prefix="/charging", tags=["充电管理"]) # ─── Pydantic Schemas ─────────────────────────────────────────────── class StationCreate(BaseModel): name: str merchant_id: int | None = None type: str | None = None address: str | None = None latitude: float | None = None longitude: float | None = None price: float | None = None activity: str | None = None status: str = "active" total_piles: int = 0 available_piles: int = 0 total_power_kw: float = 0 photo_url: str | None = None operating_hours: str | None = None class StationUpdate(BaseModel): name: str | None = None merchant_id: int | None = None type: str | None = None address: str | None = None latitude: float | None = None longitude: float | None = None price: float | None = None activity: str | None = None status: str | None = None total_piles: int | None = None available_piles: int | None = None total_power_kw: float | None = None photo_url: str | None = None operating_hours: str | None = None class PileCreate(BaseModel): station_id: int encoding: str name: str | None = None type: str | None = None brand: str | None = None model: str | None = None rated_power_kw: float | None = None connector_type: str | None = None status: str = "active" work_status: str = "offline" class PileUpdate(BaseModel): station_id: int | None = None encoding: str | None = None name: str | None = None type: str | None = None brand: str | None = None model: str | None = None rated_power_kw: float | None = None connector_type: str | None = None status: str | None = None work_status: str | None = None class PriceParamCreate(BaseModel): start_time: str end_time: str period_mark: str | None = None elec_price: float service_price: float = 0 class PriceStrategyCreate(BaseModel): strategy_name: str station_id: int | None = None bill_model: str | None = None description: str | None = None status: str = "inactive" params: list[PriceParamCreate] = [] class PriceStrategyUpdate(BaseModel): strategy_name: str | None = None station_id: int | None = None bill_model: str | None = None description: str | None = None status: str | None = None params: list[PriceParamCreate] | None = None class MerchantCreate(BaseModel): name: str contact_person: str | None = None phone: str | None = None email: str | None = None address: str | None = None business_license: str | None = None status: str = "active" settlement_type: str | None = None class BrandCreate(BaseModel): brand_name: str logo_url: str | None = None country: str | None = None description: str | None = None # ─── Station Endpoints ─────────────────────────────────────────────── @router.get("/stations") async def list_stations( status: str | None = None, type: str | None = None, merchant_id: int | None = None, page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): query = select(ChargingStation) if status: query = query.where(ChargingStation.status == status) if type: query = query.where(ChargingStation.type == type) if merchant_id: query = query.where(ChargingStation.merchant_id == merchant_id) count_q = select(func.count()).select_from(query.subquery()) total = (await db.execute(count_q)).scalar() query = query.order_by(ChargingStation.id.desc()).offset((page - 1) * page_size).limit(page_size) result = await db.execute(query) stations = result.scalars().all() return {"total": total, "items": [_station_to_dict(s) for s in stations]} @router.post("/stations") async def create_station( data: StationCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager")), ): station = ChargingStation(**data.model_dump(), created_by=user.id) db.add(station) await db.flush() await log_audit(db, user.id, "create", "charging", detail=f"创建充电站 {data.name}") return _station_to_dict(station) @router.put("/stations/{station_id}") async def update_station( station_id: int, data: StationUpdate, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager")), ): result = await db.execute(select(ChargingStation).where(ChargingStation.id == station_id)) station = result.scalar_one_or_none() if not station: raise HTTPException(status_code=404, detail="充电站不存在") updates = data.model_dump(exclude_unset=True) for k, v in updates.items(): setattr(station, k, v) await log_audit(db, user.id, "update", "charging", detail=f"更新充电站 {station.name}") return _station_to_dict(station) @router.delete("/stations/{station_id}") async def delete_station( station_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager")), ): result = await db.execute(select(ChargingStation).where(ChargingStation.id == station_id)) station = result.scalar_one_or_none() if not station: raise HTTPException(status_code=404, detail="充电站不存在") station.status = "disabled" await log_audit(db, user.id, "delete", "charging", detail=f"禁用充电站 {station.name}") return {"message": "已禁用"} @router.get("/stations/{station_id}/piles") async def list_station_piles( station_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): result = await db.execute( select(ChargingPile).where(ChargingPile.station_id == station_id).order_by(ChargingPile.id) ) return [_pile_to_dict(p) for p in result.scalars().all()] # ─── Pile Endpoints ────────────────────────────────────────────────── @router.get("/piles") async def list_piles( station_id: int | None = None, status: str | None = None, work_status: str | None = None, type: str | None = None, page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): query = select(ChargingPile) if station_id: query = query.where(ChargingPile.station_id == station_id) if status: query = query.where(ChargingPile.status == status) if work_status: query = query.where(ChargingPile.work_status == work_status) if type: query = query.where(ChargingPile.type == type) count_q = select(func.count()).select_from(query.subquery()) total = (await db.execute(count_q)).scalar() query = query.order_by(ChargingPile.id.desc()).offset((page - 1) * page_size).limit(page_size) result = await db.execute(query) piles = result.scalars().all() return {"total": total, "items": [_pile_to_dict(p) for p in piles]} @router.post("/piles") async def create_pile( data: PileCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager")), ): pile = ChargingPile(**data.model_dump()) db.add(pile) await db.flush() await log_audit(db, user.id, "create", "charging", detail=f"创建充电桩 {data.encoding}") return _pile_to_dict(pile) @router.put("/piles/{pile_id}") async def update_pile( pile_id: int, data: PileUpdate, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager")), ): result = await db.execute(select(ChargingPile).where(ChargingPile.id == pile_id)) pile = result.scalar_one_or_none() if not pile: raise HTTPException(status_code=404, detail="充电桩不存在") updates = data.model_dump(exclude_unset=True) for k, v in updates.items(): setattr(pile, k, v) await log_audit(db, user.id, "update", "charging", detail=f"更新充电桩 {pile.encoding}") return _pile_to_dict(pile) @router.delete("/piles/{pile_id}") async def delete_pile( pile_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager")), ): result = await db.execute(select(ChargingPile).where(ChargingPile.id == pile_id)) pile = result.scalar_one_or_none() if not pile: raise HTTPException(status_code=404, detail="充电桩不存在") pile.status = "disabled" await log_audit(db, user.id, "delete", "charging", detail=f"禁用充电桩 {pile.encoding}") return {"message": "已禁用"} # ─── Pricing Endpoints ─────────────────────────────────────────────── @router.get("/pricing") async def list_pricing( station_id: int | None = None, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): query = select(ChargingPriceStrategy) if station_id: query = query.where(ChargingPriceStrategy.station_id == station_id) result = await db.execute(query.order_by(ChargingPriceStrategy.id.desc())) strategies = result.scalars().all() items = [] for s in strategies: params_q = await db.execute( select(ChargingPriceParam).where(ChargingPriceParam.strategy_id == s.id).order_by(ChargingPriceParam.start_time) ) params = [_param_to_dict(p) for p in params_q.scalars().all()] d = _strategy_to_dict(s) d["params"] = params items.append(d) return items @router.post("/pricing") async def create_pricing( data: PriceStrategyCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager")), ): strategy = ChargingPriceStrategy( strategy_name=data.strategy_name, station_id=data.station_id, bill_model=data.bill_model, description=data.description, status=data.status, ) db.add(strategy) await db.flush() for p in data.params: param = ChargingPriceParam(strategy_id=strategy.id, **p.model_dump()) db.add(param) await db.flush() await log_audit(db, user.id, "create", "charging", detail=f"创建计费策略 {data.strategy_name}") return _strategy_to_dict(strategy) @router.put("/pricing/{strategy_id}") async def update_pricing( strategy_id: int, data: PriceStrategyUpdate, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager")), ): result = await db.execute(select(ChargingPriceStrategy).where(ChargingPriceStrategy.id == strategy_id)) strategy = result.scalar_one_or_none() if not strategy: raise HTTPException(status_code=404, detail="计费策略不存在") updates = data.model_dump(exclude_unset=True, exclude={"params"}) for k, v in updates.items(): setattr(strategy, k, v) if data.params is not None: # Delete old params and recreate old_params = await db.execute( select(ChargingPriceParam).where(ChargingPriceParam.strategy_id == strategy_id) ) for old in old_params.scalars().all(): await db.delete(old) await db.flush() for p in data.params: param = ChargingPriceParam(strategy_id=strategy_id, **p.model_dump()) db.add(param) await db.flush() await log_audit(db, user.id, "update", "charging", detail=f"更新计费策略 {strategy.strategy_name}") return _strategy_to_dict(strategy) @router.delete("/pricing/{strategy_id}") async def delete_pricing( strategy_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager")), ): result = await db.execute(select(ChargingPriceStrategy).where(ChargingPriceStrategy.id == strategy_id)) strategy = result.scalar_one_or_none() if not strategy: raise HTTPException(status_code=404, detail="计费策略不存在") strategy.status = "inactive" await log_audit(db, user.id, "delete", "charging", detail=f"停用计费策略 {strategy.strategy_name}") return {"message": "已停用"} # ─── Order Endpoints ───────────────────────────────────────────────── @router.get("/orders") async def list_orders( order_status: str | None = None, station_id: int | None = None, start_date: str | None = None, end_date: str | None = None, page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): query = select(ChargingOrder) if order_status: query = query.where(ChargingOrder.order_status == order_status) if station_id: query = query.where(ChargingOrder.station_id == station_id) if start_date: query = query.where(ChargingOrder.created_at >= start_date) if end_date: query = query.where(ChargingOrder.created_at <= end_date) count_q = select(func.count()).select_from(query.subquery()) total = (await db.execute(count_q)).scalar() query = query.order_by(ChargingOrder.created_at.desc()).offset((page - 1) * page_size).limit(page_size) result = await db.execute(query) orders = result.scalars().all() return {"total": total, "items": [_order_to_dict(o) for o in orders]} @router.get("/orders/realtime") async def realtime_orders( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): result = await db.execute( select(ChargingOrder).where(ChargingOrder.order_status == "charging").order_by(ChargingOrder.start_time.desc()) ) return [_order_to_dict(o) for o in result.scalars().all()] @router.get("/orders/abnormal") async def abnormal_orders( page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): query = select(ChargingOrder).where(ChargingOrder.order_status.in_(["failed", "refunded"])) count_q = select(func.count()).select_from(query.subquery()) total = (await db.execute(count_q)).scalar() query = query.order_by(ChargingOrder.created_at.desc()).offset((page - 1) * page_size).limit(page_size) result = await db.execute(query) return {"total": total, "items": [_order_to_dict(o) for o in result.scalars().all()]} @router.get("/orders/{order_id}") async def get_order( order_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): result = await db.execute(select(ChargingOrder).where(ChargingOrder.id == order_id)) order = result.scalar_one_or_none() if not order: raise HTTPException(status_code=404, detail="订单不存在") return _order_to_dict(order) @router.post("/orders/{order_id}/settle") async def settle_order( order_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager")), ): result = await db.execute(select(ChargingOrder).where(ChargingOrder.id == order_id)) order = result.scalar_one_or_none() if not order: raise HTTPException(status_code=404, detail="订单不存在") order.settle_type = "manual" order.settle_time = datetime.now(timezone.utc) order.order_status = "completed" await log_audit(db, user.id, "update", "charging", detail=f"手动结算订单 {order.order_no}") return {"message": "已结算"} # ─── Dashboard ─────────────────────────────────────────────────────── @router.get("/dashboard") async def charging_dashboard( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), ): now = datetime.now(timezone.utc) month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) # Total revenue (completed orders) rev_q = await db.execute( select(func.sum(ChargingOrder.paid_price)).where(ChargingOrder.order_status == "completed") ) total_revenue = rev_q.scalar() or 0 # Total energy delivered energy_q = await db.execute( select(func.sum(ChargingOrder.energy)).where(ChargingOrder.order_status == "completed") ) total_energy = energy_q.scalar() or 0 # Active sessions active_q = await db.execute( select(func.count(ChargingOrder.id)).where(ChargingOrder.order_status == "charging") ) active_sessions = active_q.scalar() or 0 # Utilization rate: charging piles / total active piles total_piles_q = await db.execute( select(func.count(ChargingPile.id)).where(ChargingPile.status == "active") ) total_piles = total_piles_q.scalar() or 0 charging_piles_q = await db.execute( select(func.count(ChargingPile.id)).where(ChargingPile.work_status == "charging") ) charging_piles = charging_piles_q.scalar() or 0 utilization_rate = round(charging_piles / total_piles * 100, 1) if total_piles > 0 else 0 # Revenue trend (last 30 days) thirty_days_ago = now - timedelta(days=30) trend_q = await db.execute( select( func.date(ChargingOrder.created_at).label("date"), func.sum(ChargingOrder.paid_price).label("revenue"), func.sum(ChargingOrder.energy).label("energy"), ).where( and_(ChargingOrder.order_status == "completed", ChargingOrder.created_at >= thirty_days_ago) ).group_by(func.date(ChargingOrder.created_at)).order_by(func.date(ChargingOrder.created_at)) ) revenue_trend = [{"date": str(r[0]), "revenue": round(r[1] or 0, 2), "energy": round(r[2] or 0, 2)} for r in trend_q.all()] # Station ranking by revenue ranking_q = await db.execute( select( ChargingOrder.station_name, func.sum(ChargingOrder.paid_price).label("revenue"), func.count(ChargingOrder.id).label("orders"), ).where(ChargingOrder.order_status == "completed") .group_by(ChargingOrder.station_name) .order_by(func.sum(ChargingOrder.paid_price).desc()) .limit(10) ) station_ranking = [{"station": r[0] or "未知", "revenue": round(r[1] or 0, 2), "orders": r[2]} for r in ranking_q.all()] # Pile status distribution pile_status_q = await db.execute( select(ChargingPile.work_status, func.count(ChargingPile.id)) .where(ChargingPile.status == "active") .group_by(ChargingPile.work_status) ) pile_status = {row[0]: row[1] for row in pile_status_q.all()} return { "total_revenue": round(total_revenue, 2), "total_energy": round(total_energy, 2), "active_sessions": active_sessions, "utilization_rate": utilization_rate, "revenue_trend": revenue_trend, "station_ranking": station_ranking, "pile_status": { "idle": pile_status.get("idle", 0), "charging": pile_status.get("charging", 0), "fault": pile_status.get("fault", 0), "offline": pile_status.get("offline", 0), }, } # ─── Merchant CRUD ─────────────────────────────────────────────────── @router.get("/merchants") async def list_merchants(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): result = await db.execute(select(ChargingMerchant).order_by(ChargingMerchant.id.desc())) return [_merchant_to_dict(m) for m in result.scalars().all()] @router.post("/merchants") async def create_merchant(data: MerchantCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager"))): merchant = ChargingMerchant(**data.model_dump()) db.add(merchant) await db.flush() return _merchant_to_dict(merchant) @router.put("/merchants/{merchant_id}") async def update_merchant(merchant_id: int, data: MerchantCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager"))): result = await db.execute(select(ChargingMerchant).where(ChargingMerchant.id == merchant_id)) merchant = result.scalar_one_or_none() if not merchant: raise HTTPException(status_code=404, detail="运营商不存在") for k, v in data.model_dump(exclude_unset=True).items(): setattr(merchant, k, v) return _merchant_to_dict(merchant) @router.delete("/merchants/{merchant_id}") async def delete_merchant(merchant_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager"))): result = await db.execute(select(ChargingMerchant).where(ChargingMerchant.id == merchant_id)) merchant = result.scalar_one_or_none() if not merchant: raise HTTPException(status_code=404, detail="运营商不存在") merchant.status = "disabled" return {"message": "已禁用"} # ─── Brand CRUD ────────────────────────────────────────────────────── @router.get("/brands") async def list_brands(db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)): result = await db.execute(select(ChargingBrand).order_by(ChargingBrand.id.desc())) return [_brand_to_dict(b) for b in result.scalars().all()] @router.post("/brands") async def create_brand(data: BrandCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager"))): brand = ChargingBrand(**data.model_dump()) db.add(brand) await db.flush() return _brand_to_dict(brand) @router.put("/brands/{brand_id}") async def update_brand(brand_id: int, data: BrandCreate, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager"))): result = await db.execute(select(ChargingBrand).where(ChargingBrand.id == brand_id)) brand = result.scalar_one_or_none() if not brand: raise HTTPException(status_code=404, detail="品牌不存在") for k, v in data.model_dump(exclude_unset=True).items(): setattr(brand, k, v) return _brand_to_dict(brand) @router.delete("/brands/{brand_id}") async def delete_brand(brand_id: int, db: AsyncSession = Depends(get_db), user: User = Depends(require_roles("admin", "energy_manager"))): result = await db.execute(select(ChargingBrand).where(ChargingBrand.id == brand_id)) brand = result.scalar_one_or_none() if not brand: raise HTTPException(status_code=404, detail="品牌不存在") await db.delete(brand) return {"message": "已删除"} # ─── Dict Helpers ──────────────────────────────────────────────────── def _station_to_dict(s: ChargingStation) -> dict: return { "id": s.id, "name": s.name, "merchant_id": s.merchant_id, "type": s.type, "address": s.address, "latitude": s.latitude, "longitude": s.longitude, "price": s.price, "activity": s.activity, "status": s.status, "total_piles": s.total_piles, "available_piles": s.available_piles, "total_power_kw": s.total_power_kw, "photo_url": s.photo_url, "operating_hours": s.operating_hours, "created_by": s.created_by, "created_at": str(s.created_at) if s.created_at else None, } def _pile_to_dict(p: ChargingPile) -> dict: return { "id": p.id, "station_id": p.station_id, "encoding": p.encoding, "name": p.name, "type": p.type, "brand": p.brand, "model": p.model, "rated_power_kw": p.rated_power_kw, "connector_type": p.connector_type, "status": p.status, "work_status": p.work_status, "created_at": str(p.created_at) if p.created_at else None, } def _strategy_to_dict(s: ChargingPriceStrategy) -> dict: return { "id": s.id, "strategy_name": s.strategy_name, "station_id": s.station_id, "bill_model": s.bill_model, "description": s.description, "status": s.status, "created_at": str(s.created_at) if s.created_at else None, } def _param_to_dict(p: ChargingPriceParam) -> dict: return { "id": p.id, "strategy_id": p.strategy_id, "start_time": p.start_time, "end_time": p.end_time, "period_mark": p.period_mark, "elec_price": p.elec_price, "service_price": p.service_price, } def _order_to_dict(o: ChargingOrder) -> dict: return { "id": o.id, "order_no": o.order_no, "user_id": o.user_id, "user_name": o.user_name, "phone": o.phone, "station_id": o.station_id, "station_name": o.station_name, "pile_id": o.pile_id, "pile_name": o.pile_name, "start_time": str(o.start_time) if o.start_time else None, "end_time": str(o.end_time) if o.end_time else None, "car_no": o.car_no, "car_vin": o.car_vin, "charge_method": o.charge_method, "settle_type": o.settle_type, "pay_type": o.pay_type, "settle_time": str(o.settle_time) if o.settle_time else None, "settle_price": o.settle_price, "paid_price": o.paid_price, "discount_amt": o.discount_amt, "elec_amt": o.elec_amt, "serve_amt": o.serve_amt, "order_status": o.order_status, "charge_duration": o.charge_duration, "energy": o.energy, "start_soc": o.start_soc, "end_soc": o.end_soc, "abno_cause": o.abno_cause, "order_source": o.order_source, "created_at": str(o.created_at) if o.created_at else None, } def _merchant_to_dict(m: ChargingMerchant) -> dict: return { "id": m.id, "name": m.name, "contact_person": m.contact_person, "phone": m.phone, "email": m.email, "address": m.address, "business_license": m.business_license, "status": m.status, "settlement_type": m.settlement_type, } def _brand_to_dict(b: ChargingBrand) -> dict: return { "id": b.id, "brand_name": b.brand_name, "logo_url": b.logo_url, "country": b.country, "description": b.description, }