🧠AI Eval 与观测

AI应用质量保障

难度:⭐⭐ | 高频指数:🔥🔥🔥 | 应用岗相关度:★★★

面试回答

常见问法

  • “AI 应用怎么做质量保障?和传统软件测试有什么区别?”
  • “你们的 AI 系统上线前有什么 checklist?”
  • “线上出了问题怎么快速定位?“

回答

AI 应用和传统软件最大的区别是输出非确定性——同样的输入可能产生不同的输出,而且”正确”的标准往往是模糊的。所以质量保障不能只靠单元测试,需要一套从离线评测到线上监控的完整体系。

# AI应用质量保障体系
class AIQualityAssurance:
    """
    AI应用质量保障三层体系:
    1. 离线评测(上线前)
    2. 灰度发布(上线中)
    3. 线上监控(上线后)
    """
    
    def __init__(self, config):
        self.offline_evaluator = OfflineEvaluator(config['eval'])
        self.canary_deployer = CanaryDeployer(config['deploy'])
        self.monitor = OnlineMonitor(config['monitor'])
        self.tracer = DistributedTracer(config['tracing'])
    
    # ========== 第一层:离线评测 ==========
    def run_offline_evaluation(self, new_version, baseline_version=None):
        """
        上线前离线评测
        
        评测维度:
        - 准确性:回答是否正确
        - 相关性:回答是否切题
        - 完整性:回答是否完整
        - 安全性:是否有有害内容
        - 延迟:响应时间是否可接受
        """
        eval_results = {}
        
        # 1. 准确性评测
        eval_results['accuracy'] = self.offline_evaluator.evaluate_accuracy(
            new_version, 
            dataset='golden_dataset',
            metrics=['exact_match', 'semantic_similarity', 'llm_judge']
        )
        
        # 2. 相关性评测
        eval_results['relevance'] = self.offline_evaluator.evaluate_relevance(
            new_version,
            dataset='relevance_dataset'
        )
        
        # 3. 安全性评测
        eval_results['safety'] = self.offline_evaluator.evaluate_safety(
            new_version,
            dataset='adversarial_dataset',
            checks=['toxicity', 'bias', 'pii_leak', 'jailbreak']
        )
        
        # 4. 延迟评测
        eval_results['latency'] = self.offline_evaluator.evaluate_latency(
            new_version,
            percentiles=[50, 90, 95, 99]
        )
        
        # 5. 回归对比(如果有基线版本)
        if baseline_version:
            eval_results['regression'] = self.compare_with_baseline(
                new_version, baseline_version
            )
        
        # 6. 判断是否通过
        eval_results['pass'] = self.check_pass_criteria(eval_results)
        
        return eval_results
    
    def check_pass_criteria(self, results):
        """检查是否满足上线标准"""
        criteria = {
            'accuracy_threshold': 0.85,
            'safety_threshold': 0.99,
            'latency_p95_ms': 3000,
            'regression_tolerance': 0.02  # 允许2%的回归
        }
        
        passed = True
        failures = []
        
        if results['accuracy']['overall'] < criteria['accuracy_threshold']:
            passed = False
            failures.append(f"准确率 {results['accuracy']['overall']:.2%} < {criteria['accuracy_threshold']:.2%}")
        
        if results['safety']['overall'] < criteria['safety_threshold']:
            passed = False
            failures.append(f"安全性 {results['safety']['overall']:.2%} < {criteria['safety_threshold']:.2%}")
        
        if results['latency']['p95'] > criteria['latency_p95_ms']:
            passed = False
            failures.append(f"P95延迟 {results['latency']['p95']}ms > {criteria['latency_p95_ms']}ms")
        
        return {'passed': passed, 'failures': failures}


# 自动评测Pipeline
class AutoEvalPipeline:
    """
    自动评测三板斧:
    1. Golden Dataset(标准答案对比)
    2. LLM Judge(用大模型评分)
    3. 人工抽检(最终把关)
    """
    
    def __init__(self, golden_dataset, judge_model, human_reviewers):
        self.golden_dataset = golden_dataset
        self.judge_model = judge_model
        self.human_reviewers = human_reviewers
    
    def full_evaluation(self, system_under_test):
        """完整评测流程"""
        results = {}
        
        # 第一步:Golden Dataset 自动评测
        results['golden'] = self.evaluate_with_golden(system_under_test)
        
        # 第二步:LLM Judge 评分
        results['llm_judge'] = self.evaluate_with_llm_judge(system_under_test)
        
        # 第三步:人工抽检(抽取低分样本)
        low_score_samples = self.get_low_score_samples(results)
        results['human_review'] = self.human_review(low_score_samples)
        
        # 综合评分
        results['final_score'] = self.compute_final_score(results)
        
        return results
    
    def evaluate_with_golden(self, system):
        """用标准答案评测"""
        scores = []
        
        for sample in self.golden_dataset:
            output = system.generate(sample['input'])
            
            # 多维度评分
            score = {
                'exact_match': output.strip() == sample['expected'].strip(),
                'contains_key_info': all(
                    key in output for key in sample.get('key_points', [])
                ),
                'semantic_similarity': self.compute_similarity(
                    output, sample['expected']
                )
            }
            scores.append(score)
        
        return {
            'exact_match_rate': sum(s['exact_match'] for s in scores) / len(scores),
            'key_info_rate': sum(s['contains_key_info'] for s in scores) / len(scores),
            'avg_similarity': sum(s['semantic_similarity'] for s in scores) / len(scores)
        }
    
    def evaluate_with_llm_judge(self, system):
        """用LLM作为评判"""
        judge_prompt_template = """
请评估以下AI系统的回答质量。

用户问题:{question}
系统回答:{answer}
参考答案:{reference}

请从以下维度评分(1-5分):
1. 准确性:信息是否正确
2. 完整性:是否回答了所有要点
3. 相关性:是否切题
4. 清晰度:表达是否清晰

输出JSON格式:
{{"accuracy": X, "completeness": X, "relevance": X, "clarity": X, "overall": X, "reason": "评分理由"}}
"""
        scores = []
        
        for sample in self.golden_dataset[:50]:  # LLM Judge成本高,抽样评测
            output = system.generate(sample['input'])
            
            judge_prompt = judge_prompt_template.format(
                question=sample['input'],
                answer=output,
                reference=sample['expected']
            )
            
            judge_result = self.judge_model.generate(judge_prompt)
            score = self.parse_judge_score(judge_result)
            scores.append(score)
        
        return {
            'avg_accuracy': sum(s['accuracy'] for s in scores) / len(scores),
            'avg_completeness': sum(s['completeness'] for s in scores) / len(scores),
            'avg_relevance': sum(s['relevance'] for s in scores) / len(scores),
            'avg_overall': sum(s['overall'] for s in scores) / len(scores)
        }

追问

上线前的 checklist 是什么?

PRE_LAUNCH_CHECKLIST = {
    "功能验证": [
        "离线评测通过(准确率 > 阈值)",
        "安全评测通过(无有害输出)",
        "边界case覆盖(空输入、超长输入、恶意输入)",
        "格式输出稳定性验证",
    ],
    "性能验证": [
        "P95延迟 < 3秒",
        "并发压测通过(目标QPS)",
        "Token消耗在预算内",
        "缓存命中率达标",
    ],
    "安全验证": [
        "Prompt注入防护测试",
        "PII泄露检测",
        "权限控制验证",
        "内容安全过滤生效",
    ],
    "可观测性": [
        "Tracing链路完整",
        "关键指标告警配置",
        "日志级别合理",
        "错误码定义完整",
    ],
    "回滚准备": [
        "回滚方案文档化",
        "回滚操作验证过",
        "灰度比例可调",
        "紧急开关可用",
    ]
}

原理展开

线上监控指标

class OnlineMonitor:
    """线上监控体系"""
    
    def __init__(self, config):
        self.metrics_store = MetricsStore(config['metrics'])
        self.alert_manager = AlertManager(config['alerts'])
    
    def collect_metrics(self, request, response, metadata):
        """
        收集线上指标
        
        核心指标:
        1. 成功率:请求是否正常返回
        2. 延迟:端到端响应时间
        3. 质量分:自动评分
        4. 用户反馈:点赞/点踩
        5. Token消耗:成本监控
        """
        metrics = {
            # 可用性指标
            'success': response.status == 'success',
            'error_type': response.error_type if response.status != 'success' else None,
            
            # 性能指标
            'latency_ms': metadata['end_time'] - metadata['start_time'],
            'ttft_ms': metadata.get('time_to_first_token', 0),  # 首token延迟
            
            # 质量指标
            'auto_quality_score': self.auto_score(response),
            'retrieval_score': metadata.get('retrieval_score', 0),
            
            # 成本指标
            'input_tokens': metadata.get('input_tokens', 0),
            'output_tokens': metadata.get('output_tokens', 0),
            'total_cost': self.calculate_cost(metadata),
            
            # 用户反馈(异步收集)
            'request_id': metadata['request_id'],
        }
        
        self.metrics_store.record(metrics)
        
        # 异常检测
        self.check_anomaly(metrics)
        
        return metrics
    
    def check_anomaly(self, metrics):
        """异常检测与告警"""
        # 延迟异常
        if metrics['latency_ms'] > 5000:
            self.alert_manager.warn(
                "高延迟告警",
                f"请求延迟 {metrics['latency_ms']}ms 超过阈值"
            )
        
        # 错误率异常(滑动窗口)
        recent_error_rate = self.metrics_store.get_error_rate(window='5m')
        if recent_error_rate > 0.05:  # 5%错误率
            self.alert_manager.critical(
                "错误率告警",
                f"最近5分钟错误率 {recent_error_rate:.2%}"
            )
        
        # 质量下降
        recent_quality = self.metrics_store.get_avg_quality(window='1h')
        baseline_quality = self.metrics_store.get_avg_quality(window='24h')
        if recent_quality < baseline_quality * 0.9:  # 质量下降10%
            self.alert_manager.warn(
                "质量下降告警",
                f"最近1小时质量分 {recent_quality:.2f},低于基线 {baseline_quality:.2f}"
            )
    
    def get_dashboard_data(self, time_range='24h'):
        """获取监控面板数据"""
        return {
            'success_rate': self.metrics_store.get_success_rate(time_range),
            'avg_latency': self.metrics_store.get_avg_latency(time_range),
            'p95_latency': self.metrics_store.get_percentile_latency(95, time_range),
            'avg_quality': self.metrics_store.get_avg_quality(time_range),
            'total_requests': self.metrics_store.get_request_count(time_range),
            'total_cost': self.metrics_store.get_total_cost(time_range),
            'user_satisfaction': self.metrics_store.get_satisfaction_rate(time_range),
            'error_distribution': self.metrics_store.get_error_distribution(time_range),
        }

灰度发布策略

class CanaryDeployer:
    """灰度发布"""
    
    def __init__(self, config):
        self.traffic_router = TrafficRouter(config)
        self.monitor = OnlineMonitor(config)
    
    def canary_deploy(self, new_version, stages=None):
        """
        分阶段灰度发布
        
        默认阶段:1% → 5% → 20% → 50% → 100%
        每个阶段观察指标,异常自动回滚
        """
        if stages is None:
            stages = [
                {'traffic': 0.01, 'duration_min': 30, 'name': '探针'},
                {'traffic': 0.05, 'duration_min': 60, 'name': '小流量'},
                {'traffic': 0.20, 'duration_min': 120, 'name': '中流量'},
                {'traffic': 0.50, 'duration_min': 180, 'name': '大流量'},
                {'traffic': 1.00, 'duration_min': 0, 'name': '全量'},
            ]
        
        for stage in stages:
            print(f"灰度阶段:{stage['name']},流量比例:{stage['traffic']:.0%}")
            
            # 切换流量
            self.traffic_router.set_canary_traffic(
                new_version, stage['traffic']
            )
            
            # 观察期
            if stage['duration_min'] > 0:
                observation = self.observe(
                    duration_min=stage['duration_min'],
                    new_version=new_version
                )
                
                # 检查是否需要回滚
                if observation['should_rollback']:
                    self.rollback(new_version, observation['reason'])
                    return {
                        'status': 'rolled_back',
                        'stage': stage['name'],
                        'reason': observation['reason']
                    }
        
        return {'status': 'fully_deployed', 'version': new_version}
    
    def observe(self, duration_min, new_version):
        """观察期监控"""
        import time
        
        start = time.time()
        end = start + duration_min * 60
        
        while time.time() < end:
            # 每分钟检查一次
            time.sleep(60)
            
            # 获取新版本指标
            new_metrics = self.monitor.get_version_metrics(new_version, '5m')
            old_metrics = self.monitor.get_version_metrics('stable', '5m')
            
            # 对比判断
            if new_metrics['error_rate'] > old_metrics['error_rate'] * 2:
                return {
                    'should_rollback': True,
                    'reason': f"错误率异常:新版本 {new_metrics['error_rate']:.2%} vs 旧版本 {old_metrics['error_rate']:.2%}"
                }
            
            if new_metrics['avg_latency'] > old_metrics['avg_latency'] * 1.5:
                return {
                    'should_rollback': True,
                    'reason': f"延迟异常:新版本 {new_metrics['avg_latency']}ms vs 旧版本 {old_metrics['avg_latency']}ms"
                }
        
        return {'should_rollback': False}
    
    def rollback(self, version, reason):
        """回滚"""
        print(f"触发回滚!版本:{version},原因:{reason}")
        self.traffic_router.set_canary_traffic(version, 0)
        self.alert_manager.critical("灰度回滚", reason)

问题定位:Tracing + 日志 + Replay

class ProblemDiagnostics:
    """问题快速定位"""
    
    def __init__(self, tracer, log_store):
        self.tracer = tracer
        self.log_store = log_store
    
    def diagnose_request(self, request_id):
        """
        根据request_id定位问题
        
        1. 查看完整trace链路
        2. 定位耗时最长的环节
        3. 查看错误日志
        4. 支持replay重放
        """
        # 1. 获取完整trace
        trace = self.tracer.get_trace(request_id)
        
        # 2. 分析各环节耗时
        spans = trace['spans']
        timeline = []
        for span in spans:
            timeline.append({
                'name': span['name'],
                'duration_ms': span['duration'],
                'status': span['status'],
                'attributes': span['attributes']
            })
        
        # 3. 找到瓶颈
        bottleneck = max(timeline, key=lambda x: x['duration_ms'])
        
        # 4. 获取相关日志
        logs = self.log_store.query(
            filter={'request_id': request_id},
            level='WARNING'
        )
        
        return {
            'trace_id': trace['trace_id'],
            'total_duration': trace['total_duration'],
            'timeline': timeline,
            'bottleneck': bottleneck,
            'errors': [s for s in timeline if s['status'] == 'error'],
            'logs': logs,
            'replay_available': True
        }
    
    def replay_request(self, request_id):
        """
        重放请求(用于复现问题)
        
        从trace中提取原始输入,重新执行
        """
        trace = self.tracer.get_trace(request_id)
        original_input = trace['spans'][0]['attributes'].get('input')
        
        if original_input:
            # 重新执行,对比结果
            new_output = self.system.generate(original_input)
            original_output = trace['spans'][-1]['attributes'].get('output')
            
            return {
                'original_input': original_input,
                'original_output': original_output,
                'replay_output': new_output,
                'outputs_match': new_output == original_output,
                'note': '非确定性输出可能不完全一致'
            }
        
        return {'error': '无法获取原始输入'}

易错点

  • 用传统软件测试思维做 AI 测试(期望确定性输出)
  • 只做离线评测不做线上监控(上线后才发现问题)
  • 灰度发布没有自动回滚机制(出问题反应太慢)
  • Tracing 链路不完整(无法定位是哪个环节出问题)
  • 忽略用户反馈指标(自动评分和用户感受可能不一致)

记忆技巧

记住 AI 质量保障的”三层防线 + 五个指标”:

三层防线:

  1. 离线评测(上线前):golden dataset + LLM judge + 人工抽检
  2. 灰度发布(上线中):分阶段放量 + 自动回滚
  3. 线上监控(上线后):实时指标 + 异常告警 + 问题定位

五个核心指标:

  1. 成功率(可用性)
  2. 延迟(性能)
  3. 质量分(效果)
  4. 用户满意度(体验)
  5. 成本(Token消耗)

面试速答版

  • AI 应用和传统软件最大区别是输出非确定性,不能只靠单元测试
  • 质量保障三层:离线评测(上线前)→ 灰度发布(上线中)→ 线上监控(上线后)
  • 自动评测三板斧:Golden Dataset + LLM Judge + 人工抽检
  • 核心监控指标:成功率、延迟、质量分、用户满意度、Token 成本
  • 灰度策略:1%→5%→20%→50%→100%,每阶段观察指标,异常自动回滚
  • 问题定位靠 Tracing:完整链路追踪 + 日志 + 请求重放
  • 一句话最佳实践:上线前有评测集把关,上线中有灰度兜底,上线后有监控告警
Related · Eval 与观测