import os
import time
from typing import Collection

from wrapt import wrap_function_wrapper
from opentelemetry.trace import SpanKind, get_tracer, Tracer
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.metrics import Histogram, Meter, get_meter
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.crewai.version import __version__
from opentelemetry.semconv._incubating.attributes import (
    gen_ai_attributes as GenAIAttributes,
)
from opentelemetry.semconv_ai import SpanAttributes, TraceloopSpanKindValues, Meters
from .crewai_span_attributes import CrewAISpanAttributes, set_span_attribute

_instruments = ("crewai >= 0.70.0",)


class CrewAIInstrumentor(BaseInstrumentor):
    """OpenTelemetry instrumentor that wraps CrewAI crews, agents, tasks and LLM calls."""

    def instrumentation_dependencies(self) -> Collection[str]:
        return _instruments

    def _instrument(self, **kwargs):
        tracer_provider = kwargs.get("tracer_provider")
        tracer = get_tracer(__name__, __version__, tracer_provider)

        meter_provider = kwargs.get("meter_provider")
        meter = get_meter(__name__, __version__, meter_provider)

        token_histogram = None
        duration_histogram = None
        if is_metrics_enabled():
            token_histogram, duration_histogram = _create_metrics(meter)

        wrap_function_wrapper("crewai.crew", "Crew.kickoff",
                              wrap_kickoff(tracer, duration_histogram, token_histogram))
        wrap_function_wrapper("crewai.agent", "Agent.execute_task",
                              wrap_agent_execute_task(tracer, duration_histogram, token_histogram))
        wrap_function_wrapper("crewai.task", "Task.execute_sync",
                              wrap_task_execute(tracer, duration_histogram, token_histogram))
        wrap_function_wrapper("crewai.llm", "LLM.call",
                              wrap_llm_call(tracer, duration_histogram, token_histogram))

    def _uninstrument(self, **kwargs):
        unwrap("crewai.crew.Crew", "kickoff")
        unwrap("crewai.agent.Agent", "execute_task")
        unwrap("crewai.task.Task", "execute_sync")
        unwrap("crewai.llm.LLM", "call")


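# Usage sketch (illustrative, not part of the instrumented library): assuming this
# module is exposed as the ``opentelemetry.instrumentation.crewai`` package, the
# instrumentor is registered once at startup and can be removed again later:
#
#     from opentelemetry.instrumentation.crewai import CrewAIInstrumentor
#
#     CrewAIInstrumentor().instrument()    # wraps Crew.kickoff, Agent.execute_task,
#                                          # Task.execute_sync and LLM.call
#     ...                                  # run CrewAI workloads
#     CrewAIInstrumentor().uninstrument()  # restores the original methods
#
# Custom ``tracer_provider``/``meter_provider`` instances may also be passed as
# keyword arguments to ``instrument()``; otherwise the global providers are used.
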
def with_tracer_wrapper(func):
    """Helper for providing tracer for wrapper functions."""

    def _with_tracer(tracer, duration_histogram, token_histogram):
        def wrapper(wrapped, instance, args, kwargs):
            return func(tracer, duration_histogram, token_histogram, wrapped, instance, args, kwargs)
        return wrapper
    return _with_tracer


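# How the decorator is consumed: calling e.g.
# ``wrap_kickoff(tracer, duration_histogram, token_histogram)`` returns a
# wrapt-style ``wrapper(wrapped, instance, args, kwargs)`` callable, which
# ``wrap_function_wrapper`` then installs around the target CrewAI method in
# ``CrewAIInstrumentor._instrument``.
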
@with_tracer_wrapper
def wrap_kickoff(tracer: Tracer, duration_histogram: Histogram, token_histogram: Histogram,
                 wrapped, instance, args, kwargs):
    with tracer.start_as_current_span(
        "crewai.workflow",
        kind=SpanKind.INTERNAL,
        attributes={
            GenAIAttributes.GEN_AI_SYSTEM: "crewai",
        },
    ) as span:
        try:
            CrewAISpanAttributes(span=span, instance=instance)
            result = wrapped(*args, **kwargs)
            if result:
                class_name = instance.__class__.__name__
                span.set_attribute(f"crewai.{class_name.lower()}.result", str(result))
                span.set_status(Status(StatusCode.OK))
                if class_name == "Crew":
                    for attr in ["tasks_output", "token_usage", "usage_metrics"]:
                        if hasattr(result, attr):
                            span.set_attribute(f"crewai.crew.{attr}", str(getattr(result, attr)))
            return result
        except Exception as ex:
            span.set_status(Status(StatusCode.ERROR, str(ex)))
            raise


@with_tracer_wrapper
def wrap_agent_execute_task(tracer, duration_histogram, token_histogram, wrapped, instance, args, kwargs):
    agent_name = instance.role if hasattr(instance, "role") else "agent"
    with tracer.start_as_current_span(
        f"{agent_name}.agent",
        kind=SpanKind.CLIENT,
        attributes={
            SpanAttributes.TRACELOOP_SPAN_KIND: TraceloopSpanKindValues.AGENT.value,
        },
    ) as span:
        try:
            CrewAISpanAttributes(span=span, instance=instance)
            result = wrapped(*args, **kwargs)
            if token_histogram:
                # Token counts are read from CrewAI's internal token-usage summary on the agent.
                token_histogram.record(
                    instance._token_process.get_summary().prompt_tokens,
                    attributes={
                        GenAIAttributes.GEN_AI_SYSTEM: "crewai",
                        GenAIAttributes.GEN_AI_TOKEN_TYPE: "input",
                        GenAIAttributes.GEN_AI_RESPONSE_MODEL: str(instance.llm.model),
                    },
                )
                token_histogram.record(
                    instance._token_process.get_summary().completion_tokens,
                    attributes={
                        GenAIAttributes.GEN_AI_SYSTEM: "crewai",
                        GenAIAttributes.GEN_AI_TOKEN_TYPE: "output",
                        GenAIAttributes.GEN_AI_RESPONSE_MODEL: str(instance.llm.model),
                    },
                )

            set_span_attribute(span, GenAIAttributes.GEN_AI_REQUEST_MODEL, str(instance.llm.model))
            set_span_attribute(span, GenAIAttributes.GEN_AI_RESPONSE_MODEL, str(instance.llm.model))
            span.set_status(Status(StatusCode.OK))
            return result
        except Exception as ex:
            span.set_status(Status(StatusCode.ERROR, str(ex)))
            raise


@with_tracer_wrapper
def wrap_task_execute(tracer, duration_histogram, token_histogram, wrapped, instance, args, kwargs):
    task_name = instance.description if hasattr(instance, "description") else "task"

    with tracer.start_as_current_span(
        f"{task_name}.task",
        kind=SpanKind.CLIENT,
        attributes={
            SpanAttributes.TRACELOOP_SPAN_KIND: TraceloopSpanKindValues.TASK.value,
        },
    ) as span:
        try:
            CrewAISpanAttributes(span=span, instance=instance)
            result = wrapped(*args, **kwargs)
            set_span_attribute(span, SpanAttributes.TRACELOOP_ENTITY_OUTPUT, str(result))
            span.set_status(Status(StatusCode.OK))
            return result
        except Exception as ex:
            span.set_status(Status(StatusCode.ERROR, str(ex)))
            raise


@with_tracer_wrapper
def wrap_llm_call(tracer, duration_histogram, token_histogram, wrapped, instance, args, kwargs):
    llm = instance.model if hasattr(instance, "model") else "llm"
    with tracer.start_as_current_span(
        f"{llm}.llm",
        kind=SpanKind.CLIENT,
        attributes={},
    ) as span:
        start_time = time.time()
        try:
            CrewAISpanAttributes(span=span, instance=instance)
            result = wrapped(*args, **kwargs)

            if duration_histogram:
                duration = time.time() - start_time
                duration_histogram.record(
                    duration,
                    attributes={
                        GenAIAttributes.GEN_AI_SYSTEM: "crewai",
                        GenAIAttributes.GEN_AI_RESPONSE_MODEL: str(instance.model),
                    },
                )

            span.set_status(Status(StatusCode.OK))
            return result
        except Exception as ex:
            span.set_status(Status(StatusCode.ERROR, str(ex)))
            raise


def is_metrics_enabled() -> bool:
    return (os.getenv("TRACELOOP_METRICS_ENABLED") or "true").lower() == "true"

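# Metrics are on by default; only the literal value "true" (case-insensitive)
# keeps them enabled, so e.g. ``TRACELOOP_METRICS_ENABLED=false`` skips creation
# of the token and duration histograms in ``_instrument``.
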
def _create_metrics(meter: Meter):
    token_histogram = meter.create_histogram(
        name=Meters.LLM_TOKEN_USAGE,
        unit="token",
        description="Measures number of input and output tokens used",
    )

    duration_histogram = meter.create_histogram(
        name=Meters.LLM_OPERATION_DURATION,
        unit="s",
        description="GenAI operation duration",
    )

    return token_histogram, duration_histogram
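
# Sketch of a minimal host-application setup (an assumption, not part of this
# module): the spans and histograms recorded above are only exported if the
# application configures SDK providers (with whatever exporters/processors it
# needs) before instrumenting, for example:
#
#     from opentelemetry import trace, metrics
#     from opentelemetry.sdk.trace import TracerProvider
#     from opentelemetry.sdk.metrics import MeterProvider
#
#     trace.set_tracer_provider(TracerProvider())
#     metrics.set_meter_provider(MeterProvider())
#     CrewAIInstrumentor().instrument()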