# Source: ai-station/.venv/lib/python3.12/site-packages/opentelemetry/instrumentation/agno/streaming.py
# (file-viewer metadata: 220 lines, 7.9 KiB, Python)

import logging
import time
from opentelemetry.instrumentation.agno.utils import should_send_prompts
from opentelemetry.metrics import Histogram
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAIAttributes,
)
from opentelemetry.semconv_ai import SpanAttributes, TraceloopSpanKindValues
from opentelemetry.trace.status import Status, StatusCode
from wrapt import ObjectProxy
# Module-level logger; the stream wrappers below use it to report
# instrumentation failures as warnings.
logger = logging.getLogger(__name__)
class AgnoAsyncStream(ObjectProxy):
    """Proxy for an Agno async streaming response that finalizes its span.

    Every event is handed to the consumer unchanged. The span is closed at
    most once: with ``OK`` status when the wrapped iterator raises
    ``StopAsyncIteration``, or with ``ERROR`` status when it raises any other
    exception. If the consumer abandons the stream before either happens,
    nothing in this class ends the span.

    State is kept in ``_self_``-prefixed attributes, which ``wrapt`` stores on
    the proxy itself instead of forwarding to the wrapped response.

    Failures inside the instrumentation are logged and swallowed so they can
    never replace the ``StopAsyncIteration`` (or the original error) that the
    consumer is supposed to see.
    """

    def __init__(
        self,
        span,
        response,
        instance,
        start_time,
        duration_histogram: Histogram = None,
        token_histogram: Histogram = None,
    ):
        """Wrap ``response`` and remember what is needed to close ``span``.

        Args:
            span: Span to annotate and end when the stream finishes or fails.
            response: Async iterator to wrap; it must provide ``__anext__``.
            instance: Object the stream belongs to. Stored only; this class
                never reads it.
            start_time: ``time.time()``-based timestamp taken when the run
                started; the recorded duration is ``time.time() - start_time``.
            duration_histogram: Optional histogram that receives the duration
                once the stream is exhausted.
            token_histogram: Optional histogram. Stored only; this class
                never records to it.
        """
        super().__init__(response)
        self._self_span = span
        self._self_instance = instance
        self._self_start_time = start_time
        self._self_duration_histogram = duration_histogram
        self._self_token_histogram = token_histogram
        # NOTE(review): every event is retained for the lifetime of the proxy
        # but nothing in this class reads the list back.
        self._self_events = []
        self._self_final_result = None
        self._self_instrumentation_completed = False

    def __aiter__(self):
        """Return the proxy itself so ``async for`` goes through ``__anext__``."""
        return self

    async def __anext__(self):
        """Return the next event, finalizing the span on exhaustion or error.

        Raises:
            StopAsyncIteration: When the wrapped stream is exhausted.
            Exception: Whatever the wrapped stream raises, re-raised unchanged.
        """
        try:
            event = await self.__wrapped__.__anext__()
        except StopAsyncIteration:
            self._complete_instrumentation()
            raise
        except Exception as e:
            self._fail_instrumentation(e)
            raise

        self._self_events.append(event)
        # NOTE(review): confirm that Agno really tags its final event with
        # event == "run_response"; if it does not, no result attributes are
        # ever written to the span.
        if getattr(event, "event", None) == "run_response":
            self._self_final_result = event
        return event

    def _complete_instrumentation(self):
        """Complete the instrumentation when stream is fully consumed"""
        if self._self_instrumentation_completed:
            return
        # Flip the flag first: even if something below fails, finalization
        # must never be attempted a second time.
        self._self_instrumentation_completed = True

        span = self._self_span
        try:
            duration = time.time() - self._self_start_time

            # Mirror the error path, which also tolerates a missing span.
            if span is not None:
                if self._self_final_result is not None:
                    self._record_result(span, self._self_final_result)
                span.set_status(Status(StatusCode.OK))

            if self._self_duration_histogram is not None:
                self._self_duration_histogram.record(
                    duration,
                    attributes={
                        GenAIAttributes.GEN_AI_SYSTEM: "agno",
                        SpanAttributes.TRACELOOP_SPAN_KIND: TraceloopSpanKindValues.AGENT.value,
                    },
                )
        except Exception as e:
            logger.warning("Failed to complete instrumentation: %s", str(e))
        finally:
            self._end_span()

    def _fail_instrumentation(self, exc):
        """Mark the span as failed with ``exc`` and end it (at most once)."""
        if self._self_instrumentation_completed:
            return
        self._self_instrumentation_completed = True

        span = self._self_span
        try:
            if span is not None and span.is_recording():
                span.set_status(Status(StatusCode.ERROR, str(exc)))
                span.record_exception(exc)
        except Exception as e:
            # Swallow: the caller must receive the stream's own exception.
            logger.warning("Failed to record stream error on span: %s", str(e))
        finally:
            self._end_span()

    def _end_span(self):
        """End the span if it exists and is still recording; never raises."""
        span = self._self_span
        try:
            if span is not None and span.is_recording():
                span.end()
        except Exception as e:
            logger.warning("Failed to end span: %s", str(e))

    @staticmethod
    def _record_result(span, result):
        """Copy output, run id and token usage from ``result`` onto ``span``.

        Attributes that are absent on ``result`` are skipped. ``None`` values
        for the run id and token counts are skipped as well, since ``None`` is
        not a valid span attribute value.
        """
        if hasattr(result, "content") and should_send_prompts():
            span.set_attribute(
                SpanAttributes.TRACELOOP_ENTITY_OUTPUT, str(result.content)
            )

        run_id = getattr(result, "run_id", None)
        if run_id is not None:
            span.set_attribute("agno.run.id", run_id)

        metrics = getattr(result, "metrics", None)
        usage_attributes = (
            ("input_tokens", GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS),
            ("output_tokens", GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS),
            ("total_tokens", SpanAttributes.LLM_USAGE_TOTAL_TOKENS),
        )
        for field_name, attribute_key in usage_attributes:
            value = getattr(metrics, field_name, None)
            if value is not None:
                span.set_attribute(attribute_key, value)
class AgnoStream(ObjectProxy):
    """Proxy for an Agno sync streaming response that finalizes its span.

    Every event is handed to the consumer unchanged. The span is closed at
    most once: with ``OK`` status when the wrapped iterator raises
    ``StopIteration``, or with ``ERROR`` status when it raises any other
    exception. If the consumer abandons the stream before either happens,
    nothing in this class ends the span.

    State is kept in ``_self_``-prefixed attributes, which ``wrapt`` stores on
    the proxy itself instead of forwarding to the wrapped response.

    Failures inside the instrumentation are logged and swallowed so they can
    never replace the ``StopIteration`` (or the original error) that the
    consumer is supposed to see.
    """

    def __init__(
        self,
        span,
        response,
        instance,
        start_time,
        duration_histogram: Histogram = None,
        token_histogram: Histogram = None,
    ):
        """Wrap ``response`` and remember what is needed to close ``span``.

        Args:
            span: Span to annotate and end when the stream finishes or fails.
            response: Iterator to wrap; it must provide ``__next__``.
            instance: Object the stream belongs to. Stored only; this class
                never reads it.
            start_time: ``time.time()``-based timestamp taken when the run
                started; the recorded duration is ``time.time() - start_time``.
            duration_histogram: Optional histogram that receives the duration
                once the stream is exhausted.
            token_histogram: Optional histogram. Stored only; this class
                never records to it.
        """
        super().__init__(response)
        self._self_span = span
        self._self_instance = instance
        self._self_start_time = start_time
        self._self_duration_histogram = duration_histogram
        self._self_token_histogram = token_histogram
        # NOTE(review): every event is retained for the lifetime of the proxy
        # but nothing in this class reads the list back.
        self._self_events = []
        self._self_final_result = None
        self._self_instrumentation_completed = False

    def __iter__(self):
        """Return the proxy itself so ``for`` loops go through ``__next__``."""
        return self

    def __next__(self):
        """Return the next event, finalizing the span on exhaustion or error.

        Raises:
            StopIteration: When the wrapped stream is exhausted.
            Exception: Whatever the wrapped stream raises, re-raised unchanged.
        """
        try:
            event = self.__wrapped__.__next__()
        except StopIteration:
            self._complete_instrumentation()
            raise
        except Exception as e:
            self._fail_instrumentation(e)
            raise

        self._self_events.append(event)
        # NOTE(review): confirm that Agno really tags its final event with
        # event == "run_response"; if it does not, no result attributes are
        # ever written to the span.
        if getattr(event, "event", None) == "run_response":
            self._self_final_result = event
        return event

    def _complete_instrumentation(self):
        """Complete the instrumentation when stream is fully consumed"""
        if self._self_instrumentation_completed:
            return
        # Flip the flag first: even if something below fails, finalization
        # must never be attempted a second time.
        self._self_instrumentation_completed = True

        span = self._self_span
        try:
            duration = time.time() - self._self_start_time

            # Mirror the error path, which also tolerates a missing span.
            if span is not None:
                if self._self_final_result is not None:
                    self._record_result(span, self._self_final_result)
                span.set_status(Status(StatusCode.OK))

            if self._self_duration_histogram is not None:
                self._self_duration_histogram.record(
                    duration,
                    attributes={
                        GenAIAttributes.GEN_AI_SYSTEM: "agno",
                        SpanAttributes.TRACELOOP_SPAN_KIND: TraceloopSpanKindValues.AGENT.value,
                    },
                )
        except Exception as e:
            logger.warning("Failed to complete instrumentation: %s", str(e))
        finally:
            self._end_span()

    def _fail_instrumentation(self, exc):
        """Mark the span as failed with ``exc`` and end it (at most once)."""
        if self._self_instrumentation_completed:
            return
        self._self_instrumentation_completed = True

        span = self._self_span
        try:
            if span is not None and span.is_recording():
                span.set_status(Status(StatusCode.ERROR, str(exc)))
                span.record_exception(exc)
        except Exception as e:
            # Swallow: the caller must receive the stream's own exception.
            logger.warning("Failed to record stream error on span: %s", str(e))
        finally:
            self._end_span()

    def _end_span(self):
        """End the span if it exists and is still recording; never raises."""
        span = self._self_span
        try:
            if span is not None and span.is_recording():
                span.end()
        except Exception as e:
            logger.warning("Failed to end span: %s", str(e))

    @staticmethod
    def _record_result(span, result):
        """Copy output, run id and token usage from ``result`` onto ``span``.

        Attributes that are absent on ``result`` are skipped. ``None`` values
        for the run id and token counts are skipped as well, since ``None`` is
        not a valid span attribute value.
        """
        if hasattr(result, "content") and should_send_prompts():
            span.set_attribute(
                SpanAttributes.TRACELOOP_ENTITY_OUTPUT, str(result.content)
            )

        run_id = getattr(result, "run_id", None)
        if run_id is not None:
            span.set_attribute("agno.run.id", run_id)

        metrics = getattr(result, "metrics", None)
        usage_attributes = (
            ("input_tokens", GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS),
            ("output_tokens", GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS),
            ("total_tokens", SpanAttributes.LLM_USAGE_TOTAL_TOKENS),
        )
        for field_name, attribute_key in usage_attributes:
            value = getattr(metrics, field_name, None)
            if value is not None:
                span.set_attribute(attribute_key, value)