跳到主要内容

邮轮穿舱件管理系统后台 - 认证与授权系统文档

概述

本文档详细分析邮轮穿舱件管理系统后台的用户认证和基于角色的访问控制(RBAC)系统。该系统采用JWT令牌认证机制,结合角色-权限映射模型,实现了细粒度的权限控制。

系统架构

核心组件关系图

flowchart TD
A[用户请求] --> B[JWT令牌验证]
B --> C{令牌有效?}
C -->|是| D[解析用户ID]
C -->|否| E[返回401错误]
D --> F[查询用户角色]
F --> G[获取权限列表]
G --> H[权限检查]
H --> I{权限满足?}
I -->|是| J[执行请求]
I -->|否| K[返回403错误]

核心模块分析

1. 认证模块 (core/authorize.py)

JWT令牌管理

系统使用PyJWT库实现JWT令牌的创建和验证:

# JWT配置参数
SECRET_KEY = settings.JWT_SECRET_KEY
ALGORITHM = settings.JWT_ALGORITHM
ACCESS_TOKEN_EXPIRE_MINUTES = settings.JWT_ACCESS_EXPIRE_MINUTES

令牌创建函数

def create_jwt_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> Dict[str, str]:
"""
创建JWT令牌,支持自定义过期时间
"""
to_encode = data.copy()
expire = datetime.now(timezone('Asia/Shanghai')) + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return {"access_token": encoded_jwt, "token_type": "bearer"}

令牌验证函数

async def verify_jwt_token(token: str = Depends(oauth2_scheme)) -> Dict[str, Any]:
"""
验证JWT令牌有效性,解析用户信息
"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
g.user_id = payload.get("uid")
return payload
except jwt.PyJWTError:
raise HTTPException(status_code=401, detail="JWT验证失败")

2. 权限模型 (models/role.py)

角色权限表结构

class Role(Model):
user_id = fields.IntField(description="用户ID", null=False)
scope_list = fields.CharField(max_length=255, description="权限列表", null=True, default="")
scope_group = fields.CharField(max_length=255, description="权限组", null=True, default="")
created_by = fields.IntField(description="创建人", null=True, default=-1)
updated_by = fields.IntField(description="更新人", null=True, default=-1)
created_at = fields.DatetimeField(description="创建时间", null=False, auto_now_add=True)
updated_at = fields.DatetimeField(description="更新时间", null=False, auto_now=True)

class Meta:
table = "roles"

3. 用户模型 (models/user.py)

用户表结构

class User(Model):
id = fields.IntField(primary_key=True)
username = fields.CharField(max_length=255, description="用户名", unique=True, null=False)
password = fields.CharField(max_length=511, description="密码", null=False)
email = fields.CharField(max_length=255, description="邮箱", unique=True, null=True)
sms = fields.CharField(max_length=255, description="手机号码", null=True)
status = fields.CharField(max_length=255, description="用户状态", null=False, default=UserStatus.ACTIVE.value)
openid = fields.CharField(max_length=255, description="微信OpenID", null=True)
is_system = fields.BooleanField(description="是否系统用户", null=False, default=False)

RBAC权限系统设计

角色定义

系统预定义了四种角色类型:

ROLES = {
"desktop": "桌面端用户",
"miniapp": "小程序用户",
"inspect": "巡检用户",
"maintenance": "维修用户",
}

权限作用域定义

系统采用细粒度的权限作用域控制:

SCOPE = {
"workpiece:read": "读取工件信息",
"workpiece:write": "写入工件信息",
"task:read": "读取任务信息",
"task:write": "写入任务信息",
"log:read": "读取日志信息",
"log:write": "写入日志信息",
"image:read": "读取图像信息",
"image:write": "写入图像信息",
"system:read": "读取系统信息",
"system:write": "写入系统信息",
"inspect:access": "巡检接入",
"maintenance:access": "维修接入",
"miniapp:access": "小程序接入",
}

角色-权限映射

ROLE_SCOPES = {
"desktop": SCOPE.keys(), # 桌面端拥有所有权限
"miniapp": [
"workpiece:read", "workpiece:write", "task:read", "task:write",
"image:read", "image:write", "miniapp:access"
],
"inspect": ["inspect:access"],
"maintenance": ["maintenance:access"],
}

权限服务层 (service/role_service.py)

权限查询服务

async def service_get_user_scopes(user_id: int) -> List[str]:
"""
获取用户权限列表
"""
try:
role = await Role.filter(user_id=user_id).first()
return role.scope_list.split(",")
except DoesNotExist:
return []

权限管理服务

async def service_grant_user_a_scope(user_id: int, scope_name: str):
"""
授予用户特定权限
"""
role = await Role.filter(user_id=user_id).first()
if role:
scope_list = role.scope_list.split(",")
if scope_name not in scope_list:
scope_list.append(scope_name)
role.scope_list = ",".join(scope_list)
await role.save()
else:
await Role.create(user_id=user_id, scope_list=scope_name)
async def service_revoke_user_a_scope(user_id: int, scope_name: str):
"""
撤销用户特定权限
"""
role = await Role.filter(user_id=user_id).first()
if role:
scope_list = role.scope_list.split(",")
if scope_name in scope_list:
scope_list.remove(scope_name)
role.scope_list = ",".join(scope_list)
await role.save()

权限检查机制

作用域检查依赖项

def require_scopes(scopes: list[str], mode: Literal["AND","OR"]="AND") -> Callable:
"""
权限检查依赖项,支持AND/OR两种检查模式
"""
async def _dep(token: str = Depends(oauth2_scheme)):
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("uid")
token_scopes = await _get_user_role_scopes(user_id)

if mode == "AND":
ok = set(scopes).issubset(token_scopes) # 必须包含所有权限
else:
ok = not token_scopes.isdisjoint(scopes) # 只需包含任一权限

if not ok:
raise HTTPException(status_code=403, detail=f"Need {mode} of {scopes}")
return {"uid": user_id, "scopes": list(token_scopes)}
return _dep

系统架构图

classDiagram
class User {
+IntField id
+CharField username
+CharField password
+CharField email
+CharField sms
+CharField status
+CharField openid
+BooleanField is_system
}

class Role {
+IntField user_id
+CharField scope_list
+CharField scope_group
+IntField created_by
+IntField updated_by
+DatetimeField created_at
+DatetimeField updated_at
}

class AuthorizeService {
+create_jwt_token()
+verify_jwt_token()
+require_scopes()
+_get_user_role_scopes()
}

class RoleService {
+service_get_user_scopes()
+service_grant_user_a_scope()
+service_revoke_user_a_scope()
}

User "1" -- "1" Role : has
AuthorizeService --> RoleService : uses
AuthorizeService --> User : authenticates

数据流分析

认证授权流程

sequenceDiagram
participant Client
participant API
participant AuthService
participant RoleService
participant Database

Client->>API: 请求携带JWT令牌
API->>AuthService: 调用verify_jwt_token()
AuthService->>AuthService: 解码验证令牌
AuthService->>RoleService: 调用service_get_user_scopes()
RoleService->>Database: 查询用户权限
Database-->>RoleService: 返回权限列表
RoleService-->>AuthService: 返回权限信息
AuthService-->>API: 返回认证结果
API-->>Client: 返回响应或错误

权限检查模式

AND模式(严格模式)

  • 用户必须拥有所有指定的权限才能访问
  • 适用于需要多重权限验证的场景

OR模式(宽松模式)

  • 用户只需拥有任一指定的权限即可访问
  • 适用于权限分级或替代权限的场景

使用示例

路由级别权限控制

# 需要system:read权限
@app.get("/system/info", dependencies=[Depends(require_scopes(["system:read"], mode="AND"))])
async def get_system_info():
pass

# 需要任一工件相关权限
@app.post("/workpiece", dependencies=[Depends(require_scopes(["workpiece:read", "workpiece:write"], mode="OR"))])
async def create_workpiece():
pass

错误处理机制

系统实现了完善的错误处理:

  1. 401 Unauthorized - JWT令牌无效或过期
  2. 403 Forbidden - 权限不足
  3. 500 Internal Server Error - 权限服务内部错误

性能优化建议

  1. 权限缓存 - 对频繁访问的用户权限进行缓存
  2. 令牌刷新 - 实现令牌自动刷新机制
  3. 批量权限检查 - 对多个权限检查进行优化

索引

邮轮穿舱件管理系统的认证授权系统采用基于JWT和RBAC的架构,实现了:

  • 安全的用户认证机制
  • 细粒度的权限控制
  • 灵活的角色权限映射
  • 完善的错误处理
  • 可扩展的权限管理

系统设计合理,代码结构清晰,为后续功能扩展提供了良好的基础。