Retrieving Chunks as ChatResponse Objects from an Agent

Hello there!
I'm running into an issue: I want to retrieve chunks as ChatResponse objects from an agent.
Here is what I did:
Plain Text
response_generator = self.agent.stream_chat(
    message=messages[-1].content, chat_history=messages[:-1]
).chat_stream
for token in response_generator:
    yield token

but i'm getting:
Plain Text
ValueError: generator already executing

When using response_gen instead of chat_stream it works flawlessly. However, I really need those ChatResponse objects.
9 comments
Maybe it would be useful if whole chunks were put on the queue, not just the delta :c
I think the syntax is wrong here?

It should be

Plain Text
resp = agent.stream_chat(...)
for r in resp.response_gen:
  print(r.delta, end="", flush=True)
The thing is, I want to directly access the ChatCompletionChunk objects, not the strings returned from the response_gen property. That's why I tried to read chat_stream directly.
However, I've since noticed that AgentWorker is already starting a thread
Plain Text
            thread = Thread(
                target=agent_response_stream.write_response_to_history,
                args=(task.extra_state["new_memory"],),
                kwargs={"on_stream_end_fn": partial(self.finalize_task, task)},
            )
            thread.start()

in which agent_response_stream.write_response_to_history is already consuming the chat_stream generator.
What's more, it's not even possible to directly retrieve ChatCompletionChunk objects from the queue, as only the delta is put there:
Plain Text
    def write_response_to_history(
        self,
        memory: BaseMemory,
        on_stream_end_fn: Optional[Callable] = None,
    ) -> None:
        if self.chat_stream is None:
            raise ValueError(
                "chat_stream is None. Cannot write to history without chat_stream."
            )

        # try/except to prevent hanging on error
        dispatcher.event(StreamChatStartEvent())
        try:
            final_text = ""
            for chat in self.chat_stream:
                self.is_function = is_function(chat.message)
                if chat.delta:
                    dispatcher.event(
                        StreamChatDeltaReceivedEvent(
                            delta=chat.delta,
                        )
                    )
                    self.put_in_queue(chat.delta)
[...]
@Logan M is there any reason why whole chunks cannot be put on the queue? It might be handy, and at first glance it looks easy to implement.
Tbh, I would use the newer AgentWorkflow -- it exposes much more via its streaming API, and is arguably better engineered overall (eventually these other agent classes will be deprecated anyway).