"""Carbon Asset Management Service. Provides carbon accounting, CCER/green certificate management, report generation, target tracking, and benchmark comparison. """ import logging from datetime import date, datetime, timezone from typing import Optional from sqlalchemy import select, func, and_, case from sqlalchemy.ext.asyncio import AsyncSession from app.models.carbon import ( CarbonEmission, EmissionFactor, CarbonTarget, CarbonReduction, GreenCertificate, CarbonReport, CarbonBenchmark, ) from app.models.energy import EnergyData logger = logging.getLogger("carbon_asset") # China national grid emission factor 2023 (tCO2/MWh) GRID_EMISSION_FACTOR = 0.5810 # Average CO2 absorption per tree per year (tons) TREE_ABSORPTION = 0.02 # --------------------------------------------------------------------------- # Carbon Accounting # --------------------------------------------------------------------------- async def calculate_scope2_emission( db: AsyncSession, start: date, end: date, ) -> float: """Calculate Scope 2 emission from grid electricity (tons CO2).""" result = await db.execute( select(func.sum(CarbonEmission.emission)) .where(and_( CarbonEmission.scope == 2, func.date(CarbonEmission.date) >= start, func.date(CarbonEmission.date) <= end, )) ) val = result.scalar() or 0 return round(val / 1000, 4) # kg -> tons async def calculate_scope1_emission( db: AsyncSession, start: date, end: date, ) -> float: """Calculate Scope 1 direct emissions (natural gas etc.) in tons CO2.""" result = await db.execute( select(func.sum(CarbonEmission.emission)) .where(and_( CarbonEmission.scope == 1, func.date(CarbonEmission.date) >= start, func.date(CarbonEmission.date) <= end, )) ) val = result.scalar() or 0 return round(val / 1000, 4) async def calculate_total_reduction( db: AsyncSession, start: date, end: date, ) -> float: """Total carbon reduction in tons.""" result = await db.execute( select(func.sum(CarbonEmission.reduction)) .where(and_( func.date(CarbonEmission.date) >= start, func.date(CarbonEmission.date) <= end, )) ) val = result.scalar() or 0 return round(val / 1000, 4) async def calculate_pv_reduction( db: AsyncSession, start: date, end: date, ) -> float: """Carbon reduction from PV self-consumption (tons). Formula: reduction = pv_generation_mwh * GRID_EMISSION_FACTOR """ result = await db.execute( select(func.sum(CarbonEmission.reduction)) .where(and_( CarbonEmission.category == "pv_generation", func.date(CarbonEmission.date) >= start, func.date(CarbonEmission.date) <= end, )) ) val = result.scalar() or 0 return round(val / 1000, 4) # --------------------------------------------------------------------------- # Reduction tracking # --------------------------------------------------------------------------- async def get_reduction_summary( db: AsyncSession, start: date, end: date, ) -> list[dict]: """Reduction summary grouped by source type.""" result = await db.execute( select( CarbonReduction.source_type, func.sum(CarbonReduction.reduction_tons), func.sum(CarbonReduction.equivalent_trees), func.count(CarbonReduction.id), ) .where(and_( CarbonReduction.date >= start, CarbonReduction.date <= end, )) .group_by(CarbonReduction.source_type) ) return [ { "source_type": row[0], "reduction_tons": round(row[1] or 0, 4), "equivalent_trees": round(row[2] or 0, 1), "count": row[3], } for row in result.all() ] async def trigger_reduction_calculation( db: AsyncSession, start: date, end: date, ) -> dict: """Compute reduction activities from energy data and persist them. Calculates PV generation reduction and heat pump COP savings. """ # PV reduction from carbon_emissions records pv_tons = await calculate_pv_reduction(db, start, end) total_reduction = await calculate_total_reduction(db, start, end) heat_pump_tons = max(0, total_reduction - pv_tons) records_created = 0 for source, tons in [("pv_generation", pv_tons), ("heat_pump_cop", heat_pump_tons)]: if tons > 0: existing = await db.execute( select(CarbonReduction).where(and_( CarbonReduction.source_type == source, CarbonReduction.date == end, )) ) if existing.scalar_one_or_none() is None: db.add(CarbonReduction( source_type=source, date=end, reduction_tons=tons, equivalent_trees=round(tons / TREE_ABSORPTION, 1), methodology=f"Grid factor {GRID_EMISSION_FACTOR} tCO2/MWh", verified=False, )) records_created += 1 return { "period": f"{start} ~ {end}", "pv_reduction_tons": pv_tons, "heat_pump_reduction_tons": heat_pump_tons, "total_reduction_tons": total_reduction, "records_created": records_created, } # --------------------------------------------------------------------------- # CCER / Green Certificate Management # --------------------------------------------------------------------------- async def calculate_ccer_eligible( db: AsyncSession, start: date, end: date, ) -> dict: """Calculate eligible CCER reduction from PV generation.""" pv_tons = await calculate_pv_reduction(db, start, end) return { "eligible_ccer_tons": pv_tons, "grid_emission_factor": GRID_EMISSION_FACTOR, "period": f"{start} ~ {end}", } async def get_certificate_portfolio_value(db: AsyncSession) -> dict: """Total value of active green certificates.""" result = await db.execute( select( GreenCertificate.status, func.count(GreenCertificate.id), func.sum(GreenCertificate.energy_mwh), func.sum(GreenCertificate.price_yuan), ).group_by(GreenCertificate.status) ) rows = result.all() total_value = 0.0 total_mwh = 0.0 by_status = {} for row in rows: status, cnt, mwh, value = row by_status[status] = { "count": cnt, "energy_mwh": round(mwh or 0, 2), "value_yuan": round(value or 0, 2), } total_value += value or 0 total_mwh += mwh or 0 return { "total_certificates": sum(v["count"] for v in by_status.values()), "total_energy_mwh": round(total_mwh, 2), "total_value_yuan": round(total_value, 2), "by_status": by_status, } # --------------------------------------------------------------------------- # Carbon Report Generation # --------------------------------------------------------------------------- async def generate_carbon_report( db: AsyncSession, report_type: str, period_start: date, period_end: date, ) -> CarbonReport: """Generate a carbon footprint report for the given period.""" scope1 = await calculate_scope1_emission(db, period_start, period_end) scope2 = await calculate_scope2_emission(db, period_start, period_end) reduction = await calculate_total_reduction(db, period_start, period_end) total = scope1 + scope2 net = total - reduction # Reduction breakdown reduction_summary = await get_reduction_summary(db, period_start, period_end) # Monthly breakdown monthly = await _monthly_breakdown(db, period_start, period_end) report_data = { "scope_breakdown": {"scope1": scope1, "scope2": scope2}, "reduction_summary": reduction_summary, "monthly_breakdown": monthly, "grid_emission_factor": GRID_EMISSION_FACTOR, "net_emission_tons": round(net, 4), "green_rate": round((reduction / total * 100) if total > 0 else 0, 1), } report = CarbonReport( report_type=report_type, period_start=period_start, period_end=period_end, scope1_tons=scope1, scope2_tons=scope2, total_tons=round(total, 4), reduction_tons=round(reduction, 4), net_tons=round(net, 4), report_data=report_data, ) db.add(report) return report async def _monthly_breakdown( db: AsyncSession, start: date, end: date, ) -> list[dict]: """Monthly emission and reduction totals for the period.""" from app.core.config import get_settings settings = get_settings() if settings.is_sqlite: month_expr = func.strftime('%Y-%m', CarbonEmission.date).label('month') else: month_expr = func.to_char( func.date_trunc('month', CarbonEmission.date), 'YYYY-MM' ).label('month') result = await db.execute( select( month_expr, func.sum(CarbonEmission.emission), func.sum(CarbonEmission.reduction), ) .where(and_( func.date(CarbonEmission.date) >= start, func.date(CarbonEmission.date) <= end, )) .group_by('month') .order_by('month') ) return [ { "month": row[0], "emission_kg": round(row[1] or 0, 2), "reduction_kg": round(row[2] or 0, 2), "emission_tons": round((row[1] or 0) / 1000, 4), "reduction_tons": round((row[2] or 0) / 1000, 4), } for row in result.all() ] # --------------------------------------------------------------------------- # Carbon Target Tracking # --------------------------------------------------------------------------- async def get_target_progress(db: AsyncSession, year: int) -> dict: """Calculate progress against annual and monthly targets.""" # Annual target annual_q = await db.execute( select(CarbonTarget).where(and_( CarbonTarget.year == year, CarbonTarget.month.is_(None), )) ) annual = annual_q.scalar_one_or_none() # All monthly targets for this year monthly_q = await db.execute( select(CarbonTarget).where(and_( CarbonTarget.year == year, CarbonTarget.month.isnot(None), )).order_by(CarbonTarget.month) ) monthlies = monthly_q.scalars().all() # Current year actuals year_start = date(year, 1, 1) year_end = date(year, 12, 31) scope1 = await calculate_scope1_emission(db, year_start, year_end) scope2 = await calculate_scope2_emission(db, year_start, year_end) reduction = await calculate_total_reduction(db, year_start, year_end) total_emission = scope1 + scope2 net = total_emission - reduction annual_data = None if annual: progress = (net / annual.target_emission_tons * 100) if annual.target_emission_tons > 0 else 0 status = "on_track" if progress <= 80 else ("warning" if progress <= 100 else "exceeded") annual_data = { "id": annual.id, "target_tons": annual.target_emission_tons, "actual_tons": round(net, 4), "progress_pct": round(progress, 1), "status": status, } monthly_data = [] for m in monthlies: m_start = date(year, m.month, 1) if m.month == 12: m_end = date(year, 12, 31) else: m_end = date(year, m.month + 1, 1) s1 = await calculate_scope1_emission(db, m_start, m_end) s2 = await calculate_scope2_emission(db, m_start, m_end) red = await calculate_total_reduction(db, m_start, m_end) m_net = s1 + s2 - red pct = (m_net / m.target_emission_tons * 100) if m.target_emission_tons > 0 else 0 monthly_data.append({ "id": m.id, "month": m.month, "target_tons": m.target_emission_tons, "actual_tons": round(m_net, 4), "progress_pct": round(pct, 1), "status": "on_track" if pct <= 80 else ("warning" if pct <= 100 else "exceeded"), }) return { "year": year, "total_emission_tons": round(total_emission, 4), "total_reduction_tons": round(reduction, 4), "net_emission_tons": round(net, 4), "annual_target": annual_data, "monthly_targets": monthly_data, } # --------------------------------------------------------------------------- # Benchmark Comparison # --------------------------------------------------------------------------- async def compare_with_benchmarks( db: AsyncSession, year: int, ) -> dict: """Compare actual emissions with industry benchmarks.""" benchmarks_q = await db.execute( select(CarbonBenchmark).where(CarbonBenchmark.year == year) ) benchmarks = benchmarks_q.scalars().all() year_start = date(year, 1, 1) year_end = date(year, 12, 31) scope1 = await calculate_scope1_emission(db, year_start, year_end) scope2 = await calculate_scope2_emission(db, year_start, year_end) total = scope1 + scope2 reduction = await calculate_total_reduction(db, year_start, year_end) comparisons = [] for b in benchmarks: comparisons.append({ "industry": b.industry, "metric": b.metric_name, "benchmark_value": b.benchmark_value, "unit": b.unit, "source": b.source, }) return { "year": year, "actual_emission_tons": round(total, 4), "actual_reduction_tons": round(reduction, 4), "net_tons": round(total - reduction, 4), "benchmarks": comparisons, } # --------------------------------------------------------------------------- # Dashboard aggregation # --------------------------------------------------------------------------- async def get_carbon_dashboard(db: AsyncSession) -> dict: """Comprehensive carbon dashboard data.""" now = datetime.now(timezone.utc) year = now.year year_start = date(year, 1, 1) today = now.date() scope1 = await calculate_scope1_emission(db, year_start, today) scope2 = await calculate_scope2_emission(db, year_start, today) total_emission = scope1 + scope2 reduction = await calculate_total_reduction(db, year_start, today) net = total_emission - reduction green_rate = round((reduction / total_emission * 100) if total_emission > 0 else 0, 1) # Target progress target_progress = await get_target_progress(db, year) # Monthly trend monthly = await _monthly_breakdown(db, year_start, today) # Reduction summary reduction_summary = await get_reduction_summary(db, year_start, today) # Certificate value cert_value = await get_certificate_portfolio_value(db) return { "kpi": { "total_emission_tons": round(total_emission, 4), "total_reduction_tons": round(reduction, 4), "net_emission_tons": round(net, 4), "green_rate": green_rate, "scope1_tons": scope1, "scope2_tons": scope2, "equivalent_trees": round(reduction / TREE_ABSORPTION, 0) if reduction > 0 else 0, }, "target_progress": target_progress.get("annual_target"), "monthly_trend": monthly, "reduction_by_source": reduction_summary, "certificate_portfolio": cert_value, }