There's a larger question I have around how to support input validation to ensure that tools don't fire with the wrong info, but maybe the answer to this question will lead me in the right direction.
You could replace the human-in-the-loop step with something more programmatic if you wanted
Thank you @Logan M , will take a look
Ok, I've gone through the tool agent concierge flow. What I'm not understanding is why "Multiply 6 * 2 and then add 5" doesn't work. Shouldn't the transfer let it do the next step?
It seems like it can't jump from workflow to workflow in the same thread. If I put the tools in the same Agent config, it works.
New issue. I have a workflow that requires implicit inputs. Say you want to cancel one of your insurance policies, and your tool returns a list. It seems like the workflow is autoselecting the first value in a list rather than asking which one it should cancel.
Here's example code for my Agent Config and tools.
from pydantic import Field

from llama_index.core.tools import FunctionTool
from models.agentconfig import AgentConfig


def get_account_ids(email: str = Field(description="An email address, formatted as name@example.com")) -> list[str]:
    """Used to get a list of account IDs for a user for a given email address."""
    return ["1234567890"]


def get_policies_for_account_id(account_id: str) -> list[str]:
    """Used to get a list of policies for a given account ID."""
    return ["Policy_1", "Policy_2"]


def send_cancel_policy_form(account_id: str, policy: str) -> str:
    """Used to send a cancel policy form to a customer."""
    return "Cancel Policy Form sent for Policy: " + policy


get_account_ids_tool = FunctionTool.from_defaults(fn=get_account_ids)
get_policies_for_account_id_tool = FunctionTool.from_defaults(fn=get_policies_for_account_id)
send_cancel_policy_form_tool = FunctionTool.from_defaults(fn=send_cancel_policy_form)

tools = [get_account_ids_tool, get_policies_for_account_id_tool, send_cancel_policy_form_tool]

SendCancelPolicyFormAgentConfig = AgentConfig(
    name="Send Cancel Policy Form Agent",
    description="Used to send a cancel policy form to a customer.",
    system_prompt="You are an agent that can send a cancel policy form to a customer. Do not help the user with anything else.",
    tools=tools,
)
I set the send_cancel_policy_form tool to require human approval and now it's asking for which policy. However, if I send back one of the policy options, the script times out.
@Logan M , I see that handle_tool_approval appears to assume a boolean input. How would you handle a more general "Requires human input"?
Seems like in your case, you'd want to add a step for human processing on the tool output?
I see that handle_tool_approval appears to assume a boolean input. How would you handle a more general "Requires human input"?
-- the same way, except instead of approving the tool call, you could add text to the tool's output, or replace the tool call's result with whatever the person said. It's kind of up to you
That's where I started going, I think
@step
async def handle_input(self, ctx: Context, ev: ToolInputEvent) -> ToolCallEvent | ToolCallResultEvent:
    """Handles the input required for a tool call."""
    if ev.input:
        active_speaker = await ctx.get("active_speaker")
        agent_config = (await ctx.get("agent_configs"))[active_speaker]
        return ToolCallEvent(
            tools=agent_config.tools,
            tool_call=ToolSelection(
                tool_id=ev.tool_id,
                tool_name=ev.tool_name,
                tool_kwargs=ev.tool_kwargs,
            ),
        )
    else:
        return ToolCallResultEvent(
            chat_message=ChatMessage(
                role="tool",
                content=ev.response or self.default_tool_reject_str,
            )
        )
How do you "add text to the tools output" ?
Ok. Interesting. I had added the input requirement for the get_policies function, when apparently it needed to be on the send_forms function. Seems to work now!
Ok, one more hiccup. With this code, if the user provides the information without being prompted, it crashes.
so if I say 'their email is x@y.com and policy number is Policy_1', it's timing out.
Hmm, I'm not 100% sure what you mean by "provides the information without being prompted"
The tool requires a policy id, so I added the "requires_human_input" flag. When I one-shot the request and say "here is all the data you need to do x thing" it times out.
How are you calling the workflow? I think you'll still need to call handler.stream_events() and call handler.ctx.send_event(...) when needed, right? Otherwise it will get stuck and time out
I have my code as part of a FastAPI. This is the function my endpoint calls to create the Workflow and execute it (I run response = await on_message(event) and return the result):
async def on_message(chatrequest: ChatRequest) -> str:
    workflow = ConciergeWorkflow()
    handler = workflow.run(
        agent_configs=[MathAgentConfig, SendCancelPolicyFormAgentConfig, TransferToHumanAgentConfig],
        user_msg=chatrequest.message.content,
        chat_history=chatrequest.chat_history,
        initial_state={"favorite number": "5"},
        llm=llm,
    )
    async for event in handler.stream_events():
        if isinstance(event, ProgressEvent):
            print(event.msg)
    print("-----------")
    final_result = await handler
    print(final_result)
    return final_result["response"]
Right, if an agent calls a tool that needs a human response/approval, you aren't checking for it. You are only checking for progress events
In the original example, you need to check for isinstance(event, ToolRequestEvent) and then run handler.ctx.send_event(ToolApprovedEvent(...)) so that execution can continue
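i.e. something like this in the on_message() loop (a sketch; the event fields are assumed to mirror the handle_input step above):
async for event in handler.stream_events():
    if isinstance(event, ProgressEvent):
        print(event.msg)
    elif isinstance(event, ToolRequestEvent):
        # the approval decision could come from a human, an API call, validation, etc.
        handler.ctx.send_event(
            ToolApprovedEvent(
                tool_id=event.tool_id,
                tool_name=event.tool_name,
                tool_kwargs=event.tool_kwargs,
                approved=True,
            )
        )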
I thought this part was supposed to happen in the Concierge Workflow
Isn't that for loop just printing output? It's not actually executing anything
Here is my Workflow Class
Oh, interesting. Yea, I think I missed that part.
Yea without that send_event, the workflow will either time out (if you set a timeout) or hang forever because it's waiting for that response event
This is the "human in the loop" portion
Do you have a suggestion for how to transition approved = input("Do you approve? (y/n): ") to an API response?
I get an EOF error on the input("USER>>") code. I assume this opens a request box in a notebook?
In a notebook or CLI app it works
To make it come from an API response, you'd need to, I guess, make an API lol
If this was on a fastapi server, I might create a websocket. Or, if you want pure REST, you'll have to serialize the run and resume it once you have a response
from llama_index.core.workflow.context_serializers import JsonSerializer, JsonPickleSerializer

# serialize the paused run's context
ctx_dict = handler.ctx.to_dict(serializer=JsonSerializer())
....
# later: rebuild the context for the same workflow instance (w) and resume the run
resumed_ctx = Context.from_dict(w, ctx_dict, serializer=JsonSerializer())
handler = w.run(ctx=resumed_ctx)
handler.ctx.send_event(...)
...
Where does the Context value come from in that example?
from llama_index.core.workflow import Context
When I await the handler, I don't see a ctx value in the final_result
I see it inside the workflow object though...
When you await the handler, it just returns the result. The context is only accessed through handler.ctx
(I might be misunderstanding what you meant)
nvm. I'm seeing that the agent config in the context is not serializable.
JsonPickleSerializer might be helpful there (it's opt-in, because, yanno. pickle lol)
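e.g., same calls as before, just swapping in the pickle-based serializer (assuming everything stored in the context can at least be pickled):
ctx_dict = handler.ctx.to_dict(serializer=JsonPickleSerializer())
resumed_ctx = Context.from_dict(w, ctx_dict, serializer=JsonPickleSerializer())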
Can't pickle local object
ah dang, that's a death sentence lol
(imo the websocket approach is probably way easier. Websockets in fastapi are so easy these days)
In that example, the websocket would close either when the workflow run ends, or when an exception happens
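A rough sketch of what that could look like (names and kwargs borrowed from the on_message() code above, not a verified implementation):
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/chat")
async def chat_ws(websocket: WebSocket):
    await websocket.accept()
    user_msg = await websocket.receive_text()

    handler = ConciergeWorkflow(timeout=None).run(
        agent_configs=[MathAgentConfig, SendCancelPolicyFormAgentConfig, TransferToHumanAgentConfig],
        user_msg=user_msg,
        chat_history=[],
        initial_state={"favorite number": "5"},
        llm=llm,
    )

    try:
        async for event in handler.stream_events():
            if isinstance(event, ProgressEvent):
                await websocket.send_text(event.msg)
            elif isinstance(event, ToolRequestEvent):
                # ask the human over the socket instead of input()
                await websocket.send_text(f"Approve call to {event.tool_name}? (y/n)")
                answer = await websocket.receive_text()
                handler.ctx.send_event(
                    ToolApprovedEvent(
                        tool_id=event.tool_id,
                        tool_name=event.tool_name,
                        tool_kwargs=event.tool_kwargs,
                        approved=answer.strip().lower().startswith("y"),
                    )
                )
        final_result = await handler
        await websocket.send_text(final_result["response"])
    finally:
        # closes when the run ends or an exception bubbles up
        await websocket.close()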
The way I set up the multi-agent concierge is that all the state (i.e. chat history) is passed into workflow.run(), so you can manage that any way you want (it's a pydantic object, so easy to serialize)
So following that example, I would return the workflow.run() result, and either have the caller manage the chat history, or some other db
would just passing a ChatMemoryBuffer to the workflow run suffice?
That does not seem to work. hmmm
I must be missing something fundamental about how to make this system work.
nope. Maybe refer to the original main.py I linked earlier. It's passing in the chat history as a list of ChatMessage objects
and the caller of the workflow has to manage the memory
Is it supposed to append the messages that come in to handler.chat_history?
Looking at main.py. memory.put()?
So you execute workflow.run at every new 'turn' ?
technically each workflow.run() in this case is like a single pass/turn through the entire system
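For example, one way to manage memory on the caller's side (a sketch; assumes a long-lived ChatMemoryBuffer per conversation and the same kwargs as the earlier on_message()):
from llama_index.core.llms import ChatMessage
from llama_index.core.memory import ChatMemoryBuffer

# caller-owned memory; the workflow itself stays stateless between turns
memory = ChatMemoryBuffer.from_defaults(llm=llm)

async def on_message(chatrequest: ChatRequest) -> str:
    user_msg = chatrequest.message.content
    handler = ConciergeWorkflow().run(
        agent_configs=[MathAgentConfig, SendCancelPolicyFormAgentConfig, TransferToHumanAgentConfig],
        user_msg=user_msg,
        chat_history=memory.get(),  # prior turns as a list of ChatMessage
        initial_state={"favorite number": "5"},
        llm=llm,
    )
    # ... stream events / handle ToolRequestEvent as above ...
    final_result = await handler

    # record this turn so the next workflow.run() sees it
    memory.put(ChatMessage(role="user", content=user_msg))
    memory.put(ChatMessage(role="assistant", content=final_result["response"]))
    return final_result["response"]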
Thanks for the help. I got the system running!