Harden auth and outbound admin surfaces
This commit is contained in:
+16
-1
@@ -21,6 +21,8 @@ SQLITE_BUSY_TIMEOUT_MS = 5_000
|
||||
SQLITE_CACHE_SIZE_KIB = 32_768
|
||||
SQLITE_MMAP_SIZE_BYTES = 256 * 1024 * 1024
|
||||
_DB_UNSET = object()
|
||||
_DEFAULT_JWT_SECRET = "change-me"
|
||||
_DEFAULT_ADMIN_PASSWORD = "adminadmin"
|
||||
|
||||
|
||||
def _db_path() -> str:
|
||||
@@ -178,6 +180,11 @@ def _normalize_stored_email(value: Optional[Any]) -> Optional[str]:
|
||||
return candidate
|
||||
|
||||
|
||||
def _has_secure_bootstrap_admin_credentials() -> bool:
|
||||
password = str(settings.admin_password or "")
|
||||
return bool(password and password != _DEFAULT_ADMIN_PASSWORD)
|
||||
|
||||
|
||||
def init_db() -> None:
|
||||
with _connect() as conn:
|
||||
conn.execute(
|
||||
@@ -767,7 +774,7 @@ def get_recent_actions(request_id: str, limit: int = 10) -> list[dict[str, Any]]
|
||||
|
||||
|
||||
def ensure_admin_user() -> None:
|
||||
if not settings.admin_username or not settings.admin_password:
|
||||
if not settings.admin_username or not _has_secure_bootstrap_admin_credentials():
|
||||
return
|
||||
existing = get_user_by_username(settings.admin_username)
|
||||
if existing:
|
||||
@@ -775,6 +782,14 @@ def ensure_admin_user() -> None:
|
||||
create_user(settings.admin_username, settings.admin_password, role="admin")
|
||||
|
||||
|
||||
def has_admin_user() -> bool:
|
||||
with _connect() as conn:
|
||||
row = conn.execute(
|
||||
"SELECT 1 FROM users WHERE LOWER(role) = 'admin' LIMIT 1"
|
||||
).fetchone()
|
||||
return bool(row)
|
||||
|
||||
|
||||
def create_user(
|
||||
username: str,
|
||||
password: str,
|
||||
|
||||
Reference in New Issue
Block a user