from typing import Any, Dict, Optional import httpx import logging from .base import ApiClient class QBittorrentClient(ApiClient): def __init__(self, base_url: Optional[str], username: Optional[str], password: Optional[str]): super().__init__(base_url, None) self.username = username self.password = password self.logger = logging.getLogger(__name__) def configured(self) -> bool: return bool(self.base_url and self.username and self.password) async def _login(self, client: httpx.AsyncClient) -> None: if not self.base_url or not self.username or not self.password: raise RuntimeError("qBittorrent not configured") response = await client.post( f"{self.base_url}/api/v2/auth/login", data={"username": self.username, "password": self.password}, headers={"Referer": self.base_url}, ) response.raise_for_status() if response.text.strip().lower() != "ok.": raise RuntimeError("qBittorrent login failed") async def _get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Optional[Any]: if not self.base_url: return None async with httpx.AsyncClient(timeout=10.0) as client: await self._login(client) response = await client.get(f"{self.base_url}{path}", params=params) response.raise_for_status() return response.json() async def _get_text(self, path: str, params: Optional[Dict[str, Any]] = None) -> Optional[str]: if not self.base_url: return None async with httpx.AsyncClient(timeout=10.0) as client: await self._login(client) response = await client.get(f"{self.base_url}{path}", params=params) response.raise_for_status() return response.text.strip() async def _post_form(self, path: str, data: Dict[str, Any]) -> None: if not self.base_url: return None async with httpx.AsyncClient(timeout=10.0) as client: await self._login(client) response = await client.post(f"{self.base_url}{path}", data=data) response.raise_for_status() async def get_torrents(self) -> Optional[Any]: return await self._get("/api/v2/torrents/info") async def get_torrents_by_hashes(self, hashes: str) -> Optional[Any]: return await self._get("/api/v2/torrents/info", params={"hashes": hashes}) async def get_torrents_by_category(self, category: str) -> Optional[Any]: return await self._get("/api/v2/torrents/info", params={"category": category}) async def get_app_version(self) -> Optional[Any]: return await self._get_text("/api/v2/app/version") async def resume_torrents(self, hashes: str) -> None: try: await self._post_form("/api/v2/torrents/resume", data={"hashes": hashes}) except httpx.HTTPStatusError as exc: if exc.response is not None and exc.response.status_code == 404: await self._post_form("/api/v2/torrents/start", data={"hashes": hashes}) return raise async def add_torrent_url(self, url: str, category: Optional[str] = None) -> None: url_host = None if isinstance(url, str) and "://" in url: url_host = url.split("://", 1)[-1].split("/", 1)[0] self.logger.warning( "qBittorrent add_torrent_url invoked: category=%s host=%s", category, url_host or "unknown", ) data: Dict[str, Any] = {"urls": url} if category: data["category"] = category await self._post_form("/api/v2/torrents/add", data=data)