Or, you can take the approach of writing a callback handler that just sends server-side events, for example, to a frontend
That PR is a good lead. Thanks for that.
I do want to stream server-side events to the client, but at the same time also stream the StreamingAgentChatResponse.response_gen. Thus I think I need to combine both of those things (events and chat response deltas) into one stream.
One idea is to turn the chat response deltas into events as well. So StreamingAgentChatResponse.response_gen just returns events and not deltas
Since you are digging into the callbacks, I wanted to run a few things by you
We are planning some QoL updates to the callback system, mainly
- Adding typed event-payload pairs, so that you know exactly what payload an event has
- Docs on the above
- Possibly only enabling callbacks to be global in the Settings object. This would reduce the code complexity of having to inherit callback managers throughout the code
Do those things make sense? If you have other ideas/pain points/comments, very open to suggestions. Your plan to add an iterator would also be very nice to have
I do have one suggestion regarding events. Currently, a callback handler implements on_event_start and on_event_end, and for each event, these get triggered with the same event_type. If you wanted to log these events in a queue, you'd need to generate a CBEvent object. But there is no property in CBEvent allowing you to discern whether it's a starting event or an ending event. So I had to make a CBEventEnd class that is basically just
class CBEventEnd(CBEvent):  # pylint: disable=too-few-public-methods
    pass
so I could later tell whether the events in my event queue were starting or ending events. Perhaps adding an is_start or is_end boolean to the CBEvent object would solve this.
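Roughly, something like this is what I mean by the is_start flag (just a sketch; the flag is hypothetical, not an existing CBEvent property, and the import path assumes the 0.10-style llama_index.core layout):
from dataclasses import dataclass

from llama_index.core.callbacks.schema import CBEvent, CBEventType

@dataclass
class FlaggedCBEvent(CBEvent):
    # hypothetical flag; CBEvent has no such property today
    is_start: bool = True

start_evt = FlaggedCBEvent(event_type=CBEventType.LLM, is_start=True)
end_evt = FlaggedCBEvent(event_type=CBEventType.LLM, is_start=False)
print(start_evt.is_start, end_evt.is_start)  # True False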
As for payloads, yes, they are currently payload: dict[str, Any] | None = None, which isn't strongly typed.
We should prolly have well-defined event classes that all extend CBEvent, and those are sent into on_event_start and on_event_end instead.
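For example (a sketch only; LLMStartEvent/LLMEndEvent and their fields are made up, and the import paths assume 0.10):
from dataclasses import dataclass, field
from typing import List

from llama_index.core.callbacks.schema import CBEvent, CBEventType
from llama_index.core.llms import ChatMessage

@dataclass
class LLMStartEvent(CBEvent):
    # the untyped payload dict becomes plain, typed fields
    event_type: CBEventType = CBEventType.LLM
    messages: List[ChatMessage] = field(default_factory=list)
    model_name: str = ""

@dataclass
class LLMEndEvent(CBEvent):
    event_type: CBEventType = CBEventType.LLM
    response_text: str = ""

# a future handler hook could then accept the typed object directly, e.g.
# def on_event_start(self, event: LLMStartEvent) -> None: ...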
"only enabling callbacks to be global in the Settings object."
Personally not a fan of global anything. I'd rather see a strong dependency injection pattern where settings are injected into components.
hmmm... yea fair
Is there an actual use case for a callback manager ever not being global though?
What if you had two LlamaIndex stacks running parallel with completely different settings?
In the same process? Settings.callback_manager would only apply per-process, I think (if I'm thinking about this correctly)
In the same process yes, but each running inside coroutines.
and why do they need different callback managers? Just because something about the handlers they are using is different? (Sorry, just want to make sure I fully grasp the use case haha)
the current injection of the callback manager into components is just very confusing/hard to trace... will have to think about how to improve this then
Let's say a user query arrives and you want to run it against two different LlamaIndex implementations, each using different LLMs and monitoring different events. A callback manager is a container for callback handlers. To be efficient, you want to only include the callback handlers that your LlamaIndex stack cares to use, yet you have two stacks that care about different handlers.
If the callback manager is only ever global, you need to push a bunch of handlers into it, run stack1, then adjust handlers and then run stack2. That feels wrong. Instead, you could make two callback managers, each with their own set of handlers and inject each one into each stack and let them run in parallel, oblivious to each other.
Dependency injection keeps things nice and separated, but yes it can be a hassle to drill dependencies down into each component. It's the price you pay for a great Inversion of Control pattern that keeps everything loosely coupled.
Plus it makes testing super lovely as you can test various components by injecting mock dependencies into them.
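As a toy example of the separation (handler choices are arbitrary here, and how each manager gets injected depends on the LlamaIndex version you're on):
from llama_index.core.callbacks import (
    CallbackManager,
    LlamaDebugHandler,
    TokenCountingHandler,
)

# stack 1 only cares about debug traces; stack 2 only about token counts
stack1_callbacks = CallbackManager([LlamaDebugHandler()])
stack2_callbacks = CallbackManager([TokenCountingHandler()])

# each manager is then injected into its own stack's components (LLM, index,
# chat engine, ...), so the two stacks can run side by side without sharing
# handlers or fighting over a single global manager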
Ooooo this is useful - merging two async generators into one:
https://stackoverflow.com/questions/55299564/join-multiple-async-generators-in-python/55317623#55317623
So the EventStreamerCallbackHandler would produce a generator of events from its Queue. And the StreamingAgentChatResponse is already doing this for its chat response deltas. Now we merge them via the above solution and we get a StreamingAgentChatAndEventsResponse generator.
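Along the lines of that answer, a small self-contained merge helper could look like this (just a sketch; error handling and task cleanup are omitted):
import asyncio
from typing import AsyncIterator, TypeVar

T = TypeVar("T")

async def merge_async_generators(*gens: AsyncIterator[T]) -> AsyncIterator[T]:
    """Yield items from several async generators as soon as any of them produces one."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking an exhausted generator

    async def drain(gen: AsyncIterator[T]) -> None:
        async for item in gen:
            await queue.put(item)
        await queue.put(done)

    tasks = [asyncio.create_task(drain(gen)) for gen in gens]
    remaining = len(tasks)
    while remaining:
        item = await queue.get()
        if item is done:
            remaining -= 1
        else:
            yield item

# e.g. merge_async_generators(event_gen, chat_response.async_response_gen())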
I need to better understand when start_trace and end_trace are called inside a callback handler, to understand when the EventStreamerCallbackHandler would know its Queue is never going to get new events. Who (or which component) is responsible for calling start_trace and end_trace?
usually start/end trace is handled by either with callback_manager.as_trace("name") or a decorator like @trace_method("chat"), both of those handle calling start/end trace
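A quick sketch of the context-manager form (imports assume 0.10; the retriever here is just a placeholder for whatever component you're tracing):
from llama_index.core.callbacks import CallbackManager, CBEventType, LlamaDebugHandler

callback_manager = CallbackManager([LlamaDebugHandler()])  # or your EventStreamerCallbackHandler

# start_trace fires on entry, end_trace on exit; events fired inside are
# grouped under this trace
with callback_manager.as_trace("query"):
    with callback_manager.event(CBEventType.RETRIEVE) as event:
        nodes = retriever.retrieve("some question")  # placeholder retriever
        event.on_end(payload={"nodes": nodes})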
Do you have an example (notebook or doc) showing this pattern in use?
let me find a spot in the code
Hmmm... I have to rethink my architecture to solve this real-time event streaming problem. My naive approach is similar to the above PR where I have something like:
streamed_chat_and_events_response = chat_engine.stream_chat_with_events(user_input)
for message in streamed_chat_and_events_response.response_gen():
    if isinstance(message, CBEvent):
        ...  # we have an event
    else:
        ...  # this is a message delta
But chat_engine.stream_chat_with_events doesn't resolve until we've started the response synthesis phase. So we don't start iterating over the generator and seeing the events until much later than when they've actually happened. I'm not sure how to proceed now.
I guess the handler/trace needs to be triggered sooner? Or is this an issue with combining generators?
I think my problem might be using the out-of-the-box chat engines. I think I need to build my own so I can start streaming events at the same moment the chat stream is written to the queue in its own thread. This will allow me to return a StreamingAgentChatAndEventsResponse that kicks off its queue writing (in its own thread) at the same time it is generated/returned.
Time to experiment...
(But first, I can't seem to get basic async streaming chat to work. It just hangs. I'm still on 0.9.32, so maybe that's the issue. But I sense real-time streaming of events is going to require async, unless someone can show me how to combine two sync generators (for events and response deltas). I tried aiostream last night and was able to combine async generators, but the chat_engine's async_response_gen() just hangs and I haven't figured out why yet.)
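For the sync case, one way to interleave two plain generators is to drain each in its own thread into a shared queue (a sketch, names illustrative):
import threading
from queue import Queue
from typing import Any, Generator, Iterable

_SENTINEL = object()

def merge_sync_generators(*gens: Iterable[Any]) -> Generator[Any, None, None]:
    """Yield items from several sync generators as their producer threads push them."""
    queue: "Queue[Any]" = Queue()

    def drain(gen: Iterable[Any]) -> None:
        for item in gen:
            queue.put(item)
        queue.put(_SENTINEL)

    for gen in gens:
        threading.Thread(target=drain, args=(gen,), daemon=True).start()

    remaining = len(gens)
    while remaining:
        item = queue.get()
        if item is _SENTINEL:
            remaining -= 1
        else:
            yield item

# e.g. merge_sync_generators(event_handler_gen, streaming_response.response_gen)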
Pardon me while I talk out loud (Rubber Duck debugging)...
This is from a ContextChatEngine:
@trace_method("chat")
async def astream_chat(
    self, message: str, chat_history: Optional[List[ChatMessage]] = None
) -> StreamingAgentChatResponse:
    if chat_history is not None:
        self._memory.set(chat_history)
    self._memory.put(ChatMessage(content=message, role="user"))
    context_str_template, nodes = await self._agenerate_context(message)
    prefix_messages = self._get_prefix_messages_with_context(context_str_template)
    initial_token_count = len(
        self._memory.tokenizer_fn(
            " ".join([(m.content or "") for m in prefix_messages])
        )
    )
    all_messages = prefix_messages + self._memory.get(
        initial_token_count=initial_token_count
    )
    chat_response = StreamingAgentChatResponse(
        achat_stream=await self._llm.astream_chat(all_messages),
        sources=[
            ToolOutput(
                tool_name="retriever",
                content=str(prefix_messages[0]),
                raw_input={"message": message},
                raw_output=prefix_messages[0],
            )
        ],
        source_nodes=nodes,
    )
    thread = Thread(
        target=lambda x: asyncio.run(chat_response.awrite_response_to_history(x)),
        args=(self._memory,),
    )
    thread.start()
    return chat_response
It won't return chat_response until achat_stream=await self._llm.astream_chat(all_messages) resolves, and that won't happen until the LLM produces something in its chat stream. Essentially this function doesn't return until the response from the LLM begins.
If we're going to get a real-time events iterator, in addition to this chat_response iterator, combined into one iterator, we can't await on the LLM's astream_chat. Perhaps instead, we pass to achat_stream a Callable that produces a ChatResponseAsyncGen instead of an actual ChatResponseAsyncGen, and thus we can delay actually calling it until we begin the entire iteration process.
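A toy illustration of that factory idea, outside of LlamaIndex entirely (all names here are hypothetical):
import asyncio
from typing import AsyncGenerator, Callable

async def fake_llm_stream() -> AsyncGenerator[str, None]:
    # stand-in for the LLM's chat stream
    await asyncio.sleep(1.0)  # simulate time-to-first-token
    for delta in ["Hel", "lo", "!"]:
        yield delta

class DeferredChatResponse:
    """Holds a factory for the chat stream instead of the awaited stream itself,
    so constructing the response does not wait on the LLM."""

    def __init__(self, stream_factory: Callable[[], AsyncGenerator[str, None]]) -> None:
        self._stream_factory = stream_factory

    async def response_gen(self) -> AsyncGenerator[str, None]:
        # the LLM call effectively starts here, when iteration begins
        async for delta in self._stream_factory():
            yield delta

async def main() -> None:
    response = DeferredChatResponse(fake_llm_stream)  # returns immediately
    async for delta in response.response_gen():
        print(delta, end="")

asyncio.run(main())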
Perhaps the thread at the bottom of that function should be responsible for the await self._llm.astream_chat(all_messages) call, and it then populates its own queue. This way this function can return before the LLM response begins.
I think.... that makes sense haha. Doing my best to follow along.
If this can be put into a PR at some point, happy to merge/review/test
I just cloned your repo but VSCode is a sea of red lines - mostly Pylance complaining, but oddly Pylint complains it can't resolve any of the llama_index.* imports
ha, my vscode works fine, but I can see that happening. Make sure vscode is using a venv with at least llama-index-core installed in it
I have my VSCode python interpreter pointing to ~/.cache/pypoetry/virtualenvs/llama-index-UPoQwng--py3.10/bin/python and that helps. I gather that's the right spot, as I think it's where poetry is keeping the virtual env.
Pylint is happy now.
Pylance is still complaining about loads of type errors, but as you pointed out in my last PR, the framework needs to support older versions of Python. Do you use the Pylance VSCode extension and do you have any special settings to make it happy?
Also, a question about Callback Traces. Would it be correct to assume that while a Trace is happening, we'd expect that for every on_event_start called there would be a corresponding on_event_end called as well? In other words, do we expect an even number of events to happen inside a Trace, half being "start events" and half being "end events"?
It appears inside a given Trace I can see a CBEventType.LLM event start, then the Trace ends, then the CBEventType.LLM event ends, but this on_event_end callback happens outside the Trace. This is unexpected.
I checked my extensions, and I only have pylint
Yes, there should be an event end for every event start
Hmm, I could see that happening with async
Oh good call on the async. But I have tested this with sync as well and I believe it's happening.
And yes, I do see one end event for every start event, but the final CBEventType.LLM end event lands outside the actual trace.
This is my event streamer callback handler.
There is no async here (yet). My RAG flow uses a sync ContextChatEngine's stream_chat to run the RAG pipe. The output of the print statements in the event streamer suggests the CBEventType.LLM end event occurs after I see end_trace printed. This is unexpected.
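The handler itself isn't reproduced here, but a minimal sketch of a queue-based handler with those print statements might look like this (signatures follow BaseCallbackHandler; imports assume 0.10):
from queue import Queue
from typing import Any, Dict, List, Optional

from llama_index.core.callbacks.base_handler import BaseCallbackHandler
from llama_index.core.callbacks.schema import CBEventType

class EventStreamerCallbackHandler(BaseCallbackHandler):
    """Minimal sketch: push every start/end event onto a Queue for streaming."""

    def __init__(self) -> None:
        super().__init__(event_starts_to_ignore=[], event_ends_to_ignore=[])
        self.queue: Queue = Queue()

    def on_event_start(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        parent_id: str = "",
        **kwargs: Any,
    ) -> str:
        print(f"Added to Queue: Start {event_type}")
        self.queue.put(("start", event_type, payload))
        return event_id

    def on_event_end(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> None:
        print(f"Added to Queue: End {event_type}")
        self.queue.put(("end", event_type, payload))

    def start_trace(self, trace_id: Optional[str] = None) -> None:
        print("start_trace")

    def end_trace(
        self,
        trace_id: Optional[str] = None,
        trace_map: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        print("end_trace")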
Yea I think for async, the code leaves the trace (and the trace ends), but the final event still needs to be awaited.
In the underlying data structures, that final event should still correctly get logged under the original trace (... I think)
That would be my expectation too, but not what I'm seeing. I sense there is something async happening somewhere in my sync execution. I wonder if you'd be willing to try that above EventStreamer as a callback handler inside a RAG pipeline that uses a sync ContextChatEngine.stream_chat and confirm the order of print statements is as you expect.
We should see
start_trace
Added to Queue: Start ...
Added to Queue: End ...
end_trace
However, we see
start_trace
Added to Queue: Start ...
end_trace
Added to Queue: End ...
I think that lines up with my understanding. The LLM event starts/ends instantly (but only starts adding to the queue once the user iterates over it)
Your LLM event at the end is 0 seconds. I don't think it starts/ends instantly. I think the trace hasn't picked up the "end" event and is performing its duration calculations on a single LLM CBEvent, which keeps the duration to 0.0.
The function _get_time_stats_from_event_pairs is getting passed a List[CBEvent], but it only has one LLM event inside (the start one). It should have two. total_secs starts as 0.0 and remains 0.0 because:
for event_pair in event_pairs:
    start_time = datetime.strptime(event_pair[0].time, TIMESTAMP_FORMAT)
    end_time = datetime.strptime(event_pair[-1].time, TIMESTAMP_FORMAT)
    total_secs += (end_time - start_time).total_seconds()
adds 0 seconds to total_secs when there is only one event in event_pairs... because event_pair[0] and event_pair[-1] are the same object when there is only one element in the list.
From my observations, the "end" event for that LLM happens after the trace is completed. I haven't figured out why that is the case yet. The trace should end when the final LLM end event happens. The fact that it doesn't feels like an architecture bug. Or is this by design?
What if, instead of using a Queue system for the streamed response from the LLM, streamed responses were treated just like all the other events (i.e. there's a new "response event" produced for a non-streamed response and several for a streamed response)? You could still use the callback handler system to capture these "response events" and stream them back to the client just like the StreamingAgentChatResponse generator. Perhaps the built-in Chat Engines would come equipped with a default callback handler that maintained its own queue of "response events", and it's that generator/queue that was returned when you access something like streamed_agent_chat_response.response_gen. But then you could optionally swap that default callback handler for one that captures even more events and stream those back to the client as well, all from a single generator/iterator.
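A rough sketch of that shape (ResponseDeltaEvent and EventQueueHandler are made-up names, just to show the idea):
from dataclasses import dataclass
from queue import Queue
from typing import Any, Generator

@dataclass
class ResponseDeltaEvent:
    """Hypothetical event carrying one streamed LLM response delta."""
    delta: str

class EventQueueHandler:
    """Sketch of a default handler a chat engine could own: every event,
    including each response delta, lands on a single queue."""

    _SENTINEL = object()

    def __init__(self) -> None:
        self._queue: "Queue[Any]" = Queue()

    def put(self, event: Any) -> None:
        self._queue.put(event)

    def close(self) -> None:
        self._queue.put(self._SENTINEL)

    def response_gen(self) -> Generator[Any, None, None]:
        # what streamed_agent_chat_response.response_gen could return: one
        # generator over all events, response deltas included
        while True:
            event = self._queue.get()
            if event is self._SENTINEL:
                break
            yield event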
Perhaps it's wise to fire a special event to signal that the RAG pipe has been completed. Tracing would end when this final event was fired.
Does this event-based-llm-responses idea have any merit, or does it conflict with any future plans for the framework? Is this idea clear? I'm keen to work on it, but it's a huge project with possible breaking changes, so I don't want to proceed if the folks at LlamaIndex have other ideas.
It's definitely an issue with the design. Callbacks are hard haha
We could return streaming as an event, but I would say preserving the current UX (i.e. the user gets a generator returned to them) is important to me
The user could still call chat_engine.stream_chat(user_input) and get back something that they can iterate over (aka a generator), except it's a generator of events, each with a payload, and that payload includes the LLM response deltas.
This is very similar to what is currently there - except the thing you're iterating over is a series of CBEvents instead of a Queue inside a StreamingAgentChatResponse
It definitely sounds like it could work! I'm open to the change
Ok, it's quite a bit of work.
hence why it hasn't been touched in a bit
Is this something that might be put on a LlamaIndex developer backlog or something that you'd expect your community to produce for you?
It's definitely in our backlog, but typing the payloads and increasing callback coverage are a touch higher priority
Ok, I have thoughts on those too:
1) Each event should be its own class, extended from CBEvent
2) Each event should be a single event in time, not a pair of start and end events. You can have a RetrieverStart event and a RetrieverEnd event but these are different classes. Think of how JavaScript does it for inspiration. They mastered events and callbacks so let's not reinvent the wheel.
3) Payloads are just members of the class of the event. You get a schema/typing because your event is its own class
4) Folks can build their own events by extending CBEvent
5) There is already a fairly good pattern for events, but maybe it becomes
some_event = MyEvent(prop1='foo', prop2='bar')
self.callback_manager.fire_event(some_event)
and folks are responsible for firing events whenever they think is the right time (see the sketch just below).
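A sketch of how points 1-5 could fit together; plain dataclasses stand in for CBEvent subclasses to keep it self-contained, and the manager here is a toy, not the real CallbackManager:
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Callable, DefaultDict, List, Type

@dataclass
class RetrieverStartEvent:
    """Each event is its own dataclass-like type, a single moment in time."""
    query: str = ""

@dataclass
class RetrieverEndEvent:
    node_ids: List[str] = field(default_factory=list)

class SimpleCallbackManager:
    """Toy manager: handlers subscribe per event class, events fire in one line."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[Type, List[Callable]] = defaultdict(list)

    def on(self, event_cls: Type, handler: Callable) -> None:
        self._handlers[event_cls].append(handler)

    def fire_event(self, event: object) -> None:
        for handler in self._handlers[type(event)]:
            handler(event)

manager = SimpleCallbackManager()
manager.on(RetrieverEndEvent, lambda e: print(f"retrieved {len(e.node_ids)} nodes"))

manager.fire_event(RetrieverStartEvent(query="what is a callback?"))
manager.fire_event(RetrieverEndEvent(node_ids=["node-1", "node-2"]))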
1) Agreed, this will help add types for the payloads
2) Agreed (and again, helps add specific types for start/end events)
3) Agreed
4) Yup (although I think custom events are not guaranteed to work in observability integrations, like arize)
5) Personally I kind of like the as_event() syntax too. If a feature needs more precise control, they can call on_event_start/end manually
although I see on_event_start/end is redundant if the event type has that info lol
5) I would think of an event class to be more like dataclasses, and they shouldn't have functions - they just hold properties. If you wrap code in with...as event, then you're married to the idea of a single event having a start and end, but the JS paradigm (as well as the error/exception raising paradigm) is that you just fire an event at a specific moment and it doesn't spread itself over a block of code
You don't ever say (pseudocode)
with SomeException as exception:
    try:
        result = do_the_thing()
        if (result > 10):
            exception.when_fired(prop1=result)
        if (result > 1):
            exception.raise()
Instead, you just raise the exception as a single burst.
Events should work like that too. They shouldn't spread over a block of code. They should be fired in one line.
Appreciate the feedback here -- bookmarking this thread in our notion doc hahaha
"If a feature needs more precise control, they can call on_event_start/end manually"
This contradicts point 2 - that events are a single moment in time and don't have starts/ends.
Ok, so if we've reached alignment on these ideas (have we done that yet?), then how do we divide and conquer?
Do you need to confer with other LlamaIndex folks first?
I don't want to start building PRs if this isn't the general direction that your team is heading in.
It's a ton of work on my end, especially since I'm far more unfamiliar with the codebase than you are
I think we agree! I'm on board
Yea if you want to be involved in this, I can pass this along to the team member responsible for handling it, and we can coordinate something?
We are still dealing with the v0.10.x fallout (rip), but I would suspect us starting on this sometime next week
Like....can I go back to my team and tell them "Llama Index is going to implement these [insert features]" and my team can depend on that happening?
Our company is invested in using LlamaIndex so I can give work hours to helping and I prolly have a few developers here that can chip in as well. I just want to make sure we're aligned and being effective
100% -- I agree with you (and your team's) vision for callbacks. I'd like to make this happen so that it works out for us + community + partner integrations
One concern I have is breaking changes -- either we publish an alpha version at some point and give partners time to adjust, or we support the old interface in some sort of backwards compatibility mode for a short time
Agreed - I've not worked on such a public framework so I understand supporting older APIs is a challenge
I think the class-based-event system would be a rather big change and would prolly break something somewhere
I think the StreamingAgentChatResponse as an iterator over CBEvents would be less of a breaking change
Yea agreed on both points there. I'm sure there's some sneaky magic (i.e. nasty hacks) we can do to maintain enough of the current interface for backward compatibility
Then we remove the magic a month after lol