133 lines
4.0 KiB
Python
133 lines
4.0 KiB
Python
from __future__ import annotations
|
|
|
|
import ipaddress
|
|
import socket
|
|
from functools import lru_cache
|
|
from typing import Iterable
|
|
from urllib.parse import urlparse
|
|
|
|
from .config import settings
|
|
|
|
_METADATA_HOSTS = {
|
|
"169.254.169.254",
|
|
"metadata.google.internal",
|
|
"metadata.azure.internal",
|
|
}
|
|
|
|
|
|
def _normalize_text(value: object) -> str:
|
|
if value is None:
|
|
return ""
|
|
return str(value).strip()
|
|
|
|
|
|
def _split_csv(value: object) -> list[str]:
|
|
raw = _normalize_text(value)
|
|
if not raw:
|
|
return []
|
|
return [part.strip() for part in raw.split(",") if part.strip()]
|
|
|
|
|
|
def _ip_is_sensitive(ip_obj: ipaddress._BaseAddress) -> bool:
|
|
return bool(
|
|
ip_obj.is_loopback
|
|
or ip_obj.is_link_local
|
|
or ip_obj.is_multicast
|
|
or ip_obj.is_unspecified
|
|
or ip_obj.is_reserved
|
|
or ip_obj.is_private
|
|
)
|
|
|
|
|
|
@lru_cache(maxsize=256)
|
|
def _resolve_host_ips(host: str) -> tuple[ipaddress._BaseAddress, ...]:
|
|
resolved: list[ipaddress._BaseAddress] = []
|
|
for family, _, _, _, sockaddr in socket.getaddrinfo(host, None):
|
|
if family == socket.AF_INET:
|
|
resolved.append(ipaddress.ip_address(sockaddr[0]))
|
|
elif family == socket.AF_INET6:
|
|
resolved.append(ipaddress.ip_address(sockaddr[0]))
|
|
return tuple(resolved)
|
|
|
|
|
|
def _is_trusted_proxy_host(host: str, trusted_proxies: Iterable[str]) -> bool:
|
|
candidate = _normalize_text(host)
|
|
if not candidate:
|
|
return False
|
|
try:
|
|
host_ip = ipaddress.ip_address(candidate)
|
|
except ValueError:
|
|
return candidate.lower() in {entry.lower() for entry in trusted_proxies}
|
|
|
|
for entry in trusted_proxies:
|
|
raw = _normalize_text(entry)
|
|
if not raw:
|
|
continue
|
|
try:
|
|
if "/" in raw:
|
|
if host_ip in ipaddress.ip_network(raw, strict=False):
|
|
return True
|
|
elif host_ip == ipaddress.ip_address(raw):
|
|
return True
|
|
except ValueError:
|
|
continue
|
|
return False
|
|
|
|
|
|
def request_trusts_forwarded_headers(client_host: str | None) -> bool:
|
|
if not settings.magent_proxy_enabled or not settings.magent_proxy_trust_forwarded_headers:
|
|
return False
|
|
trusted = _split_csv(settings.magent_proxy_trusted_proxies)
|
|
if not trusted:
|
|
return False
|
|
return _is_trusted_proxy_host(client_host or "", trusted)
|
|
|
|
|
|
def validate_notification_target_url(
|
|
url: str,
|
|
*,
|
|
allow_private: bool | None = None,
|
|
) -> str:
|
|
raw = _normalize_text(url)
|
|
if not raw:
|
|
raise ValueError("URL cannot be empty.")
|
|
|
|
parsed = urlparse(raw)
|
|
if parsed.scheme not in {"http", "https"}:
|
|
raise ValueError("URL must use http:// or https://.")
|
|
if parsed.username or parsed.password:
|
|
raise ValueError("URL must not embed credentials.")
|
|
hostname = _normalize_text(parsed.hostname).lower()
|
|
if not hostname:
|
|
raise ValueError("URL must include a valid host.")
|
|
|
|
allow_private_targets = (
|
|
settings.magent_allow_private_notification_targets
|
|
if allow_private is None
|
|
else bool(allow_private)
|
|
)
|
|
if hostname in _METADATA_HOSTS:
|
|
raise ValueError("Metadata service targets are not allowed.")
|
|
if hostname == "localhost" and not allow_private_targets:
|
|
raise ValueError("Local notification targets are not allowed.")
|
|
|
|
try:
|
|
host_ip = ipaddress.ip_address(hostname)
|
|
except ValueError:
|
|
host_ip = None
|
|
|
|
if host_ip is not None:
|
|
if _ip_is_sensitive(host_ip) and not allow_private_targets:
|
|
raise ValueError("Private or local notification targets are not allowed.")
|
|
return raw
|
|
|
|
try:
|
|
resolved_ips = _resolve_host_ips(hostname)
|
|
except socket.gaierror as exc:
|
|
raise ValueError("Host could not be resolved.") from exc
|
|
if not resolved_ips:
|
|
raise ValueError("Host could not be resolved.")
|
|
if not allow_private_targets and any(_ip_is_sensitive(ip_obj) for ip_obj in resolved_ips):
|
|
raise ValueError("Private or local notification targets are not allowed.")
|
|
return raw
|