import json
import time

import anthropic
from opentelemetry.instrumentation.bedrock.config import Config
from opentelemetry.instrumentation.bedrock.utils import should_send_prompts
from opentelemetry.semconv._incubating.attributes import (
    gen_ai_attributes as GenAIAttributes,
)
from opentelemetry.semconv._incubating.attributes.aws_attributes import (
    AWS_BEDROCK_GUARDRAIL_ID,
)
from opentelemetry.semconv_ai import (
    LLMRequestTypeValues,
    SpanAttributes,
)

PROMPT_FILTER_KEY = "prompt_filter_results"
CONTENT_FILTER_KEY = "content_filter_results"

anthropic_client = None
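

# The helpers in this module are invoked by the Bedrock instrumentation wrappers once
# the InvokeModel / Converse request and response payloads have been decoded. A rough,
# illustrative call sequence from such a wrapper (the variable names here are
# assumptions, not part of this module) might look like:
#
#   request_body = json.loads(kwargs.get("body"))
#   response_body = json.loads(response.get("body").read())
#   set_model_span_attributes(provider, vendor, model, span, request_body,
#                             response_body, headers, metric_params, kwargs)
#   set_model_message_span_attributes(vendor, span, request_body)
#   set_model_choice_span_attributes(vendor, span, response_body)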


def _set_span_attribute(span, name, value):
    if value is not None and value != "":
        span.set_attribute(name, value)
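

# The prompt/completion helpers below emit indexed span attributes of the form
# f"{GenAIAttributes.GEN_AI_PROMPT}.<i>.role" / ".content" and
# f"{GenAIAttributes.GEN_AI_COMPLETION}.<i>.content"; the concrete attribute names
# depend on the installed OpenTelemetry semantic-conventions package.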


def set_model_message_span_attributes(model_vendor, span, request_body):
    if not should_send_prompts():
        return
    if model_vendor == "cohere":
        _set_prompt_span_attributes(span, request_body)
    elif model_vendor == "anthropic":
        if "prompt" in request_body:
            _set_prompt_span_attributes(span, request_body)
        elif "messages" in request_body:
            for idx, message in enumerate(request_body.get("messages")):
                _set_span_attribute(
                    span,
                    f"{GenAIAttributes.GEN_AI_PROMPT}.{idx}.role",
                    message.get("role"),
                )
                _set_span_attribute(
                    span,
                    f"{GenAIAttributes.GEN_AI_PROMPT}.{idx}.content",
                    json.dumps(message.get("content")),
                )
    elif model_vendor == "ai21":
        _set_prompt_span_attributes(span, request_body)
    elif model_vendor == "meta":
        _set_llama_prompt_span_attributes(span, request_body)
    elif model_vendor == "amazon":
        _set_amazon_input_span_attributes(span, request_body)
    elif model_vendor == "imported_model":
        _set_imported_model_prompt_span_attributes(span, request_body)


def set_model_choice_span_attributes(model_vendor, span, response_body):
    if not should_send_prompts():
        return
    if model_vendor == "cohere":
        _set_generations_span_attributes(span, response_body)
    elif model_vendor == "anthropic":
        _set_anthropic_response_span_attributes(span, response_body)
    elif model_vendor == "ai21":
        _set_span_completions_attributes(span, response_body)
    elif model_vendor == "meta":
        _set_llama_response_span_attributes(span, response_body)
    elif model_vendor == "amazon":
        _set_amazon_response_span_attributes(span, response_body)
    elif model_vendor == "imported_model":
        _set_imported_model_response_span_attributes(span, response_body)


def set_model_span_attributes(
    provider,
    model_vendor,
    model,
    span,
    request_body,
    response_body,
    headers,
    metric_params,
    kwargs,
):
    response_model = response_body.get("model")
    response_id = response_body.get("id")

    _set_span_attribute(span, AWS_BEDROCK_GUARDRAIL_ID, _guardrail_value(kwargs))

    _set_span_attribute(span, GenAIAttributes.GEN_AI_SYSTEM, provider)
    _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_MODEL, model)
    _set_span_attribute(span, GenAIAttributes.GEN_AI_RESPONSE_MODEL, response_model)
    _set_span_attribute(span, GenAIAttributes.GEN_AI_RESPONSE_ID, response_id)

    if model_vendor == "cohere":
        _set_cohere_span_attributes(span, request_body, response_body, metric_params)
    elif model_vendor == "anthropic":
        if "prompt" in request_body:
            _set_anthropic_completion_span_attributes(
                span, request_body, response_body, headers, metric_params
            )
        elif "messages" in request_body:
            _set_anthropic_messages_span_attributes(
                span, request_body, response_body, headers, metric_params
            )
    elif model_vendor == "ai21":
        _set_ai21_span_attributes(span, request_body, response_body, metric_params)
    elif model_vendor == "meta":
        _set_llama_span_attributes(span, request_body, response_body, metric_params)
    elif model_vendor == "amazon":
        _set_amazon_span_attributes(
            span, request_body, response_body, headers, metric_params
        )
    elif model_vendor == "imported_model":
        _set_imported_model_span_attributes(
            span, request_body, response_body, metric_params
        )


def _guardrail_value(request_body):
    identifier = request_body.get("guardrailIdentifier")
    if identifier is not None:
        version = request_body.get("guardrailVersion")
        return f"{identifier}:{version}"
    return None


def set_guardrail_attributes(span, input_filters, output_filters):
    if input_filters:
        _set_span_attribute(
            span,
            f"{SpanAttributes.LLM_PROMPTS}.{PROMPT_FILTER_KEY}",
            json.dumps(input_filters, default=str),
        )
    if output_filters:
        _set_span_attribute(
            span,
            f"{SpanAttributes.LLM_COMPLETIONS}.{CONTENT_FILTER_KEY}",
            json.dumps(output_filters, default=str),
        )
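

# set_guardrail_attributes() stores guardrail assessments as JSON under
# f"{SpanAttributes.LLM_PROMPTS}.{PROMPT_FILTER_KEY}" and
# f"{SpanAttributes.LLM_COMPLETIONS}.{CONTENT_FILTER_KEY}". An illustrative
# (hypothetical) call from a guardrail trace handler might be:
#
#   set_guardrail_attributes(span, trace_input_assessments, trace_output_assessments)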


def _set_prompt_span_attributes(span, request_body):
    _set_span_attribute(
        span, f"{GenAIAttributes.GEN_AI_PROMPT}.0.user", request_body.get("prompt")
    )


def _set_cohere_span_attributes(span, request_body, response_body, metric_params):
    _set_span_attribute(
        span, SpanAttributes.LLM_REQUEST_TYPE, LLMRequestTypeValues.COMPLETION.value
    )
    _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, request_body.get("p"))
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")
    )
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens")
    )

    # based on contract at
    # https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-cohere-command-r-plus.html
    input_tokens = response_body.get("token_count", {}).get("prompt_tokens")
    output_tokens = response_body.get("token_count", {}).get("response_tokens")

    if input_tokens is None or output_tokens is None:
        meta = response_body.get("meta", {})
        billed_units = meta.get("billed_units", {})
        input_tokens = input_tokens or billed_units.get("input_tokens")
        output_tokens = output_tokens or billed_units.get("output_tokens")

    if input_tokens is not None and output_tokens is not None:
        _record_usage_to_span(
            span,
            input_tokens,
            output_tokens,
            metric_params,
        )


def _set_generations_span_attributes(span, response_body):
    for i, generation in enumerate(response_body.get("generations")):
        _set_span_attribute(
            span,
            f"{GenAIAttributes.GEN_AI_COMPLETION}.{i}.content",
            generation.get("text"),
        )


def _set_anthropic_completion_span_attributes(
    span, request_body, response_body, headers, metric_params
):
    _set_span_attribute(
        span, SpanAttributes.LLM_REQUEST_TYPE, LLMRequestTypeValues.COMPLETION.value
    )
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, request_body.get("top_p")
    )
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")
    )
    _set_span_attribute(
        span,
        GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS,
        request_body.get("max_tokens_to_sample"),
    )

    if (
        response_body.get("usage") is not None
        and response_body.get("usage").get("input_tokens") is not None
        and response_body.get("usage").get("output_tokens") is not None
    ):
        _record_usage_to_span(
            span,
            response_body.get("usage").get("input_tokens"),
            response_body.get("usage").get("output_tokens"),
            metric_params,
        )
    elif response_body.get("invocation_metrics") is not None:
        _record_usage_to_span(
            span,
            response_body.get("invocation_metrics").get("inputTokenCount"),
            response_body.get("invocation_metrics").get("outputTokenCount"),
            metric_params,
        )
    elif headers and headers.get("x-amzn-bedrock-input-token-count") is not None:
        # For Anthropic V2 models (claude-v2), token counts are in HTTP headers
        input_tokens = int(headers.get("x-amzn-bedrock-input-token-count", 0))
        output_tokens = int(headers.get("x-amzn-bedrock-output-token-count", 0))
        _record_usage_to_span(
            span,
            input_tokens,
            output_tokens,
            metric_params,
        )
    elif Config.enrich_token_usage:
        _record_usage_to_span(
            span,
            _count_anthropic_tokens([request_body.get("prompt")]),
            _count_anthropic_tokens([response_body.get("completion")]),
            metric_params,
        )
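

# Token usage in _set_anthropic_completion_span_attributes (and the messages variant
# below) is resolved in priority order: the "usage" block in the response body, then
# "invocation_metrics", then the x-amzn-bedrock-*-token-count HTTP headers, and finally
# a local estimate via the Anthropic tokenizer when Config.enrich_token_usage is enabled.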


def _set_anthropic_response_span_attributes(span, response_body):
    if response_body.get("completion") is not None:
        _set_span_attribute(
            span,
            f"{GenAIAttributes.GEN_AI_COMPLETION}.0.content",
            response_body.get("completion"),
        )
    elif response_body.get("content") is not None:
        _set_span_attribute(
            span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.role", "assistant"
        )
        _set_span_attribute(
            span,
            f"{GenAIAttributes.GEN_AI_COMPLETION}.0.content",
            json.dumps(response_body.get("content")),
        )


def _set_anthropic_messages_span_attributes(
    span, request_body, response_body, headers, metric_params
):
    _set_span_attribute(
        span, SpanAttributes.LLM_REQUEST_TYPE, LLMRequestTypeValues.CHAT.value
    )
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, request_body.get("top_p")
    )
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")
    )
    _set_span_attribute(
        span,
        GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS,
        request_body.get("max_tokens"),
    )

    prompt_tokens = 0
    completion_tokens = 0
    if (
        response_body.get("usage") is not None
        and response_body.get("usage").get("input_tokens") is not None
        and response_body.get("usage").get("output_tokens") is not None
    ):
        prompt_tokens = response_body.get("usage").get("input_tokens")
        completion_tokens = response_body.get("usage").get("output_tokens")
        _record_usage_to_span(span, prompt_tokens, completion_tokens, metric_params)
    elif response_body.get("invocation_metrics") is not None:
        prompt_tokens = response_body.get("invocation_metrics").get("inputTokenCount")
        completion_tokens = response_body.get("invocation_metrics").get(
            "outputTokenCount"
        )
        _record_usage_to_span(span, prompt_tokens, completion_tokens, metric_params)
    elif headers and headers.get("x-amzn-bedrock-input-token-count") is not None:
        # For Anthropic V2 models (claude-v2), token counts are in HTTP headers
        prompt_tokens = int(headers.get("x-amzn-bedrock-input-token-count", 0))
        completion_tokens = int(headers.get("x-amzn-bedrock-output-token-count", 0))
        _record_usage_to_span(span, prompt_tokens, completion_tokens, metric_params)
    elif Config.enrich_token_usage:
        messages = [message.get("content") for message in request_body.get("messages")]

        raw_messages = []
        for message in messages:
            if isinstance(message, str):
                raw_messages.append(message)
            else:
                raw_messages.extend([content.get("text") for content in message])
        prompt_tokens = _count_anthropic_tokens(raw_messages)
        completion_tokens = _count_anthropic_tokens(
            [content.get("text") for content in response_body.get("content")]
        )
        _record_usage_to_span(span, prompt_tokens, completion_tokens, metric_params)


def _count_anthropic_tokens(messages: list[str]):
    global anthropic_client

    # Lazy initialization of the Anthropic client
    if anthropic_client is None:
        try:
            anthropic_client = anthropic.Anthropic()
        except Exception as e:
            import logging

            logger = logging.getLogger(__name__)
            logger.debug(f"Failed to initialize Anthropic client for token counting: {e}")
            # Return 0 if we can't create the client
            return 0

    count = 0
    try:
        for message in messages:
            count += anthropic_client.count_tokens(text=message)
    except Exception as e:
        import logging

        logger = logging.getLogger(__name__)
        logger.debug(f"Failed to count tokens with Anthropic client: {e}")
        return 0

    return count
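

# Note: Anthropic().count_tokens() is the legacy synchronous token counter; newer
# anthropic SDK releases may no longer expose it, in which case the except branch
# above logs the failure and 0 is reported instead of an estimate.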


def _set_ai21_span_attributes(span, request_body, response_body, metric_params):
    _set_span_attribute(
        span, SpanAttributes.LLM_REQUEST_TYPE, LLMRequestTypeValues.COMPLETION.value
    )
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, request_body.get("topP")
    )
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")
    )
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, request_body.get("maxTokens")
    )

    _record_usage_to_span(
        span,
        len(response_body.get("prompt").get("tokens")),
        len(response_body.get("completions")[0].get("data").get("tokens")),
        metric_params,
    )


def _set_span_completions_attributes(span, response_body):
    for i, completion in enumerate(response_body.get("completions")):
        _set_span_attribute(
            span,
            f"{GenAIAttributes.GEN_AI_COMPLETION}.{i}.content",
            completion.get("data").get("text"),
        )


def _set_llama_span_attributes(span, request_body, response_body, metric_params):
    _set_span_attribute(
        span, SpanAttributes.LLM_REQUEST_TYPE, LLMRequestTypeValues.COMPLETION.value
    )
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, request_body.get("top_p")
    )
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")
    )
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_gen_len")
    )

    _record_usage_to_span(
        span,
        response_body.get("prompt_token_count"),
        response_body.get("generation_token_count"),
        metric_params,
    )


def _set_llama_prompt_span_attributes(span, request_body):
    _set_span_attribute(
        span, f"{GenAIAttributes.GEN_AI_PROMPT}.0.content", request_body.get("prompt")
    )
    _set_span_attribute(span, f"{GenAIAttributes.GEN_AI_PROMPT}.0.role", "user")


def _set_llama_response_span_attributes(span, response_body):
    if response_body.get("generation"):
        _set_span_attribute(
            span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.role", "assistant"
        )
        _set_span_attribute(
            span,
            f"{GenAIAttributes.GEN_AI_COMPLETION}.0.content",
            response_body.get("generation"),
        )
    else:
        for i, generation in enumerate(response_body.get("generations")):
            _set_span_attribute(
                span, f"{GenAIAttributes.GEN_AI_COMPLETION}.{i}.role", "assistant"
            )
            _set_span_attribute(
                span, f"{GenAIAttributes.GEN_AI_COMPLETION}.{i}.content", generation
            )
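

# The Amazon helpers below handle two response shapes: Titan-style bodies
# ("inputTextTokenCount", "results"[]."outputText") and Nova/Converse-style bodies
# ("usage".{inputTokens,outputTokens}, "output"."message"."content"); which keys are
# present depends on the model family.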


def _set_amazon_span_attributes(
    span, request_body, response_body, headers, metric_params
):
    _set_span_attribute(
        span, SpanAttributes.LLM_REQUEST_TYPE, LLMRequestTypeValues.COMPLETION.value
    )

    if "textGenerationConfig" in request_body:
        config = request_body.get("textGenerationConfig", {})
        _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, config.get("topP"))
        _set_span_attribute(
            span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, config.get("temperature")
        )
        _set_span_attribute(
            span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, config.get("maxTokenCount")
        )
    elif "inferenceConfig" in request_body:
        config = request_body.get("inferenceConfig", {})
        _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, config.get("topP"))
        _set_span_attribute(
            span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, config.get("temperature")
        )
        _set_span_attribute(
            span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, config.get("maxTokens")
        )

    total_completion_tokens = 0
    total_prompt_tokens = 0
    if "results" in response_body:
        total_prompt_tokens = int(response_body.get("inputTextTokenCount", 0))
        for result in response_body.get("results"):
            if "tokenCount" in result:
                total_completion_tokens += int(result.get("tokenCount", 0))
            elif "totalOutputTextTokenCount" in result:
                total_completion_tokens += int(
                    result.get("totalOutputTextTokenCount", 0)
                )
    elif "usage" in response_body:
        # Nova-style responses report token counts under the "usage" block
        usage = response_body.get("usage", {})
        total_prompt_tokens += int(usage.get("inputTokens", 0))
        total_completion_tokens += int(usage.get("outputTokens", 0))
    # checks for Titan models
    if "inputTextTokenCount" in response_body:
        total_prompt_tokens = response_body.get("inputTextTokenCount")
    if "totalOutputTextTokenCount" in response_body:
        total_completion_tokens = response_body.get("totalOutputTextTokenCount")

    _record_usage_to_span(
        span,
        total_prompt_tokens,
        total_completion_tokens,
        metric_params,
    )


def _set_amazon_input_span_attributes(span, request_body):
    if "inputText" in request_body:
        _set_span_attribute(
            span,
            f"{GenAIAttributes.GEN_AI_PROMPT}.0.user",
            request_body.get("inputText"),
        )
    else:
        prompt_idx = 0
        if "system" in request_body:
            for idx, prompt in enumerate(request_body["system"]):
                prompt_idx = idx + 1
                _set_span_attribute(
                    span, f"{GenAIAttributes.GEN_AI_PROMPT}.{idx}.role", "system"
                )
                # TODO: add support for "image"
                _set_span_attribute(
                    span,
                    f"{GenAIAttributes.GEN_AI_PROMPT}.{idx}.content",
                    prompt.get("text"),
                )
        for idx, prompt in enumerate(request_body["messages"]):
            _set_span_attribute(
                span,
                f"{GenAIAttributes.GEN_AI_PROMPT}.{prompt_idx + idx}.role",
                prompt.get("role"),
            )
            # TODO: here we stringify the object, consider moving these to events or prompt.{i}.content.{j}
            _set_span_attribute(
                span,
                f"{GenAIAttributes.GEN_AI_PROMPT}.{prompt_idx + idx}.content",
                json.dumps(prompt.get("content", ""), default=str),
            )


def _set_amazon_response_span_attributes(span, response_body):
    if "results" in response_body:
        for i, result in enumerate(response_body.get("results")):
            _set_span_attribute(
                span,
                f"{GenAIAttributes.GEN_AI_COMPLETION}.{i}.content",
                result.get("outputText"),
            )
    elif "outputText" in response_body:
        _set_span_attribute(
            span,
            f"{GenAIAttributes.GEN_AI_COMPLETION}.0.content",
            response_body.get("outputText"),
        )
    elif "output" in response_body:
        msgs = response_body.get("output").get("message", {}).get("content", [])
        for idx, msg in enumerate(msgs):
            _set_span_attribute(
                span,
                f"{GenAIAttributes.GEN_AI_COMPLETION}.{idx}.content",
                msg.get("text"),
            )


def _set_imported_model_span_attributes(
    span, request_body, response_body, metric_params
):
    _set_span_attribute(
        span, SpanAttributes.LLM_REQUEST_TYPE, LLMRequestTypeValues.COMPLETION.value
    )
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, request_body.get("topP")
    )
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature")
    )
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens")
    )
    prompt_tokens = (
        response_body.get("usage", {}).get("prompt_tokens")
        if response_body.get("usage", {}).get("prompt_tokens") is not None
        else response_body.get("prompt_token_count")
    )
    completion_tokens = response_body.get("usage", {}).get(
        "completion_tokens"
    ) or response_body.get("generation_token_count")

    _record_usage_to_span(
        span,
        prompt_tokens,
        completion_tokens,
        metric_params,
    )


def _set_imported_model_response_span_attributes(span, response_body):
    _set_span_attribute(
        span,
        f"{GenAIAttributes.GEN_AI_COMPLETION}.0.content",
        response_body.get("generation"),
    )


def _set_imported_model_prompt_span_attributes(span, request_body):
    _set_span_attribute(
        span, f"{GenAIAttributes.GEN_AI_PROMPT}.0.content", request_body.get("prompt")
    )


def _record_usage_to_span(span, prompt_tokens, completion_tokens, metric_params):
    _set_span_attribute(
        span,
        GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS,
        prompt_tokens,
    )
    _set_span_attribute(
        span,
        GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS,
        completion_tokens,
    )
    # Only record a total when both counts are known; some code paths pass None here.
    if prompt_tokens is not None and completion_tokens is not None:
        _set_span_attribute(
            span,
            SpanAttributes.LLM_USAGE_TOTAL_TOKENS,
            prompt_tokens + completion_tokens,
        )

    metric_attributes = _metric_shared_attributes(
        metric_params.vendor, metric_params.model, metric_params.is_stream
    )

    if metric_params.duration_histogram:
        duration = time.time() - metric_params.start_time
        metric_params.duration_histogram.record(
            duration,
            attributes=metric_attributes,
        )

    if (
        metric_params.token_histogram
        and type(prompt_tokens) is int
        and prompt_tokens >= 0
    ):
        metric_params.token_histogram.record(
            prompt_tokens,
            attributes={
                **metric_attributes,
                GenAIAttributes.GEN_AI_TOKEN_TYPE: "input",
            },
        )
    if (
        metric_params.token_histogram
        and type(completion_tokens) is int
        and completion_tokens >= 0
    ):
        metric_params.token_histogram.record(
            completion_tokens,
            attributes={
                **metric_attributes,
                GenAIAttributes.GEN_AI_TOKEN_TYPE: "output",
            },
        )


def _metric_shared_attributes(
    response_vendor: str, response_model: str, is_streaming: bool = False
):
    return {
        "vendor": response_vendor,
        GenAIAttributes.GEN_AI_RESPONSE_MODEL: response_model,
        GenAIAttributes.GEN_AI_SYSTEM: "bedrock",
        "stream": is_streaming,
    }


def set_converse_model_span_attributes(span, provider, model, kwargs):
    _set_span_attribute(span, GenAIAttributes.GEN_AI_SYSTEM, provider)
    _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_MODEL, model)
    _set_span_attribute(
        span, SpanAttributes.LLM_REQUEST_TYPE, LLMRequestTypeValues.CHAT.value
    )

    guardrail_config = kwargs.get("guardrailConfig")
    if guardrail_config:
        _set_span_attribute(
            span, AWS_BEDROCK_GUARDRAIL_ID, _guardrail_value(guardrail_config)
        )

    config = {}
    if "inferenceConfig" in kwargs:
        config = kwargs.get("inferenceConfig")

    _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, config.get("topP"))
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, config.get("temperature")
    )
    _set_span_attribute(
        span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, config.get("maxTokens")
    )


def set_converse_input_prompt_span_attributes(kwargs, span):
    if not should_send_prompts():
        return
    prompt_idx = 0
    if "system" in kwargs:
        for idx, prompt in enumerate(kwargs["system"]):
            prompt_idx = idx + 1
            _set_span_attribute(
                span, f"{GenAIAttributes.GEN_AI_PROMPT}.{idx}.role", "system"
            )
            # TODO: add support for "image"
            _set_span_attribute(
                span,
                f"{GenAIAttributes.GEN_AI_PROMPT}.{idx}.content",
                prompt.get("text"),
            )
    if "messages" in kwargs:
        for idx, prompt in enumerate(kwargs["messages"]):
            _set_span_attribute(
                span,
                f"{GenAIAttributes.GEN_AI_PROMPT}.{prompt_idx + idx}.role",
                prompt.get("role"),
            )
            # TODO: here we stringify the object, consider moving these to events or prompt.{i}.content.{j}
            _set_span_attribute(
                span,
                f"{GenAIAttributes.GEN_AI_PROMPT}.{prompt_idx + idx}.content",
                json.dumps(prompt.get("content", ""), default=str),
            )


def set_converse_response_span_attributes(response, span):
    if not should_send_prompts():
        return
    if "output" in response:
        message = response["output"]["message"]
        _set_span_attribute(
            span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.role", message.get("role")
        )
        for idx, content in enumerate(message["content"]):
            _set_span_attribute(
                span,
                f"{GenAIAttributes.GEN_AI_COMPLETION}.{idx}.content",
                content.get("text"),
            )


def set_converse_streaming_response_span_attributes(response, role, span):
    if not should_send_prompts():
        return
    _set_span_attribute(span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.role", role)
    _set_span_attribute(
        span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.content", "".join(response)
    )


def converse_usage_record(span, response, metric_params):
    prompt_tokens = 0
    completion_tokens = 0
    if "usage" in response:
        if "inputTokens" in response["usage"]:
            prompt_tokens = response["usage"]["inputTokens"]
        if "outputTokens" in response["usage"]:
            completion_tokens = response["usage"]["outputTokens"]

    _record_usage_to_span(
        span,
        prompt_tokens,
        completion_tokens,
        metric_params,
    )
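

# Illustrative (hypothetical) wiring of the Converse helpers above; the surrounding
# wrapper code and variable names are assumptions, only the function calls come from
# this module:
#
#   set_converse_model_span_attributes(span, provider, model, kwargs)
#   set_converse_input_prompt_span_attributes(kwargs, span)
#   response = wrapped_converse(*args, **kwargs)
#   set_converse_response_span_attributes(response, span)
#   converse_usage_record(span, response, metric_params)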