跳到主要内容

Kafka Connect

Kafka Connect 是 Kafka 的数据集成工具,用于在 Kafka 和外部系统之间传输数据。本章将详细介绍 Kafka Connect 的概念、使用方法和常见配置。

Kafka Connect 概述

什么是 Kafka Connect?

Kafka Connect 是一个可扩展的、可靠的数据管道工具,用于在 Kafka 和其他数据系统之间同步数据。

┌─────────────────────────────────────────────────────────────┐
│ Kafka Connect 架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Source │ │ Sink │ │
│ │ Database │ │ Consumer │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Kafka Connect Workers │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Worker 1 │ │ Worker 2 │ │ │
│ │ │ (Task 1,2) │ │ (Task 3,4) │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Kafka │◀─────▶│ Kafka │ │
│ │ (Topic) │ │ (Topic) │ │
│ └──────────────┘ └──────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Target │ │
│ │ Database │ │
│ └──────────────┘ │
│ │
│ Source: 外部系统 -> Kafka │
│ Sink: Kafka -> 外部系统 │
│ │
└─────────────────────────────────────────────────────────────┘

核心概念

概念说明
Connector连接器插件,决定数据如何传输
Task实际执行数据移动的组件
Worker运行 Connector 和 Task 的进程
Converter数据格式转换器
Transform数据转换器

Source Connectors

Source Connector 示例

// 自定义 Source Connector 需要实现以下接口
// SourceConnector - 生命周期管理
// SourceTask - 实际数据读取

常用 Source Connectors

Connector数据源说明
JDBC Source关系型数据库定期轮询数据库
DebeziumMySQL/PostgreSQLCDC 变更数据捕获
MQTT SourceMQTT Broker物联网数据
File Source文件系统监控文件变化

JDBC Source 配置

# connect-jdbc-source.properties
name=jdbc-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

# 数据库连接
connection.url=jdbc:mysql://localhost:3306/mydb
connection.user=root
connection.password=secret

# 源表配置
tables.pattern=mydb.*

# 查询模式
mode=timestamp+incrementing
timestamp.column.name=updated_at
incrementing.column.name=id

# Topic 映射
topic.prefix=mydb-

# 轮询间隔
poll.interval.ms=1000
batch.size=100

Debezium MySQL 配置

{
"name": "mysql-source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz123",
"database.server.id": "184054",
"topic.prefix": "mysql",
"database.include.list": "mydb",
"table.include.list": "mydb\\.orders,mydb\\.customers",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.mysql"
}
}

Sink Connectors

Sink Connector 示例

// 自定义 Sink Connector 需要实现以下接口
// SinkConnector - 生命周期管理
// SinkTask - 实际数据写入

常用 Sink Connectors

Connector目标系统说明
JDBC Sink关系型数据库批量写入
Elasticsearch SinkElasticsearch全文搜索
S3 SinkAmazon S3对象存储
HDFS SinkHDFSHadoop

JDBC Sink 配置

# connect-jdbc-sink.properties
name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector

# Kafka Topic
topics=orders,customers

# 数据库连接
connection.url=jdbc:mysql://localhost:3306/mydb
connection.user=root
connection.password=secret

# 写入模式
insert.mode=upsert
pk.mode=record_key

# 字段映射
fields.whitelist=id,name,amount,created_at

# 批处理
batch.size=1000
flush.size=1000

Elasticsearch Sink 配置

{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "orders,customers",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"key.ignore": "false",
"schema.ignore": "true",
"topic.index.map": "{\"orders\":\"orders-index\",\"customers\":\"customers-index\"}",
"transforms": "unwrap",
"transforms.unwrap.type": "io.confluent.connect.transforms.ExtractField$Value",
"transforms.unwrap.field": "after"
}
}

运行 Kafka Connect

Standalone 模式

# 启动 Standalone Worker
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/connect-file-source.properties

配置文件 config/connect-standalone.properties:

# Worker 配置
bootstrap.servers=localhost:9092

# 转换器
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# 偏移量存储
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

Distributed 模式

# 启动 Distributed Worker
bin/connect-distributed.sh config/connect-distributed.properties

配置文件 config/connect-distributed.properties:

bootstrap.servers=localhost:9092

# Group ID(同一 Group 的 Worker 形成集群)
group.id=connect-cluster

# 转换器
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# 偏移量和配置存储
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status

# 副本数
offset.storage.replication.factor=1
config.storage.replication.factor=1
status.storage.replication.factor=1

管理 Connectors

REST API

# 查看 Connectors
curl http://localhost:8083/connectors

# 创建 Connector
curl -X POST -H "Content-Type: application/json" \
--data @config.json \
http://localhost:8083/connectors

# 查看 Connector 状态
curl http://localhost:8083/connectors/my-connector/status

# 重启 Connector
curl -X POST http://localhost:8083/connectors/my-connector/restart

# 删除 Connector
curl -X DELETE http://localhost:8083/connectors/my-connector

# 查看 Connector 配置
curl http://localhost:8083/connectors/my-connector/config

# 更新 Connector 配置
curl -X PUT -H "Content-Type: application/json" \
--data @new-config.json \
http://localhost:8083/connectors/my-connector/config

常用命令

# 查看所有 Connector 状态
curl -s http://localhost:8083/ | jq

# 暂停 Connector
curl -X PUT http://localhost:8083/connectors/my-connector/pause

# 恢复 Connector
curl -X PUT http://localhost:8083/connectors/my-connector/resume

转换器

常用转换器

转换器格式说明
JsonConverterJSON支持 Schema
AvroConverterAvro需要 Schema Registry
ProtobufConverterProtobuf需要 Schema Registry
StringConverterString简单字符串

Schema Registry 配置

# 使用 Avro 转换器
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081

Single Message Transforms (SMT)

常用 Transform

# 添加字段
transforms=addField
transforms.addField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addField.static.field=source
transforms.addField.static.value=my-source

# 提取字段
transforms=extract
transforms.extract.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extract.field=after

# 重命名字段
transforms=rename
transforms.rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.rename.renames=oldName:newName

# 过滤字段
transforms=filter
transforms.filter.type=org.apache.kafka.connect.transforms.Filter$Value
transforms.filter.include=id,name,amount

# 正则替换
transforms=replace
transforms.replace.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.replace.renames=created_at:timestamp

组合 Transform

transforms=unwrap,addField,replace
transforms.unwrap.type=io.confluent.connect.transforms.ExtractField$Value
transforms.unwrap.field=after
transforms.addField.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addField.static.field=source
transforms.addField.static.value=db-source
transforms.replace.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.replace.field=timestamp
transforms.replace.format=yyyy-MM-dd'T'HH:mm:ss.SSSZ

错误处理和监控

错误处理策略

# 错误处理
errors.tolerance=all
errors.deadletterqueue.topic.name=connect-dlq
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true

监控指标

# 查看 Connect Worker 指标
curl -s http://localhost:8083/metrics | grep kafka.connect

关键指标:

  • connector-count: Connector 数量
  • connector-failed-count: 失败次数
  • task-count: Task 数量
  • task-running-count: 运行中的 Task

最佳实践

生产配置建议

# 高可用配置
offset.storage.replication.factor=3
config.storage.replication.factor=3
status.storage.replication.factor=3

# 性能配置
task.max=4
batch.size=2000
flush.size=10000
flush.interval.ms=30000

# 错误处理
errors.tolerance=all
errors.deadletterqueue.topic.name=connect-dlq
errors.deadletterqueue.topic.replication.factor=3

部署建议

  1. 使用 Distributed 模式:支持高可用和扩展
  2. 配置足够的 Worker:根据 Task 数量
  3. 监控错误队列:及时发现数据问题
  4. 使用 SMT 清理数据:在写入前转换数据

小结

  1. Kafka Connect 是 Kafka 的数据集成工具
  2. Source 从外部系统读取数据到 Kafka
  3. Sink 从 Kafka 写入数据到外部系统
  4. Connector 定义数据迁移逻辑
  5. SMT 支持在传输过程中转换数据

下一步

接下来让我们学习 安全配置