认证API文档
概述
邮轮穿舱件管理系统后台采用基于JWT(JSON Web Token)的认证机制,提供完整的用户认证、权限验证和令牌管理功能。系统支持多种用户角色和细粒度的权限控制,确保API访问的安全性。
认证架构
核心组件关系
flowchart TD
A[客户端] --> B[认证路由]
B --> C[用户服务]
C --> D[用户模型]
B --> E[角色服务]
E --> F[角色模型]
B --> G[授权模块]
G --> H[JWT令牌生成]
H --> I[令牌返回]
style B fill:#f9f
style G fill:#bbf
认证流程
sequenceDiagram
participant Client
participant AuthRouter
participant UserService
participant RoleService
participant Authorize
Client->>AuthRouter: POST /token
AuthRouter->>UserService: 用户认证
UserService->>UserService: 密码验证
UserService-->>AuthRouter: 用户信息
AuthRouter->>RoleService: 获取权限
RoleService-->>AuthRouter: 权限列表
AuthRouter->>Authorize: 生成JWT
Authorize-->>AuthRouter: JWT令牌
AuthRouter-->>Client: 返回令牌
认证接口
JWT令牌获取接口
接口路径: POST /token
功能描述: 使用用户名和密码获取JWT访问令牌
请求参数:
username: 用户名(支持用户名、邮箱、手机号)password: 密码
响应模型:
class OAuthBearerToken(BaseModel):
access_token: str
token_type: str
实现代码分析:
@auth_router.post("/token",
response_model=OAuthBearerToken,
name="JWT验证接口",
summary="JWT验证接口",
description="使用用户名和密码获取JWT访问令牌"
)
async def api_get_token(credentials: OAuth2PasswordRequestForm = Depends()):
# 记录访问日志
await service_create_access_log(...)
# 用户认证
user = await user_service.service_authenticate_user(
UserLoginSchema(username=credentials.username, password=credentials.password)
)
if user:
# 获取用户权限
user_scopes = await role_service.service_get_user_scopes(user.id)
# 生成JWT令牌
jwt_dict = create_jwt_token({
"uid": int(user.id),
"username": user.username,
"iss": "api-platform",
"scopes": user_scopes
})
return OAuthBearerToken(**jwt_dict)
else:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Login Failed, Invalid credentials")
参考文件: app/routers/login.py
权限系统
权限作用域定义
系统定义了细粒度的权限作用域,支持多种用户角色:
SCOPE = {
"workpiece:read": "读取工件信息",
"workpiece:write": "写入工件信息",
"task:read": "读取任务信息",
"task:write": "写入任务信息",
"log:read": "读取日志信息",
"log:write": "写入日志信息",
"image:read": "读取图像信息",
"image:write": "写入图像信息",
"system:read": "读取系统信息",
"system:write": "写入系统信息",
"inspect:access": "巡检接入",
"maintenance:access": "维修接入",
"miniapp:access": "小程序接入",
}
角色权限映射
flowchart LR
A[桌面端用户] --> A1[所有权限]
B[小程序用户] --> B1[工件/任务/图像权限]
C[巡检用户] --> C1[巡检接入权限]
D[维修用户] --> D1[维修接入权限]
style A fill:#9f9
style B fill:#99f
参考文件: app/core/authorize.py
JWT令牌管理
令牌生成
def create_jwt_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> Dict[str, str]:
"""
创建 JWT 令牌
Args:
data: 包含需要编码到令牌中的数据的字典
expires_delta: 令牌过期时间,如果未提供则使用默认值
Returns:
包含访问令牌和令牌类型的字典
"""
to_encode = data.copy()
# 设置过期时间
if expires_delta:
expire = datetime.now(timezone('Asia/Shanghai')) + expires_delta
else:
expire = datetime.now(timezone('Asia/Shanghai')) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
# 创建 JWT 令牌
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return {
"access_token": encoded_jwt,
"token_type": "bearer"
}
参考文件: app/core/authorize.py
令牌验证
async def verify_jwt_token(token: str = Depends(oauth2_scheme)) -> Dict[str, Any]:
"""
验证 JWT 令牌
Args:
token: JWT 令牌字符串
Returns:
解码后的令牌数据
Raises:
HTTPException: 如果令牌无效或已过期
"""
try:
# 解码并验证令牌
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
g.user_id = payload.get("uid")
except jwt.PyJWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="JWT验证失败:无效的身份验证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
return payload
参考文件: app/core/authorize.py
权限验证依赖
作用域验证
系统提供灵活的权限验证依赖项,支持AND和OR两种验证模式:
def require_scopes(scopes: list[str], mode: Literal["AND","OR"]="AND") -> Callable:
"""
依赖项,用于检查 JWT 令牌是否包含指定的作用域
Args:
scopes: 所需的作用域列表
mode: 检查模式,"AND"表示所有作用域都必须包含,"OR"表示只需包含其中一个作用域
Returns:
包含用户ID和作用域列表的字典
"""
async def _dep(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("uid")
if not user_id:
raise HTTPException(status_code=401, detail="Invalid token")
g.user_id = user_id
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
token_scopes = await _get_user_role_scopes(user_id)
if mode == "AND":
ok = set(scopes).issubset(token_scopes)
else: # "OR"
ok = not token_scopes.isdisjoint(scopes)
if not ok:
raise HTTPException(status_code=403, detail=f"Need {mode} of {scopes}")
return {"uid": user_id, "scopes": list(token_scopes)}
return _dep
使用示例:
# 路由级别权限验证
dependencies=[Depends(require_scopes(["system:read"], mode="AND"))]
# 函数级别权限验证
current_user=Depends(require_scopes(["system:write"], mode="AND"))
参考文件: app/core/authorize.py
用户认证服务
用户认证流程
flowchart TD
A[登录请求] --> B[用户名验证]
B --> C[邮箱验证]
C --> D[手机号验证]
D --> E[密码验证]
E --> F[用户状态检查]
F --> G[认证成功]
B --> H[认证失败]
C --> H
D --> H
E --> H
F --> H
认证实现
async def service_authenticate_user(login_data: UserLoginSchema) -> Optional[UserSchemaOut]:
"""
用户认证
:param login_data: 登录数据
:return: 认证成功的用户信息
"""
try:
# 多字段认证支持
user_by_usr = await User.filter(username=login_data.username).first()
user_by_email = await User.filter(email=login_data.username).first()
user_by_sms = await User.filter(sms=login_data.username).first()
user = None
if user_by_usr:
user = user_by_usr
elif user_by_email:
user = user_by_email
elif user_by_sms:
user = user_by_sms
else:
return None
# 密码验证
if not service_util_verify_password(login_data.password, user.password):
return None
# 用户状态检查
if user.status != UserStatus.ACTIVE.value:
return None
return UserSchemaOut(...)
except DoesNotExist:
return None
参考文件: app/service/user_service.py
密码安全
密码哈希与验证
系统使用bcrypt算法进行密码安全处理:
def service_util_hash_password(password: str) -> str:
"""
密码哈希工具函数
:param password: 明文密码
:return: 哈希后的密码
"""
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
return hashed.decode('utf-8')
def service_util_verify_password(password: str, hashed_password: str) -> bool:
"""
验证密码工具函数
:param password: 明文密码
:param hashed_password: 哈希后的密码
:return: 密码是否匹配
"""
return bcrypt.checkpw(password.encode('utf-8'), hashed_password.encode('utf-8'))
参考文件: app/service/user_service.py
角色权限管理
权限数据结构
classDiagram
class Role {
+IntField user_id
+CharField scope_list
+CharField scope_group
+IntField created_by
+IntField updated_by
+DatetimeField created_at
+DatetimeField updated_at
}
class User {
+IntField id
+CharField username
+CharField password
+CharField email
+CharField sms
+CharField status
+CharField openid
+BooleanField is_system
+IntField created_by
+IntField updated_by
+DatetimeField created_at
+DatetimeField updated_at
}
Role --> User : user_id
权限服务操作
async def service_get_user_scopes(user_id: int) -> List[str]:
"""
获取用户权限
:param user_id: 用户ID
:return: 权限列表
"""
try:
role = await Role.filter(user_id=user_id).first()
return role.scope_list.split(",")
except DoesNotExist:
return []
参考文件: app/service/role_service.py
配置参数
JWT配置
系统通过环境变量配置JWT参数:
SECRET_KEY = settings.JWT_SECRET_KEY
ALGORITHM = settings.JWT_ALGORITHM
ACCESS_TOKEN_EXPIRE_MINUTES = settings.JWT_ACCESS_EXPIRE_MINUTES
OAuth2配置
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token", scopes=SCOPE)
参考文件: app/core/authorize.py
错误处理
认证错误
系统提供详细的错误处理机制:
- 401 Unauthorized: JWT令牌无效或过期
- 403 Forbidden: 权限不足
- 400 Bad Request: 请求参数错误
异常处理示例
except jwt.PyJWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="JWT验证失败:无效的身份验证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
索引
邮轮穿舱件管理系统后台的认证系统具有以下特点:
- 安全性: 采用JWT令牌和bcrypt密码哈希,确保认证安全
- 灵活性: 支持多种认证方式和权限验证模式
- 可扩展性: 模块化设计便于功能扩展和维护
- 完整性: 完整的错误处理和日志记录机制
系统通过精细的权限控制和安全的认证机制,为邮轮穿舱件管理提供了可靠的API安全保障。
主要参考文件: