from django.contrib.auth.models import AbstractBaseUser, BaseUserManager from django.db import models from django.utils import timezone from core.enums import LoginFailureReason, UserAccountStatus class UserAccountManager(BaseUserManager): def create_user(self, username, password=None, **extra_fields): if not username: raise ValueError("username 不能为空") user = self.model(username=username, **extra_fields) if password: user.set_password(password) user.save(using=self._db) return user class UserAccount(AbstractBaseUser): username = models.CharField(max_length=30) email = models.EmailField(null=True, blank=True) phone_enc = models.TextField( null=True, blank=True, help_text="AES-256-GCM ciphertext of phone (core.encryption.PhoneEncryption).", ) phone_hash = models.CharField(max_length=64, null=True, blank=True) staff = models.OneToOneField( "org.Staff", null=True, blank=True, on_delete=models.SET_NULL, related_name="account", ) is_tenant_admin = models.BooleanField(default=False) status = models.CharField( max_length=10, choices=UserAccountStatus.choices, default=UserAccountStatus.ACTIVE, ) is_initial_password = models.BooleanField(default=True) locked_until = models.DateTimeField(null=True, blank=True) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) created_by = models.ForeignKey( "self", null=True, blank=True, on_delete=models.SET_NULL, related_name="created_accounts", ) USERNAME_FIELD = "username" REQUIRED_FIELDS: list = [] objects = UserAccountManager() class Meta: db_table = "user_accounts" constraints = [ models.UniqueConstraint(fields=["username"], name="uq_user_accounts_username"), models.UniqueConstraint( fields=["email"], name="uq_user_accounts_email", condition=models.Q(email__isnull=False), ), models.UniqueConstraint( fields=["phone_hash"], name="uq_user_accounts_phone", condition=models.Q(phone_hash__isnull=False), ), ] indexes = [ models.Index(fields=["status"], name="idx_user_accounts_status"), models.Index(fields=["staff"], name="idx_user_accounts_staff"), ] def __str__(self) -> str: kind = "admin" if self.is_tenant_admin else "staff" return f"{self.username} ({kind})" def is_locked(self) -> bool: if self.status != UserAccountStatus.LOCKED: return False if self.locked_until and timezone.now() >= self.locked_until: return False return True class LoginAttempt(models.Model): username = models.CharField(max_length=30) ip_address = models.GenericIPAddressField() user_agent = models.TextField(null=True, blank=True) success = models.BooleanField() failure_reason = models.CharField( max_length=30, null=True, blank=True, choices=LoginFailureReason.choices, ) attempted_at = models.DateTimeField(auto_now_add=True) class Meta: db_table = "login_attempts" indexes = [ models.Index(fields=["username"], name="idx_login_attempts_username"), models.Index(fields=["ip_address"], name="idx_login_attempts_ip"), models.Index(fields=["-attempted_at"], name="idx_login_attempts_time"), models.Index( fields=["username", "success", "-attempted_at"], name="idx_login_attempts_fail_check", ), ] class PasswordResetToken(models.Model): user = models.ForeignKey( "account.UserAccount", on_delete=models.CASCADE, related_name="reset_tokens", ) token = models.CharField(max_length=86, unique=True) expires_at = models.DateTimeField() is_used = models.BooleanField(default=False) created_at = models.DateTimeField(auto_now_add=True) class Meta: db_table = "password_reset_tokens" indexes = [ models.Index(fields=["user"], name="idx_pw_reset_tokens_user"), ] def is_valid(self) -> bool: return not self.is_used and timezone.now() < self.expires_at class PasswordHistory(models.Model): user = models.ForeignKey( "account.UserAccount", on_delete=models.CASCADE, related_name="password_histories", ) password_hash = models.CharField(max_length=128) created_at = models.DateTimeField(auto_now_add=True) class Meta: db_table = "password_histories" ordering = ["-created_at"] indexes = [ models.Index(fields=["user", "-created_at"], name="idx_pw_histories_user"), ]