
Chat Workflow Timeout Issue

Hello, everybody. There is a question (maybe a bug) in my app. When using Workflow, I send an event to the stream, and in the business logic I use stream_events to receive it. But when a timeout occurs, I cannot catch the exception, and stream_events stays blocked. Here is my code, thanks for your help.

Plain Text
    llm = get_llm()

    wf = ChatWorkflow(
        ctx=context,
        llm=llm,
        timeout=3,
        verbose=True,
    )

    async def stream_response():
        handler = wf.run()
        try:
            async for event in handler.stream_events():
                print("---->>>> ", event)
                if isinstance(event, ChatEvent):
                    yield SSEResponse.format_string(
                        SSEResponse.EventType.DATA,
                        event.message.model_dump(),
                    )
        except Exception as e:
            logger.error(f"Chat workflow error: {str(e)}")
            yield SSEResponse.format_string(
                SSEResponse.EventType.DATA,
                {"message": "server error"},
            )
        await handler

    return SSEResponse.send(stream_response())
14 comments
I was just about to type up a question about this too. Pinging @Logan M since he wrote the concierge flow and probably has a thought here.
timeout=3 is suuuuper short 😅 tbh I would set timeout=None in most cases
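To see why such a short timeout bites, here is a minimal stdlib sketch (plain asyncio, not the llama_index API): a timeout shorter than the work cancels the coroutine and raises TimeoutError at the await point, which is what a 3-second workflow timeout does to a still-streaming LLM call.

```python
import asyncio

async def slow_work() -> str:
    # Stand-in for an LLM call that takes longer than the timeout.
    await asyncio.sleep(1.0)
    return "done"

async def main() -> str:
    try:
        # A timeout shorter than the work cancels it and raises TimeoutError.
        return await asyncio.wait_for(slow_work(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))  # timed out
```

With timeout=None the workflow runs to completion instead of being cancelled mid-stream.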

That being said, I think this is a bug (could have sworn we had a unit test for the scenario)
Thanks for the reply, maybe other exceptions can't be caught either?
There's some error in the dispatcher instrumentation that should be caught yea
If this is merged, does it mean that the version released tomorrow will have this change?
Well, might not be tomorrow (depends how quickly I can make the release LOL)
I modified the code in my local workspace, and it does fix the stream-cancel problem, but the workflow still raises the exception in the console. It seems the exception was re-raised?
Plain Text
    async def stream_response():
        wf = ChatWorkflow(
            ctx=context,
            llm=llm,
            timeout=3,
            verbose=True,
        )
        handler = wf.run()
        async for event in handler.stream_events():
            print(
                "---->>>> ",
                type(event),
                event.message.event if isinstance(event, ChatEvent) else "stop",
            )
            if isinstance(event, ChatEvent):
                yield SSEResponse.format_string(
                    SSEResponse.EventType.DATA,
                    event.message.model_dump(),
                )
        try:
            await handler
        except Exception as e:
            logger.error(f"Chat workflow error: {str(e)}")
            yield SSEResponse.format_string(
                SSEResponse.EventType.DATA,
                {"message": "server error"},
            )

    return SSEResponse.send(stream_response())
I use this in a FastAPI router handler. Another question: I get 5 events on every request, but they start from the second one; the start event is never sent. Maybe this is a coroutine problem. I'm not good at Python, Go is my main language.
It's raised once in the dispatcher/instrumentation and then raised again, no way around this really with async stuff
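A minimal stdlib sketch of that double-surface behavior (illustrative only, these are not the llama_index internals): an exception inside a task reaches a done-callback, where instrumentation can log it to the console, and is then re-raised when the caller awaits the task.

```python
import asyncio

log: list[str] = []

async def failing_step() -> None:
    # Stand-in for a workflow step that blows up.
    raise RuntimeError("boom")

def on_done(task: asyncio.Task) -> None:
    # The "dispatcher" side: inspect and log the exception in a done-callback.
    if task.exception() is not None:
        log.append(f"logged: {task.exception()}")

async def main() -> None:
    task = asyncio.create_task(failing_step())
    task.add_done_callback(on_done)
    try:
        await task  # the same exception is re-raised here
    except RuntimeError as e:
        log.append(f"caught: {e}")

asyncio.run(main())
print(log)  # both the logged and the caught entry appear
```

So even a try/except around `await handler` cannot stop the instrumentation side from printing the traceback once.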
ngl I'm not really sure what this means haha

A simple way to replicate the issue might help
Ok, I'm going to read some docs about Python async. Btw, Python async doesn't feel as convenient as Go's concurrency to me.
python async is pretty weird I agree. I've gotten used to it over the last year after using it so much though