# Scraped file-viewer header (not part of the module): 463 lines, 15 KiB, Python.

"""Carbon Asset Management Service.
Provides carbon accounting, CCER/green certificate management,
report generation, target tracking, and benchmark comparison.
"""
import logging
from datetime import date, datetime, timezone
from typing import Optional
from sqlalchemy import select, func, and_, case
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.carbon import (
CarbonEmission, EmissionFactor, CarbonTarget, CarbonReduction,
GreenCertificate, CarbonReport, CarbonBenchmark,
)
from app.models.energy import EnergyData
# Module-level logger registered under the fixed name "carbon_asset".
logger = logging.getLogger("carbon_asset")
# China national grid emission factor 2023 (tCO2/MWh).
# In the functions below this value is only reported back to callers
# (report data, CCER payload, methodology text); it is not multiplied into
# any emission figure in this part of the file.
GRID_EMISSION_FACTOR = 0.5810
# Average CO2 absorption per tree per year (tons).
# Used as the divisor when converting reduction tons into "equivalent trees".
TREE_ABSORPTION = 0.02
# ---------------------------------------------------------------------------
# Carbon Accounting
# ---------------------------------------------------------------------------
async def calculate_scope2_emission(
    db: AsyncSession, start: date, end: date,
) -> float:
    """Return Scope 2 (grid electricity) emissions for a period, in tons CO2.

    Sums ``CarbonEmission.emission`` over rows with ``scope == 2`` whose
    date lies in the inclusive range ``[start, end]``. The stored sum is
    treated as kilograms: it is divided by 1000 and rounded to 4 decimals.
    A missing (NULL) sum counts as zero.
    """
    record_day = func.date(CarbonEmission.date)
    in_period = and_(
        CarbonEmission.scope == 2,
        record_day >= start,
        record_day <= end,
    )
    stmt = select(func.sum(CarbonEmission.emission)).where(in_period)
    total_kg = (await db.execute(stmt)).scalar()
    if not total_kg:
        total_kg = 0
    # kg -> tons
    return round(total_kg / 1000, 4)
async def calculate_scope1_emission(
    db: AsyncSession, start: date, end: date,
) -> float:
    """Return Scope 1 (direct, e.g. natural gas) emissions in tons CO2.

    Sums ``CarbonEmission.emission`` over rows with ``scope == 1`` dated
    within the inclusive range ``[start, end]``, converts the total from
    kilograms to tons (divide by 1000) and rounds to 4 decimals. A NULL
    sum is treated as zero.
    """
    stmt = (
        select(func.sum(CarbonEmission.emission))
        .where(and_(
            CarbonEmission.scope == 1,
            func.date(CarbonEmission.date) >= start,
            func.date(CarbonEmission.date) <= end,
        ))
    )
    outcome = await db.execute(stmt)
    kilograms = outcome.scalar() or 0
    tons = kilograms / 1000
    return round(tons, 4)
async def calculate_total_reduction(
    db: AsyncSession, start: date, end: date,
) -> float:
    """Return the total recorded carbon reduction for a period, in tons.

    Sums ``CarbonEmission.reduction`` over every row (no scope or category
    filter) dated within the inclusive range ``[start, end]``. The sum is
    divided by 1000 and rounded to 4 decimals; a NULL sum counts as zero.
    """
    record_day = func.date(CarbonEmission.date)
    stmt = select(func.sum(CarbonEmission.reduction)).where(
        and_(record_day >= start, record_day <= end)
    )
    reduction_sum = (await db.execute(stmt)).scalar()
    reduction_kg = reduction_sum if reduction_sum else 0
    return round(reduction_kg / 1000, 4)
async def calculate_pv_reduction(
    db: AsyncSession, start: date, end: date,
) -> float:
    """Return the carbon reduction attributed to PV self-consumption, in tons.

    Sums ``CarbonEmission.reduction`` over rows whose category is
    ``"pv_generation"`` and whose date lies in the inclusive range
    ``[start, end]``, then divides by 1000 and rounds to 4 decimals.

    NOTE(review): the formula
    ``reduction = pv_generation_mwh * GRID_EMISSION_FACTOR`` is not applied
    here; this function only sums stored values, so the factor is presumably
    applied when the rows are written -- confirm against the ingestion code.
    """
    conditions = (
        CarbonEmission.category == "pv_generation",
        func.date(CarbonEmission.date) >= start,
        func.date(CarbonEmission.date) <= end,
    )
    query = select(func.sum(CarbonEmission.reduction)).where(and_(*conditions))
    response = await db.execute(query)
    pv_kg = response.scalar() or 0
    return round(pv_kg / 1000, 4)
# ---------------------------------------------------------------------------
# Reduction tracking
# ---------------------------------------------------------------------------
async def get_reduction_summary(
    db: AsyncSession, start: date, end: date,
) -> list[dict]:
    """Summarise ``CarbonReduction`` rows per source type for a period.

    Rows are selected where ``CarbonReduction.date`` lies in the inclusive
    range ``[start, end]`` and grouped by ``source_type``.

    Returns:
        One dict per source type with keys ``source_type``,
        ``reduction_tons`` (sum, 4 decimals), ``equivalent_trees``
        (sum, 1 decimal) and ``count`` (number of rows). NULL sums are
        reported as zero.
    """
    stmt = (
        select(
            CarbonReduction.source_type,
            func.sum(CarbonReduction.reduction_tons),
            func.sum(CarbonReduction.equivalent_trees),
            func.count(CarbonReduction.id),
        )
        .where(and_(
            CarbonReduction.date >= start,
            CarbonReduction.date <= end,
        ))
        .group_by(CarbonReduction.source_type)
    )
    grouped_rows = (await db.execute(stmt)).all()

    summary = []
    for source_type, tons_sum, trees_sum, activity_count in grouped_rows:
        summary.append({
            "source_type": source_type,
            "reduction_tons": round(tons_sum or 0, 4),
            "equivalent_trees": round(trees_sum or 0, 1),
            "count": activity_count,
        })
    return summary
async def trigger_reduction_calculation(
    db: AsyncSession, start: date, end: date,
) -> dict:
    """Derive reduction activities for a period and stage them for persistence.

    The PV share comes from ``calculate_pv_reduction``; whatever remains of
    ``calculate_total_reduction`` (never below zero) is attributed to the
    heat pump. For each source with a positive amount, one
    ``CarbonReduction`` row dated ``end`` is added to the session unless a
    row for that source and date already exists.

    The function only calls ``db.add``; it does not flush or commit, so the
    caller owns the transaction.

    Args:
        db: Async session used for the lookups and for staging new rows.
        start: First day of the period (inclusive).
        end: Last day of the period (inclusive); also the date stamped on
            the created rows.

    Returns:
        Dict with ``period``, ``pv_reduction_tons``,
        ``heat_pump_reduction_tons``, ``total_reduction_tons`` and
        ``records_created``.
    """
    pv_tons = await calculate_pv_reduction(db, start, end)
    total_reduction = await calculate_total_reduction(db, start, end)
    # Both inputs are already rounded to 4 decimals; round the difference too
    # so binary float artefacts (e.g. 0.30000000000000004) are not stored.
    heat_pump_tons = round(max(0, total_reduction - pv_tons), 4)

    records_created = 0
    for source, tons in (("pv_generation", pv_tons), ("heat_pump_cop", heat_pump_tons)):
        if tons <= 0:
            continue
        # Existence probe only: LIMIT 1 + first() keeps working even when
        # duplicate rows already exist, where scalar_one_or_none() would
        # raise MultipleResultsFound on every subsequent call.
        existing = await db.execute(
            select(CarbonReduction.id)
            .where(and_(
                CarbonReduction.source_type == source,
                CarbonReduction.date == end,
            ))
            .limit(1)
        )
        if existing.first() is not None:
            continue
        db.add(CarbonReduction(
            source_type=source,
            date=end,
            reduction_tons=tons,
            equivalent_trees=round(tons / TREE_ABSORPTION, 1),
            methodology=f"Grid factor {GRID_EMISSION_FACTOR} tCO2/MWh",
            verified=False,
        ))
        records_created += 1

    return {
        "period": f"{start} ~ {end}",
        "pv_reduction_tons": pv_tons,
        "heat_pump_reduction_tons": heat_pump_tons,
        "total_reduction_tons": total_reduction,
        "records_created": records_created,
    }
# ---------------------------------------------------------------------------
# CCER / Green Certificate Management
# ---------------------------------------------------------------------------
async def calculate_ccer_eligible(
    db: AsyncSession, start: date, end: date,
) -> dict:
    """Report the PV-based reduction for a period as the CCER-eligible amount.

    The eligible amount is exactly the value returned by
    ``calculate_pv_reduction`` for ``[start, end]``; the grid emission
    factor and a ``"start ~ end"`` period label are returned alongside it.
    """
    eligible_tons = await calculate_pv_reduction(db, start, end)
    payload = {"eligible_ccer_tons": eligible_tons}
    payload["grid_emission_factor"] = GRID_EMISSION_FACTOR
    payload["period"] = f"{start} ~ {end}"
    return payload
async def get_certificate_portfolio_value(db: AsyncSession) -> dict:
    """Aggregate green certificates per status and across the whole table.

    No status filter is applied: every ``GreenCertificate`` row contributes
    to its status bucket and to the grand totals.

    Returns:
        Dict with ``total_certificates``, ``total_energy_mwh``,
        ``total_value_yuan`` (both rounded to 2 decimals) and ``by_status``,
        which maps each status to its ``count``, ``energy_mwh`` and
        ``value_yuan``. NULL sums are reported as zero.
    """
    stmt = select(
        GreenCertificate.status,
        func.count(GreenCertificate.id),
        func.sum(GreenCertificate.energy_mwh),
        func.sum(GreenCertificate.price_yuan),
    ).group_by(GreenCertificate.status)
    grouped = (await db.execute(stmt)).all()

    by_status = {}
    certificate_count = 0
    energy_total = 0.0
    value_total = 0.0
    for status, cnt, mwh, value in grouped:
        mwh_or_zero = mwh or 0
        value_or_zero = value or 0
        by_status[status] = {
            "count": cnt,
            "energy_mwh": round(mwh_or_zero, 2),
            "value_yuan": round(value_or_zero, 2),
        }
        certificate_count += cnt
        value_total += value_or_zero
        energy_total += mwh_or_zero

    return {
        "total_certificates": certificate_count,
        "total_energy_mwh": round(energy_total, 2),
        "total_value_yuan": round(value_total, 2),
        "by_status": by_status,
    }
# ---------------------------------------------------------------------------
# Carbon Report Generation
# ---------------------------------------------------------------------------
async def generate_carbon_report(
    db: AsyncSession,
    report_type: str,
    period_start: date,
    period_end: date,
) -> CarbonReport:
    """Build a ``CarbonReport`` for the period and add it to the session.

    Gathers scope 1 and scope 2 emissions, total reduction, the
    per-source reduction summary and the monthly breakdown, then derives
    the gross total, the net figure (total minus reduction) and the green
    rate (reduction as a percentage of total, 0 when total is not positive).

    The report is only added to the session with ``db.add``; no flush or
    commit happens here.

    Returns:
        The newly created, not yet committed ``CarbonReport`` instance.
    """
    scope1 = await calculate_scope1_emission(db, period_start, period_end)
    scope2 = await calculate_scope2_emission(db, period_start, period_end)
    reduction = await calculate_total_reduction(db, period_start, period_end)
    gross = scope1 + scope2
    net_rounded = round(gross - reduction, 4)

    by_source = await get_reduction_summary(db, period_start, period_end)
    by_month = await _monthly_breakdown(db, period_start, period_end)

    if gross > 0:
        green_rate = round(reduction / gross * 100, 1)
    else:
        green_rate = 0

    report = CarbonReport(
        report_type=report_type,
        period_start=period_start,
        period_end=period_end,
        scope1_tons=scope1,
        scope2_tons=scope2,
        total_tons=round(gross, 4),
        reduction_tons=round(reduction, 4),
        net_tons=net_rounded,
        report_data={
            "scope_breakdown": {"scope1": scope1, "scope2": scope2},
            "reduction_summary": by_source,
            "monthly_breakdown": by_month,
            "grid_emission_factor": GRID_EMISSION_FACTOR,
            "net_emission_tons": net_rounded,
            "green_rate": green_rate,
        },
    )
    db.add(report)
    return report
async def _monthly_breakdown(
    db: AsyncSession, start: date, end: date,
) -> list[dict]:
    """Return per-month emission and reduction sums for ``[start, end]``.

    Months are labelled ``YYYY-MM``. The label expression depends on the
    backend reported by the settings: ``strftime`` when ``is_sqlite`` is
    true, otherwise ``to_char(date_trunc('month', ...))``.

    Returns:
        Dicts ordered by month with the raw sums (``emission_kg``,
        ``reduction_kg``, 2 decimals) and the same values divided by 1000
        (``emission_tons``, ``reduction_tons``, 4 decimals). NULL sums are
        reported as zero.
    """
    # Imported inside the function, as in the original code.
    from app.core.config import get_settings

    if get_settings().is_sqlite:
        month_label = func.strftime('%Y-%m', CarbonEmission.date)
    else:
        month_label = func.to_char(
            func.date_trunc('month', CarbonEmission.date), 'YYYY-MM'
        )
    month_expr = month_label.label('month')

    stmt = (
        select(
            month_expr,
            func.sum(CarbonEmission.emission),
            func.sum(CarbonEmission.reduction),
        )
        .where(and_(
            func.date(CarbonEmission.date) >= start,
            func.date(CarbonEmission.date) <= end,
        ))
        # Group/order by the label name so the SQL refers to the alias.
        .group_by('month')
        .order_by('month')
    )
    rows = (await db.execute(stmt)).all()

    breakdown = []
    for month, emission_sum, reduction_sum in rows:
        emission_kg = emission_sum or 0
        reduction_kg = reduction_sum or 0
        breakdown.append({
            "month": month,
            "emission_kg": round(emission_kg, 2),
            "reduction_kg": round(reduction_kg, 2),
            "emission_tons": round(emission_kg / 1000, 4),
            "reduction_tons": round(reduction_kg / 1000, 4),
        })
    return breakdown
# ---------------------------------------------------------------------------
# Carbon Target Tracking
# ---------------------------------------------------------------------------
def _month_bounds(year: int, month: int) -> tuple[date, date]:
    """Return the first and last calendar day of ``month`` in ``year``."""
    import calendar  # function-scope import; the module does not import calendar

    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, 1), date(year, month, last_day)


def _progress_pct(actual_tons, target_tons):
    """Return actual as a percentage of target; 0 if target is missing or <= 0."""
    if target_tons is None or target_tons <= 0:
        return 0
    return actual_tons / target_tons * 100


def _target_status(progress_pct) -> str:
    """Classify progress: ``on_track`` (<= 80), ``warning`` (<= 100), else ``exceeded``."""
    if progress_pct <= 80:
        return "on_track"
    if progress_pct <= 100:
        return "warning"
    return "exceeded"


async def get_target_progress(db: AsyncSession, year: int) -> dict:
    """Compare actual net emissions with the annual and monthly targets.

    The annual target is the ``CarbonTarget`` row for ``year`` whose
    ``month`` is NULL; monthly targets are the rows with a month set,
    ordered by month. Net emission is scope 1 + scope 2 minus total
    reduction for the relevant window.

    Each monthly window runs from the first to the last day of that month.
    The emission helpers treat both bounds as inclusive, so ending the
    window on the first day of the following month would count that day in
    two consecutive months.

    Args:
        db: Async session used for all queries.
        year: Calendar year to evaluate.

    Returns:
        Dict with ``year``, ``total_emission_tons``,
        ``total_reduction_tons``, ``net_emission_tons``, ``annual_target``
        (``None`` when no annual target exists) and ``monthly_targets``.
        Each target entry carries ``id``, ``target_tons``, ``actual_tons``,
        ``progress_pct`` and ``status``; monthly entries also carry
        ``month``.

    Raises:
        sqlalchemy.exc.MultipleResultsFound: if more than one annual target
            row exists for ``year`` (from ``scalar_one_or_none``).
    """
    # Annual target (month IS NULL)
    annual_q = await db.execute(
        select(CarbonTarget).where(and_(
            CarbonTarget.year == year,
            CarbonTarget.month.is_(None),
        ))
    )
    annual = annual_q.scalar_one_or_none()

    # All monthly targets for this year
    monthly_q = await db.execute(
        select(CarbonTarget).where(and_(
            CarbonTarget.year == year,
            CarbonTarget.month.isnot(None),
        )).order_by(CarbonTarget.month)
    )
    monthlies = monthly_q.scalars().all()

    # Full-year actuals
    year_start = date(year, 1, 1)
    year_end = date(year, 12, 31)
    scope1 = await calculate_scope1_emission(db, year_start, year_end)
    scope2 = await calculate_scope2_emission(db, year_start, year_end)
    reduction = await calculate_total_reduction(db, year_start, year_end)
    total_emission = scope1 + scope2
    net = total_emission - reduction

    annual_data = None
    if annual is not None:
        progress = _progress_pct(net, annual.target_emission_tons)
        annual_data = {
            "id": annual.id,
            "target_tons": annual.target_emission_tons,
            "actual_tons": round(net, 4),
            "progress_pct": round(progress, 1),
            "status": _target_status(progress),
        }

    monthly_data = []
    for m in monthlies:
        # Inclusive window covering exactly this calendar month.
        m_start, m_end = _month_bounds(year, m.month)
        s1 = await calculate_scope1_emission(db, m_start, m_end)
        s2 = await calculate_scope2_emission(db, m_start, m_end)
        red = await calculate_total_reduction(db, m_start, m_end)
        m_net = s1 + s2 - red
        pct = _progress_pct(m_net, m.target_emission_tons)
        monthly_data.append({
            "id": m.id,
            "month": m.month,
            "target_tons": m.target_emission_tons,
            "actual_tons": round(m_net, 4),
            "progress_pct": round(pct, 1),
            "status": _target_status(pct),
        })

    return {
        "year": year,
        "total_emission_tons": round(total_emission, 4),
        "total_reduction_tons": round(reduction, 4),
        "net_emission_tons": round(net, 4),
        "annual_target": annual_data,
        "monthly_targets": monthly_data,
    }
# ---------------------------------------------------------------------------
# Benchmark Comparison
# ---------------------------------------------------------------------------
async def compare_with_benchmarks(
    db: AsyncSession, year: int,
) -> dict:
    """Return the year's actual figures next to the stored benchmarks.

    Loads every ``CarbonBenchmark`` row for ``year`` and the full-year
    (1 Jan to 31 Dec) scope 1, scope 2 and reduction totals. The benchmark
    rows are passed through as-is; no ratio or difference against the
    actuals is computed here.

    Returns:
        Dict with ``year``, ``actual_emission_tons``,
        ``actual_reduction_tons``, ``net_tons`` and ``benchmarks`` (one
        dict per row with ``industry``, ``metric``, ``benchmark_value``,
        ``unit`` and ``source``).
    """
    stmt = select(CarbonBenchmark).where(CarbonBenchmark.year == year)
    benchmark_rows = (await db.execute(stmt)).scalars().all()

    first_day = date(year, 1, 1)
    last_day = date(year, 12, 31)
    scope1 = await calculate_scope1_emission(db, first_day, last_day)
    scope2 = await calculate_scope2_emission(db, first_day, last_day)
    emitted = scope1 + scope2
    reduced = await calculate_total_reduction(db, first_day, last_day)

    comparisons = [
        {
            "industry": bench.industry,
            "metric": bench.metric_name,
            "benchmark_value": bench.benchmark_value,
            "unit": bench.unit,
            "source": bench.source,
        }
        for bench in benchmark_rows
    ]

    return {
        "year": year,
        "actual_emission_tons": round(emitted, 4),
        "actual_reduction_tons": round(reduced, 4),
        "net_tons": round(emitted - reduced, 4),
        "benchmarks": comparisons,
    }
# ---------------------------------------------------------------------------
# Dashboard aggregation
# ---------------------------------------------------------------------------
async def get_carbon_dashboard(db: AsyncSession) -> dict:
    """Assemble the year-to-date carbon dashboard payload.

    The window runs from 1 January of the current year to today, both
    taken from the current UTC time. It combines KPI totals, the annual
    target entry from ``get_target_progress``, the monthly trend, the
    per-source reduction summary and the certificate portfolio.

    NOTE(review): "today" is the UTC date, so around midnight it can differ
    from the site's local date -- confirm this is intended.

    Returns:
        Dict with ``kpi``, ``target_progress``, ``monthly_trend``,
        ``reduction_by_source`` and ``certificate_portfolio``.
    """
    utc_now = datetime.now(timezone.utc)
    current_year = utc_now.year
    jan_first = date(current_year, 1, 1)
    today = utc_now.date()

    scope1 = await calculate_scope1_emission(db, jan_first, today)
    scope2 = await calculate_scope2_emission(db, jan_first, today)
    emitted = scope1 + scope2
    reduced = await calculate_total_reduction(db, jan_first, today)

    # Share of emissions offset by reductions, in percent.
    if emitted > 0:
        green_rate = round(reduced / emitted * 100, 1)
    else:
        green_rate = 0

    if reduced > 0:
        tree_equivalent = round(reduced / TREE_ABSORPTION, 0)
    else:
        tree_equivalent = 0

    progress = await get_target_progress(db, current_year)
    trend = await _monthly_breakdown(db, jan_first, today)
    by_source = await get_reduction_summary(db, jan_first, today)
    portfolio = await get_certificate_portfolio_value(db)

    kpi = {
        "total_emission_tons": round(emitted, 4),
        "total_reduction_tons": round(reduced, 4),
        "net_emission_tons": round(emitted - reduced, 4),
        "green_rate": green_rate,
        "scope1_tons": scope1,
        "scope2_tons": scope2,
        "equivalent_trees": tree_equivalent,
    }
    return {
        "kpi": kpi,
        "target_progress": progress.get("annual_target"),
        "monthly_trend": trend,
        "reduction_by_source": by_source,
        "certificate_portfolio": portfolio,
    }