Files
Magent/backend/app/clients/jellyfin.py
2026-01-22 22:49:57 +13:00

61 lines
2.4 KiB
Python

from typing import Any, Dict, Optional
import httpx
from .base import ApiClient
class JellyfinClient(ApiClient):
    """Small async client for a Jellyfin server's HTTP API.

    Each request opens a short-lived ``httpx.AsyncClient``, sends the API key
    (when one is set) in the ``X-Emby-Token`` header, raises on a non-success
    HTTP status and returns the decoded JSON body.

    Every public request method returns ``None`` without touching the network
    when the settings it needs are missing.
    """

    # Per-request timeout in seconds, passed to httpx.AsyncClient.
    _JELLYFIN_TIMEOUT = 10.0

    def __init__(self, base_url: Optional[str], api_key: Optional[str]):
        """Forward ``base_url`` and ``api_key`` to ``ApiClient.__init__``."""
        super().__init__(base_url, api_key)

    def configured(self) -> bool:
        """Return True only when both a base URL and an API key are set."""
        return bool(self.base_url and self.api_key)

    def _jellyfin_headers(self) -> Dict[str, str]:
        """Return the token header, or no headers when there is no API key."""
        if not self.api_key:
            return {}
        return {"X-Emby-Token": self.api_key}

    async def _jellyfin_request(
        self,
        method: str,
        path: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Send one request to ``{base_url}{path}`` and return the JSON body.

        Callers are responsible for checking ``self.base_url`` beforehand.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            path: Path appended verbatim to ``self.base_url`` (leading slash
                included by the caller).
            params: Optional query-string parameters.
            json: Optional JSON request body.

        Raises:
            httpx.HTTPStatusError: The server answered with an error status
                (via ``response.raise_for_status()``).
        """
        url = f"{self.base_url}{path}"
        async with httpx.AsyncClient(timeout=self._JELLYFIN_TIMEOUT) as client:
            response = await client.request(
                method,
                url,
                headers=self._jellyfin_headers(),
                params=params,
                json=json,
            )
            response.raise_for_status()
            return response.json()

    async def get_users(self) -> Optional[list[Dict[str, Any]]]:
        """Fetch ``GET /Users``.

        Only a base URL is required; the token header is attached when an API
        key is available.

        Returns:
            The decoded JSON response (Jellyfin documents this endpoint as
            returning an array of user objects), or ``None`` when no base URL
            is set.
        """
        if not self.base_url:
            return None
        return await self._jellyfin_request("GET", "/Users")

    async def authenticate_by_name(self, username: str, password: str) -> Optional[Dict[str, Any]]:
        """Submit credentials to ``POST /Users/AuthenticateByName``.

        The body is ``{"Username": username, "Pw": password}``. Only a base
        URL is required; the token header is attached when an API key is
        available.

        Returns:
            The decoded JSON response, or ``None`` when no base URL is set.
            A rejected login surfaces as ``httpx.HTTPStatusError`` rather than
            a return value.
        """
        if not self.base_url:
            return None
        credentials = {"Username": username, "Pw": password}
        return await self._jellyfin_request(
            "POST", "/Users/AuthenticateByName", json=credentials
        )

    async def search_items(
        self, term: str, item_types: Optional[list[str]] = None, limit: int = 20
    ) -> Optional[Dict[str, Any]]:
        """Search the library through ``GET /Items``.

        Args:
            term: Text sent as ``SearchTerm``.
            item_types: Item type names, comma-joined into
                ``IncludeItemTypes``; ``None`` or an empty list sends an
                empty string.
            limit: Value sent as ``Limit``.

        Returns:
            The decoded JSON response, or ``None`` when the base URL or API
            key is missing.
        """
        if not self.base_url or not self.api_key:
            return None
        query = {
            "SearchTerm": term,
            "IncludeItemTypes": ",".join(item_types or []),
            "Recursive": "true",
            "Limit": limit,
        }
        return await self._jellyfin_request("GET", "/Items", params=query)

    async def get_system_info(self) -> Optional[Dict[str, Any]]:
        """Fetch ``GET /System/Info``.

        Returns:
            The decoded JSON response, or ``None`` when the base URL or API
            key is missing.
        """
        if not self.base_url or not self.api_key:
            return None
        return await self._jellyfin_request("GET", "/System/Info")