def load_and_index_json(directory_path):
    """Load every JSON file in a directory and build a vector index from the documents."""
    if not os.path.exists(directory_path):
        logger.error(f"Folder {directory_path} does not exist")
        sys.exit(1)

    reader = JSONReader(
        levels_back=0,
        collapse_length=None,
        ensure_ascii=False,
        is_jsonl=False,
        clean_json=True,
    )

    json_files = glob.glob(os.path.join(directory_path, "*.json"))
    if not json_files:
        # An empty file list would make ThreadPoolExecutor(max_workers=0) raise ValueError.
        logger.error(f"No JSON files found in {directory_path}")
        sys.exit(1)

    def process_file(json_file):
        return reader.load_data(input_file=json_file, extra_info={})

    # Parse files concurrently; loading is I/O-bound, so threads are sufficient.
    documents = []
    max_workers = min(len(json_files), multiprocessing.cpu_count())
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_file, json_file) for json_file in json_files]
        for future in as_completed(futures):
            try:
                result = future.result()
                if isinstance(result, list):
                    documents.extend(result)
                else:
                    logger.warning(f"Unexpected result type: {type(result)}")
            except Exception as e:
                logger.error(f"Future processing failed: {e}")

    # Embed the documents and persist them through the configured vector store.
    vector_store = create_vector_store()
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    index = VectorStoreIndex.from_documents(
        documents, storage_context=storage_context, show_progress=True
    )
    return index

def query_index(index, query):
    """Run a query against the index and log how long it took."""
    start_time = time.time()
    query_engine = index.as_query_engine()
    response = query_engine.query(query)
    elapsed_time = time.time() - start_time
    query_time_logger.info(f"Query executed in {elapsed_time:.2f} seconds.")
    return response
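

# A minimal usage sketch, assuming this module is run directly; the "./data"
# directory and the query string below are placeholders for illustration and
# are not part of the original module.
if __name__ == "__main__":
    index = load_and_index_json("./data")
    answer = query_index(index, "What does the dataset contain?")
    print(answer)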