import asyncio
import json
import os
from typing import Any, List, Callable, Optional, Tuple, Dict, Awaitable, Union

import cuid
import httpx

from traceloop.sdk.client.http import HTTPClient
from traceloop.sdk.datasets.datasets import Datasets
from traceloop.sdk.evaluator.config import EvaluatorDetails
from traceloop.sdk.evaluator.evaluator import Evaluator, validate_and_normalize_task_output
from traceloop.sdk.experiment.model import (
    InitExperimentRequest,
    ExperimentInitResponse,
    CreateTaskRequest,
    CreateTaskResponse,
    EvaluatorSpec,
    TaskResponse,
    RunInGithubRequest,
    RunInGithubResponse,
    TaskResult,
    GithubContext,
)


class Experiment:
    """Main Experiment class for creating experiment contexts"""

    _datasets: Datasets
    _evaluator: Evaluator
    _http_client: HTTPClient

    def __init__(self, http_client: HTTPClient, async_http_client: httpx.AsyncClient, experiment_slug: str):
        self._datasets = Datasets(http_client)
        self._evaluator = Evaluator(async_http_client)
        self._http_client = http_client
        self._experiment_slug = experiment_slug

    async def run(
        self,
        task: Callable[[Optional[Dict[str, Any]]], Awaitable[Dict[str, Any]]],
        evaluators: List[EvaluatorSpec],
        dataset_slug: Optional[str] = None,
        dataset_version: Optional[str] = None,
        experiment_slug: Optional[str] = None,
        experiment_metadata: Optional[Dict[str, Any]] = None,
        related_ref: Optional[Dict[str, str]] = None,
        aux: Optional[Dict[str, str]] = None,
        stop_on_error: bool = False,
        wait_for_results: bool = True,
    ) -> Tuple[List[TaskResponse], List[str]] | RunInGithubResponse:
        """Run an experiment with the given task and evaluators

        Args:
            task: Async function to run on each dataset row
            evaluators: List of evaluator slugs or EvaluatorDetails objects to run
            dataset_slug: Slug of the dataset to use
            dataset_version: Version of the dataset to use
            experiment_slug: Slug for this experiment run
            experiment_metadata: Metadata for this experiment (an experiment holds all of its experiment runs)
            related_ref: Related reference for this experiment run
            aux: Auxiliary information for this experiment run
            stop_on_error: Whether to stop on the first error (default: False)
            wait_for_results: Whether to wait for async tasks to complete (default: True)

        Returns:
            Tuple of (results, errors); ([], []) if wait_for_results is False.
            When running inside GitHub Actions, a RunInGithubResponse is returned instead.
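
        Example (illustrative sketch; the task, evaluator slug, and dataset
        slug/version below are placeholders, not values defined by this SDK):

            results, errors = await experiment.run(
                task=my_task,
                evaluators=["my-evaluator"],
                dataset_slug="my-dataset",
                dataset_version="v1",
            )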
        """
        if os.getenv("GITHUB_ACTIONS"):
            return await self._run_in_github(
                task=task,
                dataset_slug=dataset_slug,
                dataset_version=dataset_version,
                evaluators=evaluators,
                experiment_slug=experiment_slug,
                experiment_metadata=experiment_metadata,
                related_ref=related_ref,
                aux=aux,
            )
        else:
            return await self._run_locally(
                task=task,
                evaluators=evaluators,
                dataset_slug=dataset_slug,
                dataset_version=dataset_version,
                experiment_slug=experiment_slug,
                experiment_metadata=experiment_metadata,
                related_ref=related_ref,
                aux=aux,
                stop_on_error=stop_on_error,
                wait_for_results=wait_for_results,
            )

    async def _run_locally(
        self,
        task: Callable[[Optional[Dict[str, Any]]], Awaitable[Dict[str, Any]]],
        evaluators: List[EvaluatorSpec],
        dataset_slug: Optional[str] = None,
        dataset_version: Optional[str] = None,
        experiment_slug: Optional[str] = None,
        experiment_metadata: Optional[Dict[str, Any]] = None,
        related_ref: Optional[Dict[str, str]] = None,
        aux: Optional[Dict[str, str]] = None,
        stop_on_error: bool = False,
        wait_for_results: bool = True,
    ) -> Tuple[List[TaskResponse], List[str]]:
        """Run an experiment locally with the given task and evaluators

        Args:
            task: Async function to run on each dataset row
            evaluators: List of evaluator slugs or EvaluatorDetails objects to run
            dataset_slug: Slug of the dataset to use
            dataset_version: Version of the dataset to use
            experiment_slug: Slug for this experiment run
            experiment_metadata: Metadata for this experiment (an experiment holds all of its experiment runs)
            related_ref: Related reference for this experiment run
            aux: Auxiliary information for this experiment run
            stop_on_error: Whether to stop on the first error (default: False)
            wait_for_results: Whether to wait for async tasks to complete (default: True)

        Returns:
            Tuple of (results, errors); ([], []) if wait_for_results is False.
        """
        if not experiment_slug:
            experiment_slug = self._experiment_slug or "exp-" + str(cuid.cuid())[:11]

        experiment_run_metadata = {
            key: value
            for key, value in [("related_ref", related_ref), ("aux", aux)]
            if value is not None
        }

        # Convert evaluators to tuples of (slug, version, config)
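        # For example (slugs here are illustrative): "my-evaluator" becomes
        # ("my-evaluator", None, None), and EvaluatorDetails(slug="my-evaluator",
        # version="v2", config={...}) becomes ("my-evaluator", "v2", {...}).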
        evaluator_details: Optional[List[Tuple[str, Optional[str], Optional[Dict[str, Any]]]]] = None
        if evaluators:
            evaluator_details = []
            for evaluator in evaluators:
                if isinstance(evaluator, str):
                    # Simple string slug
                    evaluator_details.append((evaluator, None, None))
                elif isinstance(evaluator, EvaluatorDetails):
                    # EvaluatorDetails object with config
                    evaluator_details.append((evaluator.slug, evaluator.version, evaluator.config))

        experiment = self._init_experiment(
            experiment_slug,
            dataset_slug=dataset_slug,
            dataset_version=dataset_version,
            evaluator_slugs=[slug for slug, _, _ in evaluator_details]
            if evaluator_details
            else None,
            experiment_metadata=experiment_metadata,
            experiment_run_metadata=experiment_run_metadata,
        )

        run_id = experiment.run.id

        rows = []
        if dataset_slug and dataset_version:
            jsonl_data = self._datasets.get_version_jsonl(dataset_slug, dataset_version)
            rows = self._parse_jsonl_to_rows(jsonl_data)

        results: List[TaskResponse] = []
        errors: List[str] = []

        evaluators_to_validate = [evaluator for evaluator in evaluators if isinstance(evaluator, EvaluatorDetails)]

        async def run_single_row(row: Optional[Dict[str, Any]]) -> TaskResponse:
            try:
                task_result = await task(row)

                # Validate task output with EvaluatorDetails and normalize field names using synonyms
                if evaluators_to_validate:
                    task_result = validate_and_normalize_task_output(task_result, evaluators_to_validate)

                task_id = self._create_task(
                    experiment_slug=experiment_slug,
                    experiment_run_id=run_id,
                    task_input=row,
                    task_output=task_result,
                ).id

                eval_results: Dict[str, Union[Dict[str, Any], str]] = {}
                if evaluator_details:
                    for evaluator_slug, evaluator_version, evaluator_config in evaluator_details:
                        try:
                            if wait_for_results:
                                eval_result = await self._evaluator.run_experiment_evaluator(
                                    evaluator_slug=evaluator_slug,
                                    evaluator_version=evaluator_version,
                                    evaluator_config=evaluator_config,
                                    task_id=task_id,
                                    experiment_id=experiment.experiment.id,
                                    experiment_run_id=run_id,
                                    input=task_result,
                                    timeout_in_sec=120,
                                )
                                eval_results[evaluator_slug] = eval_result.result
                            else:
                                await self._evaluator.trigger_experiment_evaluator(
                                    evaluator_slug=evaluator_slug,
                                    evaluator_version=evaluator_version,
                                    evaluator_config=evaluator_config,
                                    task_id=task_id,
                                    experiment_id=experiment.experiment.id,
                                    experiment_run_id=run_id,
                                    input=task_result,
                                )

                                msg = f"Triggered execution of {evaluator_slug}"
                                eval_results[evaluator_slug] = msg

                        except Exception as e:
                            error_msg = f"Error: {str(e)}"
                            eval_results[evaluator_slug] = error_msg
                            # Log the error so the user can see it
                            print(f"\033[91m❌ Evaluator '{evaluator_slug}' failed: {str(e)}\033[0m")

                return TaskResponse(
                    task_result=task_result,
                    evaluations=eval_results,
                )
            except Exception as e:
                error_msg = f"Error processing row: {str(e)}"
                # Print the error to the console so the user can see it
                print(f"\033[91m❌ Task execution failed: {str(e)}\033[0m")
                if stop_on_error:
                    raise e
                return TaskResponse(error=error_msg)
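
        # Cap the number of dataset rows processed concurrently.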
        semaphore = asyncio.Semaphore(50)

        async def run_with_semaphore(row: Optional[Dict[str, Any]]) -> TaskResponse:
            async with semaphore:
                return await run_single_row(row)

        tasks = [asyncio.create_task(run_with_semaphore(row)) for row in rows]

        if not wait_for_results:
            # Still need to execute tasks to trigger evaluators, but don't wait for completion
            await asyncio.gather(*tasks, return_exceptions=True)
            return [], []

        for completed_task in asyncio.as_completed(tasks):
            try:
                result = await completed_task
                if result.error:
                    errors.append(result.error)
                else:
                    results.append(result)
            except Exception as e:
                error_msg = f"Task execution error: {str(e)}"
                errors.append(error_msg)
                if stop_on_error:
                    break

        return results, errors

    async def _run_in_github(
        self,
        task: Callable[[Optional[Dict[str, Any]]], Awaitable[Dict[str, Any]]],
        evaluators: List[EvaluatorSpec],
        dataset_slug: Optional[str] = None,
        dataset_version: Optional[str] = None,
        experiment_slug: Optional[str] = None,
        experiment_metadata: Optional[Dict[str, Any]] = None,
        related_ref: Optional[Dict[str, str]] = None,
        aux: Optional[Dict[str, str]] = None,
    ) -> RunInGithubResponse:
        """Execute tasks locally and submit results to the backend for GitHub CI/CD

        This method:
        1. Fetches the dataset
        2. Executes all tasks locally
        3. Sends the task results to the backend
        4. The backend runs the evaluators and posts a PR comment
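
        The GitHub context is read from the standard GitHub Actions environment
        variables (GITHUB_REPOSITORY, GITHUB_SERVER_URL, GITHUB_EVENT_NAME,
        GITHUB_REF, GITHUB_SHA, GITHUB_ACTOR).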

        Args:
            task: Async function to run on each dataset row
            evaluators: List of evaluator slugs or EvaluatorDetails objects to run
            dataset_slug: Slug of the dataset to use
            dataset_version: Version of the dataset to use
            experiment_slug: Slug for this experiment run
            experiment_metadata: Metadata for this experiment (an experiment holds all of its experiment runs)
            related_ref: Additional reference information for this experiment run
            aux: Auxiliary information for this experiment run

        Returns:
            RunInGithubResponse with experiment_id, run_id, and status

        Raises:
            RuntimeError: If not running in a GitHub Actions pull_request workflow
            Exception: If the API request fails
        """
        # Check if running in GitHub Actions
        if not os.getenv("GITHUB_ACTIONS"):
            raise RuntimeError(
                "run_in_github() can only be used in a GitHub Actions CI/CD environment. "
                "To run experiments locally, use the run() method instead."
            )

        if not experiment_slug:
            experiment_slug = self._experiment_slug or "exp-" + str(cuid.cuid())[:11]

        # Fetch dataset rows
        rows = []
        if dataset_slug and dataset_version:
            jsonl_data = self._datasets.get_version_jsonl(dataset_slug, dataset_version)
            rows = self._parse_jsonl_to_rows(jsonl_data)

        task_results = await self._execute_tasks(rows, task, evaluators)

        # Construct GitHub context
        repository = os.getenv("GITHUB_REPOSITORY")
        server_url = os.getenv("GITHUB_SERVER_URL", "https://github.com")
        github_event_name = os.getenv("GITHUB_EVENT_NAME", "")

        # Verify this is running in a pull request context
        if github_event_name != "pull_request":
            raise RuntimeError(
                "run_in_github() can only be used in a pull_request workflow. "
                f"Current event: {github_event_name}. "
                "To run experiments locally, use the run() method instead."
            )

        # Extract PR number from GITHUB_REF (format: "refs/pull/123/merge")
        github_ref = os.getenv("GITHUB_REF", "")
        pr_number = None
        if github_ref.startswith("refs/pull/"):
            pr_number = github_ref.split("/")[2]

        if not repository or not github_ref or not pr_number:
            raise RuntimeError(
                "GITHUB_REPOSITORY and GITHUB_REF must be set in the environment. "
                "To run experiments locally, use the run() method instead."
            )

        pr_url = f"{server_url}/{repository}/pull/{pr_number}"

        github_context = GithubContext(
            repository=repository,
            pr_url=pr_url,
            commit_hash=os.getenv("GITHUB_SHA", ""),
            actor=os.getenv("GITHUB_ACTOR", ""),
        )

        experiment_metadata = dict(
            experiment_metadata or {},
            created_from="github",
        )

        experiment_run_metadata = {
            key: value
            for key, value in [("related_ref", related_ref), ("aux", aux)]
            if value is not None
        }

        # Extract evaluator slugs
        evaluator_slugs = None
        if evaluators:
            evaluator_slugs = []
            for evaluator in evaluators:
                if isinstance(evaluator, str):
                    evaluator_slugs.append(evaluator)
                elif isinstance(evaluator, EvaluatorDetails):
                    evaluator_slugs.append(evaluator.slug)

        # Prepare request payload
        request_body = RunInGithubRequest(
            experiment_slug=experiment_slug,
            dataset_slug=dataset_slug,
            dataset_version=dataset_version,
            evaluator_slugs=evaluator_slugs,
            task_results=task_results,
            github_context=github_context,
            experiment_metadata=experiment_metadata,
            experiment_run_metadata=experiment_run_metadata,
        )

        response = self._http_client.post(
            "/experiments/run-in-github",
            request_body.model_dump(mode="json", exclude_none=True),
        )

        if response is None:
            raise Exception(
                f"Failed to submit experiment '{experiment_slug}' for GitHub execution."
            )

        return RunInGithubResponse(**response)

    def _init_experiment(
        self,
        experiment_slug: str,
        dataset_slug: Optional[str] = None,
        dataset_version: Optional[str] = None,
        evaluator_slugs: Optional[List[str]] = None,
        experiment_metadata: Optional[Dict[str, Any]] = None,
        experiment_run_metadata: Optional[Dict[str, Any]] = None,
    ) -> ExperimentInitResponse:
        """Create or fetch an experiment by slug via the API"""
        body = InitExperimentRequest(
            slug=experiment_slug,
            dataset_slug=dataset_slug,
            dataset_version=dataset_version,
            evaluator_slugs=evaluator_slugs,
            experiment_metadata=experiment_metadata,
            experiment_run_metadata=experiment_run_metadata,
        )
        response = self._http_client.put(
            "/experiments/initialize", body.model_dump(mode="json")
        )
        if response is None:
            raise Exception(
                f"Failed to create or fetch experiment with slug '{experiment_slug}'"
            )
        return ExperimentInitResponse(**response)

    def _create_task(
        self,
        experiment_slug: str,
        experiment_run_id: str,
        task_input: Optional[Dict[str, Any]],
        task_output: Dict[str, Any],
    ) -> CreateTaskResponse:
        body = CreateTaskRequest(
            input=task_input,
            output=task_output,
        )
        response = self._http_client.post(
            f"/experiments/{experiment_slug}/runs/{experiment_run_id}/task",
            body.model_dump(mode="json"),
        )
        if response is None:
            raise Exception(f"Failed to create task for experiment '{experiment_slug}'")
        return CreateTaskResponse(**response)

    def _parse_jsonl_to_rows(self, jsonl_data: str) -> List[Dict[str, Any]]:
        """Parse a JSONL string into a list of {col_name: col_value} dictionaries"""
        rows = []
        lines = jsonl_data.strip().split("\n")

        # Skip the first line (columns definition)
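        # Each remaining line is parsed as one row object, e.g.
        # {"question": "...", "answer": "..."} (column names here are illustrative).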
        for line in lines[1:]:
            if line.strip():
                try:
                    row_data = json.loads(line)
                    rows.append(row_data)
                except json.JSONDecodeError:
                    # Skip invalid JSON lines
                    continue

        return rows

    async def _execute_tasks(
        self,
        rows: List[Dict[str, Any]],
        task: Callable[[Optional[Dict[str, Any]]], Awaitable[Dict[str, Any]]],
        evaluators: Optional[List[EvaluatorSpec]] = None,
    ) -> List[TaskResult]:
        """Execute tasks locally with concurrency control

        Args:
            rows: List of dataset rows to process
            task: Function to run on each row
            evaluators: List of evaluators to validate task output against

        Returns:
            List of TaskResult objects with inputs, outputs, and errors
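
        Note:
            Evaluators are not executed here; they are only used to validate and
            normalize each task's output before submission.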
        """
        task_results: List[TaskResult] = []

        # Extract EvaluatorDetails from evaluators list
        evaluators_to_validate = []
        if evaluators:
            for evaluator in evaluators:
                if isinstance(evaluator, EvaluatorDetails):
                    evaluators_to_validate.append(evaluator)

        async def run_single_row(row: Optional[Dict[str, Any]]) -> TaskResult:
            try:
                task_output = await task(row)

                # Validate task output schema and normalize field names using synonyms
                if evaluators_to_validate:
                    try:
                        task_output = validate_and_normalize_task_output(task_output, evaluators_to_validate)
                    except ValueError as validation_error:
                        print(f"\033[91m❌ Task validation failed: {str(validation_error)}\033[0m")
                        raise ValueError(str(validation_error))

                return TaskResult(
                    input=row,
                    output=task_output,
                )
            except Exception as e:
                if isinstance(e, ValueError):
                    raise e
                print(f"\033[91m❌ Task execution error: {str(e)}\033[0m")
                return TaskResult(
                    input=row,
                    error=str(e),
                )

        # Execute tasks with concurrency control
        semaphore = asyncio.Semaphore(50)

        async def run_with_semaphore(row: Dict[str, Any]) -> TaskResult:
            async with semaphore:
                return await run_single_row(row)

        tasks = [asyncio.create_task(run_with_semaphore(row)) for row in rows]

        for completed_task in asyncio.as_completed(tasks):
            result = await completed_task
            task_results.append(result)

        return task_results