Files
Magent/backend/app/clients/base.py

109 lines
4.0 KiB
Python

from typing import Any, Dict, Optional
import logging
import time
import httpx
from ..logging_config import sanitize_headers, sanitize_value
class ApiClient:
    """Small async HTTP client that sends and receives JSON.

    Each request opens a short-lived ``httpx.AsyncClient``, optionally sends
    an ``X-Api-Key`` header, logs the outcome, and returns the decoded JSON
    body. The logger is named after the runtime class, so subclasses log
    under their own name.

    A client built without a ``base_url`` is "not configured": its requests
    are skipped with a warning and return ``None`` instead of raising.
    """

    # Class-level fallback for the per-request timeout (seconds), so the
    # attribute exists even if a subclass does not run this ``__init__``.
    timeout: Optional[float] = 10.0

    # Upper bound on the number of characters of an error response body that
    # is written to the log.
    _SUMMARY_LIMIT = 500

    def __init__(
        self,
        base_url: Optional[str],
        api_key: Optional[str] = None,
        *,
        timeout: Optional[float] = 10.0,
    ):
        """Store connection settings.

        Args:
            base_url: Root URL of the remote API. Trailing slashes are
                stripped; a falsy value leaves the client unconfigured.
            api_key: Value sent in the ``X-Api-Key`` header, if any.
            timeout: Timeout in seconds handed to ``httpx.AsyncClient`` for
                every request. Defaults to the previous fixed value of 10.
        """
        self.base_url = base_url.rstrip("/") if base_url else None
        self.api_key = api_key
        self.timeout = timeout
        self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")

    def configured(self) -> bool:
        """Return ``True`` when a base URL has been set."""
        return bool(self.base_url)

    def headers(self) -> Dict[str, str]:
        """Return the request headers: the API key header, or nothing."""
        return {"X-Api-Key": self.api_key} if self.api_key else {}

    @staticmethod
    def _elapsed_ms(started_at: float) -> float:
        """Return milliseconds since ``started_at`` (a ``perf_counter`` value)."""
        return round((time.perf_counter() - started_at) * 1000, 2)

    def _response_summary(self, response: Optional["httpx.Response"]) -> Optional[Any]:
        """Build a log-safe, size-bounded summary of an error response.

        The body is decoded as JSON when possible, otherwise taken as text,
        and passed through ``sanitize_value`` either way. If the rendered
        summary exceeds ``_SUMMARY_LIMIT`` characters it is returned as a
        truncated string ending in ``...``; otherwise the sanitized payload
        is returned unchanged.

        Returns:
            ``None`` when there is no response, else the (possibly
            truncated) sanitized payload.
        """
        if response is None:
            return None
        try:
            payload = sanitize_value(response.json())
        except ValueError:
            # Body is not valid JSON; fall back to the raw text.
            payload = sanitize_value(response.text)
        # Apply the cap to structured payloads too, so a large JSON error
        # body cannot flood the log. ``str()`` matches what ``%s`` would emit.
        rendered = payload if isinstance(payload, str) else str(payload)
        if len(rendered) > self._SUMMARY_LIMIT:
            return f"{rendered[:self._SUMMARY_LIMIT]}..."
        return payload

    async def _request(
        self,
        method: str,
        path: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        payload: Optional[Dict[str, Any]] = None,
    ) -> Optional[Any]:
        """Send one request and return the decoded JSON body.

        Args:
            method: HTTP method name.
            path: Path appended verbatim to ``base_url``.
            params: Optional query-string parameters.
            payload: Optional JSON request body.

        Returns:
            The decoded JSON body, or ``None`` when the client is not
            configured or the response body is empty.

        Raises:
            httpx.HTTPStatusError: The server answered with an error status
                (logged at error level for 5xx, warning otherwise).
            Exception: Any other failure is logged with a traceback and
                re-raised unchanged.
        """
        if not self.base_url:
            self.logger.warning("client request skipped method=%s path=%s reason=not-configured", method, path)
            return None

        url = f"{self.base_url}{path}"
        # Build the headers once so the logged headers are exactly the ones sent.
        request_headers = self.headers()
        started_at = time.perf_counter()
        # Sanitizing walks the whole payload; skip it when DEBUG is disabled.
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug(
                "outbound request started method=%s url=%s params=%s payload=%s headers=%s",
                method,
                url,
                sanitize_value(params),
                sanitize_value(payload),
                sanitize_headers(request_headers),
            )
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.request(
                    method,
                    url,
                    headers=request_headers,
                    params=params,
                    json=payload,
                )
            response.raise_for_status()
            self.logger.debug(
                "outbound request completed method=%s url=%s status=%s duration_ms=%s",
                method,
                url,
                response.status_code,
                self._elapsed_ms(started_at),
            )
            if not response.content:
                return None
            return response.json()
        except httpx.HTTPStatusError as exc:
            duration_ms = self._elapsed_ms(started_at)
            response = exc.response
            status = response.status_code if response is not None else "unknown"
            # Server-side failures are errors; other error statuses are warnings.
            log_fn = self.logger.error if isinstance(status, int) and status >= 500 else self.logger.warning
            log_fn(
                "outbound request returned error method=%s url=%s status=%s duration_ms=%s response=%s",
                method,
                url,
                status,
                duration_ms,
                self._response_summary(response),
            )
            raise
        except Exception:
            # Boundary handler: record the traceback, then let the caller decide.
            self.logger.exception(
                "outbound request failed method=%s url=%s duration_ms=%s",
                method,
                url,
                self._elapsed_ms(started_at),
            )
            raise

    async def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Optional[Any]:
        """Send a GET request with optional query parameters."""
        return await self._request("GET", path, params=params)

    async def post(self, path: str, payload: Optional[Dict[str, Any]] = None) -> Optional[Any]:
        """Send a POST request with an optional JSON body."""
        return await self._request("POST", path, payload=payload)

    async def put(self, path: str, payload: Optional[Dict[str, Any]] = None) -> Optional[Any]:
        """Send a PUT request with an optional JSON body."""
        return await self._request("PUT", path, payload=payload)

    async def delete(self, path: str) -> Optional[Any]:
        """Send a DELETE request."""
        return await self._request("DELETE", path)