from llama_index.core.workflow import ( Context, Workflow, StartEvent, StopEvent, step, ) import asyncio # Import asyncio to run the async functions class WaitExampleFlow(Workflow): @step async def setup(self, ctx: Context, ev: StartEvent) -> StopEvent: if hasattr(ev, "data"): ctx.data["some_database"] = ev.data return StopEvent(result=None) @step async def query(self, ctx: Context, ev: StartEvent) -> StopEvent | None: if hasattr(ev, "query"): # do we have any data? if "some_database" in ctx.data: data = ctx.data["some_database"] return StopEvent(result=f"Got the data {data}") else: # there's non data yet return None else: # this isn't a query return None async def main(): w = WaitExampleFlow(verbose=True) result = await w.run(query="Can I kick it?") if result is None: print("No you can't") print("---") result = await w.run(data="Yes you can") print("---") result = await w.run(query="Can I kick it?") print(result) if __name__ == "__main__": asyncio.run(main())
ctx.data["some_database"]
does not return any value the key is missingrun()
calls afaik (yet, anyways, working on a session UX)from llama_index.core.workflow import ( Context, Workflow, StartEvent, StopEvent, step, ) import asyncio # Import asyncio to run the async functions class QueryEvent(Event): query: str class WaitExampleFlow(Workflow): @step async def setup(self, ctx: Context, ev: StartEvent) -> QueryEvent: if "data" not in ev.data: raise ValueError("Data is missing") ctx.data["some_database"] = ev.data if "query" not in ev.data: raise ValueError("Query is missing") return QueryEvent(query=ev.query) @step async def query(self, ctx: Context, ev: QueryEvent) -> StopEvent: data = ctx.data["some_database"] return StopEvent(result=f"Got the data {data}") async def main(): w = WaitExampleFlow(verbose=True) result = await w.run(query=query, data=data) if __name__ == "__main__": asyncio.run(main())
.run()
on the same instance concurrently (i.e asyncio,gather(w.run(), w.run())