Phase 1 scaffolding: config/, core/, base models, AES-256-GCM phone encryption, enums mirror.
- apps.tenant: Tenant + Domain (django-tenants)
- apps.org: 11 models (OrgUnit hierarchy, Staff, audit logs)
- apps.account: 4 models (UserAccount as AUTH_USER_MODEL, login/password tracking)
- apps.permission: 7 models (RBAC + overrides + datascope + append-only changelog)
- apps.region: 5 models (District, BusinessArea, MetroLine, MetroStation, School)
All migrations generated; manage.py check passes.
152 lines
4.9 KiB
Python
152 lines
4.9 KiB
Python
from django.contrib.auth.models import AbstractBaseUser, BaseUserManager
|
|
from django.db import models
|
|
from django.utils import timezone
|
|
|
|
from core.enums import LoginFailureReason, UserAccountStatus
|
|
|
|
|
|
class UserAccountManager(BaseUserManager):
    """Manager that creates accounts identified by ``username``."""

    def create_user(self, username, password=None, **extra_fields):
        """Create, save and return a new account.

        Args:
            username: Login name; must be non-empty.
            password: Raw password. When falsy (``None`` or ``""``) the
                account is stored with an unusable password instead of an
                empty password value.
            **extra_fields: Extra field values forwarded to the model
                constructor.

        Returns:
            The saved model instance.

        Raises:
            ValueError: If ``username`` is empty.
        """
        if not username:
            raise ValueError("username 不能为空")

        user = self.model(username=username, **extra_fields)
        if password:
            user.set_password(password)
        else:
            # Leaving the password column at its empty default makes
            # ``has_usable_password()`` report the account as usable. Mark it
            # explicitly unusable, as Django's own ``create_user`` does.
            user.set_unusable_password()
        user.save(using=self._db)
        return user
|
|
|
|
|
|
class UserAccount(AbstractBaseUser):
    """Login account, optionally linked one-to-one to an ``org.Staff`` row.

    ``username`` is the login identifier (``USERNAME_FIELD``). Uniqueness of
    ``username``, ``email`` and ``phone_hash`` is enforced through the
    constraints declared in ``Meta``; the latter two only apply to non-NULL
    values.
    """

    username = models.CharField(max_length=30)
    email = models.EmailField(blank=True, null=True)

    # The phone number is stored as ciphertext; a separate hash column carries
    # the uniqueness constraint.
    phone_enc = models.TextField(
        blank=True,
        null=True,
        help_text="AES-256-GCM ciphertext of phone (core.encryption.PhoneEncryption).",
    )
    phone_hash = models.CharField(max_length=64, blank=True, null=True)

    staff = models.OneToOneField(
        "org.Staff",
        on_delete=models.SET_NULL,
        related_name="account",
        blank=True,
        null=True,
    )
    is_tenant_admin = models.BooleanField(default=False)

    status = models.CharField(
        choices=UserAccountStatus.choices,
        default=UserAccountStatus.ACTIVE,
        max_length=10,
    )
    is_initial_password = models.BooleanField(default=True)
    # Consulted by ``is_locked()`` while ``status`` is LOCKED.
    locked_until = models.DateTimeField(blank=True, null=True)

    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    created_by = models.ForeignKey(
        "self",
        on_delete=models.SET_NULL,
        related_name="created_accounts",
        blank=True,
        null=True,
    )

    USERNAME_FIELD = "username"
    REQUIRED_FIELDS: list = []

    objects = UserAccountManager()

    class Meta:
        db_table = "user_accounts"
        constraints = [
            models.UniqueConstraint(
                fields=["username"],
                name="uq_user_accounts_username",
            ),
            # Partial unique constraints: NULL email / phone_hash values are
            # excluded from the uniqueness check.
            models.UniqueConstraint(
                name="uq_user_accounts_email",
                fields=["email"],
                condition=models.Q(email__isnull=False),
            ),
            models.UniqueConstraint(
                name="uq_user_accounts_phone",
                fields=["phone_hash"],
                condition=models.Q(phone_hash__isnull=False),
            ),
        ]
        indexes = [
            models.Index(name="idx_user_accounts_status", fields=["status"]),
            # NOTE(review): a OneToOneField is already unique, so this index
            # may be redundant - confirm before dropping (needs a migration).
            models.Index(name="idx_user_accounts_staff", fields=["staff"]),
        ]

    def __str__(self) -> str:
        """Return ``"<username> (admin)"`` or ``"<username> (staff)"``."""
        role = "staff"
        if self.is_tenant_admin:
            role = "admin"
        return f"{self.username} ({role})"

    def is_locked(self) -> bool:
        """Report whether the account is currently locked.

        Returns:
            ``False`` when ``status`` is not LOCKED, or when ``locked_until``
            is set and has already been reached. ``True`` otherwise, which
            includes a LOCKED account with no ``locked_until`` value.
        """
        if self.status != UserAccountStatus.LOCKED:
            return False

        deadline = self.locked_until
        if not deadline:
            # No expiry recorded: the lock stays in force.
            return True
        return timezone.now() < deadline
|
|
|
|
|
|
class LoginAttempt(models.Model):
    """One recorded login attempt: who, from where, and whether it succeeded.

    ``username`` is stored as plain text rather than as a foreign key to the
    account table.
    """

    username = models.CharField(max_length=30)
    ip_address = models.GenericIPAddressField()
    user_agent = models.TextField(blank=True, null=True)
    success = models.BooleanField()
    # Nullable; presumably left empty for successful attempts - confirm
    # against the code that writes these rows.
    failure_reason = models.CharField(
        choices=LoginFailureReason.choices,
        max_length=30,
        blank=True,
        null=True,
    )
    attempted_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = "login_attempts"
        indexes = [
            models.Index(name="idx_login_attempts_username", fields=["username"]),
            models.Index(name="idx_login_attempts_ip", fields=["ip_address"]),
            models.Index(name="idx_login_attempts_time", fields=["-attempted_at"]),
            # Composite index: username + outcome, newest attempts first.
            models.Index(
                name="idx_login_attempts_fail_check",
                fields=["username", "success", "-attempted_at"],
            ),
        ]
|
|
|
|
|
|
class PasswordResetToken(models.Model):
    """Password-reset token belonging to an account.

    Tokens are deleted together with their account (``CASCADE``).
    """

    user = models.ForeignKey(
        "account.UserAccount",
        related_name="reset_tokens",
        on_delete=models.CASCADE,
    )
    # NOTE(review): 86 characters presumably matches the token generator's
    # output length - confirm against the code that issues tokens.
    token = models.CharField(unique=True, max_length=86)
    expires_at = models.DateTimeField()
    is_used = models.BooleanField(default=False)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = "password_reset_tokens"
        indexes = [
            models.Index(name="idx_pw_reset_tokens_user", fields=["user"]),
        ]

    def is_valid(self) -> bool:
        """Return ``True`` if the token is unused and has not yet expired."""
        if self.is_used:
            return False
        return timezone.now() < self.expires_at
|
|
|
|
|
|
class PasswordHistory(models.Model):
    """A stored password hash for an account, ordered newest first.

    Rows are deleted together with their account (``CASCADE``).
    """

    user = models.ForeignKey(
        "account.UserAccount",
        related_name="password_histories",
        on_delete=models.CASCADE,
    )
    # NOTE(review): 128 presumably mirrors the length of Django's own
    # password column - confirm.
    password_hash = models.CharField(max_length=128)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = "password_histories"
        ordering = ["-created_at"]
        indexes = [
            models.Index(
                name="idx_pw_histories_user",
                fields=["user", "-created_at"],
            ),
        ]
|