Shared backend + frontend for multi-customer EMS deployments. - 12 enterprise modules: quota, cost, charging, maintenance, analysis, etc. - 120+ API endpoints, 37 database tables - Customer config mechanism (CUSTOMER env var + YAML config) - Collectors: Modbus TCP, MQTT, HTTP API, Sungrow iSolarCloud - Frontend: React 19 + Ant Design + ECharts + Three.js - Infrastructure: Redis cache, rate limiting, aggregation engine Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
67 lines
2.3 KiB
Python
67 lines
2.3 KiB
Python
import pytest
|
|
from conftest import auth_header
|
|
|
|
|
|
class TestLogin:
|
|
async def test_login_valid_credentials(self, client, admin_user):
|
|
resp = await client.post(
|
|
"/api/v1/auth/login",
|
|
data={"username": "testadmin", "password": "admin123"},
|
|
)
|
|
assert resp.status_code == 200
|
|
body = resp.json()
|
|
assert "access_token" in body
|
|
assert body["token_type"] == "bearer"
|
|
assert body["user"]["username"] == "testadmin"
|
|
assert body["user"]["role"] == "admin"
|
|
|
|
async def test_login_wrong_password(self, client, admin_user):
|
|
resp = await client.post(
|
|
"/api/v1/auth/login",
|
|
data={"username": "testadmin", "password": "wrongpass"},
|
|
)
|
|
assert resp.status_code == 401
|
|
|
|
async def test_login_nonexistent_user(self, client):
|
|
resp = await client.post(
|
|
"/api/v1/auth/login",
|
|
data={"username": "nobody", "password": "whatever"},
|
|
)
|
|
assert resp.status_code == 401
|
|
|
|
async def test_login_inactive_user(self, client, db_session):
|
|
from app.core.security import hash_password
|
|
from app.models.user import User
|
|
user = User(
|
|
username="inactive", hashed_password=hash_password("pass123"),
|
|
role="visitor", is_active=False,
|
|
)
|
|
db_session.add(user)
|
|
await db_session.commit()
|
|
resp = await client.post(
|
|
"/api/v1/auth/login",
|
|
data={"username": "inactive", "password": "pass123"},
|
|
)
|
|
assert resp.status_code == 403
|
|
|
|
|
|
class TestMe:
|
|
async def test_me_with_valid_token(self, client, admin_user, admin_token):
|
|
resp = await client.get("/api/v1/auth/me", headers=auth_header(admin_token))
|
|
assert resp.status_code == 200
|
|
body = resp.json()
|
|
assert body["username"] == "testadmin"
|
|
assert body["role"] == "admin"
|
|
assert body["is_active"] is True
|
|
|
|
async def test_me_without_token(self, client):
|
|
resp = await client.get("/api/v1/auth/me")
|
|
assert resp.status_code == 401
|
|
|
|
async def test_me_with_invalid_token(self, client):
|
|
resp = await client.get(
|
|
"/api/v1/auth/me",
|
|
headers=auth_header("invalid.token.here"),
|
|
)
|
|
assert resp.status_code == 401
|