跳到主要内容

权限控制:RBAC 与 ABAC 实践

权限控制(Authorization)是确定"用户能做什么"的核心安全机制。在企业应用中,选择合适的权限模型直接影响系统的安全性、可维护性和扩展性。本章深入讲解两种主流权限模型:基于角色的访问控制(RBAC)和基于属性的访问控制(ABAC)。

认证 vs 授权:再次明确

在深入权限模型之前,让我们再次明确两者的区别:

维度认证 (Authentication)授权 (Authorization)
问题你是谁?你能做什么?
输入凭据(密码、令牌、生物特征)身份 + 请求上下文
输出用户身份标识允许/拒绝决策
时机先发生后发生(认证成功后)
HTTP 状态码401 Unauthorized403 Forbidden

权限控制发生在认证之后,系统根据用户身份和请求上下文判断是否允许执行操作。

RBAC:基于角色的访问控制

RBAC(Role-Based Access Control)是最广泛应用的权限模型,核心思想是通过角色作为用户和权限之间的桥梁。

核心概念

用户 (User) ──┬── 角色 (Role) ──┬── 权限 (Permission)
│ │
│ ├── 读取文档
│ └── 删除文档

└── 管理员 ──────────── 所有权限
编辑 ────────────── 编辑文档
查看者 ──────────── 只读权限

关键元素

  • 用户(User):系统的使用者
  • 角色(Role):权限的集合,代表一类职能
  • 权限(Permission):对资源的操作能力
  • 会话(Session):用户到角色的映射

RBAC 的层次

RBAC 有四个层次:

层次名称特点
RBAC0基础 RBAC用户-角色-权限三层结构
RBAC1角色继承支持角色继承层次
RBAC2约束 RBAC添加职责分离等约束
RBAC3统一 RBACRBAC1 + RBAC2

数据库设计

-- 用户表
CREATE TABLE users (
id UUID PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 角色表
CREATE TABLE roles (
id UUID PRIMARY KEY,
name VARCHAR(50) UNIQUE NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 权限表
CREATE TABLE permissions (
id UUID PRIMARY KEY,
resource VARCHAR(100) NOT NULL, -- 资源:user, document, order
action VARCHAR(50) NOT NULL, -- 操作:create, read, update, delete
description TEXT,
UNIQUE(resource, action)
);

-- 用户-角色关联表(多对多)
CREATE TABLE user_roles (
user_id UUID REFERENCES users(id) ON DELETE CASCADE,
role_id UUID REFERENCES roles(id) ON DELETE CASCADE,
assigned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (user_id, role_id)
);

-- 角色-权限关联表(多对多)
CREATE TABLE role_permissions (
role_id UUID REFERENCES roles(id) ON DELETE CASCADE,
permission_id UUID REFERENCES permissions(id) ON DELETE CASCADE,
PRIMARY KEY (role_id, permission_id)
);

-- 角色继承表(支持 RBAC1)
CREATE TABLE role_hierarchy (
parent_role_id UUID REFERENCES roles(id) ON DELETE CASCADE,
child_role_id UUID REFERENCES roles(id) ON DELETE CASCADE,
PRIMARY KEY (parent_role_id, child_role_id)
);

代码实现

Node.js 实现

// 权限检查中间件
class RBAC {
constructor(db) {
this.db = db;
}

// 获取用户的所有权限
async getUserPermissions(userId) {
const query = `
SELECT DISTINCT p.resource, p.action
FROM permissions p
JOIN role_permissions rp ON p.id = rp.permission_id
JOIN user_roles ur ON rp.role_id = ur.role_id
WHERE ur.user_id = $1
`;

const result = await this.db.query(query, [userId]);
return result.rows;
}

// 检查用户是否有特定权限
async hasPermission(userId, resource, action) {
const permissions = await this.getUserPermissions(userId);
return permissions.some(p => p.resource === resource && p.action === action);
}

// 授权中间件
requirePermission(resource, action) {
return async (req, res, next) => {
if (!req.user) {
return res.status(401).json({ error: '未认证' });
}

const hasAccess = await this.hasPermission(req.user.id, resource, action);

if (!hasAccess) {
return res.status(403).json({ error: '权限不足' });
}

next();
};
}

// 角色检查中间件
requireRole(roleName) {
return async (req, res, next) => {
if (!req.user) {
return res.status(401).json({ error: '未认证' });
}

const query = `
SELECT 1 FROM user_roles ur
JOIN roles r ON ur.role_id = r.id
WHERE ur.user_id = $1 AND r.name = $2
`;

const result = await this.db.query(query, [req.user.id, roleName]);

if (result.rows.length === 0) {
return res.status(403).json({ error: '角色不足' });
}

next();
};
}
}

// 使用示例
const rbac = new RBAC(db);

// 检查权限
app.delete('/documents/:id',
rbac.requirePermission('document', 'delete'),
deleteDocument
);

// 检查角色
app.get('/admin/dashboard',
rbac.requireRole('admin'),
adminDashboard
);

Python 实现

from functools import wraps
from flask import request, g, jsonify
from typing import List, Tuple

class RBAC:
def __init__(self, db):
self.db = db

def get_user_permissions(self, user_id: str) -> List[Tuple[str, str]]:
"""获取用户的所有权限"""
query = """
SELECT DISTINCT p.resource, p.action
FROM permissions p
JOIN role_permissions rp ON p.id = rp.permission_id
JOIN user_roles ur ON rp.role_id = ur.role_id
WHERE ur.user_id = %s
"""
return self.db.execute(query, (user_id,))

def has_permission(self, user_id: str, resource: str, action: str) -> bool:
"""检查用户是否有特定权限"""
permissions = self.get_user_permissions(user_id)
return any(p[0] == resource and p[1] == action for p in permissions)

def has_role(self, user_id: str, role_name: str) -> bool:
"""检查用户是否有特定角色"""
query = """
SELECT 1 FROM user_roles ur
JOIN roles r ON ur.role_id = r.id
WHERE ur.user_id = %s AND r.name = %s
"""
result = self.db.execute(query, (user_id, role_name))
return len(result) > 0

def require_permission(self, resource: str, action: str):
"""权限检查装饰器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not g.user:
return jsonify({'error': '未认证'}), 401

if not self.has_permission(g.user.id, resource, action):
return jsonify({'error': '权限不足'}), 403

return f(*args, **kwargs)
return decorated_function
return decorator

def require_role(self, role_name: str):
"""角色检查装饰器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not g.user:
return jsonify({'error': '未认证'}), 401

if not self.has_role(g.user.id, role_name):
return jsonify({'error': '角色不足'}), 403

return f(*args, **kwargs)
return decorated_function
return decorator


# 使用示例
rbac = RBAC(db)

@app.route('/documents/<id>', methods=['DELETE'])
@rbac.require_permission('document', 'delete')
def delete_document(id):
# 删除文档逻辑
pass

@app.route('/admin/dashboard')
@rbac.require_role('admin')
def admin_dashboard():
# 管理员面板
pass

角色继承(RBAC1)

// 获取用户权限(支持角色继承)
async getUserPermissionsWithInheritance(userId) {
const query = `
WITH RECURSIVE role_tree AS (
-- 用户直接分配的角色
SELECT r.id, r.name
FROM roles r
JOIN user_roles ur ON r.id = ur.role_id
WHERE ur.user_id = $1

UNION

-- 通过继承获得的角色
SELECT r.id, r.name
FROM roles r
JOIN role_hierarchy rh ON r.id = rh.parent_role_id
JOIN role_tree rt ON rh.child_role_id = rt.id
)
SELECT DISTINCT p.resource, p.action
FROM permissions p
JOIN role_permissions rp ON p.id = rp.permission_id
JOIN role_tree rt ON rp.role_id = rt.id
`;

const result = await this.db.query(query, [userId]);
return result.rows;
}

职责分离(RBAC2)

职责分离确保关键操作需要多个角色共同授权,防止单人滥用权限。

# 静态职责分离:同一用户不能同时拥有冲突角色
CONFLICTING_ROLES = [
('admin', 'auditor'), # 管理员和审计员不能同时拥有
('approver', 'requester'), # 审批人和申请人不能同时拥有
]

def assign_role(user_id: str, role_name: str):
"""分配角色时检查职责分离约束"""
user_roles = get_user_roles(user_id)

for r1, r2 in CONFLICTING_ROLES:
if role_name == r1 and r2 in user_roles:
raise ValueError(f'职责冲突:不能同时拥有 {r1}{r2}')
if role_name == r2 and r1 in user_roles:
raise ValueError(f'职责冲突:不能同时拥有 {r1}{r2}')

# 执行角色分配
do_assign_role(user_id, role_name)


# 动态职责分离:敏感操作需要多角色会签
def approve_high_value_transaction(transaction_id: str):
"""高价值交易需要多角色审批"""
approvals = get_transaction_approvals(transaction_id)

required_roles = {'finance_manager', 'risk_officer'}
approved_roles = {a.role for a in approvals}

if not required_roles.issubset(approved_roles):
raise PermissionError('需要财务经理和风控官双重审批')

execute_transaction(transaction_id)

ABAC:基于属性的访问控制

ABAC(Attribute-Based Access Control)根据用户属性、资源属性、环境属性动态计算权限,是零信任架构的核心。

核心概念

决策 = Policy(User, Resource, Action, Environment)

Policy 示例:
用户.部门 == "财务部" AND
资源.敏感级别 <= 用户.安全许可 AND
环境.时间 IN 工作时间 AND
环境.IP IN 内网IP范围

关键元素

  • 主体属性(Subject Attributes):用户相关属性(部门、职级、安全许可)
  • 资源属性(Resource Attributes):资源相关属性(敏感级别、所有者、部门)
  • 动作属性(Action Attributes):操作相关属性(读、写、删除)
  • 环境属性(Environment Attributes):上下文信息(时间、IP、设备)

与 RBAC 对比

特性RBACABAC
灵活性较低,基于固定角色高,基于属性组合
粒度角色级别属性级别
管理复杂度低,角色-权限映射清晰高,策略规则复杂
动态决策不支持支持基于上下文动态决策
适用场景组织结构清晰、权限稳定权限规则复杂、需要动态决策

策略语言(XACML 风格)

<Policy>
<Rule Effect="Permit">
<Target>
<Action>read</Action>
<Resource>financial-report</Resource>
</Target>
<Condition>
<And>
<!-- 用户部门必须是财务部 -->
<Apply FunctionId="string-equal">
<AttributeDesignator AttributeId="user.department"/>
<AttributeValue>财务部</AttributeValue>
</Apply>
<!-- 资源敏感级别不能高于用户安全许可 -->
<Apply FunctionId="integer-less-than-or-equal">
<AttributeDesignator AttributeId="resource.sensitivity"/>
<AttributeDesignator AttributeId="user.clearance"/>
</Apply>
<!-- 必须在工作时间 -->
<Apply FunctionId="time-in-range">
<AttributeDesignator AttributeId="environment.time"/>
<AttributeValue>09:00:00</AttributeValue>
<AttributeValue>18:00:00</AttributeValue>
</Apply>
</And>
</Condition>
</Rule>
</Policy>

代码实现

策略引擎

from dataclasses import dataclass
from typing import Any, Dict, List
from enum import Enum
import ipaddress
from datetime import time

class Effect(Enum):
PERMIT = "permit"
DENY = "deny"

@dataclass
class Policy:
"""ABAC 策略"""
name: str
effect: Effect
conditions: List[dict] # 条件列表

def evaluate(self, subject: dict, resource: dict,
action: str, environment: dict) -> bool:
"""评估策略是否满足"""
for condition in self.conditions:
if not self._evaluate_condition(condition, subject, resource, action, environment):
return False
return True

def _evaluate_condition(self, condition: dict,
subject: dict, resource: dict,
action: str, environment: dict) -> bool:
"""评估单个条件"""
attr_type = condition['attribute_type'] # subject, resource, environment
attr_name = condition['attribute_name']
operator = condition['operator']
expected_value = condition['value']

# 获取属性值
if attr_type == 'subject':
actual_value = subject.get(attr_name)
elif attr_type == 'resource':
actual_value = resource.get(attr_name)
elif attr_type == 'environment':
actual_value = environment.get(attr_name)
elif attr_type == 'action':
actual_value = action
else:
return False

# 应用操作符
return self._apply_operator(actual_value, operator, expected_value)

def _apply_operator(self, actual: Any, operator: str, expected: Any) -> bool:
"""应用比较操作符"""
operators = {
'eq': lambda a, e: a == e,
'ne': lambda a, e: a != e,
'in': lambda a, e: a in e,
'not_in': lambda a, e: a not in e,
'gt': lambda a, e: a > e,
'gte': lambda a, e: a >= e,
'lt': lambda a, e: a < e,
'lte': lambda a, e: a <= e,
'contains': lambda a, e: e in a if a else False,
'starts_with': lambda a, e: a.startswith(e) if a else False,
'ip_in_range': self._ip_in_range,
'time_in_range': self._time_in_range,
}

func = operators.get(operator)
if not func:
return False

try:
return func(actual, expected)
except Exception:
return False

@staticmethod
def _ip_in_range(actual: str, expected: str) -> bool:
"""检查 IP 是否在范围内"""
try:
ip = ipaddress.ip_address(actual)
network = ipaddress.ip_network(expected, strict=False)
return ip in network
except ValueError:
return False

@staticmethod
def _time_in_range(actual: time, expected: List[str]) -> bool:
"""检查时间是否在范围内"""
start = time.fromisoformat(expected[0])
end = time.fromisoformat(expected[1])
return start <= actual <= end


class ABACEngine:
"""ABAC 决策引擎"""

def __init__(self):
self.policies: List[Policy] = []

def add_policy(self, policy: Policy):
"""添加策略"""
self.policies.append(policy)

def evaluate(self, subject: dict, resource: dict,
action: str, environment: dict) -> Effect:
"""
评估访问请求

决策逻辑:
1. 如果有任何 DENY 策略匹配,返回 DENY
2. 如果有任何 PERMIT 策略匹配,返回 PERMIT
3. 默认返回 DENY
"""
# 先检查拒绝策略
for policy in self.policies:
if policy.effect == Effect.DENY:
if policy.evaluate(subject, resource, action, environment):
return Effect.DENY

# 再检查允许策略
for policy in self.policies:
if policy.effect == Effect.PERMIT:
if policy.evaluate(subject, resource, action, environment):
return Effect.PERMIT

# 默认拒绝
return Effect.DENY

使用示例

# 创建 ABAC 引擎
engine = ABACEngine()

# 添加策略:只有财务部员工在工作时间可以查看财务报告
engine.add_policy(Policy(
name="财务报告查看权限",
effect=Effect.PERMIT,
conditions=[
{
'attribute_type': 'subject',
'attribute_name': 'department',
'operator': 'eq',
'value': '财务部'
},
{
'attribute_type': 'resource',
'attribute_name': 'type',
'operator': 'eq',
'value': 'financial_report'
},
{
'attribute_type': 'action',
'attribute_name': 'action',
'operator': 'eq',
'value': 'read'
},
{
'attribute_type': 'environment',
'attribute_name': 'time',
'operator': 'time_in_range',
'value': ['09:00:00', '18:00:00']
}
]
))

# 添加策略:高敏感资源需要高安全许可
engine.add_policy(Policy(
name="敏感资源访问限制",
effect=Effect.PERMIT,
conditions=[
{
'attribute_type': 'subject',
'attribute_name': 'clearance_level',
'operator': 'gte',
'value': 3
},
{
'attribute_type': 'resource',
'attribute_name': 'sensitivity',
'operator': 'lte',
'value': 3
}
]
))

# 添加拒绝策略:外部网络禁止访问核心资源
engine.add_policy(Policy(
name="外网访问限制",
effect=Effect.DENY,
conditions=[
{
'attribute_type': 'environment',
'attribute_name': 'ip',
'operator': 'not_in',
'value': ['10.0.0.0/8', '172.16.0.0/12', '192.168.0.0/16']
},
{
'attribute_type': 'resource',
'attribute_name': 'classification',
'operator': 'eq',
'value': 'core'
}
]
))

# 评估访问请求
subject = {
'id': 'user-123',
'department': '财务部',
'clearance_level': 3
}

resource = {
'id': 'doc-456',
'type': 'financial_report',
'sensitivity': 2,
'classification': 'internal'
}

action = 'read'

environment = {
'time': time(14, 30), # 下午 2:30
'ip': '192.168.1.100',
'device': 'laptop'
}

result = engine.evaluate(subject, resource, action, environment)
print(result) # Effect.PERMIT

Node.js 实现

class ABACEngine {
constructor() {
this.policies = [];
}

addPolicy(policy) {
this.policies.push(policy);
}

evaluate(subject, resource, action, environment) {
// 先检查拒绝策略
for (const policy of this.policies) {
if (policy.effect === 'deny' && this.evaluatePolicy(policy, subject, resource, action, environment)) {
return 'deny';
}
}

// 再检查允许策略
for (const policy of this.policies) {
if (policy.effect === 'permit' && this.evaluatePolicy(policy, subject, resource, action, environment)) {
return 'permit';
}
}

return 'deny';
}

evaluatePolicy(policy, subject, resource, action, environment) {
for (const condition of policy.conditions) {
if (!this.evaluateCondition(condition, subject, resource, action, environment)) {
return false;
}
}
return true;
}

evaluateCondition(condition, subject, resource, action, environment) {
let actualValue;

switch (condition.attributeType) {
case 'subject':
actualValue = subject[condition.attributeName];
break;
case 'resource':
actualValue = resource[condition.attributeName];
break;
case 'environment':
actualValue = environment[condition.attributeName];
break;
case 'action':
actualValue = action;
break;
}

return this.applyOperator(actualValue, condition.operator, condition.value);
}

applyOperator(actual, operator, expected) {
const operators = {
'eq': (a, e) => a === e,
'ne': (a, e) => a !== e,
'in': (a, e) => e.includes(a),
'not_in': (a, e) => !e.includes(a),
'gt': (a, e) => a > e,
'gte': (a, e) => a >= e,
'lt': (a, e) => a < e,
'lte': (a, e) => a <= e,
'contains': (a, e) => a && a.includes(e),
};

return operators[operator]?.(actual, expected) ?? false;
}

// Express 中间件
middleware() {
return async (req, res, next) => {
const subject = {
id: req.user.id,
department: req.user.department,
clearanceLevel: req.user.clearanceLevel,
};

const resource = {
type: req.params.resourceType,
id: req.params.resourceId,
// 从数据库获取资源属性
...(await this.getResourceAttributes(req.params.resourceId)),
};

const action = this.mapMethodToAction(req.method);

const environment = {
ip: req.ip,
time: new Date(),
userAgent: req.headers['user-agent'],
};

const result = this.evaluate(subject, resource, action, environment);

if (result === 'permit') {
next();
} else {
res.status(403).json({ error: '访问被拒绝' });
}
};
}
}

module.exports = ABACEngine;

ReBAC:基于关系的访问控制

ReBAC(Relationship-Based Access Control)通过实体间的关系判断权限,特别适合社交网络、协作工具等场景。

核心概念

用户 A ──关注──> 用户 B ──拥有──> 文档 C

规则:用户可以查看他关注的用户创建的文档
决策:A 是否可以查看 C?→ 检查 A 是否关注 B → 检查 B 是否拥有 C

实现示例

# 使用图数据库(如 Neo4j)存储关系
# 或者使用关系数据库存储

class ReBACEngine:
"""基于关系的访问控制引擎"""

def __init__(self, db):
self.db = db

def check_relation(self, subject: str, relation: str, object_id: str) -> bool:
"""检查主体和客体之间是否存在指定关系"""
query = """
SELECT 1 FROM relations
WHERE subject_id = %s AND relation = %s AND object_id = %s
"""
result = self.db.execute(query, (subject, relation, object_id))
return len(result) > 0

def check_permission(self, user_id: str, permission: str, resource_id: str) -> bool:
"""检查用户是否有权限访问资源"""
# 规则:用户是资源的所有者
if self.check_relation(user_id, 'owner', resource_id):
return True

# 规则:用户是资源所在组的成员
query = """
SELECT 1 FROM relations r1
JOIN relations r2 ON r1.object_id = r2.subject_id
WHERE r1.subject_id = %s
AND r1.relation = 'member_of'
AND r2.relation = 'contains'
AND r2.object_id = %s
"""
result = self.db.execute(query, (user_id, resource_id))
if result:
return True

# 规则:用户关注的人拥有该资源
query = """
SELECT 1 FROM relations r1
JOIN relations r2 ON r1.object_id = r2.subject_id
WHERE r1.subject_id = %s
AND r1.relation = 'follows'
AND r2.relation = 'owner'
AND r2.object_id = %s
"""
result = self.db.execute(query, (user_id, resource_id))

return len(result) > 0

混合模型实践

在实际应用中,通常结合多种模型:

class HybridAccessControl {
constructor(rbac, abac, rebac) {
this.rbac = rbac;
this.abac = abac;
this.rebac = rebac;
}

async checkAccess(user, resource, action, context) {
// 1. 先检查 RBAC(快速过滤)
const hasRolePermission = await this.rbac.hasPermission(user.id, resource.type, action);
if (!hasRolePermission) {
// 检查是否有基础角色
if (!await this.rbac.hasRole(user.id, 'user')) {
return { allowed: false, reason: 'no_base_role' };
}
}

// 2. 检查 ABAC(细粒度控制)
const abacResult = this.abac.evaluate(
{ id: user.id, department: user.department, clearance: user.clearance },
{ id: resource.id, type: resource.type, sensitivity: resource.sensitivity },
action,
{ time: new Date(), ip: context.ip }
);

if (abacResult === 'deny') {
return { allowed: false, reason: 'abac_denied' };
}

// 3. 检查 ReBAC(关系检查)
if (resource.requiresOwnership) {
const hasRelation = await this.rebac.checkRelation(user.id, 'owner', resource.id);
if (!hasRelation) {
return { allowed: false, reason: 'no_ownership' };
}
}

return { allowed: true };
}
}

选型决策指南

场景推荐模型
组织结构清晰、权限稳定RBAC
权限规则复杂、需要动态决策ABAC
社交网络、协作工具ReBAC
企业应用、需要细粒度控制RBAC + ABAC 混合
零信任架构ABAC

参考资料