Kafka:生产消费压测

1. 关键参数说明

1.1 Producer 参数

参数 说明 推荐值
thread 单机线程数 根据CPU核数调整
batch-size 数据批次大小 10000-100000
acks 主从同步策略 1(leader确认)/-1(全部确认)
message-size 单条消息大小 根据业务需求
compression-codec 压缩方式 snappy/lz4
partition 分区数 与线程数匹配
replication 副本数 2-3
linger.ms 发送间隔 0-100ms

1.2 Consumer 参数

参数 说明 推荐值
thread 单机线程数 根据分区数调整
fetch-size 抓取数据量 1048576(1MB)
partition 分区数 与消费能力匹配

1.3 Broker 参数

参数 说明 推荐值
num.replica.fetchers 副本抓取线程 ≤CPU核数+1
num.io.threads 磁盘IO线程 CPU核数×2
num.network.threads 网络IO线程 CPU核数+1
log.flush.interval.messages 刷盘消息数 10000
log.flush.interval.ms 刷盘时间间隔 1000

2. Producer 压测

2.1 batch-size 测试

1
2
3
4
5
6
./kafka-producer-perf-test.sh \
--topic test \
--num-records 100000000 \
--record-size 687 \
--producer-props bootstrap.servers=192.168.154.34:9092 batch.size=10000 \
--throughput 30000

2.2 acks 测试

1
2
3
4
5
6
7
8
# acks=0 (不等待响应)
./kafka-producer-perf-test.sh ... acks=0

# acks=1 (leader确认)
./kafka-producer-perf-test.sh ... acks=1

# acks=-1 (全部确认)
./kafka-producer-perf-test.sh ... acks=-1

2.3 压缩测试

1
2
3
4
5
6
7
8
9
10
11
# 无压缩
./kafka-producer-perf-test.sh ... compression.type=none

# GZIP压缩
./kafka-producer-perf-test.sh ... compression.type=gzip

# Snappy压缩
./kafka-producer-perf-test.sh ... compression.type=snappy

# LZ4压缩
./kafka-producer-perf-test.sh ... compression.type=lz4

2.4 消息大小测试

1
2
3
4
5
6
7
8
# 687字节消息
./kafka-producer-perf-test.sh ... --record-size 687

# 454字节消息
./kafka-producer-perf-test.sh ... --record-size 454

# 1500字节消息
./kafka-producer-perf-test.sh ... --record-size 1500

2.5 分区数测试

1
2
3
4
5
6
7
# 创建不同分区数的topic
kafka-topics.sh --create ... --partitions 1
kafka-topics.sh --create ... --partitions 3
kafka-topics.sh --create ... --partitions 5

# 生产测试
kafka-producer-perf-test.sh --topic test_2 ...

2.6 副本数测试

1
2
3
4
5
6
7
# 创建不同副本数的topic
kafka-topics.sh --create ... --replication-factor 1
kafka-topics.sh --create ... --replication-factor 2
kafka-topics.sh --create ... --replication-factor 3

# 生产测试
kafka-producer-perf-test.sh --topic test_r3 ...

3. Consumer 压测

3.1 线程数测试

1
2
3
4
5
# 单线程消费
./kafka-consumer-perf-test.sh ... --threads 1

# 多线程消费
./kafka-consumer-perf-test.sh ... --threads 5

3.2 fetch-size 测试

1
2
3
4
# 不同fetch-size测试
./kafka-consumer-perf-test.sh ... --fetch-size 1000
./kafka-consumer-perf-test.sh ... --fetch-size 5000
./kafka-consumer-perf-test.sh ... --fetch-size 10000

3.3 分区数测试

1
2
3
4
5
6
7
# 创建不同分区数的topic
kafka-topics.sh --create ... --partitions 3
kafka-topics.sh --create ... --partitions 5
kafka-topics.sh --create ... --partitions 7

# 消费测试
./kafka-consumer-perf-test.sh --topic test_kafka_part ...

3.4 fetch-threads 测试

1
2
3
4
# 不同fetch-threads测试
./kafka-consumer-perf-test.sh ... --num-fetch-threads 1
./kafka-consumer-perf-test.sh ... --num-fetch-threads 3
./kafka-consumer-perf-test.sh ... --num-fetch-threads 5

4. Broker 参数测试

4.1 num.replica.fetchers

4.2 num.io.threads

4.3 num.network.threads

4.4 interval.messages

4.5 interval.ms

5. 测试结论

5.1 Producer 结论

  1. batch-size 增大可提高吞吐但增加延迟
  2. acks=1 提供较好的吞吐和可靠性平衡
  3. LZ4/Snappy 压缩效率较高
  4. 分区数与生产者线程数匹配可获得最佳性能

5.2 Consumer 结论

  1. 消费者线程数应与分区数匹配
  2. 适当增大fetch-size可提高吞吐
  3. 分区数增加可提高并行消费能力

5.3 Broker 结论

  1. IO线程数建议为CPU核数2倍
  2. 网络线程数建议为CPU核数+1
  3. 副本抓取线程不宜过多