Find answers from the community

Updated 2 weeks ago

Safely Canceling an Asynchronous Workflow

if i ran workflow in a task asyncio.create_task(workflow.run()) how do i safely cancel it
L
D
8 comments
Plain Text
task = asyncio.create_task(workflow.run())
task.cancel()
await task


If you don't use create task, you can also cancel with the handler

Plain Text
handler = workflow.run()

# the workflow isn't actually running yet, until you call await 
# you could iterate over events: async for ev in handler.stream_events():
# and then cancel based on some condition

await handler.cancel_run()
Any clue why is the handler treated like a coroutine
Ok the problem was version of the llama index i had 0.11.8 and in that version run didnt return Handlers
I updated it now and it seems to cancel it successfully, although now my main code is not working πŸ™‚ got to fix that now
I have a question again, firstly if I'm not mistaken your first example now no longer works, since it is not a coroutine, and secondly with your second approach does that basically allow concurent work as if it is in a task
Another question a bit off topic

async for ev in workflow.stream_events():
if isinstance(ev, ProgressEvent):
chunk = ev.msg
yield chunk

does this ensures that if there are multiple workflows running but each one for different user in parallel even though they are all creating ProgressEvent class events
only the ones from that user are being caught
Yea true. Its still async though so yes, concurrency works fine. Its returning a subclassed asyncio.Future object. If you really need create_task, you can still use it

Plain Text
async def run_workflow():
  result = await workflow.run()
  return result

task = asyncio.create_task(run_workflow)


But that seems a little redudnant, its already async πŸ˜…

To your second question, workflow.stream_events() is deprecated, if you need to handle multiple users, you should use handler.stream_events() -- this way, each workflow.run() is independant
ok great thanks, now everything works perfectly, this was the problem:
workflow.stream_events(), it messed with cancel_run() too
Add a reply
Sign up and join the conversation on Discord