I'm looking for a callback handler that will stream back Llama Index events in real-time. And then I'd like to combine that stream with the streamed response you typically get from a chat engine. The result would enable a consumer of Llama Index to iterate over a generator/queue that either produced a Llama Index event (a CBEventType) or the delta of a streamed chat response.

I'm willing to build this callback handler but want to check if anyone has seen one. I would think it's a fairly common feature to want to stream back both the CBEventType events and the generated response in one iterator.
There was this (now stalled) PR a while back that you can maybe take inspiration from: https://github.com/run-llama/llama_index/pull/9164
Or, you can take the approach of writing a callback handler that just sends server-side events to a frontend, for example
😮 that PR is a good lead. Thanks for that.

I do want to stream server-side events to the client, but at the same time also stream the StreamingAgentChatResponse.response_gen. Thus I think I need to combine both of those things (events and chat response deltas) into one stream.
One idea is to turn the chat response deltas into events as well. So StreamingAgentChatResponse.response_gen just returns events and not deltas
Could work, yes. Currently an event is only fired when the stream is done; this is handled in some rather fun (i.e. ugly) wrapper here:
https://github.com/run-llama/llama_index/blob/fc51b9fcc9c2bbdc09a9ac91deea7715872c3f44/llama-index-core/llama_index/core/llms/callbacks.py#L24
Since you are digging into the callbacks, I wanted to run a few things by you

We are planning some QoL updates to the callback system, mainly
  1. Adding typed event-payload pairs, so that you know exactly what payload an event has
  2. Docs on the above
  3. Possibly only enabling callbacks to be global in the Settings object. This would reduce the code complexity of having to inherit callback managers throughout the code
Do those things make sense? If you have other ideas/pain points/comments, very open to suggestions. Your plan to add an iterator would also be very nice to have 🙏
I do have one suggestion regarding events. Currently, a callback handler implements on_event_start and on_event_end, and for each event these get triggered with the same event_type. If you wanted to log these events in a queue, you'd need to generate a CBEvent object. But there is no property in CBEvent allowing you to discern whether it's a starting event or an ending event. So I had to make a CBEventEnd class that is basically just
Plain Text
class CBEventEnd(CBEvent):  # pylint: disable=too-few-public-methods
    pass

so I could later tell whether the events in my event queue were starting or ending events. Perhaps adding an is_start or is_end boolean to the CBEvent object would solve this.
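For illustration, here is a minimal sketch of that flag idea; TaggedCBEvent and is_start are hypothetical, not part of the current API:
Plain Text
from llama_index.core.callbacks import CBEvent, CBEventType

class TaggedCBEvent(CBEvent):
    """Hypothetical CBEvent carrying a flag so a queue consumer can tell start from end."""

    def __init__(self, *args, is_start: bool = True, **kwargs):
        super().__init__(*args, **kwargs)
        self.is_start = is_start

end_event = TaggedCBEvent(CBEventType.LLM, is_start=False)  # an "end" event for the LLM step
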
As for payloads, yes they are currently payload: dict[str, Any] | None = None, which isn't strongly typed.

We should prolly have well-defined event classes that all extend CBEvent and those are sent into on_event_start and on_event_end instead
ah good point!
"only enabling callbacks to be global in the Settings object."
Personally not a fan of global anything. I'd rather see a strong dependency injection pattern where settings are injected into components.
hmmm... yea fair 😅 Is there an actual use case for a callback manager ever not being global though?
What if you had two LlamaIndex stacks running parallel with completely different settings?
In the same process? Settings.callback_manager would only apply per-process I think (If I'm thinking about this correctly)
In the same process yes, but each running inside coroutines.
and why do they need different callback managers? Just because something about the handlers they are using is different? (Sorry, just want to make sure I fully grasp the use case haha)
the current injection of the callback manager into components is just very confusing/hard to trace... will have to think about how to improve this then
Let's say a user query arrives and you want to run it against two different LlamaIndex implementations, each using different LLMs and monitoring different events. A callback manager is a container for callback handlers. To be efficient, you want to only include the callback handlers that your LlamaIndex stack cares to use, yet you have two stacks that care about different handlers.

If the callback manager is only ever global, you need to push a bunch of handlers into it, run stack1, then adjust handlers and then run stack2. That feels wrong. Instead, you could make two callback managers, each with their own set of handlers and inject each one into each stack and let them run in parallel, oblivious to each other.
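A minimal sketch of that two-manager setup (the handler choices here are arbitrary, and how each manager gets injected into its stack's components is left schematic):
Plain Text
# Two independent callback managers, each with only the handlers its stack cares about.
from llama_index.core.callbacks import (
    CallbackManager,
    LlamaDebugHandler,
    TokenCountingHandler,
)

stack1_manager = CallbackManager([LlamaDebugHandler()])      # stack 1: debug/trace events
stack2_manager = CallbackManager([TokenCountingHandler()])   # stack 2: token counting only

# Each manager is injected into its own engine/index at construction time, so the two
# stacks can run in parallel coroutines without touching any shared global state.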

Dependency injection keeps things nice and separated, but yes it can be a hassle to drill dependencies down into each component. It's the price you pay for a great Inversion of Control pattern that keeps everything loosely coupled.
Plus it makes testing super lovely as you can test various components by injecting mock dependencies into them.
Ooooo this is useful - merging two async generators into one:

https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python/55317623#55317623

So the EventStreamerCallbackHandler would produce a generator of events from its Queue. And the StreamingAgentChatResponse is already doing this for its chat response deltas. Now we merge them via the above solution and we get a StreamingAgentChatAndEventsResponse generator.
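One way to sketch that merge (a hypothetical helper, not taken verbatim from the linked answer):
Plain Text
import asyncio

async def merge_async_generators(*gens):
    """Interleave items from several async generators as each one produces them."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking one generator as exhausted

    async def drain(gen):
        async for item in gen:
            await queue.put(item)
        await queue.put(done)

    tasks = [asyncio.create_task(drain(g)) for g in gens]
    remaining = len(tasks)
    while remaining:
        item = await queue.get()
        if item is done:
            remaining -= 1
        else:
            yield item

# e.g. merge_async_generators(event_handler.async_event_gen(),   # hypothetical event queue gen
#                             chat_response.async_response_gen())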

I need to better understand when start_trace and end_trace are called inside a callback handler to understand when the EventStreamerCallbackHandler would know its Queue is never going to get new events. Who (or which component) is responsible for calling start_trace and end_trace?
usually start/end trace is handled by either a with callback_manager.as_trace("name") block or a decorator like @trace_method("chat")
both of those handle calling start/end trace
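For reference, a rough usage sketch of the with-block form (the handler list is left empty just to keep it minimal):
Plain Text
from llama_index.core.callbacks import CallbackManager

callback_manager = CallbackManager([])  # would normally hold your handlers

with callback_manager.as_trace("query"):
    # every on_event_start/on_event_end fired in here is grouped under the "query"
    # trace; start_trace is called on enter and end_trace on exit
    ...

# @trace_method("chat") wraps a method the same way, using self.callback_manager
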
Do you have an example (notebook or doc) showing this pattern in use?
let me find a spot in the code
Hmmm...I have to rethink my architecture to solve this real-time event streaming problem. My naive approach is similar to the above PR where I have something like:
Plain Text
streamed_chat_and_events_response = chat_engine.stream_chat_with_events(user_input)

for message in streamed_chat_and_events_response.response_gen():
    if isinstance(message, CBEvent):
        ...  # we have an event
    else:
        ...  # this is a message delta


But chat_engine.stream_chat_with_events doesn't resolve until we've started the response synthesis phase. So we don't start iterating over the generator and seeing the events until long after they've actually happened. I'm not sure how to proceed now.
🤔 I guess the handler/trace needs to be triggered sooner? Or this is an issue with combining generators?
I think my problem might be using the out-of-the-box chat engines. I think I need to build my own so I can start writing events to the queue at the same moment the chat stream is written to the queue in its own thread. This will allow me to return a StreamingAgentChatAndEventsResponse that kicks off its queue writing (in its own thread) at the same time it is generated/returned.
Time to experiment...

(but first, I can't seem to get basic async streaming chat to work. It just hangs. I'm still on 0.9.32 so maybe that's the issue. But I sense real-time streaming of events is going to require async, unless someone can show me how to combine two sync generators (for events and response deltas). I tried aiostream last night and was able to combine async generators (but the chat_engine's async_response_gen() just hangs and I haven't figured out why yet)).
Pardon me while I talk out loud (Rubber Duck debugging)...

This is from a ContextChatEngine:
Plain Text
    @trace_method("chat")
    async def astream_chat(
        self, message: str, chat_history: Optional[List[ChatMessage]] = None
    ) -> StreamingAgentChatResponse:
        if chat_history is not None:
            self._memory.set(chat_history)
        self._memory.put(ChatMessage(content=message, role="user"))

        context_str_template, nodes = await self._agenerate_context(message)
        prefix_messages = self._get_prefix_messages_with_context(context_str_template)
        initial_token_count = len(
            self._memory.tokenizer_fn(
                " ".join([(m.content or "") for m in prefix_messages])
            )
        )
        all_messages = prefix_messages + self._memory.get(
            initial_token_count=initial_token_count
        )

        chat_response = StreamingAgentChatResponse(
            achat_stream=await self._llm.astream_chat(all_messages),
            sources=[
                ToolOutput(
                    tool_name="retriever",
                    content=str(prefix_messages[0]),
                    raw_input={"message": message},
                    raw_output=prefix_messages[0],
                )
            ],
            source_nodes=nodes,
        )
        thread = Thread(
            target=lambda x: asyncio.run(chat_response.awrite_response_to_history(x)),
            args=(self._memory,),
        )
        thread.start()

        return chat_response
It won't return chat_response until
Plain Text
achat_stream=await self._llm.astream_chat(all_messages)

resolves and that won't happen until the LLM produces something in its chat stream. Essentially this function doesn't return until the response from the LLM begins.

If we're going to get a real-time events iterator, in addition to this chat_response iterator combined into one iterator, we can't await on the LLM's astream_chat. Perhaps instead, we pass to achat_stream a Callable that produces a ChatResponseAsyncGen instead of an actual ChatResponseAsyncGen and thus we can delay actually calling it until we begin the entire iteration process.
Perhaps the thread at the bottom of that function should be responsible for the
Plain Text
await self._llm.astream_chat(all_messages)

call and it then populates its own queue. This way this function can return before the LLM response begins
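Roughly, something like this hypothetical rewrite (heavily simplified; it ignores the synchronization that response_gen/async_response_gen would need in order to wait for achat_stream to appear):
Plain Text
import asyncio
from threading import Thread

from llama_index.core.chat_engine.types import StreamingAgentChatResponse

async def astream_chat_deferred(self, all_messages) -> StreamingAgentChatResponse:
    chat_response = StreamingAgentChatResponse(sources=[], source_nodes=[])

    async def produce() -> None:
        # The await that used to block the return now runs inside the thread's own loop.
        chat_response.achat_stream = await self._llm.astream_chat(all_messages)
        await chat_response.awrite_response_to_history(self._memory)

    Thread(target=lambda: asyncio.run(produce()), daemon=True).start()
    return chat_response  # returned before the LLM has produced any tokens
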
I think.... that makes sense haha. Doing my best to follow along.

If this can be put into a PR at some point, happy to merge/review/test 🙂
I just cloned your repo but VSCode is a sea of red lines - mostly Pylance complaining but oddly Pylint complains it can't resolve any of the llama_index.* imports
ha, my vscode works fine, but I can see that happening. Make sure vscode is using a venv with at least llama-index-core installed in it
I have my VSCode python interpreter pointing to ~/.cache/pypoetry/virtualenvs/llama-index-UPoQwng--py3.10/bin/python and that helps. I gather that's the right spot as I think it's where poetry is keeping the virtual env.

Pylint is happy now.

Pylance is still complaining about loads of type errors, but as you pointed out in my last PR, the framework needs to support older versions of Python. Do you use the Pylance VSCode extension and do you have any special settings to make it happy?
Also, a question about Callback Traces. Would it be correct to assume that while a Trace is happening, we'd expect that for every on_event_start called there would be a corresponding on_event_end called as well? In other words, do we expect an even number of events to happen inside a Trace, half being "start events" and half being "end events"?

It appears inside a given Trace I can see a CBEventType.LLM event start, then the Trace ends, then the CBEventType.LLM event ends but this on_event_end callback happens outside the Trace. This is unexpected.
I checked my extensions, and I only have pylint 👀
Yes, there should be an event end for every event start

Hmm, I could see that happening with async
Oh good call on the async. But I have tested this with sync as well and I believe it's happening.

And yes, I do see one end event for every start event, but the final CBEventType.LLM end event lands outside the actual trace.
This is my event streamer callback handler

There is no async here (yet). My RAG flow uses a sync ContextChatEngine's stream_chat to run the RAG pipe. The output of the print statements in the event streamer suggests the CBEventType.LLM end event occurs after I see end_trace printed. This is unexpected.
Yea I think for async, the code leaves the trace (and the trace ends), but the final event still needs to be awaited.

In the underlying data structures, that final event should still correctly get logged under the original trace (... I think)
That would be my expectation too, but not what I'm seeing. I sense there is something async happening somewhere in my sync execution. I wonder if you'd be willing to try that above EventStreamer as a callback handler inside a RAG pipeline that uses a sync ContextChatEngine.stream_chat and confirm the order of print statements is as you expect.

We should see
Plain Text
start_trace
Added to Queue: Start ...
Added to Queue: End ...
end_trace


However, we see
Plain Text
start_trace
Added to Queue: Start ...
end_trace
Added to Queue: End ...
I think that lines up with my understanding. The LLM event starts/ends instantly (but only starts adding to the queue once the user iterates over it)
Your LLM event at the end is 0 seconds. I don't think it starts/ends instantly. I think the trace hasn't picked up the "end" event and is performing its duration calculations on a single LLM CBEvent, which keeps the duration to 0.0.
The function _get_time_stats_from_event_pairs is getting passed a List[CBEvent] but it only has one LLM event inside (the start one). It should have two. total_secs starts as 0.0 and remains 0.0 because:
Plain Text
  for event_pair in event_pairs:
      start_time = datetime.strptime(event_pair[0].time, TIMESTAMP_FORMAT)
      end_time = datetime.strptime(event_pair[-1].time, TIMESTAMP_FORMAT)
      total_secs += (end_time - start_time).total_seconds()

adds 0 seconds to total_secs when there is only one event in event_pairs...because event_pair[0] and event_pair[-1] are the same object when there is only one element in the list.
From my observations, the "end" event for that LLM happens after the trace is completed. I haven't figured out why that is the case yet. The trace should end when the final LLM end event happens. The fact that it doesn't feels like an architecture bug. Or is this by design?

What if, instead of using a Queue system for the streamed response from the LLM, streamed responses were treated just like all the other events (i.e. there's a new "response event" produced for a non-streamed response and several for a streamed response)? You could still use the callback handler system to capture these "response events" and stream them back to the client just like the StreamingAgentChatResponse generator. Perhaps the built-in Chat Engines would come equipped with a default callback handler that maintained its own queue of "response events", and it's that generator/queue that would be returned when you access something like streamed_agent_chat_response.response_gen. But then you could optionally swap that default callback handler for one that captures even more events and stream those back to the client as well, all from a single generator/iterator.

Perhaps it's wise to fire a special event to signal that the RAG pipe has been completed. Tracing would end when this final event was fired.

Does this event-based-llm-responses idea have any merit or does it conflict with any future plans for the framework? Is this idea clear? I'm keen to work on it but it's a huge project with possible breaking changes so I don't want to proceed if the folks at Llama-index have other ideas.
ahhh good catch...
it's definitely an issue with the design. Callbacks are hard haha
We could return streaming as an event, but I would say preserving the current UX (i.e. the user gets a generator returned to them) is important to me
The user could still call chat_engine.stream_chat(user_input) and get back something that they can iterate over (aka a generator), except it's a generator of events, each with a payload and that payload includes the LLM response deltas.
This is very similar to what is currently there - except the thing you're iterating over is a series of CBEvents instead of a Queue inside a StreamingAgentChatResponse
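To make the shape concrete, a hedged sketch of the caller's side (the event-yielding stream_chat and the "delta" payload key are hypothetical):
Plain Text
from llama_index.core.callbacks import CBEventType

# chat_engine and user_input are assumed to exist already
for event in chat_engine.stream_chat(user_input):  # hypothetical: yields CBEvents
    if event.event_type == CBEventType.LLM and event.payload and "delta" in event.payload:
        print(event.payload["delta"], end="", flush=True)  # the streamed response text
    else:
        ...  # retrieval/synthesis/etc. events, e.g. forwarded as server-sent events
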
it definitely sounds like it could work! I'm open to the change 🙂
Ok, it's quite a bit of work.
hence why it hasn't been touched in a bit 😅
Is this something that might be put on a LlamaIndex developer backlog or something that you'd expect your community to produce for you?
It's definitely in our backlog, but typing the payloads and increasing callback coverage are a touch higher priorities
Ok, I have thoughts on those too:
1) Each event should be its own class, extended from CBEvent
2) Each event should be a single event in time, not a pair of start and end events. You can have a RetrieverStart event and a RetrieverEnd event but these are different classes. Think of how JavaScript does it for inspiration. They mastered events and callbacks so let's not reinvent the wheel.
3) Payloads are just members of the class of the event. You get a schema/typing because your event is its own class
4) Folks can build their own events by extending CBEvent
5) There is already a fairly good pattern for events but maybe it becomes
Plain Text
some_event = MyEvent(prop1='foo', prop2='bar')
self.callback_manager.fire_event(some_event)

and folks are responsible for firing events whenever they think the time is right.
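As a hedged illustration of points 1-4, a proposed (not current-API) typed event might look like:
Plain Text
from llama_index.core.callbacks import CBEvent, CBEventType

class RetrieverEndEvent(CBEvent):
    """Proposed typed event: fired once, at the moment retrieval finishes."""

    def __init__(self, node_count: int) -> None:
        super().__init__(event_type=CBEventType.RETRIEVE, payload={"node_count": node_count})
        self.node_count = node_count  # typed property instead of an untyped payload dict

# proposed usage, fired in a single line (fire_event is part of the proposal, not the API):
# self.callback_manager.fire_event(RetrieverEndEvent(node_count=len(nodes)))
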
1) Agreed, this will help add types for the payloads
2) Agreed (and again, helps add specific types for start/end events)
3) Agreed
4) Yup (although I think custom events are not guaranteed to work in observability integrations, like arize)
5) Personally I kind of like the as_event() syntax too. If a feature needs more precise control, they can call on_event_start/end manually
although I see on_event_start/end is redundant if the event type has that info lol
5) I would think of an event class as being more like a dataclass - they shouldn't have functions, they just hold properties. If you wrap code in with...as event then you're married to the idea of a single event having a start and end, but the JS paradigm (as well as the error/exception raising paradigm) is that you just fire an event at a specific moment and it doesn't spread itself over a block of code
You don't ever say (pseudocode)
Plain Text
with SomeException as exception:
    try:
        result = do_the_thing()
        if result > 10:
            exception.when_fired(prop1=result)
        if result > 1:
            exception.raise()

Instead, you just raise the exception as a single burst.

Events should work like that too. They shouldn't spread over a block of code. They should be fired in one line.
Appreciate the feedback here -- bookmarking this thread in our notion doc hahaha
"If a feature needs more precise control, they can call on_event_start/end manually"
This contradicts point 2 - that events are a single moment in time and don't have starts/ends.
Ok so if we've reached alignment on these ideas (have we done that yet?), then how do we divide and conquer?
Do you need to confer with other LlamaIndex folks first?
I don't want to start building PRs if this isn't the general direction that your team is heading in.
It's a ton of work on my end, especially since I'm far more unfamiliar with the codebase than you are
I think we agree! I'm on board
Yea if you want to be involved in this, I can pass it along to the team member responsible for handling this, and we can coordinate something?

We are still dealing with the v0.10.x fallout (rip), but I would suspect us starting on this sometime next week
Like....can I go back to my team and tell them "Llama Index is going to implement these [insert features]" and my team can depend on that happening?
Our company is invested in using LlamaIndex so I can give work hours to helping and I prolly have a few developers here that can chip in as well. I just want to make sure we're aligned and being effective
100% -- I agree with you (and your team's) vision for callbacks. I'd like to make this happen so that it works out for us + community + partner integrations
One concern I have is breaking changes -- either we publish an alpha version at some point and give partners time to adjust, or we support the old interface in some sort of backwards compatibility mode for a short time
Agreed - I've not worked on such a public framework so I understand supporting older APIs is a challenge
I think the class-based-event system would be a rather big change and would prolly break something somewhere
I think the StreamingAgentChatResponse as an iterator over CBEvents would be less of a breaking change
Yea agreed on both points there. I'm sure there's some sneaky magic (i.e. nasty hacks) we can do to maintain enough of the current interface for backward compatibility 🙂
Then we remove the magic a month after lol