跳到主要内容

Kafka 安全配置

本章将详细介绍 Kafka 的安全机制,包括认证、授权和加密。

安全概述

Kafka 安全机制

┌─────────────────────────────────────────────────────────────┐
│ Kafka 安全架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 安全层次 │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 认证 │ │ 授权 │ │ 加密 │ │ │
│ │ │ (SASL/SSL) │ │ (ACL) │ │ (SSL) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 认证 (Authentication): │
│ - 验证客户端身份 │
│ - SASL/SSL 机制 │
│ │
│ 授权 (Authorization): │
│ - 控制资源访问权限 │
│ - ACL 机制 │
│ │
│ 加密 (Encryption): │
│ - 传输加密(SSL/TLS) │
│ - 数据加密 │
│ │
└─────────────────────────────────────────────────────────────┘

认证机制

SASL 认证

Kafka 支持多种 SASL 机制:

机制说明适用场景
PLAINTEXT简单用户名/密码开发测试
SCRAM-SHA-256Salt Challenge Response生产推荐
SCRAM-SHA-512更强的 SCRAM高安全需求
GSSAPI (Kerberos)Kerberos 集成企业环境

配置 SASL_PLAINTEXT

# broker config/server.properties

# 启用 SASL
listeners=PLAINTEXT://0.0.0.0:9092,SASL_PLAINTEXT://0.0.0.0:9093
advertised.listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT

# SASL 机制
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512

# JAAS 配置
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin-secret";

生产者/消费者配置

// SASL 认证的生产者配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9093");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"producer\" " +
"password=\"producer-secret\";");

props.put("key.serializer", "StringSerializer");
props.put("value.serializer", "StringSerializer");

SSL/TLS 加密

配置 SSL 加密

# broker config/server.properties

# 监听器配置
listeners=SSL://0.0.0.0:9092
advertised.listeners=SSL://localhost:9092
listener.security.protocol.map=SSL:SSL

# SSL 配置
ssl.keystore.location=/var/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/var/ssl/kafka.server.truststore.jks
ssl.truststore.password=truststore-password

# SSL 协议
ssl.protocol=TLS
ssl.enabled.protocols=TLSv1.2,TLSv1.3

# 客户端认证(可选)
ssl.client.auth=none # 可选值: none, requested, required

生成 SSL 证书

# 1. 创建目录
mkdir -p /var/ssl

# 2. 生成 keystore
keytool -keystore kafka.server.keystore.jks \
-alias kafka \
-validity 365 \
-genkeypair \
-keyalg RSA \
-storetype PKCS12 \
-storepass keystore-password \
-keypass key-password \
-dname "CN=kafka"

# 3. 创建 CA
openssl req -new -x509 \
-keyout ca-key \
-out ca-cert \
-days 365 \
-subj "/CN=Kafka CA"

# 4. 签名证书
keytool -keystore kafka.server.keystore.jks \
-alias kafka \
-certreq \
-file cert-file \
-storepass keystore-password

openssl x509 -req \
-CA ca-cert \
-CAkey ca-key \
-in cert-file \
-out cert-signed \
-days 365 \
-CAcreateserial

keytool -keystore kafka.server.keystore.jks \
-alias CARoot \
-importcert \
-file ca-cert \
-storepass keystore-password

keytool -keystore kafka.server.keystore.jks \
-alias kafka \
-importcert \
-file cert-signed \
-storepass keystore-password

# 5. 创建 truststore
keytool -keystore kafka.server.truststore.jks \
-alias CARoot \
-importcert \
-file ca-cert \
-storepass truststore-password

ACL 授权

启用 ACL

# broker config/server.properties
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin

使用 kafka-acls 命令

# 添加生产者权限
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--add \
--producer \
--topic "*" \
--user producer

# 添加消费者权限
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--add \
--consumer \
--topic "*" \
--group "*" \
--user consumer

# 添加特定主题权限
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--add \
--allow-principal "User:alice" \
--operation Read \
--operation Write \
--topic orders

# 禁止访问
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--add \
--deny-principal "User:bob" \
--operation Read \
--topic sensitive-data

# 查看 ACL
kafka-acls.sh \
--bootstrap-server localhost:9092 \
--list \
--topic orders

ACL 操作类型

操作说明
Read读取主题
Write写入主题
Create创建主题
Delete删除主题
Describe查看主题元数据
Alter修改主题配置
ClusterAction集群操作
All所有操作

完整安全配置示例

Broker 配置

# server.properties
listeners=SASL_SSL://0.0.0.0:9092
advertised.listeners=SASL_SSL://localhost:9092
listener.security.protocol.map=SASL_SSL:SASL_SSL

# SSL 配置
ssl.keystore.location=/var/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/var/ssl/kafka.server.truststore.jks
ssl.truststore.password=truststore-password
ssl.client.auth=required
ssl.protocol=TLS
ssl.enabled.protocols=TLSv1.2,TLSv1.3

# SASL 配置
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512

# JAAS 配置
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" \
password="admin-secret";

# ACL 配置
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin

生产者配置

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");

// 安全协议
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"producer\" " +
"password=\"producer-secret\";");

props.put("key.serializer", "StringSerializer");
props.put("value.serializer", "StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

消费者配置

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");

// 安全协议
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"consumer\" " +
"password=\"consumer-secret\";");

props.put("key.deserializer", "StringDeserializer");
props.put("value.deserializer", "StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(props);

最佳实践

安全配置建议

  1. 使用 SASL + SSL:生产环境推荐组合
  2. 定期轮换证书:SSL 证书定期更新
  3. 最小权限原则:只授予必要的 ACL
  4. 启用客户端认证:ssl.client.auth=required
  5. 监控访问日志:审计安全事件

生产检查清单

  • 启用认证(SASL)
  • 启用 SSL/TLS 加密
  • 配置 ACL
  • 配置 super.users
  • 监控未授权访问
  • 定期审计日志

小结

  1. Kafka 支持多种认证机制(SASL)
  2. SSL/TLS 提供传输加密
  3. ACL 控制资源访问权限
  4. 生产环境应启用完整安全配置

下一步

接下来让我们学习 监控运维