42 KiB
webhook_views.py 代码改进计划
For Claude: REQUIRED SUB-SKILL: Use
executing-plansskill to implement this plan task-by-task.
Goal: 全面改进 webhook_views.py 的代码质量,修复严重bug,优化性能,增强安全性
Architecture:
- 创建专门的 validator 层用于输入验证
- 创建 service 层封装业务逻辑和数据操作
- 添加日志脱敏工具函数
- 重构现有 View 为更小、更专注的组件
- 统一使用 DRF Serializer 进行数据验证
Tech Stack: Django REST Framework, drf-yasg, json, logging
📋 问题分析摘要
1. 🔴 静默失败 (CRITICAL BUG)
位置: 第 146-154 行 问题: 数据库操作失败却返回 HTTP 200 OK
except Exception as db_error:
logger.error(f'数据库操作失败: {db_error}', exc_info=True)
return JsonResponse({
'success': True, # ❌ BUG: 应该是 False
'message': '...',
}, status=200) # ❌ BUG: 应该是 500
影响: 调用方无法得知数据未保存,导致数据不一致
2. 🟡 长函数
位置:
ItineraryWebhookView.post()- 130+ 行RequirementWebhookView.post()- 200+ 行create_daily_schedules()- 75+ 行
问题: 单一方法承担了太多职责,难以测试和维护
3. 🟡 N+1 查询
位置: create_daily_schedules() 方法
# 循环内查询目的地
for day_schedule in daily_schedules:
destination = Destination.objects.filter(...).first() # N+1
for activity in activities:
# 每个活动都单独查询
if activity_type == ATTRACTION:
attraction = Attraction.objects.get(...) # N+1
elif activity_type == MEAL:
restaurant = Restaurant.objects.get(...) # N+1
elif activity_type == CHECK_IN/CHECK_OUT:
hotel = Hotel.objects.get(...) # N+1
影响: 假设有 10 天行程,每天 5 个活动 = 最多 51 次数据库查询
4. 🟠 输入验证缺失
问题:
- 只检查字段存在,不验证类型
- 无数据大小限制
- 无格式验证(如日期格式、ID格式)
- 无恶意数据检测
5. 🟠 日志敏感信息泄露
位置:
- 第 340 行:
json.dumps(data, ensure_ascii=False)[:500] - 第 585 行:
user_input[:100]- 可能包含 PII - 多处记录完整请求数据
📝 实施计划
阶段 1: 日志脱敏工具 (安全优先)
Task 1.1: 创建日志脱敏工具
Files:
- Create:
apps/api/utils/logging_utils.py - Modify:
apps/api/views/webhook_views.py(导入使用)
Step 1: 创建日志脱敏模块
# apps/api/utils/logging_utils.py
"""日志脱敏工具模块"""
import re
import logging
from typing import Any, Dict, List, Optional
from functools import wraps
# 敏感字段名模式
SENSITIVE_FIELD_PATTERNS = [
'password', 'secret', 'token', 'api_key', 'apikey',
'phone', 'mobile', 'tel', 'email', 'address',
'id_card', 'idcard', 'passport', 'credit_card',
'card_number', 'bank_account', 'ssn'
]
# 手机号正则 (中国)
PHONE_PATTERN = re.compile(r'1[3-9]\d{9}')
# 邮箱正则
EMAIL_PATTERN = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
# 身份证正则
ID_CARD_PATTERN = re.compile(r'\d{17}[\dXx]')
class LogSanitizer:
"""日志脱敏器"""
@staticmethod
def mask_phone(text: str) -> str:
"""脱敏手机号: 13812345678 -> 138****5678"""
def replacer(match):
phone = match.group()
return phone[:3] + '****' + phone[-4:]
return PHONE_PATTERN.sub(replacer, text)
@staticmethod
def mask_email(text: str) -> str:
"""脱敏邮箱: user@example.com -> u***@example.com"""
def replacer(match):
email = match.group()
local = email.split('@')[0]
domain = email.split('@')[1] if '@' in email else ''
return f"{local[0]}***@{domain}"
return EMAIL_PATTERN.sub(replacer, text)
@staticmethod
def mask_id_card(text: str) -> str:
"""脱敏身份证: 110101199001011234 -> 110101********1234"""
def replacer(match):
id_card = match.group()
return id_card[:6] + '********' + id_card[-4:]
return ID_CARD_PATTERN.sub(replacer, text)
@classmethod
def is_sensitive_field(cls, field_name: str) -> bool:
"""判断字段是否为敏感字段"""
field_lower = field_name.lower()
for pattern in SENSITIVE_FIELD_PATTERNS:
if pattern in field_lower:
return True
return False
@classmethod
def sanitize_dict(cls, data: Any, depth: int = 0, max_depth: int = 10) -> Any:
"""递归脱敏字典中的敏感信息"""
if depth > max_depth:
return '[MAX_DEPTH_EXCEEDED]'
if isinstance(data, dict):
result = {}
for key, value in data.items():
if cls.is_sensitive_field(key):
if isinstance(value, str) and len(value) > 4:
result[key] = value[:2] + '***' + value[-2:]
else:
result[key] = '***'
else:
result[key] = cls.sanitize_dict(value, depth + 1, max_depth)
return result
elif isinstance(data, list):
return [cls.sanitize_dict(item, depth + 1, max_depth) for item in data]
elif isinstance(data, str):
text = cls.mask_phone(data)
text = cls.mask_email(text)
text = cls.mask_id_card(text)
# 截断过长字符串
if len(text) > 200:
return text[:200] + '...[TRUNCATED]'
return text
return data
@classmethod
def sanitize_for_logging(cls, data: Any, include_fields: Optional[List[str]] = None) -> str:
"""
将数据脱敏后转为日志字符串
Args:
data: 要脱敏的数据
include_fields: 如果指定,只记录这些字段(用于白名单模式)
"""
import json
if include_fields:
# 白名单模式
if isinstance(data, dict):
filtered = {k: v for k, v in data.items() if k in include_fields}
data = filtered
else:
data = {}
sanitized = cls.sanitize_dict(data)
try:
return json.dumps(sanitized, ensure_ascii=False, default=str)
except Exception:
return str(sanitized)
def log_webhook_call(logger: logging.Logger, data: Dict, message: str,
include_fields: Optional[List[str]] = None,
max_length: int = 500):
"""
记录 webhook 调用日志(自动脱敏)
Args:
logger: 日志记录器
data: 请求数据
message: 日志消息
include_fields: 只记录的字段列表(可选)
max_length: 最大日志长度
"""
sanitized = LogSanitizer.sanitize_for_logging(data, include_fields)
if len(sanitized) > max_length:
sanitized = sanitized[:max_length] + '...[TRUNCATED]'
logger.info(f"{message}: {sanitized}")
Step 2: 验证模块存在性
Run: ls -la apps/api/utils/
Expected: 目录存在或自动创建
阶段 2: 创建 Serializer 层 (输入验证)
Task 2.1: 创建 ItineraryWebhookSerializer
Files:
- Create:
apps/api/serializers/webhook_serializers.py
Step 1: 创建 Webhook 序列化器
# apps/api/serializers/webhook_serializers.py
"""Webhook API 专用序列化器"""
from rest_framework import serializers
from apps.models.requirement import Requirement
from apps.models.itinerary import Itinerary
from apps.models.destinations import Destination
from apps.models.traveler_stats import TravelerStats
from apps.models.daily_schedule import DailySchedule
from apps.models.attraction import Attraction
from apps.models.hotel import Hotel
from apps.models.restaurant import Restaurant
class ActivitySerializer(serializers.Serializer):
"""单个活动序列化器"""
activity_title = serializers.CharField(max_length=200)
activity_type = serializers.ChoiceField(choices=[
'FLIGHT', 'TRAIN', 'ATTRACTION', 'MEAL',
'TRANSPORT', 'SHOPPING', 'FREE', 'CHECK_IN', 'CHECK_OUT', 'OTHER'
])
start_time = serializers.TimeField(format='%H:%M:%S')
end_time = serializers.TimeField(format='%H:%M:%S')
activity_description = serializers.CharField(required=False, allow_blank=True)
id_reference = serializers.CharField(required=False, allow_blank=True)
class DayScheduleSerializer(serializers.Serializer):
"""每日行程序列化器"""
day = serializers.IntegerField(min_value=1)
date = serializers.DateField()
city = serializers.CharField(max_length=100)
activities = ActivitySerializer(many=True)
class DestinationSerializer(serializers.Serializer):
"""目的地序列化器"""
city_name = serializers.CharField(max_length=100)
country_code = serializers.CharField(max_length=3, required=False, default='CN')
destination_order = serializers.IntegerField(min_value=1)
arrival_date = serializers.DateField()
departure_date = serializers.DateField()
class TravelerStatsInputSerializer(serializers.Serializer):
"""旅行者统计输入序列化器"""
adults = serializers.IntegerField(min_value=0, default=0)
children = serializers.IntegerField(min_value=0, default=0)
infants = serializers.IntegerField(min_value=0, default=0)
seniors = serializers.IntegerField(min_value=0, default=0)
class ItineraryWebhookInputSerializer(serializers.Serializer):
"""Itinerary Webhook 输入序列化器"""
requirement_id = serializers.CharField(max_length=100)
itinerary_name = serializers.CharField(max_length=200)
start_date = serializers.DateField()
end_date = serializers.DateField()
# 可选字段
destinations = DestinationSerializer(many=True, required=False)
traveler_stats = TravelerStatsInputSerializer(required=False)
daily_schedules = DayScheduleSerializer(many=True, required=False)
def validate(self, data):
"""交叉验证"""
if data.get('start_date') and data.get('end_date'):
if data['end_date'] < data['start_date']:
raise serializers.ValidationError({
'end_date': '结束日期不能早于开始日期'
})
return data
class RequirementWebhookInputSerializer(serializers.Serializer):
"""Requirement Webhook 输入序列化器"""
user_input = serializers.CharField(max_length=5000, required=False)
structured_data = serializers.DictField(required=False)
requirement_id = serializers.CharField(max_length=100, required=False)
llm_info = serializers.DictField(required=False)
output = serializers.DictField(required=False)
def validate(self, data):
"""验证至少有一个有效数据源"""
if not any([
data.get('user_input'),
data.get('structured_data'),
data.get('requirement_id')
]):
raise serializers.ValidationError(
'至少需要提供 user_input, structured_data 或 requirement_id 之一'
)
return data
阶段 3: 修复静默失败 Bug (关键修复)
Task 3.1: 修复数据库失败返回 200 的问题
Files:
- Modify:
apps/api/views/webhook_views.py:146-154
Step 1: 修改异常处理逻辑
在 ItineraryWebhookView.post() 方法中,将:
except Exception as db_error:
logger.error(f'数据库操作失败: {db_error}', exc_info=True)
# 即使数据库操作失败,也要返回成功的响应,因为JSON解析和验证已经成功
return JsonResponse({
'success': True,
'message': 'JSON解析和验证成功,但数据库操作失败。这可能是因为数据库服务不可用。',
'itinerary_name': data.get('itinerary_name'),
'requirement_id': data.get('requirement_id')
}, status=200)
替换为:
except Exception as db_error:
logger.error(f'数据库操作失败: {db_error}', exc_info=True)
# 明确返回错误状态,让调用方知道数据未保存
return JsonResponse({
'success': False,
'error': '数据库操作失败,数据未保存',
'error_code': 'DB_OPERATION_FAILED',
'itinerary_name': data.get('itinerary_name') if data else None,
'requirement_id': data.get('requirement_id') if data else None
}, status=500)
Step 2: 添加错误响应结构化
在文件顶部添加错误码常量:
# 错误码定义
class WebhookErrorCode:
INVALID_JSON = 'INVALID_JSON'
MISSING_REQUIRED_FIELD = 'MISSING_REQUIRED_FIELD'
VALIDATION_FAILED = 'VALIDATION_FAILED'
REQUIREMENT_NOT_FOUND = 'REQUIREMENT_NOT_FOUND'
DB_OPERATION_FAILED = 'DB_OPERATION_FAILED'
N8N_WEBHOOK_ERROR = 'N8N_WEBHOOK_ERROR'
N8N_TIMEOUT = 'N8N_TIMEOUT'
INTERNAL_ERROR = 'INTERNAL_ERROR'
阶段 4: 重构长函数 (代码结构)
Task 4.1: 提取数据处理服务
Files:
- Create:
apps/api/services/webhook_services.py
Step 1: 创建 Webhook 服务层
# apps/api/services/webhook_services.py
"""Webhook 业务逻辑服务层"""
from typing import Dict, List, Optional, Tuple
from decimal import Decimal
from datetime import datetime, date, time
import logging
from django.db import transaction
from django.db.models import Q
from apps.models.requirement import Requirement
from apps.models.itinerary import Itinerary
from apps.models.destinations import Destination
from apps.models.traveler_stats import TravelerStats
from apps.models.daily_schedule import DailySchedule
from apps.models.attraction import Attraction
from apps.models.hotel import Hotel
from apps.models.restaurant import Restaurant
logger = logging.getLogger(__name__)
class ItineraryWebhookService:
"""行程 Webhook 服务"""
def __init__(self, validated_data: Dict):
self.data = validated_data
self.itinerary: Optional[Itinerary] = None
self.destinations_cache: Dict[str, Destination] = {}
self.attractions_cache: Dict[str, Attraction] = {}
self.hotels_cache: Dict[str, Hotel] = {}
self.restaurants_cache: Dict[str, Restaurant] = {}
def process(self, requirement: Requirement) -> Tuple[bool, str, Optional[Itinerary]]:
"""
处理行程数据
Returns:
Tuple[成功标志, 消息, 行程对象]
"""
try:
with transaction.atomic():
# 1. 预加载关联数据 (解决 N+1 问题)
self._preload_related_data()
# 2. 创建行程主表
self.itinerary = self._create_itinerary(requirement)
# 3. 创建关联关系
self._create_requirement_itinerary(requirement)
# 4. 创建目的地
self._create_destinations()
# 5. 创建旅行者统计
self._create_traveler_stats()
# 6. 创建每日行程
self._create_daily_schedules()
return True, '行程数据处理成功', self.itinerary
except Exception as e:
logger.error(f'处理行程数据失败: {e}', exc_info=True)
return False, str(e), None
def _preload_related_data(self):
"""预加载所有关联数据以避免 N+1 查询"""
requirement_id = self.data.get('requirement_id')
# 预加载目的地
destinations = self.data.get('destinations', [])
destination_cities = [d.get('city_name') for d in destinations]
if destination_cities:
self.destinations_cache = {
d.city_name: d for d in Destination.objects.filter(
itinerary=self.itinerary,
city_name__in=destination_cities
)
}
# 预加载所有可能用到的景点
attraction_ids = []
hotel_ids = []
restaurant_ids = []
for day_schedule in self.data.get('daily_schedules', []):
for activity in day_schedule.get('activities', []):
ref_id = activity.get('id_reference')
if activity.get('activity_type') == 'ATTRACTION':
attraction_ids.append(ref_id)
elif activity.get('activity_type') in ['CHECK_IN', 'CHECK_OUT']:
hotel_ids.append(ref_id)
elif activity.get('activity_type') == 'MEAL':
restaurant_ids.append(ref_id)
if attraction_ids:
self.attractions_cache = {
a.attraction_id: a for a in Attraction.objects.filter(
attraction_id__in=attraction_ids
)
}
if hotel_ids:
self.hotels_cache = {
h.hotel_id: h for h in Hotel.objects.filter(
hotel_id__in=hotel_ids
)
}
if restaurant_ids:
self.restaurants_cache = {
r.restaurant_id: r for r in Restaurant.objects.filter(
restaurant_id__in=restaurant_ids
)
}
def _create_itinerary(self, requirement: Requirement) -> Itinerary:
"""创建行程主表"""
itinerary = Itinerary(
itinerary_name=self.data.get('itinerary_name', '未命名行程'),
start_date=self.data.get('start_date'),
end_date=self.data.get('end_date'),
travel_purpose=Itinerary.TravelPurpose.LEISURE,
contact_person=requirement.contact_person or '测试用户',
contact_phone=requirement.contact_phone or '13800138000',
contact_company=requirement.contact_company,
departure_city=requirement.origin_name or '上海',
return_city=requirement.origin_name or '上海',
current_status=Itinerary.CurrentStatus.DRAFT,
created_by='webhook_user'
)
itinerary.save()
return itinerary
def _create_requirement_itinerary(self, requirement: Requirement):
"""创建需求与行程关联"""
from apps.models.requirement_itinerary import RequirementItinerary
requirement_itinerary = RequirementItinerary(
requirement=requirement,
itinerary=self.itinerary
)
requirement_itinerary.save()
logger.info(
f'创建需求与行程关联成功: '
f'requirement_id={requirement.requirement_id}, '
f'itinerary_id={self.itinerary.itinerary_id}'
)
def _create_destinations(self):
"""批量创建目的地"""
destinations = self.data.get('destinations', [])
for dest_data in destinations:
destination = Destination(
itinerary=self.itinerary,
destination_order=dest_data.get('destination_order'),
city_name=dest_data.get('city_name'),
country_code=dest_data.get('country_code', 'CN'),
arrival_date=dest_data.get('arrival_date'),
departure_date=dest_data.get('departure_date')
)
destination.save()
self.destinations_cache[dest_data.get('city_name')] = destination
def _create_traveler_stats(self):
"""创建旅行者统计"""
traveler_stats_info = self.data.get('traveler_stats', {})
traveler_stats = TravelerStats(
itinerary=self.itinerary,
adult_count=traveler_stats_info.get('adults', 0),
child_count=traveler_stats_info.get('children', 0),
infant_count=traveler_stats_info.get('infants', 0),
senior_count=traveler_stats_info.get('seniors', 0)
)
traveler_stats.save()
def _create_daily_schedules(self):
"""批量创建每日行程 (使用预加载的数据)"""
daily_schedules = self.data.get('daily_schedules', [])
for day_schedule in daily_schedules:
day_number = day_schedule.get('day')
schedule_date = day_schedule.get('date')
city = day_schedule.get('city')
# 使用缓存的目的地
destination = self.destinations_cache.get(city)
# 批量处理活动
activities = day_schedule.get('activities', [])
for activity in activities:
self._create_activity_schedule(
day_number, schedule_date, city, destination, activity
)
def _create_activity_schedule(
self, day_number: int, schedule_date: date, city: str,
destination: Optional[Destination], activity: Dict
):
"""创建单个活动记录"""
activity_type = self._map_activity_type(activity.get('activity_type'))
# 使用缓存的关联对象
attraction = self.hotel = self.restaurant = None
if activity_type == DailySchedule.ActivityType.ATTRACTION:
attraction = self.attractions_cache.get(activity.get('id_reference'))
elif activity_type in [DailySchedule.ActivityType.CHECK_IN, DailySchedule.ActivityType.CHECK_OUT]:
self.hotel = self.hotels_cache.get(activity.get('id_reference'))
elif activity_type == DailySchedule.ActivityType.MEAL:
self.restaurant = self.restaurants_cache.get(activity.get('id_reference'))
schedule = DailySchedule(
itinerary_id=self.itinerary,
day_number=day_number,
schedule_date=schedule_date,
destination_id=destination,
activity_type=activity_type,
activity_title=activity.get('activity_title'),
activity_description=activity.get('activity_description', ''),
start_time=activity.get('start_time'),
end_time=activity.get('end_time'),
attraction_id=attraction,
hotel_id=self.hotel,
restaurant_id=self.restaurant,
booking_status=DailySchedule.BookingStatus.NOT_BOOKED
)
schedule.save()
@staticmethod
def _map_activity_type(activity_type_str: str) -> str:
"""映射活动类型"""
mapping = {
'FLIGHT': DailySchedule.ActivityType.FLIGHT,
'TRAIN': DailySchedule.ActivityType.TRAIN,
'ATTRACTION': DailySchedule.ActivityType.ATTRACTION,
'MEAL': DailySchedule.ActivityType.MEAL,
'TRANSPORT': DailySchedule.ActivityType.TRANSPORT,
'SHOPPING': DailySchedule.ActivityType.SHOPPING,
'FREE': DailySchedule.ActivityType.FREE,
'CHECK_IN': DailySchedule.ActivityType.CHECK_IN,
'CHECK_OUT': DailySchedule.ActivityType.CHECK_OUT,
'OTHER': DailySchedule.ActivityType.OTHER
}
return mapping.get(activity_type_str, DailySchedule.ActivityType.OTHER)
class RequirementWebhookService:
"""需求 Webhook 服务"""
def __init__(self, validated_data: Dict):
self.data = validated_data
self.requirement_id: Optional[str] = None
def process(self) -> Tuple[bool, str, Optional[str], Optional[Requirement]]:
"""
处理需求数据
Returns:
Tuple[成功标志, 消息, 需求ID, 需求对象]
"""
try:
# 提取数据
requirement_data = self._extract_structured_data()
# 生成需求ID
self.requirement_id = self._generate_requirement_id()
# 构建需求对象
requirement = self._build_requirement(requirement_data)
# 保存
requirement.save()
logger.info(f'需求保存成功,ID: {self.requirement_id}')
return True, '需求处理成功', self.requirement_id, requirement
except Exception as e:
logger.error(f'处理需求失败: {e}', exc_info=True)
return False, str(e), None, None
def _extract_structured_data(self) -> Dict:
"""提取结构化数据"""
# 优先使用 structured_data,否则使用原始数据
if 'structured_data' in self.data:
return self.data['structured_data']
# 移除顶层元数据字段
result = {k: v for k, v in self.data.items()
if k not in ['user_input', 'requirement_id', 'llm_info', 'output']}
return result
def _generate_requirement_id(self) -> str:
"""生成唯一的 requirement_id"""
import time
max_attempts = 100
for attempt in range(max_attempts):
today = datetime.now().strftime('%Y%m%d')
prefix = f'REQ_{today}_'
# 获取当前最大序号
max_seq = Requirement.objects.filter(
requirement_id__startswith=prefix
).aggregate(
max_seq=Max(
Cast(
Substr(F('requirement_id'), len(prefix) + 1),
output_field=IntegerField()
)
)
)['max_seq'] or 0
new_id = f'{prefix}{max_seq + 1:03d}'
if not Requirement.objects.filter(requirement_id=new_id).exists():
return new_id
time.sleep(0.01)
raise ValueError('无法生成唯一的 requirement_id')
def _build_requirement(self, data: Dict) -> Requirement:
"""构建 Requirement 对象"""
base_info = data.get('base_info', {})
preferences = data.get('preferences', {})
budget = data.get('budget', {})
metadata = data.get('metadata', {})
# 解析日期
travel_start = self._parse_date(
base_info.get('travel_date', {}).get('start_date')
)
travel_end = self._parse_date(
base_info.get('travel_date', {}).get('end_date')
)
# 解析预算
budget_range = budget.get('range', {})
budget_min = self._parse_decimal(budget_range.get('min'))
budget_max = self._parse_decimal(budget_range.get('max'))
return Requirement(
requirement_id=self.requirement_id,
origin_name=base_info.get('origin', {}).get('name', ''),
origin_code=base_info.get('origin', {}).get('code', ''),
origin_type=base_info.get('origin', {}).get('type', ''),
destination_cities=base_info.get('destination_cities', []),
trip_days=base_info.get('trip_days', 1),
group_adults=base_info.get('group_size', {}).get('adults', 0),
group_children=base_info.get('group_size', {}).get('children', 0),
group_seniors=base_info.get('group_size', {}).get('seniors', 0),
group_total=base_info.get('group_size', {}).get('total', 1),
travel_start_date=travel_start,
travel_end_date=travel_end,
travel_date_flexible=base_info.get('travel_date', {}).get('is_flexible', False),
transportation_type=preferences.get('transportation', {}).get('type', ''),
transportation_notes=preferences.get('transportation', {}).get('notes', ''),
hotel_level=preferences.get('accommodation', {}).get('level', ''),
hotel_requirements=preferences.get('accommodation', {}).get('requirements', ''),
trip_rhythm=preferences.get('itinerary', {}).get('rhythm', ''),
preference_tags=preferences.get('itinerary', {}).get('tags', []),
must_visit_spots=preferences.get('itinerary', {}).get('special_constraints', {}).get('must_visit_spots', []),
avoid_activities=preferences.get('itinerary', {}).get('special_constraints', {}).get('avoid_activities', []),
budget_level=budget.get('level', ''),
budget_currency=budget.get('currency', 'CNY'),
budget_min=budget_min,
budget_max=budget_max,
budget_notes=budget.get('budget_notes', ''),
source_type=metadata.get('source_type', 'NaturalLanguage'),
status=metadata.get('status', 'Confirmed'),
assumptions=metadata.get('assumptions', []),
is_template=metadata.get('is_template', False),
template_name=metadata.get('template_info', {}).get('name', ''),
template_category=metadata.get('template_info', {}).get('category', ''),
extension=data.get('extension', {})
)
def _parse_date(self, value) -> Optional[date]:
"""解析日期"""
if not value:
return None
if isinstance(value, date):
return value
try:
return datetime.fromisoformat(str(value)).date()
except (ValueError, TypeError):
logger.warning(f'无法解析日期: {value}')
return None
def _parse_decimal(self, value) -> Optional[Decimal]:
"""解析 Decimal"""
if not value:
return None
try:
return Decimal(str(value))
except (ValueError, TypeError):
logger.warning(f'无法解析金额: {value}')
return None
注意: 需要在
_generate_requirement_id中添加必要的导入:from django.db.models import Max, F from django.db.models.functions import Substr from django.db.models import IntegerField
阶段 5: 重构 View 层 (使用新服务)
Task 5.1: 重构 ItineraryWebhookView
Files:
- Modify:
apps/api/views/webhook_views.py
Step 1: 重构 ItineraryWebhookView
@method_decorator(csrf_exempt, name='dispatch')
class ItineraryWebhookView(View):
"""处理n8n webhook返回的行程数据"""
def post(self, request, *args, **kwargs):
"""接收并处理webhook数据"""
from apps.api.utils.logging_utils import log_webhook_call
from apps.api.serializers.webhook_serializers import ItineraryWebhookInputSerializer
try:
# 1. 解析请求体
content = request.body.decode('utf-8')
raw_data = json.loads(content)
# 2. 日志记录(脱敏后)
log_webhook_call(
logger, raw_data,
'接收到行程webhook请求',
include_fields=['requirement_id', 'itinerary_name', 'start_date', 'end_date']
)
# 3. 处理 output 字段
data = raw_data.get('output', raw_data)
# 4. 输入验证
serializer = ItineraryWebhookInputSerializer(data=data)
if not serializer.is_valid():
logger.warning(f'输入验证失败: {serializer.errors}')
return JsonResponse({
'success': False,
'error': '输入验证失败',
'error_code': WebhookErrorCode.VALIDATION_FAILED,
'details': serializer.errors
}, status=400)
validated_data = serializer.validated_data
# 5. 验证关联需求存在
requirement_id = validated_data.get('requirement_id')
try:
requirement = Requirement.objects.get(requirement_id=requirement_id)
except Requirement.DoesNotExist:
logger.error(f'需求不存在: {requirement_id}')
return JsonResponse({
'success': False,
'error': f'需求不存在: {requirement_id}',
'error_code': WebhookErrorCode.REQUIREMENT_NOT_FOUND
}, status=404)
# 6. 调用服务处理
from apps.api.services.webhook_services import ItineraryWebhookService
service = ItineraryWebhookService(validated_data)
success, message, itinerary = service.process(requirement)
if success:
logger.info(f'行程创建成功: itinerary_id={itinerary.itinerary_id}')
return JsonResponse({
'success': True,
'itinerary_id': itinerary.itinerary_id,
'itinerary_name': itinerary.itinerary_name
}, status=200)
else:
logger.error(f'行程创建失败: {message}')
return JsonResponse({
'success': False,
'error': message,
'error_code': WebhookErrorCode.DB_OPERATION_FAILED
}, status=500)
except json.JSONDecodeError as e:
logger.error(f'JSON格式错误: {e}')
return JsonResponse({
'success': False,
'error': f'JSON格式错误: {str(e)}',
'error_code': WebhookErrorCode.INVALID_JSON
}, status=400)
except Exception as e:
logger.error(f'处理行程数据失败: {e}', exc_info=True)
return JsonResponse({
'success': False,
'error': f'处理数据失败: {str(e)}',
'error_code': WebhookErrorCode.INTERNAL_ERROR
}, status=500)
Task 5.2: 重构 RequirementWebhookView
Files:
- Modify:
apps/api/views/webhook_views.py
Step 1: 简化 RequirementWebhookView.post 方法
@method_decorator(csrf_exempt, name='dispatch')
class RequirementWebhookView(APIView):
"""处理n8n webhook返回的旅游需求数据"""
permission_classes = [AllowAny]
@swagger_auto_schema(...)
def post(self, request, *args, **kwargs):
from apps.api.utils.logging_utils import log_webhook_call
from apps.api.serializers.webhook_serializers import RequirementWebhookInputSerializer
from apps.api.services.webhook_services import RequirementWebhookService
try:
raw_data = request.data
# 处理列表结构
if isinstance(raw_data, list) and len(raw_data) > 0:
raw_data = raw_data[0]
# 处理 output 字段
if 'output' in raw_data:
raw_data = raw_data['output']
# 日志记录(脱敏)
log_webhook_call(
logger, raw_data,
'接收到需求webhook请求',
include_fields=['requirement_id', 'source_type', 'status']
)
# 输入验证
serializer = RequirementWebhookInputSerializer(data=raw_data)
if not serializer.is_valid():
return Response({
'success': False,
'error': '输入验证失败',
'error_code': WebhookErrorCode.VALIDATION_FAILED,
'details': serializer.errors
}, status=status.HTTP_400_BAD_REQUEST)
# 调用服务处理
service = RequirementWebhookService(serializer.validated_data)
success, message, requirement_id, requirement = service.process()
if success:
return Response({
'success': True,
'requirement_id': requirement_id,
'structured_data': serializer.validated_data.get('structured_data', {}),
'validation_errors': None,
'warnings': [],
'error': None
}, status=status.HTTP_200_OK)
else:
return Response({
'success': False,
'error': message,
'error_code': WebhookErrorCode.DB_OPERATION_FAILED
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
except Exception as e:
logger.error(f'处理需求解析数据失败: {e}', exc_info=True)
return Response({
'success': False,
'error': f'处理数据失败: {str(e)}',
'error_code': WebhookErrorCode.INTERNAL_ERROR
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
阶段 6: 添加单元测试
Task 6.1: 创建 Webhook 测试文件
Files:
- Create:
tests/api/test_webhook_views.py
Step 1: 创建测试文件
"""Webhook API 测试"""
import json
from datetime import date, time
from unittest.mock import patch, MagicMock
from django.test import TestCase, RequestFactory
from django.http import JsonResponse
from apps.api.views.webhook_views import (
ItineraryWebhookView,
RequirementWebhookView,
WebhookErrorCode
)
from apps.api.serializers.webhook_serializers import (
ItineraryWebhookInputSerializer,
RequirementWebhookInputSerializer
)
class ItineraryWebhookSerializerTest(TestCase):
"""ItineraryWebhookInputSerializer 测试"""
def test_valid_data(self):
"""测试有效数据"""
data = {
'requirement_id': 'REQ_20240101_001',
'itinerary_name': '测试行程',
'start_date': '2024-03-01',
'end_date': '2024-03-05'
}
serializer = ItineraryWebhookInputSerializer(data=data)
self.assertTrue(serializer.is_valid(), serializer.errors)
def test_missing_required_field(self):
"""测试缺少必需字段"""
data = {
'requirement_id': 'REQ_20240101_001'
# 缺少 itinerary_name
}
serializer = ItineraryWebhookInputSerializer(data=data)
self.assertFalse(serializer.is_valid())
self.assertIn('itinerary_name', serializer.errors)
def test_end_date_before_start_date(self):
"""测试结束日期早于开始日期"""
data = {
'requirement_id': 'REQ_20240101_001',
'itinerary_name': '测试行程',
'start_date': '2024-03-05',
'end_date': '2024-03-01' # 早于开始日期
}
serializer = ItineraryWebhookInputSerializer(data=data)
self.assertFalse(serializer.is_valid())
self.assertIn('end_date', str(serializer.errors))
class ItineraryWebhookViewTest(TestCase):
"""ItineraryWebhookView 测试"""
def setUp(self):
self.factory = RequestFactory()
self.view = ItineraryWebhookView.as_view()
@patch('apps.api.views.webhook_views.Requirement.objects')
def test_db_failure_returns_500(self, mock_requirement):
"""测试数据库失败时返回 500 而非 200"""
# 模拟数据库失败
mock_requirement.get.side_effect = Exception('Database connection failed')
request = self.factory.post(
'/api/webhook/itinerary/',
data=json.dumps({
'requirement_id': 'REQ_20240101_001',
'itinerary_name': '测试',
'start_date': '2024-03-01',
'end_date': '2024-03-05'
}),
content_type='application/json'
)
response = self.view(request)
# 关键断言:数据库失败应返回 500
self.assertEqual(response.status_code, 500)
response_data = json.loads(response.content)
self.assertFalse(response_data.get('success'))
self.assertEqual(
response_data.get('error_code'),
WebhookErrorCode.DB_OPERATION_FAILED
)
class LogSanitizerTest(TestCase):
"""日志脱敏工具测试"""
def test_mask_phone(self):
"""测试手机号脱敏"""
from apps.api.utils.logging_utils import LogSanitizer
text = "联系电话: 13812345678"
result = LogSanitizer.sanitize_dict({'phone': '13812345678'})
self.assertEqual(result['phone'], '13***5678')
def test_mask_email(self):
"""测试邮箱脱敏"""
from apps.api.utils.logging_utils import LogSanitizer
result = LogSanitizer.sanitize_dict({'email': 'user@example.com'})
self.assertEqual(result['email'], 'u***@example.com')
def test_sensitive_field_masking(self):
"""测试敏感字段自动脱敏"""
from apps.api.utils.logging_utils import LogSanitizer
data = {
'password': 'secret123',
'api_key': 'sk-abc123xyz',
'username': 'john_doe' # 非敏感字段
}
result = LogSanitizer.sanitize_dict(data)
self.assertEqual(result['password'], 'se***23')
self.assertEqual(result['api_key'], 'sk***yz')
self.assertEqual(result['username'], 'john_doe') # 保持原样
Step 2: 运行测试验证
Run: python manage.py test tests.api.test_webhook_views -v 2
Expected: 所有测试通过
📊 预期改进效果
| 指标 | 改进前 | 改进后 |
|---|---|---|
| 静默失败 Bug | 存在 | ✅ 修复 |
| post() 方法行数 | 130+ / 200+ | ~40 行 |
| N+1 查询次数 | 最多 51 次 | 最多 4 次 |
| 输入验证 | 基础检查 | 完整 Schema 验证 |
| 敏感信息泄露 | 存在 | ✅ 已脱敏 |
🚀 实施顺序建议
- Task 1.1 - 创建日志脱敏工具 (安全基础)
- Task 3.1 - 修复静默失败 Bug (关键修复)
- Task 2.1 - 创建 Serializer 层 (验证基础)
- Task 4.1 - 创建服务层 (重构核心)
- Task 5.1/5.2 - 重构 View 层 (使用新代码)
- Task 6.1 - 添加单元测试 (质量保证)
Plan complete and saved to docs/plans/webhook-views-improvement-plan.md
Two execution options:
1. Subagent-Driven (this session) - I dispatch fresh subagent per task, review between tasks, fast iteration
2. Parallel Session (separate) - Open new session with executing-plans, batch execution with checkpoints
Which approach?