🧠AI Agent 与工具调用

Agent面试实战题

难度:⭐⭐ | 高频指数:🔥🔥🔥 | 应用岗相关度:★★★

面试回答

常见问法

  • “设计一个能自动完成任务的 Agent 系统”
  • “ReAct 模式是什么?你怎么用的?”
  • “Agent 死循环怎么防?错误怎么处理?”
  • “多 Agent 协作怎么设计?“

回答

Agent 的核心是让 LLM 不只是回答问题,而是能规划任务、调用工具、观察结果、决定下一步。面试时要讲清楚规划策略、执行机制和容错设计。

# ReAct 模式实现
class ReActAgent:
    """
    ReAct = Reasoning + Acting
    
    循环:思考(Thought) → 行动(Action) → 观察(Observation) → 思考...
    直到任务完成或达到最大步数
    """
    
    def __init__(self, llm, tools, max_steps=10):
        self.llm = llm
        self.tools = {t.name: t for t in tools}
        self.max_steps = max_steps
        self.history = []
    
    def run(self, task):
        """执行任务"""
        for step in range(self.max_steps):
            # 1. Reasoning:让模型思考下一步
            thought = self.think(task, self.history)
            
            # 2. 判断是否完成
            if thought.get('is_final_answer'):
                return {
                    'status': 'success',
                    'answer': thought['answer'],
                    'steps': len(self.history)
                }
            
            # 3. Acting:执行工具调用
            action = thought.get('action')
            if action:
                observation = self.act(action)
            else:
                observation = "没有指定工具调用"
            
            # 4. 记录历史
            self.history.append({
                'step': step + 1,
                'thought': thought.get('reasoning', ''),
                'action': action,
                'observation': observation
            })
        
        # 超过最大步数
        return {
            'status': 'max_steps_reached',
            'partial_result': self.history[-1] if self.history else None,
            'steps': self.max_steps
        }
    
    def think(self, task, history):
        """让模型思考下一步"""
        prompt = f"""你是一个任务执行Agent。根据任务和历史记录,决定下一步。

任务:{task}

可用工具:
{self.format_tools()}

历史记录:
{self.format_history(history)}

请按以下格式回答:
思考:[你的推理过程]
行动:[工具名称]({{"参数名": "参数值"}})
或者
思考:[你的推理过程]
最终答案:[任务的最终回答]
"""
        response = self.llm.generate(prompt)
        return self.parse_thought(response)
    
    def act(self, action):
        """执行工具调用"""
        tool_name = action.get('tool')
        params = action.get('params', {})
        
        if tool_name not in self.tools:
            return f"错误:工具 {tool_name} 不存在"
        
        try:
            result = self.tools[tool_name].execute(**params)
            return str(result)
        except Exception as e:
            return f"工具执行失败:{str(e)}"
    
    def format_tools(self):
        """格式化工具描述"""
        descriptions = []
        for name, tool in self.tools.items():
            descriptions.append(
                f"- {name}: {tool.description}\n"
                f"  参数: {tool.parameters}"
            )
        return "\n".join(descriptions)
    
    def format_history(self, history):
        """格式化历史记录"""
        if not history:
            return "(无历史记录)"
        
        lines = []
        for h in history:
            lines.append(f"步骤{h['step']}:")
            lines.append(f"  思考: {h['thought']}")
            lines.append(f"  行动: {h['action']}")
            lines.append(f"  观察: {h['observation']}")
        return "\n".join(lines)
    
    def parse_thought(self, response):
        """解析模型输出"""
        import re
        
        # 检查是否是最终答案
        final_match = re.search(r'最终答案[::]\s*(.+)', response, re.DOTALL)
        if final_match:
            return {
                'is_final_answer': True,
                'answer': final_match.group(1).strip()
            }
        
        # 解析行动
        action_match = re.search(
            r'行动[::]\s*(\w+)\((.+?)\)', response, re.DOTALL
        )
        reasoning_match = re.search(r'思考[::]\s*(.+?)(?=行动|最终)', response, re.DOTALL)
        
        result = {
            'is_final_answer': False,
            'reasoning': reasoning_match.group(1).strip() if reasoning_match else ''
        }
        
        if action_match:
            import json
            tool_name = action_match.group(1)
            try:
                params = json.loads(action_match.group(2))
            except json.JSONDecodeError:
                params = {}
            
            result['action'] = {'tool': tool_name, 'params': params}
        
        return result

追问

规划策略怎么选?

# 两种主要规划策略对比
class PlanningStrategies:
    """
    策略1: Plan-then-Execute(先规划再执行)
    - 适合:任务明确、步骤可预测
    - 优点:全局最优、可预览计划
    - 缺点:计划可能因中间结果变化而失效
    
    策略2: Adaptive(边做边调整)
    - 适合:任务不确定、需要根据中间结果调整
    - 优点:灵活、能处理意外情况
    - 缺点:可能走弯路、难以预估总步数
    """
    
    def plan_then_execute(self, task):
        """先规划再执行"""
        # 1. 生成完整计划
        plan = self.llm.generate(f"为以下任务生成执行计划:{task}")
        steps = self.parse_plan(plan)
        
        # 2. 逐步执行
        results = []
        for step in steps:
            result = self.execute_step(step)
            results.append(result)
            
            # 如果某步失败,重新规划
            if result['status'] == 'failed':
                remaining_steps = self.replan(task, results, steps)
                steps = steps[:len(results)] + remaining_steps
        
        return results
    
    def adaptive_execution(self, task):
        """边做边调整"""
        context = {'task': task, 'results': [], 'step': 0}
        
        while not self.is_task_complete(context):
            # 每一步都重新决策
            next_action = self.decide_next_step(context)
            result = self.execute_step(next_action)
            
            context['results'].append(result)
            context['step'] += 1
            
            # 安全检查
            if context['step'] > 15:
                break
        
        return context['results']

原理展开

错误处理与死循环防护

class RobustAgent:
    """带完整错误处理的Agent"""
    
    def __init__(self, llm, tools, config=None):
        self.llm = llm
        self.tools = tools
        self.config = config or {}
        
        # 防护参数
        self.max_steps = self.config.get('max_steps', 10)
        self.max_retries_per_tool = self.config.get('max_retries', 2)
        self.max_same_action = self.config.get('max_same_action', 3)
        self.timeout_seconds = self.config.get('timeout', 60)
    
    def run_with_safeguards(self, task):
        """带安全防护的执行"""
        action_counter = {}  # 记录每个action的调用次数
        start_time = time.time()
        
        for step in range(self.max_steps):
            # 超时检查
            if time.time() - start_time > self.timeout_seconds:
                return self.graceful_timeout(task, step)
            
            # 获取下一步动作
            action = self.get_next_action(task)
            
            # 死循环检测
            action_key = str(action)
            action_counter[action_key] = action_counter.get(action_key, 0) + 1
            
            if action_counter[action_key] > self.max_same_action:
                # 检测到重复动作,强制换策略
                action = self.force_alternative_action(task, action)
                if action is None:
                    return self.graceful_exit(
                        "检测到死循环,无法找到替代方案", step
                    )
            
            # 执行动作(带重试)
            result = self.execute_with_retry(action)
            
            if result.get('is_complete'):
                return result
        
        return self.graceful_exit("达到最大步数限制", self.max_steps)
    
    def execute_with_retry(self, action):
        """带重试的工具执行"""
        for attempt in range(self.max_retries_per_tool):
            try:
                result = self.execute_action(action)
                
                if result['status'] == 'success':
                    return result
                
                # 可重试的错误
                if result.get('retryable', False):
                    time.sleep(2 ** attempt)  # 指数退避
                    continue
                else:
                    # 不可重试的错误,换策略
                    return self.handle_non_retryable_error(action, result)
                    
            except Exception as e:
                if attempt == self.max_retries_per_tool - 1:
                    return {
                        'status': 'error',
                        'error': str(e),
                        'action': action
                    }
                time.sleep(2 ** attempt)
        
        return {'status': 'max_retries_exceeded', 'action': action}
    
    def force_alternative_action(self, task, stuck_action):
        """强制选择替代方案"""
        prompt = f"""
之前的动作 {stuck_action} 已经重复执行多次但没有进展。
请选择一个完全不同的方法来完成任务:{task}

要求:
1. 不能再使用相同的工具和参数
2. 尝试从不同角度解决问题
3. 如果确实无法完成,请直接给出最终答案
"""
        response = self.llm.generate(prompt)
        return self.parse_action(response)
    
    def graceful_exit(self, reason, steps_taken):
        """优雅退出"""
        return {
            'status': 'terminated',
            'reason': reason,
            'steps_taken': steps_taken,
            'partial_results': self.get_partial_results()
        }
    
    def graceful_timeout(self, task, steps_taken):
        """超时处理"""
        # 尝试基于已有结果给出部分回答
        partial = self.summarize_progress(task)
        return {
            'status': 'timeout',
            'partial_answer': partial,
            'steps_taken': steps_taken
        }

记忆管理

class AgentMemory:
    """
    Agent记忆管理
    
    短期记忆:当前对话历史(上下文窗口内)
    长期记忆:向量存储的历史经验
    工作记忆:当前任务的中间状态
    """
    
    def __init__(self, vector_store, max_short_term=20):
        self.vector_store = vector_store
        self.max_short_term = max_short_term
        self.short_term = []  # 当前对话
        self.working_memory = {}  # 当前任务状态
    
    def add_to_short_term(self, message):
        """添加到短期记忆"""
        self.short_term.append(message)
        
        # 超出限制时压缩
        if len(self.short_term) > self.max_short_term:
            self.compress_short_term()
    
    def compress_short_term(self):
        """压缩短期记忆"""
        # 保留最近的消息,压缩早期的
        early_messages = self.short_term[:self.max_short_term // 2]
        recent_messages = self.short_term[self.max_short_term // 2:]
        
        # 用LLM总结早期消息
        summary = self.summarize(early_messages)
        
        self.short_term = [
            {'role': 'system', 'content': f'之前的对话摘要:{summary}'}
        ] + recent_messages
    
    def save_to_long_term(self, experience):
        """保存到长期记忆"""
        # 将经验向量化存储
        embedding = self.embed(experience['description'])
        
        self.vector_store.upsert(
            id=experience['id'],
            vector=embedding,
            metadata={
                'task_type': experience['task_type'],
                'outcome': experience['outcome'],
                'strategy': experience['strategy'],
                'timestamp': time.time()
            }
        )
    
    def recall_relevant_experience(self, current_task, top_k=3):
        """回忆相关经验"""
        query_embedding = self.embed(current_task)
        
        results = self.vector_store.search(
            vector=query_embedding,
            top_k=top_k,
            filter={'outcome': 'success'}  # 只回忆成功经验
        )
        
        return results
    
    def update_working_memory(self, key, value):
        """更新工作记忆"""
        self.working_memory[key] = {
            'value': value,
            'updated_at': time.time()
        }
    
    def get_context_for_decision(self, current_task):
        """获取决策所需的完整上下文"""
        return {
            'short_term': self.short_term[-10:],  # 最近10条
            'working_memory': self.working_memory,
            'relevant_experience': self.recall_relevant_experience(current_task)
        }

多Agent协作

class MultiAgentSystem:
    """
    多Agent协作模式
    
    1. Supervisor模式:一个主Agent分配任务给子Agent
    2. Sequential模式:Agent按顺序传递结果
    3. Parallel模式:多个Agent并行处理,汇总结果
    """
    
    def __init__(self, agents, mode='supervisor'):
        self.agents = agents
        self.mode = mode
    
    def supervisor_mode(self, task):
        """
        Supervisor模式
        主Agent负责任务分解和结果汇总
        """
        supervisor = self.agents['supervisor']
        workers = {k: v for k, v in self.agents.items() if k != 'supervisor'}
        
        # 1. 主Agent分解任务
        subtasks = supervisor.decompose_task(task)
        
        # 2. 分配给子Agent
        results = {}
        for subtask in subtasks:
            assigned_agent = supervisor.assign(subtask, workers)
            result = workers[assigned_agent].run(subtask)
            results[subtask['id']] = result
        
        # 3. 主Agent汇总结果
        final_result = supervisor.synthesize(task, results)
        return final_result
    
    def sequential_mode(self, task):
        """
        Sequential模式
        Agent按顺序处理,每个Agent的输出是下一个的输入
        """
        current_input = task
        
        for agent_name, agent in self.agents.items():
            result = agent.run(current_input)
            current_input = result  # 传递给下一个
        
        return current_input
    
    def parallel_mode(self, task):
        """
        Parallel模式
        多个Agent并行处理同一任务,汇总最佳结果
        """
        import concurrent.futures
        
        results = {}
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = {
                executor.submit(agent.run, task): name
                for name, agent in self.agents.items()
            }
            
            for future in concurrent.futures.as_completed(futures):
                agent_name = futures[future]
                try:
                    results[agent_name] = future.result()
                except Exception as e:
                    results[agent_name] = {'status': 'error', 'error': str(e)}
        
        # 选择最佳结果
        best_result = self.select_best(results)
        return best_result
    
    def select_best(self, results):
        """选择最佳结果"""
        valid_results = {
            k: v for k, v in results.items() 
            if v.get('status') == 'success'
        }
        
        if not valid_results:
            return {'status': 'all_failed', 'results': results}
        
        # 按质量评分选择
        scored = {
            k: self.score_result(v) for k, v in valid_results.items()
        }
        best_agent = max(scored, key=scored.get)
        return valid_results[best_agent]

面试时怎么讲

# 面试表达模板
INTERVIEW_TEMPLATE = """
1. 先说Agent解决什么问题:
   "Agent让LLM从被动回答变成主动执行,能自主规划和调用工具完成复杂任务"

2. 说核心机制(ReAct):
   "核心是 Thought-Action-Observation 循环,模型先思考再行动再观察结果"

3. 说关键设计决策:
   - 规划策略:简单任务用adaptive,复杂任务用plan-then-execute
   - 错误处理:重试+死循环检测+优雅退出
   - 记忆管理:短期(对话历史)+ 长期(向量存储)

4. 说工程挑战:
   - 死循环防护(重复动作检测+最大步数限制)
   - 成本控制(每步都调LLM,token消耗大)
   - 可观测性(每步都要有trace)

5. 如果被追问多Agent:
   - Supervisor:适合任务可分解的场景
   - Sequential:适合流水线处理
   - Parallel:适合需要多角度验证的场景
"""

易错点

  • 只说”用了 LangChain Agent”但讲不清 ReAct 循环怎么工作
  • 忽略死循环和错误处理,面试官一定会追问
  • 把 Agent 说得太神奇,忽略实际工程中的不稳定性
  • 多 Agent 协作讲得太复杂,面试时简单说清模式即可
  • 不提成本问题(Agent 每步都调 LLM,token 消耗很大)

记忆技巧

记住 Agent 面试的”一个循环 + 三个防护 + 三种协作”:

一个循环: ReAct = Thought → Action → Observation → 循环

三个防护:

  1. 最大步数限制(防无限循环)
  2. 重复动作检测(防死循环)
  3. 超时机制(防卡死)

三种多Agent协作:

  1. Supervisor(主从分配)
  2. Sequential(流水线)
  3. Parallel(并行竞争)

面试速答版

  • Agent 核心是 ReAct 循环:思考→行动→观察→再思考,直到任务完成
  • 规划策略两种:Plan-then-Execute(任务明确时)vs Adaptive(不确定时)
  • 必须有三层防护:最大步数、重复动作检测、超时机制,防止死循环和失控
  • 错误处理:工具失败要重试(指数退避)、不可恢复时优雅退出给部分结果
  • 记忆分三层:短期(对话历史)、工作记忆(当前任务状态)、长期(向量存储经验)
  • 多Agent协作三种模式:Supervisor(分配)、Sequential(流水线)、Parallel(竞争)
  • 面试关键:强调 Agent 是系统工程,不是一个 Prompt 能搞定的,要讲容错和可观测
Related · Agent 与工具调用