Cleanup

Hi, I was looking into the workflow code https://github.com/run-llama/llama_index/blob/9083c6d199443076bc9d764022d4c98260d8e504/llama-index-core/llama_index/core/workflow/workflow.py#L317 and I would like to know what the best way to clean up a task after an interruption should be.

I prepared a sample where the cleanup of an interrupted task takes some time. In the output of python clean_up_task_sample.py you can see that step1 finished and step2 finished are printed after finally.

Plain Text
setup1 starting
setup2 starting
setup1 finished
setup2 finished
step1 starting
step2 starting
step1 finishing
step2 finishing
except
finally
step1 finished
step2 finished
loop 0
loop 1
loop 2
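
For reference, a minimal pure-asyncio sketch of the same symptom (the step functions and timings here are hypothetical stand-ins for the actual steps in clean_up_task_sample.py):

Plain Text
import asyncio

async def step(name: str) -> None:
    print(f"{name} starting")
    try:
        await asyncio.sleep(10)
    finally:
        print(f"{name} finishing")
        await asyncio.sleep(1)  # slow cleanup keeps running after cancellation
        print(f"{name} finished")

async def main() -> None:
    tasks = [asyncio.create_task(step("step1")), asyncio.create_task(step("step2"))]
    await asyncio.sleep(0.1)    # let the steps start
    for t in tasks:
        t.cancel()              # cancel without awaiting the tasks afterwards
    print("finally")            # the caller moves on immediately...
    for i in range(3):
        print(f"loop {i}")
        await asyncio.sleep(1)  # ...and "stepN finished" shows up in between

asyncio.run(main())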

I tried adding await asyncio.gather(*unfinished, return_exceptions=True) after the tasks are cancelled, and that fixed the issue, but maybe wait_for(asyncio.gather(...)) could make more sense, to avoid blocking for too long.
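
Roughly like this, after the tasks are cancelled (the 5-second timeout is just an example value, not something the library defines):

Plain Text
for t in unfinished:
    t.cancel()
try:
    # give the cancelled steps a bounded window to finish their cleanup
    await asyncio.wait_for(
        asyncio.gather(*unfinished, return_exceptions=True),
        timeout=5.0,
    )
except asyncio.TimeoutError:
    pass  # cleanup took too long; don't block the caller any further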

Should I maybe open a Feature Request or a Bug Report?

Thanks
What I mean is that the functions/tasks are still running: the CancelledError was raised within each function, but due to a cleanup that takes some time, the functions did not finish before the print statement after _ = await w.run()
Shouldn't the line of code I linked to cancel the tasks though?
yes, a CancelledError will be raised inside each of the unfinished tasks, but if I want to do some cleanup that takes some time, _run_workflow will not wait for the unfinished tasks to finish
I don't actually know how to "wait" after you call cancel on a task, I don't think you can call await task after calling cancel?
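
Awaiting a task after cancel() does work, for what it's worth: the await completes once the task has finished unwinding (raising CancelledError), and gather(..., return_exceptions=True) captures that instead of re-raising. A minimal sketch:

Plain Text
import asyncio

async def worker() -> None:
    try:
        await asyncio.sleep(60)
    finally:
        await asyncio.sleep(0.5)  # slow cleanup
        print("worker cleaned up")

async def main() -> None:
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker start
    task.cancel()
    # awaiting after cancel() blocks until the cleanup has finished
    results = await asyncio.gather(task, return_exceptions=True)
    print(results)  # [CancelledError()]

asyncio.run(main())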
oh actually I have a similar question! Glad someone in the community already raised this.

How do I cancel a task on LlamaDeploy (and consequently Workflows)?
@titus You might need to provide a few more details

Once you kick off a run for a workflow, it will run until either it times out or it returns
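
For context, a minimal sketch based on the documented workflow API; the only built-in bound on a run is the timeout passed to the workflow constructor (60 here is an arbitrary value):

Plain Text
import asyncio
from llama_index.core.workflow import StartEvent, StopEvent, Workflow, step

class SlowWorkflow(Workflow):
    @step
    async def do_work(self, ev: StartEvent) -> StopEvent:
        await asyncio.sleep(1)  # stand-in for long-running work
        return StopEvent(result="done")

async def main() -> None:
    w = SlowWorkflow(timeout=60)  # the run ends when it returns or hits this timeout
    result = await w.run()
    print(result)

asyncio.run(main())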
@Logan M this is what I have seen

Plain Text
for t in unfinished:
    t.cancel()
await asyncio.gather(*unfinished, return_exceptions=True)


Plain Text
for t in unfinished:
    t.cancel()
# shield keeps the gather running even if the surrounding task is itself cancelled
await asyncio.shield(asyncio.gather(*unfinished, return_exceptions=True))


Plain Text
for t in unfinished:
    t.cancel()
with contextlib.suppress(asyncio.CancelledError):
    await asyncio.gather(*unfinished, return_exceptions=True)
ah nice, I think I can add that to the repo
thanks for the tip!
ah thanks @jpizarrom @Logan M!

I was thinking some users might just type the wrong thing and want to cancel the task on the frontend. Then they'll want to start a new task almost immediately (i.e. write a new message) which will queue the message on top of the previously executing message.
I'm using a library called Chainlit for the LLM frontend, and it allows users to click the "stop" button at the bottom right of the screen while the agents are executing - this could be because of anything like a typo on their end, or they've figured out the answer, or the agents have run too long (which happens when the multi-agent systems are really, really large).

Clicking "stop" unfreezes the message bar and returns a message stating that the task was stopped - so users can type again. But chainlit does have an "on_stop" callback that I was hoping to use to send a message to my workflow deployment on llama-deploy to stop the workflow's current execution for this user's session.

https://docs.chainlit.io/concepts/chat-lifecycle#on-stop
This is what happens as the agents are running (notice the stop button in the bottom right corner)
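
On the Chainlit side, a hedged sketch of the wiring this would need, assuming the workflow run is awaited from the Chainlit process as an asyncio task; run_workflow_for_message is a hypothetical stand-in for whatever forwards the message to the llama-deploy session:

Plain Text
import asyncio
import chainlit as cl

async def run_workflow_for_message(message: cl.Message) -> str:
    # hypothetical: forward the message to the llama-deploy session
    # and wait for the workflow result
    await asyncio.sleep(1)  # placeholder for the real remote call
    return f"echo: {message.content}"

@cl.on_message
async def on_message(message: cl.Message) -> None:
    task = asyncio.create_task(run_workflow_for_message(message))
    cl.user_session.set("workflow_task", task)  # remember it for on_stop
    try:
        result = await task
        await cl.Message(content=result).send()
    except asyncio.CancelledError:
        await cl.Message(content="Task stopped.").send()

@cl.on_stop
async def on_stop() -> None:
    task = cl.user_session.get("workflow_task")
    if task is not None and not task.done():
        # this only stops the local awaiting task; without an explicit cancel
        # on the workflow, the remote run keeps going until it times out
        task.cancel()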
I think for this to happen, we need an explicit "cancel" method on a workflow
oh yes please! haha. that'll be great! Do I raise a feature request on GitHub?