Kafka Connect
Kafka Connect 是 Kafka 生态系统中用于数据集成的核心组件,它提供了一种可靠、可扩展的方式来在 Kafka 和外部数据系统之间传输数据。本章将详细介绍 Kafka Connect 的核心概念、配置方法和最佳实践。
Kafka Connect 概述
什么是 Kafka Connect?
Kafka Connect 是一个可扩展、可靠的数据管道工具,专门用于在 Kafka 和其他数据系统之间同步数据。它的设计目标是简化数据集成工作,让用户无需编写代码就能实现数据传输。
核心概念
Kafka Connect 有几个核心概念需要理解:
| 概念 | 说明 |
|---|---|
| Connector | 连接器插件,定义数据如何从源系统读取或写入目标系统 |
| Task | 任务,实际执行数据传输的工作单元,一个 Connector 可以有多个 Task |
| Worker | 工作进程,运行 Connector 和 Task 的进程 |
| Converter | 转换器,负责在 Kafka Connect 格式和 Kafka 消息格式之间转换 |
| Transform | 单消息转换器(SMT),在传输过程中对消息进行轻量级修改 |
Source 与 Sink
Kafka Connect 的连接器分为两类:
- Source Connector:从外部系统读取数据并写入 Kafka Topic
- Sink Connector:从 Kafka Topic 读取数据并写入外部系统
运行模式
Kafka Connect 支持两种运行模式:Standalone(单机)和 Distributed(分布式)。
Standalone 模式
Standalone 模式在单个进程中运行所有工作,适合开发和测试环境,也适合只需单个 Worker 的场景(如收集日志文件)。
# 启动 Standalone Worker
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/connect-file-source.properties \
config/connect-file-sink.properties
Standalone 模式配置:
# config/connect-standalone.properties
# Kafka 集群地址
bootstrap.servers=localhost:9092
# 键值转换器
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# 偏移量存储文件
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
特点:
- 配置简单,易于启动
- 所有工作在单一进程中完成
- 不支持自动故障转移
- Connector 配置通过命令行传递
Distributed 模式
Distributed 模式支持高可用和水平扩展,是生产环境的推荐模式。Worker 自动协调工作分配,故障时自动重新分配任务。
# 启动 Distributed Worker(每个节点执行相同命令)
bin/connect-distributed.sh config/connect-distributed.properties
Distributed 模式配置:
# config/connect-distributed.properties
# Kafka 集群地址
bootstrap.servers=localhost:9092
# 集群 Group ID(同一集群的 Worker 使用相同 ID)
group.id=connect-cluster
# 键值转换器
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# 内部主题配置(存储配置、偏移量、状态)
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
# 内部主题副本数
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
# 内部主题分区数
config.storage.partitions=1
offset.storage.partitions=25
status.storage.partitions=5
内部主题说明:
| 主题 | 用途 | 建议配置 |
|---|---|---|
config.storage.topic | 存储 Connector 配置 | 单分区、高副本、压缩日志 |
offset.storage.topic | 存储源系统偏移量 | 多分区、高副本、压缩日志 |
status.storage.topic | 存储 Connector 状态 | 多分区、高副本、压缩日志 |
特点:
- 支持自动负载均衡
- 支持故障自动转移
- Connector 配置通过 REST API 管理
- 可以动态添加或移除 Worker
配置 Connector
Connector 通用配置
所有 Connector 都有一些通用配置项:
# Connector 名称(必须唯一)
name=mysql-source-connector
# Connector 类
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
# 最大任务数
tasks.max=4
# 覆盖 Worker 默认转换器(可选)
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 键值转换器配置(可选)
value.converter.schemas.enable=true
Source Connector 示例
JDBC Source Connector
从关系型数据库读取数据:
# jdbc-source-connector.properties
name=jdbc-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
# 数据库连接
connection.url=jdbc:mysql://localhost:3306/mydb
connection.user=root
connection.password=secret
# 表选择
tables.pattern=mydb.orders,mydb.customers
# 增量读取模式
mode=timestamp+incrementing
timestamp.column.name=updated_at
incrementing.column.name=id
# Topic 命名
topic.prefix=mysql-
# 轮询间隔
poll.interval.ms=5000
batch.max.rows=1000
增量读取模式说明:
| 模式 | 说明 | 适用场景 |
|---|---|---|
bulk | 每次全量读取 | 小表、配置表 |
incrementing | 基于自增列 | 有自增主键的表 |
timestamp | 基于时间戳 | 有更新时间戳的表 |
timestamp+incrementing | 时间戳+自增 | 最可靠的方式 |
Debezium MySQL CDC
使用 Debezium 捕获 MySQL 变更数据:
{
"name": "mysql-cdc-source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz123",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "mydb",
"table.include.list": "mydb.orders,mydb.customers",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.mydb",
"include.schema.changes": "true",
"snapshot.mode": "initial"
}
}
Debezium 的优势:
- 捕获所有数据变更(INSERT、UPDATE、DELETE)
- 低延迟,实时捕获
- 保留事务边界
Sink Connector 示例
JDBC Sink Connector
写入关系型数据库:
# jdbc-sink-connector.properties
name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
# 订阅的 Topic
topics=mysql-orders,mysql-customers
# 数据库连接
connection.url=jdbc:postgresql://localhost:5432/analytics
connection.user=postgres
connection.password=secret
# 写入模式
insert.mode=upsert
pk.mode=record_key
pk.fields=id
# 字段映射
fields.whitelist=id,name,amount,created_at
# 批处理配置
batch.size=3000
flush.size=1000
写入模式说明:
| 模式 | 说明 |
|---|---|
insert | 只插入,主键冲突则失败 |
upsert | 插入或更新(主键存在则更新) |
update | 只更新,记录不存在则跳过 |
Elasticsearch Sink Connector
写入 Elasticsearch:
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "2",
"topics": "orders,customers",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"key.ignore": "true",
"schema.ignore": "true",
"behavior.on.null.values": "delete",
"batch.size": 2000,
"max.in.flight.requests": 5
}
}
转换器(Converter)
转换器负责在 Kafka Connect 内部格式和 Kafka 消息格式之间转换数据。
常用转换器
| 转换器 | 格式 | 特点 |
|---|---|---|
| JsonConverter | JSON | 支持带 Schema 或无 Schema |
| AvroConverter | Avro | 需要 Schema Registry,体积小 |
| ProtobufConverter | Protobuf | 需要 Schema Registry |
| StringConverter | String | 简单字符串,适合文本数据 |
| ByteArrayConverter | byte[] | 原始字节,不做转换 |
JSON Converter 配置
# 使用 JSON 格式
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 是否包含 Schema
# true: 包含 schema 信息(结构化数据)
# false: 只包含数据(更简洁)
key.converter.schemas.enable=false
value.converter.schemas.enable=false
带 Schema 的 JSON 格式:
{
"schema": {
"type": "struct",
"fields": [
{"field": "id", "type": "int64"},
{"field": "name", "type": "string"}
]
},
"payload": {
"id": 1,
"name": "Alice"
}
}
不带 Schema 的 JSON 格式:
{
"id": 1,
"name": "Alice"
}
Avro Converter 配置
# 使用 Avro 格式(需要 Schema Registry)
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
Avro 的优势:
- 数据体积小(二进制格式)
- Schema 演进支持
- 强类型验证
单消息转换(SMT)
单消息转换(Single Message Transform,SMT)允许在消息传输过程中对每条消息进行轻量级修改,无需编写代码。
SMT 配置方式
# 定义转换链
transforms=unwrap,addTimestamp,renameField
# 每个转换的类型
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.addTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.renameField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
# 每个转换的配置
transforms.addTimestamp.timestamp.field=message_time
transforms.renameField.renames=old_name:new_name
常用内置转换
InsertField —— 插入字段
transforms=insertSource
transforms.insertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.insertSource.static.field=source_system
transforms.insertSource.static.value=kafka-connect
transforms.insertSource.timestamp.field=ingest_time
ReplaceField —— 重命名字段
transforms=renameFields
transforms.renameFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.renameFields.renames=created_at:timestamp,updated_at:last_modified
MaskField —— 脱敏字段
transforms=maskSensitive
transforms.maskSensitive.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.maskSensitive.fields=password,ssn
ExtractField —— 提取字段
transforms=extractValue
transforms.extractValue.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extractValue.field=after
TimestampRouter —— 基于 Timestamp 路由 Topic
transforms=routeByTime
transforms.routeByTime.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.routeByTime.topic.format=${topic}-${timestamp}
transforms.routeByTime.timestamp.format=yyyyMMdd
RegexRouter —— 正则表达式路由 Topic
transforms=routeByRegex
transforms.routeByRegex.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.routeByRegex.regex=(.*)-source
transforms.routeByRegex.replacement=$1-processed
Flatten —— 扁平化嵌套结构
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=_
Debezium 常用 SMT
Debezium 生成的消息包含复杂的结构,通常需要使用 SMT 进行转换:
# 提取 after 字段(新记录值)
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# 路由到不同的 Topic
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=([^.]+)\\.([^.]+)\\.([^.]+)
transforms.route.replacement=$3
REST API 管理
Distributed 模式下,Kafka Connect 提供完整的 REST API 用于管理 Connector。
Connector 管理
# 查看所有 Connector
curl http://localhost:8083/connectors
# 创建 Connector
curl -X POST -H "Content-Type: application/json" \
--data @connector-config.json \
http://localhost:8083/connectors
# 查看 Connector 详情
curl http://localhost:8083/connectors/my-connector
# 查看 Connector 配置
curl http://localhost:8083/connectors/my-connector/config
# 更新 Connector 配置
curl -X PUT -H "Content-Type: application/json" \
--data @new-config.json \
http://localhost:8083/connectors/my-connector/config
# 删除 Connector
curl -X DELETE http://localhost:8083/connectors/my-connector
Connector 状态管理
# 查看 Connector 状态
curl http://localhost:8083/connectors/my-connector/status
# 暂停 Connector
curl -X PUT http://localhost:8083/connectors/my-connector/pause
# 恢复 Connector
curl -X PUT http://localhost:8083/connectors/my-connector/resume
# 重启 Connector
curl -X POST http://localhost:8083/connectors/my-connector/restart
# 重启特定 Task
curl -X POST http://localhost:8083/connectors/my-connector/tasks/0/restart
Task 管理
# 查看 Connector 的所有 Task
curl http://localhost:8083/connectors/my-connector/tasks
# 查看特定 Task 状态
curl http://localhost:8083/connectors/my-connector/tasks/0/status
插件管理
# 查看已安装的 Connector 插件
curl http://localhost:8083/connector-plugins
# 验证配置
curl -X PUT -H "Content-Type: application/json" \
--data @config.json \
http://localhost:8083/connector-plugins/JdbcSourceConnector/config/validate
健康检查
# 查看集群信息
curl http://localhost:8083/
# 查看 Worker 状态
curl http://localhost:8083/connectors?expand=status
错误处理
错误处理策略
# 错误容忍度
# all: 忽略所有错误
# none: 遇到错误立即停止
errors.tolerance=all
# 死信队列配置
errors.deadletterqueue.topic.name=connect-dlq
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true
# 错误重试
errors.retry.timeout=60000
errors.retry.delay.max.ms=5000
死信队列(DLQ)
当消息处理失败时,可以将其发送到死信队列:
errors.tolerance=all
errors.deadletterqueue.topic.name=connect-dlq
errors.deadletterqueue.topic.replication.factor=3
# 在 DLQ 消息中添加错误信息头
errors.deadletterqueue.context.headers.enable=true
DLQ 消息会包含以下头部信息:
| 头部 | 说明 |
|---|---|
__connect.errors.exception | 异常类名 |
__connect.errors.exception.message | 异常消息 |
__connect.errors.exception.stacktrace | 异常堆栈 |
__connect.errors.original.topic | 原始 Topic |
__connect.errors.original.partition | 原始分区 |
__connect.errors.original.offset | 原始偏移量 |
监控与运维
JMX 指标
Kafka Connect 通过 JMX 暴露丰富的监控指标:
# 启用 JMX
export JMX_PORT=9999
bin/connect-distributed.sh config/connect-distributed.properties
关键指标:
| 指标 | 说明 |
|---|---|
connector-count | Connector 数量 |
connector-failed-count | 失败的 Connector 数量 |
task-count | Task 总数 |
task-running-count | 运行中的 Task 数量 |
task-failed-count | 失败的 Task 数量 |
connector-status | Connector 状态 |
offset-commit-completion-rate | 偏移量提交速率 |
Prometheus + Grafana 监控
# prometheus.yml
scrape_configs:
- job_name: 'kafka-connect'
static_configs:
- targets: ['connect:9999']
日志配置
# log4j.properties
log4j.rootLogger=INFO, stdout, file
# Connect 日志
log4j.logger.org.apache.kafka.connect=INFO
# Connector 特定日志
log4j.logger.io.confluent.connect.jdbc=DEBUG
最佳实践
生产环境配置
# Worker 配置
# 健壮的内部主题配置
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
# 性能优化
offset.flush.interval.ms=10000
offset.flush.timeout.ms=60000
# 消费者配置
consumer.auto.offset.reset=earliest
consumer.enable.auto.commit=false
# 生产者配置
producer.acks=all
producer.enable.idempotence=true
高可用部署
- 多 Worker 部署:至少部署 3 个 Worker 节点
- Connector 并行度:设置合理的
tasks.max值 - 资源隔离:不同类型 Connector 使用独立的 Connect 集群
常见问题排查
问题 1:Connector 启动失败
# 查看 Connector 状态和错误信息
curl http://localhost:8083/connectors/my-connector/status | jq
# 查看 Worker 日志
tail -f logs/connect.log
问题 2:Task 一直重启
可能原因:
- 下游系统不可用
- 数据格式不匹配
- 内存不足
问题 3:数据延迟
检查:
tasks.max是否足够- 网络带宽是否足够
- 下游系统是否有瓶颈
小结
Kafka Connect 是构建数据管道的核心工具:
- 两种运行模式:Standalone 适合开发测试,Distributed 适合生产环境
- Source/Sink:Source 从外部系统读取数据到 Kafka,Sink 从 Kafka 写入外部系统
- Converter:负责数据格式转换,支持 JSON、Avro、Protobuf 等
- SMT:在传输过程中对消息进行轻量级转换
- REST API:提供完整的 Connector 管理接口
- 错误处理:支持死信队列和错误重试
下一步
接下来让我们学习 安全配置,了解如何保护 Kafka 集群。