跳到主要内容

Schema Registry 数据治理

在 Kafka 生产环境中,数据治理是保障数据质量和系统稳定性的关键环节。Schema Registry 作为 Kafka 生态系统的核心组件,提供了集中式的 Schema 管理和验证能力。本章将详细介绍 Schema Registry 的原理、配置和使用方法。

为什么需要 Schema Registry?

数据格式问题

在没有 Schema Registry 的环境中,生产者和消费者之间通过隐式契约进行数据交换,这会导致一系列问题:

常见问题

  1. 数据不一致:不同生产者可能使用不同的数据格式
  2. 格式变更风险:Schema 变更可能导致消费者解析失败
  3. 缺乏版本控制:无法追踪 Schema 的变更历史
  4. 数据丢失:格式不匹配可能导致数据被丢弃或损坏

Schema Registry 的解决方案

Schema Registry 通过提供集中式的 Schema 管理来解决这些问题:

核心优势

  • 集中管理:所有 Schema 存储在统一位置,便于管理和查找
  • 版本控制:支持 Schema 版本管理,可追溯变更历史
  • 兼容性检查:自动验证 Schema 变更的兼容性
  • 数据验证:确保数据符合预定义的 Schema 结构
  • 性能优化:消息中只传递 Schema ID,而非完整 Schema

核心概念

Schema

Schema 定义了消息数据的结构,包括字段名称、数据类型和约束条件。Schema Registry 支持三种主流格式:

格式特点适用场景
Avro二进制格式、Schema 演进支持好大数据生态、Hadoop
Protobuf高效、支持多语言微服务、gRPC 场景
JSON Schema可读性好、易调试Web API、简单场景

Schema Subject

Subject 是 Schema Registry 中组织 Schema 的逻辑单元,每个 Subject 对应一组版本的 Schema。Kafka 中通常将 Subject 与 Topic 关联:

  • Topic 策略<topic-name>-value<topic-name>-key
  • Record 策略:根据记录类型命名
  • TopicRecord 策略:结合 Topic 和 Record 信息

Schema ID

每个注册的 Schema 都会分配一个唯一的整数 ID。生产者发送消息时,只需在消息头中携带 Schema ID,消费者通过 ID 从 Registry 获取完整的 Schema 定义。

兼容性规则

Schema Registry 支持多种兼容性级别,控制 Schema 演进的方式:

兼容性级别说明允许的操作
BACKWARD新 Schema 可以读取旧数据添加可选字段、删除字段
FORWARD旧 Schema 可以读取新数据添加字段(有默认值)、删除可选字段
FULL双向兼容添加可选字段、删除可选字段
BACKWARD_TRANSITIVE新 Schema 可读取所有旧版本数据与 BACKWARD 相同,但对所有版本生效
FORWARD_TRANSITIVE旧 Schema 可读取所有新版本数据与 FORWARD 相同,但对所有版本生效
FULL_TRANSITIVE所有版本双向兼容与 FULL 相同,但对所有版本生效
NONE不检查兼容性任意变更

安装与配置

Docker 部署

使用 Docker 快速启动 Schema Registry:

# docker-compose.yml
version: '3.8'

services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

启动服务:

docker-compose up -d

# 验证 Schema Registry 是否启动
curl http://localhost:8081/subjects

配置说明

Schema Registry 的关键配置项:

# schema-registry.properties

# 主机名(用于集群内通信)
schema.registry.host.name=schema-registry

# 监听地址
listeners=http://0.0.0.0:8081

# Kafka 存储地址(Schema 存储在 Kafka Topic 中)
kafkastore.bootstrap.servers=PLAINTEXT://kafka:9092

# Schema 存储 Topic(默认 _schemas)
kafkastore.topic=_schemas

# 默认兼容性级别
schema.compatibility.level=BACKWARD

# 是否允许 Schema 删除
schema.registry.delete.enabled=true

与 Kafka 集成

Schema Registry 与 Kafka 的集成架构:

使用 Avro Schema

定义 Schema

Avro Schema 使用 JSON 格式定义数据结构:

{
"type": "record",
"name": "User",
"namespace": "com.example.kafka",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null},
{"name": "age", "type": ["null", "int"], "default": null},
{"name": "created_at", "type": "long", "logicalType": "timestamp-millis"}
]
}

Schema 说明

  • type: record:表示这是一个记录类型
  • name:记录名称
  • namespace:命名空间,用于区分同名类型
  • fields:字段列表
  • 联合类型 ["null", "string"] 表示该字段可以为 null 或字符串

Maven 依赖

<dependencies>
<!-- Kafka Client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>

<!-- Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>

<!-- Confluent Serializer -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.5.0</version>
</dependency>
</dependencies>

<!-- 需要添加 Confluent 仓库 -->
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

生产者配置

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class AvroProducer {

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");

// 使用 Avro 序列化器
props.put("key.serializer", KafkaAvroSerializer.class);
props.put("value.serializer", KafkaAvroSerializer.class);

// Schema Registry 地址
props.put("schema.registry.url", "http://localhost:8081");

Producer<String, User> producer = new KafkaProducer<>(props);

// 创建 User 对象(使用 Avro 生成的类)
User user = User.newBuilder()
.setId(1L)
.setName("张三")
.setEmail("[email protected]")
.setAge(25)
.setCreatedAt(System.currentTimeMillis())
.build();

// 发送消息
ProducerRecord<String, User> record = new ProducerRecord<>(
"users", // Topic
"user-1", // Key
user // Value
);

producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("Sent to partition " + metadata.partition());
} else {
exception.printStackTrace();
}
});

producer.close();
}
}

消费者配置

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumer {

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "user-consumer-group");

// 使用 Avro 反序列化器
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", KafkaAvroDeserializer.class);

// Schema Registry 地址
props.put("schema.registry.url", "http://localhost:8081");

// 返回特定类型(否则返回 GenericRecord)
props.put("specific.avro.reader", "true");

Consumer<String, User> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("users"));

while (true) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(1000));

for (ConsumerRecord<String, User> record : records) {
User user = record.value();
System.out.printf("User: id=%d, name=%s, email=%s%n",
user.getId(), user.getName(), user.getEmail());
}
}
}
}

Schema 演进

Schema 演进是 Schema Registry 的核心能力,允许在不中断服务的情况下修改数据结构。

演进规则

以 BACKWARD 兼容性为例,Schema 演进需要遵循以下规则:

允许的变更

1. 添加带默认值的新字段

// v1
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
]
}

// v2 - 添加可选字段(BACKWARD 兼容)
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": null}
]
}

新消费者使用 v2 Schema 可以读取 v1 写入的数据,新增字段会使用默认值。

2. 删除字段

// v1
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}

// v2 - 删除 age 字段(BACKWARD 兼容)
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
]
}

新消费者使用 v2 Schema 可以忽略 v1 数据中的 age 字段。

不允许的变更

1. 添加没有默认值的必填字段

// v1
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"}
]
}

// v2 - 不兼容!缺少默认值
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "long"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"} // 没有默认值!
]
}

2. 修改字段类型

// v1
{"name": "age", "type": "int"}

// v2 - 不兼容!类型变更
{"name": "age", "type": "long"}

REST API 操作

Schema Registry 提供完整的 REST API:

# 查看所有 Subject
curl http://localhost:8081/subjects

# 注册新 Schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}"}' \
http://localhost:8081/subjects/users-value/versions

# 查看特定版本的 Schema
curl http://localhost:8081/subjects/users-value/versions/1

# 查看最新版本
curl http://localhost:8081/subjects/users-value/versions/latest

# 检查 Schema 兼容性
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}' \
http://localhost:8081/compatibility/subjects/users-value/versions/latest

# 获取 Schema ID 对应的 Schema
curl http://localhost:8081/schemas/ids/1

# 配置 Subject 的兼容性级别
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "FORWARD"}' \
http://localhost:8081/config/users-value

使用 Protobuf Schema

Protobuf 是另一种流行的序列化格式,Schema Registry 完整支持。

定义 Proto 文件

// user.proto
syntax = "proto3";

package com.example.kafka;

option java_package = "com.example.kafka.protobuf";
option java_outer_classname = "UserProto";

message User {
int64 id = 1;
string name = 2;
string email = 3;
int32 age = 4;
int64 created_at = 5;
}

Maven 配置

<dependencies>
<!-- Protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.1</version>
</dependency>

<!-- Confluent Protobuf Serializer -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
<version>7.5.0</version>
</dependency>
</dependencies>

<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>

<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.25.1:exe:${os.detected.classifier}</protocArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

生产者配置

import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import com.example.kafka.protobuf.UserProto.User;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", KafkaProtobufSerializer.class);
props.put("value.serializer", KafkaProtobufSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");

Producer<String, User> producer = new KafkaProducer<>(props);

User user = User.newBuilder()
.setId(1L)
.setName("张三")
.setEmail("[email protected]")
.setAge(25)
.setCreatedAt(System.currentTimeMillis())
.build();

producer.send(new ProducerRecord<>("users-protobuf", "user-1", user));

使用 JSON Schema

JSON Schema 是最易读的格式,适合需要人工查看数据的场景。

定义 JSON Schema

{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "User",
"type": "object",
"properties": {
"id": {
"type": "integer",
"minimum": 1
},
"name": {
"type": "string",
"minLength": 1,
"maxLength": 100
},
"email": {
"type": "string",
"format": "email"
},
"age": {
"type": "integer",
"minimum": 0,
"maximum": 150
}
},
"required": ["id", "name"],
"additionalProperties": false
}

生产者配置

import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", KafkaJsonSchemaSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");

// 使用 POJO 或 Map
Map<String, Object> user = new HashMap<>();
user.put("id", 1);
user.put("name", "张三");
user.put("email", "[email protected]");
user.put("age", 25);

Producer<String, Map<String, Object>> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("users-json", "user-1", user));

最佳实践

1. 选择合适的 Schema 格式

场景推荐格式原因
大数据处理Avro与 Hadoop 生态集成好
微服务通信Protobuf高效、多语言支持
调试友好JSON Schema可读性好
已有 Avro 资产Avro保持一致性

2. Schema 设计原则

  • 为未来设计:预留可能需要的字段
  • 合理使用默认值:新字段必须有默认值
  • 避免过度嵌套:扁平结构更易维护
  • 文档化:为字段添加 description 属性

3. 兼容性策略

推荐配置:

# 全局默认:BACKWARD(向后兼容)
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://localhost:8081/config

# 关键 Topic 使用 FULL(完全兼容)
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "FULL"}' \
http://localhost:8081/config/critical-topic-value

4. Schema Registry 高可用

生产环境建议:

# 多实例部署
services:
schema-registry-1:
image: confluentinc/cp-schema-registry:7.5.0
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry-1
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

schema-registry-2:
image: confluentinc/cp-schema-registry:7.5.0
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry-2
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

客户端配置多个地址:

props.put("schema.registry.url", "http://sr1:8081,http://sr2:8081");

5. 监控告警

监控关键指标:

  • Schema 注册频率
  • 兼容性检查失败次数
  • Schema Registry 请求延迟
  • _schemas Topic 大小

小结

  1. Schema Registry 提供 Kafka 生态系统的集中式 Schema 管理
  2. 支持 Avro、Protobuf、JSON Schema 三种主流格式
  3. Schema 演进 允许在不中断服务的情况下修改数据结构
  4. 合理选择 兼容性级别 平衡灵活性和安全性
  5. 生产环境需要考虑 Schema Registry 的 高可用部署

下一步

接下来让我们学习 安全配置,了解如何保护 Kafka 集群。