Some loaders return multiple document objects from a single file path
@Logan M , I am using the S3 loader from llama_hub. What do you mean by that? So this code has nothing to do with keeping track of node-to-document relationships? Is that feasible and available in LlamaIndex yet?
No, it is used for keeping track of node-doc relationships. I just mean that some document loaders (like the PDF loader) will return multiple document objects for the same file path, so we can't set the filename as the doc_id because there would be duplicates, hence the part suffix thing.
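For example, this is the kind of thing I mean (a sketch with made-up names, not the loader's actual code):

def set_deterministic_ids(docs_from_one_file):
    """Sketch only: give each Document from the same file a deterministic,
    unique id by appending a part index to the file path."""
    for i, doc in enumerate(docs_from_one_file):
        file_path = doc.metadata.get("file_path", "unknown")
        doc.id_ = f"{file_path}_part_{i}"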
Got it, but in this case I am using the Docx Reader, which I believe returns just one document object? Since I am using SentenceSplitter to split a document into nodes, how do I use the same logic to keep track of the node-doc relationship? With the default code, I am getting the same node id for all nodes of a docx document, which doesn't sound right.
1) What is the hash here?
2) ref_doc_id is not unique at the node level. How do I make this work in that case?
1) hash is just a hash of document metadata + document text
2) I think you are misunderstanding the code slightly. (Also, FYI, you should probably be doing pipeline.run(documents=documents).)
The initial inputs just get combined into one list (i.e. all nodes/documents get merged into a single list here):
https://github.com/run-llama/llama_index/blob/22cef9cb22f092e50718ab166f3b0dfad4e40168/llama_index/ingestion/pipeline.py#L322C32-L322C32

Then it enters the upsert function here with your list of documents:
https://github.com/run-llama/llama_index/blob/22cef9cb22f092e50718ab166f3b0dfad4e40168/llama_index/ingestion/pipeline.py#L327

Since you are inputting documents, the ref_doc_id will be None at this step, causing it to use the id_:
https://github.com/run-llama/llama_index/blob/22cef9cb22f092e50718ab166f3b0dfad4e40168/llama_index/ingestion/pipeline.py#L290

If the id was already added to the docstore, this will be a hit and we can compare the hash:
https://github.com/run-llama/llama_index/blob/22cef9cb22f092e50718ab166f3b0dfad4e40168/llama_index/ingestion/pipeline.py#L292

If the hash doesn't exist, we insert as normal:
https://github.com/run-llama/llama_index/blob/22cef9cb22f092e50718ab166f3b0dfad4e40168/llama_index/ingestion/pipeline.py#L293C4-L293C4

If the hash has changed, we delete and re-add:
https://github.com/run-llama/llama_index/blob/22cef9cb22f092e50718ab166f3b0dfad4e40168/llama_index/ingestion/pipeline.py#L298

Otherwise, the hash is unchanged, and we skip:
https://github.com/run-llama/llama_index/blob/22cef9cb22f092e50718ab166f3b0dfad4e40168/llama_index/ingestion/pipeline.py#L307

Now, this process relies on you attaching and persisting a docstore. I noticed in your code you did not attach a docstore to the pipeline.
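To put those steps in one place, here's a rough paraphrase of the logic (the names are illustrative and this is not the library's exact code; see the linked pipeline.py for the real implementation):

def upsert(nodes, docstore, vector_store=None):
    """Rough paraphrase of the UPSERTS strategy (sketch only)."""
    deduped_nodes = []
    for node in nodes:
        # Fall back to the node's own id when there is no parent document,
        # which is the case when you pass raw documents into the pipeline.
        ref_doc_id = node.ref_doc_id if node.ref_doc_id else node.id_

        # node.hash is derived from the document's text plus its metadata.
        existing_hash = docstore.get_document_hash(ref_doc_id)

        if existing_hash is None:
            # Never seen this id before: insert as normal.
            docstore.add_documents([node])
            docstore.set_document_hash(ref_doc_id, node.hash)
            deduped_nodes.append(node)
        elif existing_hash != node.hash:
            # Hash changed: delete the old version, then re-add.
            docstore.delete_ref_doc(ref_doc_id, raise_error=False)
            if vector_store is not None:
                vector_store.delete(ref_doc_id)
            docstore.add_documents([node])
            docstore.set_document_hash(ref_doc_id, node.hash)
            deduped_nodes.append(node)
        # Otherwise the hash is unchanged and the node is skipped entirely.
    return deduped_nodes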
@Logan M , First of all, thank you so much for clarifying this!
1) Something must be wrong with my understanding. My understanding is that if I attach and persist a docstore, and index the exact same data twice, it shouldn't be added twice to my vector_store. Is that understanding correct?
2) If that is correct, can you review this code and tell me what I am doing wrong? Indexing the same data twice is adding it twice to the vector_store:
from llama_index.node_parser import SimpleNodeParser
from llama_index.ingestion import IngestionPipeline, DocstoreStrategy
from llama_index.vector_stores import PGVectorStore
from llama_index.embeddings import HuggingFaceEmbedding
from llama_index import SimpleDirectoryReader
from llama_index.storage.docstore import RedisDocumentStore

loader = SimpleDirectoryReader(
    input_files=["Afghanistan.docx"],  # any file that can be broken into at least 2 nodes
)
documents = loader.load_data()

embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5", device="cuda:0")

pg_vector_store = PGVectorStore.from_params(
    database="",
    host="",
    password="",
    port="5432",
    user="",
    table_name="nodes",
    embed_dim=384,  # bge-small-en-v1.5 embedding dimension
    hybrid_search=True,
    text_search_config="english",
)

pipeline = IngestionPipeline(
    transformations=[
        SimpleNodeParser(chunk_size=512, chunk_overlap=20),
        embed_model,
    ],
    docstore=RedisDocumentStore.from_host_and_port(
        "127.0.0.1", 6379, namespace="temp_doc_store"
    ),
    docstore_strategy=DocstoreStrategy.UPSERTS,
    vector_store=pg_vector_store,
)

nodes = pipeline.run(documents=documents)
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(),
embed_model,
],
docstore=RedisDocumentStore.from_host_and_port(
"localhost", 6379, namespace="document_store2"
),
vector_store=PGVectorStore.from_params(
host="localhost",
port="5432",
table_name="test2",
user="postgres",
password="password",
database="vector_db",
embed_dim=384
),
docstore_strategy=DocstoreStrategy.UPSERTS,
)
can you also share the document loader part?
It's the exact same as the notebook
from llama_index import SimpleDirectoryReader
# load documents with deterministic IDs
documents = SimpleDirectoryReader(
"./test_redis_data", filename_as_id=True
).load_data()
The only thing I changed in that notebook was replacing the vector store with postgres, and removing the ingestion cache
Ah I see! Let me just clear my redis and try it again. Thanks @Logan M
Thanks @Logan M ! It worked after cleaning up redis.
nice! That's a relief hahaha
@Logan M , I have a follow-up question. Say I index some data with a docstore and a vector store attached. If I index the data again, but this time a file is missing, will the missing file be removed from the remote docstore and the vector store? I tried it with the current setup and the missing file is still present in both places.
It will not be removed if it's missing
I feel that would be rather confusing UX for someone who, for example, had an ingestion pipeline deployed as an API endpoint.
Maybe that could be another mode? Unsure
@Logan M , For my use case, I am implementing a Q&A RAG pipeline over a set of documents in a folder. Users can add or remove files from this folder. Every time the files are updated (meaning added/removed), I update my index. If a file is removed by the user, I think it makes sense to remove it from the docstore and vector store as well, right? Otherwise, users might get a question answered based on the deleted document.
Do you think this is something I should implement outside of LlamaIndex?
For now I would probably implement it outside of llama-index, unless you want to make a PR to add that mode to the current ingestion pipeline.
By mode, do you mean a new docstore strategy, like UPSERTS or DUPLICATES_ONLY?
something like UPSERT_AND_DELETE
Got it! I'll give it a try. I have some time right now. Thanks @Logan M
I think to detect missing docs, you'll want to use docstore.get_all_document_hashes()
it returns a dictionary of hash -> id
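For example, something like this could compare the docstore against the current input to find ids that have disappeared (a sketch only; names are illustrative):

def find_removed_doc_ids(documents, docstore):
    """Sketch only: ids that exist in the docstore but are missing from the current input."""
    # get_all_document_hashes() returns a dict of hash -> doc id.
    existing_ids = set(docstore.get_all_document_hashes().values())
    current_ids = {doc.id_ for doc in documents}
    return existing_ids - current_ids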
Lol, thanks @Logan M ! I was just looking at the code to find this. Do you read minds lol?
@Logan M , I am working on the PR for upserts followed by delete. I have a quick question. Since I am inputting documents into the ingestion pipeline, ref_doc_id is None, but delete_ref_doc looks at f"{self._namespace}/ref_doc_info" to decide what to delete. I don't have this collection in my remote docstore. I am able to delete from the vector store. Do you know how deleting from the docstore works in the UPSERTS mode?
My guess is that it doesn't work for the UPSERTS mode either when documents are the input. I haven't tested it, though.
It works fine for documents as inputs actually (or at least, it did in my testing)
Notice this line:
ref_doc_id = node.ref_doc_id if node.ref_doc_id else node.id_
https://github.com/run-llama/llama_index/blob/d896b3a30e4e1aff7a574bbb2379c7b3d7e6c229/llama_index/ingestion/pipeline.py#L290

Maybe not exactly straightforward, but if ref_doc_id is None, it will use the normal id.
Then when adding documents, notice we call these two lines
self.docstore.add_documents([node])
self.docstore.set_document_hash(ref_doc_id, node.hash)
That "ref_doc_id" is either the actual ref doc id or the normal document ID
When the data is eventually inserted into the vector db, it should have been chunked at some point into nodes, which will have a ref_doc_id pointing to that original input. And since most vector stores are supposed to have delete() methods that delete all nodes with a particular ref_doc_id, it should work.
In fact, after writing this out, I think this will only work if you input documents and have a node parser/text splitter in your pipeline.
Room for improvement
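For reference, the vector-store-side delete I'm referring to is just this call, reusing the pg_vector_store from your earlier snippet (the id is only an example of what filename_as_id=True would produce):

# Removes every node in the vector store whose ref_doc_id matches this value.
pg_vector_store.delete(ref_doc_id="data/test1.txt")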
hmm, let me triple check before you dive into this
Ah ok, so it doesn't have the namespace/ref_doc_info collection, but it doesn't need it in order to work.
Or at least, since we aren't deleting from the docstore, it works.
I think in your case, you can probably use delete_document()?
If not, probably some updates are needed to the logic
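i.e. something along these lines for the docstore side, where docstore is the one attached to the pipeline (the id is hypothetical):

# Hypothetical cleanup for a source file that was removed from the folder;
# "data/removed_file.docx" stands in for whatever id that document had.
docstore.delete_document("data/removed_file.docx", raise_error=False)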
Let me check delete_document(). Thanks for your help, @Logan M
I think the delete actually works in upsert? I just tested it.
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(),
HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"),
],
docstore=SimpleDocumentStore(),
vector_store=vector_store
)
If I ingest two documents, obviously those are both inserted.
print(pipeline.docstore._kvstore._data['docstore/data'].keys())
> dict_keys(['data/test1.txt', 'data/test2.txt'])
If I modify one of the documents, and add a new document, the docstore correctly reflects this
print(pipeline.docstore._kvstore._data['docstore/data'].keys())
> dict_keys(['data/test1.txt', 'data/test2.txt', 'data/test3.txt'])
Or maybe the delete isn't working, and it's just overwriting the key...
Thanks for looking into this, @Logan M . I think it's possible that it's just overwriting the key. For my use case, delete_document() worked, so I believe we should be good.