Kafka 安全配置
本章将详细介绍 Kafka 的安全机制,包括认证、授权和加密。
安全概述
Kafka 安全机制
┌─────────────────────────────────────────────────────────────┐
│ Kafka 安全架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 安全层次 │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 认证 │ │ 授权 │ │ 加密 │ │ │
│ │ │ (SASL/SSL) │ │ (ACL) │ │ (SSL) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 认证 (Authentication): │
│ - 验证客户端身份 │
│ - SASL/SSL 机制 │
│ │
│ 授权 (Authorization): │
│ - 控制资源访问权限 │
│ - ACL 机制 │
│ │
│ 加密 (Encryption): │
│ - 传输加密(SSL/TLS) │
│ - 数据加密 │
│ │
└─────────────────────────────────────────────────────────────┘
认证机制
SASL 认证
Kafka 支持多种 SASL 机制:
| 机制 | 说明 | 适用场景 |
|---|---|---|
| PLAINTEXT | 简单用户名/密码 | 开发测试 |
| SCRAM-SHA-256 | Salt Challenge Response | 生产推荐 |
| SCRAM-SHA-512 | 更强的 SCRAM | 高安全需求 |
| GSSAPI (Kerberos) | Kerberos 集成 | 企业环境 |
配置 SASL_PLAINTEXT
# broker config/server.properties
# 启用 SASL
listeners=PLAINTEXT://0.0.0.0:9092,SASL_PLAINTEXT://0.0.0.0:9093
advertised.listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
# SASL 机制
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
# JAAS 配置
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin-secret";
生产者/消费者配置
// SASL 认证的生产者配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9093");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"producer\" " +
"password=\"producer-secret\";");
props.put("key.serializer", "StringSerializer");
props.put("value.serializer", "StringSerializer");
SSL/TLS 加密
配置 SSL 加密
# broker config/server.properties
# 监听器配置
listeners=SSL://0.0.0.0:9092
advertised.listeners=SSL://localhost:9092
listener.security.protocol.map=SSL:SSL
# SSL 配置
ssl.keystore.location=/var/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/var/ssl/kafka.server.truststore.jks
ssl.truststore.password=truststore-password
# SSL 协议
ssl.protocol=TLS
ssl.enabled.protocols=TLSv1.2,TLSv1.3
# 客户端认证(可选)
ssl.client.auth=none # 可选值: none, requested, required
生成 SSL 证书
# 1. 创建目录
mkdir -p /var/ssl
# 2. 生成 keystore
keytool -keystore kafka.server.keystore.jks \
-alias kafka \
-validity 365 \
-genkeypair \
-keyalg RSA \
-storetype PKCS12 \
-storepass keystore-password \
-keypass key-password \
-dname "CN=kafka"
# 3. 创建 CA
openssl req -new -x509 \
-keyout ca-key \
-out ca-cert \
-days 365 \
-subj "/CN=Kafka CA"
# 4. 签名证书
keytool -keystore kafka.server.keystore.jks \
-alias kafka \
-certreq \
-file cert-file \
-storepass keystore-password
openssl x509 -req \
-CA ca-cert \
-CAkey ca-key \
-in cert-file \
-out cert-signed \
-days 365 \
-CAcreateserial
keytool -keystore kafka.server.keystore.jks \
-alias CARoot \
-importcert \
-file ca-cert \
-storepass keystore-password
keytool -keystore kafka.server.keystore.jks \
-alias kafka \
-importcert \
-file cert-signed \
-storepass keystore-password
# 5. 创建 truststore
keytool -keystore kafka.server.truststore.jks \
-alias CARoot \
-importcert \
-file ca-cert \
-storepass truststore-password
ACL 授权
启用 ACL
# broker config/server.properties
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
使用 kafka-acls 命令
# 添加生产者权限
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--add \
--producer \
--topic "*" \
--user producer
# 添加消费者权限
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--add \
--consumer \
--topic "*" \
--group "*" \
--user consumer
# 添加特定主题权限
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--add \
--allow-principal "User:alice" \
--operation Read \
--operation Write \
--topic orders
# 禁止访问
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--add \
--deny-principal "User:bob" \
--operation Read \
--topic sensitive-data
# 查看 ACL
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--list \
--topic orders
ACL 操作类型
| 操作 | 说明 |
|---|---|
| Read | 读取主题 |
| Write | 写入主题 |
| Create | 创建主题 |
| Delete | 删除主题 |
| Describe | 查看主题元数据 |
| Alter | 修改主题配置 |
| ClusterAction | 集群操作 |
| All | 所有操作 |
完整安全配置示例
Broker 配置
# server.properties
listeners=SASL_SSL://0.0.0.0:9092
advertised.listeners=SASL_SSL://localhost:9092
listener.security.protocol.map=SASL_SSL:SASL_SSL
# SSL 配置
ssl.keystore.location=/var/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/var/ssl/kafka.server.truststore.jks
ssl.truststore.password=truststore-password
ssl.client.auth=required
ssl.protocol=TLS
ssl.enabled.protocols=TLSv1.2,TLSv1.3
# SASL 配置
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512
# JAAS 配置
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin-secret";
# ACL 配置
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
生产者配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// 安全协议
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"producer\" " +
"password=\"producer-secret\";");
props.put("key.serializer", "StringSerializer");
props.put("value.serializer", "StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
消费者配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
// 安全协议
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"consumer\" " +
"password=\"consumer-secret\";");
props.put("key.deserializer", "StringDeserializer");
props.put("value.deserializer", "StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
最佳实践
安全配置建议
- 使用 SASL + SSL:生产环境推荐组合
- 定期轮换证书:SSL 证书定期更新
- 最小权限原则:只授予必要的 ACL
- 启用客户端认证:ssl.client.auth=required
- 监控访问日志:审计安全事件
生产检查清单
- 启用认证(SASL)
- 启用 SSL/TLS 加密
- 配置 ACL
- 配置 super.users
- 监控未授权访问
- 定期审计日志
小结
- Kafka 支持多种认证机制(SASL)
- SSL/TLS 提供传输加密
- ACL 控制资源访问权限
- 生产环境应启用完整安全配置
下一步
接下来让我们学习 监控运维。