
Custom LLM Streaming

I can't find a doc on how to implement streaming for a custom LLM; the CompletionResponseGen part is not covered in the docs
Are you using a HuggingFaceLLM or LangchainLLM? Those are already supported by llamaindex; if not, you will need to do some hacks
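e.g. with one of the built-in integrations, streaming already works out of the box. A minimal sketch (the model name is just an example, and import paths may differ by llama-index version):
Plain Text
# Sketch only: a built-in integration where stream_complete is already wired up
from llama_index.llms import HuggingFaceLLM

llm = HuggingFaceLLM(model_name="gpt2")
for response in llm.stream_complete("1 + 1"):
    print(response.delta or "", end="")
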
@Emanuel Ferreira I just use API requests with stream=True
Any idea how to use the CompletionResponseGen object?
maybe @Logan M can help better with that
@Logan M I tried to use yield CompletionResponseGen(text=generated_text) but I get the error Generator() takes no arguments
@Bar Haim yea, if you look at those examples above, you need to return a function that will yield ChatResponse(..) objects
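(CompletionResponseGen is just a typing alias, Generator[CompletionResponse, None, None], not a class, which is why instantiating it fails. A minimal sketch of the completion-side pattern, with import paths that may vary by llama-index version:)
Plain Text
# Minimal sketch: return an inner generator that yields CompletionResponse
# objects (for chat, the analogue yields ChatResponse objects)
from llama_index.llms.base import CompletionResponse, CompletionResponseGen

def stream_complete(prompt: str, **kwargs) -> CompletionResponseGen:
    def gen() -> CompletionResponseGen:
        content = ""
        for token in ["fake ", "streamed ", "tokens"]:  # stand-in for a real stream
            content += token
            yield CompletionResponse(text=content, delta=token)
    return gen()
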
Thanks, it works
However, when I use it as the streaming model for an index query and iterate with for token in response.response_gen, the tokens are empty strings
but when using it directly with stream_complete:
Plain Text
llm = BamLLM()
resp = llm.stream_complete('1 + 1')
for delta in resp:
    print(delta, end='')

it works
I want to use it with index.query
this code:
Plain Text
streaming_response = self.query_engine.query(prompt)
for token in streaming_response.response_gen:
    print(token)

gives me empty strings
it prints, but only empty strings
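the engine is created with streaming enabled, something like this (simplified sketch of the setup, names assumed):
Plain Text
# Simplified sketch: streaming=True is what makes response_gen available
# (documents here would be your loaded Document list)
from llama_index import ServiceContext, VectorStoreIndex

llm = BamLLM()
service_context = ServiceContext.from_defaults(llm=llm)
index = VectorStoreIndex.from_documents(documents, service_context=service_context)
query_engine = index.as_query_engine(streaming=True)
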
hmm, are you able to share the code for the stream_complete function? If not, I can try and make an example
Plain Text
    @llm_completion_callback()
    def stream_complete(self, prompt: str, **kwargs: Any) -> CompletionResponseGen:
        data = {
            "model_id": os.getenv('MODEL_NAME'),
            "inputs": [prompt],
            "parameters": {
                "temperature": float(os.getenv('TEMPERATURE')),
                "max_new_tokens": int(os.getenv('MAX_OUTPUT_TOKENS')),
                "stream": True
            }
        }
        headers = {
            "Authorization": f"Bearer {os.getenv('GENAI_KEY')}",
        }
        response = requests.post(os.getenv('GENAI_API'), json=data, headers=headers, stream=True)
        if response.status_code == 200:
            for chunk in response.iter_content(chunk_size=4096):
                try:
                    if chunk:
                        output_str = chunk.decode('utf-8')
                        # Strip the SSE "data: " prefix before parsing JSON
                        if output_str.startswith('data: '):
                            output_str = output_str[len('data: '):]
                        parsed = json.loads(output_str)
                        generated_text = parsed['results'][0]['generated_text']
                        # Yields directly from the method; no delta is set
                        yield CompletionResponse(text=generated_text)
                except Exception as ex:
                    print(str(ex))
@Logan M this is my stream_complete
If I use it with:
Plain Text
llm = BamLLM()
resp = llm.stream_complete('1 + 1')
for delta in resp:
    print(delta, end='')

it works, but not when used with the index query engine
cool! Let me double check the code
when using
Plain Text
streaming_response = self.query_engine.query(prompt)
for token in streaming_response.response_gen:
    print(token)

it's all empty
(attachment: image.png)
If I add a print inside the method itself, before the yields, it prints fine
the output shows only those prints
(attachment: image.png)
Plain Text
    @llm_completion_callback()
    def stream_complete(self, prompt: str, **kwargs: Any) -> CompletionResponseGen:
        data = {
            "model_id": os.getenv('MODEL_NAME'),
            "inputs": [prompt],
            "parameters": {
                "temperature": float(os.getenv('TEMPERATURE')),
                "max_new_tokens": int(os.getenv('MAX_OUTPUT_TOKENS')),
                "stream": True
            }
        }
        headers = {
            "Authorization": f"Bearer {os.getenv('GENAI_KEY')}",
        }
        response = requests.post(os.getenv('GENAI_API'), json=data, headers=headers, stream=True)

        def gen() -> CompletionResponseGen:
            # Accumulate the running text; each response carries the full
            # text so far plus the new delta for this chunk
            content = ""
            if response.status_code == 200:
                for chunk in response.iter_content(chunk_size=4096):
                    try:
                        if chunk:
                            output_str = chunk.decode('utf-8')
                            # Strip the SSE "data: " prefix before parsing JSON
                            if output_str.startswith('data: '):
                                output_str = output_str[len('data: '):]
                            parsed = json.loads(output_str)
                            generated_text = parsed['results'][0]['generated_text']
                            content += generated_text
                            yield CompletionResponse(text=content, delta=generated_text)
                    except Exception as ex:
                        print(str(ex))
            else:
                yield CompletionResponse(text="Network Error")

        # Return the generator itself, matching the built-in LLM implementations
        return gen()
I think you were missing the delta
also, it should probably return a gen() function, just to be consistent with the other LLMs
It works; I'm trying to understand what changed
other than the gen()
got it, thanks πŸ‘
it was missing in the docs
yea, content += as well as setting delta is the main change
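under the hood, the streaming response turns the completion stream into tokens by reading each chunk's delta, roughly like this (simplified from the llama-index source):
Plain Text
# Roughly the downstream token loop (simplified): with delta left unset,
# "response.delta or ''" yields an empty string for every chunk
def stream_tokens(completion_gen):
    for response in completion_gen:
        yield response.delta or ""
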
Hopefully it works lol just working from the existing source code in llama-index πŸ˜†