Updated 3 months ago

I have some code running on a server

I have some code running on a server that's designed to index documents into an existing Qdrant collection. Here's the code:

Plain Text
client = qdrant_client.QdrantClient(host=QDRANT_HOST,
                                    grpc_port=QDRANT_GRPC_PORT,
                                    prefer_grpc=True,
                                    api_key=QDRANT_API_KEY)
vector_store = QdrantVectorStore(client=client,
                                  collection_name=collection_name,
                                  batch_size=20)
index = VectorStoreIndex.from_vector_store(vector_store=vector_store,
                                           service_context=service_context)
for document in documents:
    try:
        log.info(f"SETUP: Source {source_id} Updating document")
        index.update_ref_doc(
            document,
            update_kwargs={
                "delete_kwargs": {
                    "delete_from_docstore": True
                }
            },
        )
    except Exception as err:
        log.info(f"SETUP: Source {source_id} Error: {err}")
        log.info(f"SETUP: Source {source_id} Update failed, trying insert")
        index.insert(document)


This code works well when processing documents one at a time, but it runs into problems under multiple concurrent requests. Some requests fail with the error: UNKNOWN:Error received from peer {grpc_message:"Wrong input: Collection 166850 already exists!", grpc_status:3. When that happens, both index.update_ref_doc in the try block and index.insert(document) in the exception handler fail.

Can anyone offer some advice on this? Is Qdrant not capable of handling concurrent insertions?
3 comments
The issue disappeared after enabling async ingestion.
@Logan M I investigated this issue a bit more and found something interesting, so wanted to let you know:

In llama_index/vector_stores/qdrant/base.py,

both def _create_collection() and async def _acreate_collection() have this exception handler

Plain Text
except (ValueError, UnexpectedResponse) as exc:
    if "already exists" not in str(exc):
        raise exc  # noqa: TRY201
    logger.warning(
        "Collection %s already exists, skipping collection creation.",
        collection_name,
    )


The problem is that this exception handler is not triggered for Qdrant gRPC clients; for HTTP clients it works fine. For gRPC, I replaced it with a generic handler, except Exception as exc, and that was able to handle the exception.
That would be an easy fix to make. I'd rather not catch a generic exception, though.
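One possible middle ground, sketched here under assumptions rather than taken from the library: keep the "already exists" check but make the tuple of caught exception types explicit, so the gRPC error type can be added without falling back to a bare Exception. The helper create_collection_once and the FakeRpcError class are hypothetical names used only for illustration. With a real client, the tuple might be (ValueError, UnexpectedResponse, grpc.RpcError), assuming the gRPC client surfaces the failure as a subclass of grpc.RpcError, which this thread does not confirm.

```python
import logging
from typing import Callable, Tuple, Type

logger = logging.getLogger(__name__)


class FakeRpcError(Exception):
    """Hypothetical stand-in for the error type a gRPC client raises."""


def create_collection_once(
    create: Callable[[], None],
    collection_name: str,
    expected_errors: Tuple[Type[BaseException], ...],
) -> bool:
    """Call create(); return True if it created the collection,
    False if the collection already existed."""
    try:
        create()
    except expected_errors as exc:
        # Only swallow the "already exists" case; re-raise anything else.
        if "already exists" not in str(exc):
            raise
        logger.warning(
            "Collection %s already exists, skipping collection creation.",
            collection_name,
        )
        return False
    return True


def losing_create() -> None:
    # Simulates a request that loses the race to create the collection.
    raise FakeRpcError("Wrong input: Collection 166850 already exists!")


created = create_collection_once(
    losing_create, "166850", (ValueError, FakeRpcError)
)
print(created)  # False
```

Errors of a listed type whose message does not contain "already exists" still propagate, and types outside the tuple are never caught, so the handler stays narrower than except Exception.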