Initial commit
This commit is contained in:
60
backend/app/clients/jellyfin.py
Normal file
60
backend/app/clients/jellyfin.py
Normal file
@@ -0,0 +1,60 @@
|
||||
from typing import Any, Dict, Optional
|
||||
import httpx
|
||||
from .base import ApiClient
|
||||
|
||||
|
||||
class JellyfinClient(ApiClient):
|
||||
def __init__(self, base_url: Optional[str], api_key: Optional[str]):
|
||||
super().__init__(base_url, api_key)
|
||||
|
||||
def configured(self) -> bool:
|
||||
return bool(self.base_url and self.api_key)
|
||||
|
||||
async def get_users(self) -> Optional[Dict[str, Any]]:
|
||||
if not self.base_url:
|
||||
return None
|
||||
url = f"{self.base_url}/Users"
|
||||
headers = {"X-Emby-Token": self.api_key} if self.api_key else {}
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
response = await client.get(url, headers=headers)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
async def authenticate_by_name(self, username: str, password: str) -> Optional[Dict[str, Any]]:
|
||||
if not self.base_url:
|
||||
return None
|
||||
url = f"{self.base_url}/Users/AuthenticateByName"
|
||||
headers = {"X-Emby-Token": self.api_key} if self.api_key else {}
|
||||
payload = {"Username": username, "Pw": password}
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
response = await client.post(url, headers=headers, json=payload)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
async def search_items(
|
||||
self, term: str, item_types: Optional[list[str]] = None, limit: int = 20
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
if not self.base_url or not self.api_key:
|
||||
return None
|
||||
url = f"{self.base_url}/Items"
|
||||
params = {
|
||||
"SearchTerm": term,
|
||||
"IncludeItemTypes": ",".join(item_types or []),
|
||||
"Recursive": "true",
|
||||
"Limit": limit,
|
||||
}
|
||||
headers = {"X-Emby-Token": self.api_key}
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
response = await client.get(url, headers=headers, params=params)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
async def get_system_info(self) -> Optional[Dict[str, Any]]:
|
||||
if not self.base_url or not self.api_key:
|
||||
return None
|
||||
url = f"{self.base_url}/System/Info"
|
||||
headers = {"X-Emby-Token": self.api_key}
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
response = await client.get(url, headers=headers)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
Reference in New Issue
Block a user