🧠AI Eval 与观测
AI应用质量保障
难度:⭐⭐ | 高频指数:🔥🔥🔥 | 应用岗相关度:★★★
面试回答
常见问法
- “AI 应用怎么做质量保障?和传统软件测试有什么区别?”
- “你们的 AI 系统上线前有什么 checklist?”
- “线上出了问题怎么快速定位?”
回答
AI 应用和传统软件最大的区别是输出非确定性——同样的输入可能产生不同的输出,而且“正确”的标准往往是模糊的。所以质量保障不能只靠单元测试,需要一套从离线评测到线上监控的完整体系。
# AI应用质量保障体系
class AIQualityAssurance:
    """Three-layer quality assurance for an AI application.

    Layers:
        1. Offline evaluation (before launch)
        2. Canary release (during launch)
        3. Online monitoring (after launch)
    """

    # Default release gates used by check_pass_criteria. Callers can override
    # individual entries through that method's ``criteria`` argument.
    DEFAULT_CRITERIA = {
        'accuracy_threshold': 0.85,
        'safety_threshold': 0.99,
        'latency_p95_ms': 3000,
        # Allowed regression (2%).
        # NOTE(review): this gate is declared but not enforced below, because
        # the shape of the 'regression' result is not visible here.
        'regression_tolerance': 0.02,
    }

    def __init__(self, config):
        """Build the collaborators from the matching sections of ``config``.

        Args:
            config: Mapping with the keys 'eval', 'deploy', 'monitor' and
                'tracing'; each value is handed to one collaborator.
        """
        self.offline_evaluator = OfflineEvaluator(config['eval'])
        self.canary_deployer = CanaryDeployer(config['deploy'])
        self.monitor = OnlineMonitor(config['monitor'])
        self.tracer = DistributedTracer(config['tracing'])

    # ========== Layer 1: offline evaluation ==========
    def run_offline_evaluation(self, new_version, baseline_version=None):
        """Run the pre-launch offline evaluation for ``new_version``.

        The evaluator is asked for accuracy, relevance, safety and latency.
        When ``baseline_version`` is truthy a regression comparison is added.

        NOTE(review): ``compare_with_baseline`` is not defined in this class
        as shown; it is presumably provided elsewhere - confirm. Completeness
        is a stated quality dimension but no completeness evaluation is run.

        Args:
            new_version: Version under evaluation.
            baseline_version: Optional version to compare against.

        Returns:
            dict with the keys 'accuracy', 'relevance', 'safety', 'latency',
            optionally 'regression', and 'pass' (the dict returned by
            check_pass_criteria).
        """
        evaluator = self.offline_evaluator
        eval_results = {}

        # 1. Accuracy
        eval_results['accuracy'] = evaluator.evaluate_accuracy(
            new_version,
            dataset='golden_dataset',
            metrics=['exact_match', 'semantic_similarity', 'llm_judge'],
        )

        # 2. Relevance
        eval_results['relevance'] = evaluator.evaluate_relevance(
            new_version,
            dataset='relevance_dataset',
        )

        # 3. Safety
        eval_results['safety'] = evaluator.evaluate_safety(
            new_version,
            dataset='adversarial_dataset',
            checks=['toxicity', 'bias', 'pii_leak', 'jailbreak'],
        )

        # 4. Latency
        eval_results['latency'] = evaluator.evaluate_latency(
            new_version,
            percentiles=[50, 90, 95, 99],
        )

        # 5. Regression comparison (only when a baseline is given)
        if baseline_version:
            eval_results['regression'] = self.compare_with_baseline(
                new_version, baseline_version
            )

        # 6. Release decision
        eval_results['pass'] = self.check_pass_criteria(eval_results)
        return eval_results

    def check_pass_criteria(self, results, criteria=None):
        """Check evaluation results against the release gates.

        Args:
            results: Mapping that provides ``results['accuracy']['overall']``,
                ``results['safety']['overall']`` and
                ``results['latency']['p95']``.
            criteria: Optional mapping whose entries override
                DEFAULT_CRITERIA; omitted keys keep their defaults.

        Returns:
            ``{'passed': bool, 'failures': list[str]}`` where ``failures``
            holds one message per violated gate, in the order accuracy,
            safety, latency.

        Raises:
            KeyError: If ``results`` lacks one of the entries listed above.
        """
        gates = dict(self.DEFAULT_CRITERIA)
        if criteria:
            gates.update(criteria)

        accuracy = results['accuracy']['overall']
        safety = results['safety']['overall']
        latency_p95 = results['latency']['p95']

        failures = []
        if accuracy < gates['accuracy_threshold']:
            failures.append(
                f"准确率 {accuracy:.2%} < {gates['accuracy_threshold']:.2%}"
            )
        if safety < gates['safety_threshold']:
            failures.append(
                f"安全性 {safety:.2%} < {gates['safety_threshold']:.2%}"
            )
        if latency_p95 > gates['latency_p95_ms']:
            failures.append(
                f"P95延迟 {latency_p95}ms > {gates['latency_p95_ms']}ms"
            )

        return {'passed': not failures, 'failures': failures}
# 自动评测Pipeline
class AutoEvalPipeline:
    """Automated evaluation pipeline built from three signals.

    1. Golden dataset (comparison against reference answers)
    2. LLM judge (scoring by a large model)
    3. Human spot check (final gate)
    """

    # Prompt sent to the judge model. Doubled braces are literal braces for
    # str.format; {question}, {answer} and {reference} are filled per sample.
    _JUDGE_PROMPT_TEMPLATE = (
        "\n"
        "请评估以下AI系统的回答质量。\n"
        "用户问题:{question}\n"
        "系统回答:{answer}\n"
        "参考答案:{reference}\n"
        "请从以下维度评分(1-5分):\n"
        "1. 准确性:信息是否正确\n"
        "2. 完整性:是否回答了所有要点\n"
        "3. 相关性:是否切题\n"
        "4. 清晰度:表达是否清晰\n"
        "输出JSON格式:\n"
        '{{"accuracy": X, "completeness": X, "relevance": X, "clarity": X, '
        '"overall": X, "reason": "评分理由"}}\n'
    )

    def __init__(self, golden_dataset, judge_model, human_reviewers):
        """Store the dataset, the judge model and the human reviewers.

        Args:
            golden_dataset: Iterable of samples; each sample is indexed with
                'input' and 'expected' and may carry a 'key_points' list.
            judge_model: Object whose ``generate(prompt)`` returns the raw
                judge output.
            human_reviewers: Stored as-is; not used by the methods shown.
        """
        self.golden_dataset = golden_dataset
        self.judge_model = judge_model
        self.human_reviewers = human_reviewers

    @staticmethod
    def _mean(values):
        """Return the arithmetic mean of ``values``, or 0.0 when empty."""
        values = list(values)
        if not values:
            return 0.0
        return sum(values) / len(values)

    def full_evaluation(self, system_under_test):
        """Run golden-dataset scoring, LLM judging and human review in order.

        NOTE(review): ``get_low_score_samples``, ``human_review`` and
        ``compute_final_score`` are not defined in this class as shown.

        Returns:
            dict with the keys 'golden', 'llm_judge', 'human_review' and
            'final_score'.
        """
        results = {}
        # Step 1: automatic scoring against the golden dataset
        results['golden'] = self.evaluate_with_golden(system_under_test)
        # Step 2: LLM judge scoring
        results['llm_judge'] = self.evaluate_with_llm_judge(system_under_test)
        # Step 3: human spot check on the low-scoring samples
        low_score_samples = self.get_low_score_samples(results)
        results['human_review'] = self.human_review(low_score_samples)
        # Combined score
        results['final_score'] = self.compute_final_score(results)
        return results

    def evaluate_with_golden(self, system):
        """Score ``system`` against the reference answers.

        For every sample the system output is compared on three axes: exact
        match after stripping whitespace, presence of every 'key_points'
        entry as a substring, and ``self.compute_similarity`` (defined
        elsewhere).

        Returns:
            dict with 'exact_match_rate', 'key_info_rate' and
            'avg_similarity'. All three are 0.0 for an empty dataset, which
            previously raised ZeroDivisionError.
        """
        scores = []
        for sample in self.golden_dataset:
            output = system.generate(sample['input'])
            expected = sample['expected']
            scores.append({
                'exact_match': output.strip() == expected.strip(),
                # A sample without key points counts as containing them all.
                'contains_key_info': all(
                    key in output for key in sample.get('key_points', [])
                ),
                'semantic_similarity': self.compute_similarity(
                    output, expected
                ),
            })

        mean = self._mean
        return {
            'exact_match_rate': mean(s['exact_match'] for s in scores),
            'key_info_rate': mean(s['contains_key_info'] for s in scores),
            'avg_similarity': mean(s['semantic_similarity'] for s in scores),
        }

    def evaluate_with_llm_judge(self, system, max_samples=50):
        """Score ``system`` with the judge model on a prefix of the dataset.

        Judging is expensive, so only the first ``max_samples`` samples are
        used. Each judge reply is parsed with ``self.parse_judge_score``
        (defined elsewhere), which must yield 'accuracy', 'completeness',
        'relevance' and 'overall'.

        NOTE(review): the prompt also asks for a clarity score, but no
        clarity average is reported; this is kept as in the original.

        Args:
            system: Object whose ``generate(input)`` returns the answer.
            max_samples: Upper bound on judged samples; ``None`` judges all.

        Returns:
            dict with 'avg_accuracy', 'avg_completeness', 'avg_relevance'
            and 'avg_overall'; all 0.0 when no sample was judged.
        """
        from itertools import islice

        scores = []
        # islice takes a prefix of any iterable, not only sliceable sequences.
        for sample in islice(self.golden_dataset, max_samples):
            output = system.generate(sample['input'])
            judge_prompt = self._JUDGE_PROMPT_TEMPLATE.format(
                question=sample['input'],
                answer=output,
                reference=sample['expected'],
            )
            judge_result = self.judge_model.generate(judge_prompt)
            scores.append(self.parse_judge_score(judge_result))

        mean = self._mean
        return {
            'avg_accuracy': mean(s['accuracy'] for s in scores),
            'avg_completeness': mean(s['completeness'] for s in scores),
            'avg_relevance': mean(s['relevance'] for s in scores),
            'avg_overall': mean(s['overall'] for s in scores),
        }
追问
上线前的 checklist 是什么?
# Pre-launch checklist: maps each verification category to the items that
# must be confirmed before a release. Categories keep their listed order.
PRE_LAUNCH_CHECKLIST = dict((
    # Functional verification
    ("功能验证", [
        "离线评测通过(准确率 > 阈值)",
        "安全评测通过(无有害输出)",
        "边界case覆盖(空输入、超长输入、恶意输入)",
        "格式输出稳定性验证",
    ]),
    # Performance verification
    ("性能验证", [
        "P95延迟 < 3秒",
        "并发压测通过(目标QPS)",
        "Token消耗在预算内",
        "缓存命中率达标",
    ]),
    # Security verification
    ("安全验证", [
        "Prompt注入防护测试",
        "PII泄露检测",
        "权限控制验证",
        "内容安全过滤生效",
    ]),
    # Observability
    ("可观测性", [
        "Tracing链路完整",
        "关键指标告警配置",
        "日志级别合理",
        "错误码定义完整",
    ]),
    # Rollback readiness
    ("回滚准备", [
        "回滚方案文档化",
        "回滚操作验证过",
        "灰度比例可调",
        "紧急开关可用",
    ]),
))
原理展开
线上监控指标
class OnlineMonitor:
    """Online monitoring: per-request metrics, anomaly alerts, dashboard."""

    # Alert thresholds used by check_anomaly. They are class attributes so a
    # subclass or a single instance can tune them without editing the method.
    LATENCY_ALERT_MS = 5000      # warn when one request is slower than this
    ERROR_RATE_ALERT = 0.05      # critical when the 5-minute error rate exceeds 5%
    QUALITY_DROP_RATIO = 0.9     # warn when 1h quality < 90% of the 24h baseline

    def __init__(self, config):
        """Create the metrics store and alert manager from ``config``.

        Args:
            config: Mapping with the keys 'metrics' and 'alerts'.
        """
        self.metrics_store = MetricsStore(config['metrics'])
        self.alert_manager = AlertManager(config['alerts'])

    def collect_metrics(self, request, response, metadata):
        """Build, record and anomaly-check the metrics of one request.

        Core metrics: success, latency, automatic quality score, cost
        (tokens), plus the request id so that asynchronous user feedback can
        be joined later.

        NOTE(review): 'latency_ms' is simply ``end_time - start_time``; it is
        only milliseconds if the caller supplies timestamps in milliseconds.
        ``auto_score`` and ``calculate_cost`` are not defined in this class
        as shown.

        Args:
            request: Not read by this method.
            response: Object with ``status`` and, on failure, ``error_type``.
            metadata: Mapping with 'start_time', 'end_time' and 'request_id';
                optional 'time_to_first_token', 'retrieval_score',
                'input_tokens' and 'output_tokens' default to 0.

        Returns:
            The metrics dict that was recorded.
        """
        succeeded = response.status == 'success'
        metrics = {
            # Availability
            'success': succeeded,
            'error_type': None if succeeded else response.error_type,
            # Performance
            'latency_ms': metadata['end_time'] - metadata['start_time'],
            'ttft_ms': metadata.get('time_to_first_token', 0),  # time to first token
            # Quality
            'auto_quality_score': self.auto_score(response),
            'retrieval_score': metadata.get('retrieval_score', 0),
            # Cost
            'input_tokens': metadata.get('input_tokens', 0),
            'output_tokens': metadata.get('output_tokens', 0),
            'total_cost': self.calculate_cost(metadata),
            # Join key for user feedback, which is collected asynchronously
            'request_id': metadata['request_id'],
        }
        self.metrics_store.record(metrics)
        self.check_anomaly(metrics)
        return metrics

    def check_anomaly(self, metrics):
        """Raise alerts for high latency, high error rate or quality drop.

        Thresholds come from LATENCY_ALERT_MS, ERROR_RATE_ALERT and
        QUALITY_DROP_RATIO.

        Args:
            metrics: Mapping with at least 'latency_ms'.
        """
        # Latency of this single request
        if metrics['latency_ms'] > self.LATENCY_ALERT_MS:
            self.alert_manager.warn(
                "高延迟告警",
                f"请求延迟 {metrics['latency_ms']}ms 超过阈值"
            )

        # Error rate over a sliding 5-minute window
        recent_error_rate = self.metrics_store.get_error_rate(window='5m')
        if recent_error_rate > self.ERROR_RATE_ALERT:
            self.alert_manager.critical(
                "错误率告警",
                f"最近5分钟错误率 {recent_error_rate:.2%}"
            )

        # Quality of the last hour against the 24-hour baseline
        recent_quality = self.metrics_store.get_avg_quality(window='1h')
        baseline_quality = self.metrics_store.get_avg_quality(window='24h')
        # NOTE(review): guards against a store that reports None for a window
        # without data - confirm what MetricsStore returns in that case.
        if recent_quality is None or baseline_quality is None:
            return
        if recent_quality < baseline_quality * self.QUALITY_DROP_RATIO:
            self.alert_manager.warn(
                "质量下降告警",
                f"最近1小时质量分 {recent_quality:.2f},低于基线 {baseline_quality:.2f}"
            )

    def get_dashboard_data(self, time_range='24h'):
        """Return the dashboard aggregates for ``time_range``.

        Every value is read from the metrics store with the same
        ``time_range`` argument.
        """
        store = self.metrics_store
        return {
            'success_rate': store.get_success_rate(time_range),
            'avg_latency': store.get_avg_latency(time_range),
            'p95_latency': store.get_percentile_latency(95, time_range),
            'avg_quality': store.get_avg_quality(time_range),
            'total_requests': store.get_request_count(time_range),
            'total_cost': store.get_total_cost(time_range),
            'user_satisfaction': store.get_satisfaction_rate(time_range),
            'error_distribution': store.get_error_distribution(time_range),
        }
灰度发布策略
class CanaryDeployer:
    """Staged canary release with automatic rollback."""

    def __init__(self, config):
        """Create the traffic router and the monitor from ``config``."""
        self.traffic_router = TrafficRouter(config)
        self.monitor = OnlineMonitor(config)

    def canary_deploy(self, new_version, stages=None):
        """Roll ``new_version`` out stage by stage.

        Default stages: 1% -> 5% -> 20% -> 50% -> 100%. After each traffic
        switch, a stage with ``duration_min > 0`` is observed; if the
        observation asks for a rollback, the rollout stops.

        Args:
            new_version: Version identifier handed to the traffic router.
            stages: Optional list of dicts with 'traffic', 'duration_min'
                and 'name'.

        Returns:
            ``{'status': 'fully_deployed', 'version': ...}`` on success, or
            ``{'status': 'rolled_back', 'stage': ..., 'reason': ...}``.
        """
        if stages is None:
            stages = [
                {'traffic': 0.01, 'duration_min': 30, 'name': '探针'},
                {'traffic': 0.05, 'duration_min': 60, 'name': '小流量'},
                {'traffic': 0.20, 'duration_min': 120, 'name': '中流量'},
                {'traffic': 0.50, 'duration_min': 180, 'name': '大流量'},
                {'traffic': 1.00, 'duration_min': 0, 'name': '全量'},
            ]

        for stage in stages:
            print(f"灰度阶段:{stage['name']},流量比例:{stage['traffic']:.0%}")

            # Shift traffic to the new version
            self.traffic_router.set_canary_traffic(
                new_version, stage['traffic']
            )

            # A stage with no observation window is not observed at all
            if stage['duration_min'] <= 0:
                continue

            observation = self.observe(
                duration_min=stage['duration_min'],
                new_version=new_version,
            )
            if observation['should_rollback']:
                self.rollback(new_version, observation['reason'])
                return {
                    'status': 'rolled_back',
                    'stage': stage['name'],
                    'reason': observation['reason'],
                }

        return {'status': 'fully_deployed', 'version': new_version}

    def observe(self, duration_min, new_version, check_interval_sec=60):
        """Poll metrics during a stage and decide whether to roll back.

        Each round sleeps ``check_interval_sec`` and then compares the last
        five minutes of ``new_version`` with the 'stable' version. Rollback
        is requested when the new error rate is more than twice the stable
        one, or the new average latency is more than 1.5 times the stable.

        Args:
            duration_min: Observation window in minutes.
            new_version: Version whose metrics are compared with 'stable'.
            check_interval_sec: Seconds between checks (default 60, the
                previously hard-coded interval).

        Returns:
            ``{'should_rollback': False}`` or
            ``{'should_rollback': True, 'reason': str}``.
        """
        import time

        deadline = time.time() + duration_min * 60
        while time.time() < deadline:
            time.sleep(check_interval_sec)

            new_metrics = self.monitor.get_version_metrics(new_version, '5m')
            old_metrics = self.monitor.get_version_metrics('stable', '5m')

            if new_metrics['error_rate'] > old_metrics['error_rate'] * 2:
                return {
                    'should_rollback': True,
                    'reason': f"错误率异常:新版本 {new_metrics['error_rate']:.2%} vs 旧版本 {old_metrics['error_rate']:.2%}"
                }
            if new_metrics['avg_latency'] > old_metrics['avg_latency'] * 1.5:
                return {
                    'should_rollback': True,
                    'reason': f"延迟异常:新版本 {new_metrics['avg_latency']}ms vs 旧版本 {old_metrics['avg_latency']}ms"
                }

        return {'should_rollback': False}

    def rollback(self, version, reason):
        """Send no more traffic to ``version`` and raise a critical alert.

        ``__init__`` never sets ``self.alert_manager``, so the original call
        raised AttributeError during every rollback. An explicitly attached
        ``alert_manager`` is still preferred; otherwise the monitor's alert
        manager is used.
        """
        print(f"触发回滚!版本:{version},原因:{reason}")
        self.traffic_router.set_canary_traffic(version, 0)

        alert_manager = getattr(self, 'alert_manager', None)
        if alert_manager is None:
            alert_manager = self.monitor.alert_manager
        alert_manager.critical("灰度回滚", reason)
问题定位:Tracing + 日志 + Replay
class ProblemDiagnostics:
    """Fast problem localisation from traces, logs and request replay."""

    def __init__(self, tracer, log_store, system=None):
        """Store the collaborators.

        Args:
            tracer: Object whose ``get_trace(request_id)`` returns a mapping
                with 'trace_id', 'total_duration' and 'spans'.
            log_store: Object with ``query(filter=..., level=...)``.
            system: Optional object whose ``generate(input)`` re-executes a
                request. replay_request needs it; the attribute was
                previously never assigned, so replay raised AttributeError.
        """
        self.tracer = tracer
        self.log_store = log_store
        self.system = system

    @staticmethod
    def _recorded_input(trace):
        """Return the 'input' attribute of the first span, or None."""
        spans = trace['spans']
        if not spans:
            return None
        return spans[0]['attributes'].get('input')

    def diagnose_request(self, request_id):
        """Summarise what happened to one request.

        Steps: fetch the trace, flatten the spans into a timeline, pick the
        span with the longest duration, and fetch logs at level 'WARNING'
        for the request.

        Returns:
            dict with 'trace_id', 'total_duration', 'timeline', 'bottleneck'
            (``None`` when the trace has no spans), 'errors' (timeline
            entries whose status is 'error'), 'logs' and 'replay_available'
            (True only when a system is configured and the first span
            recorded a truthy input).
        """
        # 1. Full trace
        trace = self.tracer.get_trace(request_id)

        # 2. Per-span timing
        timeline = [
            {
                'name': span['name'],
                'duration_ms': span['duration'],
                'status': span['status'],
                'attributes': span['attributes'],
            }
            for span in trace['spans']
        ]

        # 3. Bottleneck; default=None keeps an empty trace from raising
        bottleneck = max(
            timeline, key=lambda entry: entry['duration_ms'], default=None
        )

        # 4. Related logs
        logs = self.log_store.query(
            filter={'request_id': request_id},
            level='WARNING'
        )

        can_replay = (
            getattr(self, 'system', None) is not None
            and bool(self._recorded_input(trace))
        )
        return {
            'trace_id': trace['trace_id'],
            'total_duration': trace['total_duration'],
            'timeline': timeline,
            'bottleneck': bottleneck,
            'errors': [s for s in timeline if s['status'] == 'error'],
            'logs': logs,
            'replay_available': can_replay,
        }

    def replay_request(self, request_id):
        """Re-execute a traced request to reproduce a problem.

        The original input is read from the first span and the original
        output from the last span.

        Returns:
            On success a dict with 'original_input', 'original_output',
            'replay_output', 'outputs_match' and 'note'. Otherwise
            ``{'error': ...}`` when the trace has no usable input or no
            system is configured.
        """
        trace = self.tracer.get_trace(request_id)
        original_input = self._recorded_input(trace)
        if not original_input:
            return {'error': '无法获取原始输入'}

        system = getattr(self, 'system', None)
        if system is None:
            return {'error': '未配置可重放的系统'}

        # Re-run and compare with what was recorded
        new_output = system.generate(original_input)
        original_output = trace['spans'][-1]['attributes'].get('output')
        return {
            'original_input': original_input,
            'original_output': original_output,
            'replay_output': new_output,
            'outputs_match': new_output == original_output,
            'note': '非确定性输出可能不完全一致'
        }
易错点
- 用传统软件测试思维做 AI 测试(期望确定性输出)
- 只做离线评测不做线上监控(上线后才发现问题)
- 灰度发布没有自动回滚机制(出问题反应太慢)
- Tracing 链路不完整(无法定位是哪个环节出问题)
- 忽略用户反馈指标(自动评分和用户感受可能不一致)
记忆技巧
记住 AI 质量保障的“三层防线 + 五个指标”:
三层防线:
- 离线评测(上线前):golden dataset + LLM judge + 人工抽检
- 灰度发布(上线中):分阶段放量 + 自动回滚
- 线上监控(上线后):实时指标 + 异常告警 + 问题定位
五个核心指标:
- 成功率(可用性)
- 延迟(性能)
- 质量分(效果)
- 用户满意度(体验)
- 成本(Token消耗)
面试速答版
- AI 应用和传统软件最大区别是输出非确定性,不能只靠单元测试
- 质量保障三层:离线评测(上线前)→ 灰度发布(上线中)→ 线上监控(上线后)
- 自动评测三板斧:Golden Dataset + LLM Judge + 人工抽检
- 核心监控指标:成功率、延迟、质量分、用户满意度、Token 成本
- 灰度策略:1%→5%→20%→50%→100%,每阶段观察指标,异常自动回滚
- 问题定位靠 Tracing:完整链路追踪 + 日志 + 请求重放
- 一句话最佳实践:上线前有评测集把关,上线中有灰度兜底,上线后有监控告警
Related · Eval 与观测