@kapa.ai
Hey! I'm currently working on a chat app. The app is based on websockets (FastAPI lib).
I have a problem with parallelism. Once I call one instance, the second one waits for the first one to be done.
I've looked around but I can't seem to find a solution. I tried to implement `chat_engine.astream_chat` instead of the sync one, `chat_engine.stream_chat`, but that does not seem to work for me either (I straight up could not get astream_chat to work). I wonder if this is why it waits for the first instance to be done or not.
Does anybody have any idea? (As a model wrapper I use Ollama - it is not single-threaded, I tried it in cmd.)
Please stand by @yadibolt as I search the knowledge base. I normally take about 15 seconds or so. :wave:
----------------------
Based on the information provided in the knowledge sources, I can offer some insights about the astream_chat method in LlamaIndex, which seems relevant to your question. However, please note that the knowledge sources don't contain specific information about FastAPI, websockets, or parallelism issues you're experiencing. I'll focus on what I can confidently say about LlamaIndex's async streaming capabilities.

The astream_chat method is indeed the asynchronous version of stream_chat in LlamaIndex. It's designed to provide a streaming response asynchronously. Here's what we know from the documentation:

  1. The astream_chat method is part of the BaseChatEngine class in LlamaIndex LlamaIndex > En > Latest > Api_reference > Index > BaseChatEngine > astream_chat.
  2. It's an abstract method, which means it needs to be implemented by specific chat engine classes.
  3. The method signature is:
    Plain Text
    async def astream_chat(self, message: str, chat_history: Optional[List[ChatMessage]] = None) -> StreamingAgentChatResponse
  4. To use it, you would typically await the method and then iterate over the response asynchronously. Here's an example from the documentation LlamaIndex > En > Latest > Examples > Llm > Async chat:
Plain Text
   async def get_dog_name_streaming(llm, messages):
       async for chunk in await llm.astream_chat(messages):
           print(chunk.delta, end="", flush=True)

   asyncio.run(get_dog_name_streaming(llm, messages))
   
  5. If you're using Ollama as your model wrapper, there is an OllamaMultiModal class that implements astream_chat LlamaIndex > En > Latest > Api_reference > Multi_modal_llms > Ollama > OllamaMultiModal.
Given that you're having trouble getting astream_chat to work, it's possible that there might be an issue with how it's being called or how the asynchronous code is being handled in your FastAPI application. Unfortunately, without seeing your specific implementation, it's difficult to pinpoint the exact problem.

To troubleshoot, you might want to:
  1. Ensure you're using the correct async syntax when calling astream_chat.
  2. Check that you're properly awaiting the method and iterating over the response asynchronously.
  3. Verify that your FastAPI routes are defined as asynchronous where necessary.
  4. Make sure you're running your FastAPI application with an ASGI server that supports websockets and asynchronous operations, like Uvicorn.
If you're still facing issues, it might be helpful to share your specific implementation (the relevant parts of your FastAPI app and how you're calling astream_chat) in the LlamaIndex community forums or GitHub issues for more targeted assistance.
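For example, a minimal sketch of an async websocket handler that awaits astream_chat and forwards tokens as they arrive (the chat_engine, route path, and names here are placeholders, not your actual implementation):

Plain Text
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def ws_chat(websocket: WebSocket):
    await websocket.accept()
    while True:
        message = await websocket.receive_text()
        # await the coroutine first, then iterate the async generator
        response = await chat_engine.astream_chat(message)  # chat_engine is assumed to already exist
        async for token in response.async_response_gen():
            await websocket.send_text(token)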
----------------------
Note: you can ask me a follow up question by @ mentioning me again :speech_balloon:
----------------------
@kapa.ai I implemented astream_chat and it returns an error: error_message
"Initial token count exceeds token limit", but I've set the token limit to 2048
Let me check the knowledge base for you again. One moment! :books:
----------------------
I understand you're encountering an error when using astream_chat in LlamaIndex. Let's look at this issue based on the information provided in the knowledge sources.

The error message "Initial token count exceeds token limit" is related to the token limit setting in the chat memory buffer. This error occurs when the initial token count of your conversation history exceeds the set token limit. Let's break down the problem and potential solution:

  1. Token Limit: You mentioned setting the token limit to 2048. This is consistent with the artificial limit set in one of the examples LlamaIndex > En > Latest > Examples > Finetuning > React_agent > limit the context window artifically to test refine process Part 1:
Plain Text
   Settings.context_window = 2048
   


  2. Chat Memory Buffer: The error is likely coming from the ChatMemoryBuffer class. This class is responsible for managing the chat history and enforcing token limits LlamaIndex > En > Latest > Api_reference > Memory > Chat memory buffer > ChatMemoryBuffer > get.
  3. Initial Token Count: The error suggests that the initial token count of your conversation history is already exceeding the 2048 token limit you've set.
To resolve this issue, you have a few options:

  1. Increase the token limit: If your conversation history is consistently larger than 2048 tokens, you might need to increase the limit. You can do this when initializing the ChatMemoryBuffer:
Plain Text
   memory = ChatMemoryBuffer.from_defaults(token_limit=4096)  # or any higher value
   
  2. Clear or truncate the chat history: If you don't need the entire conversation history, you can clear it or keep only the most recent messages.
  3. Use a different memory implementation: LlamaIndex offers other memory implementations that might be more suitable for your use case.
  4. Check your implementation: Ensure that you're not accidentally adding extra content to the chat history that's causing it to exceed the token limit.
Here's an example of how you might initialize a chat engine with a higher token limit:

Plain Text
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.chat_engine import SimpleChatEngine

memory = ChatMemoryBuffer.from_defaults(token_limit=4096)
chat_engine = SimpleChatEngine.from_defaults(memory=memory)

# Now use chat_engine.astream_chat()
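
If you would rather truncate or clear the history (option 2 above), a rough sketch, assuming your existing history is a list of ChatMessage objects:

Plain Text
from llama_index.core.memory import ChatMemoryBuffer

# keep only the most recent messages before building the buffer (illustrative cutoff)
recent_history = chat_history[-4:]  # `chat_history` is your existing List[ChatMessage]
memory = ChatMemoryBuffer.from_defaults(token_limit=2048, chat_history=recent_history)

# or wipe it entirely
memory.reset()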


If you're still encountering issues after trying these solutions, it would be helpful to review how you're initializing and using the chat engine and memory components in your code. The exact implementation can affect how the token limit is applied and how the chat history is managed.

Remember, the token limit is there to manage the context window of the language model. If you increase it significantly, make sure your underlying LLM can handle the larger context.
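With Ollama specifically, you would likely also want to raise the LLM's own context window so it matches the memory's token limit (a rough sketch; model name and values are placeholders, and the exact kwargs can vary by llama-index version):

Plain Text
from llama_index.llms.ollama import Ollama

# context_window should be at least as large as the memory token_limit you set
llm = Ollama(model="llama3", request_timeout=120.0, context_window=4096)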

LlamaIndex > En > Latest > Api_reference > Memory > Chat memory buffer provides more detailed information about the ChatMemoryBuffer class and its configuration options.
----------------------
Note: you can ask me a follow up question by @ mentioning me again :speech_balloon:
----------------------
@yadibolt

Plain Text
from fastapi.responses import StreamingResponse

@app.post("/chat")
async def chat(...):
  response = await chat_engine.astream_chat(...)
  async def gen():
    async for token in response.async_response_gen():
      # fastapi typically needs a newline to delimit chunks
      yield token + "\n"
  
  return StreamingResponse(gen())
seems alright, but still getting that "Initial token count exceeds token limit"
I'm using the buffer too, dunno what's wrong
And all I'm sending is "Hello"
Plain Text
chat_memory = ChatMemoryBuffer.from_defaults(
  token_limit=4096,
  chat_history=self.chat_history
)
Plain Text
self.chat_engine: ContextChatEngine = index.as_chat_engine(
   similarity_top_k=self.similarity_top_k,
   chat_mode='context',
   llm=self.llm,
   context_template=formatted_context_template,
   verbose=True,
   system_prompt=formatted_system_prompt,
   streaming=False,
   memory=chat_memory
)
what's your similarity top k value? Seems like it's retrieving way too much text
I should lower that right
oh, 512 per chunk
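(rough math, assuming the retrieved chunks really are ~512 tokens each; the top-k value below is hypothetical)

Plain Text
# back-of-the-envelope check (numbers are illustrative)
chunk_size = 512          # tokens per retrieved chunk
similarity_top_k = 5      # hypothetical top-k
context_tokens = chunk_size * similarity_top_k   # 2560 tokens of retrieved context alone
print(context_tokens > 2048)  # True -> "Initial token count exceeds token limit"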
weird that normal implementation with stream_chat works
what version of llama-index-core do you have installed?
should be latest
"0.10.53.post1"
oh that's not latest -- we are at 0.12.10 these days
hope nothing breaks now
shouldn't πŸ™
You probably need to bump all llama-index-* packages if you update though
We removed the initial token count thing from those chat engines in newer versions. It shouldn't hit that error anymore no matter the top-k
but it will make more than one llm call if the top-k is large
Thank you, will try. You've modified HuggingFaceEmbeddings too?
hmm I can't remember back that far. Probably not, but since everything depends on llama-index-core, all the packages that depend on it (so, everything) have to be bumped as well
just a symptom of how poetry deps work
I'll have to look through my repo and the llama-index repo, some imports are broken for me πŸ˜… Thank you for the help for now, I'll let you know if something pops up with that async streaming, if I can ofc.
which imports broke? Happy to help if it's related to version changes or anything
I'm using Elasticsearch so now the vectorstore can't be initialized
to be exact
Plain Text
'ElasticsearchStore' object has no attribute '__pydantic_private__'
Works fine! Thank you! Now to the core of the problem. It still waits for the instance to finish generating and it's not generating concurrently.
I printed the chat_engines separately; both have different memory addresses, so they are not the same instance
@kapa.ai hi again. I've implemented the async version of stream_chat, but when I prompt two separate instances it waits for the first one to be done before going to the other. I'm using Ollama; I've specified how many models can be instantiated and how many of them can run at the same time - both have a value of 4. How do I run these Ollama instances concurrently without being blocked?
Let me check the knowledge base for you again. One moment! :books:
----------------------
Hello again! I understand you're facing issues with concurrent execution of Ollama instances using LlamaIndex's async version of stream_chat. Let's address this based on the information available in the knowledge sources.

The issue you're experiencing seems to be related to concurrency in LlamaIndex workflows. While you've configured Ollama to allow multiple instances, the LlamaIndex workflow might be the bottleneck. Here's what we can gather from the documentation:

  1. LlamaIndex provides a ParallelAgentRunner class that allows for concurrent execution of workflows LlamaIndex > En > Latest > Examples > Workflow > Parallel Execution of Same Event Example. This class can help you run multiple instances concurrently.
  2. The @step decorator in LlamaIndex workflows can be used to control the number of concurrent executions. You can set the num_workers parameter to specify how many steps can be executed simultaneously LlamaIndex > En > Latest > Understanding > Workflows > Concurrent execution of workflows.
Here's an example of how you might implement this:

```python
from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)

class OllamaEvent(Event):
    query: str

class ParallelFlow(Workflow):
    @step(num_workers=4)
    async def run_ollama(self, ctx: Context, ev: OllamaEvent) -> StopEvent:
        # Your Ollama chat logic here
        response = await self.ollama_instance.astream_chat(ev.query)
        # Process the response
        return StopEvent(result=processed_response)

    @step
    async def start(self, ctx: Context, ev: StartEvent) -> OllamaEvent:
        # Emit multiple events for parallel processing
        ctx.send_event(OllamaEvent(query="Query 1"))
        ctx.send_event(OllamaEvent(query="Query 2"))
        ctx.send_event(OllamaEvent(query="Query 3"))
        ctx.send_event(OllamaEvent(query="Query 4"))
```

In this example:
- The `run_ollama` step is decorated with `@step(num_workers=4)`, allowing up to 4 concurrent executions.
- The `start` step emits multiple events, which will be processed in parallel by `run_ollama`.

To use this workflow:

```python
flow = ParallelFlow()
result = await flow.run()
```

This approach should allow you to run multiple Ollama instances concurrently without blocking.

Additionally, ensure that you're using the asynchronous methods correctly in your FastAPI application. Here's a basic example of how you might structure your FastAPI route:

```python
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        # Use your ParallelFlow here
        flow = ParallelFlow()
        result = await flow.run()  # pass `data` into your workflow as needed
        await websocket.send_text(result)
```

Remember to run your FastAPI application with an ASGI server that supports concurrent WebSocket connections, such as Uvicorn with multiple workers.
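For instance, a rough sketch (the "main:app" import string and port are placeholders):

```python
import uvicorn

if __name__ == "__main__":
    # multiple worker processes; the app must be given as an import string when workers > 1
    uvicorn.run("main:app", host="0.0.0.0", port=8000, workers=4)
```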

If you're still experiencing issues after implementing these suggestions, it might be worth checking if there are any other bottlenecks in your application or if there are any specific limitations with your current Ollama setup that might be preventing true concurrency.
LlamaIndex > En > Latest > Understanding > Workflows > Concurrent execution of workflows provides more detailed information about concurrent execution in LlamaIndex workflows.
----------------------
Note: you can ask me a follow up question by @ mentioning me again :speech_balloon:
----------------------
@yadibolt what's the full code for your endpoint? Shouldn't be an issue assuming async is used everywhere. You mentioned you are using HuggingFaceEmbedding(), which does not have async because it's running in-process and not over an API, so embedding calls will block πŸ€” But the Ollama LLM definitely has proper async
It's a long one but I'll share the important snippets.
chat.py:
Plain Text
# initialize the embedding model in order to speed up the response time
Settings.embed_model = HuggingFaceEmbedding(
    model_name="google-bert/bert-base-multilingual-cased"
)

# initialize router
router = APIRouter()

sessions = {} # stores the chat sessions

# define routes - WebSocket
@router.websocket("/{uuid}")  # uuid - user unique identifier
async def chat(websocket: WebSocket, uuid: str):
    ...
    try:
        while True:
            # parse received data
            data = await websocket.receive_text()
            payload = json.loads(data)
            ...
            message_handler = MessageHandler(llm_instance=chat_LLM, websocket=websocket)
            # generate response from the model
            message_task = asyncio.to_thread(await message_handler.handle_RAG_message(message=payload['message']), sessions[current_user.get_uuid()]["chat_session"].get_current_conversation().get_conversation_id())
            assistant_generated_message = await message_task
in sessions, each user has their "own" LLM instance stored, which is the one being used
handle_RAG_message method:
Plain Text
async def handle_RAG_message(self, message: str) -> str:
    """
    Handles the message from the client
    Returns the generated assistant_generated_message
    """

    # token generation loop
    await self.websocket.send_text(WSResponse(action="generate-state", state="started", error=False, error_message="", timestamp=int(time.time())).to_json())

    assistant_generated_message = ""
    chat_engine = self.llm_instance.get_chat_engine()

    print(f"chat_engine: {chat_engine}")

    response = await chat_engine.astream_chat(message=message)
    async for token in response.async_response_gen():
        assistant_generated_message += token
        await self.websocket.send_text(WSResponse(action="generate", partial=token, error=False, error_message="", timestamp=int(time.time())).to_json())
    await self.websocket.send_text(WSResponse(action="generate-state", state="finished", error=False, error_message="", timestamp=int(time.time())).to_json())

    return assistant_generated_message
should I just avoid using sessions dict?
eh I think that's fine (it's like a mock redis at that point)

How are you testing that things aren't concurrent?
I have two windows open, both have their own session
in inspector - WS
currently first one:
HuggingFaceEmbedding() should not be an issue, since that is not getting called each time
only at the start of app
if you do not understand anything I will provide more information
I just send a message over the websockets from one window and then the other, wait for response generation... aaand
both respond with generate action which means that it hit the handle_RAG_message() method
but, one window is generating response, another is not
once the first one is done, then the second one starts
that's why I was thinking it was because of synchronous stream chat, but it's still the same
without the asyncio.to_thread() it waits entirely
not even sending 'generate-state'
which means it is called only after the first is done ☹️
It also gets called during the chat, since it has to retrieve context and embed your query/message
so the fix would be to download the model and run it locally, you say, right?
uhhh not quite. The fix would be to use an embedding model that has a proper async api (i.e. hosted on a server). TEI, ollama, openai, etc. are valid options
But this is assuming that the embed model is the issue too
You could maybe verify if it's an issue by swapping in a dummy embedding model

Plain Text
from llama_index.core import MockEmbedding

embed_model = MockEmbedding(embed_dim=1234)


The embed_dim should match your current embedding model dim
both of these should be good no?
those ollama embed models are also good, if you want to try OllamaEmbedding πŸ‘
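Something like this should work (model name and URL are just examples; use whatever embedding model you have pulled in Ollama):

Plain Text
from llama_index.core import Settings
from llama_index.embeddings.ollama import OllamaEmbedding

# served by Ollama over HTTP, so embedding calls get a real async path
Settings.embed_model = OllamaEmbedding(
    model_name="nomic-embed-text",
    base_url="http://localhost:11434",
)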
ill try it and let you know
It finally works! Thanks. Last thing I need to figure out: why is it sending all the tokens at the same time and not streaming?
Like it sends them separately but yeah:
Plain Text
# token generation loop
await self.websocket.send_text(WSResponse(action="generate-state", state="started", error=False, error_message="", timestamp=int(time.time())).to_json())

assistant_generated_message = ""
chat_engine = self.llm_instance.get_chat_engine()

response = await chat_engine.astream_chat(message=message)
async def gen():
    async for token in response.async_response_gen():
        yield token

async for token in gen():
    assistant_generated_message += token
    await self.websocket.send_text(WSResponse(action="generate", partial=token, error=False, error_message="", timestamp=int(time.time())).to_json())

await self.websocket.send_text(WSResponse(action="generate-state", state="finished", error=False, error_message="", timestamp=int(time.time())).to_json())
this is the generation method now
@kapa.ai Hey, I have a method that should return streaming response, but it returns all the tokens at the same time. Do you know why?
Here is the method:
Plain Text
response = await chat_engine.astream_chat(message=message)

async def gen():
    async for token in response.async_response_gen():
        yield token

async for token in gen():
    assistant_generated_message += token
    await # send websocket response

await # send websocket response
Let me check the knowledge base for you again. One moment! :books:
----------------------
Based on the information provided in the knowledge sources and your code snippet, I can offer some insights into why your method might be returning all tokens at once instead of streaming them. Let's analyze the issue:

  1. The astream_chat method you're using is the correct asynchronous streaming method for LlamaIndex chat engines LlamaIndex > En > Latest > Api_reference > Chat_engines > Context > ContextChatEngine Part 4.
  2. Your implementation of the gen() function and the subsequent iteration over it looks correct in principle.
However, there are a few potential issues that could cause the tokens to be returned all at once:

  1. Buffering: The async for loop might be buffering the entire response before yielding anything. This can happen if the underlying implementation of async_response_gen() is not truly streaming.
  2. Websocket implementation: The issue might be in how you're sending the websocket response (which is not shown in the provided code).
  3. Event loop blocking: If there's any blocking operation in your code, it might cause the entire response to be generated before any tokens are sent.
To address these issues, you could try the following modifications:

Plain Text
import asyncio

async def stream_chat(message: str):
    response = await chat_engine.astream_chat(message=message)
    assistant_generated_message = ""
    
    async for token in response.async_response_gen():
        assistant_generated_message += token
        # Yield each token immediately
        yield token
        # Optionally, you can add a small delay to ensure the event loop can process other tasks
        await asyncio.sleep(0)

# In your websocket handler
async for token in stream_chat(message):
    await websocket.send_text(token)
This modification:
  1. Removes the nested gen() function, simplifying the code.
  2. Yields each token immediately as it's received.
  3. Optionally adds a small delay with await asyncio.sleep(0) to ensure the event loop can process other tasks.
If you're using FastAPI, you might want to use the StreamingResponse class to handle the streaming:

Plain Text
from fastapi.responses import StreamingResponse

@app.get("/stream")
async def stream_endpoint(message: str):
    return StreamingResponse(stream_chat(message), media_type="text/plain")


If you're still experiencing issues after these modifications, it might be worth checking:

  1. The specific implementation of the chat engine you're using.
  2. Any middleware or other components in your application that might be buffering the response.
  3. The configuration of your ASGI server (e.g., Uvicorn) to ensure it's set up for streaming responses.
Remember, true streaming requires support at every level of your application stack, from the LLM to the chat engine, to your web framework, and finally to the client receiving the stream.

LlamaIndex > En > Latest > Api_reference > Chat_engines > Context > ContextChatEngine Part 4 provides more details about the astream_chat method and its expected behavior.
----------------------
Note: you can ask me a follow up question by @ mentioning me again :speech_balloon:
----------------------
@kapa.ai I'm using websockets, so the StreamingResponse from fastapi is probably not useful
Let me check the knowledge base for you again. One moment! :books:
----------------------
You're right, and I apologize for the confusion in my previous response. Since you're using WebSockets, the StreamingResponse from FastAPI indeed isn't applicable in your case. Let's focus on how to properly stream the response over WebSockets using LlamaIndex's astream_chat method.

Based on the information from the knowledge sources, here's a more appropriate approach for streaming chat responses over WebSockets:

Plain Text
@app.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    
    while True:
        message = await websocket.receive_text()
        
        response = await chat_engine.astream_chat(message=message)
        
        async for token in response.async_response_gen():
            await websocket.send_text(token)
        
        # Optionally send a message to indicate the end of the stream
        await websocket.send_text("[END_OF_STREAM]")


This implementation:

  1. Uses the WebSocket protocol to maintain a persistent connection.
  2. Receives messages from the client.
  3. Uses astream_chat to generate a streaming response.
  4. Sends each token individually as it's generated.
  5. Optionally sends an end-of-stream marker.
A few key points to ensure proper streaming:

  1. Make sure your chat_engine is configured for streaming. The astream_chat method should return a StreamingAgentChatResponse object LlamaIndex > En > Latest > Api_reference > Chat_engines > StreamingAgentChatResponse Part 4.
  2. The async_response_gen() method of StreamingAgentChatResponse should yield individual tokens LlamaIndex > En > Latest > Api_reference > Chat_engines > StreamingAgentChatResponse Part 4.
  3. If you're still not seeing individual tokens being sent, you might need to ensure that the underlying LLM (in your case, Ollama) is configured for streaming as well.
  4. To prevent any potential buffering, you can add a small delay after sending each token:
Plain Text
   async for token in response.async_response_gen():
       await websocket.send_text(token)
       await asyncio.sleep(0.01)  # Small delay to prevent buffering
   


If you're still experiencing issues with tokens being sent all at once, it might be worth checking:

  1. Your Ollama configuration to ensure it's set up for streaming.
  2. The specific implementation of the chat engine you're using, to confirm it supports token-by-token streaming.
  3. Any middleware or WebSocket configurations in your FastAPI application that might be affecting the streaming behavior.
Remember, for true streaming, every component in your stack needs to support it, from the LLM (Ollama) through LlamaIndex and up to your WebSocket implementation.
----------------------
Note: you can ask me a follow up question by @ mentioning me again :speech_balloon:
----------------------
@Logan M sorry for the ping, may I ask you for your assistance?
I checked whether ContextChatEngine supports streaming - it says yes. I have set streaming to True, yet still no token-by-token streaming
Have you tried testing outside of fastapi? It definitely works

Plain Text
async for token in response.async_response_gen():
  print(token, end="", flush=True)


Seems like either fastapi or your receiving end is buffering responses on the socket?
Maybe you should be using await websocket.send_json(...) ?
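e.g. roughly, with keys just mirroring your WSResponse fields:

Plain Text
await websocket.send_json({
    "action": "generate",
    "partial": token,
    "error": False,
    "error_message": "",
    "timestamp": int(time.time()),
})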
it does work... Oh my... what's wrong then
send_json works as expected? Not sure what the issue is, but seems like you should just use that then πŸ˜…
I do.. everywhere now, but the issue persists
It's like it loads the response into memory and then spits it out.. If I add a delay, let's say 0.01s, they come token by token
what in the world.. πŸ˜‚
wrapping it in another async function was the issue
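i.e. the loop now iterates the generator directly, roughly like this (same send logic as before):

Plain Text
response = await chat_engine.astream_chat(message=message)

async for token in response.async_response_gen():
    assistant_generated_message += token
    await self.websocket.send_json(...)  # send the partial token frame, as before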
I would like to know why tho... πŸ˜‚
Hmm I have no idea πŸ˜… I'm very new to websockets lol
I do know I had a lot of issues with the websockets package in fastapi. I ended up switching to socket.io and mounting that within my fastapi app
Seemed more reliable
The websockets package seems pretty bare bones / under baked imo
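If it helps, the mounting pattern with python-socketio looks roughly like this (a sketch, path and settings are placeholders):

Plain Text
import socketio
from fastapi import FastAPI

app = FastAPI()
sio = socketio.AsyncServer(async_mode="asgi")

# mount the socket.io ASGI app inside the FastAPI app
app.mount("/ws", socketio.ASGIApp(sio))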
I'll have a look at the socket.io, thanks!
I owe you a wine πŸ˜„