Updated last month

pipeline is throwing pydantic validation

The ingestion pipeline is throwing pydantic validation errors when I pass in a valid vector store like this:

Plain Text
pg_vector_store = PGVectorStore.from_params(
  **POSTGRES_SETTINGS.model_dump(exclude_none=True),
  table_name="embeddings",
  embed_dim=384,
)
pipeline = IngestionPipeline(
  transformations=[
    HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"),
  ],
  docstore=postgres_docstore,
  vector_store=pg_vector_store,
)


PGVectorStore inherits from BasePydanticVectorStore and implements all the abstract methods, but I am getting this error:

Plain Text
validation_error = ValidationError(model='IngestionPipeline', errors=[{'loc': ('vector_store',), 'msg': "Can't instantiate abstract class...VectorStore without an implementation for abstract methods 'add', 'client', 'delete', 'query'", 'type': 'type_error'}])

    def __init__(__pydantic_self__, **data: Any) -> None:
        """
        Create a new model by parsing and validating input data from keyword arguments.
    
        Raises ValidationError if the input data cannot be parsed to form a valid model.
        """
        # Uses something other than `self` the first arg to allow "self" as a settable attribute
        values, fields_set, validation_error = validate_model(__pydantic_self__.__class__, data)
        if validation_error:
>           raise validation_error
E           pydantic.v1.error_wrappers.ValidationError: 1 validation error for IngestionPipeline
E           vector_store
E             Can't instantiate abstract class BasePydanticVectorStore without an implementation for abstract methods 'add', 'client', 'delete', 'query' (type=type_error)
Are you mixing legacy and non-legacy imports?
That's what it seems like, at least
bingo bongo tingo tongo, Mr. Logan is right again. I didn't know you can't mix those... So you have to commit to either using all legacy or no legacy imports?
Yea exactly. Otherwise things like isinstance checks don't work
Similar to pydantic if you are familiar with their pydantic.v1 layer
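To make the isinstance point concrete, here is a minimal pure-Python sketch. It is not llama_index code: `make_vector_store_base`, `LegacyBase`, `CoreBase`, and `MyVectorStore` are hypothetical stand-ins for "the same class defined in two different packages". The idea is only that two classes with the same name are still different types, so an instance of one fails an isinstance check against the other.

```python
from abc import ABC, abstractmethod


def make_vector_store_base():
    """Build a fresh abstract base class, as if it came from a separate package."""

    class BasePydanticVectorStore(ABC):
        @abstractmethod
        def add(self, nodes):
            ...

    return BasePydanticVectorStore


# Same name, same body, but two distinct class objects (hypothetical stand-ins).
LegacyBase = make_vector_store_base()  # plays the role of the legacy import path
CoreBase = make_vector_store_base()    # plays the role of the non-legacy import path


class MyVectorStore(CoreBase):
    """A concrete store that subclasses only the non-legacy base."""

    def add(self, nodes):
        return list(nodes)


store = MyVectorStore()
is_core = isinstance(store, CoreBase)      # the base it actually subclasses
is_legacy = isinstance(store, LegacyBase)  # the look-alike from the other "package"
print(LegacyBase.__name__ == CoreBase.__name__, is_core, is_legacy)  # True True False
```

So a store built from one import path can look valid by name and still be rejected by code that checks against the other path's base class.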
Great to know, thanks a ton. The py-issues-and-help channel is by far the most valuable thing about llama_index. Should be talked about more.
One issue that I find is common within llama_index is the lack of errors when things go wrong. For example, I'm writing some tests to make sure I know how the ingestion pipeline works, and I am getting unexpectedly different behavior in two very similar cases.

First I have this test without a vector store, just to make sure I can manipulate the docstore in a predictable way. It inserts about 40 nodes (I can tell by looking at my postgres db).

Plain Text
def test_toy_ingest_pipeline_get_ref_nodes():
  postgres_docstore = PostgresDocumentStore.from_params(
    **POSTGRES_SETTINGS.model_dump(exclude_none=True)
  )
  pipeline = IngestionPipeline(
    transformations=[
      HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"),
    ],
    docstore=postgres_docstore,
    # vector_store=pg_vector_store,
  )

  documents = SimpleDirectoryReader("tests/fixtures/docs").load_data()
  nodes_to_insert = SentenceSplitter(chunk_size=200).get_nodes_from_documents(documents)
  assert len(nodes_to_insert) > 30
  assert nodes_to_insert[0].ref_doc_id is not None
  nodes = pipeline.run(documents=nodes_to_insert)
  assert len(nodes) > 30
  docs = pipeline.docstore.docs
  assert len(docs) > 1
  ref_nodes = pipeline.docstore.get_all_ref_doc_info().items()
  assert len(ref_nodes) == 1
  # delete the ref node
  print("ref_nodes", ref_nodes)
  pipeline.docstore.delete_ref_doc(list(ref_nodes)[0][0])
Then I have this test, which I expect to act similarly, but instead it only generates one node and one embedding, with no errors thrown and no indication that anything has gone wrong. What am I missing about how the pipeline deals with vector stores?

Plain Text
def test_toy_ingest_pipeline_get_ref_nodes_with_vector_store():
  postgres_docstore = PostgresDocumentStore.from_params(
    **POSTGRES_SETTINGS.model_dump(exclude_none=True)
  )
  # FIXME: it doesn't seem to delete everything when I do ref_doc_delete
  pg_vector_store = PGVectorStore.from_params(
    **POSTGRES_SETTINGS.model_dump(exclude_none=True),
    table_name="embeddings",
    embed_dim=384,
  )
  pipeline = IngestionPipeline(
    transformations=[
      HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"),
    ],
    docstore=postgres_docstore,
    vector_store=pg_vector_store,
    # docstore_strategy=DocstoreStrategy.UPSERTS
  )

  documents = SimpleDirectoryReader("tests/fixtures/docs").load_data()
  nodes_to_insert = SentenceSplitter(chunk_size=200).get_nodes_from_documents(documents)
  assert len(nodes_to_insert) > 30
  print("number of nodes inserting", len(nodes_to_insert))
  assert nodes_to_insert[0].ref_doc_id is not None
  nodes = pipeline.run(documents=nodes_to_insert)
  assert len(nodes) > 30  # fails, only one node was inserted
  docs = pipeline.docstore.docs
  assert len(docs) > 30  # fails, one node was inserted
  ref_nodes = pipeline.docstore.get_all_ref_doc_info()
  print("ref_nodes", ref_nodes)
Hmm, so attaching a vector store or not doesn't really change much

What does matter is what the docstore is doing. In the ingestion pipeline, the docstore is basically trying to manage upserts

So what happens is
  • you input a document/node through the pipeline
  • the ID is used to do a lookup in the docstore
  • if that ID is a hit, it gets the old hash stored in the docstore, and compares it to the incoming hash
  • if the hash is different, it deletes from the docstore (and vector store, if attached), then does a new insert
  • if the ID lookup is a miss, it treats it as a new input, and stores the ID and hash into the docstore, and then does all the transformations and inserts into the vector store (if attached)
Does that explanation help debug, or nah? 😅
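The steps above can be sketched in plain Python. This is a simplified model under stated assumptions, not the library's actual implementation: `content_hash`, `run_with_upserts`, and the two dicts standing in for the docstore and vector store are all hypothetical. It also assumes that an input whose hash is unchanged is simply skipped, which the list implies but does not state outright.

```python
import hashlib


def content_hash(text: str) -> str:
    """Hash the content so changes can be detected on a later run."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def run_with_upserts(inputs, doc_hashes, vector_store):
    """Process (doc_id, text) pairs; return the ids that were (re)processed.

    doc_hashes stands in for the docstore's id -> hash map,
    vector_store stands in for the attached vector store.
    """
    processed = []
    for doc_id, text in inputs:
        new_hash = content_hash(text)
        old_hash = doc_hashes.get(doc_id)
        if old_hash is None:
            # ID lookup is a miss: treat as new input and record its hash.
            doc_hashes[doc_id] = new_hash
        elif old_hash != new_hash:
            # ID hit with a different hash: delete the old entry, then re-insert.
            vector_store.pop(doc_id, None)
            doc_hashes[doc_id] = new_hash
        else:
            # ID hit with the same hash: assumed to be skipped.
            continue
        # Stand-in for "run the transformations and insert into the vector store".
        vector_store[doc_id] = text.upper()
        processed.append(doc_id)
    return processed


doc_hashes, vector_store = {}, {}
first = run_with_upserts([("a", "hello"), ("b", "world")], doc_hashes, vector_store)
second = run_with_upserts([("a", "hello"), ("b", "world!")], doc_hashes, vector_store)
print(first, second)  # ['a', 'b'] ['b']
```

On the second run only "b" is reprocessed, because its content changed while "a" kept the same hash.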
Thank you for the explanation, I had figured that out through much trial and error.

Is there a situation in which calling pipeline.run on 30 nodes, all with the same ref_doc_id, should result in only a single entry in the vector store? Upon further inspection, the only node being inserted is the last node in the list of 30. It is successfully embedded as well. Also, if I start removing nodes from the end of the list I'm trying to ingest with the pipeline, only the last node in the list is ever inserted. This is demonstrated in the test and has got me unbelievably confused
🤔 Sounds pretty sus haha.

Normally, I would let the pipeline run the splitter as well, I wonder if that's related... Let me see if I can reproduce the issue
I didn't realize that the deduping was related to the ref_doc_id. I've been debugging and determined that is why it wasn't working: all my nodes had the same ref_doc_id, so they got deduped. Unfortunately, when I let the pipeline handle the splitting it doesn't store the ref_doc, so I have no way of determining how many documents (or which documents) have been ingested
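A small sketch of why pre-split nodes can collapse to one. It assumes, based only on the behavior described in this thread, that the dedupe lookup key is the node's ref_doc_id when it is set and the node's own id otherwise. `Node` and `dedupe_by_lookup_key` are hypothetical names for illustration, not library classes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Node:
    """Hypothetical stand-in for a document or chunk."""
    id_: str
    text: str
    ref_doc_id: Optional[str] = None


def dedupe_by_lookup_key(nodes):
    """Keep one node per lookup key; a later node overwrites an earlier one."""
    kept = {}
    for node in nodes:
        # Assumption: ref_doc_id is the key when present, else the node's own id.
        key = node.ref_doc_id if node.ref_doc_id else node.id_
        kept[key] = node
    return list(kept.values())


# 30 pre-split chunks that all point at the same parent document.
chunks = [Node(id_=f"chunk-{i}", text=f"part {i}", ref_doc_id="doc-1") for i in range(30)]
survivors = dedupe_by_lookup_key(chunks)
print(len(survivors), survivors[0].id_)  # 1 chunk-29
```

Under that assumption, every chunk maps to the key "doc-1", so only the last chunk in the list survives, which matches the "only the last node is ever inserted" symptom.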
Plain Text
def test_toy_ingest_pipeline_get_ref_nodes_with_vector_store():
  postgres_docstore = PostgresDocumentStore.from_params(
    **POSTGRES_SETTINGS.model_dump(exclude_none=True)
  )
  # FIXME: it doesn't seem to delete everything when I do ref_doc_delete
  pg_vector_store = PGVectorStore.from_params(
    **POSTGRES_SETTINGS.model_dump(exclude_none=True),
    table_name="embeddings",
    embed_dim=384,
  )
  pipeline = IngestionPipeline(
    transformations=[
      SentenceSplitter(chunk_size=200),
      HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"),
    ],
    docstore=postgres_docstore,
    vector_store=pg_vector_store,
  )

  documents = SimpleDirectoryReader("tests/fixtures/docs").load_data()

  nodes = pipeline.run(documents=documents)
  assert len(nodes) > 30
  docs = pipeline.docstore.docs
  assert len(docs) == 1
  ref_nodes = pipeline.docstore.get_all_ref_doc_info()
  ids_to_delete = ref_nodes.keys()
  # The next assert fails: get_all_ref_doc_info always returns {}. I did some
  # debugging and determined that the sentence splitter doesn't maintain the
  # ref_doc_ids when used in the pipeline, but does when used in isolation
  # for some reason.
  assert len(ref_nodes.items()) == 1
  for id_to_delete in ids_to_delete:
    print("deleting", id_to_delete)
    pipeline.docstore.delete_ref_doc(id_to_delete)
The behavior from this test is why I took the sentence splitter out of the pipeline in the first place. When I let the pipeline split up the nodes, the ref_doc_ids don't get maintained for some reason
I think I may be misunderstanding what the ref_doc_ids are for
ref doc id is definitely maintained though 🤔

For example

Plain Text
>>> from llama_index.core import Document
>>> from llama_index.core.node_parser import SentenceSplitter
>>> splitter = SentenceSplitter(chunk_size=20, chunk_overlap=0)
>>> document = Document.example()
>>> document.id_
'04f284f9-ec97-4a4e-b738-302c3094d0cb'
>>> nodes = splitter([document])
>>> nodes[0].ref_doc_id
'04f284f9-ec97-4a4e-b738-302c3094d0cb'
>>> nodes[5].ref_doc_id
'04f284f9-ec97-4a4e-b738-302c3094d0cb'
>>> nodes[10].ref_doc_id
'04f284f9-ec97-4a4e-b738-302c3094d0cb'
All the nodes point to the ID of the parent document
Okay that's what I thought, and that's the behavior I was seeing when running the sentence splitter by itself
It's only when running in the pipeline that ref_doc_ids seemed to not be maintained
The problem I'm really trying to solve is: how can I ingest documents and subsequently list (and/or delete) those documents using a vector store and a document store with the pipeline abstraction?
So, I ran this, and tbh it makes sense to me

Plain Text
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core import Document, VectorStoreIndex, MockEmbedding
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.core.node_parser import SentenceSplitter
from qdrant_client import QdrantClient

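# Note: `[obj] * 10` repeats one Document object ten times, so after this
# loop every list entry shares the same id_ ("9").
documents = [Document.example()] * 10
for i, doc in enumerate(documents):
    doc.id_ = str(i)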

vector_store = QdrantVectorStore("ingest_test2", client=QdrantClient(host="localhost", port=6333))
docstore = SimpleDocumentStore()

pipeline = IngestionPipeline(
    docstore=docstore,
    vector_store=vector_store,
    transformations=[
      SentenceSplitter(chunk_size=25, chunk_overlap=0), 
      MockEmbedding(embed_dim=256)
    ]
)

pipeline.run(documents=documents)

index = VectorStoreIndex.from_vector_store(vector_store, embed_model=MockEmbedding(embed_dim=256))

nodes = index.as_retriever(similarity_top_k=100).retrieve("test")
print("Vector store has " + str(len(nodes)) + " nodes")

nodes = list(docstore.docs.values())
print("Docstore has " + str(len(nodes)) + " nodes")

# Now we can run the pipeline with the same documents again
pipeline.run(documents=documents)

nodes = index.as_retriever(similarity_top_k=100).retrieve("test")
print("Vector store has " + str(len(nodes)) + " nodes")

nodes = list(docstore.docs.values())
print("Docstore has " + str(len(nodes)) + " nodes")


This outputs
Plain Text
Vector store has 19 nodes
Docstore has 1 nodes
Vector store has 19 nodes
Docstore has 1 nodes
Logan, you're the man. Thank you for taking the time to help me understand what's going on a bit better. What you've shown here works, and I was able to get pretty much all the functionality I need using that strategy. I think I was thrown off course by the ref_doc methods. I wrote a test to illustrate my confusion: why is ref_doc_info empty in these cases?

Plain Text
def test_ref_doc_info():
  pg_docstore = PostgresDocumentStore.from_params(
    **POSTGRES_SETTINGS.model_dump(exclude_none=True)
  )
  pg_vector_store = PGVectorStore.from_params(
    **POSTGRES_SETTINGS.model_dump(exclude_none=True),
    table_name="embeddings",
    embed_dim=384,
  )
  pipeline = IngestionPipeline(
    transformations=[
      SentenceSplitter(chunk_size=200),
      MockEmbedding(embed_dim=384),
    ],
    docstore=pg_docstore,
    vector_store=pg_vector_store,
    # docstore_strategy=DocstoreStrategy.UPSERTS
  )
  document = Document.example()
  nodes = pipeline.run(documents=[document])
  # embed_dim here should match the vector store's embed_dim (384)
  index = VectorStoreIndex.from_vector_store(pg_vector_store, embed_model=MockEmbedding(embed_dim=384))
  all_ref_doc_info = pg_docstore.get_all_ref_doc_info()
  assert all_ref_doc_info != {}  # fails, ref doc info is always empty
  doc_ref_doc_info = index.ref_doc_info(document.id_)
  assert doc_ref_doc_info != {}  # fails, ref doc info is always empty
I think the method you actually want is docstore.get_all_document_hashes()
What's happening is we are calling docstore.add_documents() on the top-level document objects

If their ref_doc_id is None, then the ref_doc_info collection will be empty (which is expected)

What IS stored is the hash -> doc_id map, and optionally the original document itself (by default it is stored, but you can disable storing the original text if you want)

You can see the original document(s) if you do id_to_doc = docstore.docs
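A toy model of why the ref_doc_info collection comes back empty here. This is only a sketch under the assumption stated above (entries exist only for stored items that have a ref_doc_id). `build_ref_doc_info` and the dict layout are hypothetical and far simpler than what the docstore actually keeps.

```python
from collections import defaultdict


def build_ref_doc_info(stored_docs):
    """Map parent id -> child ids, using only stored items that have a ref_doc_id.

    stored_docs maps each stored item's id to its ref_doc_id (or None).
    """
    info = defaultdict(list)
    for doc_id, ref_doc_id in stored_docs.items():
        if ref_doc_id is not None:
            info[ref_doc_id].append(doc_id)
    return dict(info)


# Only the top-level document is stored, and it has no ref_doc_id.
only_top_level = {"doc-1": None}
# The document plus child chunks that point back at it are stored.
with_children = {"doc-1": None, "chunk-0": "doc-1", "chunk-1": "doc-1"}

empty_info = build_ref_doc_info(only_top_level)
child_info = build_ref_doc_info(with_children)
print(empty_info, child_info)  # {} {'doc-1': ['chunk-0', 'chunk-1']}
```

With only top-level documents in the docstore there is nothing to group, so the collection stays empty even though the documents themselves are stored.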
Ahhh I see, the ref_doc_info collection is only populated if a document in the docstore has ref_doc_id set. The way I am using it, only nodes in the vector store (which stores text) have a ref_doc_id. Because I'm only ever storing top-level documents in the docstore, docstore.docs does what get_ref_docs would have done if I had chosen to store the child nodes in the docstore instead of the vector store. Is that correct?
More clearly:

To get "documents", i.e. original sources, the way to retrieve them differs depending on how you use the docstore. The docstore can be used in the following ways:

A. To store original documents along with their child nodes, which requires a ref_doc_info collection to differentiate between original documents and child nodes

B. To store only original documents and have the vector store handle storing all the subnodes, which have ref_doc_id set to allow for deletion like this

Plain Text
for doc in pg_docstore.docs.values():
    pg_docstore.delete_document(doc.id_)
    index.delete_ref_doc(doc.id_)
 
Yea pretty much! 👍