Kafka Connect
Kafka Connect 是 Kafka 的数据集成工具,用于在 Kafka 和外部系统之间传输数据。本章将详细介绍 Kafka Connect 的概念、使用方法和常见配置。
Kafka Connect 概述
什么是 Kafka Connect?
Kafka Connect 是一个可扩展的、可靠的数据管道工具,用于在 Kafka 和其他数据系统之间同步数据。
┌─────────────────────────────────────────────────────────────┐
│ Kafka Connect 架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Source │ │ Sink │ │
│ │ Database │ │ Consumer │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Kafka Connect Workers │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Worker 1 │ │ Worker 2 │ │ │
│ │ │ (Task 1,2) │ │ (Task 3,4) │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Kafka │◀─────▶│ Kafka │ │
│ │ (Topic) │ │ (Topic) │ │
│ └──────────────┘ └──────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Target │ │
│ │ Database │ │
│ └──────────────┘ │
│ │
│ Source: 外部系统 -> Kafka │
│ Sink: Kafka -> 外部系统 │
│ │
└─────────────────────────────────────────────────────────────┘
核心概念
| 概念 | 说明 |
|---|---|
| Connector | 连接器插件,决定数据如何传输 |
| Task | 实际执行数据移动的组件 |
| Worker | 运行 Connector 和 Task 的进程 |
| Converter | 数据格式转换器 |
| Transform | 数据转换器 |
Source Connectors
Source Connector 示例
// 自定义 Source Connector 需要实现以下接口
// SourceConnector - 生命周期管理
// SourceTask - 实际数据读取
常用 Source Connectors
| Connector | 数据源 | 说明 |
|---|---|---|
| JDBC Source | 关系型数据库 | 定期轮询数据库 |
| Debezium | MySQL/PostgreSQL | CDC 变更数据捕获 |
| MQTT Source | MQTT Broker | 物联网数据 |
| File Source | 文件系统 | 监控文件变化 |
JDBC Source 配置
# connect-jdbc-source.properties
name=jdbc-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
# 数据库连接
connection.url=jdbc:mysql://localhost:3306/mydb
connection.user=root
connection.password=secret
# 源表配置
tables.pattern=mydb.*
# 查询模式
mode=timestamp+incrementing
timestamp.column.name=updated_at
incrementing.column.name=id
# Topic 映射
topic.prefix=mydb-
# 轮询间隔
poll.interval.ms=1000
batch.size=100
Debezium MySQL 配置
{
"name": "mysql-source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz123",
"database.server.id": "184054",
"topic.prefix": "mysql",
"database.include.list": "mydb",
"table.include.list": "mydb\\.orders,mydb\\.customers",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.mysql"
}
}
Sink Connectors
Sink Connector 示例
// 自定义 Sink Connector 需要实现以下接口
// SinkConnector - 生命周期管理
// SinkTask - 实际数据写入
常用 Sink Connectors
| Connector | 目标系统 | 说明 |
|---|---|---|
| JDBC Sink | 关系型数据库 | 批量写入 |
| Elasticsearch Sink | Elasticsearch | 全文搜索 |
| S3 Sink | Amazon S3 | 对象存储 |
| HDFS Sink | HDFS | Hadoop |
JDBC Sink 配置
# connect-jdbc-sink.properties
name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
# Kafka Topic
topics=orders,customers
# 数据库连接
connection.url=jdbc:mysql://localhost:3306/mydb
connection.user=root
connection.password=secret
# 写入模式
insert.mode=upsert
pk.mode=record_key
# 字段映射
fields.whitelist=id,name,amount,created_at
# 批处理
batch.size=1000
flush.size=1000
Elasticsearch Sink 配置
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "orders,customers",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"key.ignore": "false",
"schema.ignore": "true",
"topic.index.map": "{\"orders\":\"orders-index\",\"customers\":\"customers-index\"}",
"transforms": "unwrap",
"transforms.unwrap.type": "io.confluent.connect.transforms.ExtractField$Value",
"transforms.unwrap.field": "after"
}
}
运行 Kafka Connect
Standalone 模式
# 启动 Standalone Worker
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/connect-file-source.properties
配置文件 config/connect-standalone.properties:
# Worker 配置
bootstrap.servers=localhost:9092
# 转换器
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# 偏移量存储
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
Distributed 模式
# 启动 Distributed Worker
bin/connect-distributed.sh config/connect-distributed.properties
配置文件 config/connect-distributed.properties:
bootstrap.servers=localhost:9092
# Group ID(同一 Group 的 Worker 形成集群)
group.id=connect-cluster
# 转换器
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 偏移量和配置存储
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
# 副本数
offset.storage.replication.factor=1
config.storage.replication.factor=1
status.storage.replication.factor=1
管理 Connectors
REST API
# 查看 Connectors
curl http://localhost:8083/connectors
# 创建 Connector
curl -X POST -H "Content-Type: application/json" \
--data @config.json \
http://localhost:8083/connectors
# 查看 Connector 状态
curl http://localhost:8083/connectors/my-connector/status
# 重启 Connector
curl -X POST http://localhost:8083/connectors/my-connector/restart
# 删除 Connector
curl -X DELETE http://localhost:8083/connectors/my-connector
# 查看 Connector 配置
curl http://localhost:8083/connectors/my-connector/config
# 更新 Connector 配置
curl -X PUT -H "Content-Type: application/json" \
--data @new-config.json \
http://localhost:8083/connectors/my-connector/config
常用命令
# 查看所有 Connector 状态
curl -s http://localhost:8083/ | jq
# 暂停 Connector
curl -X PUT http://localhost:8083/connectors/my-connector/pause
# 恢复 Connector
curl -X PUT http://localhost:8083/connectors/my-connector/resume
转换器
常用转换器
| 转换器 | 格式 | 说明 |
|---|---|---|
| JsonConverter | JSON | 支持 Schema |
| AvroConverter | Avro | 需要 Schema Registry |
| ProtobufConverter | Protobuf | 需要 Schema Registry |
| StringConverter | String | 简单字符串 |
Schema Registry 配置
# 使用 Avro 转换器
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
Single Message Transforms (SMT)
常用 Transform
# 添加字段
transforms=addField
transforms.addField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addField.static.field=source
transforms.addField.static.value=my-source
# 提取字段
transforms=extract
transforms.extract.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extract.field=after
# 重命名字段
transforms=rename
transforms.rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.rename.renames=oldName:newName
# 过滤字段
transforms=filter
transforms.filter.type=org.apache.kafka.connect.transforms.Filter$Value
transforms.filter.include=id,name,amount
# 正则替换
transforms=replace
transforms.replace.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.replace.renames=created_at:timestamp
组合 Transform
transforms=unwrap,addField,replace
transforms.unwrap.type=io.confluent.connect.transforms.ExtractField$Value
transforms.unwrap.field=after
transforms.addField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addField.static.field=source
transforms.addField.static.value=db-source
transforms.replace.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.replace.field=timestamp
transforms.replace.format=yyyy-MM-dd'T'HH:mm:ss.SSSZ
错误处理和监控
错误处理策略
# 错误处理
errors.tolerance=all
errors.deadletterqueue.topic.name=connect-dlq
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true
监控指标
# 查看 Connect Worker 指标
curl -s http://localhost:8083/metrics | grep kafka.connect
关键指标:
connector-count: Connector 数量connector-failed-count: 失败次数task-count: Task 数量task-running-count: 运行中的 Task
最佳实践
生产配置建议
# 高可用配置
offset.storage.replication.factor=3
config.storage.replication.factor=3
status.storage.replication.factor=3
# 性能配置
task.max=4
batch.size=2000
flush.size=10000
flush.interval.ms=30000
# 错误处理
errors.tolerance=all
errors.deadletterqueue.topic.name=connect-dlq
errors.deadletterqueue.topic.replication.factor=3
部署建议
- 使用 Distributed 模式:支持高可用和扩展
- 配置足够的 Worker:根据 Task 数量
- 监控错误队列:及时发现数据问题
- 使用 SMT 清理数据:在写入前转换数据
小结
- Kafka Connect 是 Kafka 的数据集成工具
- Source 从外部系统读取数据到 Kafka
- Sink 从 Kafka 写入数据到外部系统
- Connector 定义数据迁移逻辑
- SMT 支持在传输过程中转换数据
下一步
接下来让我们学习 安全配置。