Find answers from the community

Updated 9 months ago

**`self.state.get_task()` hangs when

self.state.get_task() hangs when called in a sub-thread

Hi all,

I'm working on the idea of giving Agents a sense of "back-burner", in the sense that it can "sleep on an idea" / "wait for the right time to come" for a certain task while responding to users' other queries.

The design: I kick off a separate thread that keeps a separate asyncio event loop spinning.

Plain Text
class DeferrableReActAgent(ReActAgent):
    def __init__(...) -> None:
        super().__init__(...)
        self.agent_worker = DeferrableReActAgentWorker.from_tools(...)
        self.deferral_loop = asyncio.new_event_loop()
        def __start_background_loop() -> None:
            asyncio.set_event_loop(self.deferral_loop)
            self.deferral_loop.run_forever()
        deferral_thread = Thread(target=__start_background_loop, daemon=True)
        deferral_thread.start()


I override the method _achat, such that -- when it deems that the it's not yet the right time to carry out a task -- it kicks off a clone of the same task in that sub-thread after 4 secs:
...
V
k
6 comments
Plain Text
    async def _achat(...):
        ...
        while True:
            ...
            if cur_step_output.should_defer:
                spun_off_task = self.create_task(
                    task.input,
                    extra_state=task.extra_state,
                )
                # Run the spun-off task after waiting for 4 seconds.
                promise_to_run_task_after_wait = self._achat_from_task_after_wait(
                    spun_off_task
                )
                # Fulfill this promise in the separate thread we prepared.
                asyncio.run_coroutine_threadsafe(
                    promise_to_run_task_after_wait, self.deferral_loop
                )


Demo: Here's the complete code: https://github.com/tslmy/agent/pull/2/files. Run tool_for_backburner.py to see a proof-of-concept. The user asks the agent to walk the dog, the agent sees that the dog doesn't want to go outside because of the rain, and then the agent defers the dog-walking task to the sub-thread, returning the chat loop to the user.

Symptom: After the 4-sec sleep, while handling the deferred task, the code hangs at this line in agent.runner.base.AgentRunner._arun_step:

Plain Text
task = self.state.get_task(task_id)


I know it hangs on this line because I tapped a debugger into it, but -- due to the nature of debugger -- there is a slight chance that it is pointing me at the wrong line of code.

I'm not very familiar with async programming in Python, so my knowledge could only carry me this far. I'd appreciate any help on this front. Thanks!
@kapa.ai , any pointers?
Let me check the knowledge base for you. One moment! :books:
----------------------
I'm sorry, but your question is quite vague. Could you please provide more details or context? I'm here to help with questions related to LlamaIndex.
----------------------
Note: you can ask me a follow up question by @ mentioning me again :speech_balloon:
----------------------
@Logan M , this attempt in creating a custom Agent followed a discussion between @jerryjliu0 and me during the Rag-a-thon, which also led to https://github.com/run-llama/llama_index/pull/10450 that you reviewed. I wonder if you'd have more pointers from the maintainer's perspective.
@jerryjliu0 -- Is async programming something you still willing to pair-program for? Please DM me to figure out a time slot.
Add a reply
Sign up and join the conversation on Discord