Here's the min reproducible code:

from llama_index.node_parser import SimpleNodeParser
from llama_index.ingestion import IngestionPipeline
from llama_index.vector_stores import PGVectorStore
from llama_index.embeddings import HuggingFaceEmbedding
from llama_index import SimpleDirectoryReader

loader = SimpleDirectoryReader(
    input_files=[""],  # any file that can be broken into at least 2 nodes
    filename_as_id=True,
)

documents = loader.load_data()

embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5", device="cuda:0")

pg_vector_store = PGVectorStore.from_params(
    database="",
    host="",
    password="",
    port="5432",
    user="",
    table_name="nodes",
    embed_dim=384,  # bge-small-en-v1.5 embedding dimension
    hybrid_search=True,
    text_search_config="english",
)

pipeline = IngestionPipeline(
    transformations=[
        SimpleNodeParser(chunk_size=512, chunk_overlap=20),
        embed_model,
    ],
    vector_store=pg_vector_store,
)

nodes = pipeline.run(documents=documents)
Some loaders return multiple document objects from a single file path
@Logan M, I am using the S3 loader from llama_hub. What do you mean by that? So this code has nothing to do with keeping track of the node-to-document relationship? Is that feasible and available in LlamaIndex yet?
No, it is used for keeping track of node-doc relationships. I just mean some document loaders (like the PDF loader) will return multiple document objects for the same file path. So we can't set the filename alone as the doc_id because there would be duplicates, hence the part suffix in the ids
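For illustration, a minimal sketch of that behavior (the PDF filename here is hypothetical):

Plain Text
from llama_index import SimpleDirectoryReader

# A multi-page PDF typically yields one Document object per page
documents = SimpleDirectoryReader(
    input_files=["report.pdf"],  # hypothetical file
    filename_as_id=True,
).load_data()

# Expect ids like "report.pdf_part_0", "report.pdf_part_1", ...
# (the exact suffix format can vary by version)
for doc in documents:
    print(doc.id_)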
Got it, but in this case I am using the Docx Reader, which I believe returns just one document object. Since I am using SentenceSplitter to split a document into nodes, how do I use the same logic to keep track of the node-doc relationship? By using the default code, I am getting the same node id for all nodes of a docx document. That doesn't sound right
By using the default code, I am getting the same node id for all nodes of a docx document.

That doesn't sound entirely correct. The actual node.id_ is automatically generated and unique.

node.ref_doc_id points to the parent document id, which is also unique

I'm guessing you are seeing doc_id in the vector store metadata -- this is just some backwards compat field that also points to ref_doc_id
https://github.com/run-llama/llama_index/blob/04ccaf46c860137bd126e95a4252f5bf51ad953e/llama_index/vector_stores/utils.py#L59
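As a quick sanity check, here's a minimal sketch (reusing documents and the parser settings from your snippet) that prints both ids:

Plain Text
from llama_index.node_parser import SimpleNodeParser

parser = SimpleNodeParser(chunk_size=512, chunk_overlap=20)
nodes = parser.get_nodes_from_documents(documents)

for node in nodes:
    # node.id_ is auto-generated and unique per node;
    # node.ref_doc_id is shared by all nodes from the same parent document
    print(node.id_, "->", node.ref_doc_id)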
This is what I am trying to do:
https://discord.com/channels/1059199217496772688/1185228922577748101/1185246361793331261

Simply following the example isn't working for me: all nodes are inserted every time. I am trying to index the same data and dedupe. When I look at the upsert code, I see that it uses ref_doc_id and hash to decide what to do. My questions are:
1) What is hash here?

2) ref_doc_id is not unique at node level. how to make this work in that case?
1) hash is just a hash of document metadata + document text

2) I think you are misunderstanding the code slightly. (Also FYI, you should probably be doing pipeline.run(documents=documents).)

The initial inputs just get combined into one list (i.e. all nodes/documents get merged into a single list here)
https://github.com/run-llama/llama_index/blob/22cef9cb22f092e50718ab166f3b0dfad4e40168/llama_index/ingestion/pipeline.py#L322C32-L322C32

Then, it enters the upsert function here with your list of documents
https://github.com/run-llama/llama_index/blob/22cef9cb22f092e50718ab166f3b0dfad4e40168/llama_index/ingestion/pipeline.py#L327

Since you are inputting documents, the ref_doc_id will be none at this step, causing it to use the id_
https://github.com/run-llama/llama_index/blob/22cef9cb22f092e50718ab166f3b0dfad4e40168/llama_index/ingestion/pipeline.py#L290

If the id was already added to the docstore, this will be a hit and we can compare the hash
https://github.com/run-llama/llama_index/blob/22cef9cb22f092e50718ab166f3b0dfad4e40168/llama_index/ingestion/pipeline.py#L292

If the hash doesn't exist, we insert as normal
https://github.com/run-llama/llama_index/blob/22cef9cb22f092e50718ab166f3b0dfad4e40168/llama_index/ingestion/pipeline.py#L293C4-L293C4

If the hash has changed, we delete and re-add
https://github.com/run-llama/llama_index/blob/22cef9cb22f092e50718ab166f3b0dfad4e40168/llama_index/ingestion/pipeline.py#L298

Otherwise, the hash is unchanged, and we skip
https://github.com/run-llama/llama_index/blob/22cef9cb22f092e50718ab166f3b0dfad4e40168/llama_index/ingestion/pipeline.py#L307

Now, this process relies on you attaching and persisting a docstore. I noticed in your code that you did not attach a docstore to the pipeline.
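Putting those steps together, the upsert handling is roughly this (a simplified sketch of the linked pipeline code, not the exact implementation):

Plain Text
def upsert(nodes, docstore):
    """Simplified sketch of DocstoreStrategy.UPSERTS dedup logic."""
    deduped = []
    for node in nodes:
        # fall back to the node's own id when there is no parent document
        ref_doc_id = node.ref_doc_id if node.ref_doc_id else node.id_
        existing_hash = docstore.get_document_hash(ref_doc_id)
        if existing_hash is None:
            # never seen before: insert as normal
            docstore.add_documents([node])
            docstore.set_document_hash(ref_doc_id, node.hash)
            deduped.append(node)
        elif existing_hash != node.hash:
            # hash (of document text + metadata) changed: delete and re-add
            docstore.delete_ref_doc(ref_doc_id, raise_error=False)
            docstore.add_documents([node])
            docstore.set_document_hash(ref_doc_id, node.hash)
            deduped.append(node)
        # otherwise the hash is unchanged and the node is skipped
    return deduped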
@Logan M , First of all, thank you so much for clarifying this!

1) Something must be wrong with my understanding. My understanding is that if I attach and persist a docstore and then index the exact same data twice, it shouldn't be added twice to my vector_store. Is that correct?

2) If that is correct, can you review this code and tell me what I am doing wrong? Indexing the same data twice adds it twice to the vector_store:

from llama_index.node_parser import SimpleNodeParser
from llama_index.ingestion import IngestionPipeline, DocstoreStrategy
from llama_index.vector_stores import PGVectorStore
from llama_index.embeddings import HuggingFaceEmbedding
from llama_index import SimpleDirectoryReader
from llama_index.storage.docstore import RedisDocumentStore

loader = SimpleDirectoryReader(
    input_files=["Afghanistan.docx"],  # any file that can be broken into at least 2 nodes
)

documents = loader.load_data()

embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5", device="cuda:0")

pg_vector_store = PGVectorStore.from_params(
    database="",
    host="",
    password="",
    port="5432",
    user="",
    table_name="nodes",
    embed_dim=384,  # bge-small-en-v1.5 embedding dimension
    hybrid_search=True,
    text_search_config="english",
)

pipeline = IngestionPipeline(
    transformations=[
        SimpleNodeParser(chunk_size=512, chunk_overlap=20),
        embed_model,
    ],
    docstore=RedisDocumentStore.from_host_and_port(
        "127.0.0.1", 6379, namespace="temp_doc_store"
    ),
    docstore_strategy=DocstoreStrategy.UPSERTS,
    vector_store=pg_vector_store,
)

nodes = pipeline.run(documents=documents)
I just ran this notebook, swapped the redis vector store with pgvector, and it worked fine πŸ€” https://docs.llamaindex.ai/en/stable/examples/ingestion/redis_ingestion_pipeline.html
Plain Text
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(),
        embed_model,
    ],
    docstore=RedisDocumentStore.from_host_and_port(
        "localhost", 6379, namespace="document_store2"
    ),
    vector_store=PGVectorStore.from_params(
      host="localhost", 
      port="5432", 
      table_name="test2", 
      user="postgres", 
      password="password", 
      database="vector_db", 
      embed_dim=384
    ),
    docstore_strategy=DocstoreStrategy.UPSERTS,
)
can you also share the document loader part?
It's the exact same as the notebook
Plain Text
from llama_index import SimpleDirectoryReader

# load documents with deterministic IDs
documents = SimpleDirectoryReader(
    "./test_redis_data", filename_as_id=True
).load_data()
The only thing I changed in that notebook was replacing the vector store with postgres, and removing the ingestion cache
Ah I see! Let me just clear my redis and try it again. Thanks @Logan M
Thanks @Logan M It worked after cleaning up redis.
nice! That's a relief hahaha
@Logan M, I have a follow-up question. Suppose I index some data with both a docstore and a vector store attached. If I index the data again but this time a file is missing, will the missing file be removed from the remote docstore and the vector store? I tried it with the current setup and the missing file is still present in both places.
It will not be removed if it's missing πŸ€”

I feel that would be rather confusing UX for someone who, for example, had an ingestion pipeline deployed as an API endpoint
Maybe that could be another mode? Unsure
@Logan M, For my use case, I am implementing a Q&A RAG pipeline over a set of documents in a folder. Users can add or remove files from this folder. Every time files are updated (added or removed), I update my index. If a file is removed by the user, I think it makes sense to remove it from the docstore and vector store as well, right? Otherwise, users might get a question answered based on a deleted document.

Do you think this is something I should implement outside of LlamaIndex?
Probably for now I would implement outside of llama-index. Unless you want to make a PR to add the mode to the current ingestion pipeline
By mode, do you mean a new docstore strategy, like UPSERTS or DUPLICATES_ONLY?
something like UPSERT_AND_DELETE
or some better name lol
Got it! I'll give it a try. I have some time right now. Thanks @Logan M
I think to detect missing docs, you'll want to use docstore.get_all_document_hashes(). It returns a dictionary of hash -> id.
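For example, a rough sketch of the detection step, assuming documents and the docstore from the earlier snippets:

Plain Text
# hash -> doc id for everything previously ingested
all_hashes = docstore.get_all_document_hashes()
stored_ids = set(all_hashes.values())

# ids present in the current ingestion run
current_ids = {doc.id_ for doc in documents}

# anything stored but no longer in the input was removed upstream
missing_ids = stored_ids - current_ids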
Lol thanks @Logan M ! I was just looking at the code to find this πŸ™‚ Do you read minds lol?
@Logan M, I am working on the PR for upserts followed by delete. I have a quick question. Since I am inputting documents into the ingestion pipeline, ref_doc_id is None, but delete_ref_doc looks at f"{self._namespace}/ref_doc_info" to decide what to delete. I don't have this collection in my remote docstore. I am able to delete from the vector store. Do you know how deleting from the docstore works in UPSERTS mode?
My guess is that it doesn't work in UPSERTS mode either if documents are inputted. I haven't tested it though
It works fine for documents as inputs actually (or at least, it did in my testing)

notice this line
ref_doc_id = node.ref_doc_id if node.ref_doc_id else node.id_
https://github.com/run-llama/llama_index/blob/d896b3a30e4e1aff7a574bbb2379c7b3d7e6c229/llama_index/ingestion/pipeline.py#L290

Maybe not exactly straightforward, but if ref_doc_id is None it will use the normal id

Then when adding documents, notice we call these two lines
Plain Text
self.docstore.add_documents([node])
self.docstore.set_document_hash(ref_doc_id, node.hash)

That "ref_doc_id" is either the actual ref doc id or the normal document ID

When the data is eventually inserted into the vector db, it should have been chunked at some point into nodes, which will have a ref_doc_id pointing to that original input. And since most vector stores are supposed to have delete() methods that delete all nodes with a particular ref_doc_id, it should work.
In fact, after writing this out, I think this will only work if you input documents and have a node parser/text splitter in your pipeline πŸ˜… Room for improvement
delete_ref_doc uses get_ref_doc_info to find the ref_doc_info
https://github.com/run-llama/llama_index/blob/4ae3e1d0b028d3059eb2da989cd45ac6ed0e4aab/llama_index/storage/docstore/keyval_docstore.py#L215C24-L215C24

get_ref_doc_info looks at self._ref_doc_collection
https://github.com/run-llama/llama_index/blob/4ae3e1d0b028d3059eb2da989cd45ac6ed0e4aab/llama_index/storage/docstore/keyval_docstore.py#L127

self._ref_doc_collection points to f"{self._namespace}/ref_doc_info"
https://github.com/run-llama/llama_index/blob/4ae3e1d0b028d3059eb2da989cd45ac6ed0e4aab/llama_index/storage/docstore/keyval_docstore.py#L50

I don't have namespace/ref_doc_info in my docstore. My docstore only has namespace/data and namespace/metadata.

When I look at add_documents code here

https://github.com/run-llama/llama_index/blob/4ae3e1d0b028d3059eb2da989cd45ac6ed0e4aab/llama_index/storage/docstore/keyval_docstore.py#L87

namespace/ref_doc_info is added based on the condition that the node is a TextNode and node.ref_doc_id is not None. In my case the node is a TextNode, so my guess for why namespace/ref_doc_info is missing from my docstore is that node.ref_doc_id is None

If you are sure it works for UPSERTS when documents are inputted, I can do some more debugging but wanted to check with you before I spend more time on this
hmm, let me triple check before you dive into this
ah ok, so it doesn't have the namespace/ref_doc_info collection, but it doesn't need it in order to work
Or at least, since we aren't deleting from the docstore, it works πŸ˜…
I think in your case, you can probably use delete_document() ?
If not, probably some updates are needed to the logic
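For completeness, a minimal sketch of that manual cleanup (missing_ids as computed above; whether delete_document also clears the hash entry may depend on the version):

Plain Text
for doc_id in missing_ids:
    # remove the document from the docstore; raise_error=False
    # makes this a no-op if the entry is already gone
    docstore.delete_document(doc_id, raise_error=False)
    # remove every node derived from it in the vector store
    pg_vector_store.delete(ref_doc_id=doc_id)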
In the case of an update during upsert, I think we are deleting from the docstore here, but since the error flag is set to False we don't see an error; I believe it doesn't actually delete from the docstore:

https://github.com/run-llama/llama_index/blob/4ae3e1d0b028d3059eb2da989cd45ac6ed0e4aab/llama_index/ingestion/pipeline.py#L299C29-L299C29
Let me check delete_document(). Thanks for your help, @Logan M
I think the delete actually works in upsert? I just tested

Plain Text
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(),
        HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"),
    ],
    docstore=SimpleDocumentStore(),
    vector_store=vector_store
)


If I ingest two documents, obviously those are both inserted.

Plain Text
print(pipeline.docstore._kvstore._data['docstore/data'].keys())
> dict_keys(['data/test1.txt', 'data/test2.txt'])


If I modify one of the documents, and add a new document, the docstore correctly reflects this

Plain Text
print(pipeline.docstore._kvstore._data['docstore/data'].keys())
> dict_keys(['data/test1.txt', 'data/test2.txt', 'data/test3.txt'])
Or maybe the delete isn't working, and it's just overwriting the key...
tricky πŸ₯΄
Thanks for looking into this, @Logan M. I think it's possible that it's just overwriting the key. For my use case, delete_document() worked, so I believe we should be good.
ok awesome πŸ™‚