


At a glance

The post asks "When is the next release?", and the comments quickly turn to LlamaIndex workflows. One community member reports that events written with ctx.write_event_to_stream are only delivered in a batch after the workflow finishes when the workflow runs inside a FastAPI SSE endpoint, even though the same code streams correctly in a terminal. Another member shares a minimal workflow and StreamingResponse endpoint that stream fine for them. The first member eventually traces the behavior to the coroutine not yielding control between write_event_to_stream and the following await on llm.acomplete; adding a short asyncio.sleep makes the events flow. There is no explicitly marked answer.

Useful resources
When is the next release
15 comments
Oh yea, I need to do that. So freaking tedious lol
Will try to tonight
haha, come on
I'm ready to replace my custom AI workflow with a LlamaIndex workflow, ALL IN!
What I'm doing is basically a mock of LlamaIndex anyway, so why not just go all in :LlamaIndex:
ugh that takes so long. need to automate this more lol
https://github.com/run-llama/llama_index/pull/16919
I am here again. I am using a workflow based on this example (https://docs.llamaindex.ai/en/stable/examples/workflow/self_discover_workflow/), and I found that the events written to the stream are not sent immediately; stream_events() suddenly receives the whole batch of events only after the workflow has finished. Is that a coroutine dispatching problem, or am I using it wrong?
workflow code:
Plain Text
    @step
    async def get_modules(self, ctx: Context, ev: StartEvent) -> GetModulesEvent:
        """Get modules step."""
        # get input data, store llm into ctx
        task = ev.get("task")

        if task is None:
            raise ValueError("'task' argument is required.")

        # format prompt and get result from LLM
        prompt = SELECT_PRMOPT_TEMPLATE.format(
            task=task, reasoning_modules=_REASONING_MODULES
        )

        print(f"==== begin get modules =====>> {self.get_elapsed_time()}")

        ctx.write_event_to_stream(
            ChatEvent(
                message=MessageFactory.text_progress_message("Generating reasoning modules..."),
            ),
        )

        print(f"==== begin get modules =====>> {self.get_elapsed_time()}")

        result = await self.llm.acomplete(prompt)

        print(f"==== end get modules =====>> {self.get_elapsed_time()}")

        ctx.write_event_to_stream(
            ChatEvent(
                message=MessageFactory.text_progress_message("Reasoning modules generated"),
            ),
        )

        print(f"==== end get modules =====>> {self.get_elapsed_time()}")

        return GetModulesEvent(task=task, modules=str(result))


calling code:
Plain Text
@router.post("/api/chat/demo")
async def demo(
    request: Request,
):
    body = await request.json()
    query = body.get("query") or ""

    async def generate_events():
        workflow = SelfDiscoverWorkflow(
            llm=get_llm(),
            timeout=None,
        )
        handler = workflow.run(
            task=query,
        )
        async for ev in handler.stream_events():
            if isinstance(ev, ChatEvent):
                yield SSEResponse.format_string(
                    SSEResponse.EventType.DATA,
                    ev.message,
                )

        await handler

    return SSEResponse.send(generate_events())


SSEResponse.send is a wrapper around FastAPI's streaming response
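
(For context, SSEResponse, EventType, and format_string are the poster's own helpers; below is only a minimal sketch of what such a wrapper might look like, assuming it formats Server-Sent Events frames and hands an async generator to StreamingResponse — the actual implementation was not shared.)

Plain Text
from enum import Enum
from typing import AsyncIterator

from fastapi.responses import StreamingResponse


class SSEResponse:
    class EventType(str, Enum):
        DATA = "data"

    @staticmethod
    def format_string(event_type: "SSEResponse.EventType", payload) -> str:
        # SSE wire format: "event: <type>\ndata: <payload>\n\n"
        return f"event: {event_type.value}\ndata: {payload}\n\n"

    @staticmethod
    def send(events: AsyncIterator[str]) -> StreamingResponse:
        # text/event-stream marks the response as Server-Sent Events
        return StreamingResponse(events, media_type="text/event-stream")
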
That looks fine to me. If you test the workflow outside of FastAPI, I would imagine it works fine.

I tested with this workflow

Plain Text
import asyncio
from llama_index.core.workflow import Context, Workflow, step, StartEvent, StopEvent, Event
from llama_index.llms.openai import OpenAI

class StreamingWorkflow(Workflow):
    @step
    async def stream(self, ctx: Context, ev: StartEvent) -> StopEvent:
        llm = OpenAI(model="gpt-4o-mini")
        
        ctx.write_event_to_stream(Event(msg="Starting..."))
        resp = await llm.acomplete("Hello, world!")
        ctx.write_event_to_stream(Event(msg=resp.text))

        ctx.write_event_to_stream(Event(msg="Stopping..."))
        resp = await llm.acomplete("What is the weather in Tokyo?")
        ctx.write_event_to_stream(Event(msg=resp.text))

        return StopEvent(result="Done!")
    
async def main():
    w = StreamingWorkflow()
    handler = w.run()
    async for ev in handler.stream_events():
        if hasattr(ev, "msg"):
            print(ev.msg)
    result = await handler
    print(result)

if __name__ == "__main__":
    asyncio.run(main())


The above works fine in a terminal

For my FastAPI endpoint, I just use StreamingResponse. I noticed that it will only stream chunks when it encounters a newline

Plain Text
# StreamingWorkflow is the class defined above
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/")
async def root():
    w = StreamingWorkflow()
    handler = w.run()

    async def stream():
        async for ev in handler.stream_events():
            if hasattr(ev, "msg"):
                yield ev.msg + "\n"

    return StreamingResponse(stream(), media_type="text/plain")
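

(A quick way to watch the line-buffered chunks arrive one at a time is a small streaming client like this — just a sketch, assuming the app above is served locally on port 8000:)

Plain Text
# Watch the streamed lines arrive; assumes the app above runs on localhost:8000.
import requests

with requests.get("http://localhost:8000/", stream=True) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        if line:
            print(line)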


You can even get crazier and do llm.astream_complete() and write each token as a stream event
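
(A rough sketch of that idea — a minimal example assuming the same Event(msg=...) pattern as above; in LlamaIndex's streaming completion responses, resp.delta carries the newly generated text:)

Plain Text
from llama_index.core.workflow import Context, Workflow, step, StartEvent, StopEvent, Event
from llama_index.llms.openai import OpenAI


class TokenStreamingWorkflow(Workflow):
    @step
    async def stream_tokens(self, ctx: Context, ev: StartEvent) -> StopEvent:
        llm = OpenAI(model="gpt-4o-mini")

        # astream_complete returns an async generator of partial responses;
        # forward each delta as its own stream event
        gen = await llm.astream_complete("Hello, world!")
        async for resp in gen:
            ctx.write_event_to_stream(Event(msg=resp.delta))

        return StopEvent(result="Done!")


The consuming loop is the same as in main() above; each event just carries a single delta instead of a full response.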
I've never used SSE before with FastAPI tbh, only Response, StreamingResponse, and websockets.
Thanks for your help. I tested my code in a terminal, and it works there as well. Maybe it's related to the SSE handling in FastAPI. I will keep looking into it. Thanks again!
I found the problem: after write_event_to_stream finishes executing, the next thing to run is the await on llm.acomplete, but the coroutine doesn't yield control to the other coroutines; if I add a line await asyncio.sleep(0.1) in between, the program works. This only happens in FastAPI.
I debugged the context's internal stream queue, and it kept growing until the stop event was sent
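
(For reference, the workaround described above boils down to a yield point between the stream write and the long-running call — a minimal sketch, not a confirmed fix; asyncio.sleep(0.1) is what was used above, while asyncio.sleep(0) is the canonical "just yield to the event loop" form, untested here:)

Plain Text
import asyncio
from llama_index.core.workflow import Context, Workflow, step, StartEvent, StopEvent, Event


class YieldingWorkflow(Workflow):
    @step
    async def run_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
        ctx.write_event_to_stream(Event(msg="Generating reasoning modules..."))
        # Yield control so the streaming consumer can drain the queue
        # before the long-running call starts.
        await asyncio.sleep(0.1)

        await asyncio.sleep(2)  # stand-in for the long llm.acomplete() call

        ctx.write_event_to_stream(Event(msg="Reasoning modules generated"))
        return StopEvent(result="Done!")
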
How come my FastAPI code worked fine?
I didn't have any sleep calls, and the output seemed correct when hitting the endpoint