
Updated last month

Release

When is the next release?
15 comments
Oh yea, I need to do that. So freaking tedious lol
Will try to tonight
haha, come on
I'm ready to replace my AI custom workflow with a llama index workflow, ALL IN !
What I'm doing is basically a mock of LlamaIndex anyway, so why not just go all in? :LlamaIndex:
ugh that takes so long. need to automate this more lol
https://github.com/run-llama/llama_index/pull/16919
I'm back again. I'm using a workflow based on this example (https://docs.llamaindex.ai/en/stable/examples/workflow/self_discover_workflow/). I found that events written to the stream are not delivered as they are written; instead, stream_events() suddenly receives them all in one batch after the workflow has finished. Is that a coroutine scheduling problem, or am I using it wrong?
workflow code:
Plain Text
    @step
    async def get_modules(self, ctx: Context, ev: StartEvent) -> GetModulesEvent:
        """Get modules step."""
        # get input data, store llm into ctx
        task = ev.get("task")

        if task is None:
            raise ValueError("'task' argument is required.")

        # format prompt and get result from LLM
        prompt = SELECT_PRMOPT_TEMPLATE.format(
            task=task, reasoning_modules=_REASONING_MODULES
        )

        print(f"==== begin get modules =====>> {self.get_elapsed_time()}")

        ctx.write_event_to_stream(
            ChatEvent(
                message=MessageFactory.text_progress_message("Generating reasoning modules..."),
            ),
        )

        print(f"==== begin get modules =====>> {self.get_elapsed_time()}")

        result = await self.llm.acomplete(prompt)

        print(f"==== end get modules =====>> {self.get_elapsed_time()}")

        ctx.write_event_to_stream(
            ChatEvent(
                message=MessageFactory.text_progress_message("Reasoning modules generated"),
            ),
        )

        print(f"==== end get modules =====>> {self.get_elapsed_time()}")

        return GetModulesEvent(task=task, modules=str(result))


calling code:
Plain Text
@router.post("/api/chat/demo")
async def demo(
    request: Request,
):
    body = await request.json()
    query = body.get("query") or ""

    async def generate_events():
        workflow = SelfDiscoverWorkflow(
            llm=get_llm(),
            timeout=None,
        )
        handler = workflow.run(
            task=query,
        )
        async for ev in handler.stream_events():
            if isinstance(ev, ChatEvent):
                yield SSEResponse.format_string(
                    SSEResponse.EventType.DATA,
                    ev.message,
                )

        await handler

    return SSEResponse.send(generate_events())


SSEResponse.send is a wrapper around FastAPI's streaming response.
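For anyone following along, here is a minimal sketch of what SSE framing usually looks like on the wire. The `format_sse` helper is hypothetical and is not the `SSEResponse` class used above, whose implementation isn't shown in this thread. It only illustrates the standard format: one `data:` field per payload line, with a blank line ending each message.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Frame a payload as one Server-Sent Events message.

    Hypothetical helper for illustration; not part of FastAPI or LlamaIndex.
    """
    lines = []
    if event is not None:
        # Optional event name, lets the client dispatch on message type.
        lines.append(f"event: {event}")
    # Every line of the payload needs its own "data:" field.
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    # A blank line marks the end of the message.
    return "\n".join(lines) + "\n\n"


if __name__ == "__main__":
    print(repr(format_sse("hello")))
    print(repr(format_sse("line one\nline two", event="progress")))
```

With FastAPI, strings framed like this would typically be yielded from an async generator passed to `StreamingResponse(..., media_type="text/event-stream")`.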
That looks fine to me. If you test the workflow outside of fastapi, I would imagine it works fine.

I tested with this workflow

Plain Text
import asyncio
from llama_index.core.workflow import Context, Workflow, step, StartEvent, StopEvent, Event
from llama_index.llms.openai import OpenAI

class StreamingWorkflow(Workflow):
    @step
    async def stream(self, ctx: Context, ev: StartEvent) -> StopEvent:
        llm = OpenAI(model="gpt-4o-mini")
        
        ctx.write_event_to_stream(Event(msg="Starting..."))
        resp = await llm.acomplete("Hello, world!")
        ctx.write_event_to_stream(Event(msg=resp.text))

        ctx.write_event_to_stream(Event(msg="Stopping..."))
        resp = await llm.acomplete("What is the weather in Tokyo?")
        ctx.write_event_to_stream(Event(msg=resp.text))

        return StopEvent(result="Done!")
    
async def main():
    w = StreamingWorkflow()
    handler = w.run()
    async for ev in handler.stream_events():
        if hasattr(ev, "msg"):
            print(ev.msg)
    result = await handler
    print(result)

if __name__ == "__main__":
    asyncio.run(main())


The above works fine in a terminal

For my FastAPI endpoint, I just use StreamingResponse. I noticed that this will only stream chunks when it encounters a newline.

Plain Text
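from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

# StreamingWorkflow is the class defined in the snippet above
@app.get("/")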
async def root():
    w = StreamingWorkflow()
    handler = w.run()

    async def stream():
        async for ev in handler.stream_events():
            if hasattr(ev, "msg"):
                yield ev.msg + "\n"

    return StreamingResponse(stream(), media_type="text/plain")


You can even get crazier and do llm.astream_complete() and write each token as a stream event
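A rough sketch of that per-token pattern, using only the standard library so it runs without an API key. Everything here is a stand-in: `fake_token_stream` plays the role of the async generator you would get from `llm.astream_complete()`, and a plain `asyncio.Queue` plays the role of the workflow's event stream. In a real step, the `put_nowait` line would be a `ctx.write_event_to_stream(...)` call instead.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_token_stream(text: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for an LLM token stream."""
    for token in text.split(" "):
        await asyncio.sleep(0)  # simulate waiting on the network
        yield token


async def produce(queue: asyncio.Queue) -> None:
    # Assumption: in a workflow step, each token would be written to the
    # stream as its own event instead of being put on this queue.
    async for token in fake_token_stream("streaming one token at a time"):
        queue.put_nowait(token)
    queue.put_nowait(None)  # sentinel: no more tokens


async def consume(queue: asyncio.Queue) -> List[str]:
    received = []
    while True:
        token = await queue.get()
        if token is None:
            break
        received.append(token)
    return received


async def run_demo() -> List[str]:
    queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(produce(queue))
    tokens = await consume(queue)
    await producer
    return tokens


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```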
I've never used SSE before with fastapi tbh, only Response, StreamingResponse, and websockets.
Thanks for your help. I tested my code in Terminal, and it works as well. Maybe it's related to the SSE used in FastAPI. I will check it going forward. Thanks again!
I think I found the problem: after write_event_to_stream finishes, execution goes straight into await self.llm.acomplete(), but control does not seem to be released to other coroutines. If I add a line `await asyncio.sleep(0.1)`, the program works. This only happens in FastAPI.
I debugged ctx.streaming_queue, and it kept growing until the stop event was sent.
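To illustrate the general asyncio behaviour being described (not a confirmed diagnosis of this particular case): putting an item on a queue without awaiting does not by itself let the consumer run. The consumer only gets a turn when the producer reaches an `await` that actually suspends. The sketch below is stdlib-only and all names are made up for the demo. The `blocking=True` branch assumes something between the two writes blocks the event loop, which may or may not be what happens with a given LLM client.

```python
import asyncio
import time
from typing import List


async def producer(queue: asyncio.Queue, log: List[str], blocking: bool) -> None:
    queue.put_nowait("start")  # no await here, so the consumer cannot run yet
    if blocking:
        time.sleep(0.05)  # blocks the whole event loop
    else:
        await asyncio.sleep(0.05)  # suspends, so the consumer gets a turn
    log.append("work finished")
    queue.put_nowait("end")
    queue.put_nowait(None)  # sentinel: stream is done


async def consumer(queue: asyncio.Queue, log: List[str]) -> None:
    while True:
        item = await queue.get()
        if item is None:
            return
        log.append(f"received {item}")


async def run_demo(blocking: bool) -> List[str]:
    queue: asyncio.Queue = asyncio.Queue()
    log: List[str] = []
    consumer_task = asyncio.create_task(consumer(queue, log))
    await asyncio.sleep(0)  # let the consumer start waiting on the queue
    await producer(queue, log, blocking)
    await consumer_task
    return log


if __name__ == "__main__":
    print("blocking:   ", asyncio.run(run_demo(blocking=True)))
    print("cooperative:", asyncio.run(run_demo(blocking=False)))
```

In the blocking run, both events arrive in a batch after the work is done. In the cooperative run, the first event is received before the work finishes. An explicit `await asyncio.sleep(0)` after writing an event is one way to force a yield point if something downstream turns out not to suspend.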
How come my fastapi code worked fine?
I didn't have any sleep calls, and the output seemed correct when hitting the endpoint