import json import time import anthropic from opentelemetry.instrumentation.bedrock.config import Config from opentelemetry.instrumentation.bedrock.utils import should_send_prompts from opentelemetry.semconv._incubating.attributes import ( gen_ai_attributes as GenAIAttributes, ) from opentelemetry.semconv._incubating.attributes.aws_attributes import ( AWS_BEDROCK_GUARDRAIL_ID ) from opentelemetry.semconv_ai import ( LLMRequestTypeValues, SpanAttributes, ) PROMPT_FILTER_KEY = "prompt_filter_results" CONTENT_FILTER_KEY = "content_filter_results" anthropic_client = None def _set_span_attribute(span, name, value): if value is not None: if value != "": span.set_attribute(name, value) return def set_model_message_span_attributes(model_vendor, span, request_body): if not should_send_prompts(): return if model_vendor == "cohere": _set_prompt_span_attributes(span, request_body) elif model_vendor == "anthropic": if "prompt" in request_body: _set_prompt_span_attributes(span, request_body) elif "messages" in request_body: for idx, message in enumerate(request_body.get("messages")): _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.{idx}.role", message.get("role"), ) _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.0.content", json.dumps(message.get("content")), ) elif model_vendor == "ai21": _set_prompt_span_attributes(span, request_body) elif model_vendor == "meta": _set_llama_prompt_span_attributes(span, request_body) elif model_vendor == "amazon": _set_amazon_input_span_attributes(span, request_body) elif model_vendor == "imported_model": _set_imported_model_prompt_span_attributes(span, request_body) def set_model_choice_span_attributes(model_vendor, span, response_body): if not should_send_prompts(): return if model_vendor == "cohere": _set_generations_span_attributes(span, response_body) elif model_vendor == "anthropic": _set_anthropic_response_span_attributes(span, response_body) elif model_vendor == "ai21": _set_span_completions_attributes(span, response_body) elif model_vendor == "meta": _set_llama_response_span_attributes(span, response_body) elif model_vendor == "amazon": _set_amazon_response_span_attributes(span, response_body) elif model_vendor == "imported_model": _set_imported_model_response_span_attributes(span, response_body) def set_model_span_attributes( provider, model_vendor, model, span, request_body, response_body, headers, metric_params, kwargs, ): response_model = response_body.get("model") response_id = response_body.get("id") _set_span_attribute(span, AWS_BEDROCK_GUARDRAIL_ID, _guardrail_value(kwargs)) _set_span_attribute(span, GenAIAttributes.GEN_AI_SYSTEM, provider) _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_MODEL, model) _set_span_attribute(span, GenAIAttributes.GEN_AI_RESPONSE_MODEL, response_model) _set_span_attribute(span, GenAIAttributes.GEN_AI_RESPONSE_ID, response_id) if model_vendor == "cohere": _set_cohere_span_attributes(span, request_body, response_body, metric_params) elif model_vendor == "anthropic": if "prompt" in request_body: _set_anthropic_completion_span_attributes( span, request_body, response_body, headers, metric_params ) elif "messages" in request_body: _set_anthropic_messages_span_attributes( span, request_body, response_body, headers, metric_params ) elif model_vendor == "ai21": _set_ai21_span_attributes(span, request_body, response_body, metric_params) elif model_vendor == "meta": _set_llama_span_attributes(span, request_body, response_body, metric_params) elif model_vendor == "amazon": _set_amazon_span_attributes( span, request_body, response_body, headers, metric_params ) elif model_vendor == "imported_model": _set_imported_model_span_attributes( span, request_body, response_body, metric_params ) def _guardrail_value(request_body): identifier = request_body.get("guardrailIdentifier") if identifier is not None: version = request_body.get("guardrailVersion") return f"{identifier}:{version}" return None def set_guardrail_attributes(span, input_filters, output_filters): if input_filters: _set_span_attribute( span, f"{SpanAttributes.LLM_PROMPTS}.{PROMPT_FILTER_KEY}", json.dumps(input_filters, default=str) ) if output_filters: _set_span_attribute( span, f"{SpanAttributes.LLM_COMPLETIONS}.{CONTENT_FILTER_KEY}", json.dumps(output_filters, default=str) ) def _set_prompt_span_attributes(span, request_body): _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.0.user", request_body.get("prompt") ) def _set_cohere_span_attributes(span, request_body, response_body, metric_params): _set_span_attribute( span, SpanAttributes.LLM_REQUEST_TYPE, LLMRequestTypeValues.COMPLETION.value ) _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, request_body.get("p")) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature") ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens") ) # based on contract at # https://docs.aws.amazon.com/bedrock/latest/userguide/model-parameters-cohere-command-r-plus.html input_tokens = response_body.get("token_count", {}).get("prompt_tokens") output_tokens = response_body.get("token_count", {}).get("response_tokens") if input_tokens is None or output_tokens is None: meta = response_body.get("meta", {}) billed_units = meta.get("billed_units", {}) input_tokens = input_tokens or billed_units.get("input_tokens") output_tokens = output_tokens or billed_units.get("output_tokens") if input_tokens is not None and output_tokens is not None: _record_usage_to_span( span, input_tokens, output_tokens, metric_params, ) def _set_generations_span_attributes(span, response_body): for i, generation in enumerate(response_body.get("generations")): _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.{i}.content", generation.get("text"), ) def _set_anthropic_completion_span_attributes( span, request_body, response_body, headers, metric_params ): _set_span_attribute( span, SpanAttributes.LLM_REQUEST_TYPE, LLMRequestTypeValues.COMPLETION.value ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, request_body.get("top_p") ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature") ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens_to_sample"), ) if ( response_body.get("usage") is not None and response_body.get("usage").get("input_tokens") is not None and response_body.get("usage").get("output_tokens") is not None ): _record_usage_to_span( span, response_body.get("usage").get("input_tokens"), response_body.get("usage").get("output_tokens"), metric_params, ) elif response_body.get("invocation_metrics") is not None: _record_usage_to_span( span, response_body.get("invocation_metrics").get("inputTokenCount"), response_body.get("invocation_metrics").get("outputTokenCount"), metric_params, ) elif headers and headers.get("x-amzn-bedrock-input-token-count") is not None: # For Anthropic V2 models (claude-v2), token counts are in HTTP headers input_tokens = int(headers.get("x-amzn-bedrock-input-token-count", 0)) output_tokens = int(headers.get("x-amzn-bedrock-output-token-count", 0)) _record_usage_to_span( span, input_tokens, output_tokens, metric_params, ) elif Config.enrich_token_usage: _record_usage_to_span( span, _count_anthropic_tokens([request_body.get("prompt")]), _count_anthropic_tokens([response_body.get("completion")]), metric_params, ) def _set_anthropic_response_span_attributes(span, response_body): if response_body.get("completion") is not None: _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.content", response_body.get("completion"), ) elif response_body.get("content") is not None: _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.content", "assistant" ) _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.content", json.dumps(response_body.get("content")), ) def _set_anthropic_messages_span_attributes( span, request_body, response_body, headers, metric_params ): _set_span_attribute( span, SpanAttributes.LLM_REQUEST_TYPE, LLMRequestTypeValues.CHAT.value ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, request_body.get("top_p") ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature") ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens"), ) prompt_tokens = 0 completion_tokens = 0 if ( response_body.get("usage") is not None and response_body.get("usage").get("input_tokens") is not None and response_body.get("usage").get("output_tokens") is not None ): prompt_tokens = response_body.get("usage").get("input_tokens") completion_tokens = response_body.get("usage").get("output_tokens") _record_usage_to_span(span, prompt_tokens, completion_tokens, metric_params) elif response_body.get("invocation_metrics") is not None: prompt_tokens = response_body.get("invocation_metrics").get("inputTokenCount") completion_tokens = response_body.get("invocation_metrics").get( "outputTokenCount" ) _record_usage_to_span(span, prompt_tokens, completion_tokens, metric_params) elif headers and headers.get("x-amzn-bedrock-input-token-count") is not None: # For Anthropic V2 models (claude-v2), token counts are in HTTP headers prompt_tokens = int(headers.get("x-amzn-bedrock-input-token-count", 0)) completion_tokens = int(headers.get("x-amzn-bedrock-output-token-count", 0)) _record_usage_to_span(span, prompt_tokens, completion_tokens, metric_params) elif Config.enrich_token_usage: messages = [message.get("content") for message in request_body.get("messages")] raw_messages = [] for message in messages: if isinstance(message, str): raw_messages.append(message) else: raw_messages.extend([content.get("text") for content in message]) prompt_tokens = _count_anthropic_tokens(raw_messages) completion_tokens = _count_anthropic_tokens( [content.get("text") for content in response_body.get("content")] ) _record_usage_to_span(span, prompt_tokens, completion_tokens, metric_params) def _count_anthropic_tokens(messages: list[str]): global anthropic_client # Lazy initialization of the Anthropic client if anthropic_client is None: try: anthropic_client = anthropic.Anthropic() except Exception as e: import logging logger = logging.getLogger(__name__) logger.debug(f"Failed to initialize Anthropic client for token counting: {e}") # Return 0 if we can't create the client return 0 count = 0 try: for message in messages: count += anthropic_client.count_tokens(text=message) except Exception as e: import logging logger = logging.getLogger(__name__) logger.debug(f"Failed to count tokens with Anthropic client: {e}") return 0 return count def _set_ai21_span_attributes(span, request_body, response_body, metric_params): _set_span_attribute( span, SpanAttributes.LLM_REQUEST_TYPE, LLMRequestTypeValues.COMPLETION.value ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, request_body.get("topP") ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature") ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, request_body.get("maxTokens") ) _record_usage_to_span( span, len(response_body.get("prompt").get("tokens")), len(response_body.get("completions")[0].get("data").get("tokens")), metric_params, ) def _set_span_completions_attributes(span, response_body): for i, completion in enumerate(response_body.get("completions")): _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.{i}.content", completion.get("data").get("text"), ) def _set_llama_span_attributes(span, request_body, response_body, metric_params): _set_span_attribute( span, SpanAttributes.LLM_REQUEST_TYPE, LLMRequestTypeValues.COMPLETION.value ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, request_body.get("top_p") ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature") ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_gen_len") ) _record_usage_to_span( span, response_body.get("prompt_token_count"), response_body.get("generation_token_count"), metric_params, ) def _set_llama_prompt_span_attributes(span, request_body): _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.0.content", request_body.get("prompt") ) _set_span_attribute(span, f"{GenAIAttributes.GEN_AI_PROMPT}.0.role", "user") def _set_llama_response_span_attributes(span, response_body): if response_body.get("generation"): _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.role", "assistant" ) _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.content", response_body.get("generation"), ) else: for i, generation in enumerate(response_body.get("generations")): _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.{i}.role", "assistant" ) _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.{i}.content", generation ) def _set_amazon_span_attributes( span, request_body, response_body, headers, metric_params ): _set_span_attribute( span, SpanAttributes.LLM_REQUEST_TYPE, LLMRequestTypeValues.COMPLETION.value ) if "textGenerationConfig" in request_body: config = request_body.get("textGenerationConfig", {}) _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, config.get("topP")) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, config.get("temperature") ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, config.get("maxTokenCount") ) elif "inferenceConfig" in request_body: config = request_body.get("inferenceConfig", {}) _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, config.get("topP")) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, config.get("temperature") ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, config.get("maxTokens") ) total_completion_tokens = 0 total_prompt_tokens = 0 if "results" in response_body: total_prompt_tokens = int(response_body.get("inputTextTokenCount", 0)) for result in response_body.get("results"): if "tokenCount" in result: total_completion_tokens += int(result.get("tokenCount", 0)) elif "totalOutputTextTokenCount" in result: total_completion_tokens += int( result.get("totalOutputTextTokenCount", 0) ) elif "usage" in response_body: total_prompt_tokens += int(response_body.get("inputTokens", 0)) total_completion_tokens += int( headers.get("x-amzn-bedrock-output-token-count", 0) ) # checks for Titan models if "inputTextTokenCount" in response_body: total_prompt_tokens = response_body.get("inputTextTokenCount") if "totalOutputTextTokenCount" in response_body: total_completion_tokens = response_body.get("totalOutputTextTokenCount") _record_usage_to_span( span, total_prompt_tokens, total_completion_tokens, metric_params, ) def _set_amazon_input_span_attributes(span, request_body): if "inputText" in request_body: _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.0.user", request_body.get("inputText"), ) else: prompt_idx = 0 if "system" in request_body: for idx, prompt in enumerate(request_body["system"]): prompt_idx = idx + 1 _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.{idx}.role", "system" ) # TODO: add support for "image" _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.{idx}.content", prompt.get("text"), ) for idx, prompt in enumerate(request_body["messages"]): _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.{prompt_idx + idx}.role", prompt.get("role"), ) # TODO: here we stringify the object, consider moving these to events or prompt.{i}.content.{j} _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.{prompt_idx + idx}.content", json.dumps(prompt.get("content", ""), default=str), ) def _set_amazon_response_span_attributes(span, response_body): if "results" in response_body: for i, result in enumerate(response_body.get("results")): _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.{i}.content", result.get("outputText"), ) elif "outputText" in response_body: _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.content", response_body.get("outputText"), ) elif "output" in response_body: msgs = response_body.get("output").get("message", {}).get("content", []) for idx, msg in enumerate(msgs): _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.{idx}.content", msg.get("text"), ) def _set_imported_model_span_attributes( span, request_body, response_body, metric_params ): _set_span_attribute( span, SpanAttributes.LLM_REQUEST_TYPE, LLMRequestTypeValues.COMPLETION.value ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, request_body.get("topP") ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, request_body.get("temperature") ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, request_body.get("max_tokens") ) prompt_tokens = ( response_body.get("usage", {}).get("prompt_tokens") if response_body.get("usage", {}).get("prompt_tokens") is not None else response_body.get("prompt_token_count") ) completion_tokens = response_body.get("usage", {}).get( "completion_tokens" ) or response_body.get("generation_token_count") _record_usage_to_span( span, prompt_tokens, completion_tokens, metric_params, ) def _set_imported_model_response_span_attributes(span, response_body): _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.content", response_body.get("generation"), ) def _set_imported_model_prompt_span_attributes(span, request_body): _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.0.content", request_body.get("prompt") ) def _record_usage_to_span(span, prompt_tokens, completion_tokens, metric_params): _set_span_attribute( span, GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS, prompt_tokens, ) _set_span_attribute( span, GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS, completion_tokens, ) _set_span_attribute( span, SpanAttributes.LLM_USAGE_TOTAL_TOKENS, prompt_tokens + completion_tokens, ) metric_attributes = _metric_shared_attributes( metric_params.vendor, metric_params.model, metric_params.is_stream ) if metric_params.duration_histogram: duration = time.time() - metric_params.start_time metric_params.duration_histogram.record( duration, attributes=metric_attributes, ) if ( metric_params.token_histogram and type(prompt_tokens) is int and prompt_tokens >= 0 ): metric_params.token_histogram.record( prompt_tokens, attributes={ **metric_attributes, GenAIAttributes.GEN_AI_TOKEN_TYPE: "input", }, ) if ( metric_params.token_histogram and type(completion_tokens) is int and completion_tokens >= 0 ): metric_params.token_histogram.record( completion_tokens, attributes={ **metric_attributes, GenAIAttributes.GEN_AI_TOKEN_TYPE: "output", }, ) def _metric_shared_attributes( response_vendor: str, response_model: str, is_streaming: bool = False ): return { "vendor": response_vendor, GenAIAttributes.GEN_AI_RESPONSE_MODEL: response_model, GenAIAttributes.GEN_AI_SYSTEM: "bedrock", "stream": is_streaming, } def set_converse_model_span_attributes(span, provider, model, kwargs): _set_span_attribute(span, GenAIAttributes.GEN_AI_SYSTEM, provider) _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_MODEL, model) _set_span_attribute( span, SpanAttributes.LLM_REQUEST_TYPE, LLMRequestTypeValues.CHAT.value ) guardrail_config = kwargs.get("guardrailConfig") if guardrail_config: _set_span_attribute(span, AWS_BEDROCK_GUARDRAIL_ID, _guardrail_value(guardrail_config)) config = {} if "inferenceConfig" in kwargs: config = kwargs.get("inferenceConfig") _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, config.get("topP")) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, config.get("temperature") ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, config.get("maxTokens") ) def set_converse_input_prompt_span_attributes(kwargs, span): if not should_send_prompts(): return prompt_idx = 0 if "system" in kwargs: for idx, prompt in enumerate(kwargs["system"]): prompt_idx = idx + 1 _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.{idx}.role", "system" ) # TODO: add support for "image" _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.{idx}.content", prompt.get("text"), ) if "messages" in kwargs: for idx, prompt in enumerate(kwargs["messages"]): _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.{prompt_idx+idx}.role", prompt.get("role"), ) # TODO: here we stringify the object, consider moving these to events or prompt.{i}.content.{j} _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.{prompt_idx+idx}.content", json.dumps(prompt.get("content", ""), default=str), ) def set_converse_response_span_attributes(response, span): if not should_send_prompts(): return if "output" in response: message = response["output"]["message"] _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.role", message.get("role") ) for idx, content in enumerate(message["content"]): _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.{idx}.content", content.get("text"), ) def set_converse_streaming_response_span_attributes(response, role, span): if not should_send_prompts(): return _set_span_attribute(span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.role", role) _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.content", "".join(response) ) def converse_usage_record(span, response, metric_params): prompt_tokens = 0 completion_tokens = 0 if "usage" in response: if "inputTokens" in response["usage"]: prompt_tokens = response["usage"]["inputTokens"] if "outputTokens" in response["usage"]: completion_tokens = response["usage"]["outputTokens"] _record_usage_to_span( span, prompt_tokens, completion_tokens, metric_params, )