Updated 2 months ago

Hi, I'm having trouble getting a nested workflow working properly with event streaming. The workflow runs fine, but the event streaming is behaving strangely.

The workflow I'm testing is a variation on the Joke example from the documentation. I have a MainWorkflow that checks if the context has a topic field. If it's missing, it runs a TopicWorkflow. If it's present, it runs a JokeWorkflow.

The first time I run the workflow without a topic field, all the events work fine. When I set the topic field and run the workflow again, I somehow get a duplicate result event from the first run, then no further events.

I'll paste my code in a thread. I'd really appreciate some help!
21 comments
Workflow code:

Plain Text
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Context,
    Event,
)


class MessageEvent(Event):
    message: str


class TopicWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, event: StartEvent) -> StopEvent:
        print("TopicWorkflow: Asking for topic")
        ctx.write_event_to_stream(MessageEvent(message="MESSAGE FROM TOPIC WORKFLOW"))
        return StopEvent(result="Give me a topic and I'll tell you a joke!")


class JokeWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, event: StartEvent) -> StopEvent:
        print(f"JokeWorkflow: Telling joke about {event.topic}")
        ctx.write_event_to_stream(MessageEvent(message="MESSAGE FROM JOKE WORKFLOW"))
        return StopEvent(result=f"Here is a funny joke about {event.topic}")


class MainWorkflow(Workflow):
    @step
    async def start(
        self,
        context: Context,
        event: StartEvent,
        topic_workflow: Workflow,
        joke_workflow: Workflow,
    ) -> StopEvent:
        topic = await context.get("topic", "")

        print(context)

        if topic == "":
            # Context has no topic; we need to ask for it
            print("MainWorkflow: Asking for topic")
            handler = topic_workflow.run()
            async for nested_event in handler.stream_events():
                context.write_event_to_stream(nested_event)

            res = await handler
        else:
            # We have a topic, tell a joke about it
            print(f"MainWorkflow: Already have topic {topic}")
            handler = joke_workflow.run(topic=topic)
            async for nested_event in handler.stream_events():
                context.write_event_to_stream(nested_event)

            res = await handler

        return StopEvent(result=res)
Main:
Plain Text
import asyncio
from project_context_bootstrapping.workflows.test import (
    MainWorkflow,
    TopicWorkflow,
    JokeWorkflow,
)
from llama_index.core.workflow import Context


async def main():
    w = MainWorkflow(timeout=60, verbose=True)
    w.add_workflows(
        topic_workflow=TopicWorkflow(),
        joke_workflow=JokeWorkflow(),
    )
    context = Context(workflow=w)

    # First workflow run with no topic
    print("Running workflow with no topic")
    handler = w.run(ctx=context)
    async for event in handler.stream_events():
        print(f">>>>> EVENT (1): {event}")
    result = await handler
    print("FIRST RESULT: ", result)
    print("")

    # Set the joke topic
    print("Running workflow second time")
    await context.set("topic", "pirates")

    # Second workflow run with a topic in the context
    handler = w.run(ctx=context)
    async for event in handler.stream_events():
        print(f">>>>> EVENT (2): {event}")
    result = await handler
    print("SECOND RESULT: ", result)


if __name__ == "__main__":
    asyncio.run(main())
Output:
Plain Text
Running workflow with no topic
Running step start
<llama_index.core.workflow.context.Context object at 0x7f839953fbd0>
MainWorkflow: Asking for topic
TopicWorkflow: Asking for topic
>>>>> EVENT (1): message='MESSAGE FROM TOPIC WORKFLOW'
>>>>> EVENT (1): result="Give me a topic and I'll tell you a joke!"
Step start produced event StopEvent
FIRST RESULT:  Give me a topic and I'll tell you a joke!

Running workflow second time
>>>>> EVENT (2): result="Give me a topic and I'll tell you a joke!"
Running step start
<llama_index.core.workflow.context.Context object at 0x7f839953fbd0>
MainWorkflow: Already have topic pirates
JokeWorkflow: Telling joke about pirates
Step start produced event StopEvent
SECOND RESULT:  Here is a funny joke about pirates
I logged the context object during each run. It is the same object each time.

Strangely, if I run the workflow a third time with no changes, I get the events as expected. If I run it a fourth time, the problem reappears.
I ran the code as you pasted, but set verbose=False, and removed the context print

Here's the output
Plain Text
Running workflow with no topic
MainWorkflow: Asking for topic
TopicWorkflow: Asking for topic
>>>>> EVENT (1): message='MESSAGE FROM TOPIC WORKFLOW'
>>>>> EVENT (1): result="Give me a topic and I'll tell you a joke!"
FIRST RESULT:  Give me a topic and I'll tell you a joke!

Running workflow second time
>>>>> EVENT (2): result="Give me a topic and I'll tell you a joke!"
MainWorkflow: Already have topic pirates
JokeWorkflow: Telling joke about pirates
SECOND RESULT:  Here is a funny joke about pirates


I'm not sure what you mean by a duplicate result? It runs once, gets the topic, runs again, generates a joke with topic 🤔
The workflow is producing the expected result, but the events I'm emitting aren't getting through. I should see >>>>> EVENT (2): message='MESSAGE FROM JOKE WORKFLOW' when I run it the second time, and I shouldn't see >>>>> EVENT (2): result="Give me a topic and I'll tell you a joke!" as that is the result from the first run.
i.e. The expected output is:

Plain Text
Running workflow with no topic
MainWorkflow: Asking for topic
TopicWorkflow: Asking for topic
>>>>> EVENT (1): message='MESSAGE FROM TOPIC WORKFLOW'
>>>>> EVENT (1): result="Give me a topic and I'll tell you a joke!"
FIRST RESULT:  Give me a topic and I'll tell you a joke!

Running workflow second time
MainWorkflow: Already have topic pirates
JokeWorkflow: Telling joke about pirates
>>>>> EVENT (2): message='MESSAGE FROM JOKE WORKFLOW'
>>>>> EVENT (2): result="Here is a funny joke about pirates"
SECOND RESULT:  Here is a funny joke about pirates
Hmmm, seems like a bug, maybe to do with streaming queue not being exhausted? I added some code

print(f"Streaming queue length: {context.streaming_queue.qsize()}")
After the first run, there are still elements in the queue 🤔
There was a recent PR that touched some of this logic... I wonder if downgrading one or two versions would help 🤔
Seems like duplicate stop events are making their way into the streaming queue.
I'll give it a try. I had the same issue with the previous version and upgraded to the current one yesterday, but will drop back a couple and see if it persists.
It did seem to me like it could have been a bug. If you run the workflow a few more times without doing anything to the context, the events come through (but appear out of order, before the print statements).
I think I kind of know where the issue is. I can debug/make a patch tomorrow morning (it's 8pm here lol)
No worries, enjoy your evening. Thanks for your help!
Okay, so I did some digging on this. The issue stems from how we currently handle streaming events:

  • we have a while loop that breaks as soon as we hit the first StopEvent (which was in the most recent PR that Logan referred to. Before that we wouldn't even stream any StopEvent and things would get clogged in the queue)
  • this doesn't bode too well if you have nested workflows and you write the events of that nested workflow to the main workflow's context, which is the case for your example
Ultimately, what we end up doing here is putting the nested workflow's StopEvent into the main workflow's streaming queue, and our current logic breaks on the first occurrence of a StopEvent.
So, we perhaps need to figure out a better way to handle this in our library... but for now you can catch the StopEvent of the nested workflow and change it to another event type, e.g. your custom MessageEvent, before putting it into the main workflow's ctx.streaming_queue.

Plain Text
    if topic == "":
        # Context has no topic; we need to ask for it
        print("MainWorkflow: Asking for topic")
        handler = topic_workflow.run()
        async for nested_event in handler.stream_events():
            if isinstance(nested_event, StopEvent):
                nested_event = MessageEvent(message=nested_event.result)
            context.write_event_to_stream(nested_event)

        res = await handler
(do the same in the else block and everything should work as expected.)
Returned output

Plain Text
Running workflow with no topic
Running step start
MainWorkflow: Asking for topic
TopicWorkflow: Asking for topic
>>>>> EVENT (1): message='MESSAGE FROM TOPIC WORKFLOW'
>>>>> EVENT (1): message="Give me a topic and I'll tell you a joke!"
Step start produced event StopEvent
>>>>> EVENT (1): result="Give me a topic and I'll tell you a joke!"
FIRST RESULT:  Give me a topic and I'll tell you a joke!


and

Plain Text
Running workflow second time
Running step start
MainWorkflow: Already have topic pirates
JokeWorkflow: Telling joke about pirates
>>>>> EVENT (2): message='MESSAGE FROM JOKE WORKFLOW'
>>>>> EVENT (2): message='Here is a funny joke about pirates'
Step start produced event StopEvent
>>>>> EVENT (2): result='Here is a funny joke about pirates'
SECOND RESULT:  Here is a funny joke about pirates
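Since the same conversion is needed in both branches, it could be factored into a small helper. The following is a minimal sketch, not part of llama_index: `forward_nested_events` is a hypothetical name, and the `MessageEvent`/`StopEvent` dataclasses are stand-ins so the snippet runs without the library. It assumes only what the thread shows, namely that `handler.stream_events()` can be iterated with `async for` and that `write_event_to_stream` is a plain (non-async) call.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Callable


@dataclass
class MessageEvent:
    """Stand-in for the custom MessageEvent defined in the thread."""
    message: str


@dataclass
class StopEvent:
    """Stand-in for llama_index's StopEvent (assumption: only .result is needed)."""
    result: Any


async def forward_nested_events(
    events: AsyncIterator[Any],
    write: Callable[[Any], None],
    stop_type: type,
    wrap: Callable[[Any], Any],
) -> None:
    """Forward nested events, replacing any stop event with a wrapped event.

    Hypothetical helper: `write` would be context.write_event_to_stream and
    `events` would be handler.stream_events() in the real workflow.
    """
    async for event in events:
        if isinstance(event, stop_type):
            # Never let a nested stop event reach the outer stream as-is.
            event = wrap(event)
        write(event)


async def fake_nested_stream():
    """Simulates what a nested workflow's stream_events() might yield."""
    yield MessageEvent("MESSAGE FROM JOKE WORKFLOW")
    yield StopEvent("Here is a funny joke about pirates")


async def demo() -> list:
    forwarded: list = []
    await forward_nested_events(
        fake_nested_stream(),
        forwarded.append,  # stands in for context.write_event_to_stream
        StopEvent,
        lambda e: MessageEvent(message=str(e.result)),
    )
    return forwarded


if __name__ == "__main__":
    for event in asyncio.run(demo()):
        print(event)
```

In the real MainWorkflow step, each branch would then reduce to one call such as `await forward_nested_events(handler.stream_events(), context.write_event_to_stream, StopEvent, lambda e: MessageEvent(message=str(e.result)))` followed by `res = await handler`, using the library's own event classes instead of the stand-ins.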
Thanks @andrei, I'll give that a try.

Is there a reason why only StopEvents are streamed by default? It seems like it would be useful to stream all events, as I've found those are the points where you would want to report feedback to the user when significant things happen.

I don't know the internals of how the streaming works though, so that may be a naive suggestion. It sounds like there's a queue that gets clogged up if nothing is consuming the events?
@rohoon tbh it's debatable whether the stop event should be streamed or not. Mostly wanted to make it an intentional API (you pick what gets streamed)

But when streaming a nested workflow, the stop event from the nested workflow stops the stream early, while the real StopEvent from the top level goes into the streaming queue, so on the next run the stream stops immediately

I hope that makes sense 😅 Basically, there needs to be a way to keep stop events from nested workflows from stopping the stream, which is what the suggested workaround does
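The failure mode described above can be reproduced without the library. This is a simplified simulation, not llama_index's actual implementation: the dataclasses are stand-ins, `stream_events` here is a hypothetical consumer that stops at the first StopEvent, and a single `asyncio.Queue` plays the role of the streaming queue on a Context reused across runs.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class MessageEvent:
    """Stand-in for the custom event from the thread."""
    message: str


@dataclass
class StopEvent:
    """Stand-in for the library's StopEvent."""
    result: str


async def stream_events(queue: asyncio.Queue) -> list:
    """Consume events until the first StopEvent (assumed consumer behavior)."""
    seen = []
    while True:
        event = await queue.get()
        seen.append(event)
        if isinstance(event, StopEvent):
            break  # anything still queued stays there for the next run
    return seen


async def simulate() -> tuple:
    # One queue shared by both runs, like reusing the same Context.
    queue: asyncio.Queue = asyncio.Queue()

    # Run 1: nested events are forwarded verbatim, then the outer workflow stops.
    queue.put_nowait(MessageEvent("MESSAGE FROM TOPIC WORKFLOW"))
    queue.put_nowait(StopEvent("Give me a topic and I'll tell you a joke!"))  # nested
    queue.put_nowait(StopEvent("Give me a topic and I'll tell you a joke!"))  # outer
    first = await stream_events(queue)

    # Run 2: the stale outer StopEvent from run 1 is still at the head of the queue.
    queue.put_nowait(MessageEvent("MESSAGE FROM JOKE WORKFLOW"))
    queue.put_nowait(StopEvent("Here is a funny joke about pirates"))  # nested
    queue.put_nowait(StopEvent("Here is a funny joke about pirates"))  # outer
    second = await stream_events(queue)

    return first, second, queue.qsize()


if __name__ == "__main__":
    first, second, leftover = asyncio.run(simulate())
    print("run 1 streamed:", first)
    print("run 2 streamed:", second)
    print("events left in queue:", leftover)
```

In this simulation the second run streams only the leftover result from the first run, and the joke workflow's events remain queued. That is consistent with the report that a later run shows events arriving early and out of order.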