ai-station/.venv/lib/python3.12/site-packages/literalai/api/helpers/step_helpers.py

142 lines
3.8 KiB
Python

from typing import Any, Dict, List, Optional, Union
from literalai.api.helpers import gql
from literalai.my_types import PaginatedResponse
from literalai.observability.filter import steps_filters, steps_order_by
from literalai.observability.step import Step, StepDict, StepType
def create_step_helper(
    thread_id: Optional[str] = None,
    type: Optional[StepType] = "undefined",
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
    input: Optional[Dict] = None,
    output: Optional[Dict] = None,
    metadata: Optional[Dict] = None,
    parent_id: Optional[str] = None,
    name: Optional[str] = None,
    tags: Optional[List[str]] = None,
    root_run_id: Optional[str] = None,
):
    """Assemble the pieces of a "create step" GraphQL request.

    This helper performs no I/O: it only packages its arguments and
    returns everything a caller needs to run the mutation.

    Every argument is forwarded as-is (including ``None`` values) under
    the matching key of the variables mapping.

    Returns:
        A 4-tuple ``(query, description, variables, process_response)``:
        ``gql.CREATE_STEP``, the label ``"create step"``, the variables
        dict, and a callable that turns the raw response into a ``Step``.
    """

    def parse_created_step(response):
        # The new step is read from response["data"]["createStep"].
        created = response["data"]["createStep"]
        return Step.from_dict(created)

    # NOTE(review): every key below is camelCase except "root_run_id".
    # Confirm against the variable names declared in gql.CREATE_STEP
    # before changing it; it is kept verbatim here.
    payload = {
        "threadId": thread_id,
        "type": type,
        "startTime": start_time,
        "endTime": end_time,
        "input": input,
        "output": output,
        "metadata": metadata,
        "parentId": parent_id,
        "name": name,
        "tags": tags,
        "root_run_id": root_run_id,
    }

    return gql.CREATE_STEP, "create step", payload, parse_created_step
def update_step_helper(
    id: str,
    type: Optional[StepType] = None,
    input: Optional[str] = None,
    output: Optional[str] = None,
    metadata: Optional[Dict] = None,
    name: Optional[str] = None,
    tags: Optional[List[str]] = None,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
    parent_id: Optional[str] = None,
):
    """Assemble the pieces of an "update step" GraphQL request.

    This helper performs no I/O. All arguments, ``None`` included, are
    placed in the variables mapping under their camelCase keys.

    NOTE(review): ``input`` and ``output`` are annotated ``Optional[str]``
    here but ``Optional[Dict]`` in ``create_step_helper`` - confirm which
    one the API actually expects.

    Returns:
        A 4-tuple ``(query, description, variables, process_response)``:
        ``gql.UPDATE_STEP``, the label ``"update step"``, the variables
        dict, and a callable that turns the raw response into a ``Step``.
    """

    def parse_updated_step(response):
        # The updated step is read from response["data"]["updateStep"].
        updated = response["data"]["updateStep"]
        return Step.from_dict(updated)

    payload = {
        "id": id,
        "type": type,
        "input": input,
        "output": output,
        "metadata": metadata,
        "name": name,
        "tags": tags,
        "startTime": start_time,
        "endTime": end_time,
        "parentId": parent_id,
    }

    return gql.UPDATE_STEP, "update step", payload, parse_updated_step
def get_steps_helper(
    first: Optional[int] = None,
    after: Optional[str] = None,
    before: Optional[str] = None,
    filters: Optional[steps_filters] = None,
    order_by: Optional[steps_order_by] = None,
):
    """Assemble the pieces of a paginated "get steps" GraphQL request.

    Only truthy arguments end up in the variables mapping, so ``None``,
    ``0``, an empty string and an empty collection are all left out.

    Returns:
        A 4-tuple ``(query, description, variables, process_response)``:
        ``gql.GET_STEPS``, the label ``"get steps"``, the variables dict,
        and a callable that converts the raw response into a
        ``PaginatedResponse[Step]``.
    """
    # (variable name, value) pairs in the order they are added.
    candidates = (
        ("first", first),
        ("after", after),
        ("before", before),
        ("filters", filters),
        ("orderBy", order_by),
    )
    variables: Dict[str, Any] = {
        key: value for key, value in candidates if value
    }

    def parse_steps_page(response):
        page = response["data"]["steps"]
        # Flatten edges -> nodes under "data". This writes into the
        # response object that was passed in rather than into a copy.
        nodes = []
        for edge in page["edges"]:
            nodes.append(edge["node"])
        page["data"] = nodes
        return PaginatedResponse[Step].from_dict(page, Step)

    return gql.GET_STEPS, "get steps", variables, parse_steps_page
def get_step_helper(id: str):
    """Assemble the pieces of a "get step" GraphQL request for one id.

    Returns:
        A 4-tuple ``(query, description, variables, process_response)``:
        ``gql.GET_STEP``, the label ``"get step"``, ``{"id": id}``, and a
        callable that returns a ``Step`` built from
        ``response["data"]["step"]``, or ``None`` when that value is falsy.
    """

    def parse_step(response):
        raw_step = response["data"]["step"]
        if not raw_step:
            # Nothing usable came back for this id.
            return None
        return Step.from_dict(raw_step)

    return gql.GET_STEP, "get step", {"id": id}, parse_step
def delete_step_helper(id: str):
    """Assemble the pieces of a "delete step" GraphQL request for one id.

    Returns:
        A 4-tuple ``(query, description, variables, process_response)``:
        ``gql.DELETE_STEP``, the label ``"delete step"``, ``{"id": id}``,
        and a callable that returns the truthiness of
        ``response["data"]["deleteStep"]`` as a ``bool``.
    """

    def parse_deletion(response):
        outcome = response["data"]["deleteStep"]
        return bool(outcome)

    return gql.DELETE_STEP, "delete step", {"id": id}, parse_deletion
def send_steps_helper(steps: List[Union[StepDict, "Step"]]):
    """Assemble the pieces of a batched "send steps" GraphQL request.

    The query and the variables are both derived from ``steps`` through
    ``gql.steps_query_builder`` and ``gql.steps_variables_builder``.

    Returns:
        A 4-tuple ``(query, description, variables, process_response)``
        with the label ``"send steps"``; the response processor hands the
        raw response back untouched.
    """

    def passthrough(response: Dict):
        # No post-processing: callers receive the raw response.
        return response

    # Build the query before the variables, as the original code does.
    built_query = gql.steps_query_builder(steps)
    built_variables = gql.steps_variables_builder(steps)

    return built_query, "send steps", built_variables, passthrough