61 lines
2.4 KiB
Python
61 lines
2.4 KiB
Python
from typing import Any, Dict, Optional
|
|
import httpx
|
|
from .base import ApiClient
|
|
|
|
|
|
class JellyfinClient(ApiClient):
    """Small async HTTP client for a Jellyfin server.

    Each request method returns ``None`` without touching the network when the
    configuration it needs is missing; otherwise it returns the decoded JSON
    body of the response and raises on HTTP error statuses.

    NOTE(review): assumes ``ApiClient.__init__`` stores its arguments as
    ``self.base_url`` and ``self.api_key`` -- every method below reads those
    attributes; confirm against the base class.
    """

    # Timeout (seconds) handed to every ``httpx.AsyncClient`` created here.
    _JELLYFIN_TIMEOUT = 10.0

    def __init__(self, base_url: Optional[str], api_key: Optional[str]):
        """Forward the server URL and API key to the base client unchanged."""
        super().__init__(base_url, api_key)

    def configured(self) -> bool:
        """Return ``True`` only when both a base URL and an API key are set."""
        return bool(self.base_url and self.api_key)

    def _jellyfin_url(self, path: str) -> str:
        """Join *path* (starting with ``/``) onto the base URL.

        A trailing slash on the configured base URL is dropped first so the
        result never contains ``//`` between host and path.
        """
        return f"{(self.base_url or '').rstrip('/')}{path}"

    def _jellyfin_headers(self) -> Dict[str, str]:
        """Return the ``X-Emby-Token`` header, or no headers without an API key."""
        return {"X-Emby-Token": self.api_key} if self.api_key else {}

    async def get_users(self) -> Optional[Dict[str, Any]]:
        """Fetch ``/Users`` from the server.

        Only the base URL is required; when no API key is set the request is
        sent without the token header.

        NOTE(review): the Jellyfin API reference appears to describe this
        endpoint as returning a JSON array, so the declared return type may be
        too narrow -- confirm before relying on dict access.

        Returns:
            The decoded JSON response, or ``None`` when no base URL is set.

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        """
        if not self.base_url:
            return None
        async with httpx.AsyncClient(timeout=self._JELLYFIN_TIMEOUT) as client:
            response = await client.get(
                self._jellyfin_url("/Users"),
                headers=self._jellyfin_headers(),
            )
            response.raise_for_status()
            return response.json()

    async def authenticate_by_name(self, username: str, password: str) -> Optional[Dict[str, Any]]:
        """POST the credentials to ``/Users/AuthenticateByName``.

        Only the base URL is required; the token header is added when an API
        key is available.

        Args:
            username: Sent as the ``Username`` field of the JSON body.
            password: Sent as the ``Pw`` field of the JSON body.

        Returns:
            The decoded JSON response, or ``None`` when no base URL is set.

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        """
        if not self.base_url:
            return None
        payload = {"Username": username, "Pw": password}
        async with httpx.AsyncClient(timeout=self._JELLYFIN_TIMEOUT) as client:
            response = await client.post(
                self._jellyfin_url("/Users/AuthenticateByName"),
                headers=self._jellyfin_headers(),
                json=payload,
            )
            response.raise_for_status()
            return response.json()

    async def search_items(
        self, term: str, item_types: Optional[list[str]] = None, limit: int = 20
    ) -> Optional[Dict[str, Any]]:
        """Run a recursive search against ``/Items``.

        Args:
            term: Sent as the ``SearchTerm`` query parameter.
            item_types: Optional type names, sent comma-joined as
                ``IncludeItemTypes``. The parameter is left out entirely when
                this is ``None`` or empty.
            limit: Sent as the ``Limit`` query parameter.

        Returns:
            The decoded JSON response, or ``None`` when the client is not
            fully configured (base URL and API key are both required).

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        """
        if not self.configured():
            return None
        params: Dict[str, Any] = {
            "SearchTerm": term,
            "Recursive": "true",
            "Limit": limit,
        }
        # Only send the type filter when there is something to filter on,
        # rather than an empty ``IncludeItemTypes=`` query parameter.
        if item_types:
            params["IncludeItemTypes"] = ",".join(item_types)
        async with httpx.AsyncClient(timeout=self._JELLYFIN_TIMEOUT) as client:
            response = await client.get(
                self._jellyfin_url("/Items"),
                headers=self._jellyfin_headers(),
                params=params,
            )
            response.raise_for_status()
            return response.json()

    async def get_system_info(self) -> Optional[Dict[str, Any]]:
        """Fetch ``/System/Info`` from the server.

        Returns:
            The decoded JSON response, or ``None`` when the client is not
            fully configured (base URL and API key are both required).

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        """
        if not self.configured():
            return None
        async with httpx.AsyncClient(timeout=self._JELLYFIN_TIMEOUT) as client:
            response = await client.get(
                self._jellyfin_url("/System/Info"),
                headers=self._jellyfin_headers(),
            )
            response.raise_for_status()
            return response.json()
|