🧠AI Agent 与工具调用

Agent规划与状态管理

面试回答

常见问法

Agent 系统为什么需要状态管理和失败恢复?

回答

因为 Agent 往往不是一步完成任务,而是多轮决策、多次工具调用和中间状态累积。如果没有状态管理,你很难知道它做到哪一步、为什么失败、失败后该从哪里继续。

# Agent状态管理示例
class AgentStateManager:
    """Drive an agent step by step with checkpointing, retry and rollback.

    The persistence backend only needs ``save(task_id, data)`` and
    ``load(task_id)``; those are the only backend calls made by this class.
    """

    def __init__(self, persistence_backend):
        self.persistence = persistence_backend
        self.max_retries = 3
        self.checkpoint_interval = 5  # save a checkpoint every 5 steps

    def execute_with_state_management(self, task, agent, max_steps=20):
        """Run ``agent`` on ``task`` for at most ``max_steps`` steps.

        1. Initialise the state, or restore it from a saved checkpoint.
        2. Execute the task one step at a time.
        3. Save a checkpoint every ``checkpoint_interval`` steps.
        4. On ``AgentExecutionError`` roll back and retry with backoff.

        Args:
            task: Mapping with at least ``'id'`` and ``'description'``.
            agent: Object exposing ``execute_step(state)`` that returns a
                mapping (``task_completed`` / ``final_result`` are read here).
            max_steps: Upper bound on loop iterations; retries consume steps.

        Returns:
            A dict whose ``'status'`` is one of ``'success'``, ``'failed'``
            (retries exhausted, human intervention required), ``'error'``
            (unexpected exception) or ``'timeout'`` (step budget used up).
        """
        state = self.initialize_or_restore_state(task)

        for step in range(max_steps):
            try:
                if step % self.checkpoint_interval == 0:
                    self.save_checkpoint(state, step)

                step_result = agent.execute_step(state)
                state = self.update_state(state, step_result)

                if step_result.get('task_completed', False):
                    return {
                        'status': 'success',
                        'final_state': state,
                        'steps': step + 1,
                        # .get(): a completed step without a final result
                        # must not be reported as an unexpected error.
                        'result': step_result.get('final_result')
                    }

            except AgentExecutionError as e:
                retry_count = state.get('retry_count', 0)
                # Keep the error history across the rollback below, which
                # replaces the in-memory state with the checkpointed one.
                error_log = list(state.get('errors', []))
                error_log.append({
                    'step': step,
                    'error': str(e),
                    'time': time.time()
                })

                if retry_count < self.max_retries:
                    state = self.rollback_state(state)
                    state['retry_count'] = retry_count + 1
                    state['errors'] = error_log

                    # Exponential backoff: 1s, 2s, 4s, ...
                    time.sleep(2 ** retry_count)
                    continue

                # Retries exhausted: hand over to a human.
                state['errors'] = error_log
                return {
                    'status': 'failed',
                    'failed_step': step,
                    'error': str(e),
                    'state': state,
                    'requires_human_intervention': True
                }

            except Exception as e:
                # Unexpected failure: report it together with the last state.
                return {
                    'status': 'error',
                    'error': str(e),
                    'state': state
                }

        # Step budget exhausted without completing the task.
        return {
            'status': 'timeout',
            'final_state': state,
            'steps': max_steps
        }

    def _load_checkpointed_state(self, task_id):
        """Return a private copy of the persisted state, or None if absent."""
        import copy

        saved = self.persistence.load(task_id)
        if not saved:
            return None

        # save_checkpoint() persists an envelope {'state', 'checkpoint_time',
        # 'step'}; unwrap it, but still accept a bare state dict.
        if (isinstance(saved, dict) and 'task_id' not in saved
                and isinstance(saved.get('state'), dict)):
            saved = saved['state']

        # Copy so that later in-place updates cannot alter the stored
        # checkpoint when the backend keeps objects in memory.
        return copy.deepcopy(saved)

    def initialize_or_restore_state(self, task):
        """Return the checkpointed state for ``task`` or a fresh initial one."""
        task_id = task['id']

        restored = self._load_checkpointed_state(task_id)
        if restored is not None:
            return restored

        return {
            'task_id': task_id,
            'task_description': task['description'],
            'current_step': 0,
            'total_steps': 0,
            'retry_count': 0,
            'checkpoint_step': 0,
            'intermediate_results': [],
            'tool_calls': [],
            'errors': [],
            'metadata': {
                'start_time': time.time(),
                'last_update': time.time()
            }
        }

    def update_state(self, state, step_result):
        """Fold one step result into ``state`` (mutated in place) and return it."""
        state['current_step'] += 1
        state['total_steps'] += 1

        # Test the value, not key presence: AgentExecutor.execute_step always
        # emits these keys and sets them to None when nothing happened.
        if step_result.get('tool_call') is not None:
            state['tool_calls'].append({
                'step': state['current_step'],
                'tool': step_result['tool_call'],
                'result': step_result.get('tool_result')
            })

        if step_result.get('intermediate_result') is not None:
            state['intermediate_results'].append({
                'step': state['current_step'],
                'result': step_result['intermediate_result']
            })

        state['metadata']['last_update'] = time.time()

        return state

    def save_checkpoint(self, state, step):
        """Persist a snapshot of ``state`` and record ``step`` as its checkpoint."""
        import copy

        # Snapshot first so the checkpoint is decoupled from the live state.
        snapshot = copy.deepcopy(state)
        snapshot['checkpoint_step'] = step

        checkpoint_data = {
            'state': snapshot,
            'checkpoint_time': time.time(),
            'step': step
        }

        self.persistence.save(state['task_id'], checkpoint_data)

        # Only mark the live state once the save call has returned.
        state['checkpoint_step'] = step

    def rollback_state(self, state):
        """Return the last checkpointed state, or a fresh one if none exists."""
        return self.initialize_or_restore_state({
            'id': state['task_id'],
            'description': state['task_description']
        })

# Agent执行器
class AgentExecutor:
    """Execute single agent steps using a planner, tools and an LLM client."""

    def __init__(self, llm_client, tool_registry, planner):
        self.planner = planner
        self.tool_registry = tool_registry
        self.llm_client = llm_client

    def execute_step(self, state):
        """Run one reasoning step for the task described by ``state``.

        1. Ask the planner for the plan of the current step.
        2. Run the planned tool call, if the plan requires one.
        3. Ask the LLM client for the next decision.

        Returns:
            A dict with ``current_step``, ``plan``, ``tool_call``,
            ``tool_result``, ``intermediate_result``, ``task_completed`` and
            ``final_result``. ``tool_call`` and ``tool_result`` are None when
            no tool was run.

        Raises:
            AgentExecutionError: if the tool result's ``'status'`` is not
                ``'success'``.
        """
        description = state['task_description']
        step_no = state['current_step']

        # 1. Plan for the current step.
        plan = self.planner.generate_plan(
            description,
            state.get('intermediate_results', []),
            step_no
        )

        # 2. Optional tool call.
        # NOTE(review): execute_tool is not defined in this class as shown;
        # presumably supplied elsewhere - confirm.
        tool_call = None
        tool_result = None
        if plan.get('requires_tool_call'):
            tool_call = plan['tool_call']
            tool_result = self.execute_tool(tool_call)

            succeeded = tool_result['status'] == 'success'
            if not succeeded:
                raise AgentExecutionError(
                    f"工具执行失败:{tool_result.get('error')}"
                )

        # 3. Next decision from the LLM.
        decision = self.llm_client.generate_next_step(
            task=description,
            current_step=step_no,
            plan=plan,
            tool_result=tool_result
        )

        # The tool call is reported only when it produced a truthy result.
        reported_call = tool_call if tool_result else None

        outcome = {
            'current_step': step_no,
            'plan': plan,
            'tool_call': reported_call,
            'tool_result': tool_result,
        }
        outcome['intermediate_result'] = decision.get('intermediate_result')
        outcome['task_completed'] = decision.get('task_completed', False)
        outcome['final_result'] = decision.get('final_result')
        return outcome

追问

  • 长任务为什么更需要 checkpoint?(防止状态丢失)
  • 什么场景下应该让人工接管?(复杂错误恢复)
  • 为什么“全自动 Agent”常常不稳定?(缺乏监督)

原理展开

Agent 的难点不只是规划,而是执行链路的不确定性。工具可能失败,外部数据可能变化,模型也可能走偏。所以系统一般要保存步骤状态、工具结果和错误信息,并设计重试、回滚或人工介入机制。

从面试表达上,最好强调 Agent 是“系统工程问题”,不是单个 Prompt 就能解决的问题。

# 高级状态管理:支持人工接管
class HumanInLoopStateManager(AgentStateManager):
    """State manager that can hand a stuck task over to a human operator."""

    def __init__(self, persistence_backend, notification_service):
        super().__init__(persistence_backend)
        self.notification = notification_service

    def summarize_state(self, state):
        """Return a compact, alert-friendly summary of ``state``.

        Only counts and scalar progress fields are included, so the alert
        context stays small even when the state holds many tool results.
        """
        return {
            'task_id': state.get('task_id'),
            'current_step': state.get('current_step'),
            'retry_count': state.get('retry_count'),
            'checkpoint_step': state.get('checkpoint_step'),
            'tool_call_count': len(state.get('tool_calls', [])),
            'intermediate_result_count': len(
                state.get('intermediate_results', [])
            ),
            'error_count': len(state.get('errors', []))
        }

    def handle_requiring_human_intervention(self, state, error_info):
        """Escalate a failed task to a human and act on their decision.

        1. Persist the state that needs attention.
        2. Send a high-priority alert.
        3. Wait for the human decision.
        4. Apply the decision.

        Returns:
            The state to continue with (for ``'continue'`` and ``'retry'``).

        Raises:
            TaskAbortedError: the human chose ``'abort'``.
            TimeoutError: no decision arrived before the wait timed out.
            ValueError: the decision carries an unrecognised action.
        """
        # 1. Persist the state that needs human attention.
        task_id = state['task_id']
        self.persistence.save_human_intervention_state(
            task_id, state, error_info
        )

        # 2. Notify the operators.
        self.notification.send_alert(
            title=f"Agent任务需要人工干预:{task_id}",
            message=f"任务在步骤{state['current_step']}遇到错误",
            priority='high',
            context={
                'task_description': state['task_description'],
                'failed_step': state['current_step'],
                'error_info': error_info,
                'state_snapshot': self.summarize_state(state)
            }
        )

        # 3. Block until a decision is available or the wait times out.
        human_decision = self.wait_for_human_decision(task_id)
        action = human_decision['action']

        # 4. Act on the decision.
        if action == 'continue':
            # The human supplied a fix; apply it and carry on.
            return self.apply_human_fix(state, human_decision['fix'])
        if action == 'abort':
            raise TaskAbortedError("任务被人工终止")
        if action == 'retry':
            state = self.rollback_state(state)
            state['retry_count'] = 0
            return state
        if action == 'timeout':
            # Previously this fell through and returned None, which callers
            # would then treat as a state.
            raise TimeoutError(
                human_decision.get('reason', 'timed out waiting for human decision')
            )
        raise ValueError(f"unknown human decision action: {action!r}")

    def wait_for_human_decision(self, task_id, timeout=3600, poll_interval=5):
        """Poll the persistence backend until a human decision is available.

        Args:
            task_id: Task whose decision is awaited.
            timeout: Maximum number of seconds to wait.
            poll_interval: Seconds to sleep between two polls.

        Returns:
            The stored decision, or ``{'action': 'timeout', ...}`` when no
            decision arrived in time. The backend is polled at least once,
            even when ``timeout`` is 0.
        """
        # A monotonic clock keeps the deadline immune to wall-clock jumps.
        deadline = time.monotonic() + timeout

        while True:
            decision = self.persistence.load_human_decision(task_id)
            if decision:
                return decision

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break

            # Never sleep past the deadline.
            time.sleep(min(poll_interval, remaining))

        return {
            'action': 'timeout',
            'reason': '等待人工决策超时'
        }

    def apply_human_fix(self, state, fix):
        """Apply a human-provided ``fix`` to ``state`` and reset the retry count."""
        if fix['type'] == 'tool_result_fix':
            # Replace the result of the first tool call recorded at that step.
            for tool_call in state['tool_calls']:
                if tool_call['step'] == fix['step']:
                    tool_call['result'] = fix['new_result']
                    break

        elif fix['type'] == 'state_override':
            # Overwrite top-level state keys with the supplied values.
            state.update(fix['state_changes'])

        # A human has intervened, so retries start over.
        state['retry_count'] = 0

        return state

# 状态监控仪表板
class AgentStateDashboard:
    """Read-only view over the task states kept by a state manager."""

    def __init__(self, state_manager):
        self.state_manager = state_manager

    @staticmethod
    def _unwrap_checkpoint(loaded):
        """Return the state dict inside a checkpoint envelope, if it is one.

        ``AgentStateManager.save_checkpoint`` persists
        ``{'state': ..., 'checkpoint_time': ..., 'step': ...}``; a bare state
        dict (recognised by its ``'task_id'`` key) is returned unchanged.
        """
        if (isinstance(loaded, dict) and 'task_id' not in loaded
                and isinstance(loaded.get('state'), dict)):
            return loaded['state']
        return loaded

    def get_task_status(self, task_id):
        """Return progress and statistics for one task.

        Returns ``{'status': 'not_found'}`` when nothing is persisted for
        ``task_id``; otherwise a dict with ``status``, ``current_step``,
        ``stats``, the last five tool calls and the last three errors.
        """
        loaded = self.state_manager.persistence.load(task_id)

        if not loaded:
            return {'status': 'not_found'}

        # Reading the envelope as a state raised KeyError on 'metadata'.
        state = self._unwrap_checkpoint(loaded)

        # Elapsed wall-clock time since the task started.
        execution_time = time.time() - state['metadata']['start_time']

        stats = {
            'total_steps': state['total_steps'],
            'tool_calls': len(state['tool_calls']),
            'errors': len(state['errors']),
            'execution_time': execution_time,
            'checkpoint_step': state['checkpoint_step']
        }

        return {
            # NOTE(review): the state carries no lifecycle field, so every
            # persisted task is reported as running - confirm this is intended.
            'status': 'running',
            'current_step': state['current_step'],
            'stats': stats,
            'recent_tool_calls': state['tool_calls'][-5:],  # last 5 tool calls
            'recent_errors': state['errors'][-3:]  # last 3 errors
        }

    def get_all_tasks_summary(self, status_filter=None):
        """Return one summary row per task, optionally filtered by status."""
        summaries = []
        for task_id in self.state_manager.persistence.list_tasks():
            status = self.get_task_status(task_id)

            if status_filter is not None and status['status'] != status_filter:
                continue

            summaries.append({
                'task_id': task_id,
                'status': status['status'],
                'current_step': status.get('current_step', 0),
                'execution_time': status.get('stats', {}).get('execution_time', 0)
            })

        return summaries

易错点

  • 只讲规划,不讲执行稳定性
  • 没有状态就试图做长链路任务
  • 忽略人工介入机制
  • 状态设计过于复杂导致维护困难

记忆技巧

记住Agent状态管理三要素:

  1. 检查点 = “定期保存进度”
  2. 回滚机制 = “出错时恢复”
  3. 人工接管 = “复杂情况人工处理”

典型应用场景:

  • 长任务处理:需要状态持久化
  • 复杂决策:需要人工监督
  • 错误恢复:需要回滚机制
  • 任务监控:需要状态仪表板