import copy import json import base64 import logging import asyncio import threading from opentelemetry.instrumentation.vertexai.utils import dont_throw, should_send_prompts from opentelemetry.instrumentation.vertexai.config import Config from opentelemetry.semconv._incubating.attributes import ( gen_ai_attributes as GenAIAttributes, ) from opentelemetry.semconv_ai import SpanAttributes logger = logging.getLogger(__name__) def _set_span_attribute(span, name, value): if value is not None: if value != "": span.set_attribute(name, value) return def _is_base64_image_part(item): """Check if item is a VertexAI Part object containing image data""" try: # Check if it has the Part attributes we expect if not hasattr(item, 'inline_data') or not hasattr(item, 'mime_type'): return False # Check if it's an image mime type and has inline data if item.mime_type and 'image/' in item.mime_type and item.inline_data: # Check if the inline_data has actual data if hasattr(item.inline_data, 'data') and item.inline_data.data: return True return False except Exception: return False async def _process_image_part(item, trace_id, span_id, content_index): """Process a VertexAI Part object containing image data""" if not Config.upload_base64_image: return None try: # Extract format from mime type (e.g., 'image/jpeg' -> 'jpeg') image_format = item.mime_type.split('/')[1] if item.mime_type else 'unknown' image_name = f"content_{content_index}.{image_format}" # Convert binary data to base64 string for upload binary_data = item.inline_data.data base64_string = base64.b64encode(binary_data).decode('utf-8') # Upload the base64 data - convert IDs to strings url = await Config.upload_base64_image(str(trace_id), str(span_id), image_name, base64_string) # Return OpenAI-compatible format for consistency across LLM providers return { "type": "image_url", "image_url": {"url": url} } except Exception as e: logger.warning(f"Failed to process image part: {e}") # Return None to skip adding this image to the span return None def run_async(method): """Handle async method in sync context, following OpenAI's battle-tested approach""" try: loop = asyncio.get_running_loop() except RuntimeError: loop = None if loop and loop.is_running(): thread = threading.Thread(target=lambda: asyncio.run(method)) thread.start() thread.join() else: asyncio.run(method) def _process_image_part_sync(item, trace_id, span_id, content_index): """Synchronous version of image part processing using OpenAI's pattern""" if not Config.upload_base64_image: return None try: # Extract format from mime type (e.g., 'image/jpeg' -> 'jpeg') image_format = item.mime_type.split('/')[1] if item.mime_type else 'unknown' image_name = f"content_{content_index}.{image_format}" # Convert binary data to base64 string for upload binary_data = item.inline_data.data base64_string = base64.b64encode(binary_data).decode('utf-8') # Use OpenAI's run_async pattern to handle the async upload function url = None async def upload_task(): nonlocal url url = await Config.upload_base64_image(str(trace_id), str(span_id), image_name, base64_string) run_async(upload_task()) return { "type": "image_url", "image_url": {"url": url} } except Exception as e: logger.warning(f"Failed to process image part sync: {e}") # Return None to skip adding this image to the span return None async def _process_vertexai_argument(argument, span): """Process a single argument for VertexAI, handling different types""" if isinstance(argument, str): # Simple text argument in OpenAI format return [{"type": "text", "text": argument}] elif isinstance(argument, list): # List of mixed content (text strings and Part objects) - deep copy and process content_list = copy.deepcopy(argument) processed_items = [] for item_index, content_item in enumerate(content_list): processed_item = await _process_content_item_vertexai(content_item, span, item_index) if processed_item is not None: processed_items.append(processed_item) return processed_items else: # Single Part object - convert to OpenAI format processed_item = await _process_content_item_vertexai(argument, span, 0) return [processed_item] if processed_item is not None else [] async def _process_content_item_vertexai(content_item, span, item_index): """Process a single content item for VertexAI""" if isinstance(content_item, str): # Convert text to OpenAI format return {"type": "text", "text": content_item} elif _is_base64_image_part(content_item): # Process image part return await _process_image_part( content_item, span.context.trace_id, span.context.span_id, item_index ) elif hasattr(content_item, 'text'): # Text part to OpenAI format return {"type": "text", "text": content_item.text} else: # Other types as text return {"type": "text", "text": str(content_item)} def _process_vertexai_argument_sync(argument, span): """Synchronous version of argument processing for VertexAI""" if isinstance(argument, str): # Simple text argument in OpenAI format return [{"type": "text", "text": argument}] elif isinstance(argument, list): # List of mixed content (text strings and Part objects) - deep copy and process content_list = copy.deepcopy(argument) processed_items = [] for item_index, content_item in enumerate(content_list): processed_item = _process_content_item_vertexai_sync(content_item, span, item_index) if processed_item is not None: processed_items.append(processed_item) return processed_items else: # Single Part object - convert to OpenAI format processed_item = _process_content_item_vertexai_sync(argument, span, 0) return [processed_item] if processed_item is not None else [] def _process_content_item_vertexai_sync(content_item, span, item_index): """Synchronous version of content item processing for VertexAI""" if isinstance(content_item, str): # Convert text to OpenAI format return {"type": "text", "text": content_item} elif _is_base64_image_part(content_item): # Process image part return _process_image_part_sync( content_item, span.context.trace_id, span.context.span_id, item_index ) elif hasattr(content_item, 'text'): # Text part to OpenAI format return {"type": "text", "text": content_item.text} else: # Other types as text return {"type": "text", "text": str(content_item)} @dont_throw async def set_input_attributes(span, args): """Process input arguments, handling both text and image content""" if not span.is_recording(): return if should_send_prompts() and args is not None and len(args) > 0: # Process each argument using extracted helper methods for arg_index, argument in enumerate(args): processed_content = await _process_vertexai_argument(argument, span) if processed_content: _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.{arg_index}.role", "user" ) _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.{arg_index}.content", json.dumps(processed_content) ) # Sync version with image processing support @dont_throw def set_input_attributes_sync(span, args): """Synchronous version with image processing support""" if not span.is_recording(): return if should_send_prompts() and args is not None and len(args) > 0: # Process each argument using extracted helper methods for arg_index, argument in enumerate(args): processed_content = _process_vertexai_argument_sync(argument, span) if processed_content: _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.{arg_index}.role", "user" ) _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.{arg_index}.content", json.dumps(processed_content) ) @dont_throw def set_model_input_attributes(span, kwargs, llm_model): if not span.is_recording(): return _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_MODEL, llm_model) _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_PROMPT}.0.user", kwargs.get("prompt") ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_TEMPERATURE, kwargs.get("temperature") ) _set_span_attribute( span, GenAIAttributes.GEN_AI_REQUEST_MAX_TOKENS, kwargs.get("max_output_tokens") ) _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_TOP_P, kwargs.get("top_p")) _set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_TOP_K, kwargs.get("top_k")) _set_span_attribute( span, SpanAttributes.LLM_PRESENCE_PENALTY, kwargs.get("presence_penalty") ) _set_span_attribute( span, SpanAttributes.LLM_FREQUENCY_PENALTY, kwargs.get("frequency_penalty") ) @dont_throw def set_response_attributes(span, llm_model, generation_text): if not span.is_recording() or not should_send_prompts(): return _set_span_attribute(span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.role", "assistant") _set_span_attribute( span, f"{GenAIAttributes.GEN_AI_COMPLETION}.0.content", generation_text, ) @dont_throw def set_model_response_attributes(span, llm_model, token_usage): if not span.is_recording(): return _set_span_attribute(span, GenAIAttributes.GEN_AI_RESPONSE_MODEL, llm_model) if token_usage: _set_span_attribute( span, SpanAttributes.LLM_USAGE_TOTAL_TOKENS, token_usage.total_token_count, ) _set_span_attribute( span, GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS, token_usage.candidates_token_count, ) _set_span_attribute( span, GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS, token_usage.prompt_token_count, )