Pipeline setup
# Entry point that prepares the pipeline for the current tenant/project.
# NOTE(review): this excerpt is truncated and has lost its indentation --
# the `try:` below has no visible `except`/`finally`, and the remainder of
# the method is not shown here.
def setup_pipeline(self):
"""Set up the entire pipeline."""
# Announce which tenant/project the pipeline is being built for.
logger.info(f"Setting up pipeline for {self.tenant}/{self.project}...")
try:
# Setup necessary components
# Documents are only fetched from Azure Blob Storage when the
# application config enables it (`use_azure_blob`).
if self.application_config.use_azure_blob:
documents = self.get_blobs_from_container()
# Logs the first loaded document.
# NOTE(review): `documents[0]` raises IndexError if
# get_blobs_from_container() returns an empty sequence -- confirm the
# container can never be empty, or guard this line.
logger.info(f"{documents[0]}")
Function to get blobs from the container
def get_blobs_from_container(self):
    """Load the tenant's documents from Azure Blob Storage.

    When no container named after ``self.tenant`` is reported by
    ``self.azure_blob_manager.list_containers()``, the container is created
    and the directory at ``self.application_config.data_path`` is uploaded
    into it first. The container is then read with ``AzStorageBlobReader``.

    Returns:
        The result of ``AzStorageBlobReader.load_data()`` for the tenant's
        container.
    """
    blob_manager = self.azure_blob_manager

    # NOTE(review): the original excerpt had lost its indentation. Creating
    # the container and uploading the data directory are treated here as the
    # first-time-only branch; the load below runs in every case -- confirm.
    if self.tenant not in blob_manager.list_containers():
        logger.info(f"Creating container for tenant {self.tenant}")
        blob_manager.create_container(self.tenant)
        logger.info(f"Uploading data from {self.tenant}")
        # NOTE(review): `self.tenant` is passed for both of the first two
        # arguments -- verify against upload_directory's signature.
        blob_manager.upload_directory(
            self.tenant,
            self.tenant,
            self.project,
            self.application_config.data_path,
        )

    logger.info(f"Loading documents from Existing Azure Blob Storage for tenant {self.tenant} ---> {self.application_config.data_path}")
    # NOTE(review): the connection string is an empty literal here. If that
    # is not just a redaction, it must be supplied from configuration for the
    # reader to reach the storage account -- confirm.
    reader = AzStorageBlobReader(
        container_name=self.tenant,
        connection_string="",
    )
    return reader.load_data()
Ingesting nodes to vector store
# Assemble the ingestion pipeline. The transformations are listed as
# metadata insertion, parsing, then embedding; output is directed at
# `vector_store`, with `docstore` configured under the UPSERTS strategy.
ingestion_steps = [insert_metadata, parser, embed_model]
ingestion = IngestionPipeline(
    transformations=ingestion_steps,
    vector_store=vector_store,
    docstore=docstore,
    docstore_strategy=DocstoreStrategy.UPSERTS,
)

# Run the pipeline over the loaded documents. `tenant` and `project` are
# handed to run() as extra keyword arguments (presumably consumed by
# `insert_metadata` -- confirm against that transformation).
ingestion_run_args = {
    "documents": documents,
    "tenant": self.tenant,
    "project": self.project,
}
nodes = ingestion.run(**ingestion_run_args)