Is there an issue in IngestionPipeline

Is there an issue in IngestionPipeline when using parallel processing mode?
Plain Text
        transformations = [
            HierarchicalNodeParser.from_defaults(chunk_sizes=[4096, 2048]),
            Settings.embed_model,
        ]
        logger.info("Creating pipeline")
        pipeline = IngestionPipeline(transformations=transformations)
        # pipeline.disable_cache = False
        logger.info('Num workers: ' + str(os.cpu_count()))

        nodes = pipeline.run(
            documents=createHierarchicalIndexRequest.Documents,
            num_workers=4,
        )


My pipeline doesn't return any error messages and doesn't execute any further after the pipeline.run() call. If I remove the num_workers arg it runs, but it's extremely slow. Any advice?
13 comments
Seems to work for me, but it's only slightly faster to use num_workers (multithreading with Python isn't really true multithreading)
Plain Text
from llama_index.core.node_parser import HierarchicalNodeParser
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core import Document

def main():
  pipeline = IngestionPipeline(transformations=[
    HierarchicalNodeParser.from_defaults(),
    OpenAIEmbedding(),
  ],
  disable_cache=True
  )

  documents = [Document.example()]*100

  import time

  start = time.time()
  pipeline.run(documents=documents, num_workers=1)
  end = time.time()
  print(end-start)

  start = time.time()
  pipeline.run(documents=documents, num_workers=4)
  end = time.time()
  print(end-start)

if __name__ == "__main__":
  main()
Waaay faster to just use async
Plain Text
import asyncio
from llama_index.core.node_parser import HierarchicalNodeParser
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core import Document

async def main():
  pipeline = IngestionPipeline(transformations=[
    HierarchicalNodeParser.from_defaults(),
    OpenAIEmbedding(),
  ],
  disable_cache=True
  )

  documents = [Document.example()]*100

  import time

  start = time.time()
  await pipeline.arun(documents=documents)
  end = time.time()
  print(end-start)


if __name__ == "__main__":
  asyncio.run(main())
Also, when I try to run async using Anthropic I get: 'Anthropic' object has no attribute '_aclient'
Plain Text
import asyncio

from llama_index.core.extractors import SummaryExtractor
from llama_index.core.ingestion import IngestionPipeline


def run_pipeline_sync(nodes):
    # title_extractor and qa_extractor are defined elsewhere (full versions
    # appear in the later snippet below)
    transformations = [
        title_extractor,
        qa_extractor,
        SummaryExtractor(summaries=["self"]),
    ]
    pipeline = IngestionPipeline(transformations=transformations)
    pipeline.disable_cache = True

    # Wrap the async arun() call so it can be driven from sync code
    async def async_run():
        return await pipeline.arun(nodes=nodes, num_workers=4)

    transformed_nodes = asyncio.run(async_run())
    return transformed_nodes
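If the '_aclient' error shows up again, it may help to first confirm that the Anthropic LLM's async path works on its own, outside the pipeline, to separate an LLM setup problem from the parallelization issue discussed below. A minimal sketch, assuming the llama-index-llms-anthropic integration is installed and ANTHROPIC_API_KEY is set in the environment (the model name is only an example):
Plain Text
# Minimal async sanity check for the Anthropic LLM, outside the pipeline.
# Assumes llama-index-llms-anthropic is installed and ANTHROPIC_API_KEY is set;
# the model name below is only an example.
import asyncio

from llama_index.llms.anthropic import Anthropic


async def check():
    llm = Anthropic(model="claude-3-5-sonnet-20241022")
    resp = await llm.acomplete("Reply with one word.")
    print(resp.text)


if __name__ == "__main__":
    asyncio.run(check())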
just use async 😅 Your life will be easier.
I need to remove this num_workers param, multiprocessing is too flaky
Yes, I used async above, via the arun method
I used async with num_workers; now I've tried it without and I'm still getting the error
Okay, it works with Anthropic now, thanks!
Plain Text
import asyncio

from llama_index.core import Settings
from llama_index.core.extractors import (
    QuestionsAnsweredExtractor,
    SummaryExtractor,
    TitleExtractor,
)
from llama_index.core.ingestion import IngestionPipeline


def run_pipeline_sync(nodes):

    # llm = AzureOpenAI(
    #     model="gpt-4o",
    #     deployment_name='EntelligenceAI',
    #     temperature=0.0,
    #     azure_endpoint=AZURE_OPENAI_ENDPOINT,
    #     api_key=AZURE_OPENAI_API_KEY,
    #     api_version=AZURE_OPENAI_VERSION,
    # )
    llm = Settings.llm
    transformations = [
        TitleExtractor(nodes=5, llm=llm),
        QuestionsAnsweredExtractor(llm=llm, questions=3),
        SummaryExtractor(llm=llm, summaries=["self"]),
    ]
    pipeline = IngestionPipeline(transformations=transformations)
    pipeline.disable_cache = True

    # Define an async wrapper function to call the async method
    async def async_run():
        return await pipeline.arun(nodes=nodes, show_progress=True)

    # Run the async function using asyncio.run()
    transformed_nodes = asyncio.run(async_run())
    return transformed_nodes

Although when I used the Azure LLM with async + num_workers it was much faster
because Anthropic didn't work in the above snippet with async + parallelization
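For reference, a rough sketch of that faster Azure combination, async arun() plus num_workers, reusing the commented-out AzureOpenAI settings from the snippet above (the endpoint, key, API version, and deployment name are placeholders carried over from that snippet, not values to copy):
Plain Text
# Sketch only: async pipeline run with parallel workers and an Azure LLM.
# The AZURE_OPENAI_* names and the deployment name come from the commented-out
# block in the snippet above and are placeholders here.
from llama_index.core.extractors import (
    QuestionsAnsweredExtractor,
    SummaryExtractor,
    TitleExtractor,
)
from llama_index.core.ingestion import IngestionPipeline
from llama_index.llms.azure_openai import AzureOpenAI


async def run_pipeline_async(nodes):
    llm = AzureOpenAI(
        model="gpt-4o",
        deployment_name="EntelligenceAI",
        temperature=0.0,
        azure_endpoint=AZURE_OPENAI_ENDPOINT,
        api_key=AZURE_OPENAI_API_KEY,
        api_version=AZURE_OPENAI_VERSION,
    )
    pipeline = IngestionPipeline(
        transformations=[
            TitleExtractor(nodes=5, llm=llm),
            QuestionsAnsweredExtractor(llm=llm, questions=3),
            SummaryExtractor(llm=llm, summaries=["self"]),
        ],
        disable_cache=True,
    )
    return await pipeline.arun(nodes=nodes, num_workers=4, show_progress=True)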
I can't directly make the function async due to some Django constraints we have in the backend; that's why I'm using asyncio to fetch the nodes and return them.
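As an aside, asgiref's async_to_sync is another common way to call an async method from sync Django code, instead of wrapping it in asyncio.run(). A minimal sketch, where build_pipeline() is a hypothetical helper standing in for the pipeline construction shown above:
Plain Text
# Sketch: calling the async pipeline from sync Django code via asgiref,
# as an alternative to wrapping arun() in asyncio.run().
# build_pipeline() is a hypothetical helper returning the IngestionPipeline
# configured as in the snippets above.
from asgiref.sync import async_to_sync


def run_pipeline_sync(nodes):
    pipeline = build_pipeline()  # hypothetical helper, not part of llama-index
    return async_to_sync(pipeline.arun)(nodes=nodes, show_progress=True)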