Please stand by @yadibolt as I search the knowledge base. I normally take about 15 seconds or so. :wave:
----------------------
Based on the information provided in the knowledge sources, I can offer some insights about the `astream_chat` method in LlamaIndex, which seems relevant to your question. However, please note that the knowledge sources don't contain specific information about FastAPI, websockets, or the parallelism issues you're experiencing. I'll focus on what I can confidently say about LlamaIndex's async streaming capabilities.
The `astream_chat` method is indeed the asynchronous version of `stream_chat` in LlamaIndex. It's designed to provide a streaming response asynchronously. Here's what we know from the documentation:
- The `astream_chat` method is part of the `BaseChatEngine` class in LlamaIndex (LlamaIndex > En > Latest > Api_reference > Index > BaseChatEngine > astream_chat).
- It's an abstract method, which means it needs to be implemented by specific chat engine classes.
- The method signature is:
  async def astream_chat(self, message: str, chat_history: Optional[List[ChatMessage]] = None) -> StreamingAgentChatResponse
- To use it, you would typically await the method and then iterate over the response asynchronously. Here's an example from the documentation (LlamaIndex > En > Latest > Examples > Llm > Async chat):
async def get_dog_name_streaming(llm, messages):
    async for chunk in await llm.astream_chat(messages):
        print(chunk.delta, end="", flush=True)

asyncio.run(get_dog_name_streaming(llm, messages))
- If you're using Ollama as your model wrapper, there is an `OllamaMultiModal` class that implements `astream_chat` (LlamaIndex > En > Latest > Api_reference > Multi_modal_llms > Ollama > OllamaMultiModal).
Given that you're having trouble getting `astream_chat` to work, it's possible that there might be an issue with how it's being called or how the asynchronous code is being handled in your FastAPI application. Unfortunately, without seeing your specific implementation, it's difficult to pinpoint the exact problem.
To troubleshoot, you might want to:
- Ensure you're using the correct async syntax when calling `astream_chat`.
- Check that you're properly awaiting the method and iterating over the response asynchronously.
- Verify that your FastAPI routes are defined as asynchronous where necessary.
- Make sure you're running your FastAPI application with an ASGI server that supports websockets and asynchronous operations, like Uvicorn (a minimal sketch follows this list).
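To make the first two points concrete, here is a minimal sketch (not taken from the knowledge sources) of awaiting `astream_chat` and consuming the stream inside an async FastAPI route; the `chat_engine` object and the `/demo` route are placeholders for whatever your application builds:
```python
from fastapi import FastAPI

app = FastAPI()

# chat_engine is assumed to be a LlamaIndex chat engine built elsewhere,
# e.g. index.as_chat_engine(chat_mode="context")

@app.get("/demo")
async def demo(message: str):
    # await the coroutine to get the streaming response object...
    response = await chat_engine.astream_chat(message)
    # ...then iterate its async generator token by token
    tokens = []
    async for token in response.async_response_gen():
        tokens.append(token)
    return {"response": "".join(tokens)}
```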
If you're still facing issues, it might be helpful to share your specific implementation (the relevant parts of your FastAPI app and how you're calling `astream_chat`) in the LlamaIndex community forums or GitHub issues for more targeted assistance.
----------------------
Note: you can ask me a follow up question by @ mentioning me again :speech_balloon:
----------------------
@kapa.ai I implemented astream_chat and it returns an error: error_message
"Initial token count exceeds token limit", but I've set the token limit to 2048
Let me check the knowledge base for you again. One moment! :books:
----------------------
I understand you're encountering an error when using `astream_chat` in LlamaIndex. Let's look at this issue based on the information provided in the knowledge sources.
The error message "Initial token count exceeds token limit" is related to the token limit setting in the chat memory buffer. This error occurs when the initial token count of your conversation history exceeds the set token limit. Let's break down the problem and potential solution:
- Token Limit: You mentioned setting the token limit to 2048. This is consistent with the artificial limit set in one of the examples LlamaIndex > En > Latest > Examples > Finetuning > React_agent > limit the context window artifically to test refine process Part 1:
Settings.context_window = 2048
- Chat Memory Buffer: The error is likely coming from the `ChatMemoryBuffer` class. This class is responsible for managing the chat history and enforcing token limits (LlamaIndex > En > Latest > Api_reference > Memory > Chat memory buffer > ChatMemoryBuffer > get).
- Initial Token Count: The error suggests that the initial token count of your conversation history is already exceeding the 2048 token limit you've set.
To resolve this issue, you have a few options:
- Increase the token limit: If your conversation history is consistently larger than 2048 tokens, you might need to increase the limit. You can do this when initializing the `ChatMemoryBuffer`:
memory = ChatMemoryBuffer.from_defaults(token_limit=4096) # or any higher value
- Clear or truncate the chat history: If you don't need the entire conversation history, you can clear it or keep only the most recent messages.
- Use a different memory implementation: LlamaIndex offers other memory implementations that might be more suitable for your use case.
- Check your implementation: Ensure that you're not accidentally adding extra content to the chat history that's causing it to exceed the token limit.
Here's an example of how you might initialize a chat engine with a higher token limit:
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.chat_engine import SimpleChatEngine
memory = ChatMemoryBuffer.from_defaults(token_limit=4096)
chat_engine = SimpleChatEngine.from_defaults(memory=memory)
# Now use chat_engine.astream_chat()
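As a sketch of the second option (clearing or truncating the chat history), the same `memory` object can be reset or rebuilt with only the most recent messages; the cutoff of four messages below is arbitrary:
```python
from llama_index.core.memory import ChatMemoryBuffer

# Option A: keep only the most recent messages by rebuilding the buffer
recent_messages = memory.get_all()[-4:]
memory = ChatMemoryBuffer.from_defaults(token_limit=4096, chat_history=recent_messages)

# Option B: drop the accumulated history entirely so the next turn starts fresh
memory.reset()
```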
If you're still encountering issues after trying these solutions, it would be helpful to review how you're initializing and using the chat engine and memory components in your code. The exact implementation can affect how the token limit is applied and how the chat history is managed.
Remember, the token limit is there to manage the context window of the language model. If you increase it significantly, make sure your underlying LLM can handle the larger context.
LlamaIndex > En > Latest > Api_reference > Memory > Chat memory buffer provides more detailed information about the `ChatMemoryBuffer` class and its configuration options.
----------------------
Note: you can ask me a follow up question by @ mentioning me again :speech_balloon:
----------------------
@yadibolt
from fastapi.responses import StreamingResponse

@app.post("/chat")
async def chat(...):
    response = await chat_engine.astream_chat(...)

    async def gen():
        async for token in response.async_response_gen():
            # fastapi typically needs a newline to delimit chunks
            yield token + "\n"

    return StreamingResponse(gen())
seems alright, but still getting that "Initial token count exceeds token limit"
I'm using the buffer too, dunno what's wrong
And all I'm sending is "Hello"
chat_memory = ChatMemoryBuffer.from_defaults(
    token_limit=4096,
    chat_history=self.chat_history
)

self.chat_engine: ContextChatEngine = index.as_chat_engine(
    similarity_top_k=self.similarity_top_k,
    chat_mode='context',
    llm=self.llm,
    context_template=formatted_context_template,
    verbose=True,
    system_prompt=formatted_system_prompt,
    streaming=False,
    memory=chat_memory
)
what's your similarity_top_k value? Seems like it's retrieving way too much text
I should lower that, right?
weird that the normal implementation with stream_chat works
what version of `llama-index-core` do you have installed?
oh that's not latest -- we are at 0.12.10 these days
You probably need to bump all `llama-index-*` packages if you update though
We removed the initial token count thing from those chat engines in newer versions. It shouldn't hit that error anymore no matter the top-k
but it will make more than one llm call if the top-k is large
Thank you, will try. You've modified HuggingFaceEmbeddings too?
hmm I can't remember back that far. Probably not, but since everything depends on `llama-index-core`, all the packages that depend on it (so everything) have to also bump
just a symptom of how poetry deps work
I'll have to look through my repo and the llama-index repo, some imports are broken for me :slightly_smiling_face:
Thank you for the help for now, I'll let you know if something pops up with that async streaming, if I can ofc.
which imports broke? Happy to help if it's related to version changes or anything
I'm using Elasticsearch so now the vectorstore can't be initialized
to be exact
'ElasticsearchStore' object has no attribute '__pydantic_private__'
Works fine! Thank you! Now to the core of the problem. It still waits for the instance to finish generating and it's not generating concurrently.
I printed the chat_engines separately, both do have different memory addresses, so they are not the same instance
@kapa.ai hi again. I've implemented the async version of stream_chat, but when I prompt two separate instances it waits for the first one to be done before going to the other. I'm using Ollama; again, I've specified how many models can be instantiated and how many of them can run at the same time. Both have a value of 4. How do I run these Ollama instances concurrently without being blocked?
Let me check the knowledge base for you again. One moment! :books:
----------------------
Hello again! I understand you're facing issues with concurrent execution of Ollama instances using LlamaIndex's async version of `stream_chat`. Let's address this based on the information available in the knowledge sources.
The issue you're experiencing seems to be related to concurrency in LlamaIndex workflows. While you've configured Ollama to allow multiple instances, the LlamaIndex workflow might be the bottleneck. Here's what we can gather from the documentation:
- LlamaIndex provides a `ParallelAgentRunner` class that allows for concurrent execution of workflows (LlamaIndex > En > Latest > Examples > Workflow > Parallel Execution of Same Event Example). This class can help you run multiple instances concurrently.
- The `@step` decorator in LlamaIndex workflows can be used to control the number of concurrent executions. You can set the `num_workers` parameter to specify how many steps can be executed simultaneously (LlamaIndex > En > Latest > Understanding > Workflows > Concurrent execution of workflows).
Here's an example of how you might implement this:
```python
from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)

class OllamaEvent(Event):
    query: str

class ParallelFlow(Workflow):
    @step(num_workers=4)
    async def run_ollama(self, ctx: Context, ev: OllamaEvent) -> StopEvent:
        # Your Ollama chat logic here
        response = await self.ollama_instance.astream_chat(ev.query)
        # Process the response
        return StopEvent(result=processed_response)

    @step
    async def start(self, ctx: Context, ev: StartEvent) -> OllamaEvent:
        # Emit multiple events for parallel processing
        ctx.send_event(OllamaEvent(query="Query 1"))
        ctx.send_event(OllamaEvent(query="Query 2"))
        ctx.send_event(OllamaEvent(query="Query 3"))
        ctx.send_event(OllamaEvent(query="Query 4"))
```
In this example:
- The `run_ollama` step is decorated with `@step(num_workers=4)`, allowing up to 4 concurrent executions.
- The `start` step emits multiple events, which will be processed in parallel by `run_ollama`.
To use this workflow:
```python
flow = ParallelFlow()
result = await flow.run()
```
This approach should allow you to run multiple Ollama instances concurrently without blocking.
Additionally, ensure that you're using the asynchronous methods correctly in your FastAPI application. Here's a basic example of how you might structure your FastAPI route:
```python
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        # Use your ParallelFlow here
        flow = ParallelFlow()
        result = await flow.run(query=data)
        await websocket.send_text(str(result))
```
Remember to run your FastAPI application with an ASGI server that supports concurrent WebSocket connections, such as Uvicorn with multiple workers.
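For instance, a sketch of launching with multiple worker processes (assuming the app above lives in a module named main; not from the knowledge sources):
```python
import uvicorn

if __name__ == "__main__":
    # The app must be passed as an import string for multiple workers to be used
    uvicorn.run("main:app", host="0.0.0.0", port=8000, workers=4)
```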
If you're still experiencing issues after implementing these suggestions, it might be worth checking if there are any other bottlenecks in your application or if there are any specific limitations with your current Ollama setup that might be preventing true concurrency.
@yadibolt what's the full code to your endpoint? Shouldn't be an issue assuming async is used everywhere. You mentioned you are using HuggingFaceEmbedding(), which does not have async because it's running in-process and not over an API, so embedding calls will block :thinking_face: But the Ollama LLM definitely has proper async
It's a long one but I'll share important snippets.
chat.py:
# initialize the embedding model in order to speed up the response time
Settings.embed_model = HuggingFaceEmbedding(
model_name="google-bert/bert-base-multilingual-cased"
)
# initialize router
router = APIRouter()
sessions = {} # stores the chat sessions
# define routes - WebSocket
@router.websocket("/{uuid}") # uuid - user unique identifier
async def chat(websocket: WebSocket, uuid: str):
...
try:
while True:
# parse received data
data = await websocket.receive_text()
payload = json.loads(data)
...
message_handler = MessageHandler(llm_instance=chat_LLM, websocket=websocket)
# generate response from the model
message_task = asyncio.to_thread(await message_handler.handle_RAG_message(message=payload['message']), sessions[current_user.get_uuid()]["chat_session"].get_current_conversation().get_conversation_id())
assistant_generated_message = await message_task
in sessions, each user has their "own" LLM instance stored, and that is the instance being used
handle_RAG_message method:
async def handle_RAG_message(self, message: str) -> str:
    """
    Handles the message from the client
    Returns the generated assistant_generated_message
    """
    # token generation loop
    await self.websocket.send_text(WSResponse(action="generate-state", state="started", error=False, error_message="", timestamp=int(time.time())).to_json())

    assistant_generated_message = ""
    chat_engine = self.llm_instance.get_chat_engine()
    print(f"chat_engine: {chat_engine}")

    response = await chat_engine.astream_chat(message=message)
    async for token in response.async_response_gen():
        assistant_generated_message += token
        await self.websocket.send_text(WSResponse(action="generate", partial=token, error=False, error_message="", timestamp=int(time.time())).to_json())

    await self.websocket.send_text(WSResponse(action="generate-state", state="finished", error=False, error_message="", timestamp=int(time.time())).to_json())
    return assistant_generated_message
should I just avoid using sessions dict?
eh I think that's fine (it's like a mock redis at that point)
How are you testing that things aren't concurrent?
I have two windows open, both have their own session
HuggingFaceEmbedding() should not be an issue, since that is not getting called each time
if you do not understand anything I will provide more information
I just send a message over the websockets from one window and then the other, wait for response generation... aaand
both respond with the generate action, which means it hit the handle_RAG_message() method
but, one window is generating response, another is not
once the first one is done, then the second one starts
that's why I was thinking it was because of synchronous stream chat, but it's still the same
without the asyncio.to_thread() it waits entirely
not even sending 'generate-state'
which means it is called only after the first is done :frowning:
It also gets called during the chat, since it has to retrieve context and embed your query/message
so the fix would be to download the model and run it locally u say right?
uhhh not quite. The fix would be to use an embedding model that has a proper async api (i.e. hosted on a server). TEI, ollama, openai, etc. are valid options
But this is assuming that the embed model is the issue too
You could maybe verify if it's an issue by swapping in a dummy embedding model
from llama_index.core import MockEmbedding
embed_model = MockEmbedding(embed_dim=1234)
The embed_dim should match your current embedding model dim
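For example, a quick sketch of the swap; the 768 here assumes bert-base-multilingual-cased's hidden size, so double-check it against your actual embedding dimension:
```python
from llama_index.core import MockEmbedding, Settings

# Temporarily replace the in-process HuggingFace model with a no-op embedder
# to check whether blocking embedding calls are what serializes the requests
Settings.embed_model = MockEmbedding(embed_dim=768)
```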
both of these should be good no?
those ollama embed models are also good, if you want to try OllamaEmbedding :slightly_smiling_face:
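If you go that route, here is a minimal sketch (assuming the llama-index-embeddings-ollama package and a locally running Ollama server; the model name is just an example):
```python
from llama_index.core import Settings
from llama_index.embeddings.ollama import OllamaEmbedding

# Ollama serves embeddings over HTTP, so these calls have a real async path
# and won't block the event loop the way an in-process model does
Settings.embed_model = OllamaEmbedding(
    model_name="nomic-embed-text",      # example model, pulled via `ollama pull`
    base_url="http://localhost:11434",  # default Ollama endpoint
)
```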
I'll try it and let you know
It finally works. Thanks. Last thing I need to figure out: why is it sending all the tokens at the same time and not streaming?
Like it sends them separately but yeah:
# token generation loop
await self.websocket.send_text(WSResponse(action="generate-state", state="started", error=False, error_message="", timestamp=int(time.time())).to_json())

assistant_generated_message = ""
chat_engine = self.llm_instance.get_chat_engine()
response = await chat_engine.astream_chat(message=message)

async def gen():
    async for token in response.async_response_gen():
        yield token

async for token in gen():
    assistant_generated_message += token
    await self.websocket.send_text(WSResponse(action="generate", partial=token, error=False, error_message="", timestamp=int(time.time())).to_json())

await self.websocket.send_text(WSResponse(action="generate-state", state="finished", error=False, error_message="", timestamp=int(time.time())).to_json())
this is the generation method now
@kapa.ai Hey, I have a method that should return a streaming response, but it returns all the tokens at the same time. Do you know why?
Here is the method:
response = await chat_engine.astream_chat(message=message)

async def gen():
    async for token in response.async_response_gen():
        yield token

async for token in gen():
    assistant_generated_message += token
    await # send websocket response

await # send websocket response
Let me check the knowledge base for you again. One moment! :books:
----------------------
Based on the information provided in the knowledge sources and your code snippet, I can offer some insights into why your method might be returning all tokens at once instead of streaming them. Let's analyze the issue:
- The `astream_chat` method you're using is the correct asynchronous streaming method for LlamaIndex chat engines (LlamaIndex > En > Latest > Api_reference > Chat_engines > Context > ContextChatEngine Part 4).
- Your implementation of the `gen()` function and the subsequent iteration over it looks correct in principle.
However, there are a few potential issues that could cause the tokens to be returned all at once:
- Buffering: The `async for` loop might be buffering the entire response before yielding anything. This can happen if the underlying implementation of `async_response_gen()` is not truly streaming.
- Websocket implementation: The issue might be in how you're sending the websocket response (which is not shown in the provided code).
- Event loop blocking: If there's any blocking operation in your code, it might cause the entire response to be generated before any tokens are sent.
To address these issues, you could try the following modifications:
import asyncio

async def stream_chat(message: str):
    response = await chat_engine.astream_chat(message=message)
    assistant_generated_message = ""
    async for token in response.async_response_gen():
        assistant_generated_message += token
        # Yield each token immediately
        yield token
        # Optionally, you can add a small delay to ensure the event loop can process other tasks
        await asyncio.sleep(0)

# In your websocket handler
async for token in stream_chat(message):
    await websocket.send_text(token)
This modification:
- Removes the nested `gen()` function, simplifying the code.
- Yields each token immediately as it's received.
- Optionally adds a small delay with `await asyncio.sleep(0)` to ensure the event loop can process other tasks.
If you're using FastAPI, you might want to use the `StreamingResponse` class to handle the streaming:
from fastapi.responses import StreamingResponse

@app.get("/stream")
async def stream_endpoint(message: str):
    return StreamingResponse(stream_chat(message), media_type="text/plain")
If you're still experiencing issues after these modifications, it might be worth checking:
- The specific implementation of the chat engine you're using.
- Any middleware or other components in your application that might be buffering the response.
- The configuration of your ASGI server (e.g., Uvicorn) to ensure it's set up for streaming responses.
Remember, true streaming requires support at every level of your application stack, from the LLM to the chat engine, to your web framework, and finally to the client receiving the stream.
LlamaIndex > En > Latest > Api_reference > Chat_engines > Context > ContextChatEngine Part 4 provides more details about the `astream_chat` method and its expected behavior.
----------------------
Note: you can ask me a follow up question by @ mentioning me again :speech_balloon:
----------------------
@kapa.ai I'm using websockets, so the StreamingResponse from fastapi is probably not useful
Let me check the knowledge base for you again. One moment! :books:
----------------------
You're right, and I apologize for the confusion in my previous response. Since you're using WebSockets, the `StreamingResponse` from FastAPI indeed isn't applicable in your case. Let's focus on how to properly stream the response over WebSockets using LlamaIndex's `astream_chat` method.
Based on the information from the knowledge sources, here's a more appropriate approach for streaming chat responses over WebSockets:
@app.websocket("/chat")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
message = await websocket.receive_text()
response = await chat_engine.astream_chat(message=message)
async for token in response.async_response_gen():
await websocket.send_text(token)
# Optionally send a message to indicate the end of the stream
await websocket.send_text("[END_OF_STREAM]")
This implementation:
- Uses the WebSocket protocol to maintain a persistent connection.
- Receives messages from the client.
- Uses `astream_chat` to generate a streaming response.
- Sends each token individually as it's generated.
- Optionally sends an end-of-stream marker.
A few key points to ensure proper streaming:
- Make sure your `chat_engine` is configured for streaming. The `astream_chat` method should return a `StreamingAgentChatResponse` object (LlamaIndex > En > Latest > Api_reference > Chat_engines > StreamingAgentChatResponse Part 4).
- The `async_response_gen()` method of `StreamingAgentChatResponse` should yield individual tokens (LlamaIndex > En > Latest > Api_reference > Chat_engines > StreamingAgentChatResponse Part 4).
- If you're still not seeing individual tokens being sent, you might need to ensure that the underlying LLM (in your case, Ollama) is configured for streaming as well.
- To prevent any potential buffering, you can add a small delay after sending each token:
async for token in response.async_response_gen():
    await websocket.send_text(token)
    await asyncio.sleep(0.01)  # Small delay to prevent buffering
If you're still experiencing issues with tokens being sent all at once, it might be worth checking:
- Your Ollama configuration to ensure it's set up for streaming.
- The specific implementation of the chat engine you're using, to confirm it supports token-by-token streaming.
- Any middleware or WebSocket configurations in your FastAPI application that might be affecting the streaming behavior.
Remember, for true streaming, every component in your stack needs to support it, from the LLM (Ollama) through LlamaIndex and up to your WebSocket implementation.
----------------------
Note: you can ask me a follow up question by @ mentioning me again :speech_balloon:
----------------------
@Logan M sorry for the ping, may I ask you for your assistance?
I checked whether ContextChatEngine supports streaming - it says yes, and I have set streaming to True, yet still no token-by-token streaming
Have you tried testing outside of fastapi? It definitely works
async for token in response.async_response_gen():
    print(token, end="", flush=True)
Seems like either fastapi or your receiving end is buffering responses on the socket?
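A minimal standalone test along those lines might look like this (a sketch, assuming `chat_engine` is built the same way as in your app, i.e. a ContextChatEngine over your index with the Ollama LLM):
```python
import asyncio

async def main():
    # chat_engine: the same engine your app builds via index.as_chat_engine(...)
    response = await chat_engine.astream_chat("Hello")
    async for token in response.async_response_gen():
        print(token, end="", flush=True)

asyncio.run(main())
```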
Maybe you should be using `await websocket.send_json(...)`?
it does work... Oh my... what's wrong then
send_json works as expected? Not sure what the issue is, but seems like you should just use that then :slightly_smiling_face:
I do.. everywhere now, but the issue persists
It's like it loads the response into memory and then spits it out.. If I add a delay, let's say 0.01s, they come token by token
wrapping it in another async function was the issue
I would like to know why tho... :slightly_smiling_face:
Hmm I have no idea :slightly_smiling_face:
I'm very new to websockets lol
I do know I had a lot of issues with the websockets package in fastapi. I ended up switching to socket.io and mounting that within my fastapi app
The websockets package seems pretty bare bones / under baked imo
I'll have a look at the socket.io, thanks!
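For reference, a minimal sketch of mounting a socket.io server inside a FastAPI app (assuming the python-socketio package; the event name, handler, and `chat_engine` are illustrative placeholders, not the actual setup discussed above):
```python
import socketio
from fastapi import FastAPI

fastapi_app = FastAPI()

# Async socket.io server speaking the ASGI protocol
sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*")

# Wrap FastAPI so both regular routes and socket.io traffic are served
app = socketio.ASGIApp(sio, other_asgi_app=fastapi_app)

@sio.event
async def chat(sid, data):
    # chat_engine: assumed to be the LlamaIndex chat engine built elsewhere in your app
    response = await chat_engine.astream_chat(data["message"])
    # Stream tokens back to this client as they are generated
    async for token in response.async_response_gen():
        await sio.emit("token", token, to=sid)

# Run with an ASGI server, e.g.: uvicorn module_name:app
```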