---
title: webhook_views.py Code Improvement Plan
source:
author: shenwei
published:
created:
description:
tags: []
---

# webhook_views.py Code Improvement Plan

> **For Claude:** REQUIRED SUB-SKILL: Use `executing-plans` skill to implement this plan task-by-task.

**Goal:** Improve the code quality of webhook_views.py across the board: fix critical bugs, improve performance, and harden security.

**Architecture:**
- Create a dedicated validator layer for input validation
- Create a service layer that encapsulates business logic and data operations
- Add log sanitization utility functions
- Refactor the existing views into smaller, more focused components
- Use DRF serializers consistently for data validation

**Tech Stack:** Django REST Framework, drf-yasg, json, logging

---

## 📋 Problem Analysis Summary

### 1. 🔴 Silent Failure (CRITICAL BUG)

**Location:** lines 146-154

**Problem:** A failed database operation still returns HTTP 200 OK.

```python
except Exception as db_error:
    logger.error(f'数据库操作失败: {db_error}', exc_info=True)
    return JsonResponse({
        'success': True,  # ❌ BUG: should be False
        'message': '...',
    }, status=200)  # ❌ BUG: should be 500
```

**Impact:** The caller cannot tell that the data was not saved, which leads to inconsistent data.

### 2. 🟡 Long Functions

**Location:**
- `ItineraryWebhookView.post()` - 130+ lines
- `RequirementWebhookView.post()` - 200+ lines
- `create_daily_schedules()` - 75+ lines

**Problem:** Each method carries too many responsibilities, which makes it hard to test and maintain.

### 3. 🟡 N+1 Queries

**Location:** the `create_daily_schedules()` method

```python
# Destination lookup inside the loop
for day_schedule in daily_schedules:
    destination = Destination.objects.filter(...).first()  # N+1
    for activity in activities:
        # One separate query per activity
        if activity_type == ATTRACTION:
            attraction = Attraction.objects.get(...)  # N+1
        elif activity_type == MEAL:
            restaurant = Restaurant.objects.get(...)  # N+1
        elif activity_type in (CHECK_IN, CHECK_OUT):
            hotel = Hotel.objects.get(...)  # N+1
```

**Impact:** For a 10-day itinerary with 5 activities per day, this is up to 51 database queries.

### 4. 🟠 Missing Input Validation

**Problem:**
- Only the presence of fields is checked, not their types
- No limit on payload size
- No format validation (for example date formats and ID formats)
- No detection of malicious data

### 5. 🟠 Sensitive Data Leaking into Logs

**Location:**
- Line 340: `json.dumps(data, ensure_ascii=False)[:500]`
- Line 585: `user_input[:100]` - may contain PII
- Full request data is logged in several places

---

## 📝 Implementation Plan

### Phase 1: Log Sanitization Utility (Security First)

#### Task 1.1: Create the log sanitization utility

**Files:**
- Create: `apps/api/utils/logging_utils.py`
- Modify: `apps/api/views/webhook_views.py` (import and use it)

**Step 1: Create the log sanitization module**

```python
# apps/api/utils/logging_utils.py
"""Log sanitization utilities."""
import json
import logging
import re
from typing import Any, Dict, List, Optional

# Substrings that mark a field name as sensitive.
# Matching is by substring, so it is deliberately broad
# (for example 'tel' also matches 'hotel').
SENSITIVE_FIELD_PATTERNS = [
    'password', 'secret', 'token', 'api_key', 'apikey',
    'phone', 'mobile', 'tel', 'email', 'address',
    'id_card', 'idcard', 'passport', 'credit_card',
    'card_number', 'bank_account', 'ssn'
]

# Mainland China mobile number. The digit lookarounds stop this pattern from
# matching inside longer digit runs such as an 18-digit ID card number.
PHONE_PATTERN = re.compile(r'(?<!\d)1[3-9]\d{9}(?!\d)')
# Email address
EMAIL_PATTERN = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
# 18-character ID card number
ID_CARD_PATTERN = re.compile(r'(?<!\d)\d{17}[\dXx](?!\d)')


class LogSanitizer:
    """Masks sensitive values before they are written to logs."""

    @staticmethod
    def mask_phone(text: str) -> str:
        """Mask phone numbers: 13812345678 -> 138****5678"""
        def replacer(match):
            phone = match.group()
            return phone[:3] + '****' + phone[-4:]
        return PHONE_PATTERN.sub(replacer, text)

    @staticmethod
    def mask_email(text: str) -> str:
        """Mask emails: user@example.com -> u***@example.com"""
        def replacer(match):
            local, _, domain = match.group().partition('@')
            return f"{local[0]}***@{domain}"
        return EMAIL_PATTERN.sub(replacer, text)

    @staticmethod
    def mask_id_card(text: str) -> str:
        """Mask ID card numbers: 110101199001011234 -> 110101********1234"""
        def replacer(match):
            id_card = match.group()
            return id_card[:6] + '********' + id_card[-4:]
        return ID_CARD_PATTERN.sub(replacer, text)

    @classmethod
    def is_sensitive_field(cls, field_name: str) -> bool:
        """Return True if the field name contains a sensitive pattern."""
        field_lower = str(field_name).lower()
        return any(pattern in field_lower for pattern in SENSITIVE_FIELD_PATTERNS)

    @classmethod
    def sanitize_dict(cls, data: Any, depth: int = 0, max_depth: int = 10) -> Any:
        """Recursively mask sensitive information in dicts, lists, and strings."""
        if depth > max_depth:
            return '[MAX_DEPTH_EXCEEDED]'
        if isinstance(data, dict):
            result = {}
            for key, value in data.items():
                if cls.is_sensitive_field(key):
                    # Sensitive field: keep only the first and last two characters
                    if isinstance(value, str) and len(value) > 4:
                        result[key] = value[:2] + '***' + value[-2:]
                    else:
                        result[key] = '***'
                else:
                    result[key] = cls.sanitize_dict(value, depth + 1, max_depth)
            return result
        elif isinstance(data, list):
            return [cls.sanitize_dict(item, depth + 1, max_depth) for item in data]
        elif isinstance(data, str):
            text = cls.mask_phone(data)
            text = cls.mask_email(text)
            text = cls.mask_id_card(text)
            # Truncate overly long strings
            if len(text) > 200:
                return text[:200] + '...[TRUNCATED]'
            return text
        return data

    @classmethod
    def sanitize_for_logging(cls, data: Any, include_fields: Optional[List[str]] = None) -> str:
        """
        Sanitize data and serialize it to a log string.

        Args:
            data: The data to sanitize.
            include_fields: If given, only these top-level fields are logged (allowlist mode).
        """
        if include_fields:
            # Allowlist mode: non-dict payloads are logged as an empty object
            if isinstance(data, dict):
                data = {k: v for k, v in data.items() if k in include_fields}
            else:
                data = {}
        sanitized = cls.sanitize_dict(data)
        try:
            return json.dumps(sanitized, ensure_ascii=False, default=str)
        except Exception:
            return str(sanitized)


def log_webhook_call(logger: logging.Logger, data: Dict, message: str,
                     include_fields: Optional[List[str]] = None,
                     max_length: int = 500):
    """
    Log a webhook call with automatic sanitization.

    Args:
        logger: The logger to write to.
        data: The request data.
        message: The log message.
        include_fields: Optional list of fields to log exclusively.
        max_length: Maximum length of the logged payload.
    """
    sanitized = LogSanitizer.sanitize_for_logging(data, include_fields)
    if len(sanitized) > max_length:
        sanitized = sanitized[:max_length] + '...[TRUNCATED]'
    logger.info('%s: %s', message, sanitized)
```

**Step 2: Verify that the module exists**

Run: `ls -la apps/api/utils/`
Expected: the directory exists (create it first if it does not) and lists `logging_utils.py`

---

### Phase 2: Create the Serializer Layer (Input Validation)

#### Task 2.1: Create ItineraryWebhookSerializer

**Files:**
- Create: `apps/api/serializers/webhook_serializers.py`

**Step 1: Create the webhook serializers**

```python
# apps/api/serializers/webhook_serializers.py
"""Serializers dedicated to the webhook API."""
from rest_framework import serializers


class ActivitySerializer(serializers.Serializer):
    """Serializer for a single activity."""
    activity_title = serializers.CharField(max_length=200)
    activity_type = serializers.ChoiceField(choices=[
        'FLIGHT', 'TRAIN', 'ATTRACTION', 'MEAL', 'TRANSPORT',
        'SHOPPING', 'FREE', 'CHECK_IN', 'CHECK_OUT', 'OTHER'
    ])
    # `format` only controls output rendering; `input_formats` is what
    # constrains how incoming values are parsed.
    start_time = serializers.TimeField(input_formats=['%H:%M:%S', '%H:%M'])
    end_time = serializers.TimeField(input_formats=['%H:%M:%S', '%H:%M'])
    activity_description = serializers.CharField(required=False, allow_blank=True)
    id_reference = serializers.CharField(required=False, allow_blank=True)


class DayScheduleSerializer(serializers.Serializer):
    """Serializer for one day of the itinerary."""
    day = serializers.IntegerField(min_value=1)
    date = serializers.DateField()
    city = serializers.CharField(max_length=100)
    activities = ActivitySerializer(many=True)


class DestinationSerializer(serializers.Serializer):
    """Serializer for a destination."""
    city_name = serializers.CharField(max_length=100)
    country_code = serializers.CharField(max_length=3, required=False, default='CN')
    destination_order = serializers.IntegerField(min_value=1)
    arrival_date = serializers.DateField()
    departure_date = serializers.DateField()


class TravelerStatsInputSerializer(serializers.Serializer):
    """Input serializer for traveler statistics."""
    adults = serializers.IntegerField(min_value=0, default=0)
    children = serializers.IntegerField(min_value=0, default=0)
    infants = serializers.IntegerField(min_value=0, default=0)
    seniors = serializers.IntegerField(min_value=0, default=0)


class ItineraryWebhookInputSerializer(serializers.Serializer):
    """Input serializer for the itinerary webhook."""
    requirement_id = serializers.CharField(max_length=100)
    itinerary_name = serializers.CharField(max_length=200)
    start_date = serializers.DateField()
    end_date = serializers.DateField()
    # Optional fields
    destinations = DestinationSerializer(many=True, required=False)
    traveler_stats = TravelerStatsInputSerializer(required=False)
    daily_schedules = DayScheduleSerializer(many=True, required=False)

    def validate(self, data):
        """Cross-field validation."""
        if data.get('start_date') and data.get('end_date'):
            if data['end_date'] < data['start_date']:
                raise serializers.ValidationError({
                    'end_date': 'The end date must not be earlier than the start date'
                })
        return data


class RequirementWebhookInputSerializer(serializers.Serializer):
    """
    Input serializer for the requirement webhook.

    Note: DRF keeps only declared fields in `validated_data`, so payload
    sections such as `base_info` must arrive nested under `structured_data`.
    """
    user_input = serializers.CharField(max_length=5000, required=False)
    structured_data = serializers.DictField(required=False)
    requirement_id = serializers.CharField(max_length=100, required=False)
    llm_info = serializers.DictField(required=False)
    output = serializers.DictField(required=False)

    def validate(self, data):
        """Require at least one usable data source."""
        if not any([
            data.get('user_input'),
            data.get('structured_data'),
            data.get('requirement_id')
        ]):
            raise serializers.ValidationError(
                'At least one of user_input, structured_data, or requirement_id is required'
            )
        return data
```

---

### Phase 3: Fix the Silent-Failure Bug (Critical Fix)

#### Task 3.1: Stop returning 200 when the database operation fails

**Files:**
- Modify: `apps/api/views/webhook_views.py:146-154`

**Step 1: Change the exception handling**

In `ItineraryWebhookView.post()`, replace this block (quoted verbatim from the current file):

```python
except Exception as db_error:
    logger.error(f'数据库操作失败: {db_error}', exc_info=True)
    # 即使数据库操作失败,也要返回成功的响应,因为JSON解析和验证已经成功
    return JsonResponse({
        'success': True,
        'message': 'JSON解析和验证成功,但数据库操作失败。这可能是因为数据库服务不可用。',
        'itinerary_name': data.get('itinerary_name'),
        'requirement_id': data.get('requirement_id')
    }, status=200)
```

with:

```python
except Exception as db_error:
    logger.error(f'Database operation failed: {db_error}', exc_info=True)
    # Return an explicit error so the caller knows the data was not saved
    return JsonResponse({
        'success': False,
        'error': 'Database operation failed; the data was not saved',
        'error_code': 'DB_OPERATION_FAILED',
        'itinerary_name': data.get('itinerary_name') if data else None,
        'requirement_id': data.get('requirement_id') if data else None
    },
    status=500)
```

**Step 2: Structure the error responses**

Add the error-code constants at the top of the file:

```python
# Error codes
class WebhookErrorCode:
    INVALID_JSON = 'INVALID_JSON'
    MISSING_REQUIRED_FIELD = 'MISSING_REQUIRED_FIELD'
    VALIDATION_FAILED = 'VALIDATION_FAILED'
    REQUIREMENT_NOT_FOUND = 'REQUIREMENT_NOT_FOUND'
    DB_OPERATION_FAILED = 'DB_OPERATION_FAILED'
    N8N_WEBHOOK_ERROR = 'N8N_WEBHOOK_ERROR'
    N8N_TIMEOUT = 'N8N_TIMEOUT'
    INTERNAL_ERROR = 'INTERNAL_ERROR'
```

---

### Phase 4: Refactor Long Functions (Code Structure)

#### Task 4.1: Extract a data-processing service

**Files:**
- Create: `apps/api/services/webhook_services.py`

**Step 1: Create the webhook service layer**

```python
# apps/api/services/webhook_services.py
"""Service layer for webhook business logic."""
import logging
from datetime import date, datetime
from decimal import Decimal
from typing import Dict, Optional, Tuple

from django.db import transaction
from django.db.models import F, IntegerField, Max
from django.db.models.functions import Cast, Substr

from apps.models.requirement import Requirement
from apps.models.itinerary import Itinerary
from apps.models.destinations import Destination
from apps.models.traveler_stats import TravelerStats
from apps.models.daily_schedule import DailySchedule
from apps.models.attraction import Attraction
from apps.models.hotel import Hotel
from apps.models.restaurant import Restaurant

logger = logging.getLogger(__name__)


class ItineraryWebhookService:
    """Service for the itinerary webhook."""

    def __init__(self, validated_data: Dict):
        self.data = validated_data
        self.itinerary: Optional[Itinerary] = None
        self.destinations_cache: Dict[str, Destination] = {}
        self.attractions_cache: Dict[str, Attraction] = {}
        self.hotels_cache: Dict[str, Hotel] = {}
        self.restaurants_cache: Dict[str, Restaurant] = {}

    def process(self, requirement: Requirement) -> Tuple[bool, str, Optional[Itinerary]]:
        """
        Process the itinerary data.

        Returns:
            Tuple[success flag, message, itinerary object]
        """
        try:
            # Any exception inside the block rolls back every write
            with transaction.atomic():
                # 1. Preload related data (fixes the N+1 problem)
                self._preload_related_data()
                # 2. Create the main itinerary record
                self.itinerary = self._create_itinerary(requirement)
                # 3. Link the requirement to the itinerary
                self._create_requirement_itinerary(requirement)
                # 4. Create destinations
                self._create_destinations()
                # 5. Create traveler statistics
                self._create_traveler_stats()
                # 6. Create daily schedules
                self._create_daily_schedules()
            return True, 'Itinerary data processed successfully', self.itinerary
        except Exception as e:
            logger.error(f'Failed to process itinerary data: {e}', exc_info=True)
            return False, str(e), None

    def _preload_related_data(self):
        """Preload referenced attractions, hotels, and restaurants to avoid N+1 queries."""
        # Destinations are not preloaded: the itinerary does not exist yet at this
        # point, and _create_destinations() fills destinations_cache as it creates them.
        attraction_ids, hotel_ids, restaurant_ids = set(), set(), set()
        for day_schedule in self.data.get('daily_schedules', []):
            for activity in day_schedule.get('activities', []):
                ref_id = activity.get('id_reference')
                if not ref_id:
                    continue  # Nothing to look up for this activity
                activity_type = activity.get('activity_type')
                if activity_type == 'ATTRACTION':
                    attraction_ids.add(ref_id)
                elif activity_type in ('CHECK_IN', 'CHECK_OUT'):
                    hotel_ids.add(ref_id)
                elif activity_type == 'MEAL':
                    restaurant_ids.add(ref_id)

        # One query per model instead of one query per activity
        if attraction_ids:
            self.attractions_cache = {
                a.attraction_id: a
                for a in Attraction.objects.filter(attraction_id__in=attraction_ids)
            }
        if hotel_ids:
            self.hotels_cache = {
                h.hotel_id: h
                for h in Hotel.objects.filter(hotel_id__in=hotel_ids)
            }
        if restaurant_ids:
            self.restaurants_cache = {
                r.restaurant_id: r
                for r in Restaurant.objects.filter(restaurant_id__in=restaurant_ids)
            }

    def _create_itinerary(self, requirement: Requirement) -> Itinerary:
        """Create the main itinerary record."""
        # The literal fallbacks below are the plan's original placeholder defaults
        itinerary = Itinerary(
            itinerary_name=self.data.get('itinerary_name', '未命名行程'),
            start_date=self.data.get('start_date'),
            end_date=self.data.get('end_date'),
            travel_purpose=Itinerary.TravelPurpose.LEISURE,
            contact_person=requirement.contact_person or '测试用户',
            contact_phone=requirement.contact_phone or '13800138000',
            contact_company=requirement.contact_company,
            departure_city=requirement.origin_name or '上海',
            return_city=requirement.origin_name or '上海',
            current_status=Itinerary.CurrentStatus.DRAFT,
            created_by='webhook_user'
        )
        itinerary.save()
        return itinerary

    def _create_requirement_itinerary(self, requirement: Requirement):
        """Link the requirement to the itinerary."""
        from apps.models.requirement_itinerary import RequirementItinerary
        requirement_itinerary = RequirementItinerary(
            requirement=requirement,
            itinerary=self.itinerary
        )
        requirement_itinerary.save()
        logger.info(
            f'Linked requirement to itinerary: '
            f'requirement_id={requirement.requirement_id}, '
            f'itinerary_id={self.itinerary.itinerary_id}'
        )

    def _create_destinations(self):
        """Create destinations one by one and cache them by city name."""
        destinations = self.data.get('destinations', [])
        for dest_data in destinations:
            destination = Destination(
                itinerary=self.itinerary,
                destination_order=dest_data.get('destination_order'),
                city_name=dest_data.get('city_name'),
                country_code=dest_data.get('country_code', 'CN'),
                arrival_date=dest_data.get('arrival_date'),
                departure_date=dest_data.get('departure_date')
            )
            destination.save()
            self.destinations_cache[dest_data.get('city_name')] = destination

    def _create_traveler_stats(self):
        """Create traveler statistics."""
        traveler_stats_info = self.data.get('traveler_stats', {})
        traveler_stats = TravelerStats(
            itinerary=self.itinerary,
            adult_count=traveler_stats_info.get('adults', 0),
            child_count=traveler_stats_info.get('children', 0),
            infant_count=traveler_stats_info.get('infants', 0),
            senior_count=traveler_stats_info.get('seniors', 0)
        )
        traveler_stats.save()

    def _create_daily_schedules(self):
        """Create daily schedules using the preloaded data."""
        daily_schedules = self.data.get('daily_schedules', [])
        for day_schedule in daily_schedules:
            day_number = day_schedule.get('day')
            schedule_date = day_schedule.get('date')
            city = day_schedule.get('city')
            # Use the cached destination
            destination = self.destinations_cache.get(city)
            # Create one schedule row per activity
            activities = day_schedule.get('activities', [])
            for activity in activities:
                self._create_activity_schedule(
                    day_number, schedule_date, city, destination, activity
                )

    def _create_activity_schedule(
        self,
        day_number: int,
        schedule_date: date,
        city: str,
        destination: Optional[Destination],
        activity: Dict
    ):
        """Create a single activity record."""
        activity_type = self._map_activity_type(activity.get('activity_type'))
        # Look up related objects in the caches; these are locals, not instance
        # state, so one activity's hotel cannot leak into the next activity.
        attraction = hotel = restaurant = None
        if activity_type == DailySchedule.ActivityType.ATTRACTION:
            attraction = self.attractions_cache.get(activity.get('id_reference'))
        elif activity_type in [DailySchedule.ActivityType.CHECK_IN,
                               DailySchedule.ActivityType.CHECK_OUT]:
            hotel = self.hotels_cache.get(activity.get('id_reference'))
        elif activity_type == DailySchedule.ActivityType.MEAL:
            restaurant = self.restaurants_cache.get(activity.get('id_reference'))

        # The keyword names follow the original plan; they assume the foreign-key
        # fields on DailySchedule are literally named itinerary_id, destination_id, etc.
        schedule = DailySchedule(
            itinerary_id=self.itinerary,
            day_number=day_number,
            schedule_date=schedule_date,
            destination_id=destination,
            activity_type=activity_type,
            activity_title=activity.get('activity_title'),
            activity_description=activity.get('activity_description', ''),
            start_time=activity.get('start_time'),
            end_time=activity.get('end_time'),
            attraction_id=attraction,
            hotel_id=hotel,
            restaurant_id=restaurant,
            booking_status=DailySchedule.BookingStatus.NOT_BOOKED
        )
        schedule.save()

    @staticmethod
    def _map_activity_type(activity_type_str: str) -> str:
        """Map the incoming activity type string to the model's enum value."""
        mapping = {
            'FLIGHT': DailySchedule.ActivityType.FLIGHT,
            'TRAIN': DailySchedule.ActivityType.TRAIN,
            'ATTRACTION': DailySchedule.ActivityType.ATTRACTION,
            'MEAL': DailySchedule.ActivityType.MEAL,
            'TRANSPORT': DailySchedule.ActivityType.TRANSPORT,
            'SHOPPING': DailySchedule.ActivityType.SHOPPING,
            'FREE': DailySchedule.ActivityType.FREE,
            'CHECK_IN': DailySchedule.ActivityType.CHECK_IN,
            'CHECK_OUT': DailySchedule.ActivityType.CHECK_OUT,
            'OTHER': DailySchedule.ActivityType.OTHER
        }
        return mapping.get(activity_type_str, DailySchedule.ActivityType.OTHER)


class RequirementWebhookService:
    """Service for the requirement webhook."""

    def __init__(self, validated_data: Dict):
        self.data = validated_data
        self.requirement_id: Optional[str] = None

    def process(self) -> Tuple[bool, str, Optional[str], Optional[Requirement]]:
        """
        Process the requirement data.
        Returns:
            Tuple[success flag, message, requirement ID, requirement object]
        """
        try:
            # Extract the structured payload
            requirement_data = self._extract_structured_data()
            # Generate the requirement ID
            self.requirement_id = self._generate_requirement_id()
            # Build the requirement object
            requirement = self._build_requirement(requirement_data)
            # Save it
            requirement.save()
            logger.info(f'Requirement saved, ID: {self.requirement_id}')
            return True, 'Requirement processed successfully', self.requirement_id, requirement
        except Exception as e:
            logger.error(f'Failed to process requirement: {e}', exc_info=True)
            return False, str(e), None, None

    def _extract_structured_data(self) -> Dict:
        """Extract the structured payload."""
        # Prefer structured_data; otherwise fall back to the raw data
        if 'structured_data' in self.data:
            return self.data['structured_data']
        # Drop top-level metadata fields
        result = {k: v for k, v in self.data.items()
                  if k not in ['user_input', 'requirement_id', 'llm_info', 'output']}
        return result

    def _generate_requirement_id(self) -> str:
        """
        Generate a requirement_id of the form REQ_YYYYMMDD_NNN.

        This check-then-insert approach is not race-free: two concurrent requests
        can still pick the same ID, so a unique constraint on requirement_id
        (assumed here) remains the final guard.
        """
        import time
        max_attempts = 100
        for attempt in range(max_attempts):
            today = datetime.now().strftime('%Y%m%d')
            prefix = f'REQ_{today}_'
            # Find the highest sequence number used so far today.
            # Substr positions are 1-based, hence len(prefix) + 1.
            max_seq = Requirement.objects.filter(
                requirement_id__startswith=prefix
            ).aggregate(
                max_seq=Max(
                    Cast(
                        Substr(F('requirement_id'), len(prefix) + 1),
                        output_field=IntegerField()
                    )
                )
            )['max_seq'] or 0
            new_id = f'{prefix}{max_seq + 1:03d}'
            if not Requirement.objects.filter(requirement_id=new_id).exists():
                return new_id
            time.sleep(0.01)
        raise ValueError('Could not generate a unique requirement_id')

    def _build_requirement(self, data: Dict) -> Requirement:
        """Build the Requirement object."""
        base_info = data.get('base_info', {})
        preferences = data.get('preferences', {})
        budget = data.get('budget', {})
        metadata = data.get('metadata', {})
        # Parse dates
        travel_start = self._parse_date(
            base_info.get('travel_date', {}).get('start_date')
        )
        travel_end = self._parse_date(
            base_info.get('travel_date', {}).get('end_date')
        )
        # Parse the budget range
        budget_range = budget.get('range', {})
        budget_min = self._parse_decimal(budget_range.get('min'))
        budget_max = self._parse_decimal(budget_range.get('max'))
        return Requirement(
            requirement_id=self.requirement_id,
            origin_name=base_info.get('origin', {}).get('name', ''),
            origin_code=base_info.get('origin', {}).get('code', ''),
            origin_type=base_info.get('origin', {}).get('type', ''),
            destination_cities=base_info.get('destination_cities', []),
            trip_days=base_info.get('trip_days', 1),
            group_adults=base_info.get('group_size', {}).get('adults', 0),
            group_children=base_info.get('group_size', {}).get('children', 0),
            group_seniors=base_info.get('group_size', {}).get('seniors', 0),
            group_total=base_info.get('group_size', {}).get('total', 1),
            travel_start_date=travel_start,
            travel_end_date=travel_end,
            travel_date_flexible=base_info.get('travel_date', {}).get('is_flexible', False),
            transportation_type=preferences.get('transportation', {}).get('type', ''),
            transportation_notes=preferences.get('transportation', {}).get('notes', ''),
            hotel_level=preferences.get('accommodation', {}).get('level', ''),
            hotel_requirements=preferences.get('accommodation', {}).get('requirements', ''),
            trip_rhythm=preferences.get('itinerary', {}).get('rhythm', ''),
            preference_tags=preferences.get('itinerary', {}).get('tags', []),
            must_visit_spots=preferences.get('itinerary', {}).get('special_constraints', {}).get('must_visit_spots', []),
            avoid_activities=preferences.get('itinerary', {}).get('special_constraints', {}).get('avoid_activities', []),
            budget_level=budget.get('level', ''),
            budget_currency=budget.get('currency', 'CNY'),
            budget_min=budget_min,
            budget_max=budget_max,
            budget_notes=budget.get('budget_notes', ''),
            source_type=metadata.get('source_type', 'NaturalLanguage'),
            status=metadata.get('status', 'Confirmed'),
            assumptions=metadata.get('assumptions', []),
            is_template=metadata.get('is_template', False),
            template_name=metadata.get('template_info', {}).get('name', ''),
            template_category=metadata.get('template_info', {}).get('category', ''),
            extension=data.get('extension', {})
        )

    def _parse_date(self, value) -> Optional[date]:
        """Parse an ISO-format date; return None when it is missing or invalid."""
        if not value:
            return None
        if isinstance(value, date):
            return value
        try:
            return datetime.fromisoformat(str(value)).date()
        except (ValueError, TypeError):
            logger.warning(f'Could not parse date: {value}')
            return None

    def _parse_decimal(self, value) -> Optional[Decimal]:
        """Parse a Decimal; return None when the value is missing or invalid."""
        # Explicit checks so that a legitimate 0 is not treated as missing
        if value is None or value == '':
            return None
        try:
            return Decimal(str(value))
        except (ArithmeticError, ValueError, TypeError):
            # Malformed input raises decimal.InvalidOperation, an ArithmeticError subclass
            logger.warning(f'Could not parse amount: {value}')
            return None
```

> **Note:** `_generate_requirement_id` depends on the following imports; make sure they are present at the top of the module:
> ```python
> from django.db.models import Max, F
> from django.db.models.functions import Cast, Substr
> from django.db.models import IntegerField
> ```

---

### Phase 5: Refactor the View Layer (Use the New Services)

#### Task 5.1: Refactor ItineraryWebhookView

**Files:**
- Modify: `apps/api/views/webhook_views.py`

**Step 1: Refactor ItineraryWebhookView**

```python
@method_decorator(csrf_exempt, name='dispatch')
class ItineraryWebhookView(View):
    """Handles itinerary data returned by the n8n webhook."""

    def post(self, request, *args, **kwargs):
        """Receive and process webhook data."""
        from apps.api.utils.logging_utils import log_webhook_call
        from apps.api.serializers.webhook_serializers import ItineraryWebhookInputSerializer

        try:
            # 1. Parse the request body
            content = request.body.decode('utf-8')
            raw_data = json.loads(content)

            # 2. Log the request (sanitized)
            log_webhook_call(
                logger, raw_data, 'Received itinerary webhook request',
                include_fields=['requirement_id', 'itinerary_name', 'start_date', 'end_date']
            )

            # 3. Unwrap the `output` field if present
            data = raw_data.get('output', raw_data)

            # 4. Validate the input
            serializer = ItineraryWebhookInputSerializer(data=data)
            if not serializer.is_valid():
                logger.warning(f'Input validation failed: {serializer.errors}')
                return JsonResponse({
                    'success': False,
                    'error': 'Input validation failed',
                    'error_code': WebhookErrorCode.VALIDATION_FAILED,
                    'details': serializer.errors
                }, status=400)
            validated_data = serializer.validated_data

            # 5. Verify that the referenced requirement exists
            requirement_id = validated_data.get('requirement_id')
            try:
                requirement = Requirement.objects.get(requirement_id=requirement_id)
            except Requirement.DoesNotExist:
                logger.error(f'Requirement not found: {requirement_id}')
                return JsonResponse({
                    'success': False,
                    'error': f'Requirement not found: {requirement_id}',
                    'error_code': WebhookErrorCode.REQUIREMENT_NOT_FOUND
                }, status=404)

            # 6. Delegate to the service
            from apps.api.services.webhook_services import ItineraryWebhookService
            service = ItineraryWebhookService(validated_data)
            success, message, itinerary = service.process(requirement)

            if success:
                logger.info(f'Itinerary created: itinerary_id={itinerary.itinerary_id}')
                return JsonResponse({
                    'success': True,
                    'itinerary_id': itinerary.itinerary_id,
                    'itinerary_name': itinerary.itinerary_name
                }, status=200)
            else:
                logger.error(f'Itinerary creation failed: {message}')
                return JsonResponse({
                    'success': False,
                    'error': message,
                    'error_code': WebhookErrorCode.DB_OPERATION_FAILED
                }, status=500)

        except json.JSONDecodeError as e:
            logger.error(f'Malformed JSON: {e}')
            return JsonResponse({
                'success': False,
                'error': f'Malformed JSON: {str(e)}',
                'error_code': WebhookErrorCode.INVALID_JSON
            }, status=400)
        except Exception as e:
            logger.error(f'Failed to process itinerary data: {e}', exc_info=True)
            return JsonResponse({
                'success': False,
                'error': f'Failed to process data: {str(e)}',
                'error_code': WebhookErrorCode.INTERNAL_ERROR
            }, status=500)
```

#### Task 5.2: Refactor RequirementWebhookView

**Files:**
- Modify: `apps/api/views/webhook_views.py`

**Step 1: Simplify the RequirementWebhookView.post method**

```python
@method_decorator(csrf_exempt, name='dispatch')
class RequirementWebhookView(APIView):
    """Handles travel requirement data returned by the n8n webhook."""
    permission_classes = [AllowAny]

    @swagger_auto_schema(...)
    def post(self, request, *args, **kwargs):
        from apps.api.utils.logging_utils import log_webhook_call
        from apps.api.serializers.webhook_serializers import RequirementWebhookInputSerializer
        from apps.api.services.webhook_services import RequirementWebhookService

        try:
            raw_data = request.data
            # Unwrap list payloads
            if isinstance(raw_data, list) and len(raw_data) > 0:
                raw_data = raw_data[0]
            # Unwrap the `output` field if present
            if isinstance(raw_data, dict) and 'output' in raw_data:
                raw_data = raw_data['output']

            # Log the request (sanitized)
            log_webhook_call(
                logger, raw_data, 'Received requirement webhook request',
                include_fields=['requirement_id', 'source_type', 'status']
            )

            # Validate the input
            serializer = RequirementWebhookInputSerializer(data=raw_data)
            if not serializer.is_valid():
                return Response({
                    'success': False,
                    'error': 'Input validation failed',
                    'error_code': WebhookErrorCode.VALIDATION_FAILED,
                    'details': serializer.errors
                }, status=status.HTTP_400_BAD_REQUEST)

            # Delegate to the service
            service = RequirementWebhookService(serializer.validated_data)
            success, message, requirement_id, requirement = service.process()

            if success:
                return Response({
                    'success': True,
                    'requirement_id': requirement_id,
                    'structured_data': serializer.validated_data.get('structured_data', {}),
                    'validation_errors': None,
                    'warnings': [],
                    'error': None
                }, status=status.HTTP_200_OK)
            else:
                return Response({
                    'success': False,
                    'error': message,
                    'error_code': WebhookErrorCode.DB_OPERATION_FAILED
                }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

        except Exception as e:
            logger.error(f'Failed to process requirement data: {e}', exc_info=True)
            return Response({
                'success': False,
                'error': f'Failed to process data: {str(e)}',
                'error_code': WebhookErrorCode.INTERNAL_ERROR
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
```

---

### Phase 6: Add Unit Tests

#### Task 6.1: Create the webhook test file

**Files:**
- Create: `tests/api/test_webhook_views.py`

**Step 1: Create the test file**

```python
"""Webhook API tests."""
import json
from unittest.mock import patch, MagicMock

from django.test import TestCase, RequestFactory

from apps.api.views.webhook_views import (
    ItineraryWebhookView, RequirementWebhookView, WebhookErrorCode
)
from apps.api.serializers.webhook_serializers import (
    ItineraryWebhookInputSerializer, RequirementWebhookInputSerializer
)


class ItineraryWebhookSerializerTest(TestCase):
    """Tests for ItineraryWebhookInputSerializer."""

    def test_valid_data(self):
        """Valid data passes validation."""
        data = {
            'requirement_id': 'REQ_20240101_001',
            'itinerary_name': 'Test itinerary',
            'start_date': '2024-03-01',
            'end_date': '2024-03-05'
        }
        serializer = ItineraryWebhookInputSerializer(data=data)
        self.assertTrue(serializer.is_valid(), serializer.errors)

    def test_missing_required_field(self):
        """A missing required field is reported."""
        data = {
            'requirement_id': 'REQ_20240101_001'
            # itinerary_name is missing
        }
        serializer = ItineraryWebhookInputSerializer(data=data)
        self.assertFalse(serializer.is_valid())
        self.assertIn('itinerary_name', serializer.errors)

    def test_end_date_before_start_date(self):
        """An end date earlier than the start date is rejected."""
        data = {
            'requirement_id': 'REQ_20240101_001',
            'itinerary_name': 'Test itinerary',
            'start_date': '2024-03-05',
            'end_date': '2024-03-01'  # earlier than the start date
        }
        serializer = ItineraryWebhookInputSerializer(data=data)
        self.assertFalse(serializer.is_valid())
        self.assertIn('end_date', str(serializer.errors))


class ItineraryWebhookViewTest(TestCase):
    """Tests for ItineraryWebhookView."""

    def setUp(self):
        self.factory = RequestFactory()
        self.view = ItineraryWebhookView.as_view()

    @patch('apps.api.services.webhook_services.ItineraryWebhookService.process')
    @patch('apps.api.views.webhook_views.Requirement.objects')
    def test_db_failure_returns_500(self, mock_requirement, mock_process):
        """A database failure returns 500 rather than 200."""
        # The requirement lookup succeeds, but the service reports a failed write.
        # (An exception raised by the lookup itself would surface as INTERNAL_ERROR.)
        mock_requirement.get.return_value = MagicMock()
        mock_process.return_value = (False, 'Database connection failed', None)

        request = self.factory.post(
            '/api/webhook/itinerary/',
            data=json.dumps({
                'requirement_id': 'REQ_20240101_001',
                'itinerary_name': 'Test',
                'start_date': '2024-03-01',
                'end_date': '2024-03-05'
            }),
            content_type='application/json'
        )
        response = self.view(request)

        # Key assertion: a database failure must return 500
        self.assertEqual(response.status_code, 500)
        response_data = json.loads(response.content)
        self.assertFalse(response_data.get('success'))
        self.assertEqual(
            response_data.get('error_code'),
            WebhookErrorCode.DB_OPERATION_FAILED
        )


class LogSanitizerTest(TestCase):
    """Tests for the log sanitization utility."""

    def test_mask_phone(self):
        """Phone numbers in free text are masked."""
        from apps.api.utils.logging_utils import LogSanitizer
        text = "Contact phone: 13812345678"
        result = LogSanitizer.mask_phone(text)
        self.assertEqual(result, 'Contact phone: 138****5678')

    def test_mask_email(self):
        """Emails in free text are masked."""
        from apps.api.utils.logging_utils import LogSanitizer
        # Called directly: a value under an 'email' key would instead get the
        # generic sensitive-field mask ('us***om') from sanitize_dict.
        result = LogSanitizer.mask_email('user@example.com')
        self.assertEqual(result, 'u***@example.com')

    def test_sensitive_field_masking(self):
        """Sensitive fields are masked automatically."""
        from apps.api.utils.logging_utils import LogSanitizer
        data = {
            'password': 'secret123',
            'api_key': 'sk-abc123xyz',
            'username': 'john_doe'  # not a sensitive field
        }
        result = LogSanitizer.sanitize_dict(data)
        self.assertEqual(result['password'], 'se***23')
        self.assertEqual(result['api_key'], 'sk***yz')
        self.assertEqual(result['username'], 'john_doe')  # unchanged
```

**Step 2: Run the tests**

Run: `python manage.py test tests.api.test_webhook_views -v 2`
Expected: all tests pass

---

## 📊 Expected Improvements

| Metric | Before | After |
|------|--------|--------|
| **Silent-failure bug** | Present | ✅ Fixed |
| **`post()` method length** | 130+ / 200+ lines | ~40 lines |
| **N+1 query count** | Up to 51 | Up to 4 |
| **Input validation** | Basic checks | Full schema validation |
| **Sensitive data leakage** | Present | ✅ Sanitized |

---

## 🚀 Recommended Implementation Order

1. **Task 1.1** - Create the log sanitization utility (security foundation)
2. **Task 3.1** - Fix the silent-failure bug (critical fix)
3. **Task 2.1** - Create the serializer layer (validation foundation)
4. **Task 4.1** - Create the service layer (core of the refactor)
5. **Task 5.1/5.2** - Refactor the view layer (use the new code)
6. **Task 6.1** - Add unit tests (quality assurance)

---

**Plan saved to `docs/plans/webhook-views-improvement-plan.md`**
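
As a quick check on the log sanitizer from Task 1.1, the sketch below uses only the standard library (no Django) to show why the mobile-number pattern should be bounded by non-digits. A pattern such as `1[3-9]\d{9}` with no boundaries also matches inside an 18-digit ID card number, so masking phones first corrupts the ID before the ID-card pattern can match it. The names `NAIVE_PHONE`, `BOUNDED_PHONE`, `ID_CARD`, and `mask_text` are illustrative assumptions and are not part of the plan's modules.

```python
import re

# Hypothetical names for illustration only; not part of the plan's modules.
NAIVE_PHONE = re.compile(r'1[3-9]\d{9}')
BOUNDED_PHONE = re.compile(r'(?<!\d)1[3-9]\d{9}(?!\d)')
ID_CARD = re.compile(r'(?<!\d)\d{17}[\dXx](?!\d)')


def mask_text(text, phone_pattern):
    """Mask phone numbers first, then ID card numbers (same order as sanitize_dict)."""
    text = phone_pattern.sub(
        lambda m: m.group()[:3] + '****' + m.group()[-4:], text
    )
    text = ID_CARD.sub(
        lambda m: m.group()[:6] + '********' + m.group()[-4:], text
    )
    return text


sample = 'id=110101199001011234 phone=13812345678'

# Unbounded pattern: part of the ID number is mistaken for a phone number,
# so the ID-card pattern no longer finds 18 consecutive characters to mask.
naive = mask_text(sample, NAIVE_PHONE)

# Bounded pattern: the ID number is left alone by the phone step and is then
# masked as a whole by the ID-card step.
bounded = mask_text(sample, BOUNDED_PHONE)

print(naive)
print(bounded)
```

With the unbounded pattern, most of the ID number survives in the log line; with the bounded pattern, both values are masked in their intended formats.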