权限控制:RBAC 与 ABAC 实践
权限控制(Authorization)是确定"用户能做什么"的核心安全机制。在企业应用中,选择合适的权限模型直接影响系统的安全性、可维护性和扩展性。本章深入讲解两种主流权限模型:基于角色的访问控制(RBAC)和基于属性的访问控制(ABAC)。
认证 vs 授权:再次明确
在深入权限模型之前,让我们再次明确两者的区别:
| 维度 | 认证 (Authentication) | 授权 (Authorization) |
|---|---|---|
| 问题 | 你是谁? | 你能做什么? |
| 输入 | 凭据(密码、令牌、生物特征) | 身份 + 请求上下文 |
| 输出 | 用户身份标识 | 允许/拒绝决策 |
| 时机 | 先发生 | 后发生(认证成功后) |
| HTTP 状态码 | 401 Unauthorized | 403 Forbidden |
权限控制发生在认证之后,系统根据用户身份和请求上下文判断是否允许执行操作。
RBAC:基于角色的访问控制
RBAC(Role-Based Access Control)是最广泛应用的权限模型,核心思想是通过角色作为用户和权限之间的桥梁。
核心概念
用户 (User) ──┬── 角色 (Role) ──┬── 权限 (Permission)
│ │
│ ├── 读取文档
│ └── 删除文档
│
└── 管理员 ──────────── 所有权限
编辑 ────────────── 编辑文档
查看者 ──────────── 只读权限
关键元素:
- 用户(User):系统的使用者
- 角色(Role):权限的集合,代表一类职能
- 权限(Permission):对资源的操作能力
- 会话(Session):用户到角色的映射
RBAC 的层次
RBAC 有四个层次:
| 层次 | 名称 | 特点 |
|---|---|---|
| RBAC0 | 基础 RBAC | 用户-角色-权限三层结构 |
| RBAC1 | 角色继承 | 支持角色继承层次 |
| RBAC2 | 约束 RBAC | 添加职责分离等约束 |
| RBAC3 | 统一 RBAC | RBAC1 + RBAC2 |
数据库设计
-- 用户表
CREATE TABLE users (
id UUID PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 角色表
CREATE TABLE roles (
id UUID PRIMARY KEY,
name VARCHAR(50) UNIQUE NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 权限表
CREATE TABLE permissions (
id UUID PRIMARY KEY,
resource VARCHAR(100) NOT NULL, -- 资源:user, document, order
action VARCHAR(50) NOT NULL, -- 操作:create, read, update, delete
description TEXT,
UNIQUE(resource, action)
);
-- 用户-角色关联表(多对多)
CREATE TABLE user_roles (
user_id UUID REFERENCES users(id) ON DELETE CASCADE,
role_id UUID REFERENCES roles(id) ON DELETE CASCADE,
assigned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (user_id, role_id)
);
-- 角色-权限关联表(多对多)
CREATE TABLE role_permissions (
role_id UUID REFERENCES roles(id) ON DELETE CASCADE,
permission_id UUID REFERENCES permissions(id) ON DELETE CASCADE,
PRIMARY KEY (role_id, permission_id)
);
-- 角色继承表(支持 RBAC1)
CREATE TABLE role_hierarchy (
parent_role_id UUID REFERENCES roles(id) ON DELETE CASCADE,
child_role_id UUID REFERENCES roles(id) ON DELETE CASCADE,
PRIMARY KEY (parent_role_id, child_role_id)
);
代码实现
Node.js 实现
// 权限检查中间件
class RBAC {
constructor(db) {
this.db = db;
}
// 获取用户的所有权限
async getUserPermissions(userId) {
const query = `
SELECT DISTINCT p.resource, p.action
FROM permissions p
JOIN role_permissions rp ON p.id = rp.permission_id
JOIN user_roles ur ON rp.role_id = ur.role_id
WHERE ur.user_id = $1
`;
const result = await this.db.query(query, [userId]);
return result.rows;
}
// 检查用户是否有特定权限
async hasPermission(userId, resource, action) {
const permissions = await this.getUserPermissions(userId);
return permissions.some(p => p.resource === resource && p.action === action);
}
// 授权中间件
requirePermission(resource, action) {
return async (req, res, next) => {
if (!req.user) {
return res.status(401).json({ error: '未认证' });
}
const hasAccess = await this.hasPermission(req.user.id, resource, action);
if (!hasAccess) {
return res.status(403).json({ error: '权限不足' });
}
next();
};
}
// 角色检查中间件
requireRole(roleName) {
return async (req, res, next) => {
if (!req.user) {
return res.status(401).json({ error: '未认证' });
}
const query = `
SELECT 1 FROM user_roles ur
JOIN roles r ON ur.role_id = r.id
WHERE ur.user_id = $1 AND r.name = $2
`;
const result = await this.db.query(query, [req.user.id, roleName]);
if (result.rows.length === 0) {
return res.status(403).json({ error: '角色不足' });
}
next();
};
}
}
// 使用示例
const rbac = new RBAC(db);
// 检查权限
app.delete('/documents/:id',
rbac.requirePermission('document', 'delete'),
deleteDocument
);
// 检查角色
app.get('/admin/dashboard',
rbac.requireRole('admin'),
adminDashboard
);
Python 实现
from functools import wraps
from flask import request, g, jsonify
from typing import List, Tuple
class RBAC:
def __init__(self, db):
self.db = db
def get_user_permissions(self, user_id: str) -> List[Tuple[str, str]]:
"""获取用户的所有权限"""
query = """
SELECT DISTINCT p.resource, p.action
FROM permissions p
JOIN role_permissions rp ON p.id = rp.permission_id
JOIN user_roles ur ON rp.role_id = ur.role_id
WHERE ur.user_id = %s
"""
return self.db.execute(query, (user_id,))
def has_permission(self, user_id: str, resource: str, action: str) -> bool:
"""检查用户是否有特定权限"""
permissions = self.get_user_permissions(user_id)
return any(p[0] == resource and p[1] == action for p in permissions)
def has_role(self, user_id: str, role_name: str) -> bool:
"""检查用户是否有特定角色"""
query = """
SELECT 1 FROM user_roles ur
JOIN roles r ON ur.role_id = r.id
WHERE ur.user_id = %s AND r.name = %s
"""
result = self.db.execute(query, (user_id, role_name))
return len(result) > 0
def require_permission(self, resource: str, action: str):
"""权限检查装饰器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not g.user:
return jsonify({'error': '未认证'}), 401
if not self.has_permission(g.user.id, resource, action):
return jsonify({'error': '权限不足'}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
def require_role(self, role_name: str):
"""角色检查装饰器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not g.user:
return jsonify({'error': '未认证'}), 401
if not self.has_role(g.user.id, role_name):
return jsonify({'error': '角色不足'}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
# 使用示例
rbac = RBAC(db)
@app.route('/documents/<id>', methods=['DELETE'])
@rbac.require_permission('document', 'delete')
def delete_document(id):
# 删除文档逻辑
pass
@app.route('/admin/dashboard')
@rbac.require_role('admin')
def admin_dashboard():
# 管理员面板
pass
角色继承(RBAC1)
// 获取用户权限(支持角色继承)
async getUserPermissionsWithInheritance(userId) {
const query = `
WITH RECURSIVE role_tree AS (
-- 用户直接分配的角色
SELECT r.id, r.name
FROM roles r
JOIN user_roles ur ON r.id = ur.role_id
WHERE ur.user_id = $1
UNION
-- 通过继承获得的角色
SELECT r.id, r.name
FROM roles r
JOIN role_hierarchy rh ON r.id = rh.parent_role_id
JOIN role_tree rt ON rh.child_role_id = rt.id
)
SELECT DISTINCT p.resource, p.action
FROM permissions p
JOIN role_permissions rp ON p.id = rp.permission_id
JOIN role_tree rt ON rp.role_id = rt.id
`;
const result = await this.db.query(query, [userId]);
return result.rows;
}
职责分离(RBAC2)
职责分离确保关键操作需要多个角色共同授权,防止单人滥用权限。
# 静态职责分离:同一用户不能同时拥有冲突角色
CONFLICTING_ROLES = [
('admin', 'auditor'), # 管理员和审计员不能同时拥有
('approver', 'requester'), # 审批人和申请人不能同时拥有
]
def assign_role(user_id: str, role_name: str):
"""分配角色时检查职责分离约束"""
user_roles = get_user_roles(user_id)
for r1, r2 in CONFLICTING_ROLES:
if role_name == r1 and r2 in user_roles:
raise ValueError(f'职责冲突:不能同时拥有 {r1} 和 {r2}')
if role_name == r2 and r1 in user_roles:
raise ValueError(f'职责冲突:不能同时拥有 {r1} 和 {r2}')
# 执行角色分配
do_assign_role(user_id, role_name)
# 动态职责分离:敏感操作需要多角色会签
def approve_high_value_transaction(transaction_id: str):
"""高价值交易需要多角色审批"""
approvals = get_transaction_approvals(transaction_id)
required_roles = {'finance_manager', 'risk_officer'}
approved_roles = {a.role for a in approvals}
if not required_roles.issubset(approved_roles):
raise PermissionError('需要财务经理和风控官双重审批')
execute_transaction(transaction_id)
ABAC:基于属性的访问控制
ABAC(Attribute-Based Access Control)根据用户属性、资源属性、环境属性动态计算权限,是零信任架构的核心。
核心概念
决策 = Policy(User, Resource, Action, Environment)
Policy 示例:
用户.部门 == "财务部" AND
资源.敏感级别 <= 用户.安全许可 AND
环境.时间 IN 工作时间 AND
环境.IP IN 内网IP范围
关键元素:
- 主体属性(Subject Attributes):用户相关属性(部门、职级、安全许可)
- 资源属性(Resource Attributes):资源相关属性(敏感级别、所有者、部门)
- 动作属性(Action Attributes):操作相关属性(读、写、删除)
- 环境属性(Environment Attributes):上下文信息(时间、IP、设备)
与 RBAC 对比
| 特性 | RBAC | ABAC |
|---|---|---|
| 灵活性 | 较低,基于固定角色 | 高,基于属性组合 |
| 粒度 | 角色级别 | 属性级别 |
| 管理复杂度 | 低,角色-权限映射清晰 | 高,策略规则复杂 |
| 动态决策 | 不支持 | 支持基于上下文动态决策 |
| 适用场景 | 组织结构清晰、权限稳定 | 权限规则复杂、需要动态决策 |
策略语言(XACML 风格)
<Policy>
<Rule Effect="Permit">
<Target>
<Action>read</Action>
<Resource>financial-report</Resource>
</Target>
<Condition>
<And>
<!-- 用户部门必须是财务部 -->
<Apply FunctionId="string-equal">
<AttributeDesignator AttributeId="user.department"/>
<AttributeValue>财务部</AttributeValue>
</Apply>
<!-- 资源敏感级别不能高于用户安全许可 -->
<Apply FunctionId="integer-less-than-or-equal">
<AttributeDesignator AttributeId="resource.sensitivity"/>
<AttributeDesignator AttributeId="user.clearance"/>
</Apply>
<!-- 必须在工作时间 -->
<Apply FunctionId="time-in-range">
<AttributeDesignator AttributeId="environment.time"/>
<AttributeValue>09:00:00</AttributeValue>
<AttributeValue>18:00:00</AttributeValue>
</Apply>
</And>
</Condition>
</Rule>
</Policy>
代码实现
策略引擎
from dataclasses import dataclass
from typing import Any, Dict, List
from enum import Enum
import ipaddress
from datetime import time
class Effect(Enum):
PERMIT = "permit"
DENY = "deny"
@dataclass
class Policy:
"""ABAC 策略"""
name: str
effect: Effect
conditions: List[dict] # 条件列表
def evaluate(self, subject: dict, resource: dict,
action: str, environment: dict) -> bool:
"""评估策略是否满足"""
for condition in self.conditions:
if not self._evaluate_condition(condition, subject, resource, action, environment):
return False
return True
def _evaluate_condition(self, condition: dict,
subject: dict, resource: dict,
action: str, environment: dict) -> bool:
"""评估单个条件"""
attr_type = condition['attribute_type'] # subject, resource, environment
attr_name = condition['attribute_name']
operator = condition['operator']
expected_value = condition['value']
# 获取属性值
if attr_type == 'subject':
actual_value = subject.get(attr_name)
elif attr_type == 'resource':
actual_value = resource.get(attr_name)
elif attr_type == 'environment':
actual_value = environment.get(attr_name)
elif attr_type == 'action':
actual_value = action
else:
return False
# 应用操作符
return self._apply_operator(actual_value, operator, expected_value)
def _apply_operator(self, actual: Any, operator: str, expected: Any) -> bool:
"""应用比较操作符"""
operators = {
'eq': lambda a, e: a == e,
'ne': lambda a, e: a != e,
'in': lambda a, e: a in e,
'not_in': lambda a, e: a not in e,
'gt': lambda a, e: a > e,
'gte': lambda a, e: a >= e,
'lt': lambda a, e: a < e,
'lte': lambda a, e: a <= e,
'contains': lambda a, e: e in a if a else False,
'starts_with': lambda a, e: a.startswith(e) if a else False,
'ip_in_range': self._ip_in_range,
'time_in_range': self._time_in_range,
}
func = operators.get(operator)
if not func:
return False
try:
return func(actual, expected)
except Exception:
return False
@staticmethod
def _ip_in_range(actual: str, expected: str) -> bool:
"""检查 IP 是否在范围内"""
try:
ip = ipaddress.ip_address(actual)
network = ipaddress.ip_network(expected, strict=False)
return ip in network
except ValueError:
return False
@staticmethod
def _time_in_range(actual: time, expected: List[str]) -> bool:
"""检查时间是否在范围内"""
start = time.fromisoformat(expected[0])
end = time.fromisoformat(expected[1])
return start <= actual <= end
class ABACEngine:
"""ABAC 决策引擎"""
def __init__(self):
self.policies: List[Policy] = []
def add_policy(self, policy: Policy):
"""添加策略"""
self.policies.append(policy)
def evaluate(self, subject: dict, resource: dict,
action: str, environment: dict) -> Effect:
"""
评估访问请求
决策逻辑:
1. 如果有任何 DENY 策略匹配,返回 DENY
2. 如果有任何 PERMIT 策略匹配,返回 PERMIT
3. 默认返回 DENY
"""
# 先检查拒绝策略
for policy in self.policies:
if policy.effect == Effect.DENY:
if policy.evaluate(subject, resource, action, environment):
return Effect.DENY
# 再检查允许策略
for policy in self.policies:
if policy.effect == Effect.PERMIT:
if policy.evaluate(subject, resource, action, environment):
return Effect.PERMIT
# 默认拒绝
return Effect.DENY
使用示例
# 创建 ABAC 引擎
engine = ABACEngine()
# 添加策略:只有财务部员工在工作时间可以查看财务报告
engine.add_policy(Policy(
name="财务报告查看权限",
effect=Effect.PERMIT,
conditions=[
{
'attribute_type': 'subject',
'attribute_name': 'department',
'operator': 'eq',
'value': '财务部'
},
{
'attribute_type': 'resource',
'attribute_name': 'type',
'operator': 'eq',
'value': 'financial_report'
},
{
'attribute_type': 'action',
'attribute_name': 'action',
'operator': 'eq',
'value': 'read'
},
{
'attribute_type': 'environment',
'attribute_name': 'time',
'operator': 'time_in_range',
'value': ['09:00:00', '18:00:00']
}
]
))
# 添加策略:高敏感资源需要高安全许可
engine.add_policy(Policy(
name="敏感资源访问限制",
effect=Effect.PERMIT,
conditions=[
{
'attribute_type': 'subject',
'attribute_name': 'clearance_level',
'operator': 'gte',
'value': 3
},
{
'attribute_type': 'resource',
'attribute_name': 'sensitivity',
'operator': 'lte',
'value': 3
}
]
))
# 添加拒绝策略:外部网络禁止访问核心资源
engine.add_policy(Policy(
name="外网访问限制",
effect=Effect.DENY,
conditions=[
{
'attribute_type': 'environment',
'attribute_name': 'ip',
'operator': 'not_in',
'value': ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16']
},
{
'attribute_type': 'resource',
'attribute_name': 'classification',
'operator': 'eq',
'value': 'core'
}
]
))
# 评估访问请求
subject = {
'id': 'user-123',
'department': '财务部',
'clearance_level': 3
}
resource = {
'id': 'doc-456',
'type': 'financial_report',
'sensitivity': 2,
'classification': 'internal'
}
action = 'read'
environment = {
'time': time(14, 30), # 下午 2:30
'ip': '192.168.1.100',
'device': 'laptop'
}
result = engine.evaluate(subject, resource, action, environment)
print(result) # Effect.PERMIT
Node.js 实现
class ABACEngine {
constructor() {
this.policies = [];
}
addPolicy(policy) {
this.policies.push(policy);
}
evaluate(subject, resource, action, environment) {
// 先检查拒绝策略
for (const policy of this.policies) {
if (policy.effect === 'deny' && this.evaluatePolicy(policy, subject, resource, action, environment)) {
return 'deny';
}
}
// 再检查允许策略
for (const policy of this.policies) {
if (policy.effect === 'permit' && this.evaluatePolicy(policy, subject, resource, action, environment)) {
return 'permit';
}
}
return 'deny';
}
evaluatePolicy(policy, subject, resource, action, environment) {
for (const condition of policy.conditions) {
if (!this.evaluateCondition(condition, subject, resource, action, environment)) {
return false;
}
}
return true;
}
evaluateCondition(condition, subject, resource, action, environment) {
let actualValue;
switch (condition.attributeType) {
case 'subject':
actualValue = subject[condition.attributeName];
break;
case 'resource':
actualValue = resource[condition.attributeName];
break;
case 'environment':
actualValue = environment[condition.attributeName];
break;
case 'action':
actualValue = action;
break;
}
return this.applyOperator(actualValue, condition.operator, condition.value);
}
applyOperator(actual, operator, expected) {
const operators = {
'eq': (a, e) => a === e,
'ne': (a, e) => a !== e,
'in': (a, e) => e.includes(a),
'not_in': (a, e) => !e.includes(a),
'gt': (a, e) => a > e,
'gte': (a, e) => a >= e,
'lt': (a, e) => a < e,
'lte': (a, e) => a <= e,
'contains': (a, e) => a && a.includes(e),
};
return operators[operator]?.(actual, expected) ?? false;
}
// Express 中间件
middleware() {
return async (req, res, next) => {
const subject = {
id: req.user.id,
department: req.user.department,
clearanceLevel: req.user.clearanceLevel,
};
const resource = {
type: req.params.resourceType,
id: req.params.resourceId,
// 从数据库获取资源属性
...(await this.getResourceAttributes(req.params.resourceId)),
};
const action = this.mapMethodToAction(req.method);
const environment = {
ip: req.ip,
time: new Date(),
userAgent: req.headers['user-agent'],
};
const result = this.evaluate(subject, resource, action, environment);
if (result === 'permit') {
next();
} else {
res.status(403).json({ error: '访问被拒绝' });
}
};
}
}
module.exports = ABACEngine;
ReBAC:基于关系的访问控制
ReBAC(Relationship-Based Access Control)通过实体间的关系判断权限,特别适合社交网络、协作工具等场景。
核心概念
用户 A ──关注──> 用户 B ──拥有──> 文档 C
规则:用户可以查看他关注的用户创建的文档
决策:A 是否可以查看 C?→ 检查 A 是否关注 B → 检查 B 是否拥有 C
实现示例
# 使用图数据库(如 Neo4j)存储关系
# 或者使用关系数据库存储
class ReBACEngine:
"""基于关系的访问控制引擎"""
def __init__(self, db):
self.db = db
def check_relation(self, subject: str, relation: str, object_id: str) -> bool:
"""检查主体和客体之间是否存在指定关系"""
query = """
SELECT 1 FROM relations
WHERE subject_id = %s AND relation = %s AND object_id = %s
"""
result = self.db.execute(query, (subject, relation, object_id))
return len(result) > 0
def check_permission(self, user_id: str, permission: str, resource_id: str) -> bool:
"""检查用户是否有权限访问资源"""
# 规则:用户是资源的所有者
if self.check_relation(user_id, 'owner', resource_id):
return True
# 规则:用户是资源所在组的成员
query = """
SELECT 1 FROM relations r1
JOIN relations r2 ON r1.object_id = r2.subject_id
WHERE r1.subject_id = %s
AND r1.relation = 'member_of'
AND r2.relation = 'contains'
AND r2.object_id = %s
"""
result = self.db.execute(query, (user_id, resource_id))
if result:
return True
# 规则:用户关注的人拥有该资源
query = """
SELECT 1 FROM relations r1
JOIN relations r2 ON r1.object_id = r2.subject_id
WHERE r1.subject_id = %s
AND r1.relation = 'follows'
AND r2.relation = 'owner'
AND r2.object_id = %s
"""
result = self.db.execute(query, (user_id, resource_id))
return len(result) > 0
混合模型实践
在实际应用中,通常结合多种模型:
class HybridAccessControl {
constructor(rbac, abac, rebac) {
this.rbac = rbac;
this.abac = abac;
this.rebac = rebac;
}
async checkAccess(user, resource, action, context) {
// 1. 先检查 RBAC(快速过滤)
const hasRolePermission = await this.rbac.hasPermission(user.id, resource.type, action);
if (!hasRolePermission) {
// 检查是否有基础角色
if (!await this.rbac.hasRole(user.id, 'user')) {
return { allowed: false, reason: 'no_base_role' };
}
}
// 2. 检查 ABAC(细粒度控制)
const abacResult = this.abac.evaluate(
{ id: user.id, department: user.department, clearance: user.clearance },
{ id: resource.id, type: resource.type, sensitivity: resource.sensitivity },
action,
{ time: new Date(), ip: context.ip }
);
if (abacResult === 'deny') {
return { allowed: false, reason: 'abac_denied' };
}
// 3. 检查 ReBAC(关系检查)
if (resource.requiresOwnership) {
const hasRelation = await this.rebac.checkRelation(user.id, 'owner', resource.id);
if (!hasRelation) {
return { allowed: false, reason: 'no_ownership' };
}
}
return { allowed: true };
}
}
选型决策指南
| 场景 | 推荐模型 |
|---|---|
| 组织结构清晰、权限稳定 | RBAC |
| 权限规则复杂、需要动态决策 | ABAC |
| 社交网络、协作工具 | ReBAC |
| 企业应用、需要细粒度控制 | RBAC + ABAC 混合 |
| 零信任架构 | ABAC |
参考资料
- NIST RBAC 标准
- XACML 规范
- Google Zanzibar 论文 - ReBAC 实践
- OWASP 访问控制备忘单