import logging
import os
import uuid
from time import sleep
from typing import Any, Dict, List, Optional

from llama_index.core import Settings
from llama_index.core.callbacks import (
    CallbackManager,
    LlamaDebugHandler,
)
from llama_index.core.callbacks.base_handler import BaseCallbackHandler
from llama_index.core.callbacks.schema import (
    CBEventType,
)
from opentelemetry import trace
from opentelemetry.context import Context, attach, get_current
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased
from opentelemetry.trace import set_span_in_context
from opentelemetry.trace.span import Span

from llama_indexer.utils.traces import LLAMA_INDEXER_TRACER_NAME
# Development mode is opt-in: anything other than PYTHON_ENV=development
# (including an unset variable) is treated as production.
IS_DEV = os.environ.get("PYTHON_ENV", "production") == "development"
class OpenTelemetryCallbackHandler(BaseCallbackHandler):
    """Callback handler that records each callback event as an OpenTelemetry span.

    A span is opened in ``on_event_start`` and stored in ``span_map`` under the
    event's ID; ``on_event_end`` looks the span up by the same ID and ends it.
    Tracking spans explicitly (instead of relying on the ambient "current
    span") guarantees that the span ended is the one that was started for the
    event, and never a span owned by the caller.
    """

    # Debug diagnostics go through logging so they can be silenced or routed
    # by the application's logging configuration.
    _logger = logging.getLogger(__name__)

    def __init__(
        self,
        event_starts_to_ignore: List[CBEventType],
        event_ends_to_ignore: List[CBEventType],
    ):
        """Create the handler.

        Args:
            event_starts_to_ignore: Event types for which no span is started.
            event_ends_to_ignore: Event types whose end notification is ignored.
        """
        super().__init__(event_starts_to_ignore, event_ends_to_ignore)
        # Open spans keyed by event ID; entries are removed in on_event_end.
        self.span_map: Dict[str, Span] = {}

    def on_event_start(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        parent_id: str = "",
        **kwargs: Any,
    ) -> str:
        """Start a span for the event and remember it under ``event_id``.

        Args:
            event_type: Type of the event; its ``name`` becomes the span name.
            payload: Event payload (not used when the span is started).
            event_id: Identifier of the event. When empty, a random UUID is
                generated so the span can still be tracked.
            parent_id: Identifier of the parent event. If a span for it is
                still open, it becomes the parent of the new span; otherwise
                the current span of the active context is used.
            **kwargs: Ignored.

        Returns:
            The event ID under which the span was stored, or the ``event_id``
            passed in unchanged when the event type is ignored.
        """
        if event_type in self.event_starts_to_ignore:
            return event_id

        if not event_id:
            # Without a key the span could never be found again to be ended.
            event_id = str(uuid.uuid4())

        # Prefer the span of the parent event so nested events form a proper
        # hierarchy; fall back to whatever span is current in this context.
        parent_span = self.span_map.get(parent_id)
        if parent_span is None:
            parent_span = trace.get_current_span()

        tracer = trace.get_tracer(LLAMA_INDEXER_TRACER_NAME)
        span = tracer.start_span(
            event_type.name,
            context=trace.set_span_in_context(parent_span),
            kind=trace.SpanKind.INTERNAL,
        )
        # NOTE: if this event type's end is ignored while its start is not,
        # the span stays open in span_map; keep both ignore lists in sync.
        self.span_map[event_id] = span
        self._logger.debug("Event start: %s (event_id=%s)", event_type, event_id)
        return event_id

    def on_event_end(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> None:
        """End the span that was started for ``event_id``.

        Args:
            event_type: Type of the event that finished.
            payload: Event payload; in development mode its string form is
                attached to the span as the ``payload`` attribute.
            event_id: Identifier of the event whose span should be ended.
            **kwargs: Ignored.
        """
        if event_type in self.event_ends_to_ignore:
            return

        span = self.span_map.pop(event_id, None)
        if span is None:
            # No span was started for this event (e.g. its start was ignored);
            # never end the ambient span, it belongs to someone else.
            self._logger.debug(
                "Event end without open span: %s (event_id=%s)", event_type, event_id
            )
            return

        try:
            if payload and IS_DEV:
                span.set_attribute("payload", str(payload))
        finally:
            # End the span even if stringifying the payload fails.
            span.end()
        self._logger.debug("Event end: %s (event_id=%s)", event_type, event_id)

    def start_trace(self, trace_id: Optional[str] = None) -> None:
        """Do nothing; spans are managed per event, not per trace."""
        return None

    def end_trace(
        self,
        trace_id: Optional[str] = None,
        trace_map: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        """Do nothing; spans are managed per event, not per trace."""
        return None

    def flush(self):
        """Reset the handler's ``trace_id`` attribute to ``None``."""
        self.trace_id = None
# Handlers installed on the global callback manager. The OpenTelemetry handler
# is always present and skips TEMPLATING events on both start and end.
handlers: List[BaseCallbackHandler] = [
    OpenTelemetryCallbackHandler(
        event_starts_to_ignore=[CBEventType.TEMPLATING],
        event_ends_to_ignore=[CBEventType.TEMPLATING],
    ),
]

# In development, additionally print the LlamaIndex debug trace when a trace ends.
if IS_DEV:
    handlers += [LlamaDebugHandler(print_trace_on_end=True)]

# Import-time side effect: replace the callback manager on the global Settings.
Settings.callback_manager = CallbackManager(handlers)
def get_callback_manager() -> CallbackManager:
    """Return the callback manager currently set on the global ``Settings``."""
    manager = Settings.callback_manager
    return manager