def load_and_index(self):
    """Ingest the loader's documents and optionally prune stale ones.

    Calls ``self.loader()`` to obtain the current documents, runs each one
    through ``self.pipeline`` individually, prints the hashes and ref-doc
    ids currently held by the pipeline's docstore, and, when
    ``self.exclude_docs`` is truthy, deletes every docstore ref-doc id that
    the loader did not return from both the pipeline docstore and
    ``self.index``.

    Returns:
        None. Progress and errors are reported with ``print``.
    """
    try:
        docs = self.loader()
        loaded_doc_ids = {doc.id_ for doc in docs}
        print(len(docs))
        if docs:
            for doc in docs:
                # One document per run so the node count can be reported
                # per source file.
                nodes = self.pipeline.run(documents=[doc])
                print(f"Ingestão de {len(nodes)} Nodes do documento: {doc.metadata['file_name']}")

            docstore = self.pipeline.docstore
            # Treat a missing (None) mapping as empty instead of crashing.
            docstore_hashes = set(docstore.get_all_document_hashes() or {})
            print(f"Hashs docstore: {docstore_hashes}")
            docstore_ids = set(docstore.get_all_ref_doc_info() or {})
            print(f"IDs docstore: {docstore_ids}")

            # Ids known to the docstore that the loader no longer returns.
            # (The original read the undefined name `loaded_docs_ids` here,
            # which raised NameError before anything could be deleted.)
            docs_exclu = docstore_ids - loaded_doc_ids
            # NOTE(review): the prune step is kept inside `if docs:` so an
            # empty load never deletes everything — confirm this is intended.
            if self.exclude_docs:
                for doc_id in docs_exclu:
                    self.pipeline.docstore.delete_document(doc_id)
                    self.index.delete_ref_doc(doc_id, delete_from_docstore=True)
                    print(f"Doc delete: {doc_id}")
    except Exception as e:
        # NOTE(review): the original `try` had no visible handler; this
        # reports the failure and continues — confirm that policy.
        print(f"error: {e}")
from llama_index.core.ingestion import IngestionPipeline, DocstoreStrategy

# Build an ingestion pipeline that uses the UPSERTS_AND_DELETE docstore
# strategy. `docstore` and `vector_store` are expected to be defined by the
# surrounding code; `[...]` is the placeholder transformation list from the
# original snippet and must be replaced with real transformations.
pipeline = IngestionPipeline(
    docstore=docstore,
    vector_store=vector_store,
    transformations=[...],
    docstore_strategy=DocstoreStrategy.UPSERTS_AND_DELETE,
)
# Debug run: ingest each loaded document on its own, dumping the type and
# value of both id attributes first so they can be compared with the id
# quoted in any error message.
try:
    docs = self.loader()
    if docs:
        for doc in docs:
            for attr_name in ("id_", "doc_id"):
                attr_value = getattr(doc, attr_name)
                print(type(attr_value))
                print(attr_value)
            nodes = self.pipeline.run(documents=[doc])
            source_name = doc.metadata['file_name']
            print(f" {len(nodes)} Nodes doc : {source_name}")
except Exception as exc:
    # Best effort: report the failure and carry on.
    print(f"error: {exc}")
<class 'str'>
d7942a16845189cd57fc00bf0d88a339
<class 'str'>
d7942a16845189cd57fc00bf0d88a339
error: doc_id b'd7942a16845189cd57fc00bf0d88a339' not found.
# Vector store: a Qdrant collection with hybrid search enabled.
client = QdrantClient(self.qdrant_host, port=self.qdrant_port)
vector_store = QdrantVectorStore(
    client=client,
    collection_name=self.qdrant_collection,
    enable_hybrid=True,
    batch_size=20,
)

# Document store and ingestion cache both live on the same Redis host.
redis_docstore = RedisDocumentStore.from_host_and_port(
    self.redis_host,
    self.redis_port,
    namespace=self.docstore_namespace,
)
ingestion_cache = IngestionCache(
    cache=RedisCache.from_host_and_port(self.redis_host, self.redis_port),
    collection=self.redis_collection,
)

# Pipeline wiring: transformations, docstore, vector store and cache, using
# the UPSERTS_AND_DELETE docstore strategy.
pipeline = IngestionPipeline(
    transformations=transformations,
    docstore=redis_docstore,
    vector_store=vector_store,
    cache=ingestion_cache,
    docstore_strategy=DocstoreStrategy.UPSERTS_AND_DELETE,
)
def loader(self):
    """Read every object in ``self.bucket`` into a list of ``Document``s.

    Each object body is decoded as UTF-8 and becomes the document text, the
    object's ETag (surrounding double quotes removed) becomes the document
    id, and the object key is stored under the ``file_name`` metadata entry.

    Returns:
        A list of ``Document`` objects; empty when the bucket has no
        objects (a message is printed in that case).
    """
    credentials = {
        'aws_access_key_id': self.aws_access_key_id,
        'aws_secret_access_key': self.aws_secret_access_key,
    }
    s3_client = boto3.client('s3', **credentials)
    s3_resource = boto3.resource('s3', **credentials)

    # Materialise the listing up front so the progress bar knows its total.
    summaries = list(s3_resource.Bucket(self.bucket).objects.all())
    if not summaries:
        print("No contents found.")
        return []

    documents = []
    for summary in tqdm(summaries):
        object_key = summary.key
        # Fetch the current object.
        response = s3_client.get_object(Bucket=self.bucket, Key=object_key)
        body_text = response['Body'].read().decode('utf-8')
        # The ETag arrives wrapped in double quotes; strip them.
        object_etag = response['ETag'].strip('"')
        documents.append(
            Document(id_=object_etag, text=body_text, metadata={'file_name': object_key})
        )
    return documents
from llama_index.core.ingestion import IngestionPipeline, DocstoreStrategy
from llama_index.core import Document, VectorStoreIndex, MockEmbedding
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.storage.docstore.redis import RedisDocumentStore
from llama_index.core.node_parser import SentenceSplitter
from qdrant_client import QdrantClient

# Minimal reproduction: ingest three example documents, then re-run the
# pipeline with only the first one and compare node counts in the vector
# store and the docstore before and after.
documents = [Document.example(), Document.example(), Document.example()]
for i, doc in enumerate(documents):
    # Give each example document a distinct id.
    doc.id_ = str(i)

vector_store = QdrantVectorStore("ingest_test7", client=QdrantClient(":memory:"))
docstore = RedisDocumentStore.from_host_and_port("localhost", 6379, "ingest_test7")

pipeline = IngestionPipeline(
    docstore=docstore,
    vector_store=vector_store,
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        MockEmbedding(embed_dim=256),
    ],
    docstore_strategy=DocstoreStrategy.UPSERTS_AND_DELETE,
)

pipeline.run(documents=documents)

index = VectorStoreIndex.from_vector_store(vector_store, embed_model=MockEmbedding(embed_dim=256))

nodes = index.as_retriever(similarity_top_k=100).retrieve("test")
print("Vector store has " + str(len(nodes)) + " nodes")

nodes = list(docstore.docs.values())
print("Docstore has " + str(len(nodes)) + " nodes")

# Now we can run the pipeline with two less
pipeline.run(documents=documents[:-2])

nodes = index.as_retriever(similarity_top_k=100).retrieve("test")
print("Vector store has " + str(len(nodes)) + " nodes")

nodes = list(docstore.docs.values())
print("Docstore has " + str(len(nodes)) + " nodes")