186 lines
6.3 KiB
Python
186 lines
6.3 KiB
Python
import requests
|
|
from typing import Dict, Any, Optional, Union
|
|
|
|
from .exceptions import UnauthorizedError
|
|
|
|
|
|
class CredentialsManagementClient:
|
|
def __init__(self, base_url: str, api_key: Optional[str] = None):
|
|
"""
|
|
Initialize the CredentialsManagementClient.
|
|
|
|
Args:
|
|
base_url (str): The base URL of the LiteLLM proxy server (e.g., "http://localhost:8000")
|
|
api_key (Optional[str]): API key for authentication. If provided, it will be sent as a Bearer token.
|
|
"""
|
|
self._base_url = base_url.rstrip("/") # Remove trailing slash if present
|
|
self._api_key = api_key
|
|
|
|
def _get_headers(self) -> Dict[str, str]:
|
|
"""
|
|
Get the headers for API requests, including authorization if api_key is set.
|
|
|
|
Returns:
|
|
Dict[str, str]: Headers to use for API requests
|
|
"""
|
|
headers = {"Content-Type": "application/json"}
|
|
if self._api_key:
|
|
headers["Authorization"] = f"Bearer {self._api_key}"
|
|
return headers
|
|
|
|
def list(
|
|
self,
|
|
return_request: bool = False,
|
|
) -> Union[Dict[str, Any], requests.Request]:
|
|
"""
|
|
List all credentials.
|
|
|
|
Args:
|
|
return_request (bool): If True, returns the prepared request object instead of executing it
|
|
|
|
Returns:
|
|
Union[Dict[str, Any], requests.Request]: Either the response from the server or
|
|
a prepared request object if return_request is True
|
|
|
|
Raises:
|
|
UnauthorizedError: If the request fails with a 401 status code
|
|
requests.exceptions.RequestException: If the request fails with any other error
|
|
"""
|
|
url = f"{self._base_url}/credentials"
|
|
|
|
request = requests.Request("GET", url, headers=self._get_headers())
|
|
|
|
if return_request:
|
|
return request
|
|
|
|
session = requests.Session()
|
|
try:
|
|
response = session.send(request.prepare())
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 401:
|
|
raise UnauthorizedError(e)
|
|
raise
|
|
|
|
def create(
|
|
self,
|
|
credential_name: str,
|
|
credential_info: Dict[str, Any],
|
|
credential_values: Dict[str, Any],
|
|
return_request: bool = False,
|
|
) -> Union[Dict[str, Any], requests.Request]:
|
|
"""
|
|
Create a new credential.
|
|
|
|
Args:
|
|
credential_name (str): Name of the credential
|
|
credential_info (Dict[str, Any]): Additional information about the credential
|
|
credential_values (Dict[str, Any]): Values for the credential
|
|
return_request (bool): If True, returns the prepared request object instead of executing it
|
|
|
|
Returns:
|
|
Union[Dict[str, Any], requests.Request]: Either the response from the server or
|
|
a prepared request object if return_request is True
|
|
|
|
Raises:
|
|
UnauthorizedError: If the request fails with a 401 status code
|
|
requests.exceptions.RequestException: If the request fails with any other error
|
|
"""
|
|
url = f"{self._base_url}/credentials"
|
|
|
|
data = {
|
|
"credential_name": credential_name,
|
|
"credential_info": credential_info,
|
|
"credential_values": credential_values,
|
|
}
|
|
|
|
request = requests.Request("POST", url, headers=self._get_headers(), json=data)
|
|
|
|
if return_request:
|
|
return request
|
|
|
|
session = requests.Session()
|
|
try:
|
|
response = session.send(request.prepare())
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 401:
|
|
raise UnauthorizedError(e)
|
|
raise
|
|
|
|
def delete(
|
|
self,
|
|
credential_name: str,
|
|
return_request: bool = False,
|
|
) -> Union[Dict[str, Any], requests.Request]:
|
|
"""
|
|
Delete a credential by name.
|
|
|
|
Args:
|
|
credential_name (str): Name of the credential to delete
|
|
return_request (bool): If True, returns the prepared request object instead of executing it
|
|
|
|
Returns:
|
|
Union[Dict[str, Any], requests.Request]: Either the response from the server or
|
|
a prepared request object if return_request is True
|
|
|
|
Raises:
|
|
UnauthorizedError: If the request fails with a 401 status code
|
|
requests.exceptions.RequestException: If the request fails with any other error
|
|
"""
|
|
url = f"{self._base_url}/credentials/{credential_name}"
|
|
|
|
request = requests.Request("DELETE", url, headers=self._get_headers())
|
|
|
|
if return_request:
|
|
return request
|
|
|
|
session = requests.Session()
|
|
try:
|
|
response = session.send(request.prepare())
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 401:
|
|
raise UnauthorizedError(e)
|
|
raise
|
|
|
|
def get(
|
|
self,
|
|
credential_name: str,
|
|
return_request: bool = False,
|
|
) -> Union[Dict[str, Any], requests.Request]:
|
|
"""
|
|
Get a credential by name.
|
|
|
|
Args:
|
|
credential_name (str): Name of the credential to retrieve
|
|
return_request (bool): If True, returns the prepared request object instead of executing it
|
|
|
|
Returns:
|
|
Union[Dict[str, Any], requests.Request]: Either the response from the server or
|
|
a prepared request object if return_request is True
|
|
|
|
Raises:
|
|
UnauthorizedError: If the request fails with a 401 status code
|
|
requests.exceptions.RequestException: If the request fails with any other error
|
|
"""
|
|
url = f"{self._base_url}/credentials/by_name/{credential_name}"
|
|
|
|
request = requests.Request("GET", url, headers=self._get_headers())
|
|
|
|
if return_request:
|
|
return request
|
|
|
|
session = requests.Session()
|
|
try:
|
|
response = session.send(request.prepare())
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except requests.exceptions.HTTPError as e:
|
|
if e.response.status_code == 401:
|
|
raise UnauthorizedError(e)
|
|
raise
|