Schema Registry 数据治理
在 Kafka 生产环境中,数据治理是保障数据质量和系统稳定性的关键环节。Schema Registry 作为 Kafka 生态系统的核心组件,提供了集中式的 Schema 管理和验证能力。本章将详细介绍 Schema Registry 的原理、配置和使用方法。
为什么需要 Schema Registry?
数据格式问题
在没有 Schema Registry 的环境中,生产者和消费者之间通过隐式契约进行数据交换,这会导致一系列问题:
常见问题:
- 数据不一致:不同生产者可能使用不同的数据格式
- 格式变更风险:Schema 变更可能导致消费者解析失败
- 缺乏版本控制:无法追踪 Schema 的变更历史
- 数据丢失:格式不匹配可能导致数据被丢弃或损坏
Schema Registry 的解决方案
Schema Registry 通过提供集中式的 Schema 管理来解决这些问题:
核心优势:
- 集中管理:所有 Schema 存储在统一位置,便于管理和查找
- 版本控制:支持 Schema 版本管理,可追溯变更历史
- 兼容性检查:自动验证 Schema 变更的兼容性
- 数据验证:确保数据符合预定义的 Schema 结构
- 性能优化:消息中只传递 Schema ID,而非完整 Schema
核心概念
Schema
Schema 定义了消息数据的结构,包括字段名称、数据类型和约束条件。Schema Registry 支持三种主流格式:
| 格式 | 特点 | 适用场景 |
|---|---|---|
| Avro | 二进制格式、Schema 演进支持好 | 大数据生态、Hadoop |
| Protobuf | 高效、支持多语言 | 微服务、gRPC 场景 |
| JSON Schema | 可读性好、易调试 | Web API、简单场景 |
Schema Subject
Subject 是 Schema Registry 中组织 Schema 的逻辑单元,每个 Subject 对应一组版本的 Schema。Kafka 中通常将 Subject 与 Topic 关联:
- Topic 策略:
<topic-name>-value或<topic-name>-key - Record 策略:根据记录类型命名
- TopicRecord 策略:结合 Topic 和 Record 信息
Schema ID
每个注册的 Schema 都会分配一个唯一的整数 ID。生产者发送消息时,只需在消息头中携带 Schema ID,消费者通过 ID 从 Registry 获取完整的 Schema 定义。
兼容性规则
Schema Registry 支持多种兼容性级别,控制 Schema 演进的方式:
| 兼容性级别 | 说明 | 允许的操作 |
|---|---|---|
| BACKWARD | 新 Schema 可以读取旧数据 | 添加可选字段、删除字段 |
| FORWARD | 旧 Schema 可以读取新数据 | 添加字段(有默认值)、删除可选字段 |
| FULL | 双向兼容 | 添加可选字段、删除可选字段 |
| BACKWARD_TRANSITIVE | 新 Schema 可读取所有旧版本数据 | 与 BACKWARD 相同,但对所有版本生效 |
| FORWARD_TRANSITIVE | 旧 Schema 可读取所有新版本数据 | 与 FORWARD 相同,但对所有版本生效 |
| FULL_TRANSITIVE | 所有版本双向兼容 | 与 FULL 相同,但对所有版本生效 |
| NONE | 不检查兼容性 | 任意变更 |
安装与配置
Docker 部署
使用 Docker 快速启动 Schema Registry:
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
启动服务:
docker-compose up -d
# 验证 Schema Registry 是否启动
curl http://localhost:8081/subjects
配置说明
Schema Registry 的关键配置项:
# schema-registry.properties
# 主机名(用于集群内通信)
schema.registry.host.name=schema-registry
# 监听地址
listeners=http://0.0.0.0:8081
# Kafka 存储地址(Schema 存储在 Kafka Topic 中)
kafkastore.bootstrap.servers=PLAINTEXT://kafka:9092
# Schema 存储 Topic(默认 _schemas)
kafkastore.topic=_schemas
# 默认兼容性级别
schema.compatibility.level=BACKWARD
# 是否允许 Schema 删除
schema.registry.delete.enabled=true
与 Kafka 集成
Schema Registry 与 Kafka 的集成架构:
使用 Avro Schema
定义 Schema
Avro Schema 使用 JSON 格式定义数据结构:
{
"type": "record",
"name": "User",
"namespace": "com.example.kafka",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null},
{"name": "age", "type": ["null", "int"], "default": null},
{"name": "created_at", "type": "long", "logicalType": "timestamp-millis"}
]
}
Schema 说明:
type: record:表示这是一个记录类型name:记录名称namespace:命名空间,用于区分同名类型fields:字段列表- 联合类型
["null", "string"]表示该字段可以为 null 或字符串
Maven 依赖
<dependencies>
<!-- Kafka Client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
<!-- Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<!-- Confluent Serializer -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.5.0</version>
</dependency>
</dependencies>
<!-- 需要添加 Confluent 仓库 -->
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
生产者配置
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class AvroProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// 使用 Avro 序列化器
props.put("key.serializer", KafkaAvroSerializer.class);
props.put("value.serializer", KafkaAvroSerializer.class);
// Schema Registry 地址
props.put("schema.registry.url", "http://localhost:8081");
Producer<String, User> producer = new KafkaProducer<>(props);
// 创建 User 对象(使用 Avro 生成的类)
User user = User.newBuilder()
.setId(1L)
.setName("张三")
.setEmail("[email protected]")
.setAge(25)
.setCreatedAt(System.currentTimeMillis())
.build();
// 发送消息
ProducerRecord<String, User> record = new ProducerRecord<>(
"users", // Topic
"user-1", // Key
user // Value
);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("Sent to partition " + metadata.partition());
} else {
exception.printStackTrace();
}
});
producer.close();
}
}
消费者配置
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class AvroConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "user-consumer-group");
// 使用 Avro 反序列化器
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", KafkaAvroDeserializer.class);
// Schema Registry 地址
props.put("schema.registry.url", "http://localhost:8081");
// 返回特定类型(否则返回 GenericRecord)
props.put("specific.avro.reader", "true");
Consumer<String, User> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("users"));
while (true) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, User> record : records) {
User user = record.value();
System.out.printf("User: id=%d, name=%s, email=%s%n",
user.getId(), user.getName(), user.getEmail());
}
}
}
}
Schema 演进
Schema 演进是 Schema Registry 的核心能力,允许在不中断服务的情况下修改数据结构。
演进规则
以 BACKWARD 兼容性为例,Schema 演进需要遵循以下规则:
允许的变更
1. 添加带默认值的新字段
// v1
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
]
}
// v2 - 添加可选字段(BACKWARD 兼容)
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}
新消费者使用 v2 Schema 可以读取 v1 写入的数据,新增字段会使用默认值。
2. 删除字段
// v1
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
// v2 - 删除 age 字段(BACKWARD 兼容)
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
]
}
新消费者使用 v2 Schema 可以忽略 v1 数据中的 age 字段。
不允许的变更
1. 添加没有默认值的必填字段
// v1
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
]
}
// v2 - 不兼容!缺少默认值
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"} // 没有默认值!
]
}
2. 修改字段类型
// v1
{"name": "age", "type": "int"}
// v2 - 不兼容!类型变更
{"name": "age", "type": "long"}
REST API 操作
Schema Registry 提供完整的 REST API:
# 查看所有 Subject
curl http://localhost:8081/subjects
# 注册新 Schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}"}' \
http://localhost:8081/subjects/users-value/versions
# 查看特定版本的 Schema
curl http://localhost:8081/subjects/users-value/versions/1
# 查看最新版本
curl http://localhost:8081/subjects/users-value/versions/latest
# 检查 Schema 兼容性
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}' \
http://localhost:8081/compatibility/subjects/users-value/versions/latest
# 获取 Schema ID 对应的 Schema
curl http://localhost:8081/schemas/ids/1
# 配置 Subject 的兼容性级别
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "FORWARD"}' \
http://localhost:8081/config/users-value
使用 Protobuf Schema
Protobuf 是另一种流行的序列化格式,Schema Registry 完整支持。
定义 Proto 文件
// user.proto
syntax = "proto3";
package com.example.kafka;
option java_package = "com.example.kafka.protobuf";
option java_outer_classname = "UserProto";
message User {
int64 id = 1;
string name = 2;
string email = 3;
int32 age = 4;
int64 created_at = 5;
}
Maven 配置
<dependencies>
<!-- Protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.1</version>
</dependency>
<!-- Confluent Protobuf Serializer -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
<version>7.5.0</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.25.1:exe:${os.detected.classifier}</protocArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
生产者配置
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import com.example.kafka.protobuf.UserProto.User;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", KafkaProtobufSerializer.class);
props.put("value.serializer", KafkaProtobufSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
Producer<String, User> producer = new KafkaProducer<>(props);
User user = User.newBuilder()
.setId(1L)
.setName("张三")
.setEmail("[email protected]")
.setAge(25)
.setCreatedAt(System.currentTimeMillis())
.build();
producer.send(new ProducerRecord<>("users-protobuf", "user-1", user));
使用 JSON Schema
JSON Schema 是最易读的格式,适合需要人工查看数据的场景。
定义 JSON Schema
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "User",
"type": "object",
"properties": {
"id": {
"type": "integer",
"minimum": 1
},
"name": {
"type": "string",
"minLength": 1,
"maxLength": 100
},
"email": {
"type": "string",
"format": "email"
},
"age": {
"type": "integer",
"minimum": 0,
"maximum": 150
}
},
"required": ["id", "name"],
"additionalProperties": false
}
生产者配置
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", KafkaJsonSchemaSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
// 使用 POJO 或 Map
Map<String, Object> user = new HashMap<>();
user.put("id", 1);
user.put("name", "张三");
user.put("email", "[email protected]");
user.put("age", 25);
Producer<String, Map<String, Object>> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("users-json", "user-1", user));
最佳实践
1. 选择合适的 Schema 格式
| 场景 | 推荐格式 | 原因 |
|---|---|---|
| 大数据处理 | Avro | 与 Hadoop 生态集成好 |
| 微服务通信 | Protobuf | 高效、多语言支持 |
| 调试友好 | JSON Schema | 可读性好 |
| 已有 Avro 资产 | Avro | 保持一致性 |
2. Schema 设计原则
- 为未来设计:预留可能需要的字段
- 合理使用默认值:新字段必须有默认值
- 避免过度嵌套:扁平结构更易维护
- 文档化:为字段添加 description 属性
3. 兼容性策略
推荐配置:
# 全局默认:BACKWARD(向后兼容)
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://localhost:8081/config
# 关键 Topic 使用 FULL(完全兼容)
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "FULL"}' \
http://localhost:8081/config/critical-topic-value
4. Schema Registry 高可用
生产环境建议:
# 多实例部署
services:
schema-registry-1:
image: confluentinc/cp-schema-registry:7.5.0
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry-1
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
schema-registry-2:
image: confluentinc/cp-schema-registry:7.5.0
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry-2
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
客户端配置多个地址:
props.put("schema.registry.url", "http://sr1:8081,http://sr2:8081");
5. 监控告警
监控关键指标:
- Schema 注册频率
- 兼容性检查失败次数
- Schema Registry 请求延迟
_schemasTopic 大小
小结
- Schema Registry 提供 Kafka 生态系统的集中式 Schema 管理
- 支持 Avro、Protobuf、JSON Schema 三种主流格式
- Schema 演进 允许在不中断服务的情况下修改数据结构
- 合理选择 兼容性级别 平衡灵活性和安全性
- 生产环境需要考虑 Schema Registry 的 高可用部署
下一步
接下来让我们学习 安全配置,了解如何保护 Kafka 集群。