That looks fine to me. If you test the workflow outside of FastAPI, I'd expect it to work as intended.
I tested with this workflow:

import asyncio

from llama_index.core.workflow import Context, Workflow, step, StartEvent, StopEvent, Event
from llama_index.llms.openai import OpenAI


class StreamingWorkflow(Workflow):
    @step
    async def stream(self, ctx: Context, ev: StartEvent) -> StopEvent:
        llm = OpenAI(model="gpt-4o-mini")

        ctx.write_event_to_stream(Event(msg="Starting..."))

        resp = await llm.acomplete("Hello, world!")
        ctx.write_event_to_stream(Event(msg=resp.text))

        ctx.write_event_to_stream(Event(msg="Stopping..."))

        resp = await llm.acomplete("What is the weather in Tokyo?")
        ctx.write_event_to_stream(Event(msg=resp.text))

        return StopEvent(result="Done!")


async def main():
    w = StreamingWorkflow()
    handler = w.run()

    # events written with ctx.write_event_to_stream() show up here as they arrive
    async for ev in handler.stream_events():
        if hasattr(ev, "msg"):
            print(ev.msg)

    result = await handler
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
The above works fine in a terminal.

For my FastAPI endpoint, I just use StreamingResponse. I noticed that it will only stream chunks to the client when it encounters a newline:
@app.get("/")
async def root():
w = StreamingWorkflow()
handler = w.run()
async def stream():
async for ev in handler.stream_events():
if hasattr(ev, "msg"):
yield ev.msg + "\n"
return StreamingResponse(stream(), media_type="text/plain")
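If you want to watch the chunking behavior from the client side, something like this works (a rough sketch using httpx; the localhost URL is just a placeholder for wherever the app is running):

import asyncio

import httpx


async def consume():
    # stream the response body and print each chunk as the server flushes it
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", "http://127.0.0.1:8000/") as resp:
            async for chunk in resp.aiter_text():
                print(chunk, end="", flush=True)


if __name__ == "__main__":
    asyncio.run(consume())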
You can even get crazier and use llm.astream_complete() to write each token as a stream event, along the lines of the sketch below.
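Roughly, the step would become something like this (untested sketch, reusing the same Event(msg=...) convention; it assumes each partial response from astream_complete carries the newest tokens in its delta field):

@step
async def stream(self, ctx: Context, ev: StartEvent) -> StopEvent:
    llm = OpenAI(model="gpt-4o-mini")

    # forward each partial response's delta (the newest tokens) to the event stream
    gen = await llm.astream_complete("Hello, world!")
    async for chunk in gen:
        ctx.write_event_to_stream(Event(msg=chunk.delta or ""))

    return StopEvent(result="Done!")

The FastAPI endpoint stays the same; it just ends up yielding per-token chunks instead of whole responses.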