Updated 2 months ago

Asynchronous Workflow with Human-in-the-Loop Steps

At a glance
The community member is designing a workflow with HITL (Human-in-the-Loop) steps. They have successfully broken the streaming of events and captured the human input event. However, their use case requires asynchronous operation, where they need to checkpoint the workflow, save it, and pause execution. Later, when they receive the response from the human, they want to load the checkpoint, set the new event, and continue.

The community member has been able to stop at the desired event, save the checkpoint, and load it again. However, the problem is that breaking the streaming loop to capture the event prevents the Workflow from marking the step as completed, causing the Checkpointer to not checkpoint the progress of the step that emitted the InputRequiredEvent. The community member is asking if it is possible to accomplish this and if they can somehow force the checkpointer or mark the step as done.

In the comments, the community member provides more details about their workflow implementation, which splits the user interaction into two steps: the first emits the input request event, and the second receives the response event. The community member intercepts the HITL event as suggested in the documentation, but the handler appears to stay in a pending state and never marks step_2 as completed, so the checkpoint is not triggered.

Hello folks, I hope you are doing great.

I'm designing a Workflow with some HITL steps. I have managed to break out of the event stream and capture the human input event.

At this point, most examples connect via input() or a websocket and then resume operations. But my use case needs to be asynchronous.

I need to be able to checkpoint the workflow, save it somewhere (e.g. a database) and pause execution.

Later, when I receive the response from the human, I want to load the checkpoint, set the new event and continue.

So far, I'm able to stop at the desired event, save the checkpoint and load it again.

THE PROBLEM:

Breaking out of the streaming loop to capture the event prevents the Workflow from marking that step as completed, so the Checkpointer never checkpoints the progress of the step that emitted the InputRequiredEvent.

Is it possible to accomplish this? Can I somehow force the checkpointer or mark the step as done?
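
To make the desired flow concrete, here is a minimal, framework-agnostic sketch of the pause, persist and resume pattern described above. It does not use the LlamaIndex Workflow or Checkpointer API; `RunState`, `InMemoryStore`, `start_run` and `resume_run` are hypothetical names, the in-memory dictionary stands in for a database, and the question text is made up for illustration. The point it shows is that the step requesting input is recorded as completed before execution pauses, so a later process can load the saved state and continue.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Optional


@dataclass
class RunState:
    """Everything needed to resume a paused run (hypothetical schema)."""
    run_id: str
    form_data: dict
    client_name: str
    pending_questions: list = field(default_factory=list)
    user_response: Optional[str] = None
    completed_steps: list = field(default_factory=list)


class InMemoryStore:
    """Stand-in for a database table keyed by run_id."""

    def __init__(self) -> None:
        self._rows = {}

    def save(self, state: RunState) -> None:
        # Serialize to JSON so the state could be written to any store.
        self._rows[state.run_id] = json.dumps(asdict(state))

    def load(self, run_id: str) -> RunState:
        return RunState(**json.loads(self._rows[run_id]))


def start_run(store: InMemoryStore, run_id: str, form_data: dict, client_name: str) -> list:
    """Run until human input is needed, persist the state, return the questions."""
    state = RunState(run_id=run_id, form_data=form_data, client_name=client_name)
    state.completed_steps.append("start")
    # Placeholder question; a real step would compute this.
    state.pending_questions = ["Which plan does the client use?"]
    # Record the requesting step as done BEFORE pausing, then persist.
    state.completed_steps.append("step_2")
    store.save(state)
    return state.pending_questions


def resume_run(store: InMemoryStore, run_id: str, response: str) -> str:
    """Load the saved state, attach the human response, and finish the run."""
    state = store.load(run_id)
    state.user_response = response
    state.pending_questions = []
    state.completed_steps.append("step_3")
    store.save(state)
    return f"Checklist for {state.client_name}: {response}"
```

In this sketch `start_run` and `resume_run` can be called from different processes, hours apart, as long as both point at the same store. Whether the Checkpointer can be made to behave this way is exactly the open question of this thread.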
3 comments
Hello Logan, I'm not trying to stop in the middle of a step.

Following the docs, I have split the user interaction into two steps: the first one emits the input request event, the second one receives the response event.

This is my workflow:

Plain Text
class ChecklistWorkflow(Workflow):

    @step
    async def start(self, ctx: Context, ev: StartEvent) -> Step2TriggerEvent:
        await ctx.set("form_data", ev.form_data)
        await ctx.set("client_name", ev.client_name)

        return Step2TriggerEvent()

    @step
    async def step_2(
        self, ctx: Context, ev: Step2TriggerEvent | Step2ResumeEvent
    ) -> UserInputRequestEvent | TaskCompletedEvent:

        form_data = await ctx.get("form_data")
        client_name = await ctx.get("client_name")

        result = do_something()

        if result["action"] == "USER_CLARIFICATION":
            return UserInputRequestEvent(prefix="", questions=result["questions"])
        elif result["action"] == "FINAL_ANSWER":
            return TaskCompletedEvent()

    @step
    async def step_3(
        self, ctx: Context, ev: UserInputResponseEvent | TaskCompletedEvent
    ) -> Step2ResumeEvent | StopEvent:
        """Either process human response or end of workflow"""
        if isinstance(ev, UserInputResponseEvent):
            print(f"Resuming from user input: {ev.response}")

            await ctx.set("user_response", ev.response)

            return Step2ResumeEvent()  # Back to step 2 to continue the task

        elif isinstance(ev, TaskCompletedEvent):
            print(f"Checklist completed: {ev.checklist}")
            return StopEvent(result=ev.checklist) # Task completed, finishing workflow


This is working fine. Then I'm intercepting the HITL event as suggested in the docs:

Plain Text
handler = wflow_ckptr.run(form_data=form_data, client_name="RecargaPay")

async for event in handler.stream_events():
    # Stop consuming the stream as soon as the workflow asks for human input
    if isinstance(event, UserInputRequestEvent):
        break

The problem is that the handler seems to stay in a pending state and doesn't mark step_2 as completed, so the checkpoint is never triggered.

Plain Text
for run_id, ckpts in wflow_ckptr.checkpoints.items():
    print(
        f"Run: {run_id} has stored checkpoints for steps {[c.last_completed_step for c in ckpts]}"
    )
>>> Run: 7701c65e-bbee-4d76-8b2c-7224046052f3 has stored checkpoints for steps ['start']