Hi, I'm trying to do document management

Hi, I'm trying to do document management, but it doesn't work:
This is my code (I'm using an ingestion pipeline, Qdrant as the vector store, and Redis as the docstore; the docs are loaded from a bucket):

Plain Text
def load_and_index(self):
    try:
        docs = self.loader()
        loaded_doc_ids = {doc.id_ for doc in docs}
        print(len(docs))
        if docs:
            for doc in docs:
                nodes = self.pipeline.run(documents=[doc])
                print(f"Ingestion of {len(nodes)} nodes from document: {doc.metadata['file_name']}")
            docstore_hashes = set(self.pipeline.docstore.get_all_document_hashes().keys())
            print(f"Hashes docstore: {docstore_hashes}")
            docstore_ids = set(self.pipeline.docstore.get_all_ref_doc_info().keys())
            print(f"IDs docstore: {docstore_ids}")
            docs_exclu = docstore_ids - loaded_doc_ids
            if self.exclude_docs:
                for doc_id in docs_exclu:
                    self.pipeline.docstore.delete_document(doc_id)
                    self.index.delete_ref_doc(doc_id, delete_from_docstore=True)
                    print(f"Doc deleted: {doc_id}")
    except Exception as e:
        print(f"error: {e}")

I think the code is not working because when I print(f"IDs docstore: {docstore_ids}"), the output is: IDs docstore: set()
Any help?
What is going on here 😅
where is the pipeline?
If you attach a vector store and docstore to the pipeline, this is all handled for you. But the key point is that the same document should always have the same document ID
That's the anchor for comparing hashes
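For example, something like this (just a minimal sketch; the path-based IDs are an illustration, not your loader):
Plain Text
from llama_index.core import Document

# Sketch: derive the doc ID from something stable (here, a file path),
# so re-ingesting the same file compares hashes against the same ID.
def make_document(path: str, text: str) -> Document:
    return Document(id_=path, text=text, metadata={"file_name": path})

doc_v1 = make_document("bucket/report.txt", "first version")
doc_v2 = make_document("bucket/report.txt", "second version")
assert doc_v1.id_ == doc_v2.id_  # same ID, so the pipeline sees an upsert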
So I'm trying to implement the following logic.
I have data in a bucket that is being ingested by my ingestion pipeline with Qdrant and Redis. When I add a new file to the bucket, the pipeline continues normally. But when I delete a file from the bucket, it is not deleted from the docstore and vector store. I can't make this match.
When I use set(self.pipeline.docstore.get_all_ref_doc_info().keys())
I get nothing, but if I use .values() I get all the RefDocInfo.
So you are just feeding the entire bucket to the pipeline every time, and you want to detect both changed documents (i.e. upserts) and also remove any documents that were in the bucket before but aren't now
There's a mode for that 🙂
Plain Text
from llama_index.core.ingestion import IngestionPipeline, DocstoreStrategy

pipeline = IngestionPipeline(
  docstore=docstore,
  vector_store=vector_store,
  transformations=[...],
  docstore_strategy=DocstoreStrategy.UPSERTS_AND_DELETE,
)
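Then you just re-run the pipeline with whatever is currently in the bucket (a sketch; load_bucket_docs is a stand-in for your loader):
Plain Text
# Sketch: on every sync, hand the pipeline everything currently in the bucket.
# Unchanged docs are skipped, changed docs are upserted, and docs that are no
# longer in the input are deleted from the docstore and vector store.
docs = load_bucket_docs()  # hypothetical loader returning Document objects
pipeline.run(documents=docs)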
@Logan M hey, I'm sorry for bothering you, but my document ID, which is a string, is being transformed into bytes somewhere (I don't know where), and that is breaking the verification: it says the ID does not exist (since bytes and a string can never match)
Plain Text
try:
    docs = self.loader()
    if docs:
        for doc in docs:
            print(type(doc.id_))
            print(doc.id_)
            print(type(doc.doc_id))
            print(doc.doc_id)
            nodes = self.pipeline.run(documents=[doc])
            print(f"{len(nodes)} nodes from doc: {doc.metadata['file_name']}")
except Exception as e:
    print(f"error: {e}")
the output is:
Plain Text
<class 'str'>
d7942a16845189cd57fc00bf0d88a339
<class 'str'>
d7942a16845189cd57fc00bf0d88a339
error: doc_id b'd7942a16845189cd57fc00bf0d88a339' not found.
What docstore are you using? I don't think the issue is it being bytes; I know the Redis docstore stores everything in bytes
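That's just how redis-py behaves by default (a small sketch, assuming a local Redis on the default port):
Plain Text
import redis

# Sketch: redis-py returns raw bytes unless decode_responses=True is set,
# which is likely where the b'...' doc IDs are coming from.
r = redis.Redis(host="localhost", port=6379)
r.set("doc_id", "d7942a16845189cd57fc00bf0d88a339")
print(r.get("doc_id"))  # b'd7942a16845189cd57fc00bf0d88a339'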
Plain Text
client = QdrantClient(self.qdrant_host, port=self.qdrant_port)
vector_store = QdrantVectorStore(
    client=client,
    collection_name=self.qdrant_collection,
    enable_hybrid=True,
    batch_size=20,
)
pipeline = IngestionPipeline(
    transformations=transformations,
    docstore=RedisDocumentStore.from_host_and_port(
        self.redis_host, self.redis_port, namespace=self.docstore_namespace
    ),
    vector_store=vector_store,
    cache=IngestionCache(
        cache=RedisCache.from_host_and_port(self.redis_host, self.redis_port),
        collection=self.redis_collection,
    ),
    docstore_strategy=DocstoreStrategy.UPSERTS_AND_DELETE,
)
It seems like you are letting the document IDs randomly generate each time? Maybe? Normally you want to make sure they are set to something consistent, like a file path

How are you loading the documents?
I'm loading them like this:

Plain Text
def loader(self):
    documents = []
    s3_client = boto3.client(
        's3',
        aws_access_key_id=self.aws_access_key_id,
        aws_secret_access_key=self.aws_secret_access_key,
    )
    s3r = boto3.resource(
        's3',
        aws_access_key_id=self.aws_access_key_id,
        aws_secret_access_key=self.aws_secret_access_key,
    )
    bucket = s3r.Bucket(self.bucket)
    objects = list(bucket.objects.all())
    if len(objects) > 0:
        for obj in tqdm(objects):
            key = obj.key
            # Get the current object
            doc = s3_client.get_object(Bucket=self.bucket, Key=key)
            conteudo = doc['Body'].read().decode('utf-8')
            etag = str(doc['ETag'].strip('"'))
            document = Document(id_=etag, text=conteudo, metadata={'file_name': key})
            documents.append(document)
    else:
        print("No contents found.")
    return documents



Do you recommend I try using file_name as the ID and not use the ETag?
I'm not using the LlamaIndex integration with AWS buckets because I was having an error with it
And I'm assuming the ETag won't ever change?
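For simple (non-multipart) uploads the ETag is essentially an MD5 of the content, so it changes whenever the file changes, and the same file would then get a brand-new doc ID on every edit. A sketch of what I mean instead, reusing the variables from your loader (just an illustration):
Plain Text
# Sketch: use the S3 key as the stable document ID so edits to the same
# file are seen as upserts; keep the ETag as metadata if you still want it.
document = Document(
    id_=key,  # stable across content changes
    text=conteudo,
    metadata={'file_name': key, 'etag': etag},
)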
Ok, finally found time to test this.

Indeed, there was a small bug in RedisKVStore; I will make a PR

After that change, this code seemed to work

Plain Text
from llama_index.core.ingestion import IngestionPipeline, DocstoreStrategy
from llama_index.core import Document, VectorStoreIndex, MockEmbedding
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.storage.docstore.redis import RedisDocumentStore
from llama_index.core.node_parser import SentenceSplitter
from qdrant_client import QdrantClient

documents = [Document.example(), Document.example(), Document.example()]
for i, doc in enumerate(documents):
    doc.id_ = str(i) 

vector_store = QdrantVectorStore("ingest_test7", client=QdrantClient(":memory:"))
docstore = RedisDocumentStore.from_host_and_port("localhost", 6379, "ingest_test7")

pipeline = IngestionPipeline(
    docstore=docstore,
    vector_store=vector_store,
    transformations=[
      SentenceSplitter(chunk_size=25, chunk_overlap=0), 
      MockEmbedding(embed_dim=256)
    ],
    docstore_strategy=DocstoreStrategy.UPSERTS_AND_DELETE
)

pipeline.run(documents=documents)

index = VectorStoreIndex.from_vector_store(vector_store, embed_model=MockEmbedding(embed_dim=256))

nodes = index.as_retriever(similarity_top_k=100).retrieve("test")
print("Vector store has " + str(len(nodes)) + " nodes")

nodes = list(docstore.docs.values())
print("Docstore has " + str(len(nodes)) + " nodes")

# Now we can run the pipeline with two fewer documents
pipeline.run(documents=documents[:-2])

nodes = index.as_retriever(similarity_top_k=100).retrieve("test")
print("Vector store has " + str(len(nodes)) + " nodes")

nodes = list(docstore.docs.values())
print("Docstore has " + str(len(nodes)) + " nodes")
Thank you a lot!