# ai-station/.venv/lib/python3.12/site-packages/literalai/exporter.py

import json
import logging
from datetime import datetime, timezone
from typing import Dict, List, Optional, Sequence, Union, cast

from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

from literalai.event_processor import EventProcessor
from literalai.helper import utc_now
from literalai.observability.generation import GenerationType
from literalai.observability.step import Step, StepDict
from literalai.prompt_engineering.prompt import PromptDict

# TODO: Support Gemini models https://github.com/traceloop/openllmetry/issues/2419
# TODO: Support llamaindex workflow https://github.com/traceloop/openllmetry/pull/2421


class LoggingSpanExporter(SpanExporter):
    """Exports OpenTelemetry spans as Literal AI steps.

    LLM spans (those carrying a ``gen_ai.request.model`` attribute) are
    converted to ``Step`` objects and handed to the ``EventProcessor``.
    """

    def __init__(
        self,
        logger_name: str = "span_exporter",
        event_processor: Optional[EventProcessor] = None,
    ):
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        self.event_processor = event_processor

        # Attach a default stream handler only if none is configured yet,
        # to avoid emitting duplicate log records.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
        """Export spans by forwarding LLM spans to the event processor."""
        try:
            for span in spans:
                # Only LLM spans are exported; anything without a
                # gen_ai.request.model attribute is ignored.
                if (
                    span.attributes
                    and span.attributes.get("gen_ai.request.model") is not None
                    and self.event_processor is not None
                ):
                    step = self._create_step_from_span(span)
                    self.event_processor.add_event(cast(StepDict, step.to_dict()))
            return SpanExportResult.SUCCESS
        except Exception as e:
            self.logger.error(f"Failed to export spans: {e}")
            return SpanExportResult.FAILURE

    def shutdown(self):
        """Shut down the exporter, flushing any pending events."""
        if self.event_processor is not None:
            return self.event_processor.flush_and_stop()

    def force_flush(self, timeout_millis: float = 30000) -> bool:
        """Force flush the exporter (a no-op; events are flushed on shutdown)."""
        return True
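
    # _create_step_from_span reads three attribute namespaces emitted by
    # Traceloop/OpenLLMetry instrumentation: gen_ai.* (model, prompts,
    # completions, token usage), llm.* (request type, streaming flag, total
    # tokens), and traceloop.association.properties.literal.* (Literal-specific
    # context such as thread, parent, and root run ids).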
    def _create_step_from_span(self, span: ReadableSpan) -> Step:
        """Convert a span to a Step object."""
        attributes = span.attributes or {}

        start_time = (
            datetime.fromtimestamp(span.start_time / 1e9, tz=timezone.utc).isoformat()
            if span.start_time
            else utc_now()
        )
        end_time = (
            datetime.fromtimestamp(span.end_time / 1e9, tz=timezone.utc).isoformat()
            if span.end_time
            else utc_now()
        )
        duration, token_throughput = self._calculate_duration_and_throughput(
            span.start_time,
            span.end_time,
            int(str(attributes.get("llm.usage.total_tokens", 0))),
        )

        generation_type = attributes.get("llm.request.type")
        is_chat = generation_type == "chat"

        # Literal-specific context is propagated through Traceloop's
        # association properties; values arrive as strings, so "None"
        # marks an unset property.
        span_props = {
            "parent_id": attributes.get(
                "traceloop.association.properties.literal.parent_id"
            ),
            "thread_id": attributes.get(
                "traceloop.association.properties.literal.thread_id"
            ),
            "root_run_id": attributes.get(
                "traceloop.association.properties.literal.root_run_id"
            ),
            "metadata": attributes.get(
                "traceloop.association.properties.literal.metadata"
            ),
            "tags": attributes.get("traceloop.association.properties.literal.tags"),
            "name": attributes.get("traceloop.association.properties.literal.name"),
        }
        span_props = {
            k: str(v) for k, v in span_props.items() if v is not None and v != "None"
        }

        serialized_prompt = attributes.get(
            "traceloop.association.properties.literal.prompt"
        )
        prompt = cast(
            Optional[PromptDict],
            (
                self._extract_json(str(serialized_prompt))
                if serialized_prompt and serialized_prompt != "None"
                else None
            ),
        )

        # For chat generations, the last completion message is the answer;
        # any earlier completion messages are folded into the message history.
        messages = self._extract_messages(cast(Dict, attributes)) if is_chat else []
        message_completions = (
            self._extract_messages(cast(Dict, attributes), "gen_ai.completion.")
            if is_chat
            else []
        )
        message_completion = message_completions[-1] if message_completions else None
        previous_messages = (
            messages + message_completions[:-1] if message_completions else messages
        )

        generation_content = {
            "duration": duration,
            "messages": previous_messages,
            "message_completion": message_completion,
            "prompt": attributes.get("gen_ai.prompt.0.user"),
            "promptId": prompt.get("id") if prompt else None,
            "completion": attributes.get("gen_ai.completion.0.content"),
            "model": attributes.get("gen_ai.request.model"),
            "provider": attributes.get("gen_ai.system"),
            "tokenThroughputInSeconds": token_throughput,
            "variables": prompt.get("variables") if prompt else None,
        }
        generation_settings = {
            "max_tokens": attributes.get("gen_ai.request.max_tokens"),
            "stream": attributes.get("llm.is_streaming"),
            "token_count": attributes.get("llm.usage.total_tokens"),
            "input_token_count": attributes.get("gen_ai.usage.prompt_tokens"),
            "output_token_count": attributes.get("gen_ai.usage.completion_tokens"),
            "frequency_penalty": attributes.get("gen_ai.request.frequency_penalty"),
            "presence_penalty": attributes.get("gen_ai.request.presence_penalty"),
            "temperature": attributes.get("gen_ai.request.temperature"),
            "top_p": attributes.get("gen_ai.request.top_p"),
        }

        step_dict = {
            "id": str(span.context.span_id) if span.context else None,
            "name": span_props.get("name", span.name),
            "type": "llm",
            "metadata": self._extract_json(str(span_props.get("metadata", "{}"))),
            "startTime": start_time,
            "endTime": end_time,
            "threadId": span_props.get("thread_id"),
            "parentId": span_props.get("parent_id"),
            "rootRunId": span_props.get("root_run_id"),
            "tags": self._extract_json(str(span_props.get("tags", "[]"))),
            "input": {
                "content": (
                    generation_content["messages"]
                    if is_chat
                    else generation_content["prompt"]
                )
            },
            "output": {
                "content": (
                    generation_content["message_completion"]
                    if is_chat
                    else generation_content["completion"]
                )
            },
            "generation": {
                "type": GenerationType.CHAT if is_chat else GenerationType.COMPLETION,
                "prompt": generation_content["prompt"] if not is_chat else None,
                "completion": generation_content["completion"] if not is_chat else None,
                "model": generation_content["model"],
                "provider": generation_content["provider"],
                "settings": generation_settings,
                "tokenCount": generation_settings["token_count"],
                "inputTokenCount": generation_settings["input_token_count"],
                "outputTokenCount": generation_settings["output_token_count"],
                "messages": generation_content["messages"],
                "messageCompletion": generation_content["message_completion"],
            },
        }

        step = Step.from_dict(cast(StepDict, step_dict))
        if not span.status.is_ok:
            step.error = span.status.description or "Unknown error"
        return step
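
    # Example of the flattened message layout consumed by _extract_messages
    # (values are illustrative):
    #   gen_ai.prompt.0.role    = "system"
    #   gen_ai.prompt.0.content = "You are a helpful assistant."
    #   gen_ai.prompt.1.role    = "user"
    #   gen_ai.prompt.1.content = "Hello!"
    # Completions use the same scheme under the gen_ai.completion. prefix.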
    def _extract_messages(
        self, data: Dict, prefix: str = "gen_ai.prompt."
    ) -> List[Dict]:
        """Collect indexed role/content attribute pairs into a message list."""
        messages = []
        index = 0
        while True:
            role_key = f"{prefix}{index}.role"
            content_key = f"{prefix}{index}.content"
            # Stop at the first gap in the index sequence or at a
            # placeholder message.
            if role_key not in data or content_key not in data:
                break
            if data[role_key] == "placeholder":
                break
            messages.append(
                {
                    "role": data[role_key],
                    "content": self._extract_json(data[content_key]),
                }
            )
            index += 1
        return messages

    def _extract_json(self, data: str) -> Union[Dict, List, str]:
        """Parse a JSON string, falling back to the raw string on failure."""
        try:
            content = json.loads(data)
        except Exception:
            content = data
        return content
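
    # Worked example for the helper below (illustrative numbers): a span
    # lasting 1_500_000_000 ns (1.5 s) with total_tokens=300 yields
    # duration_seconds=1.5 and a throughput of 300 / 1.5 = 200.0 tokens/s.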
    def _calculate_duration_and_throughput(
        self,
        start_time_ns: Optional[int],
        end_time_ns: Optional[int],
        total_tokens: Optional[int],
    ) -> tuple[float, Optional[float]]:
        """Calculate duration in seconds and token throughput per second."""
        duration_ns = (
            end_time_ns - start_time_ns if start_time_ns and end_time_ns else 0
        )
        duration_seconds = duration_ns / 1e9
        token_throughput = None
        if total_tokens is not None and duration_seconds > 0:
            token_throughput = total_tokens / duration_seconds
        return duration_seconds, token_throughput
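

# Usage sketch (illustrative; not part of this module): registering the
# exporter on an OpenTelemetry TracerProvider. The event_processor argument is
# omitted here; in practice the Literal AI client constructs one and passes it
# in when instrumentation is installed.
#
#     from opentelemetry.sdk.trace import TracerProvider
#     from opentelemetry.sdk.trace.export import SimpleSpanProcessor
#
#     provider = TracerProvider()
#     provider.add_span_processor(SimpleSpanProcessor(LoggingSpanExporter()))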