# ai-station/.venv/lib/python3.12/site-packages/opentelemetry/instrumentation/langchain/span_utils.py

import json
import time
from dataclasses import dataclass, field
from typing import Any, Optional
from uuid import UUID

from langchain_core.messages import (
    BaseMessage,
)
from langchain_core.outputs import (
    LLMResult,
)
from opentelemetry.context.context import Context
from opentelemetry.instrumentation.langchain.utils import (
    CallbackFilteredJSONEncoder,
    should_send_prompts,
)
from opentelemetry.semconv._incubating.attributes import (
    gen_ai_attributes as GenAIAttributes,
)
from opentelemetry.metrics import Histogram
from opentelemetry.semconv_ai import (
    SpanAttributes,
)
from opentelemetry.trace.span import Span


@dataclass
class SpanHolder:
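    """Bookkeeping for one LangChain run: its span, context token, child run
    IDs, workflow/entity identity, start time, and the requested model."""
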
    span: Span
    token: Any
    context: Context
    children: list[UUID]
    workflow_name: str
    entity_name: str
    entity_path: str
    start_time: float = field(default_factory=time.time)
    request_model: Optional[str] = None


def _message_type_to_role(message_type: str) -> str:
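    """Map a LangChain message type ("human", "system", "ai", "tool") to the
    corresponding GenAI role string."""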
    if message_type == "human":
        return "user"
    elif message_type == "system":
        return "system"
    elif message_type == "ai":
        return "assistant"
    elif message_type == "tool":
        return "tool"
    else:
        return "unknown"


def _set_span_attribute(span: Span, key: str, value: Any) -> None:
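    """Set ``key`` on the span unless ``value`` is None."""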
    if value is not None:
        if value != "":
            span.set_attribute(key, value)
        else:
            span.set_attribute(key, "")


def set_request_params(span, kwargs, span_holder: SpanHolder):
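    """Record request parameters (model, max_tokens, temperature, top_p, tools)
    as span attributes and remember the model on the span holder."""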
    if not span.is_recording():
        return

    for model_tag in ("model", "model_id", "model_name"):
        if (model := kwargs.get(model_tag)) is not None:
            span_holder.request_model = model
            break
        elif (
            model := (kwargs.get("invocation_params") or {}).get(model_tag)
        ) is not None:
            span_holder.request_model = model
            break
    else:
        model = "unknown"

    _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_MODEL, model)
    # response is not available for LLM requests (as opposed to chat)
    _set_span_attribute(span, GenAIAttributes.GEN_AI_RESPONSE_MODEL, model)

    if "invocation_params" in kwargs:
        params = (
            kwargs["invocation_params"].get("params") or kwargs["invocation_params"]
        )
    else:
        params = kwargs

    _set_span_attribute(
        span,
        GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS,
        params.get("max_tokens") or params.get("max_new_tokens"),
    )
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, params.get("temperature")
    )
    _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, params.get("top_p"))

    tools = kwargs.get("invocation_params", {}).get("tools", [])
    for i, tool in enumerate(tools):
        tool_function = tool.get("function", tool)
        _set_span_attribute(
            span,
            f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}.name",
            tool_function.get("name"),
        )
        _set_span_attribute(
            span,
            f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}.description",
            tool_function.get("description"),
        )
        _set_span_attribute(
            span,
            f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}.parameters",
            json.dumps(tool_function.get("parameters", tool.get("input_schema"))),
        )


def set_llm_request(
    span: Span,
    serialized: dict[str, Any],
    prompts: list[str],
    kwargs: Any,
    span_holder: SpanHolder,
) -> None:
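    """Record a plain (non-chat) LLM request: set request params and, when
    prompt capture is enabled, each prompt string as a user-role attribute."""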
    set_request_params(span, kwargs, span_holder)
    if should_send_prompts():
        for i, msg in enumerate(prompts):
            _set_span_attribute(
                span,
                f"{GenAIAttributes.GEN_AI_PROMPT}.{i}.role",
                "user",
            )
            _set_span_attribute(
                span,
                f"{GenAIAttributes.GEN_AI_PROMPT}.{i}.content",
                msg,
            )


def set_chat_request(
    span: Span,
    serialized: dict[str, Any],
    messages: list[list[BaseMessage]],
    kwargs: Any,
    span_holder: SpanHolder,
) -> None:
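    """Record a chat request: request params, declared functions, and every
    message (role, content, tool calls) as flattened, indexed span attributes.
    A sketch of the resulting attribute layout follows the function body."""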
    set_request_params(span, serialized.get("kwargs", {}), span_holder)

    if should_send_prompts():
        for i, function in enumerate(
            kwargs.get("invocation_params", {}).get("functions", [])
        ):
            prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}"
            _set_span_attribute(span, f"{prefix}.name", function.get("name"))
            _set_span_attribute(
                span, f"{prefix}.description", function.get("description")
            )
            _set_span_attribute(
                span, f"{prefix}.parameters", json.dumps(function.get("parameters"))
            )

        i = 0
        for message in messages:
            for msg in message:
                _set_span_attribute(
                    span,
                    f"{GenAIAttributes.GEN_AI_PROMPT}.{i}.role",
                    _message_type_to_role(msg.type),
                )

                tool_calls = (
                    msg.tool_calls
                    if hasattr(msg, "tool_calls")
                    else msg.additional_kwargs.get("tool_calls")
                )
                if tool_calls:
                    _set_chat_tool_calls(
                        span, f"{GenAIAttributes.GEN_AI_PROMPT}.{i}", tool_calls
                    )

                # Always set content if it exists, regardless of tool_calls presence
                content = (
                    msg.content
                    if isinstance(msg.content, str)
                    else json.dumps(msg.content, cls=CallbackFilteredJSONEncoder)
                )
                _set_span_attribute(
                    span,
                    f"{GenAIAttributes.GEN_AI_PROMPT}.{i}.content",
                    content,
                )

                if msg.type == "tool" and hasattr(msg, "tool_call_id"):
                    _set_span_attribute(
                        span,
                        f"{GenAIAttributes.GEN_AI_PROMPT}.{i}.tool_call_id",
                        msg.tool_call_id,
                    )

                i += 1


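# For reference, a chat request with a system message, a user message, and an
# assistant message carrying one tool call is flattened into indexed span
# attributes roughly as follows (a sketch assuming the usual constant values,
# e.g. GenAIAttributes.GEN_AI_PROMPT == "gen_ai.prompt"; the names and
# arguments shown are illustrative, not captured output):
#
#   gen_ai.prompt.0.role                   = "system"
#   gen_ai.prompt.0.content                = "You are a helpful assistant."
#   gen_ai.prompt.1.role                   = "user"
#   gen_ai.prompt.1.content                = "What's the weather in Paris?"
#   gen_ai.prompt.2.role                   = "assistant"
#   gen_ai.prompt.2.tool_calls.0.id        = "call_0"
#   gen_ai.prompt.2.tool_calls.0.name      = "get_weather"
#   gen_ai.prompt.2.tool_calls.0.arguments = '{"city": "Paris"}'

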
def set_chat_response(span: Span, response: LLMResult) -> None:
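    """Record completion outputs on the span: role, content, finish reason, and
    both the legacy ``function_call`` and current ``tool_calls`` formats."""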
    if not should_send_prompts():
        return

    i = 0
    for generations in response.generations:
        for generation in generations:
            prefix = f"{GenAIAttributes.GEN_AI_COMPLETION}.{i}"
            _set_span_attribute(
                span,
                f"{prefix}.role",
                _message_type_to_role(generation.type),
            )

            # Try to get content from various sources
            content = None
            if hasattr(generation, "text") and generation.text:
                content = generation.text
            elif (
                hasattr(generation, "message")
                and generation.message
                and generation.message.content
            ):
                if isinstance(generation.message.content, str):
                    content = generation.message.content
                else:
                    content = json.dumps(
                        generation.message.content, cls=CallbackFilteredJSONEncoder
                    )
            if content:
                _set_span_attribute(
                    span,
                    f"{prefix}.content",
                    content,
                )

            # Set finish reason if available
            if generation.generation_info and generation.generation_info.get(
                "finish_reason"
            ):
                _set_span_attribute(
                    span,
                    f"{prefix}.finish_reason",
                    generation.generation_info.get("finish_reason"),
                )

            # Handle tool calls and function calls
            if hasattr(generation, "message") and generation.message:
                # Handle legacy function_call format (single function call)
                if generation.message.additional_kwargs.get("function_call"):
                    _set_span_attribute(
                        span,
                        f"{prefix}.tool_calls.0.name",
                        generation.message.additional_kwargs.get("function_call").get(
                            "name"
                        ),
                    )
                    _set_span_attribute(
                        span,
                        f"{prefix}.tool_calls.0.arguments",
                        generation.message.additional_kwargs.get("function_call").get(
                            "arguments"
                        ),
                    )

                # Handle new tool_calls format (multiple tool calls)
                tool_calls = (
                    generation.message.tool_calls
                    if hasattr(generation.message, "tool_calls")
                    else generation.message.additional_kwargs.get("tool_calls")
                )
                if tool_calls and isinstance(tool_calls, list):
                    _set_span_attribute(
                        span,
                        f"{prefix}.role",
                        "assistant",
                    )
                    _set_chat_tool_calls(span, prefix, tool_calls)

            i += 1


def set_chat_response_usage(
    span: Span,
    response: LLMResult,
    token_histogram: Histogram,
    record_token_usage: bool,
    model_name: str,
) -> None:
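    """Sum token usage across all generations, set the usage attributes on the
    span, and optionally record input/output counts on the token histogram."""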
    input_tokens = 0
    output_tokens = 0
    total_tokens = 0
    cache_read_tokens = 0

    # Early return if no generations to avoid potential issues
    if not response.generations:
        return

    try:
        for generations in response.generations:
            for generation in generations:
                if (
                    hasattr(generation, "message")
                    and hasattr(generation.message, "usage_metadata")
                    and generation.message.usage_metadata is not None
                ):
                    input_tokens += (
                        generation.message.usage_metadata.get("input_tokens")
                        or generation.message.usage_metadata.get("prompt_tokens")
                        or 0
                    )
                    output_tokens += (
                        generation.message.usage_metadata.get("output_tokens")
                        or generation.message.usage_metadata.get("completion_tokens")
                        or 0
                    )
                    total_tokens = input_tokens + output_tokens

                    if generation.message.usage_metadata.get("input_token_details"):
                        input_token_details = generation.message.usage_metadata.get(
                            "input_token_details", {}
                        )
                        cache_read_tokens += input_token_details.get("cache_read", 0)
    except Exception:
        # If there's any issue processing usage metadata, continue without it
        pass

    if (
        input_tokens > 0
        or output_tokens > 0
        or total_tokens > 0
        or cache_read_tokens > 0
    ):
        _set_span_attribute(
            span,
            GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS,
            input_tokens,
        )
        _set_span_attribute(
            span,
            GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS,
            output_tokens,
        )
        _set_span_attribute(
            span,
            SpanAttributes.LLM_USAGE_TOTAL_TOKENS,
            total_tokens,
        )
        _set_span_attribute(
            span,
            SpanAttributes.LLM_USAGE_CACHE_READ_INPUT_TOKENS,
            cache_read_tokens,
        )

        if record_token_usage:
            vendor = span.attributes.get(GenAIAttributes.GEN_AI_SYSTEM, "Langchain")

            if input_tokens > 0:
                token_histogram.record(
                    input_tokens,
                    attributes={
                        GenAIAttributes.GEN_AI_SYSTEM: vendor,
                        GenAIAttributes.GEN_AI_TOKEN_TYPE: "input",
                        GenAIAttributes.GEN_AI_RESPONSE_MODEL: model_name,
                    },
                )
            if output_tokens > 0:
                token_histogram.record(
                    output_tokens,
                    attributes={
                        GenAIAttributes.GEN_AI_SYSTEM: vendor,
                        GenAIAttributes.GEN_AI_TOKEN_TYPE: "output",
                        GenAIAttributes.GEN_AI_RESPONSE_MODEL: model_name,
                    },
                )


def extract_model_name_from_response_metadata(response: LLMResult) -> str:
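    """Return the model name from the first generation that carries one in its
    response metadata, or "unknown" if none is present."""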
    for generations in response.generations:
        for generation in generations:
            if (
                getattr(generation, "message", None)
                and getattr(generation.message, "response_metadata", None)
                and (
                    model_name := generation.message.response_metadata.get(
                        "model_name"
                    )
                )
            ):
                return model_name
    # The signature promises a str; fall back to the same sentinel used by the
    # association-metadata helper below instead of implicitly returning None.
    return "unknown"


def _extract_model_name_from_association_metadata(
    metadata: Optional[dict[str, Any]] = None,
) -> str:
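    """Return the ``ls_model_name`` entry from association metadata, defaulting
    to "unknown"."""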
    if metadata:
        return metadata.get("ls_model_name") or "unknown"
    return "unknown"


def _set_chat_tool_calls(
    span: Span, prefix: str, tool_calls: list[dict[str, Any]]
) -> None:
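    """Record each tool call's id, name, and JSON-encoded arguments under
    ``{prefix}.tool_calls.{idx}``, accepting both LangChain-style
    (``name``/``args``) and OpenAI-style (``function.name``/
    ``function.arguments``) shapes."""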
    for idx, tool_call in enumerate(tool_calls):
        tool_call_prefix = f"{prefix}.tool_calls.{idx}"
        tool_call_dict = dict(tool_call)
        tool_id = tool_call_dict.get("id")
        tool_name = tool_call_dict.get(
            "name", tool_call_dict.get("function", {}).get("name")
        )
        tool_args = tool_call_dict.get(
            "args", tool_call_dict.get("function", {}).get("arguments")
        )

        _set_span_attribute(span, f"{tool_call_prefix}.id", tool_id)
        _set_span_attribute(
            span,
            f"{tool_call_prefix}.name",
            tool_name,
        )
        _set_span_attribute(
            span,
            f"{tool_call_prefix}.arguments",
            json.dumps(tool_args, cls=CallbackFilteredJSONEncoder),
        )