"""Account-related models: user accounts, login attempts, reset tokens, password history."""

from django.contrib.auth.models import AbstractBaseUser, BaseUserManager
from django.db import models
from django.utils import timezone

from core.enums import LoginFailureReason, UserAccountStatus


class UserAccountManager(BaseUserManager):
    """Manager that knows how to create ``UserAccount`` rows."""

    def create_user(self, username, password=None, **extra_fields):
        """Create, save and return a new account.

        Args:
            username: Login name; must be non-empty.
            password: Raw password. When falsy (``None`` or ``""``) the
                account is stored with an unusable password instead of a
                blank one.
            **extra_fields: Additional model field values passed straight to
                the model constructor.

        Returns:
            The saved user instance.

        Raises:
            ValueError: If ``username`` is empty.
        """
        if not username:
            raise ValueError("username 不能为空")

        user = self.model(username=username, **extra_fields)
        if password:
            user.set_password(password)
        else:
            # A blank password column is not the same as "no password set":
            # mark it explicitly so has_usable_password() reports False.
            user.set_unusable_password()
        user.save(using=self._db)
        return user


class UserAccount(AbstractBaseUser):
    """Login account, optionally bound 1:1 to a staff profile."""

    # Login name (phone number for regular staff, custom name for tenant
    # admins, per help_text).
    username = models.CharField(
        max_length=30,
        verbose_name="登录名",
        help_text="普通员工=手机号(11位数字) / Tenant Admin=自定义(字母开头6~30位);创建后不可更改",
    )
    # Optional recovery e-mail.
    email = models.EmailField(
        null=True,
        blank=True,
        verbose_name="绑定邮箱",
        help_text="用于找回密码/用户名;为空则无法自助找回;同租户唯一",
    )
    # Encrypted phone number (ciphertext only).
    phone_enc = models.TextField(
        null=True,
        blank=True,
        verbose_name="手机号(加密)",
        help_text="AES-256-GCM 加密密文;普通员工必填",
    )
    # Hash of the phone number, used for uniqueness checks and lookups.
    phone_hash = models.CharField(
        max_length=64,
        null=True,
        blank=True,
        verbose_name="手机号哈希",
        help_text="SHA-256 哈希;用于唯一性校验和查询;不可反推原文",
    )
    # Staff profile binding; nulled out if the staff record is deleted.
    staff = models.OneToOneField(
        "org.Staff",
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
        related_name="account",
        verbose_name="员工档案",
        help_text="员工档案绑定(1:1);普通员工必须有值;Tenant Admin 可为空",
    )
    # The "at most one per tenant" rule is enforced by the application, not
    # by a database constraint (per help_text).
    is_tenant_admin = models.BooleanField(
        default=False,
        verbose_name="是否租户超管",
        help_text="每个租户最多 1 个(应用层约束)",
    )
    status = models.CharField(
        max_length=10,
        choices=UserAccountStatus.choices,
        default=UserAccountStatus.ACTIVE,
        verbose_name="账号状态",
        help_text="active=正常 / disabled=停用 / locked=锁定(30 分钟自动恢复)",
    )
    # True until the user replaces the initially issued password.
    is_initial_password = models.BooleanField(
        default=True,
        verbose_name="是否初始密码",
        help_text="True 时登录成功后强制跳转修改密码页,不可跳过",
    )
    # Moment at which a lock expires; see is_locked().
    locked_until = models.DateTimeField(
        null=True,
        blank=True,
        verbose_name="锁定到期时间",
        help_text="到期后应用层将 status 恢复 active",
    )
    created_at = models.DateTimeField(
        auto_now_add=True,
        verbose_name="创建时间",
    )
    updated_at = models.DateTimeField(
        auto_now=True,
        verbose_name="最后更新时间",
    )
    # Account that created this one; may be NULL.
    created_by = models.ForeignKey(
        "self",
        null=True,
        blank=True,
        on_delete=models.SET_NULL,
        related_name="created_accounts",
        verbose_name="创建人",
        help_text="普通员工由 Tenant Admin 创建;Tenant Admin 由平台运营创建(可为 NULL)",
    )

    USERNAME_FIELD = "username"
    REQUIRED_FIELDS: list = []

    objects = UserAccountManager()

    class Meta:
        db_table = "user_accounts"
        verbose_name = "用户账号"
        verbose_name_plural = "用户账号"
        constraints = [
            models.UniqueConstraint(fields=["username"], name="uq_user_accounts_username"),
            # NOTE(review): the email help_text states per-tenant uniqueness,
            # but this constraint is table-wide. Confirm that tenant isolation
            # happens at the schema/database level.
            models.UniqueConstraint(
                fields=["email"],
                name="uq_user_accounts_email",
                condition=models.Q(email__isnull=False),
            ),
            models.UniqueConstraint(
                fields=["phone_hash"],
                name="uq_user_accounts_phone",
                condition=models.Q(phone_hash__isnull=False),
            ),
        ]
        indexes = [
            models.Index(fields=["status"], name="idx_user_accounts_status"),
            # NOTE(review): a OneToOneField is already unique, so this index
            # may be redundant; removing it needs a migration, so it is kept.
            models.Index(fields=["staff"], name="idx_user_accounts_staff"),
        ]

    def __str__(self) -> str:
        """Return ``"<username> (admin|staff)"``."""
        kind = "admin" if self.is_tenant_admin else "staff"
        return f"{self.username} ({kind})"

    def is_locked(self) -> bool:
        """Return whether the account is currently locked.

        The account counts as locked only while ``status`` is LOCKED and the
        lock has not expired. A LOCKED account without ``locked_until`` is
        treated as locked.
        """
        if self.status != UserAccountStatus.LOCKED:
            return False
        # The lock has run out even if status has not been reset yet.
        if self.locked_until and timezone.now() >= self.locked_until:
            return False
        return True


class LoginAttempt(models.Model):
    """One login attempt, successful or not."""

    # Stored as plain text rather than a foreign key, so attempts against
    # non-existent accounts are recorded too (per help_text).
    username = models.CharField(
        max_length=30,
        verbose_name="登录用户名",
        help_text="冗余存储,即使账号不存在也记录",
    )
    ip_address = models.GenericIPAddressField(
        verbose_name="来源 IP",
        help_text="支持 IPv4/IPv6",
    )
    user_agent = models.TextField(
        null=True,
        blank=True,
        verbose_name="客户端 UA",
        help_text="Electron 版本信息",
    )
    success = models.BooleanField(
        verbose_name="是否登录成功",
    )
    # Nullable, so it can be left empty (e.g. for successful attempts).
    failure_reason = models.CharField(
        max_length=30,
        null=True,
        blank=True,
        choices=LoginFailureReason.choices,
        verbose_name="失败原因",
        help_text="wrong_password=密码错误 / wrong_captcha=验证码失败 / account_locked=账号锁定 / account_disabled=账号停用 / tenant_not_found=租户不存在",
    )
    # Described by help_text as the monthly partition key.
    attempted_at = models.DateTimeField(
        auto_now_add=True,
        verbose_name="尝试时间",
        help_text="分区键,按月分区",
    )

    class Meta:
        db_table = "login_attempts"
        verbose_name = "登录尝试记录"
        verbose_name_plural = "登录尝试记录"
        indexes = [
            models.Index(fields=["username"], name="idx_login_attempts_username"),
            models.Index(fields=["ip_address"], name="idx_login_attempts_ip"),
            models.Index(fields=["-attempted_at"], name="idx_login_attempts_time"),
            # Composite index on username, outcome and newest-first time.
            models.Index(
                fields=["username", "success", "-attempted_at"],
                name="idx_login_attempts_fail_check",
            ),
        ]


class PasswordResetToken(models.Model):
    """Single-use, time-limited token for resetting a password."""

    user = models.ForeignKey(
        "account.UserAccount",
        on_delete=models.CASCADE,
        related_name="reset_tokens",
        verbose_name="关联账号",
    )
    token = models.CharField(
        max_length=86,
        unique=True,
        verbose_name="令牌",
        help_text="secrets.token_urlsafe(64) 生成(86 字符),全局唯一",
    )
    expires_at = models.DateTimeField(
        verbose_name="过期时间",
        help_text="created_at + 30 分钟",
    )
    is_used = models.BooleanField(
        default=False,
        verbose_name="是否已使用",
        help_text="使用后立即置 True,防止重放攻击",
    )
    created_at = models.DateTimeField(
        auto_now_add=True,
        verbose_name="创建时间",
    )

    class Meta:
        db_table = "password_reset_tokens"
        verbose_name = "密码重置令牌"
        verbose_name_plural = "密码重置令牌"
        indexes = [
            models.Index(fields=["user"], name="idx_pw_reset_tokens_user"),
        ]

    def is_valid(self) -> bool:
        """Return True if the token is unused and has not yet expired."""
        return not self.is_used and timezone.now() < self.expires_at


class PasswordHistory(models.Model):
    """Previously used password hash for an account, newest first."""

    user = models.ForeignKey(
        "account.UserAccount",
        on_delete=models.CASCADE,
        related_name="password_histories",
        verbose_name="关联账号",
    )
    password_hash = models.CharField(
        max_length=128,
        verbose_name="密码哈希",
        help_text="PBKDF2+SHA256 哈希值",
    )
    created_at = models.DateTimeField(
        auto_now_add=True,
        verbose_name="记录时间",
        help_text="密码修改时间",
    )

    class Meta:
        db_table = "password_histories"
        verbose_name = "历史密码"
        verbose_name_plural = "历史密码"
        ordering = ["-created_at"]
        indexes = [
            models.Index(fields=["user", "-created_at"], name="idx_pw_histories_user"),
        ]