Async ingestion pipeline loads no data after completion

It's not returning any errors...

My code:
Plain Text
    def init_ingestion_pipeline(self) -> IngestionPipeline:
        '''Initializes the ingestion pipeline for the vector store'''

        pipeline = IngestionPipeline(
            transformations=[
                AzureOpenAIEmbedding(
                    model="text-embedding-ada-002"
                    , azure_deployment="text-embedding-ada-002"
                    , azure_endpoint=str(settings.azure_openai_api_base)
                    , api_version=str(settings.azure_openai_api_version)
                    , api_key=str(settings.azure_openai_api_key)
                )
            ]
            , vector_store=self.vector_store
        )

        self.ingestion_pipeline = pipeline

        return pipeline

    async def ingest(self, chunks: list[dict]):
        '''Ingests a list of chunks into the vector store asynchronously'''
        
        if not hasattr(self, 'ingestion_pipeline'):
            self.init_ingestion_pipeline()
        print('################ vector store and ingestion pipeline initialized')
        
        processed_chunks = await process_chunks(chunks)
        print('################ chunks processed')

        #await self.ingestion_pipeline.arun(documents=processed_chunks)
        nodes = self.ingestion_pipeline.run(documents=processed_chunks, show_progress=True)
        print('################ ingestion pipeline completed')


The code above is in a class that I'm calling:

Plain Text
vector_store = PFVectorStore() #Custom class, not a vector store from LlamaIndex
run(vector_store.ingest(test_data)) #Referencing the ingest function from an ingestion pipeline, ingest is a custom wrapper over the ingestion pipeline
Here is my data attempting to be inserted:

Plain Text
test_data = [
    {
        "text": '''
            <<Patrick>>
                How far from the Sun is the Earth?

            <<PF>>    
                The average distance from the Earth to the Sun is about 93 million miles (150 million kilometers). 
                This distance is often referred to as one Astronomical Unit (AU), which is a standard unit of measurement 
                used in astronomy to describe distances within our solar system.       
        '''
        , "metadata": generate_metadata("pm")
    }
    , {
        "text": '''
            <<Patrick>>
                How many people live in Africa?

            <<PF>>    
                As of 2023, the estimated population of Africa is approximately 1.44 billion people. 
                This estimate is based on an annual growth rate of 2.5% from the last known accurate figure of 1.37 billion in 2021     
        '''
        , "metadata": generate_metadata("pm")
    }
    , {
        "text": '''
            <<Patrick>>
                What is Joe Biden's middle name?

            <<PF>>    
                Joe Biden's middle name is Robinette. His full name is Joseph Robinette Biden Jr.
        '''
        , "metadata": generate_metadata("pm")
    }
]
@kapa.ai

Based on my code above, can you help me find out what's wrong?
My code is not throwing errors, but after it completes nothing has been inserted into my database
Let me check the knowledge base for you. One moment! :books:
----------------------
I'm sorry, but I can't provide the help you're looking for because the extracts you've provided don't include any code related to your issue. However, if you're working with LlamaIndex and SQLDatabase, make sure you're correctly establishing a connection to your database and that your insert statements are being executed properly.

Here's an example of how to insert data into a SQLDatabase using LlamaIndex, based on the extracts you've provided:

Plain Text
from sqlalchemy import insert

rows = [
    {"city_name": "Toronto", "population": 2930000, "country": "Canada"},
    {"city_name": "Tokyo", "population": 13960000, "country": "Japan"},
    {
        "city_name": "Chicago",
        "population": 2679000,
        "country": "United States",
    },
    {"city_name": "Seoul", "population": 9776000, "country": "South Korea"},
]
for row in rows:
    stmt = insert(city_stats_table).values(**row)
    with engine.begin() as connection:
        cursor = connection.execute(stmt)

(Source: LlamaIndex SQLDatabase Example)

If you're still having trouble, I recommend checking your database connection and ensuring your SQL statements are correct.
----------------------
Note: you can ask me a follow up question by @ mentioning me again :speech_balloon:
----------------------
I hate to bump this but kapa isn't helping and I think the timing of my question allowed it to be buried. Any idea what might be the issue here? @Logan M
Curious what process_chunks() is doing πŸ‘€

Are there nodes returned when you call run() ?
Plain Text
from llama_index import Document
from pf.cbcore.utils.llm import pf_llm
import asyncio
import datetime

async def async_ensure_chunk(text: str, chunk_limit: int=150) -> str:
    '''Summarizes large chunks into smaller chunks if the chunk is over the chunk limit.'''
    
    chunk = text
    if len(text.split()) > (chunk_limit + int(chunk_limit * 0.05)):
        prompt = f'Please summarize the following text to be at or within a chunk/word limit of {chunk_limit}:\n\n' + text
        chunk = await pf_llm.apredict(prompt)
    
    return chunk

def tag_chunk(chunk: str, tags: dict) -> Document:
    '''Tags a given chunk with given tags'''

    doc = Document(text=chunk)
    doc.metadata.update(tags)

    return doc 

def generate_metadata(user: str, env: str="react") -> dict:
    '''Returns relevant metadata'''

    local_timezone = datetime.datetime.now().astimezone().tzinfo
    timestamp = datetime.datetime.now(local_timezone)

    metadata = {
        "user": user
        , "ctimestamp": str(timestamp)
        , "environment": env
    }

    return metadata

async def process_chunks(texts: list[dict], chunk_limit: int=150) -> list[Document]:
    '''Processes a list of texts into a list of tagged and potentially summarized chunks'''
    
    async def process_chunk(raw):
        text = raw.get("text")
        md = raw.get("metadata")
        chunk = await async_ensure_chunk(text, chunk_limit)
        return tag_chunk(chunk, md)

    return await asyncio.gather(*(process_chunk(raw) for raw in texts))
Process chunks is an async way for me to summarize (if the chunk is over 150 tokens) and tag with relevant metadata before sending to the vector store. Let me see what nodes are returned..
Yea that process chunks function seems alright, it's returning document objects
Nodes are returned
They look like document objects with the embedding element filled with embeddings, whereas non-embedded documents I think have that element as None or False
The nodes look as I would expect them to
Ok, thats good πŸ€”
So, since you attached the vector store to the pipeline, they should be in your vector db now
And you know for sure?
There have been 25 rows since before I ran any ingestion pipelines
Attachment
image.png
Those last ones are ones I ran in a notebook not with the scripts I have pasted above
if you run vector_store.add([nodes[0]]) does that end up inserting a node?
I'm also positive my database, schema, and table info in my VectorStore object name match what I'm querying here in my database tool.
nodes as in the object I get back after running the ingest function, or can I just index into the processed_chunks list for a node?
It will have to be a node with an embedding attached, so after ingest
maybe just try on a single document chunk
Looking at the code for ingestion_pipeline.run(), there is this conditional JUST before returning the nodes
Attachment
image.png
So like... it's most certainly calling add(), unless one of these conditions is false
Let me try that
No dice, even when I add one in particular it doesn't load.
I was able to load some yesterday using dummy data and the guides above but never with an ingestion pipeline. Just by running insert() on a Vector Index.
I'm trying to use ingestion pipelines because they're easier to read and are async
I'm going to paste what nodes[0] looks like. That may be a factor here. I'm positive its embedded but we'll see...
Plain Text
        self.vector_store.add([nodes[0]])
        print('################ adding')
        print(nodes[0])
        print(type(nodes[0]))
        print(hasattr(nodes[0], 'embedding'))
Plain Text
Doc ID: 3e248a52-448c-4a5b-b03b-12c6217602c6
Text: <<Patrick>>                 How far from the Sun is the Earth?
<<PF>>                     The average distance from the Earth to
the Sun is about 93 million miles (150 million kilometers).
This distance is often referred to as one Astronomical Unit (AU),
which is a standard unit of measurement                  ...

<class 'llama_index.schema.Document'>
True
So it is indeed a document, and it indeed has an embedding element.
Actually, that would be true even if it wasn't embedded.. let me check what the type of the embedding value is.
Ohhh maybe one thing that might be causing issues (not 100% sure though)

You should probably be running a SentenceSplitter or TokenTextSplitter in your ingestion pipeline no?
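For context, a splitter transformation just breaks each document's text into smaller, overlapping chunks before embedding. A rough pure-Python illustration of the idea (this is NOT the real SentenceSplitter implementation, just a sketch of what such a transformation does):

```python
def split_by_words(text: str, chunk_size: int = 150, overlap: int = 20) -> list[str]:
    """Naive word-based splitter sketching what SentenceSplitter-style
    transformations do: break long text into overlapping chunks."""
    words = text.split()
    if len(words) <= chunk_size:
        return [text]
    chunks = []
    step = chunk_size - overlap  # advance by chunk_size minus the overlap
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break
    return chunks
```

Since the chunks here are already summarized to ~150 words, a splitter would mostly be a no-op, which is why skipping it shouldn't itself cause the missing-data issue.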
I'm not splitting anything, just summarizing them.
Yea, curious if the embedding is a List[float]
Just checked, it is
Looks like embeddings to me.
The fact that vector_store.add() doesn't work is pretty concerning lol
Do I need this? The examples I'm providing are not big and I would rather summarize than split because I want to keep Prompts & Responses in a single chunk
Otherwise I worry our RAG will perform worse.
So I'm opting for summarizing basically.
Maybe I need to write a Custom Transformer to adhere to the pattern above? I considered that but didn't think it was necessary (I also only thought of it after I had already written my methods outside of the framework)
Yea I totally feel you. I just wonder if them being a Document object instead of a TextNode object is problematic
It needs to be a TextNode?
I was able to load a Document in a simple example with insert earlier yesterday. But yeah never tried it with an ingestion pipeline.
I don't THINK so (document and textnode are the same nearly), just reading the source code trying to explain this haha
One more test
Did I find a bug? 😁
Logical error it seems?
Maybe try this -- this is the most simple sanity test I can think of haha

Plain Text
from llama_index.schema import TextNode

node = TextNode(text="hello world")
node.embedding = embed_model.get_text_embedding(node.text)

vector_store.add([node])
Can I extract my embed_model from my service context?
Like is that something I can just service_context.embed_model.get_text_embedding()
yea that works
Just tried it yeah
alright running it
no dice 😦
Plain Text
        node = TextNode(text="Testing with Logan")
        node.embedding = self.service_context.embed_model.get_text_embedding(node.text)
        self.vector_store.add([nodes[0]])
Oh, forgot to update the last line
Plain Text
node = TextNode(text="Testing with Logan")
node.embedding = self.service_context.embed_model.get_text_embedding(node.text)
self.vector_store.add([node])
The embeddings are there too
It runs, but nothing is in the database
So something is up with either vector store or the tool you are using to view the data πŸ€”
Restarting the tool..
I am going to try and insert a different way that worked for me
One thing to note, for very silly and old reasons we can't easily change, the pgvector store adds a prefix to the table name you give it

https://github.com/run-llama/llama_index/blob/f9370f0c4be69ea5284b126492aecf2416da9d3d/llama_index/vector_stores/postgres.py#L49
Omg thats it.. I think I figured it out.
Yeah, for some reason data_ is prepended to the embeddings table name, which is fine
So I added that prefix in my new pgvector store instantiation, when really I should have just passed the original name and let pgvector prepend data_ itself.
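In other words (a pure-Python sketch of the naming behavior being described, with a hypothetical table name):

```python
def effective_table_name(table_name: str) -> str:
    # Mirrors the behavior discussed above: the pgvector store builds its
    # actual table name by prepending "data_" to whatever you pass in.
    return f"data_{table_name}"

# Passing an already-prefixed name doubles the prefix, so the store writes
# to a different table than the one being queried.
print(effective_table_name("chat_embeddings"))       # data_chat_embeddings
print(effective_table_name("data_chat_embeddings"))  # data_data_chat_embeddings
```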
Let me try now
Guess what @Logan M
Jk it worked. And it even loaded the originals we were stuck on
ayyyy finally
:dotsHARDSTYLE:
I can't believe it was something as simple as that.
You mentioning this jogged my memory.
yeaaaa I always forget about that too haha
But now we can't change it without breaking everyone's vector stores :cryingskull:
Well, you could like
Look for f'data_{vs_name}' as well as vs_name, right?
Still might be confusing in case anyone adds data at the beginning though
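That fallback idea could be sketched roughly like this (a hypothetical helper, not actual llama_index code):

```python
def resolve_table_name(requested: str, existing_tables: set[str]) -> str:
    """Prefer the prefixed name the store would normally create, but fall
    back to the raw name so pre-existing tables keep working."""
    prefixed = f"data_{requested}"
    if prefixed in existing_tables:
        return prefixed
    if requested in existing_tables:
        return requested
    # Neither exists yet: default to the store's usual prefixed convention.
    return prefixed
```

As noted, this is still ambiguous if someone deliberately names a table with a data_ prefix, which is the confusion raised above.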
yeaaaa... probably a proper fix is adding a deprecation warning for a month or two, to give people notice. But just hasn't been a priority yet haha
Yeah its not really a big deal, I don't blame you.
It broke again, but like, because I was running the final method async (in the example above I wasn't because I was avoiding another error lol)
See my most recent thread..