🧠AI Eval 与观测

线上指标与Tracing

面试回答

常见问法

AI 系统上线后应该重点看哪些指标?为什么需要 Tracing?

回答

线上不仅要看答案质量,还要看延迟、成本、成功率、拒答率和工具调用失败率。Tracing 的价值在于把一次请求拆成可观察链路,让你知道问题出在检索、模型、工具还是后处理。

# 完整AI系统监控示例
class AIMonitoringSystem:
    def __init__(self, metrics_backend, tracing_backend):
        self.metrics = metrics_backend
        self.tracing = tracing_backend
        self.alert_thresholds = {
            'latency_p99': 2000,  # 99分位延迟
            'error_rate': 0.05,   # 错误率5%
            'hallucination_rate': 0.1,  # 幻觉率10%
            'cost_per_1k_tokens': 0.05   # 每千token成本
        }
    
    def monitor_request(self, request_id, system):
        """
        监控完整的AI请求流程
        
        1. 链路追踪
        2. 性能指标收集
        3. 质量评估
        4. 异常检测
        """
        # 创建追踪span
        with self.tracing.create_span('ai_request', request_id) as span:
            # 记录请求开始
            span.add_attribute('request.timestamp', time.time())
            
            try:
                # 1. 检索阶段监控
                with self.tracing.create_span('retrieval', request_id) as retrieval_span:
                    retrieval_start = time.time()
                    
                    # 执行检索
                    retrieved_docs = system.retrieve()
                    
                    retrieval_time = time.time() - retrieval_start
                    retrieval_span.add_attribute('retrieval.time', retrieval_time)
                    retrieval_span.add_attribute('retrieval.count', len(retrieved_docs))
                    
                    # 记录检索指标
                    self.metrics.timing('retrieval.latency', retrieval_time)
                    self.metrics.histogram('retrieval.document_count', len(retrieved_docs))
                
                # 2. 生成阶段监控
                with self.tracing.create_span('generation', request_id) as generation_span:
                    generation_start = time.time()
                    
                    # 执行生成
                    response = system.generate(retrieved_docs)
                    
                    generation_time = time.time() - generation_start
                    generation_span.add_attribute('generation.time', generation_time)
                    generation_span.add_attribute('generation.tokens', response['token_count'])
                    
                    # 记录生成指标
                    self.metrics.timing('generation.latency', generation_time)
                    self.metrics.histogram('generation.token_count', response['token_count'])
                
                # 3. 质量评估监控
                with self.tracing.create_span('quality_check', request_id) as quality_span:
                    quality_start = time.time()
                    
                    # 评估回答质量
                    quality_metrics = system.evaluate_quality(response)
                    hallucination_risk = system.detect_hallucination(response)
                    
                    quality_time = time.time() - quality_start
                    quality_span.add_attribute('quality.time', quality_time)
                    quality_span.add_attribute('quality.score', quality_metrics['overall'])
                    quality_span.add_attribute('hallucination.risk', hallucination_risk)
                    
                    # 记录质量指标
                    self.metrics.gauge('response.quality', quality_metrics['overall'])
                    self.metrics.gauge('hallucination.risk', hallucination_risk)
                
                # 4. 成本监控
                total_cost = system.calculate_cost(retrieved_docs, response)
                span.add_attribute('request.cost', total_cost)
                self.metrics.gauge('request.cost', total_cost)
                
                # 5. 总体请求指标
                total_latency = time.time() - span.start_time
                span.add_attribute('request.total_latency', total_latency)
                
                self.metrics.timing('request.total_latency', total_latency)
                self.metrics.increment('request.success')
                
                # 检查是否需要告警
                self.check_alerts({
                    'latency': total_latency,
                    'cost': total_cost,
                    'hallucination_risk': hallucination_risk
                })
                
                # 保存完整追踪
                self.tracing.finish_span(span)
                
                return {
                    'status': 'success',
                    'response': response,
                    'metrics': {
                        'latency': total_latency,
                        'cost': total_cost,
                        'quality': quality_metrics['overall'],
                        'hallucination_risk': hallucination_risk
                    },
                    'trace_id': span.trace_id
                }
                
            except Exception as e:
                # 错误监控
                span.add_attribute('request.error', str(e))
                self.metrics.increment('request.error')
                self.metrics.increment(f'request.error.{type(e).__name__}')
                
                # 保存错误追踪
                self.tracing.finish_span(span, status='error')
                
                return {
                    'status': 'error',
                    'error': str(e),
                    'trace_id': span.trace_id
                }
    
    def check_alerts(self, metrics):
        """检查是否需要发送告警"""
        alerts = []
        
        # 延迟告警
        if metrics['latency'] > self.alert_thresholds['latency_p99']:
            alerts.append({
                'type': 'high_latency',
                'severity': 'medium',
                'value': metrics['latency'],
                'threshold': self.alert_thresholds['latency_p99']
            })
        
        # 成本告警
        if metrics['cost'] > self.alert_thresholds['cost_per_1k_tokens']:
            alerts.append({
                'type': 'high_cost',
                'severity': 'high',
                'value': metrics['cost'],
                'threshold': self.alert_thresholds['cost_per_1k_tokens']
            })
        
        # 幻觉风险告警
        if metrics['hallucination_risk'] > self.alert_thresholds['hallucination_rate']:
            alerts.append({
                'type': 'high_hallucination_risk',
                'severity': 'high',
                'value': metrics['hallucination_risk'],
                'threshold': self.alert_thresholds['hallucination_rate']
            })
        
        # 发送告警
        for alert in alerts:
            self.send_alert(alert)
    
    def send_alert(self, alert):
        """发送告警通知"""
        # 实现告警发送逻辑
        print(f"[ALERT] {alert['type']}: {alert['value']} > {alert['threshold']}")
    
    def analyze_slow_queries(self, time_range='1h', threshold=1000):
        """
        分析慢查询
        
        1. 收集超过阈值的请求
        2. 分析瓶颈
        3. 生成优化建议
        """
        # 查询慢请求追踪
        slow_traces = self.tracing.query_traces(
            min_duration=threshold,
            time_range=time_range
        )
        
        # 分析瓶颈分布
        bottleneck_stats = {
            'retrieval': 0,
            'generation': 0,
            'quality_check': 0,
            'other': 0
        }
        
        for trace in slow_traces:
            spans = trace.get('spans', [])
            
            for span in spans:
                if span['duration'] > threshold:
                    bottleneck_stats[span['name']] += 1
        
        # 生成优化建议
        suggestions = []
        
        if bottleneck_stats['retrieval'] > len(slow_traces) * 0.3:
            suggestions.append("检索阶段是主要瓶颈,考虑优化:")
            suggestions.append("- 增加检索缓存")
            suggestions.append("- 优化Chunk策略")
            suggestions.append("- 升级检索模型")
        
        if bottleneck_stats['generation'] > len(slow_traces) * 0.3:
            suggestions.append("生成阶段是主要瓶颈,考虑优化:")
            suggestions.append("- 优化Prompt长度")
            suggestions.append("- 使用更小的模型")
            suggestions.append("- 调整生成参数")
        
        return {
            'slow_query_count': len(slow_traces),
            'bottleneck_distribution': bottleneck_stats,
            'optimization_suggestions': suggestions
        }
    
    def analyze_error_patterns(self, time_range='24h'):
        """
        分析错误模式
        
        1. 统计错误类型
        2. 分析错误发生场景
        3. 识别系统性问题
        """
        # 查询错误追踪
        error_traces = self.tracing.query_traces(
            status='error',
            time_range=time_range
        )
        
        # 错误分类统计
        error_stats = {}
        error_contexts = {}
        
        for trace in error_traces:
            error_type = trace.get('error_type', 'unknown')
            
            if error_type not in error_stats:
                error_stats[error_type] = 0
                error_contexts[error_type] = []
            
            error_stats[error_type] += 1
            
            # 收集错误上下文
            context = {
                'timestamp': trace.get('timestamp'),
                'request_type': trace.get('request_type'),
                'user_id': trace.get('user_id'),
                'error_message': trace.get('error_message')
            }
            error_contexts[error_type].append(context)
        
        # 识别高频错误模式
        patterns = []
        
        for error_type, count in error_stats.items():
            if count > len(error_traces) * 0.1:  # 超过10%的错误
                patterns.append({
                    'error_type': error_type,
                    'frequency': count / len(error_traces),
                    'contexts': error_contexts[error_type][:5]  # 取前5个上下文
                })
        
        return {
            'total_errors': len(error_traces),
            'error_distribution': error_stats,
            'significant_patterns': patterns
        }
    
    def generate_performance_report(self, time_range='7d'):
        """
        生成性能报告
        
        1. 汇总关键指标
        2. 趋势分析
        3. 对比分析
        """
        # 收集指标数据
        metrics_data = self.metrics.query_range(time_range)
        
        # 计算关键统计
        report = {
            'time_range': time_range,
            'latency': {
                'p50': self.calculate_percentile(metrics_data['latency'], 50),
                'p95': self.calculate_percentile(metrics_data['latency'], 95),
                'p99': self.calculate_percentile(metrics_data['latency'], 99),
                'mean': self.calculate_mean(metrics_data['latency'])
            },
            'quality': {
                'mean': self.calculate_mean(metrics_data['quality']),
                'distribution': self.calculate_distribution(metrics_data['quality'])
            },
            'cost': {
                'total': self.calculate_sum(metrics_data['cost']),
                'mean': self.calculate_mean(metrics_data['cost']),
                'trend': self.calculate_trend(metrics_data['cost'])
            },
            'hallucination': {
                'mean_risk': self.calculate_mean(metrics_data['hallucination_risk']),
                'high_risk_rate': self.calculate_high_risk_rate(metrics_data['hallucination_risk'])
            },
            'throughput': {
                'total_requests': self.calculate_sum(metrics_data['request_count']),
                'success_rate': self.calculate_success_rate(metrics_data),
                'error_rate': self.calculate_error_rate(metrics_data)
            }
        }
        
        return report

# 使用示例
def demo_monitoring():
    """演示监控系统使用"""
    
    # 初始化监控系统
    monitoring = AIMonitoringSystem(
        metrics_backend=MetricsBackend(),
        tracing_backend=TracingBackend()
    )
    
    # 模拟AI系统
    class DemoAISystem:
        def retrieve(self):
            # 模拟检索
            time.sleep(0.1)
            return [{'id': 1, 'content': '文档内容'}]
        
        def generate(self, documents):
            # 模拟生成
            time.sleep(0.5)
            return {
                'text': '这是生成的回答',
                'token_count': 100
            }
        
        def evaluate_quality(self, response):
            # 模拟质量评估
            return {'overall': 0.85}
        
        def detect_hallucination(self, response):
            # 模拟幻觉检测
            return 0.1
        
        def calculate_cost(self, documents, response):
            # 模拟成本计算
            return 0.03
    
    system = DemoAISystem()
    
    # 监控请求
    result = monitoring.monitor_request('request_123', system)
    
    print(f"请求状态:{result['status']}")
    print(f"响应时间:{result['metrics']['latency']:.2f}ms")
    print(f"回答质量:{result['metrics']['quality']:.2f}")
    print(f"请求成本:${result['metrics']['cost']:.4f}")
    
    # 分析性能
    performance_report = monitoring.generate_performance_report('24h')
    print(f"\n性能指标:")
    print(f"  99分位延迟:{performance_report['latency']['p99']:.2f}ms")
    print(f"  平均质量:{performance_report['quality']['mean']:.2f}")
    print(f"  成功率:{performance_report['throughput']['success_rate']:.2%}")
    
    # 分析慢查询
    slow_analysis = monitoring.analyze_slow_queries('1h', threshold=1000)
    print(f"\n慢查询分析:")
    print(f"  慢查询数量:{slow_analysis['slow_query_count']}")
    for suggestion in slow_analysis['optimization_suggestions']:
        print(f"  - {suggestion}")

# 指标后端模拟
class MetricsBackend:
    def timing(self, name, value):
        print(f"[METRIC] timing: {name}={value}")
    
    def gauge(self, name, value):
        print(f"[METRIC] gauge: {name}={value}")
    
    def increment(self, name):
        print(f"[METRIC] increment: {name}")
    
    def histogram(self, name, value):
        print(f"[METRIC] histogram: {name}={value}")
    
    def query_range(self, time_range):
        # 模拟查询历史数据
        return {
            'latency': [100, 200, 150, 300],
            'quality': [0.8, 0.9, 0.7, 0.85],
            'cost': [0.02, 0.03, 0.025, 0.035],
            'hallucination_risk': [0.05, 0.1, 0.08, 0.12],
            'request_count': [100, 120, 110, 130],
            'error_count': [2, 3, 1, 4]
        }

# 追踪后端模拟
class TracingBackend:
    def create_span(self, name, trace_id):
        return MockSpan(name, trace_id)
    
    def finish_span(self, span, status='success'):
        print(f"[TRACE] span finished: {span.name}={status}")
    
    def query_traces(self, **kwargs):
        # 模拟查询追踪
        return []

class MockSpan:
    def __init__(self, name, trace_id):
        self.name = name
        self.trace_id = trace_id
        self.start_time = time.time()
        self.attributes = {}
    
    def add_attribute(self, key, value):
        self.attributes[key] = value
    
    def __enter__(self):
        return self
    
    def __exit__(self, *args):
        pass

追问

  • 为什么最终答案正确率不足以定位问题?(多阶段链路)
  • 哪些环节最值得单独埋点?(关键路径)
  • 回放失败案例为什么重要?(持续改进)

原理展开

AI 系统往往是多阶段链路,只有最终结果日志远远不够。Tracing 能把每一步输入输出、耗时、错误信息串起来,这样出现问题时才能快速定位瓶颈和失败原因。

线上指标和离线评测是互补关系:前者看真实流量表现,后者看可控对比。缺一边,优化都会变得盲目。

# 高级Tracing:支持回放
class ReplayTracingBackend(TracingBackend):
    def __init__(self):
        super().__init__()
        self.trace_storage = {}
        self.replay_cache = {}
    
    def save_trace(self, trace):
        """保存追踪数据"""
        trace_id = trace['trace_id']
        self.trace_storage[trace_id] = trace
    
    def replay_trace(self, trace_id, system_modifier=None):
        """
        回放追踪
        
        1. 加载原始追踪
        2. 可选修改系统配置
        3. 重新执行并对比
        """
        trace = self.trace_storage.get(trace_id)
        if not trace:
            raise ValueError(f"追踪{trace_id}不存在")
        
        # 创建回放记录
        replay_id = f"replay_{trace_id}_{int(time.time())}"
        
        # 重新执行
        if system_modifier:
            # 使用修改后的系统
            replay_system = system_modifier(trace['original_system'])
        else:
            # 使用原始系统
            replay_system = trace['original_system']
        
        # 重新执行并记录
        replay_trace = self.execute_and_trace(
            replay_system, 
            trace['original_request'],
            replay_id
        )
        
        # 对比结果
        comparison = self.compare_traces(trace, replay_trace)
        
        return {
            'replay_id': replay_id,
            'original_trace': trace,
            'replay_trace': replay_trace,
            'comparison': comparison
        }
    
    def compare_traces(self, original, replay):
        """
        对比原始追踪和回放追踪
        
        1. 对比输出
        2. 对比性能指标
        3. 对比错误
        """
        comparison = {
            'output_changed': original['response'] != replay['response'],
            'latency_change': replay['latency'] - original['latency'],
            'quality_change': replay['quality'] - original['quality'],
            'cost_change': replay['cost'] - original['cost'],
            'error_occurred': replay['status'] == 'error'
        }
        
        # 差异分析
        if comparison['output_changed']:
            comparison['diff_analysis'] = self.analyze_output_diff(
                original['response'], replay['response']
            )
        
        return comparison
    
    def find_regression(self, time_range='24h'):
        """
        查找性能退化
        
        1. 对比历史性能指标
        2. 识别退化模式
        3. 关联代码变更
        """
        # 获取历史性能数据
        historical_performance = self.get_historical_performance(time_range)
        
        # 对比当前性能
        current_performance = self.get_current_performance()
        
        # 识别退化
        regressions = []
        
        for metric_name in ['latency', 'quality', 'cost']:
            hist_values = historical_performance.get(metric_name, [])
            curr_values = current_performance.get(metric_name, [])
            
            if hist_values and curr_values:
                hist_mean = sum(hist_values) / len(hist_values)
                curr_mean = sum(curr_values) / len(curr_values)
                
                # 计算变化率
                change_rate = (curr_mean - hist_mean) / hist_mean
                
                # 如果变化超过阈值,认为是退化
                if abs(change_rate) > 0.2:  # 20%变化
                    regressions.append({
                        'metric': metric_name,
                        'historical_mean': hist_mean,
                        'current_mean': curr_mean,
                        'change_rate': change_rate,
                        'severity': 'high' if abs(change_rate) > 0.5 else 'medium'
                    })
        
        return regressions

易错点

  • 只有应用层日志,没有链路级观测
  • 只盯延迟和成本,不看失败样例
  • 追踪数据过多导致性能问题
  • 回放机制设计不当导致结果不可靠

记忆技巧

记住监控三要素:

  1. 指标 = “量化性能”
  2. 追踪 = “定位问题”
  3. 回放 = “验证改进”

典型应用场景:

  • 性能优化:慢查询分析
  • 故障排查:错误追踪
  • 质量控制:质量监控
  • 成本控制:成本优化