🧠AI Agent 与工具调用
Agent规划与状态管理
面试回答
常见问法
Agent 系统为什么需要状态管理和失败恢复?
回答
因为 Agent 往往不是一步完成任务,而是多轮决策、多次工具调用和中间状态累积。如果没有状态管理,你很难知道它做到哪一步、为什么失败、失败后该从哪里继续。
# Agent state management example
class AgentStateManager:
    """Drive an agent step by step with checkpoints, retries and rollback.

    The state is a plain dict (see ``initialize_or_restore_state`` for the
    keys of a fresh one). Checkpoints go through ``persistence_backend``,
    which is used via ``load(task_id)`` and ``save(task_id, data)``.
    """

    def __init__(self, persistence_backend, max_retries=3,
                 checkpoint_interval=5, retry_backoff_seconds=1.0):
        """Store the backend and the retry / checkpoint settings.

        Args:
            persistence_backend: Object providing ``load`` and ``save``.
            max_retries: How many ``AgentExecutionError`` failures are retried
                before the task is handed to a human. The counter lives in
                ``state['retry_count']`` and is not reset by this class.
            checkpoint_interval: Save a checkpoint whenever
                ``state['current_step']`` is a multiple of this value;
                ``0`` disables periodic checkpoints.
            retry_backoff_seconds: Base delay for the exponential backoff
                (``base * 2 ** retry_count``).
        """
        self.persistence = persistence_backend
        self.max_retries = max_retries
        self.checkpoint_interval = checkpoint_interval
        self.retry_backoff_seconds = retry_backoff_seconds

    def execute_with_state_management(self, task, agent, max_steps=20):
        """Run ``agent`` on ``task`` with checkpointing and failure recovery.

        1. Initialize a new state or restore a persisted one.
        2. Execute the task one step at a time.
        3. Save checkpoints periodically.
        4. On failure, roll back and retry, or give up.

        Args:
            task: Dict with at least ``'id'`` and ``'description'``.
            agent: Object exposing ``execute_step(state)`` that returns a dict.
            max_steps: Upper bound on loop iterations (retries included).

        Returns:
            A dict whose ``'status'`` is ``'success'``, ``'failed'`` (retries
            exhausted, human intervention required), ``'error'`` (unexpected
            exception) or ``'timeout'`` (iteration budget used up).
        """
        state = self.initialize_or_restore_state(task)

        for iteration in range(max_steps):
            try:
                # Key checkpoints on the state's own step counter, not the
                # loop index, so resumed or rolled-back runs stay aligned.
                current_step = state['current_step']
                interval = self.checkpoint_interval
                if interval and current_step % interval == 0:
                    self.save_checkpoint(state, current_step)

                step_result = agent.execute_step(state)
                state = self.update_state(state, step_result)

                if step_result.get('task_completed', False):
                    return {
                        'status': 'success',
                        'final_state': state,
                        'steps': iteration + 1,
                        'result': step_result.get('final_result'),
                    }
            # NOTE(review): AgentExecutionError is not defined in the visible
            # part of this file - confirm it is imported/defined elsewhere.
            except AgentExecutionError as e:
                retry_count = state.get('retry_count', 0)
                # Build the error log before rolling back; the restored
                # checkpoint would otherwise lose the failure history.
                error_log = list(state.get('errors', []))
                error_log.append({
                    'step': state.get('current_step'),
                    'error': str(e),
                    'time': time.time(),
                })

                if retry_count >= self.max_retries:
                    # Cannot recover automatically: hand over to a human.
                    state['errors'] = error_log
                    return {
                        'status': 'failed',
                        'failed_step': state.get('current_step'),
                        'error': str(e),
                        'state': state,
                        'requires_human_intervention': True,
                    }

                state = self.rollback_state(state)
                state['retry_count'] = retry_count + 1
                state['errors'] = error_log
                # Exponential backoff: base, 2*base, 4*base, ...
                time.sleep(self.retry_backoff_seconds * (2 ** retry_count))
            except Exception as e:
                # Unexpected error: report it together with the last state.
                return {
                    'status': 'error',
                    'error': str(e),
                    'state': state,
                }

        # Iteration budget exhausted without the task completing.
        return {
            'status': 'timeout',
            'final_state': state,
            'steps': max_steps,
        }

    @staticmethod
    def _unwrap_checkpoint(saved):
        """Return an independent copy of the state held in ``saved``.

        ``save_checkpoint`` persists a wrapper with ``'state'`` and
        ``'checkpoint_time'`` keys; a bare state dict is accepted as well.
        The copy keeps later mutations from leaking into a stored checkpoint
        when the backend hands back the very object it was given.
        """
        import copy

        if 'state' in saved and 'checkpoint_time' in saved:
            saved = saved['state']
        return copy.deepcopy(saved)

    def initialize_or_restore_state(self, task):
        """Return the persisted state for ``task['id']`` or a fresh one."""
        task_id = task['id']

        saved = self.persistence.load(task_id)
        if saved:
            return self._unwrap_checkpoint(saved)

        now = time.time()
        return {
            'task_id': task_id,
            'task_description': task['description'],
            'current_step': 0,
            'total_steps': 0,
            'retry_count': 0,
            'checkpoint_step': 0,
            'intermediate_results': [],
            'tool_calls': [],
            'errors': [],
            'metadata': {
                'start_time': now,
                'last_update': now,
            },
        }

    def update_state(self, state, step_result):
        """Fold one step's result into ``state`` (in place) and return it."""
        state['current_step'] += 1
        state['total_steps'] += 1
        step_number = state['current_step']

        # Skip None values: a result dict may carry the key even when no
        # tool was called or no intermediate result was produced.
        tool_call = step_result.get('tool_call')
        if tool_call is not None:
            state['tool_calls'].append({
                'step': step_number,
                'tool': tool_call,
                'result': step_result.get('tool_result'),
            })

        intermediate = step_result.get('intermediate_result')
        if intermediate is not None:
            state['intermediate_results'].append({
                'step': step_number,
                'result': intermediate,
            })

        state['metadata']['last_update'] = time.time()
        return state

    def save_checkpoint(self, state, step):
        """Persist a snapshot of ``state`` labelled with ``step``."""
        import copy

        # Snapshot a deep copy that already carries the new checkpoint step,
        # so the stored state is self-consistent and isolated from later
        # in-place updates of the live state.
        snapshot = copy.deepcopy(state)
        snapshot['checkpoint_step'] = step
        checkpoint_data = {
            'state': snapshot,
            'checkpoint_time': time.time(),
            'step': step,
        }
        self.persistence.save(state['task_id'], checkpoint_data)

        # Only mark the live state once the save call has returned.
        state['checkpoint_step'] = step

    def rollback_state(self, state):
        """Return the last persisted checkpoint, or a fresh state if none."""
        return self.initialize_or_restore_state({
            'id': state['task_id'],
            'description': state['task_description'],
        })
# Agent executor
class AgentExecutor:
    """Perform one plan -> tool call -> decide step of an agent."""

    def __init__(self, llm_client, tool_registry, planner):
        """Keep references to the LLM client, tool registry and planner."""
        self.llm_client = llm_client
        self.tool_registry = tool_registry
        self.planner = planner

    def execute_step(self, state):
        """Execute a single agent reasoning step.

        1. Generate the plan for the current step from the state.
        2. Execute the planned tool call, if the plan requires one.
        3. Ask the LLM client for the next decision.

        Args:
            state: Dict with ``'task_description'`` and ``'current_step'``;
                ``'intermediate_results'`` is optional.

        Returns:
            Dict with the keys ``'current_step'``, ``'plan'``, ``'tool_call'``,
            ``'tool_result'``, ``'intermediate_result'``, ``'task_completed'``
            and ``'final_result'``. ``'tool_call'`` and ``'tool_result'`` are
            ``None`` when no tool was invoked.

        Raises:
            AgentExecutionError: If the tool result is missing or its
                ``'status'`` is not ``'success'``.
        """
        # 1. Plan the current step.
        current_plan = self.planner.generate_plan(
            state['task_description'],
            state.get('intermediate_results', []),
            state['current_step'],
        )

        # 2. Run the tool call when the plan asks for one. Both names are
        # bound up front so the return value never touches an unset local.
        tool_call = None
        tool_result = None
        if current_plan.get('requires_tool_call'):
            tool_call = current_plan['tool_call']
            # NOTE(review): execute_tool is not defined in this class as
            # shown - confirm it is provided elsewhere (e.g. a subclass).
            tool_result = self.execute_tool(tool_call)

            # A missing result or a missing status counts as a failure too.
            if not tool_result or tool_result.get('status') != 'success':
                error_detail = tool_result.get('error') if tool_result else None
                # NOTE(review): AgentExecutionError is not defined in the
                # visible part of this file - confirm it is imported.
                raise AgentExecutionError(
                    f"工具执行失败:{error_detail}"
                )

        # 3. Let the LLM decide what happens next.
        next_decision = self.llm_client.generate_next_step(
            task=state['task_description'],
            current_step=state['current_step'],
            plan=current_plan,
            tool_result=tool_result,
        )

        return {
            'current_step': state['current_step'],
            'plan': current_plan,
            'tool_call': tool_call,
            'tool_result': tool_result,
            'intermediate_result': next_decision.get('intermediate_result'),
            'task_completed': next_decision.get('task_completed', False),
            'final_result': next_decision.get('final_result'),
        }
追问
- 长任务为什么更需要 checkpoint?(防止状态丢失)
- 什么场景下应该让人工接管?(复杂错误恢复)
- 为什么“全自动 Agent”常常不稳定?(缺乏监督)
原理展开
Agent 的难点不只是规划,而是执行链路的不确定性。工具可能失败,外部数据可能变化,模型也可能走偏。所以系统一般要保存步骤状态、工具结果和错误信息,并设计重试、回滚或人工介入机制。
从面试表达上,最好强调 Agent 是“系统工程问题”,不是单个 Prompt 就能解决的问题。
# Advanced state management: supports human takeover
class HumanInLoopStateManager(AgentStateManager):
    """State manager that can escalate a stuck task to a human operator.

    Besides the base ``load``/``save`` calls, this class uses the backend's
    ``save_human_intervention_state`` and ``load_human_decision`` methods.
    """

    def __init__(self, persistence_backend, notification_service):
        """Store the alerting service on top of the base manager setup."""
        super().__init__(persistence_backend)
        self.notification = notification_service

    def handle_requiring_human_intervention(self, state, error_info):
        """Escalate to a human and act on the decision that comes back.

        1. Persist the state that needs attention.
        2. Send a high-priority alert.
        3. Wait for the human decision.
        4. Continue, retry or abort according to that decision.

        Returns:
            The state to continue with (``'continue'`` and ``'retry'``).

        Raises:
            TaskAbortedError: On an ``'abort'`` decision, or when waiting for
                a decision timed out.
            ValueError: If the decision carries an unknown action.
        """
        # 1. Persist the state that requires human handling.
        task_id = state['task_id']
        self.persistence.save_human_intervention_state(
            task_id, state, error_info
        )

        # 2. Notify the people responsible.
        self.notification.send_alert(
            title=f"Agent任务需要人工干预:{task_id}",
            message=f"任务在步骤{state['current_step']}遇到错误",
            priority='high',
            context={
                'task_description': state['task_description'],
                'failed_step': state['current_step'],
                'error_info': error_info,
                'state_snapshot': self.summarize_state(state),
            },
        )

        # 3. Block until a decision arrives or the wait times out.
        human_decision = self.wait_for_human_decision(task_id)
        action = human_decision['action']

        # 4. Act on the decision. Every action is handled explicitly so the
        # method never falls through and hands ``None`` back as a state.
        if action == 'continue':
            return self.apply_human_fix(state, human_decision['fix'])
        if action == 'retry':
            state = self.rollback_state(state)
            state['retry_count'] = 0
            return state
        # NOTE(review): TaskAbortedError is not defined in the visible part
        # of this file - confirm it is imported/defined elsewhere.
        if action == 'abort':
            raise TaskAbortedError("任务被人工终止")
        if action == 'timeout':
            raise TaskAbortedError(
                human_decision.get('reason', '等待人工决策超时')
            )
        raise ValueError(f"Unknown human decision action: {action!r}")

    def summarize_state(self, state):
        """Return a compact, count-based summary of ``state`` for alerts."""
        return {
            'task_id': state.get('task_id'),
            'current_step': state.get('current_step'),
            'retry_count': state.get('retry_count'),
            'checkpoint_step': state.get('checkpoint_step'),
            'tool_call_count': len(state.get('tool_calls', [])),
            'intermediate_result_count': len(
                state.get('intermediate_results', [])
            ),
            'error_count': len(state.get('errors', [])),
            'last_update': state.get('metadata', {}).get('last_update'),
        }

    def wait_for_human_decision(self, task_id, timeout=3600, poll_interval=5):
        """Poll the backend for a human decision until ``timeout`` seconds pass.

        The backend is always queried at least once, and a sleep never runs
        past the deadline.

        Returns:
            The decision from ``load_human_decision``, or
            ``{'action': 'timeout', ...}`` when none arrived in time.
        """
        # Monotonic clock: elapsed time must not jump with wall-clock changes.
        deadline = time.monotonic() + timeout
        while True:
            decision = self.persistence.load_human_decision(task_id)
            if decision:
                return decision

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(poll_interval, remaining))

        # Timed out: report it as a pseudo-decision.
        return {
            'action': 'timeout',
            'reason': '等待人工决策超时',
        }

    def apply_human_fix(self, state, fix):
        """Apply a human-provided fix to ``state`` in place and return it.

        ``'tool_result_fix'`` replaces the result of the first recorded tool
        call whose ``'step'`` equals ``fix['step']``; ``'state_override'``
        merges ``fix['state_changes']`` into the state. Other fix types change
        nothing. The retry counter is reset in every case.
        """
        if fix['type'] == 'tool_result_fix':
            for tool_call in state['tool_calls']:
                if tool_call['step'] == fix['step']:
                    tool_call['result'] = fix['new_result']
                    break
        elif fix['type'] == 'state_override':
            state.update(fix['state_changes'])

        state['retry_count'] = 0
        return state
# State monitoring dashboard
class AgentStateDashboard:
    """Read-only view over the task states stored by a state manager."""

    def __init__(self, state_manager):
        """Keep the state manager whose ``persistence`` backend is queried."""
        self.state_manager = state_manager

    def get_task_status(self, task_id):
        """Return progress statistics for one task.

        Returns:
            ``{'status': 'not_found'}`` when nothing is stored for
            ``task_id``; otherwise a dict with ``'status'`` (always
            ``'running'``), ``'current_step'``, ``'stats'`` and the most
            recent tool calls and errors.
        """
        saved = self.state_manager.persistence.load(task_id)
        if not saved:
            return {'status': 'not_found'}

        # The backend may hand back either a bare state dict or a checkpoint
        # wrapper that nests the state under 'state' next to
        # 'checkpoint_time'; accept both shapes.
        if 'state' in saved and 'checkpoint_time' in saved:
            state = saved['state']
        else:
            state = saved

        # Elapsed wall-clock time since the task state was first created.
        execution_time = time.time() - state['metadata']['start_time']

        tool_calls = state.get('tool_calls', [])
        errors = state.get('errors', [])
        stats = {
            'total_steps': state['total_steps'],
            'tool_calls': len(tool_calls),
            'errors': len(errors),
            'execution_time': execution_time,
            'checkpoint_step': state['checkpoint_step'],
        }

        # NOTE(review): the stored state carries no completion flag, so every
        # task that is found is reported as 'running' - confirm intent.
        return {
            'status': 'running',
            'current_step': state['current_step'],
            'stats': stats,
            'recent_tool_calls': tool_calls[-5:],  # last 5 tool calls
            'recent_errors': errors[-3:],  # last 3 errors
        }

    def get_all_tasks_summary(self, status_filter=None):
        """Return one summary dict per task from ``persistence.list_tasks()``.

        Args:
            status_filter: When given, keep only tasks whose status equals it.
        """
        summaries = []
        for task_id in self.state_manager.persistence.list_tasks():
            status = self.get_task_status(task_id)
            if status_filter is not None and status['status'] != status_filter:
                continue
            summaries.append({
                'task_id': task_id,
                'status': status['status'],
                'current_step': status.get('current_step', 0),
                'execution_time': status.get('stats', {}).get('execution_time', 0),
            })
        return summaries
易错点
- 只讲规划,不讲执行稳定性
- 没有状态就试图做长链路任务
- 忽略人工介入机制
- 状态设计过于复杂导致维护困难
记忆技巧
记住Agent状态管理三要素:
- 检查点 = “定期保存进度”
- 回滚机制 = “出错时恢复”
- 人工接管 = “复杂情况人工处理”
典型应用场景:
- 长任务处理:需要状态持久化
- 复杂决策:需要人工监督
- 错误恢复:需要回滚机制
- 任务监控:需要状态仪表板