109 lines
3.4 KiB
Python
109 lines
3.4 KiB
Python
from opentelemetry.instrumentation.marqo.utils import dont_throw
|
|
from opentelemetry.semconv.trace import SpanAttributes
|
|
|
|
from opentelemetry import context as context_api
|
|
from opentelemetry.instrumentation.utils import (
|
|
_SUPPRESS_INSTRUMENTATION_KEY,
|
|
)
|
|
from opentelemetry.semconv_ai import SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY, Events
|
|
from opentelemetry.semconv_ai import SpanAttributes as AISpanAttributes
|
|
|
|
|
|
def _with_tracer_wrapper(func):
|
|
"""Helper for providing tracer for wrapper functions."""
|
|
|
|
def _with_tracer(tracer, to_wrap):
|
|
def wrapper(wrapped, instance, args, kwargs):
|
|
return func(tracer, to_wrap, wrapped, instance, args, kwargs)
|
|
|
|
return wrapper
|
|
|
|
return _with_tracer
|
|
|
|
|
|
def _set_span_attribute(span, name, value):
|
|
if value is not None:
|
|
if value != "":
|
|
span.set_attribute(name, value)
|
|
return
|
|
|
|
|
|
@_with_tracer_wrapper
|
|
def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
|
|
"""Instruments and calls every function defined in TO_WRAP."""
|
|
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value(
|
|
SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY
|
|
):
|
|
return wrapped(*args, **kwargs)
|
|
|
|
name = to_wrap.get("span_name")
|
|
with tracer.start_as_current_span(name) as span:
|
|
span.set_attribute(SpanAttributes.DB_SYSTEM, "marqo")
|
|
span.set_attribute(SpanAttributes.DB_OPERATION, to_wrap.get("method"))
|
|
|
|
if to_wrap.get("method") == "add_documents":
|
|
_set_add_documents_attributes(span, kwargs)
|
|
elif to_wrap.get("method") == "search":
|
|
_set_search_attributes(span, kwargs)
|
|
elif to_wrap.get("method") == "delete_documents":
|
|
_set_delete_documents_attributes(span, kwargs)
|
|
|
|
return_value = wrapped(*args, **kwargs)
|
|
if to_wrap.get("method") == "search":
|
|
_set_search_result_attributes(span, return_value)
|
|
if to_wrap.get("method") == "delete_documents":
|
|
_set_delete_documents_response_attributes(span, return_value)
|
|
|
|
return return_value
|
|
|
|
|
|
def count_or_none(obj):
|
|
if obj:
|
|
return len(obj)
|
|
|
|
return None
|
|
|
|
|
|
@dont_throw
|
|
def _set_add_documents_attributes(span, kwargs):
|
|
"""
|
|
In contrast to the example in Marqo's docs,
|
|
this requires the declaration of the documents array with the label "documents = ..."
|
|
(https://docs.marqo.ai/2.8/API-Reference/Documents/add_or_replace_documents/)
|
|
Otherwise we cannot retrieve the documents
|
|
see also: https://github.com/traceloop/openllmetry/issues/539
|
|
"""
|
|
_set_span_attribute(
|
|
span,
|
|
AISpanAttributes.CHROMADB_ADD_DOCUMENTS_COUNT,
|
|
count_or_none(kwargs.get("documents")),
|
|
)
|
|
|
|
|
|
@dont_throw
|
|
def _set_search_attributes(span, kwargs):
|
|
_set_span_attribute(span, "db.marqo.search.query", kwargs.get("q"))
|
|
|
|
|
|
@dont_throw
|
|
def _set_delete_documents_attributes(span, kwargs):
|
|
_set_span_attribute(
|
|
span, AISpanAttributes.MILVUS_DELETE_IDS_COUNT, count_or_none(kwargs.get("ids"))
|
|
)
|
|
|
|
|
|
@dont_throw
|
|
def _set_search_result_attributes(span, kwargs):
|
|
_set_span_attribute(
|
|
span, "db.marqo.search.processing_time", kwargs.get("processingTimeMs")
|
|
)
|
|
|
|
events = kwargs.get("hits")
|
|
for event in events:
|
|
span.add_event(name=Events.DB_QUERY_RESULT.value, attributes=event)
|
|
|
|
|
|
@dont_throw
|
|
def _set_delete_documents_response_attributes(span, kwargs):
|
|
_set_span_attribute(span, "db.marqo.delete_documents.status", kwargs.get("status"))
|