# ai-station/.venv/lib/python3.12/site-packages/opentelemetry/instrumentation/haystack/wrap_openai.py
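"""Span-producing wrappers for Haystack's OpenAIGenerator and
OpenAIChatGenerator, used by the Haystack OpenTelemetry instrumentation."""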
import logging

from opentelemetry import context as context_api
from opentelemetry.trace import SpanKind
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.semconv._incubating.attributes import (
    gen_ai_attributes as GenAIAttributes,
)
from opentelemetry.semconv_ai import SpanAttributes, LLMRequestTypeValues
from opentelemetry.instrumentation.haystack.utils import (
    dont_throw,
    with_tracer_wrapper,
    set_span_attribute,
)

logger = logging.getLogger(__name__)

@dont_throw
def _set_input_attributes(span, llm_request_type, kwargs):
    # Record the prompt; for chat requests, capture the content of every
    # message, guarding against a missing "messages" kwarg.
    if llm_request_type == LLMRequestTypeValues.COMPLETION:
        set_span_attribute(
            span, f"{GenAIAttributes.GEN_AI_PROMPT}.0.user", kwargs.get("prompt")
        )
    elif llm_request_type == LLMRequestTypeValues.CHAT:
        set_span_attribute(
            span,
            f"{GenAIAttributes.GEN_AI_PROMPT}.0.user",
            [message.content for message in kwargs.get("messages") or []],
        )

    # Mirror the request's generation parameters onto the span when present.
    generation_kwargs = kwargs.get("generation_kwargs")
    if generation_kwargs is not None:
        if "model" in generation_kwargs:
            set_span_attribute(
                span, GenAIAttributes.GEN_AI_REQUEST_MODEL, generation_kwargs["model"]
            )
        if "temperature" in generation_kwargs:
            set_span_attribute(
                span,
                GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE,
                generation_kwargs["temperature"],
            )
        if "top_p" in generation_kwargs:
            set_span_attribute(
                span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, generation_kwargs["top_p"]
            )
        if "frequency_penalty" in generation_kwargs:
            set_span_attribute(
                span,
                SpanAttributes.LLM_FREQUENCY_PENALTY,
                generation_kwargs["frequency_penalty"],
            )
        if "presence_penalty" in generation_kwargs:
            set_span_attribute(
                span,
                SpanAttributes.LLM_PRESENCE_PENALTY,
                generation_kwargs["presence_penalty"],
            )
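
# Example (illustrative values; ChatMessage is Haystack's chat message type):
# for a chat call such as
#     run(messages=[ChatMessage.from_user("Hi")],
#         generation_kwargs={"model": "gpt-4o-mini", "temperature": 0.2})
# the span ends up with attributes along the lines of:
#     gen_ai.prompt.0.user       = ["Hi"]
#     gen_ai.request.model       = "gpt-4o-mini"
#     gen_ai.request.temperature = 0.2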

def _set_span_completions(span, llm_request_type, choices):
    if choices is None:
        return
    # Record each generated reply under gen_ai.completion.<index>.
    for index, message in enumerate(choices):
        prefix = f"{GenAIAttributes.GEN_AI_COMPLETION}.{index}"
        if llm_request_type == LLMRequestTypeValues.CHAT:
            if message is not None:
                set_span_attribute(span, f"{prefix}.role", "assistant")
                set_span_attribute(span, f"{prefix}.content", message)
        elif llm_request_type == LLMRequestTypeValues.COMPLETION:
            set_span_attribute(span, f"{prefix}.content", message)

@dont_throw
def _set_response_attributes(span, llm_request_type, response):
    # Haystack generators return a dict like {"replies": [...], "meta": [...]};
    # the replies are what gets recorded as completions.
    _set_span_completions(span, llm_request_type, response.get("replies"))

def _llm_request_type_by_object(object_name):
    # Map the wrapped Haystack component to the kind of LLM request it makes.
    if object_name == "OpenAIGenerator":
        return LLMRequestTypeValues.COMPLETION
    elif object_name == "OpenAIChatGenerator":
        return LLMRequestTypeValues.CHAT
    else:
        return LLMRequestTypeValues.UNKNOWN

@with_tracer_wrapper
def wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
    # Respect the global suppression flag so nested instrumentations
    # don't double-report the same call.
    if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
        return wrapped(*args, **kwargs)

    llm_request_type = _llm_request_type_by_object(to_wrap.get("object"))
    with tracer.start_as_current_span(
        (
            SpanAttributes.HAYSTACK_OPENAI_CHAT
            if llm_request_type == LLMRequestTypeValues.CHAT
            else SpanAttributes.HAYSTACK_OPENAI_COMPLETION
        ),
        kind=SpanKind.CLIENT,
        attributes={
            GenAIAttributes.GEN_AI_SYSTEM: "OpenAI",
            SpanAttributes.LLM_REQUEST_TYPE: llm_request_type.value,
        },
    ) as span:
        # Attribute recording is guarded by @dont_throw, so a failure
        # here cannot break the user's call.
        if span.is_recording():
            _set_input_attributes(span, llm_request_type, kwargs)

        response = wrapped(*args, **kwargs)

        if response:
            if span.is_recording():
                _set_response_attributes(span, llm_request_type, response)
                span.set_status(Status(StatusCode.OK))

        return response
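
# A minimal registration sketch, assuming the wrapt-based pattern these
# instrumentation packages commonly use; the module paths and the shape of
# the `to_wrap` config dict are illustrative assumptions, not taken from
# this file:
#
#     from wrapt import wrap_function_wrapper
#
#     wrap_function_wrapper(
#         "haystack.components.generators.openai",
#         "OpenAIGenerator.run",
#         wrap(tracer, {"object": "OpenAIGenerator"}),
#     )
#     wrap_function_wrapper(
#         "haystack.components.generators.chat.openai",
#         "OpenAIChatGenerator.run",
#         wrap(tracer, {"object": "OpenAIChatGenerator"}),
#     )
#
# `with_tracer_wrapper` is expected to curry `wrap` so that the partial
# application above yields a wrapt-compatible wrapper(wrapped, instance,
# args, kwargs).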