import logging
import os
from typing import Any, AsyncGenerator, List, Optional, Sequence

import tiktoken

import llama_index.core.instrumentation as instrument
from llama_index.core.base.response.schema import RESPONSE_TYPE
from llama_index.core.prompts.mixin import PromptDictType
from llama_index.core.response_synthesizers.base import BaseSynthesizer
from llama_index.core.schema import (
    MetadataMode,
    NodeWithScore,
    QueryType,
)
from llama_index.core.types import RESPONSE_TEXT_TYPE

dispatcher = instrument.get_dispatcher(__name__)
logger = logging.getLogger(__name__)

QueryTextType = QueryType

# Default token budget for concatenated source chunks, overridable via the
# SOURCE_CONTEXT_WINDOW environment variable.
DEFAULT_SOURCE_CONTEXT_WINDOW = int(os.getenv("SOURCE_CONTEXT_WINDOW", "4096"))


async def empty_response_agenerator() -> AsyncGenerator[str, None]:
    yield "Empty Response"


class NoLLM(BaseSynthesizer):
    """Response synthesizer that returns the retrieved chunks verbatim, without calling an LLM."""

    def __init__(
        self,
        separator: str = "\n\n---\n\n",
        source_context_window: int = DEFAULT_SOURCE_CONTEXT_WINDOW,
    ) -> None:
        super().__init__()
        self.separator = separator
        self.source_context_window = source_context_window
        # Tokenizer used only to measure how many chunks fit into the window.
        self.encoder = tiktoken.encoding_for_model("gpt-3.5-turbo")

    def _get_prompts(self) -> PromptDictType:
        """Get prompts."""
        return {}

    def _update_prompts(self, prompts_dict: PromptDictType) -> None:
        """Update prompts."""

    def get_response(
        self,
        query_str: str,
        text_chunks: Sequence[str],
        **response_kwargs: Any,
    ) -> RESPONSE_TEXT_TYPE:
        # No synthesis: simply join the retrieved chunks with the separator.
        return self.separator.join(text_chunks)

    async def aget_response(
        self,
        query_str: str,
        text_chunks: Sequence[str],
        **response_kwargs: Any,
    ) -> RESPONSE_TEXT_TYPE:
        return self.separator.join(text_chunks)
    def _filter_response_and_nodes(
        self, source_nodes: List[NodeWithScore]
    ) -> List[NodeWithScore]:
        """Greedily keep the highest-scoring nodes that fit into the token budget."""
        if not source_nodes:
            return []

        # Sort by relevance score, treating a missing score as 0.
        sorted_nodes = sorted(source_nodes, key=lambda n: n.score or 0.0, reverse=True)
        separator_token_count = len(self.encoder.encode(self.separator))

        # The highest-scoring node is always kept, even if it alone exceeds the window.
        highest_score_node = sorted_nodes[0]
        filtered_nodes = [highest_score_node]
        token_count = len(
            self.encoder.encode(
                highest_score_node.node.get_content(metadata_mode=MetadataMode.LLM)
            )
        )

        for node in sorted_nodes[1:]:
            node_tokens = self.encoder.encode(
                node.node.get_content(metadata_mode=MetadataMode.LLM)
            )
            if (
                token_count + len(node_tokens) + separator_token_count
                <= self.source_context_window
            ):
                filtered_nodes.append(node)
                token_count += len(node_tokens) + separator_token_count
        return filtered_nodes

    # TODO: For now we just want to support the asynchronous call
    @dispatcher.span
    async def asynthesize(
        self,
        query: QueryTextType,
        nodes: List[NodeWithScore],
        additional_source_nodes: Optional[Sequence[NodeWithScore]] = None,
        **response_kwargs: Any,
    ) -> RESPONSE_TYPE:
        # Trim the node list to the token budget before delegating to the base class.
        filtered_nodes = self._filter_response_and_nodes(nodes)
        return await super().asynthesize(
            query=query,
            nodes=filtered_nodes,
            additional_source_nodes=additional_source_nodes,
            **response_kwargs,
        )

    @dispatcher.span
    def synthesize(
        self,
        query: QueryTextType,
        nodes: List[NodeWithScore],
        additional_source_nodes: Optional[Sequence[NodeWithScore]] = None,
        **response_kwargs: Any,
    ) -> RESPONSE_TYPE:
        filtered_nodes = self._filter_response_and_nodes(nodes)
        return super().synthesize(
            query=query,
            nodes=filtered_nodes,
            additional_source_nodes=additional_source_nodes,
            **response_kwargs,
        )
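
# --- Minimal usage sketch (added for illustration; not part of the original
# module). It builds NodeWithScore objects by hand and synthesizes a response
# without any LLM call, showing both the separator join and the token-budget
# filtering. Note that NoLLM() still runs BaseSynthesizer.__init__, which
# resolves an LLM from llama_index Settings even though it is never invoked;
# the sketch assumes that resolution succeeds (or that Settings.llm is set).

def _demo_direct_synthesis() -> None:
    from llama_index.core.schema import TextNode

    synth = NoLLM(source_context_window=64)
    nodes = [
        NodeWithScore(node=TextNode(text="High-relevance chunk about topic A."), score=0.9),
        NodeWithScore(node=TextNode(text="Lower-relevance chunk about topic B."), score=0.4),
    ]
    response = synth.synthesize(query="topic A", nodes=nodes)
    print(response.response)           # the chunks joined by "\n\n---\n\n"
    print(len(response.source_nodes))  # only the nodes that fit the token budget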
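
# --- Hypothetical end-to-end wiring (the "data" directory and query text are
# assumptions, not from the original): NoLLM can be passed as the
# response_synthesizer of a standard query engine, so that query() returns the
# concatenated source chunks instead of a generated answer. Assumes documents
# on disk and a configured embedding model / API key.

def _demo_query_engine() -> None:
    from llama_index.core import SimpleDirectoryReader, VectorStoreIndex

    documents = SimpleDirectoryReader("data").load_data()
    index = VectorStoreIndex.from_documents(documents)
    query_engine = index.as_query_engine(response_synthesizer=NoLLM())
    response = query_engine.query("What do the documents say about topic A?")
    print(response.response)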
# Build a single context string from the source nodes of a previous query
# (e.g. the `response` returned by a query engine using NoLLM).
context = " ".join(node.node.get_content() for node in response.source_nodes)
print(context)
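
# Optional follow-up (illustrative, not from the original): inspect each source
# node's relevance score next to a preview of the text that made it into the
# context. node.score and node.node are standard NodeWithScore fields.
for node in response.source_nodes:
    score = node.score if node.score is not None else float("nan")
    print(f"{score:.3f}  {node.node.get_content()[:80]!r}")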