pg_vector_store = PGVectorStore.from_params(
    **POSTGRES_SETTINGS.model_dump(exclude_none=True),
    table_name="embeddings",
    embed_dim=384,
)

pipeline = IngestionPipeline(
    transformations=[
        HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"),
    ],
    docstore=postgres_docstore,
    vector_store=pg_vector_store,
)
validation_error = ValidationError(model='IngestionPipeline', errors=[{'loc': ('vector_store',), 'msg': "Can't instantiate abstract class...VectorStore without an implementation for abstract methods 'add', 'client', 'delete', 'query'", 'type': 'type_error'}])

    def __init__(__pydantic_self__, **data: Any) -> None:
        """
        Create a new model by parsing and validating input data from keyword arguments.

        Raises ValidationError if the input data cannot be parsed to form a valid model.
        """
        # Uses something other than `self` the first arg to allow "self" as a settable attribute
        values, fields_set, validation_error = validate_model(__pydantic_self__.__class__, data)
        if validation_error:
>           raise validation_error
E           pydantic.v1.error_wrappers.ValidationError: 1 validation error for IngestionPipeline
E           vector_store
E             Can't instantiate abstract class BasePydanticVectorStore without an implementation for abstract methods 'add', 'client', 'delete', 'query' (type=type_error)
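One way to narrow this down (a hypothetical debugging snippet, not from the original report, reusing the pg_vector_store object above) is to check whether the installed PGVectorStore is recognised as the BasePydanticVectorStore that IngestionPipeline validates against; a False result would point at mismatched llama-index-core / vector store integration versions rather than at the pipeline itself:

# Hypothetical check, not part of the original report: confirm which base class this
# PGVectorStore actually inherits from in the installed environment
from llama_index.core.vector_stores.types import BasePydanticVectorStore
from llama_index.vector_stores.postgres import PGVectorStore

print(PGVectorStore.__mro__)
print(isinstance(pg_vector_store, BasePydanticVectorStore))  # False would suggest mismatched package versions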
def test_toy_ingest_pipeline_get_ref_nodes():
    postgres_docstore = PostgresDocumentStore.from_params(
        **POSTGRES_SETTINGS.model_dump(exclude_none=True)
    )
    pipeline = IngestionPipeline(
        transformations=[
            HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"),
        ],
        docstore=postgres_docstore,
        # vector_store=pg_vector_store,
    )
    documents = SimpleDirectoryReader("tests/fixtures/docs").load_data()
    nodes_to_insert = SentenceSplitter(chunk_size=200).get_nodes_from_documents(documents)
    assert len(nodes_to_insert) > 30
    assert nodes_to_insert[0].ref_doc_id is not None

    nodes = pipeline.run(documents=nodes_to_insert)
    assert len(nodes) > 30

    docs = pipeline.docstore.docs
    assert len(docs) > 1

    ref_nodes = pipeline.docstore.get_all_ref_doc_info().items()
    assert len(ref_nodes) == 1

    # delete the ref node
    print("ref_nodes", ref_nodes)
    pipeline.docstore.delete_ref_doc(list(ref_nodes)[0][0])
def test_toy_ingest_pipeline_get_ref_nodes_with_vector_store():
    postgres_docstore = PostgresDocumentStore.from_params(
        **POSTGRES_SETTINGS.model_dump(exclude_none=True)
    )
    # FIXME: It doesn't seem to delete everything when I do ref_doc_delete
    pg_vector_store = PGVectorStore.from_params(
        **POSTGRES_SETTINGS.model_dump(exclude_none=True),
        table_name="embeddings",
        embed_dim=384,
    )
    pipeline = IngestionPipeline(
        transformations=[
            HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"),
        ],
        docstore=postgres_docstore,
        vector_store=pg_vector_store,
        # docstore_strategy=DocstoreStrategy.UPSERTS
    )
    documents = SimpleDirectoryReader("tests/fixtures/docs").load_data()
    nodes_to_insert = SentenceSplitter(chunk_size=200).get_nodes_from_documents(documents)
    assert len(nodes_to_insert) > 30
    print("number of nodes inserting", len(nodes_to_insert))
    assert nodes_to_insert[0].ref_doc_id is not None

    nodes = pipeline.run(documents=nodes_to_insert)
    assert len(nodes) > 30  # fails, only one node was inserted

    docs = pipeline.docstore.docs
    assert len(docs) > 30  # fails, one node was inserted

    ref_nodes = pipeline.docstore.get_all_ref_doc_info()
    print("ref_nodes", ref_nodes)
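For what it's worth, a small diagnostic (hypothetical lines, not part of the original test, reusing nodes_to_insert and pipeline from above) makes the mismatch easier to see:

# Hypothetical diagnostic, not part of the original test: compare what went in
# with what the docstore actually kept
print("input nodes:", len(nodes_to_insert))
print("distinct ref_doc_ids:", {n.ref_doc_id for n in nodes_to_insert})  # all 30 share a single ref_doc_id
print("docstore keys:", list(pipeline.docstore.docs.keys()))  # per the failing asserts, only one entry survives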
Is it expected that pipeline.run on 30 nodes that all share the same ref_doc_id results in only a single entry in the vector store? On further inspection, the only node being inserted is the last node in the list of 30, and it is embedded successfully. Also, if I start removing nodes from the end of the list I am trying to ingest with the pipeline, it is still only ever the last node in the list that gets inserted. This is demonstrated in the test below and has got me unbelievably confused:

def test_toy_ingest_pipeline_get_ref_nodes_with_vector_store():
    postgres_docstore = PostgresDocumentStore.from_params(
        **POSTGRES_SETTINGS.model_dump(exclude_none=True)
    )
    # FIXME: It doesn't seem to delete everything when I do ref_doc_delete
    pg_vector_store = PGVectorStore.from_params(
        **POSTGRES_SETTINGS.model_dump(exclude_none=True),
        table_name="embeddings",
        embed_dim=384,
    )
    pipeline = IngestionPipeline(
        transformations=[
            SentenceSplitter(chunk_size=200),
            HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5"),
        ],
        docstore=postgres_docstore,
        vector_store=pg_vector_store,
    )
    documents = SimpleDirectoryReader("tests/fixtures/docs").load_data()
    nodes = pipeline.run(documents=documents)
    assert len(nodes) > 30

    docs = pipeline.docstore.docs
    assert len(docs) == 1

    ref_nodes = pipeline.docstore.get_all_ref_doc_info()
    ids_to_delete = ref_nodes.keys()
    # fails, get_all_ref_doc_info always returns {}. I did some debugging and determined that the
    # sentence splitter doesn't maintain the ref_doc_ids when used in the pipeline, but does when
    # used in isolation for some reason
    assert len(ref_nodes.items()) == 1

    for id_to_delete in ids_to_delete:
        print("deleting", id_to_delete)
        pipeline.docstore.delete_ref_doc(id_to_delete)
>>> from llama_index.core.node_parser import SentenceSplitter
>>> from llama_index.core import Document
>>> splitter = SentenceSplitter(chunk_size=20, chunk_overlap=0)
>>> document = Document.example()
>>> document.id_
'04f284f9-ec97-4a4e-b738-302c3094d0cb'
>>> nodes = splitter([document])
>>> nodes[0].ref_doc_id
'04f284f9-ec97-4a4e-b738-302c3094d0cb'
>>> nodes[5].ref_doc_id
'04f284f9-ec97-4a4e-b738-302c3094d0cb'
>>> nodes[10].ref_doc_id
'04f284f9-ec97-4a4e-b738-302c3094d0cb'
>>>
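By contrast, a check of the same property on the pipeline's output (a hypothetical snippet, not from the original report, reusing the pipeline and documents from the test above) is what the comment in that test refers to:

# Hypothetical counterpart to the REPL session above: the same ref_doc_id check,
# but on nodes produced by pipeline.run instead of by the splitter in isolation
nodes = pipeline.run(documents=documents)
print({node.ref_doc_id for node in nodes})  # per the test comment above, the original document id is not preserved here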
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core import Document, VectorStoreIndex, MockEmbedding
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.core.node_parser import SentenceSplitter
from qdrant_client import QdrantClient

documents = [Document.example()] * 10
for i, doc in enumerate(documents):
    doc.id_ = str(i)

vector_store = QdrantVectorStore("ingest_test2", client=QdrantClient(host="localhost", port=6333))
docstore = SimpleDocumentStore()

pipeline = IngestionPipeline(
    docstore=docstore,
    vector_store=vector_store,
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        MockEmbedding(embed_dim=256),
    ],
)

pipeline.run(documents=documents)

index = VectorStoreIndex.from_vector_store(vector_store, embed_model=MockEmbedding(embed_dim=256))

nodes = index.as_retriever(similarity_top_k=100).retrieve("test")
print("Vector store has " + str(len(nodes)) + " nodes")

nodes = list(docstore.docs.values())
print("Docstore has " + str(len(nodes)) + " nodes")

# Now we can run the pipeline with the same documents again
pipeline.run(documents=documents)

nodes = index.as_retriever(similarity_top_k=100).retrieve("test")
print("Vector store has " + str(len(nodes)) + " nodes")

nodes = list(docstore.docs.values())
print("Docstore has " + str(len(nodes)) + " nodes")
Vector store has 19 nodes
Docstore has 1 nodes
Vector store has 19 nodes
Docstore has 1 nodes
def test_ref_doc_info():
    pg_docstore = PostgresDocumentStore.from_params(
        **POSTGRES_SETTINGS.model_dump(exclude_none=True)
    )
    pg_vector_store = PGVectorStore.from_params(
        **POSTGRES_SETTINGS.model_dump(exclude_none=True),
        table_name="embeddings",
        embed_dim=384,
    )
    pipeline = IngestionPipeline(
        transformations=[
            SentenceSplitter(chunk_size=200),
            MockEmbedding(embed_dim=384),
        ],
        docstore=pg_docstore,
        vector_store=pg_vector_store,
        # docstore_strategy=DocstoreStrategy.UPSERTS
    )
    document = Document.example()
    nodes = pipeline.run(documents=[document])

    index = VectorStoreIndex.from_vector_store(pg_vector_store, embed_model=MockEmbedding(embed_dim=256))

    all_ref_doc_info = pg_docstore.get_all_ref_doc_info()
    assert all_ref_doc_info != {}  # fails, ref doc info is always empty

    doc_ref_doc_info = index.ref_doc_info(document.id_)
    assert doc_ref_doc_info != {}  # fails, ref doc info is always empty
The docstore is only given the top-level document objects via docstore.add_documents(), so what it exposes is a mapping from document id to document:

id_to_doc = docstore.docs
for doc in pg_docstore.docs.values():
    pg_docstore.delete_document(doc.id_)
    index.delete_ref_doc(doc.id_)
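A follow-up check (a hypothetical snippet, assuming the same pg_docstore object as above) to confirm the loop actually cleared the docstore might look like:

# Hypothetical post-cleanup check on the docstore side
assert pg_docstore.docs == {}                  # no documents should remain
assert not pg_docstore.get_all_ref_doc_info()  # and no ref-doc bookkeeping either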