🧠AI Eval 与观测
线上指标与Tracing
面试回答
常见问法
AI 系统上线后应该重点看哪些指标?为什么需要 Tracing?
回答
线上不仅要看答案质量,还要看延迟、成本、成功率、拒答率和工具调用失败率。Tracing 的价值在于把一次请求拆成可观察链路,让你知道问题出在检索、模型、工具还是后处理。
# 完整AI系统监控示例
class AIMonitoringSystem:
    """End-to-end monitoring for an AI pipeline: tracing, metrics, quality checks, alerting.

    Wraps a request in spans (retrieval / generation / quality check), records
    latency, cost and quality metrics, and raises threshold-based alerts.
    """

    def __init__(self, metrics_backend, tracing_backend):
        # Backends are injected so this works against any metrics/tracing provider.
        self.metrics = metrics_backend
        self.tracing = tracing_backend
        # Alert thresholds: latency in milliseconds, rates as fractions.
        self.alert_thresholds = {
            'latency_p99': 2000,          # p99 latency (ms)
            'error_rate': 0.05,           # 5% error rate
            'hallucination_rate': 0.1,    # 10% hallucination rate
            'cost_per_1k_tokens': 0.05    # cost per 1k tokens
        }

    def monitor_request(self, request_id, system):
        """
        Monitor one complete AI request:
        1. distributed tracing
        2. performance metric collection
        3. quality evaluation
        4. anomaly detection / alerting

        Returns a dict with 'status', the response or error, per-request
        metrics, and the trace id.
        """
        with self.tracing.create_span('ai_request', request_id) as span:
            span.add_attribute('request.timestamp', time.time())
            try:
                # 1. Retrieval stage
                with self.tracing.create_span('retrieval', request_id) as retrieval_span:
                    retrieval_start = time.time()
                    retrieved_docs = system.retrieve()
                    retrieval_time = time.time() - retrieval_start
                    retrieval_span.add_attribute('retrieval.time', retrieval_time)
                    retrieval_span.add_attribute('retrieval.count', len(retrieved_docs))
                    self.metrics.timing('retrieval.latency', retrieval_time)
                    self.metrics.histogram('retrieval.document_count', len(retrieved_docs))

                # 2. Generation stage
                with self.tracing.create_span('generation', request_id) as generation_span:
                    generation_start = time.time()
                    response = system.generate(retrieved_docs)
                    generation_time = time.time() - generation_start
                    generation_span.add_attribute('generation.time', generation_time)
                    generation_span.add_attribute('generation.tokens', response['token_count'])
                    self.metrics.timing('generation.latency', generation_time)
                    self.metrics.histogram('generation.token_count', response['token_count'])

                # 3. Quality evaluation stage
                with self.tracing.create_span('quality_check', request_id) as quality_span:
                    quality_start = time.time()
                    quality_metrics = system.evaluate_quality(response)
                    hallucination_risk = system.detect_hallucination(response)
                    quality_time = time.time() - quality_start
                    quality_span.add_attribute('quality.time', quality_time)
                    quality_span.add_attribute('quality.score', quality_metrics['overall'])
                    quality_span.add_attribute('hallucination.risk', hallucination_risk)
                    self.metrics.gauge('response.quality', quality_metrics['overall'])
                    self.metrics.gauge('hallucination.risk', hallucination_risk)

                # 4. Cost tracking
                total_cost = system.calculate_cost(retrieved_docs, response)
                span.add_attribute('request.cost', total_cost)
                self.metrics.gauge('request.cost', total_cost)

                # 5. Overall request metrics.
                # BUG FIX: convert seconds -> milliseconds so the value is comparable
                # with the latency_p99 threshold (2000 ms) and matches the "ms"
                # labels used when reporting it.
                total_latency = (time.time() - span.start_time) * 1000.0
                span.add_attribute('request.total_latency', total_latency)
                self.metrics.timing('request.total_latency', total_latency)
                self.metrics.increment('request.success')

                # Fire threshold alerts for this request.
                self.check_alerts({
                    'latency': total_latency,
                    'cost': total_cost,
                    'hallucination_risk': hallucination_risk
                })

                # Persist the completed trace.
                self.tracing.finish_span(span)
                return {
                    'status': 'success',
                    'response': response,
                    'metrics': {
                        'latency': total_latency,
                        'cost': total_cost,
                        'quality': quality_metrics['overall'],
                        'hallucination_risk': hallucination_risk
                    },
                    'trace_id': span.trace_id
                }
            except Exception as e:
                # Record the failure on both the span and the error counters.
                span.add_attribute('request.error', str(e))
                self.metrics.increment('request.error')
                self.metrics.increment(f'request.error.{type(e).__name__}')
                self.tracing.finish_span(span, status='error')
                return {
                    'status': 'error',
                    'error': str(e),
                    'trace_id': span.trace_id
                }

    def check_alerts(self, metrics):
        """Check the per-request metrics against thresholds and send any alerts."""
        alerts = []
        # Latency alert
        if metrics['latency'] > self.alert_thresholds['latency_p99']:
            alerts.append({
                'type': 'high_latency',
                'severity': 'medium',
                'value': metrics['latency'],
                'threshold': self.alert_thresholds['latency_p99']
            })
        # Cost alert
        if metrics['cost'] > self.alert_thresholds['cost_per_1k_tokens']:
            alerts.append({
                'type': 'high_cost',
                'severity': 'high',
                'value': metrics['cost'],
                'threshold': self.alert_thresholds['cost_per_1k_tokens']
            })
        # Hallucination-risk alert
        if metrics['hallucination_risk'] > self.alert_thresholds['hallucination_rate']:
            alerts.append({
                'type': 'high_hallucination_risk',
                'severity': 'high',
                'value': metrics['hallucination_risk'],
                'threshold': self.alert_thresholds['hallucination_rate']
            })
        for alert in alerts:
            self.send_alert(alert)

    def send_alert(self, alert):
        """Deliver an alert notification (stub: prints to stdout)."""
        print(f"[ALERT] {alert['type']}: {alert['value']} > {alert['threshold']}")

    def analyze_slow_queries(self, time_range='1h', threshold=1000):
        """
        Analyze slow queries:
        1. collect requests exceeding `threshold` (ms)
        2. attribute the bottleneck to a pipeline stage
        3. produce optimization suggestions
        """
        slow_traces = self.tracing.query_traces(
            min_duration=threshold,
            time_range=time_range
        )
        bottleneck_stats = {
            'retrieval': 0,
            'generation': 0,
            'quality_check': 0,
            'other': 0
        }
        for trace in slow_traces:
            for trace_span in trace.get('spans', []):
                if trace_span['duration'] > threshold:
                    # BUG FIX: unknown span names go to 'other' instead of raising KeyError.
                    key = trace_span['name'] if trace_span['name'] in bottleneck_stats else 'other'
                    bottleneck_stats[key] += 1
        suggestions = []
        if bottleneck_stats['retrieval'] > len(slow_traces) * 0.3:
            suggestions.append("检索阶段是主要瓶颈,考虑优化:")
            suggestions.append("- 增加检索缓存")
            suggestions.append("- 优化Chunk策略")
            suggestions.append("- 升级检索模型")
        if bottleneck_stats['generation'] > len(slow_traces) * 0.3:
            suggestions.append("生成阶段是主要瓶颈,考虑优化:")
            suggestions.append("- 优化Prompt长度")
            suggestions.append("- 使用更小的模型")
            suggestions.append("- 调整生成参数")
        return {
            'slow_query_count': len(slow_traces),
            'bottleneck_distribution': bottleneck_stats,
            'optimization_suggestions': suggestions
        }

    def analyze_error_patterns(self, time_range='24h'):
        """
        Analyze error patterns:
        1. count error types
        2. collect the contexts in which each error occurs
        3. surface systemic (high-frequency) patterns
        """
        error_traces = self.tracing.query_traces(
            status='error',
            time_range=time_range
        )
        error_stats = {}
        error_contexts = {}
        for trace in error_traces:
            error_type = trace.get('error_type', 'unknown')
            if error_type not in error_stats:
                error_stats[error_type] = 0
                error_contexts[error_type] = []
            error_stats[error_type] += 1
            error_contexts[error_type].append({
                'timestamp': trace.get('timestamp'),
                'request_type': trace.get('request_type'),
                'user_id': trace.get('user_id'),
                'error_message': trace.get('error_message')
            })
        # A pattern is "significant" when it covers more than 10% of all errors.
        patterns = []
        for error_type, count in error_stats.items():
            if count > len(error_traces) * 0.1:
                patterns.append({
                    'error_type': error_type,
                    'frequency': count / len(error_traces),
                    'contexts': error_contexts[error_type][:5]  # keep first 5 contexts
                })
        return {
            'total_errors': len(error_traces),
            'error_distribution': error_stats,
            'significant_patterns': patterns
        }

    def generate_performance_report(self, time_range='7d'):
        """
        Build a performance report over `time_range`:
        latency percentiles, quality, cost, hallucination risk and throughput.
        """
        metrics_data = self.metrics.query_range(time_range)
        return {
            'time_range': time_range,
            'latency': {
                'p50': self.calculate_percentile(metrics_data['latency'], 50),
                'p95': self.calculate_percentile(metrics_data['latency'], 95),
                'p99': self.calculate_percentile(metrics_data['latency'], 99),
                'mean': self.calculate_mean(metrics_data['latency'])
            },
            'quality': {
                'mean': self.calculate_mean(metrics_data['quality']),
                'distribution': self.calculate_distribution(metrics_data['quality'])
            },
            'cost': {
                'total': self.calculate_sum(metrics_data['cost']),
                'mean': self.calculate_mean(metrics_data['cost']),
                'trend': self.calculate_trend(metrics_data['cost'])
            },
            'hallucination': {
                'mean_risk': self.calculate_mean(metrics_data['hallucination_risk']),
                'high_risk_rate': self.calculate_high_risk_rate(metrics_data['hallucination_risk'])
            },
            'throughput': {
                'total_requests': self.calculate_sum(metrics_data['request_count']),
                'success_rate': self.calculate_success_rate(metrics_data),
                'error_rate': self.calculate_error_rate(metrics_data)
            }
        }

    # --- Statistic helpers (BUG FIX: these were called above but never defined,
    # --- so generate_performance_report raised AttributeError) ---

    def calculate_percentile(self, values, percentile):
        """Linear-interpolated percentile of `values`; 0.0 for empty input."""
        if not values:
            return 0.0
        ordered = sorted(values)
        rank = (percentile / 100.0) * (len(ordered) - 1)
        lower = int(rank)
        upper = min(lower + 1, len(ordered) - 1)
        return ordered[lower] + (ordered[upper] - ordered[lower]) * (rank - lower)

    def calculate_mean(self, values):
        """Arithmetic mean; 0.0 for empty input."""
        return sum(values) / len(values) if values else 0.0

    def calculate_sum(self, values):
        """Sum of the series."""
        return sum(values)

    def calculate_distribution(self, values):
        """Compact distribution summary: min, max and sample count."""
        if not values:
            return {'min': 0.0, 'max': 0.0, 'count': 0}
        return {'min': min(values), 'max': max(values), 'count': len(values)}

    def calculate_trend(self, values):
        """Relative change of the second-half mean vs the first-half mean."""
        if len(values) < 2:
            return 0.0
        mid = len(values) // 2
        first = self.calculate_mean(values[:mid])
        second = self.calculate_mean(values[mid:])
        return (second - first) / first if first else 0.0

    def calculate_high_risk_rate(self, values):
        """Fraction of samples above the hallucination-rate threshold."""
        if not values:
            return 0.0
        limit = self.alert_thresholds['hallucination_rate']
        return sum(1 for v in values if v > limit) / len(values)

    def calculate_success_rate(self, metrics_data):
        """Successful requests / total requests; 0.0 when there is no traffic."""
        total = sum(metrics_data.get('request_count', []))
        errors = sum(metrics_data.get('error_count', []))
        return (total - errors) / total if total else 0.0

    def calculate_error_rate(self, metrics_data):
        """Errored requests / total requests; 0.0 when there is no traffic."""
        total = sum(metrics_data.get('request_count', []))
        errors = sum(metrics_data.get('error_count', []))
        return errors / total if total else 0.0
# 使用示例
def demo_monitoring():
    """Demonstrate the monitoring system against a stub AI system.

    Runs one monitored request, then prints a performance report and a
    slow-query analysis. Depends on AIMonitoringSystem, MetricsBackend and
    TracingBackend defined elsewhere in this file.
    """
    monitoring = AIMonitoringSystem(
        metrics_backend=MetricsBackend(),
        tracing_backend=TracingBackend()
    )

    class DemoAISystem:
        """Stub pipeline with fixed timings and scores, for demo purposes only."""

        def retrieve(self):
            # Simulated retrieval latency.
            time.sleep(0.1)
            return [{'id': 1, 'content': '文档内容'}]

        def generate(self, documents):
            # Simulated generation latency.
            time.sleep(0.5)
            return {
                'text': '这是生成的回答',
                'token_count': 100
            }

        def evaluate_quality(self, response):
            # Fixed quality score.
            return {'overall': 0.85}

        def detect_hallucination(self, response):
            # Fixed hallucination risk.
            return 0.1

        def calculate_cost(self, documents, response):
            # Fixed cost.
            return 0.03

    system = DemoAISystem()

    # Run one monitored request and report its metrics.
    result = monitoring.monitor_request('request_123', system)
    print(f"请求状态:{result['status']}")
    print(f"响应时间:{result['metrics']['latency']:.2f}ms")
    print(f"回答质量:{result['metrics']['quality']:.2f}")
    print(f"请求成本:${result['metrics']['cost']:.4f}")

    # Aggregate performance over the last 24h.
    performance_report = monitoring.generate_performance_report('24h')
    print(f"\n性能指标:")
    print(f"  99分位延迟:{performance_report['latency']['p99']:.2f}ms")
    print(f"  平均质量:{performance_report['quality']['mean']:.2f}")
    print(f"  成功率:{performance_report['throughput']['success_rate']:.2%}")

    # Slow-query analysis over the last hour (threshold in ms).
    slow_analysis = monitoring.analyze_slow_queries('1h', threshold=1000)
    print(f"\n慢查询分析:")
    print(f"  慢查询数量:{slow_analysis['slow_query_count']}")
    for suggestion in slow_analysis['optimization_suggestions']:
        print(f"  - {suggestion}")
# 指标后端模拟
class MetricsBackend:
    """Stub metrics backend: prints each metric instead of shipping it anywhere."""

    def timing(self, name, value):
        print(f"[METRIC] timing: {name}={value}")

    def gauge(self, name, value):
        print(f"[METRIC] gauge: {name}={value}")

    def increment(self, name):
        print(f"[METRIC] increment: {name}")

    def histogram(self, name, value):
        print(f"[METRIC] histogram: {name}={value}")

    def query_range(self, time_range):
        # Canned historical data so demos/reports have something to aggregate.
        return {
            'latency': [100, 200, 150, 300],
            'quality': [0.8, 0.9, 0.7, 0.85],
            'cost': [0.02, 0.03, 0.025, 0.035],
            'hallucination_risk': [0.05, 0.1, 0.08, 0.12],
            'request_count': [100, 120, 110, 130],
            'error_count': [2, 3, 1, 4]
        }
# 追踪后端模拟
class TracingBackend:
    """Stub tracing backend backed by MockSpan objects (defined elsewhere in this file)."""

    def create_span(self, name, trace_id):
        return MockSpan(name, trace_id)

    def finish_span(self, span, status='success'):
        print(f"[TRACE] span finished: {span.name}={status}")

    def query_traces(self, **kwargs):
        # The stub persists nothing, so every query is empty.
        return []
class MockSpan:
    """Minimal span: records attributes and its start time; usable as a context manager."""

    def __init__(self, name, trace_id):
        self.name = name
        self.trace_id = trace_id
        self.start_time = time.time()  # wall-clock start, used for latency math
        self.attributes = {}

    def add_attribute(self, key, value):
        self.attributes[key] = value

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # No cleanup needed; finishing is done via TracingBackend.finish_span.
        pass
追问
- 为什么最终答案正确率不足以定位问题?(多阶段链路)
- 哪些环节最值得单独埋点?(关键路径)
- 回放失败案例为什么重要?(持续改进)
原理展开
AI 系统往往是多阶段链路,只有最终结果日志远远不够。Tracing 能把每一步输入输出、耗时、错误信息串起来,这样出现问题时才能快速定位瓶颈和失败原因。
线上指标和离线评测是互补关系:前者看真实流量表现,后者看可控对比。缺一边,优化都会变得盲目。
# 高级Tracing:支持回放
class ReplayTracingBackend(TracingBackend):
    """Tracing backend that stores traces and replays them for regression analysis.

    NOTE(review): this class calls execute_and_trace, analyze_output_diff,
    get_historical_performance and get_current_performance, none of which are
    defined in this file — presumably supplied by a subclass or mixin; confirm.
    """

    def __init__(self):
        super().__init__()
        self.trace_storage = {}   # trace_id -> stored trace dict
        self.replay_cache = {}    # reserved for caching replay results

    def save_trace(self, trace):
        """Persist a finished trace, keyed by its trace_id."""
        self.trace_storage[trace['trace_id']] = trace

    def replay_trace(self, trace_id, system_modifier=None):
        """
        Replay a stored trace:
        1. load the original trace
        2. optionally transform the system via `system_modifier`
        3. re-execute and compare against the original run

        Raises ValueError when the trace id is unknown.
        """
        trace = self.trace_storage.get(trace_id)
        if not trace:
            raise ValueError(f"追踪{trace_id}不存在")
        replay_id = f"replay_{trace_id}_{int(time.time())}"
        if system_modifier:
            replay_system = system_modifier(trace['original_system'])
        else:
            replay_system = trace['original_system']
        # BUG FIX: the original bound this to a local named `replay_trace`,
        # shadowing this very method.
        replayed = self.execute_and_trace(
            replay_system,
            trace['original_request'],
            replay_id
        )
        comparison = self.compare_traces(trace, replayed)
        return {
            'replay_id': replay_id,
            'original_trace': trace,
            'replay_trace': replayed,
            'comparison': comparison
        }

    def compare_traces(self, original, replay):
        """Compare output equality, latency/quality/cost deltas and error status."""
        comparison = {
            'output_changed': original['response'] != replay['response'],
            'latency_change': replay['latency'] - original['latency'],
            'quality_change': replay['quality'] - original['quality'],
            'cost_change': replay['cost'] - original['cost'],
            'error_occurred': replay['status'] == 'error'
        }
        # Only run the (expensive) diff when the outputs actually differ.
        if comparison['output_changed']:
            comparison['diff_analysis'] = self.analyze_output_diff(
                original['response'], replay['response']
            )
        return comparison

    def find_regression(self, time_range='24h'):
        """
        Detect performance regressions by comparing current vs historical means.
        A metric counts as regressed when its mean changed by more than 20%;
        a change above 50% is marked 'high' severity.
        """
        historical_performance = self.get_historical_performance(time_range)
        current_performance = self.get_current_performance()
        regressions = []
        for metric_name in ['latency', 'quality', 'cost']:
            hist_values = historical_performance.get(metric_name, [])
            curr_values = current_performance.get(metric_name, [])
            if not (hist_values and curr_values):
                continue
            hist_mean = sum(hist_values) / len(hist_values)
            curr_mean = sum(curr_values) / len(curr_values)
            if not hist_mean:
                # BUG FIX: a zero historical mean would divide by zero below.
                continue
            change_rate = (curr_mean - hist_mean) / hist_mean
            if abs(change_rate) > 0.2:  # >20% change
                regressions.append({
                    'metric': metric_name,
                    'historical_mean': hist_mean,
                    'current_mean': curr_mean,
                    'change_rate': change_rate,
                    'severity': 'high' if abs(change_rate) > 0.5 else 'medium'
                })
        return regressions
易错点
- 只有应用层日志,没有链路级观测
- 只盯延迟和成本,不看失败样例
- 追踪数据过多导致性能问题
- 回放机制设计不当导致结果不可靠
记忆技巧
记住监控三要素:
- 指标 = “量化性能”
- 追踪 = “定位问题”
- 回放 = “验证改进”
典型应用场景:
- 性能优化:慢查询分析
- 故障排查:错误追踪
- 质量控制:质量监控
- 成本控制:成本优化