Find answers from the community

Updated 2 days ago

Asynchronous Workflow with Human-in-the-Loop Steps

Hello folks, i hope you are doing great.

I'm designing a Workflow with some HITL steps. I have successfully accomplished to break the streaming of events and capture the human input event.

At this point, most examples connect via input() or a websocket and then resume operations. But my use case needs to be asynhronous.

I need to be able to checkpoint the workflow, save it somewhere (e.g. a database) and pause execution.

Later, when i receive the response from the human, i want to load the checkpoint, set the new event and continue.

So far, i'm able to stop at the desired event, save the checkpoint and load it again.

THE PROBLEM:

Breaking the streaming loop to capture the event prevents the Workflow to mark such step as completed, causing the Checkpointer to not checkpoint the progress of the step that emitted the InputRequiredEvent.

Is it possible to accomplish this? Can i somehow force the checkpointer or mark the step as done?
F
3 comments
Hello Logan, i'm not trying to stop in the middle of a step.

Following the docs, i have broken the user interaction in 2 steps: the first one emits the input request event, the second one receives the response event.

This is my workflow:

Plain Text
class ChecklistWorkflow(Workflow):

    @step
    async def start(self, ctx: Context, ev: StartEvent) -> Step2TriggerEvent:
        await ctx.set("form_data", ev.form_data)
        await ctx.set("client_name", ev.client_name)

        return Step2Event()

    @step
    async def step_2(
        self, ctx: Context, ev: Step2TriggerEvent | Step2ResumeEvent
    ) -> Union[UserInputRequestEvent, TaskCompletedEvent]:

        form_data = await ctx.get("form_data")
        client_name = await ctx.get("client_name")

        result = do_something()

        if result["action"] == "USER_CLARIFICATION":
            return UserInputRequestEvent(prefix="", questions=result["questions"])
        elif result["action"] == "FINAL_ANSWER":
            return TaskCompletedEvent()

    @step
    async def step_3(
        self, ctx: Context, ev: UserInputResponseEvent | TaskCompletedEvent
    ) -> Step2ResumeEvent | StopEvent:
        """Either process human response or end of workflow"""
        if isinstance(ev, UserInputResponseEvent):
            print(f"Resuming from user input: {ev.response}")

            await ctx.set("user_response", ev.response)

            return Step2ResumeEvent()  # Back to step 2 to continue the task

        elif isinstance(ev, TaskCompletedEvent):
            print(f"Checklist completed: {ev.checklist}")
            return StopEvent(result=ev.checklist) # Task completed, finishing workflow


This is working fine. Then i'm intercepting the HITL event as suggested in the docs:

Plain Text
handler = wflow_ckptr.run(form_data=form_data, client_name="RecargaPay")

async for event in handler.stream_events():
    if isinstance(event, UserClarificationRequestEvent):
        break
The problem is that the handler seems to get in a pending status and doesn't mark step2 as completed, therefore the checkpoint is not triggered.
Plain Text
for run_id, ckpts in wflow_ckptr.checkpoints.items():
    print(
        f"Run: {run_id} has stored checkpoints for steps {[c.last_completed_step for c in ckpts]}"
    )
>>> Run: 7701c65e-bbee-4d76-8b2c-7224046052f3 has stored checkpoints for steps ['start']
Add a reply
Sign up and join the conversation on Discord