Kafka:常用命令

1. Topic管理

创建Topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 基础创建命令
./kafka-topics.sh --create \
--bootstrap-server 192.168.154.34:9092,192.168.154.35:9092,192.168.154.36:9092 \
--replication-factor 1 \
--partitions 1 \
--topic test

# 带更多配置的创建
./kafka-topics.sh --create \
--bootstrap-server 192.168.154.34:9092 \
--replication-factor 3 \
--partitions 6 \
--topic important-data \
--config retention.ms=172800000 \
--config segment.bytes=1073741824

查看Topic详情

1
2
3
4
5
6
7
8
# 查看单个Topic详情
./kafka-topics.sh --describe \
--bootstrap-server 192.168.154.34:9092 \
--topic test

# 查看所有Topic列表
./kafka-topics.sh --list \
--bootstrap-server 192.168.154.34:9092

修改Topic配置

1
2
3
4
5
6
7
8
9
10
11
12
# 增加分区数量
./kafka-topics.sh --alter \
--bootstrap-server 192.168.154.34:9092 \
--topic test \
--partitions 3

# 修改Topic配置参数
./kafka-configs.sh --alter \
--bootstrap-server 192.168.154.34:9092 \
--entity-type topics \
--entity-name test \
--add-config retention.ms=86400000

2. 数据生产与消费

生产消息

1
2
3
4
5
./kafka-console-producer.sh \
--broker-list 192.168.154.34:9092,192.168.154.35:9092,192.168.154.36:9092 \
--topic test \
--property "parse.key=true" \
--property "key.separator=:"

消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 从头开始消费
./kafka-console-consumer.sh \
--bootstrap-server 192.168.154.34:9092,192.168.154.35:9092,192.168.154.36:9092 \
--topic test \
--from-beginning

# 消费特定分区
./kafka-console-consumer.sh \
--bootstrap-server 192.168.154.34:9092 \
--topic test \
--partition 0 \
--offset latest

# 带过滤条件的消费
./kafka-console-consumer.sh \
--bootstrap-server 192.168.154.34:9092 \
--topic test \
--from-beginning \
--property print.key=true \
| grep "important_key" > filtered.log

3. 消费者组管理

查看消费者组

1
2
3
4
5
6
7
8
# 列出所有消费者组
./kafka-consumer-groups.sh --list \
--bootstrap-server 192.168.154.34:9092

# 查看消费者组详情
./kafka-consumer-groups.sh --describe \
--bootstrap-server 192.168.154.34:9092 \
--group my-consumer-group

重置消费偏移量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 重置到最早偏移量
./kafka-consumer-groups.sh --reset-offsets \
--bootstrap-server 192.168.154.34:9092 \
--group my-group \
--topic test \
--to-earliest \
--execute

# 重置到指定时间点
./kafka-consumer-groups.sh --reset-offsets \
--bootstrap-server 192.168.154.34:9092 \
--group my-group \
--topic test \
--to-datetime "2023-01-01T00:00:00.000" \
--execute

4. 数据统计与监控

获取Topic偏移量信息

1
2
3
4
./kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list 192.168.154.34:9092 \
--topic test \
--time -1 # -1表示最新,-2表示最早

查看消息积压情况

1
2
3
4
5
./kafka-consumer-groups.sh --describe \
--bootstrap-server 192.168.154.34:9092 \
--group my-group \
| awk '{print $1,$2,$3,$4,$5,$6}' \
| column -t

5. Topic删除

删除Topic

1
2
3
4
5
6
7
8
9
./kafka-topics.sh --delete \
--bootstrap-server 192.168.154.34:9092 \
--topic test

# 强制立即删除(需要配置delete.topic.enable=true)
./kafka-topics.sh --delete \
--bootstrap-server 192.168.154.34:9092 \
--topic test \
--if-exists

6. 高级操作

查看消息详情

1
2
3
./kafka-dump-log.sh \
--files /tmp/kafka-logs/test-0/00000000000000000000.log \
--print-data-log

平衡集群分区

1
2
3
4
./kafka-reassign-partitions.sh \
--bootstrap-server 192.168.154.34:9092 \
--reassignment-json-file reassign.json \
--execute

注意事项

  1. 生产环境操作Topic时建议使用--bootstrap-server而非--zookeeper参数
  2. 重要操作前建议先使用--verify--dry-run参数测试
  3. 修改分区数只能增加不能减少
  4. 删除Topic操作不可逆,需谨慎执行
  5. 建议对重要Topic设置适当的保留策略(retention.ms)