MainWorkflow
that checks if the context has a topic
field. If it's missing, it runs a TopicWorkflow
. If it's present, it runs a JokeWorkflow
.topic
field, all the events work fine. When I set the topic
field and run the workflow again, I somehow get a duplicate result
event from the first run, then no further events.from llama_index.core.workflow import ( StartEvent, StopEvent, Workflow, step, Context, Event, ) class MessageEvent(Event): message: str class TopicWorkflow(Workflow): @step async def start(self, ctx: Context, event: StartEvent) -> StopEvent: print("TopicWorkflow: Asking for topic") ctx.write_event_to_stream(MessageEvent(message="MESSAGE FROM TOPIC WORKFLOW")) return StopEvent(result="Give me a topic and I'll tell you a joke!") class JokeWorkflow(Workflow): @step async def start(self, ctx: Context, event: StartEvent) -> StopEvent: print(f"JokeWorkflow: Telling joke about {event.topic}") ctx.write_event_to_stream(MessageEvent(message="MESSAGE FROM JOKE WORKFLOW")) return StopEvent(result=f"Here is a funny joke about {event.topic}") class MainWorkflow(Workflow): @step async def start( self, context: Context, event: StartEvent, topic_workflow: Workflow, joke_workflow: Workflow, ) -> StopEvent: topic = await context.get("topic", "") print(context) if topic == "": # Context has no topic; we need to ask for it print("MainWorkflow: Asking for topic") handler = topic_workflow.run() async for nested_event in handler.stream_events(): context.write_event_to_stream(nested_event) res = await handler else: # We have a topic, tell a joke about it print(f"MainWorkflow: Already have topic {topic}") handler = joke_workflow.run(topic=topic) async for nested_event in handler.stream_events(): context.write_event_to_stream(nested_event) res = await handler return StopEvent(result=res)
import asyncio from project_context_bootstrapping.workflows.test import ( MainWorkflow, TopicWorkflow, JokeWorkflow, ) from llama_index.core.workflow import Context async def main(): w = MainWorkflow(timeout=60, verbose=True) w.add_workflows( topic_workflow=TopicWorkflow(), joke_workflow=JokeWorkflow(), ) context = Context(workflow=w) # First workflow run with no topic print("Running workflow with no topic") handler = w.run(ctx=context) async for event in handler.stream_events(): print(f">>>>> EVENT (1): {event}") result = await handler print("FIRST RESULT: ", result) print("") # Set the joke topic print("Running workflow second time") await context.set("topic", "pirates") # Second workflow run with a topic in the context handler = w.run(ctx=context) async for event in handler.stream_events(): print(f">>>>> EVENT (2): {event}") result = await handler print("SECOND RESULT: ", result) if __name__ == "__main__": asyncio.run(main())
Running workflow with no topic Running step start <llama_index.core.workflow.context.Context object at 0x7f839953fbd0> MainWorkflow: Asking for topic TopicWorkflow: Asking for topic >>>>> EVENT (1): message='MESSAGE FROM TOPIC WORKFLOW' >>>>> EVENT (1): result="Give me a topic and I'll tell you a joke!" Step start produced event StopEvent FIRST RESULT: Give me a topic and I'll tell you a joke! Running workflow second time >>>>> EVENT (2): result="Give me a topic and I'll tell you a joke!" Running step start <llama_index.core.workflow.context.Context object at 0x7f839953fbd0> MainWorkflow: Already have topic pirates JokeWorkflow: Telling joke about pirates Step start produced event StopEvent SECOND RESULT: Here is a funny joke about pirates
Running workflow with no topic MainWorkflow: Asking for topic TopicWorkflow: Asking for topic >>>>> EVENT (1): message='MESSAGE FROM TOPIC WORKFLOW' >>>>> EVENT (1): result="Give me a topic and I'll tell you a joke!" FIRST RESULT: Give me a topic and I'll tell you a joke! Running workflow second time >>>>> EVENT (2): result="Give me a topic and I'll tell you a joke!" MainWorkflow: Already have topic pirates JokeWorkflow: Telling joke about pirates SECOND RESULT: Here is a funny joke about pirates
>>>>> EVENT (2): message='MESSAGE FROM JOKE WORKFLOW'
when I run it the second time, and I shouldn't see >>>>> EVENT (2): result="Give me a topic and I'll tell you a joke!"
as that is the result from the first run.Running workflow with no topic MainWorkflow: Asking for topic TopicWorkflow: Asking for topic >>>>> EVENT (1): message='MESSAGE FROM TOPIC WORKFLOW' >>>>> EVENT (1): result="Give me a topic and I'll tell you a joke!" FIRST RESULT: Give me a topic and I'll tell you a joke! Running workflow second time MainWorkflow: Already have topic pirates JokeWorkflow: Telling joke about pirates >>>>> EVENT (2): message='MESSAGE FROM JOKE WORKFLOW' >>>>> EVENT (2): result="Here is a funny joke about pirates" SECOND RESULT: Here is a funny joke about pirates
print(f"Streaming queue length: {context.streaming_queue.qsize()}")
print
statements).if topic == "": # Context has no topic; we need to ask for it print("MainWorkflow: Asking for topic") handler = topic_workflow.run() async for nested_event in handler.stream_events(): if isinstance(nested_event, StopEvent): nested_event = MessageEvent(message=nested_event.result) ctx.write_event_to_stream(nested_event) res = await handler
Running workflow with no topic Running step start MainWorkflow: Asking for topic TopicWorkflow: Asking for topic >>>>> EVENT (1): message='MESSAGE FROM TOPIC WORKFLOW' >>>>> EVENT (1): message="Give me a topic and I'll tell you a joke!" Step start produced event StopEvent >>>>> EVENT (1): result="Give me a topic and I'll tell you a joke!" FIRST RESULT: Give me a topic and I'll tell you a joke!
Running workflow second time Running step start MainWorkflow: Already have topic pirates JokeWorkflow: Telling joke about pirates >>>>> EVENT (2): message='MESSAGE FROM JOKE WORKFLOW' >>>>> EVENT (2): message='Here is a funny joke about pirates' Step start produced event StopEvent >>>>> EVENT (2): result='Here is a funny joke about pirates' SECOND RESULT: Here is a funny joke about pirates