🧠AI Agent 与工具调用
Agent面试实战题
难度:⭐⭐ | 高频指数:🔥🔥🔥 | 应用岗相关度:★★★
面试回答
常见问法
- “设计一个能自动完成任务的 Agent 系统”
- “ReAct 模式是什么?你怎么用的?”
- “Agent 死循环怎么防?错误怎么处理?”
- “多 Agent 协作怎么设计?“
回答
Agent 的核心是让 LLM 不只是回答问题,而是能规划任务、调用工具、观察结果、决定下一步。面试时要讲清楚规划策略、执行机制和容错设计。
# ReAct 模式实现
class ReActAgent:
"""
ReAct = Reasoning + Acting
循环:思考(Thought) → 行动(Action) → 观察(Observation) → 思考...
直到任务完成或达到最大步数
"""
def __init__(self, llm, tools, max_steps=10):
self.llm = llm
self.tools = {t.name: t for t in tools}
self.max_steps = max_steps
self.history = []
def run(self, task):
"""执行任务"""
for step in range(self.max_steps):
# 1. Reasoning:让模型思考下一步
thought = self.think(task, self.history)
# 2. 判断是否完成
if thought.get('is_final_answer'):
return {
'status': 'success',
'answer': thought['answer'],
'steps': len(self.history)
}
# 3. Acting:执行工具调用
action = thought.get('action')
if action:
observation = self.act(action)
else:
observation = "没有指定工具调用"
# 4. 记录历史
self.history.append({
'step': step + 1,
'thought': thought.get('reasoning', ''),
'action': action,
'observation': observation
})
# 超过最大步数
return {
'status': 'max_steps_reached',
'partial_result': self.history[-1] if self.history else None,
'steps': self.max_steps
}
def think(self, task, history):
"""让模型思考下一步"""
prompt = f"""你是一个任务执行Agent。根据任务和历史记录,决定下一步。
任务:{task}
可用工具:
{self.format_tools()}
历史记录:
{self.format_history(history)}
请按以下格式回答:
思考:[你的推理过程]
行动:[工具名称]({{"参数名": "参数值"}})
或者
思考:[你的推理过程]
最终答案:[任务的最终回答]
"""
response = self.llm.generate(prompt)
return self.parse_thought(response)
def act(self, action):
"""执行工具调用"""
tool_name = action.get('tool')
params = action.get('params', {})
if tool_name not in self.tools:
return f"错误:工具 {tool_name} 不存在"
try:
result = self.tools[tool_name].execute(**params)
return str(result)
except Exception as e:
return f"工具执行失败:{str(e)}"
def format_tools(self):
"""格式化工具描述"""
descriptions = []
for name, tool in self.tools.items():
descriptions.append(
f"- {name}: {tool.description}\n"
f" 参数: {tool.parameters}"
)
return "\n".join(descriptions)
def format_history(self, history):
"""格式化历史记录"""
if not history:
return "(无历史记录)"
lines = []
for h in history:
lines.append(f"步骤{h['step']}:")
lines.append(f" 思考: {h['thought']}")
lines.append(f" 行动: {h['action']}")
lines.append(f" 观察: {h['observation']}")
return "\n".join(lines)
def parse_thought(self, response):
"""解析模型输出"""
import re
# 检查是否是最终答案
final_match = re.search(r'最终答案[::]\s*(.+)', response, re.DOTALL)
if final_match:
return {
'is_final_answer': True,
'answer': final_match.group(1).strip()
}
# 解析行动
action_match = re.search(
r'行动[::]\s*(\w+)\((.+?)\)', response, re.DOTALL
)
reasoning_match = re.search(r'思考[::]\s*(.+?)(?=行动|最终)', response, re.DOTALL)
result = {
'is_final_answer': False,
'reasoning': reasoning_match.group(1).strip() if reasoning_match else ''
}
if action_match:
import json
tool_name = action_match.group(1)
try:
params = json.loads(action_match.group(2))
except json.JSONDecodeError:
params = {}
result['action'] = {'tool': tool_name, 'params': params}
return result
追问
规划策略怎么选?
# 两种主要规划策略对比
class PlanningStrategies:
"""
策略1: Plan-then-Execute(先规划再执行)
- 适合:任务明确、步骤可预测
- 优点:全局最优、可预览计划
- 缺点:计划可能因中间结果变化而失效
策略2: Adaptive(边做边调整)
- 适合:任务不确定、需要根据中间结果调整
- 优点:灵活、能处理意外情况
- 缺点:可能走弯路、难以预估总步数
"""
def plan_then_execute(self, task):
"""先规划再执行"""
# 1. 生成完整计划
plan = self.llm.generate(f"为以下任务生成执行计划:{task}")
steps = self.parse_plan(plan)
# 2. 逐步执行
results = []
for step in steps:
result = self.execute_step(step)
results.append(result)
# 如果某步失败,重新规划
if result['status'] == 'failed':
remaining_steps = self.replan(task, results, steps)
steps = steps[:len(results)] + remaining_steps
return results
def adaptive_execution(self, task):
"""边做边调整"""
context = {'task': task, 'results': [], 'step': 0}
while not self.is_task_complete(context):
# 每一步都重新决策
next_action = self.decide_next_step(context)
result = self.execute_step(next_action)
context['results'].append(result)
context['step'] += 1
# 安全检查
if context['step'] > 15:
break
return context['results']
原理展开
错误处理与死循环防护
class RobustAgent:
"""带完整错误处理的Agent"""
def __init__(self, llm, tools, config=None):
self.llm = llm
self.tools = tools
self.config = config or {}
# 防护参数
self.max_steps = self.config.get('max_steps', 10)
self.max_retries_per_tool = self.config.get('max_retries', 2)
self.max_same_action = self.config.get('max_same_action', 3)
self.timeout_seconds = self.config.get('timeout', 60)
def run_with_safeguards(self, task):
"""带安全防护的执行"""
action_counter = {} # 记录每个action的调用次数
start_time = time.time()
for step in range(self.max_steps):
# 超时检查
if time.time() - start_time > self.timeout_seconds:
return self.graceful_timeout(task, step)
# 获取下一步动作
action = self.get_next_action(task)
# 死循环检测
action_key = str(action)
action_counter[action_key] = action_counter.get(action_key, 0) + 1
if action_counter[action_key] > self.max_same_action:
# 检测到重复动作,强制换策略
action = self.force_alternative_action(task, action)
if action is None:
return self.graceful_exit(
"检测到死循环,无法找到替代方案", step
)
# 执行动作(带重试)
result = self.execute_with_retry(action)
if result.get('is_complete'):
return result
return self.graceful_exit("达到最大步数限制", self.max_steps)
def execute_with_retry(self, action):
"""带重试的工具执行"""
for attempt in range(self.max_retries_per_tool):
try:
result = self.execute_action(action)
if result['status'] == 'success':
return result
# 可重试的错误
if result.get('retryable', False):
time.sleep(2 ** attempt) # 指数退避
continue
else:
# 不可重试的错误,换策略
return self.handle_non_retryable_error(action, result)
except Exception as e:
if attempt == self.max_retries_per_tool - 1:
return {
'status': 'error',
'error': str(e),
'action': action
}
time.sleep(2 ** attempt)
return {'status': 'max_retries_exceeded', 'action': action}
def force_alternative_action(self, task, stuck_action):
"""强制选择替代方案"""
prompt = f"""
之前的动作 {stuck_action} 已经重复执行多次但没有进展。
请选择一个完全不同的方法来完成任务:{task}
要求:
1. 不能再使用相同的工具和参数
2. 尝试从不同角度解决问题
3. 如果确实无法完成,请直接给出最终答案
"""
response = self.llm.generate(prompt)
return self.parse_action(response)
def graceful_exit(self, reason, steps_taken):
"""优雅退出"""
return {
'status': 'terminated',
'reason': reason,
'steps_taken': steps_taken,
'partial_results': self.get_partial_results()
}
def graceful_timeout(self, task, steps_taken):
"""超时处理"""
# 尝试基于已有结果给出部分回答
partial = self.summarize_progress(task)
return {
'status': 'timeout',
'partial_answer': partial,
'steps_taken': steps_taken
}
记忆管理
class AgentMemory:
"""
Agent记忆管理
短期记忆:当前对话历史(上下文窗口内)
长期记忆:向量存储的历史经验
工作记忆:当前任务的中间状态
"""
def __init__(self, vector_store, max_short_term=20):
self.vector_store = vector_store
self.max_short_term = max_short_term
self.short_term = [] # 当前对话
self.working_memory = {} # 当前任务状态
def add_to_short_term(self, message):
"""添加到短期记忆"""
self.short_term.append(message)
# 超出限制时压缩
if len(self.short_term) > self.max_short_term:
self.compress_short_term()
def compress_short_term(self):
"""压缩短期记忆"""
# 保留最近的消息,压缩早期的
early_messages = self.short_term[:self.max_short_term // 2]
recent_messages = self.short_term[self.max_short_term // 2:]
# 用LLM总结早期消息
summary = self.summarize(early_messages)
self.short_term = [
{'role': 'system', 'content': f'之前的对话摘要:{summary}'}
] + recent_messages
def save_to_long_term(self, experience):
"""保存到长期记忆"""
# 将经验向量化存储
embedding = self.embed(experience['description'])
self.vector_store.upsert(
id=experience['id'],
vector=embedding,
metadata={
'task_type': experience['task_type'],
'outcome': experience['outcome'],
'strategy': experience['strategy'],
'timestamp': time.time()
}
)
def recall_relevant_experience(self, current_task, top_k=3):
"""回忆相关经验"""
query_embedding = self.embed(current_task)
results = self.vector_store.search(
vector=query_embedding,
top_k=top_k,
filter={'outcome': 'success'} # 只回忆成功经验
)
return results
def update_working_memory(self, key, value):
"""更新工作记忆"""
self.working_memory[key] = {
'value': value,
'updated_at': time.time()
}
def get_context_for_decision(self, current_task):
"""获取决策所需的完整上下文"""
return {
'short_term': self.short_term[-10:], # 最近10条
'working_memory': self.working_memory,
'relevant_experience': self.recall_relevant_experience(current_task)
}
多Agent协作
class MultiAgentSystem:
"""
多Agent协作模式
1. Supervisor模式:一个主Agent分配任务给子Agent
2. Sequential模式:Agent按顺序传递结果
3. Parallel模式:多个Agent并行处理,汇总结果
"""
def __init__(self, agents, mode='supervisor'):
self.agents = agents
self.mode = mode
def supervisor_mode(self, task):
"""
Supervisor模式
主Agent负责任务分解和结果汇总
"""
supervisor = self.agents['supervisor']
workers = {k: v for k, v in self.agents.items() if k != 'supervisor'}
# 1. 主Agent分解任务
subtasks = supervisor.decompose_task(task)
# 2. 分配给子Agent
results = {}
for subtask in subtasks:
assigned_agent = supervisor.assign(subtask, workers)
result = workers[assigned_agent].run(subtask)
results[subtask['id']] = result
# 3. 主Agent汇总结果
final_result = supervisor.synthesize(task, results)
return final_result
def sequential_mode(self, task):
"""
Sequential模式
Agent按顺序处理,每个Agent的输出是下一个的输入
"""
current_input = task
for agent_name, agent in self.agents.items():
result = agent.run(current_input)
current_input = result # 传递给下一个
return current_input
def parallel_mode(self, task):
"""
Parallel模式
多个Agent并行处理同一任务,汇总最佳结果
"""
import concurrent.futures
results = {}
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(agent.run, task): name
for name, agent in self.agents.items()
}
for future in concurrent.futures.as_completed(futures):
agent_name = futures[future]
try:
results[agent_name] = future.result()
except Exception as e:
results[agent_name] = {'status': 'error', 'error': str(e)}
# 选择最佳结果
best_result = self.select_best(results)
return best_result
def select_best(self, results):
"""选择最佳结果"""
valid_results = {
k: v for k, v in results.items()
if v.get('status') == 'success'
}
if not valid_results:
return {'status': 'all_failed', 'results': results}
# 按质量评分选择
scored = {
k: self.score_result(v) for k, v in valid_results.items()
}
best_agent = max(scored, key=scored.get)
return valid_results[best_agent]
面试时怎么讲
# 面试表达模板
INTERVIEW_TEMPLATE = """
1. 先说Agent解决什么问题:
"Agent让LLM从被动回答变成主动执行,能自主规划和调用工具完成复杂任务"
2. 说核心机制(ReAct):
"核心是 Thought-Action-Observation 循环,模型先思考再行动再观察结果"
3. 说关键设计决策:
- 规划策略:简单任务用adaptive,复杂任务用plan-then-execute
- 错误处理:重试+死循环检测+优雅退出
- 记忆管理:短期(对话历史)+ 长期(向量存储)
4. 说工程挑战:
- 死循环防护(重复动作检测+最大步数限制)
- 成本控制(每步都调LLM,token消耗大)
- 可观测性(每步都要有trace)
5. 如果被追问多Agent:
- Supervisor:适合任务可分解的场景
- Sequential:适合流水线处理
- Parallel:适合需要多角度验证的场景
"""
易错点
- 只说”用了 LangChain Agent”但讲不清 ReAct 循环怎么工作
- 忽略死循环和错误处理,面试官一定会追问
- 把 Agent 说得太神奇,忽略实际工程中的不稳定性
- 多 Agent 协作讲得太复杂,面试时简单说清模式即可
- 不提成本问题(Agent 每步都调 LLM,token 消耗很大)
记忆技巧
记住 Agent 面试的”一个循环 + 三个防护 + 三种协作”:
一个循环: ReAct = Thought → Action → Observation → 循环
三个防护:
- 最大步数限制(防无限循环)
- 重复动作检测(防死循环)
- 超时机制(防卡死)
三种多Agent协作:
- Supervisor(主从分配)
- Sequential(流水线)
- Parallel(并行竞争)
面试速答版
- Agent 核心是 ReAct 循环:思考→行动→观察→再思考,直到任务完成
- 规划策略两种:Plan-then-Execute(任务明确时)vs Adaptive(不确定时)
- 必须有三层防护:最大步数、重复动作检测、超时机制,防止死循环和失控
- 错误处理:工具失败要重试(指数退避)、不可恢复时优雅退出给部分结果
- 记忆分三层:短期(对话历史)、工作记忆(当前任务状态)、长期(向量存储经验)
- 多Agent协作三种模式:Supervisor(分配)、Sequential(流水线)、Parallel(竞争)
- 面试关键:强调 Agent 是系统工程,不是一个 Prompt 能搞定的,要讲容错和可观测
Related · Agent 与工具调用