Are you using the latest version of llama-index?
0.9.43, which I think was the latest one until yesterday. Let me try with 0.9.44
latest version 0.9.44 also fails
that's pretty weird, I've never seen this --- it shouldn't be passing num_workers to the embeddings though
looking at the code, not really sure how this is possible
The code runs fine for me 😅
I see you are running in a notebook, you might have to restart the runtime/reboot the notebook for any package changes to show up properly
Yeah, kind of. Ok, so the error is gone (seems like the notebook had some issue with the llama-index version). I moved from a notebook to a python script. However, when I now run the same line of code, it throws a different error (no matter which transformation steps I include, as long as num_workers > 1):
nodes = await pipeline2.arun(documents=data, num_workers=2)
File ".../Library/Caches/pypoetry/virtualenvs/aie-document-service-IZwXNWBP-py3.10/lib/python3.10/site-packages/llama_index/ingestion/pipeline.py", line 625, in arun
result: List[List[BaseNode]] = await asyncio.gather(*tasks)
File "/opt/homebrew/Cellar/python@3.10/3.10.13/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
obj = _ForkingPickler.dumps(obj)
File "/opt/homebrew/Cellar/python@3.10/3.10.13/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object
that's weird 🤔 I just ran the notebook when I sent that message earlier and did not run into this
I can try again with a python script
import os
os.environ["OPENAI_API_KEY"] = "sk-..."

from llama_index import Document
from llama_index.embeddings import OpenAIEmbedding
from llama_index.text_splitter import SentenceSplitter
from llama_index.extractors import TitleExtractor
from llama_index.ingestion import IngestionPipeline

# create the pipeline with transformations
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=1024, chunk_overlap=20),
        TitleExtractor(),
        OpenAIEmbedding(),
    ]
)

# since we'll be testing performance, using timeit and cProfile
# we're going to disable cache
pipeline.disable_cache = True

documents = [Document.example()] * 20

import asyncio

asyncio.run(pipeline.arun(documents=documents, num_workers=4))
that works for me in a script
I think I found the issue. I was testing without calling pipeline.disable_cache = True. It seems like it crashes when using the cache (I am using a redis cache)
hmm, well that makes a bit more sense -- the redis client itself is likely not picklable for multiprocessing :PSadge:
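a quick way to check is pickling the client directly -- minimal sketch, assuming the redis-py package and a local server, and depending on the redis-py version it may reproduce the exact same error:

import pickle
import redis  # assumes the redis-py package is installed

# multiprocessing sends objects to worker processes via pickle,
# so anything holding a _thread.lock (like a live client's connection pool) fails
client = redis.Redis(host="localhost", port=6379)
try:
    pickle.dumps(client)
except TypeError as err:
    print(err)  # e.g. "cannot pickle '_thread.lock' object"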
Would it work with other cache types?
maaaaybe 🤔 Untested at the moment
I think the long-term fix here is overriding __setstate__ and __getstate__ for those caches, and ensuring pickling and re-construction work. This would require serializing not the client, but the params used to create the client instead
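something along these lines -- just a sketch, PicklableRedisCache is a made-up name and the real cache class carries more state:

import redis

class PicklableRedisCache:
    """Sketch: serialize the connection params, never the live client."""

    def __init__(self, host: str = "localhost", port: int = 6379):
        self._params = {"host": host, "port": port}
        self._client = redis.Redis(**self._params)

    def __getstate__(self):
        # drop the unpicklable client; keep only what's needed to rebuild it
        state = self.__dict__.copy()
        state["_client"] = None
        return state

    def __setstate__(self, state):
        # re-create the client from the saved params in the worker process
        self.__dict__.update(state)
        self._client = redis.Redis(**self._params)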
Sounds complex 😅
alright. We'll have to decide whether the cache or parallel processing is more important for us at the moment. Thanks for the help!
btw I also noticed that the cache is built at the list-of-nodes level instead of at the node level, right? I was testing that yesterday, and processing [doc1, doc3, doc2] will not use the cached results from [doc1, doc2, doc3] 😅
this is correct -- caching at a more granular level requires many more API calls
For example, if you cached at the node level and ended up chunking your documents into 10,000 nodes, that would mean 10,000 API calls when checking the hashes for the next transform step
instead, the way it's implemented is 1 API call per transform step -- a tradeoff I suppose
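conceptually something like this -- a simplified sketch, batch_cache_key is a hypothetical helper, not the actual implementation:

import hashlib

def batch_cache_key(node_texts: list[str], transform_name: str) -> str:
    # one hash over the whole batch -> one cache lookup per transform step,
    # but order matters, so a reordered list produces a different key
    h = hashlib.sha256()
    h.update(transform_name.encode())
    for text in node_texts:
        h.update(text.encode())
    return h.hexdigest()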
You could however maybe add some function that makes sure [doc1, doc2] hashes the same as [doc2, doc1] (it would of course need to manipulate the list a little bit) -- see the sketch after this message. You could do it just once, when you transform the documents to nodes, for example. That way matches can be found independently of the order of documents/nodes, giving a higher cache hit rate.
This could be especially important when working with parallel threads, where the order of a list can vary from run to run, and it would be a shame if the cache couldn't recognize that the nodes were previously already processed.
However, I don't know exactly how the cache works, so this might not be a feasible suggestion from my side 😬
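roughly what I mean -- a hypothetical sketch, sorting the inputs before hashing so the key is order-independent (the cached results would still need to be mapped back to the original order):

import hashlib

def order_independent_key(node_texts: list[str], transform_name: str) -> str:
    # sorting first makes [doc1, doc3, doc2] hash the same as [doc1, doc2, doc3]
    h = hashlib.sha256()
    h.update(transform_name.encode())
    for text in sorted(node_texts):
        h.update(text.encode())
    return h.hexdigest()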