# ai-station/.venv/lib/python3.12/site-packages/traceloop/sdk/datasets/datasets.py
import csv
import logging
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, cast

try:
    import pandas as pd

    PANDAS_AVAILABLE = True
except ImportError:
    PANDAS_AVAILABLE = False

from traceloop.sdk.client.http import HTTPClient
from traceloop.sdk.datasets.attachment import Attachment, ExternalAttachment
from traceloop.sdk.datasets.dataset import Dataset
from traceloop.sdk.datasets.model import (
    ColumnDefinition,
    ColumnType,
    CreateDatasetRequest,
    CreateDatasetResponse,
    DatasetMetadata,
    ValuesMap,
)

logger = logging.getLogger(__name__)


class Datasets:
    """Datasets class for dataset API communication."""

    _http: HTTPClient

    def __init__(self, http: HTTPClient):
        self._http = http

    def get_all(self) -> List[DatasetMetadata]:
        """List metadata for all datasets."""
        result = self._http.get("datasets")
        if result is None:
            raise Exception("Failed to get datasets")
        if isinstance(result, dict) and "datasets" in result:
            return [DatasetMetadata(**dataset) for dataset in result["datasets"]]
        return [DatasetMetadata(**dataset) for dataset in result]
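
    # Minimal usage sketch (illustrative, not part of this module). It assumes
    # an already-configured HTTPClient instance `http`; the HTTPClient
    # constructor arguments are not shown here.
    #
    #   datasets = Datasets(http)
    #   for meta in datasets.get_all():
    #       print(meta.slug)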

    def delete_by_slug(self, slug: str) -> None:
        """Delete a dataset by slug without requiring a Dataset instance."""
        success = self._http.delete(f"datasets/{slug}")
        if not success:
            raise Exception(f"Failed to delete dataset {slug}")

    def get_by_slug(self, slug: str) -> "Dataset":
        """Get a dataset by slug and return a full Dataset instance."""
        result = self._http.get(f"datasets/{slug}")
        if result is None:
            raise Exception(f"Failed to get dataset {slug}")
        validated_data = CreateDatasetResponse(**result)
        return Dataset.from_create_dataset_response(validated_data, self._http)
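
    # Usage sketch (illustrative): fetch a dataset by slug, then delete it.
    # Assumes `datasets` is a Datasets instance as in the sketch above.
    #
    #   dataset = datasets.get_by_slug("products")
    #   datasets.delete_by_slug("products")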

    def create(self, dataset_request: CreateDatasetRequest) -> Dataset:
        """
        Create a dataset with support for initial attachments.

        If row values contain Attachment or ExternalAttachment objects,
        they will be automatically uploaded/attached after dataset creation.

        Args:
            dataset_request: Dataset creation request; may contain Attachment
                objects in row values.

        Returns:
            Created dataset with all attachments processed

        Example:
            dataset_request = CreateDatasetRequest(
                slug="products",
                name="Product Catalog",
                columns=[
                    ColumnDefinition(slug="name", name="Name", type=ColumnType.STRING),
                    ColumnDefinition(slug="image", name="Image", type=ColumnType.FILE),
                ],
                rows=[{
                    "name": "Product A",
                    "image": Attachment(file_path="/path/to/image.jpg", file_type=FileCellType.IMAGE)
                }]
            )
            dataset = datasets.create(dataset_request)
        """
        # Extract attachment objects from rows
        attachments_to_process = self._extract_attachments(dataset_request)

        # Replace attachment objects with None for initial creation
        clean_request = self._prepare_request_for_creation(dataset_request)

        # Create the dataset
        response = self._create_dataset(clean_request)
        dataset = Dataset.from_create_dataset_response(response, self._http)

        # Process attachments if any
        if attachments_to_process:
            self._process_attachments(dataset, attachments_to_process)

        return dataset
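
    # Variant of the docstring example using an ExternalAttachment for a file
    # hosted elsewhere (illustrative; the exact ExternalAttachment constructor
    # arguments are an assumption, not confirmed by this module):
    #
    #   rows=[{
    #       "name": "Product B",
    #       "image": ExternalAttachment(url="https://example.com/b.jpg"),
    #   }]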

    def from_csv(
        self,
        file_path: str,
        slug: str,
        name: Optional[str] = None,
        description: Optional[str] = None,
    ) -> "Dataset":
        """Create a dataset from a CSV file."""
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"CSV file not found: {file_path}")

        columns_definition: List[ColumnDefinition] = []
        rows_with_names: List[ValuesMap] = []

        with open(file_path, "r", encoding="utf-8") as csvfile:
            # Detect the delimiter from an initial sample of the file
            sample = csvfile.read(1024)
            csvfile.seek(0)
            sniffer = csv.Sniffer()
            delimiter = sniffer.sniff(sample).delimiter

            reader = csv.DictReader(csvfile, delimiter=delimiter)
            # TODO: Handle None case for fieldnames more gracefully
            if reader.fieldnames is None:
                raise ValueError("CSV file has no headers")

            # Every CSV column is imported as a STRING column
            for field_name in reader.fieldnames:
                columns_definition.append(
                    ColumnDefinition(
                        slug=self._slugify(field_name),
                        name=field_name,
                        type=ColumnType.STRING,
                    )
                )

            for row_data in reader:
                rows_with_names.append(
                    {self._slugify(k): v for k, v in row_data.items()}
                )

        dataset_response = self._create_dataset(
            CreateDatasetRequest(
                slug=slug,
                name=name,
                description=description,
                columns=columns_definition,
                rows=rows_with_names,
            )
        )
        return Dataset.from_create_dataset_response(dataset_response, self._http)
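
    # Usage sketch (illustrative): import a local CSV as a new dataset. The
    # file path and slug are placeholders; note that every column is typed
    # as STRING.
    #
    #   dataset = datasets.from_csv(
    #       file_path="./products.csv",
    #       slug="products-from-csv",
    #       name="Products (CSV import)",
    #   )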

    def from_dataframe(
        self,
        df: "pd.DataFrame",
        slug: str,
        name: Optional[str] = None,
        description: Optional[str] = None,
    ) -> "Dataset":
        """Create a dataset from a pandas DataFrame."""
        if not PANDAS_AVAILABLE:
            raise ImportError(
                "pandas is required for from_dataframe. Install with: pip install pandas"
            )

        # Map each DataFrame dtype to a dataset column type. Check bool before
        # numeric: in pandas, bool dtypes also count as numeric.
        columns_definition: List[ColumnDefinition] = []
        for col_name in df.columns:
            dtype = df[col_name].dtype
            if pd.api.types.is_bool_dtype(dtype):
                col_type = ColumnType.BOOLEAN
            elif pd.api.types.is_numeric_dtype(dtype):
                col_type = ColumnType.NUMBER
            else:
                col_type = ColumnType.STRING
            columns_definition.append(
                ColumnDefinition(
                    slug=self._slugify(col_name), name=col_name, type=col_type
                )
            )

        # DataFrame keys are Hashable, not necessarily str; coerce before slugifying
        rows = [
            {self._slugify(str(k)): v for k, v in row.items()}
            for row in df.to_dict(orient="records")
        ]

        dataset_response = self._create_dataset(
            CreateDatasetRequest(
                slug=slug,
                name=name,
                description=description,
                columns=columns_definition,
                rows=rows,
            )
        )
        return Dataset.from_create_dataset_response(dataset_response, self._http)
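
    # Usage sketch (illustrative): column types are inferred from dtypes, so
    # here `in_stock` becomes BOOLEAN, `price` NUMBER, and `name` STRING.
    #
    #   import pandas as pd
    #   df = pd.DataFrame(
    #       {"name": ["A", "B"], "price": [9.99, 19.99], "in_stock": [True, False]}
    #   )
    #   dataset = datasets.from_dataframe(df, slug="products-df")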

    def get_version_csv(self, slug: str, version: str) -> str:
        """Get a specific version of a dataset as a CSV string."""
        result = self._http.get(f"datasets/{slug}/versions/{version}")
        if result is None:
            raise Exception(f"Failed to get version {version} of dataset {slug}")
        return cast(str, result)

    def get_version_jsonl(self, slug: str, version: str) -> str:
        """Get a specific version of a dataset as a JSONL string."""
        result = self._http.get(f"datasets/{slug}/versions/{version}/jsonl")
        if result is None:
            raise Exception(f"Failed to get version {version} of dataset {slug}")
        return cast(str, result)
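
    # Usage sketch (illustrative): version identifiers come from the server;
    # "v1" is a placeholder, not a documented version format.
    #
    #   csv_text = datasets.get_version_csv("products", "v1")
    #   jsonl_text = datasets.get_version_jsonl("products", "v1")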

    def _create_dataset(self, request: CreateDatasetRequest) -> CreateDatasetResponse:
        """Create a new dataset via the API."""
        data = request.model_dump()
        result = self._http.post("datasets", data)
        if result is None:
            raise Exception("Failed to create dataset")
        return CreateDatasetResponse(**result)

    def _slugify(self, name: str) -> str:
        """Convert a display name into a URL-safe slug."""
        if not name:
            raise ValueError("Name cannot be empty")
        slug = name.lower()
        # Replace spaces and underscores with hyphens
        slug = slug.replace(" ", "-").replace("_", "-")
        # Remove any character that's not alphanumeric or a hyphen
        slug = re.sub(r"[^a-z0-9-]+", "", slug)
        # Collapse consecutive hyphens into one
        slug = re.sub(r"-+", "-", slug)
        # Trim hyphens from start and end
        slug = slug.strip("-")
        if not slug:
            raise ValueError(f"Name '{name}' cannot be slugified to a valid slug")
        return slug
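
    # Example slug mappings (derived from the rules above):
    #   "Product Name" -> "product-name"
    #   "first_name"   -> "first-name"
    #   "Price ($USD)" -> "price-usd"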

    def _extract_attachments(
        self, request: CreateDatasetRequest
    ) -> Dict[int, Dict[str, Any]]:
        """
        Extract attachment objects from row values.

        Returns:
            Dictionary mapping row index to column slug to attachment object
        """
        attachments: Dict[int, Dict[str, Any]] = {}
        if request.rows:
            for row_idx, row in enumerate(request.rows):
                for col_slug, value in row.items():
                    if isinstance(value, (Attachment, ExternalAttachment)):
                        attachments.setdefault(row_idx, {})[col_slug] = value
        return attachments
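
    # Shape of the returned mapping (illustrative): row 0 carries one
    # attachment in its "image" column:
    #   {0: {"image": Attachment(file_path="/path/to/image.jpg", ...)}}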

    def _prepare_request_for_creation(
        self, request: CreateDatasetRequest
    ) -> CreateDatasetRequest:
        """
        Replace attachment objects with None in row values for initial dataset creation.

        Args:
            request: Original dataset request with potential attachment objects

        Returns:
            Modified request with attachment objects replaced by None
        """
        if not request.rows:
            return request

        # Rebuild each row so the original request is not mutated; attachment
        # cells are sent as None and filled in after upload
        clean_rows = []
        for row in request.rows:
            clean_row: Dict[str, Any] = {}
            for col_slug, value in row.items():
                if isinstance(value, (Attachment, ExternalAttachment)):
                    clean_row[col_slug] = None
                else:
                    clean_row[col_slug] = value
            clean_rows.append(clean_row)

        # Create a new request with the cleaned rows
        return CreateDatasetRequest(
            slug=request.slug,
            name=request.name,
            description=request.description,
            columns=request.columns,
            rows=clean_rows,
        )
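
    # Illustrative transformation: a row {"name": "A", "image": <Attachment>}
    # is sent to the API as {"name": "A", "image": None}; the attachment is
    # uploaded afterwards by _process_attachments.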

    def _process_attachments(
        self, dataset: Dataset, attachments: Dict[int, Dict[str, Any]]
    ) -> None:
        """
        Upload/attach all attachment objects to their respective cells.

        Args:
            dataset: The created dataset
            attachments: Dictionary mapping row index to column slug to attachment object
        """
        if not dataset.rows:
            return

        for row_idx, row_attachments in attachments.items():
            if row_idx >= len(dataset.rows):
                logger.warning(
                    f"Row index {row_idx} out of range, skipping attachments"
                )
                continue

            row = dataset.rows[row_idx]
            for col_slug, attachment in row_attachments.items():
                try:
                    if isinstance(attachment, Attachment):
                        ref = attachment.upload(
                            self._http, dataset.slug, row.id, col_slug
                        )
                    elif isinstance(attachment, ExternalAttachment):
                        ref = attachment.attach(
                            self._http, dataset.slug, row.id, col_slug
                        )
                    else:
                        continue

                    # Update the row values locally to reflect the stored file
                    row.values[col_slug] = {
                        "type": ref.file_type.value if ref.file_type else "file",
                        "status": "success",
                        "storage": ref.storage_type.value,
                        "storage_key": getattr(ref, "storage_key", None),
                        "url": getattr(ref, "url", None),
                        "metadata": ref.metadata,
                    }
                except Exception as e:
                    logger.warning(
                        f"Failed to process attachment for row {row_idx}, "
                        f"column {col_slug}: {e}"
                    )
                    # Mark the cell as failed in the row values
                    row.values[col_slug] = {
                        "type": "file",
                        "status": "failed",
                        "error": str(e),
                    }