from typing import Any, Dict

from llama_index.query_pipeline import CustomQueryComponent


class MyComponent(CustomQueryComponent):
    """Pass-through component that bundles two upstream results.

    It accepts a ``sql`` input and a ``response`` input and returns both
    unchanged under the same keys, so a single pipeline node exposes the
    two values together.
    """

    @property
    def _input_keys(self) -> set:
        """Return the set of input key names this component accepts."""
        return {"sql", "response"}

    @property
    def _output_keys(self) -> set:
        """Return the set of output key names this component produces."""
        # A component can expose several outputs; here both inputs are
        # forwarded.
        return {"sql", "response"}

    def _run_component(self, **kwargs) -> Dict[str, Any]:
        """Run the component.

        Args:
            **kwargs: Keyword inputs; ``sql`` and ``response`` are read.
                A key that is absent yields ``None`` for that output.

        Returns:
            A dict with the keys ``"sql"`` and ``"response"`` mapped to the
            corresponding input values.
        """
        return {
            "sql": kwargs.get("sql"),
            "response": kwargs.get("response"),
        }


modules = {
    # ... the pipeline's other modules were elided in the original snippet ...
    "final_output": MyComponent(),
}

# NOTE(review): the original snippet elides the code between the module map
# and the links below. `qp` is presumably the query pipeline built from
# `modules` -- confirm and define it before these calls.

# Route each upstream module's output into the matching input of the
# "final_output" component.
qp.add_link("text2sql_llm", "final_output", dest_key="sql")
qp.add_link("response_synthesis_llm", "final_output", dest_key="response")