
Updated 11 months ago

I'm encountering difficulties while

I'm encountering difficulties while attempting to integrate Llama-Index into a Celery task for concurrent processing. Specifically, I'm facing issues with vector creation using the VectorStoreIndex class within Celery tasks. I'm seeking clarification on how to properly use Llama-Index with Celery concurrency or insights into why vector creation fails within Celery tasks.

When attempting to create VectorStoreIndex instances within Celery tasks for concurrent processing, the process gets stuck at the vector creation step. Despite using Celery's concurrency features and ensuring proper task execution, the vector creation process doesn't proceed as expected.
Plain Text
document = Document(text=str(data))
index = VectorStoreIndex.from_documents([document], service_context=self.service_context)
3 comments
I'm not a Celery expert, but you probably want to create a service context for each task, right?
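One way to read the suggestion above is to build the context inside the worker process, rather than reusing an object that was created before the worker started. The sketch below shows that pattern with the standard library only. `build_context` is a hypothetical stand-in for the `OpenAI(...)` and `ServiceContext.from_defaults(...)` calls in this thread, and `get_context` and `process_chapter_task` are illustrative names, not part of Celery or LlamaIndex. It assumes Celery's default prefork pool, where each worker is a separate process.

```python
import os
from typing import Any, Callable, Dict

# Cache keyed by process ID, so a forked worker never reuses an object
# that was built in its parent process.
_CONTEXT_BY_PID: Dict[int, Any] = {}


def get_context(factory: Callable[[], Any]) -> Any:
    """Return this process's context, building it on first use."""
    pid = os.getpid()
    if pid not in _CONTEXT_BY_PID:
        _CONTEXT_BY_PID[pid] = factory()
    return _CONTEXT_BY_PID[pid]


def build_context() -> dict:
    # Hypothetical stand-in: the real version would create the LLM and
    # the service context here, inside the worker process.
    return {"built_in_pid": os.getpid()}


def process_chapter_task(chapter_text: str) -> str:
    # Assumption: in a real worker, this body would live inside the
    # function registered as the Celery task.
    context = get_context(build_context)
    return f"pid={context['built_in_pid']} chars={len(chapter_text)}"
```

The cache means the expensive setup runs once per worker process instead of once per chapter. Whether this resolves the hang is not confirmed anywhere in this thread.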
Hi @Logan M, I tried creating a service context for each task as well, but my process still gets stuck while creating the vector index. I have attached a small code snippet showing how I plan to use llama-index in Celery. Any thoughts on why I might be facing these issues?
Plain Text
class RagHandler:
 
    def get_response(self, transcript_id, project_id):
      
        try:
            chapters = create_chapters(transcript_id=transcript_id, project_id=project_id)
            raw_result = self.run_rag(chapters=chapters, project_id=project_id)
            return raw_result
        except Exception as e:
            logger.error(f"Error in get_response: {e}")
            raise RuntimeError("Error occurred while processing transcript") from e

    def run_rag(self, chapters, project_id):
      
        try:
            result = []
            for chapter in chapters:
                clips = self.process_chapter(chapter=chapter, project_id=project_id)
                if clips is not None:
                    result.extend(clips)
            return result

        except Exception as e:
            logger.error(f"Error in run_rag: {e}")
            raise RuntimeError("Error occurred while running RAG") from e

    # project_id is accepted here because run_rag passes it as a keyword argument.
    def process_chapter(self, chapter, project_id):
        try:
            document = Document(text=str(chapter.dict()))
            llm = OpenAI(
                model=RAG_MODEL,
                temperature=RAG_MODEL_TEMPERATURE,
                api_key=OPENAI_API_KEY,
            )
            service_context = ServiceContext.from_defaults(llm=llm, embed_model="local")
            index = VectorStoreIndex.from_documents(
                [document], service_context=service_context
            )
            query_engine = index.as_query_engine()
            response = query_engine.query(PROMPT)
            return response

        except Exception as e:
            # Log the actual error; run_rag skips chapters that return None.
            logger.error(f"Could not process chapter: {e}")
            return None
Yeah, not sure tbh. Admittedly I've never used Celery lol, so it's hard to have an intuition for what might be going on.
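Since nobody in the thread has pinned down where the task hangs, a stack dump from the stuck worker could narrow it down. Below is a minimal sketch using Python's standard `faulthandler` module. `dump_stack_if_stuck` and `run_watched` are hypothetical helper names, and the timeout and log file are assumptions to adjust. It only records where the code is stuck and does not interrupt or fix the hang.

```python
import faulthandler
from contextlib import contextmanager


@contextmanager
def dump_stack_if_stuck(timeout_s, log_file):
    """Dump all thread stacks to log_file if the block exceeds timeout_s.

    log_file must be a real file object, because faulthandler writes to
    its file descriptor. The block itself is not interrupted.
    """
    faulthandler.dump_traceback_later(timeout_s, file=log_file, exit=False)
    try:
        yield
    finally:
        # Disarm the watchdog once the block finishes or raises.
        faulthandler.cancel_dump_traceback_later()


def run_watched(step, timeout_s, log_file):
    """Run step() under the watchdog and return its result."""
    with dump_stack_if_stuck(timeout_s, log_file):
        return step()
```

In the snippet above, the suspected step would be the `VectorStoreIndex.from_documents(...)` call, wrapped in a `lambda` and passed as `step`. If the timeout fires, the log shows which frame each thread was in at that moment.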