Is there an issue in IngestionPipeline

At a glance

The community member is experiencing an issue with the IngestionPipeline in parallel processing mode. With the num_workers argument set, pipeline.run() hangs: it returns no error messages and never executes further. Removing the num_workers argument makes the pipeline run, but extremely slowly.

Other community members have provided suggestions and insights:

  • Using parallel processing (multithreading) with Python may not provide a significant speed-up.
  • Using async/await is recommended as it can be much faster.
  • There were initially issues using async with the Anthropic library (an 'Anthropic' object has no attribute '_aclient' error), but the community member was eventually able to get it working with Anthropic.
  • The community member also tried using async with Azure OpenAI, which was faster than the Anthropic implementation.
  • Due to Django constraints, the community member couldn't directly make the function async, so they wrapped pipeline.arun() with asyncio.run() inside a synchronous function to fetch the nodes and return them.

There is no explicitly marked answer, but the suggestions converge on a working pattern: prefer the async arun() method over num_workers, and drive it from synchronous code with asyncio.run() (sketched below).
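
Distilled from the thread, that pattern looks roughly like the sketch below. This is a minimal illustration rather than code from the thread: it assumes llama_index's IngestionPipeline with placeholder transformations and an example document, and wraps arun() the way the Django workaround above does.

Plain Text
import asyncio

from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import HierarchicalNodeParser
from llama_index.embeddings.openai import OpenAIEmbedding

def run_pipeline(documents):
    # arun() runs the transformations concurrently on one event loop
    # instead of spawning a multiprocessing pool (num_workers).
    pipeline = IngestionPipeline(
        transformations=[
            HierarchicalNodeParser.from_defaults(),
            OpenAIEmbedding(),
        ],
        disable_cache=True,
    )
    # Synchronous wrapper for callers that can't themselves be async.
    return asyncio.run(pipeline.arun(documents=documents))

nodes = run_pipeline([Document.example()])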

Is there an issue in IngestionPipeline when using parallel processing mode?
Plain Text
import logging
import os

from llama_index.core import Settings
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import HierarchicalNodeParser

logger = logging.getLogger(__name__)

transformations = [
    HierarchicalNodeParser.from_defaults(chunk_sizes=[4096, 2048]),
    Settings.embed_model,
]
logger.info("Creating pipeline")
pipeline = IngestionPipeline(transformations=transformations)
# pipeline.disable_cache = False
logger.info("Num workers: " + str(os.cpu_count()))

# createHierarchicalIndexRequest comes from the surrounding application
nodes = pipeline.run(
    documents=createHierarchicalIndexRequest.Documents,
    num_workers=4,
)


My pipeline doesn't return any error messages and doesn't execute further after the pipeline.run() call. If I remove the num_workers arg it runs, but it's extremely slow. Any advice?
13 comments
Seems to work for me, but it's only slightly faster to use num_workers (multithreading with Python isn't really real multithreading because of the GIL)
Plain Text
import time

from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import HierarchicalNodeParser
from llama_index.embeddings.openai import OpenAIEmbedding

def main():
    pipeline = IngestionPipeline(
        transformations=[
            HierarchicalNodeParser.from_defaults(),
            OpenAIEmbedding(),
        ],
        disable_cache=True,
    )

    documents = [Document.example()] * 100

    # Time a single-worker run...
    start = time.time()
    pipeline.run(documents=documents, num_workers=1)
    print(time.time() - start)

    # ...against a four-worker run
    start = time.time()
    pipeline.run(documents=documents, num_workers=4)
    print(time.time() - start)

if __name__ == "__main__":
    main()
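One plausible culprit for the original hang, worth flagging in the snippet above: the if __name__ == "__main__": guard is load-bearing. num_workers parallelizes with a multiprocessing pool, and spawn-based platforms (Windows, macOS) re-import the calling module in each worker; if pipeline.run() executes at module top level, or inside an already-forked server process, the pool can stall silently with no traceback, which matches "no error messages, no further execution". A minimal sketch of the safe shape (SentenceSplitter here just stands in for any transformation):

Plain Text
from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter

def main():
    pipeline = IngestionPipeline(transformations=[SentenceSplitter()])
    nodes = pipeline.run(documents=[Document.example()] * 10, num_workers=2)
    print(len(nodes))

# Required: spawned workers re-import this module, and without the
# guard each one would re-run the top-level code and deadlock.
if __name__ == "__main__":
    main()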
Waaay faster to just use async
Plain Text
import asyncio
import time

from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import HierarchicalNodeParser
from llama_index.embeddings.openai import OpenAIEmbedding

async def main():
    pipeline = IngestionPipeline(
        transformations=[
            HierarchicalNodeParser.from_defaults(),
            OpenAIEmbedding(),
        ],
        disable_cache=True,
    )

    documents = [Document.example()] * 100

    # arun() issues the embedding calls concurrently on the event loop
    start = time.time()
    await pipeline.arun(documents=documents)
    print(time.time() - start)

if __name__ == "__main__":
    asyncio.run(main())
Also, when I try to run async using Anthropic: 'Anthropic' object has no attribute '_aclient'
Plain Text
import asyncio

from llama_index.core.extractors import SummaryExtractor
from llama_index.core.ingestion import IngestionPipeline

def run_pipeline_sync(nodes):
    # title_extractor and qa_extractor are defined elsewhere in the app
    transformations = [
        title_extractor,
        qa_extractor,
        SummaryExtractor(summaries=["self"]),
    ]
    pipeline = IngestionPipeline(transformations=transformations)
    pipeline.disable_cache = True

    async def async_run():
        return await pipeline.arun(nodes=nodes, num_workers=4)

    transformed_nodes = asyncio.run(async_run())
    return transformed_nodes
just use async 😅 Your life will be easier.
I need to remove this num_workers param; multiprocessing is too flaky
Yes, I used async above via the arun method
I used async with num_workers; now I've tried it without, and I'm still getting the error
Okay, it works now with Anthropic, thanks!
Plain Text
import asyncio

from llama_index.core import Settings
from llama_index.core.extractors import (
    QuestionsAnsweredExtractor,
    SummaryExtractor,
    TitleExtractor,
)
from llama_index.core.ingestion import IngestionPipeline

def run_pipeline_sync(nodes):
    # llm = AzureOpenAI(
    #     model="gpt-4o",
    #     deployment_name="EntelligenceAI",
    #     temperature=0.0,
    #     azure_endpoint=AZURE_OPENAI_ENDPOINT,
    #     api_key=AZURE_OPENAI_API_KEY,
    #     api_version=AZURE_OPENAI_VERSION,
    # )
    llm = Settings.llm
    transformations = [
        TitleExtractor(nodes=5, llm=llm),
        QuestionsAnsweredExtractor(llm=llm, questions=3),
        SummaryExtractor(llm=llm, summaries=["self"]),
    ]
    pipeline = IngestionPipeline(transformations=transformations)
    pipeline.disable_cache = True

    # Async wrapper so the coroutine can be driven from sync code
    async def async_run():
        return await pipeline.arun(nodes=nodes, show_progress=True)

    # Run the async function using asyncio.run()
    transformed_nodes = asyncio.run(async_run())
    return transformed_nodes

Although when I used the Azure LLM with async + num_workers, it was much faster
because Anthropic didn't work in the above snippet with async + parallelization
I can't directly make the function async due to some Django constraints we have in the backend; that's why I'm using asyncio to fetch the nodes and return them
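One caveat with that asyncio.run() wrapper: it raises RuntimeError if the calling thread already has a running event loop (as it can under an ASGI server). A hedged workaround, assuming the call site is otherwise synchronous, is to drive the coroutine on its own loop in a worker thread; run_coro_from_sync below is a hypothetical helper, not part of llama_index or the thread:

Plain Text
import asyncio
from concurrent.futures import ThreadPoolExecutor

def run_coro_from_sync(coro):
    # asyncio.run() refuses to nest inside a running loop, so hand the
    # coroutine to a fresh loop on a dedicated thread and block on it.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()

# usage with the pipeline above:
# transformed_nodes = run_coro_from_sync(pipeline.arun(nodes=nodes))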