from __future__ import annotations import ipaddress import socket from functools import lru_cache from typing import Iterable from urllib.parse import urlparse from .config import settings _METADATA_HOSTS = { "169.254.169.254", "metadata.google.internal", "metadata.azure.internal", } def _normalize_text(value: object) -> str: if value is None: return "" return str(value).strip() def _split_csv(value: object) -> list[str]: raw = _normalize_text(value) if not raw: return [] return [part.strip() for part in raw.split(",") if part.strip()] def _ip_is_sensitive(ip_obj: ipaddress._BaseAddress) -> bool: return bool( ip_obj.is_loopback or ip_obj.is_link_local or ip_obj.is_multicast or ip_obj.is_unspecified or ip_obj.is_reserved or ip_obj.is_private ) @lru_cache(maxsize=256) def _resolve_host_ips(host: str) -> tuple[ipaddress._BaseAddress, ...]: resolved: list[ipaddress._BaseAddress] = [] for family, _, _, _, sockaddr in socket.getaddrinfo(host, None): if family == socket.AF_INET: resolved.append(ipaddress.ip_address(sockaddr[0])) elif family == socket.AF_INET6: resolved.append(ipaddress.ip_address(sockaddr[0])) return tuple(resolved) def _is_trusted_proxy_host(host: str, trusted_proxies: Iterable[str]) -> bool: candidate = _normalize_text(host) if not candidate: return False try: host_ip = ipaddress.ip_address(candidate) except ValueError: return candidate.lower() in {entry.lower() for entry in trusted_proxies} for entry in trusted_proxies: raw = _normalize_text(entry) if not raw: continue try: if "/" in raw: if host_ip in ipaddress.ip_network(raw, strict=False): return True elif host_ip == ipaddress.ip_address(raw): return True except ValueError: continue return False def request_trusts_forwarded_headers(client_host: str | None) -> bool: if not settings.magent_proxy_enabled or not settings.magent_proxy_trust_forwarded_headers: return False trusted = _split_csv(settings.magent_proxy_trusted_proxies) if not trusted: return False return _is_trusted_proxy_host(client_host or "", trusted) def validate_notification_target_url( url: str, *, allow_private: bool | None = None, ) -> str: raw = _normalize_text(url) if not raw: raise ValueError("URL cannot be empty.") parsed = urlparse(raw) if parsed.scheme not in {"http", "https"}: raise ValueError("URL must use http:// or https://.") if parsed.username or parsed.password: raise ValueError("URL must not embed credentials.") hostname = _normalize_text(parsed.hostname).lower() if not hostname: raise ValueError("URL must include a valid host.") allow_private_targets = ( settings.magent_allow_private_notification_targets if allow_private is None else bool(allow_private) ) if hostname in _METADATA_HOSTS: raise ValueError("Metadata service targets are not allowed.") if hostname == "localhost" and not allow_private_targets: raise ValueError("Local notification targets are not allowed.") try: host_ip = ipaddress.ip_address(hostname) except ValueError: host_ip = None if host_ip is not None: if _ip_is_sensitive(host_ip) and not allow_private_targets: raise ValueError("Private or local notification targets are not allowed.") return raw try: resolved_ips = _resolve_host_ips(hostname) except socket.gaierror as exc: raise ValueError("Host could not be resolved.") from exc if not resolved_ips: raise ValueError("Host could not be resolved.") if not allow_private_targets and any(_ip_is_sensitive(ip_obj) for ip_obj in resolved_ips): raise ValueError("Private or local notification targets are not allowed.") return raw