```python
transformations = [
    HierarchicalNodeParser.from_defaults(chunk_sizes=[4096, 2048]),
    Settings.embed_model,
]

logger.info("Creating pipeline")
pipeline = IngestionPipeline(transformations=transformations)
# pipeline.disable_cache = False

logger.info('Num workers: ' + str(os.cpu_count()))
nodes = pipeline.run(
    documents=createHierarchicalIndexRequest.Documents,
    num_workers=4,
)
```
It hangs at the `pipeline.run()` call. If I remove the `num_workers` arg it runs, but it's extremely slow. Any advice?

Here's a script that times `num_workers=1` against `num_workers=4`:

```python
import time

from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import HierarchicalNodeParser
from llama_index.embeddings.openai import OpenAIEmbedding


def main():
    pipeline = IngestionPipeline(
        transformations=[
            HierarchicalNodeParser.from_defaults(),
            OpenAIEmbedding(),
        ],
        disable_cache=True,
    )

    documents = [Document.example()] * 100

    start = time.time()
    pipeline.run(documents=documents, num_workers=1)
    end = time.time()
    print(end - start)

    start = time.time()
    pipeline.run(documents=documents, num_workers=4)
    end = time.time()
    print(end - start)


if __name__ == "__main__":
    main()
```
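One thing to check when `run()` hangs with `num_workers` set: the parallel path hands batches to worker processes, and with the `spawn` start method (the default on Windows and macOS) each worker re-imports the calling module. If the pipeline is built and run at module scope, that re-import can re-trigger the work and deadlock. A minimal sketch of the safe layout, assuming the hang is spawn-related (the helper name here is made up):

```python
from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import HierarchicalNodeParser


def build_and_run(documents):
    # Hypothetical helper: everything importable at module level,
    # nothing executed on import.
    pipeline = IngestionPipeline(
        transformations=[HierarchicalNodeParser.from_defaults()],
    )
    return pipeline.run(documents=documents, num_workers=4)


if __name__ == "__main__":
    # Worker processes re-import this module; the guard keeps them
    # from re-running the pipeline themselves.
    nodes = build_and_run([Document.example()] * 10)
    print(len(nodes))
```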
And the async route, using `arun` instead of `num_workers`:

```python
import asyncio
import time

from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import HierarchicalNodeParser
from llama_index.embeddings.openai import OpenAIEmbedding


async def main():
    pipeline = IngestionPipeline(
        transformations=[
            HierarchicalNodeParser.from_defaults(),
            OpenAIEmbedding(),
        ],
        disable_cache=True,
    )

    documents = [Document.example()] * 100

    start = time.time()
    await pipeline.arun(documents=documents)
    end = time.time()
    print(end - start)


if __name__ == "__main__":
    asyncio.run(main())
```
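Side note if you run this from a Jupyter notebook or any other context that already has an event loop: `asyncio.run()` raises a `RuntimeError` there. The usual workaround is `nest_asyncio`:

```python
# In a notebook there is already a running event loop, so asyncio.run()
# fails with "cannot be called from a running event loop".
# nest_asyncio patches the loop so nested runs are allowed.
import nest_asyncio

nest_asyncio.apply()

# After this, either form works in a notebook cell:
#   await pipeline.arun(documents=documents)
#   asyncio.run(main())
```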
```python
def run_pipeline_sync(nodes):
    transformations = [
        title_extractor,
        qa_extractor,
        SummaryExtractor(summaries=["self"]),
    ]
    pipeline = IngestionPipeline(transformations=transformations)
    pipeline.disable_cache = True

    async def async_run():
        return await pipeline.arun(nodes=nodes, num_workers=4)

    transformed_nodes = asyncio.run(async_run())
    return transformed_nodes
```
Here's the full function, with the extractors defined inline:

```python
def run_pipeline_sync(nodes):
    # llm = AzureOpenAI(
    #     model="gpt-4o",
    #     deployment_name='EntelligenceAI',
    #     temperature=0.0,
    #     azure_endpoint=AZURE_OPENAI_ENDPOINT,
    #     api_key=AZURE_OPENAI_API_KEY,
    #     api_version=AZURE_OPENAI_VERSION,
    # )
    llm = Settings.llm
    transformations = [
        TitleExtractor(nodes=5, llm=llm),
        QuestionsAnsweredExtractor(llm=llm, questions=3),
        SummaryExtractor(llm=llm, summaries=["self"]),
    ]
    pipeline = IngestionPipeline(transformations=transformations)
    pipeline.disable_cache = True

    # Define an async wrapper function to call the async method
    async def async_run():
        return await pipeline.arun(nodes=nodes, show_progress=True)

    # Run the async function using asyncio.run()
    transformed_nodes = asyncio.run(async_run())
    return transformed_nodes
```
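The speedup from dropping `num_workers` makes sense if the extractors are the bottleneck: their per-node LLM calls are network-bound, and `arun` can overlap them on the event loop, while extra worker processes mostly add pickling and startup overhead. A quick A/B sketch to confirm on your own data (it assumes `pipeline` and `nodes` from the function above, and note it runs the extractors twice, purely for timing):

```python
import asyncio
import time

# Blocking path: transformations applied without async overlap.
sync_start = time.time()
pipeline.run(nodes=nodes, show_progress=True)
print("run():", time.time() - sync_start)

# Async path: per-node LLM requests from the extractors can be in
# flight concurrently instead of waiting on each response in turn.
async_start = time.time()
asyncio.run(pipeline.arun(nodes=nodes, show_progress=True))
print("arun():", time.time() - async_start)
```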
After I removed `num_workers` it was much faster.
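One more thought, since both versions set `disable_cache = True`: the pipeline keeps a transformation cache by default, and persisting it lets repeat runs over unchanged inputs skip the extractor LLM calls entirely. A minimal sketch, assuming the default local cache (the directory name is made up):

```python
from llama_index.core import Document
from llama_index.core.extractors import SummaryExtractor
from llama_index.core.ingestion import IngestionPipeline

documents = [Document.example()] * 10
transformations = [SummaryExtractor(summaries=["self"])]

# Cache is enabled by default when disable_cache is left alone.
pipeline = IngestionPipeline(transformations=transformations)
nodes = pipeline.run(documents=documents)
pipeline.persist("./pipeline_cache")  # hypothetical path

# Later, or in a fresh process: reload the cache and rerun.
# Cache hits skip the extractor's LLM calls for unchanged inputs.
pipeline2 = IngestionPipeline(transformations=transformations)
pipeline2.load("./pipeline_cache")
nodes_again = pipeline2.run(documents=documents)
```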