Fixing message consumption failures caused by one Kafka broker going down


Symptoms

The Kafka cluster has 3 nodes, and one of them went down. After that, some consumer groups could still consume messages, while others could not consume at all. Restarting the services restored consumption.

In theory a Kafka cluster is highly available, so why did losing a single node break the service?

A quick search suggested the most likely cause: the replication factor of the internal topic __consumer_offsets needs to be increased.
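This situation typically arises when the brokers first came up with default settings (or with only one node alive), so Kafka auto-created __consumer_offsets with a single replica. On a fresh cluster, the problem can be avoided up front in server.properties; the keys below are standard broker settings, and the value 3 assumes a 3-node cluster:

```properties
# Replication factor Kafka uses when it auto-creates __consumer_offsets
offsets.topic.replication.factor=3
# A sensible default for regular topics as well
default.replication.factor=3
```

For an existing cluster where the topic was already created with one replica, the reassignment procedure below is required instead.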

Check the __consumer_offsets topic

Run:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets
Topic:__consumer_offsets        PartitionCount:50       ReplicationFactor:1     Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets       Partition: 0    Leader: 12      Replicas: 12    Isr: 12
Topic: __consumer_offsets       Partition: 1    Leader: 87      Replicas: 87    Isr: 87
Topic: __consumer_offsets       Partition: 2    Leader: 11      Replicas: 11    Isr: 11
Topic: __consumer_offsets       Partition: 3    Leader: 12      Replicas: 12    Isr: 12
        ....

The ReplicationFactor is 1, so the topic has a single point of failure. Kafka stores each consumer group's committed offsets in exactly one partition of __consumer_offsets, chosen by hashing the group id. In the output above, if broker 12 goes down, partitions 0 and 3 become unavailable, and every group mapped to those partitions can no longer commit or fetch offsets, so its consumption stalls. Groups mapped to surviving partitions keep working, which is why only some groups were affected.
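The group-to-partition mapping can be reproduced outside Kafka. The sketch below is a minimal illustration, not Kafka source code: it replicates Java's String.hashCode() and the abs-then-modulo step Kafka applies to the group id, assuming the default partition count of 50 (matching the describe output above):

```python
def java_string_hashcode(s: str) -> int:
    """Replicate Java's String.hashCode() in Python."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF  # wrap to 32 bits like a Java int
    return h - 0x100000000 if h >= 0x80000000 else h  # re-apply the sign

def offsets_partition(group_id: str, num_partitions: int = 50) -> int:
    """Partition of __consumer_offsets that stores this group's offsets."""
    # Kafka takes the group id's hashCode, masks off the sign bit,
    # and reduces it modulo the offsets topic partition count.
    return (java_string_hashcode(group_id) & 0x7FFFFFFF) % num_partitions

print(offsets_partition("bonus-coin"))  # partition holding this group's offsets
```

If the partition printed for a group lands on a broker that is down, that group is exactly the kind that stops consuming in the scenario above.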

Increase the replication factor

  1. Create a JSON file describing the target replica assignment. Run the command below; note that each replicas entry lists the broker ids in the cluster:
cat > increase-replication-factor.json <<EOF
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":1,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":2,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":3,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":4,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":5,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":6,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":7,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":8,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":9,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":10,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":11,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":12,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":13,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":14,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":15,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":16,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":17,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":18,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":19,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":20,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":21,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":22,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":23,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":24,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":25,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":26,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":27,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":28,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":29,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":30,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":31,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":32,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":33,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":34,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":35,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":36,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":37,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":38,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":39,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":40,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":41,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":42,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":43,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":44,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":45,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":46,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":47,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":48,"replicas":[11,12,87]},
{"topic":"__consumer_offsets","partition":49,"replicas":[11,12,87]}]
}
EOF
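Writing out all 50 entries by hand is error-prone; the same file can be generated with a short script. This is a sketch: the broker ids and partition count match this cluster and should be adjusted for yours. As a design choice, it also rotates the replica list per partition so preferred leadership is spread across the brokers, rather than making the first broker the preferred leader of every partition:

```python
import json

brokers = [11, 12, 87]   # broker ids in this cluster
num_partitions = 50      # partition count of __consumer_offsets

partitions = []
for p in range(num_partitions):
    # Rotate the replica list so each broker is preferred leader
    # for roughly a third of the partitions.
    r = p % len(brokers)
    replicas = brokers[r:] + brokers[:r]
    partitions.append(
        {"topic": "__consumer_offsets", "partition": p, "replicas": replicas}
    )

assignment = {"version": 1, "partitions": partitions}
with open("increase-replication-factor.json", "w") as f:
    json.dump(assignment, f)
```

The resulting file is accepted by kafka-reassign-partitions.sh exactly like the hand-written one.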
  2. Execute the reassignment:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
  3. Verify that the reassignment completed:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify

Result:

Status of partition reassignment:
Reassignment of partition __consumer_offsets-22 completed successfully
Reassignment of partition __consumer_offsets-30 completed successfully
Reassignment of partition __consumer_offsets-8 completed successfully
....

Check the topic again

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets

The replication factor is now 3 and every partition has three in-sync replicas:

Topic:__consumer_offsets        PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets       Partition: 0    Leader: 11      Replicas: 11,12,87      Isr: 12,11,87
Topic: __consumer_offsets       Partition: 1    Leader: 11      Replicas: 11,12,87      Isr: 87,11,12
Topic: __consumer_offsets       Partition: 2    Leader: 11      Replicas: 11,12,87      Isr: 11,12,87
Topic: __consumer_offsets       Partition: 3    Leader: 11      Replicas: 11,12,87      Isr: 12,11,87
        ....

Verify availability

Kill the Kafka process on one of the brokers, then send a message from the application and check whether the consumers can still receive it.

2019-01-22 13:25:54.038 WARN 37504 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-2, groupId=bonus-coin] Connection to node -2 could not be established. Broker may not be available.
2019-01-22 13:26:34.360 INFO 37504 --- [ntainer#0-0-C-1] q.b.c.c.CounterChangeMessageCoinConsumer : coin:counter message received! topic = bonus_counter_change_xiajinlong2, key = THUMBS_UP, offset = 27, value = {"actionId":2,"actionLogId":186,"cancel":false,"countType":"THUMBS_UP","createDate":"2019-01-22","createTime":"2019-01-22T13:26:34.304","currentCancelNum":0,"currentNum":1,"eventId":2,"staticsType":"USER_COUNTER","userCounter":{"cancelNum":0,"countDate":"2019-01-22","countType":"thumbs_up","id":161,"num":1,"updateTime":"2019-01-22T13:26:34.303","userId":"xjl222"},"userId":"xjl222"} 
2019-01-22 13:26:34.466 INFO 37504 --- [ntainer#0-0-C-1] o.h.h.i.QueryTranslatorFactoryInitiator : HHH000397: Using ASTQueryTranslatorFactory
2019-01-22 13:26:34.698 INFO 37504 --- [ntainer#0-0-C-1] q.b.c.c.CounterChangeMessageCoinConsumer : coin:counter message success handled!
2019-01-22 13:26:34.698 INFO 37504 --- [ntainer#0-0-C-1] q.b.c.c.CounterChangeMessageCoinConsumer : coin:counter message received! topic = bonus_counter_change_xiajinlong2, key = BE_THUMBS_UP_ED, offset = 27, value = {"actionId":2,"actionLogId":186,"cancel":false,"countType":"BE_THUMBS_UP_ED","createDate":"2019-01-22","createTime":"2019-01-22T13:26:34.343","currentCancelNum":0,"currentNum":1,"eventId":3,"staticsType":"USER_COUNTER","userCounter":{"cancelNum":0,"countDate":"2019-01-22","countType":"be_thumbs_up_ed","id":162,"num":1,"updateTime":"2019-01-22T13:26:34.343","userId":"xjl242"},"userId":"xjl242"} 
2019-01-22 13:26:34.753 INFO 37504 --- [ntainer#0-0-C-1] q.b.c.c.CounterChangeMessageCoinConsumer : coin:counter message success handled!


2019-01-22 13:25:53.060 WARN 47832 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-2, groupId=bonus-exp] Connection to node -2 could not be established. Broker may not be available.
2019-01-22 13:26:34.359 INFO 47832 --- [ntainer#0-0-C-1] .q.b.e.c.CounterChangeMessageExpConsumer : exp:counter message received! topic = bonus_counter_change_xiajinlong2, key = THUMBS_UP, offset = 27, value = {"actionId":2,"actionLogId":186,"cancel":false,"countType":"THUMBS_UP","createDate":"2019-01-22","createTime":"2019-01-22T13:26:34.304","currentCancelNum":0,"currentNum":1,"eventId":2,"staticsType":"USER_COUNTER","userCounter":{"cancelNum":0,"countDate":"2019-01-22","countType":"thumbs_up","id":161,"num":1,"updateTime":"2019-01-22T13:26:34.303","userId":"xjl222"},"userId":"xjl222"} 
2019-01-22 13:26:34.460 INFO 47832 --- [ntainer#0-0-C-1] o.h.h.i.QueryTranslatorFactoryInitiator : HHH000397: Using ASTQueryTranslatorFactory
2019-01-22 13:26:34.691 INFO 47832 --- [ntainer#0-0-C-1] .q.b.e.c.CounterChangeMessageExpConsumer : exp:counter message success handled!
2019-01-22 13:26:34.691 INFO 47832 --- [ntainer#0-0-C-1] .q.b.e.c.CounterChangeMessageExpConsumer : exp:counter message received! topic = bonus_counter_change_xiajinlong2, key = BE_THUMBS_UP_ED, offset = 27, value = {"actionId":2,"actionLogId":186,"cancel":false,"countType":"BE_THUMBS_UP_ED","createDate":"2019-01-22","createTime":"2019-01-22T13:26:34.343","currentCancelNum":0,"currentNum":1,"eventId":3,"staticsType":"USER_COUNTER","userCounter":{"cancelNum":0,"countDate":"2019-01-22","countType":"be_thumbs_up_ed","id":162,"num":1,"updateTime":"2019-01-22T13:26:34.343","userId":"xjl242"},"userId":"xjl242"} 
2019-01-22 13:26:34.757 INFO 47832 --- [ntainer#0-0-C-1] .q.b.e.c.CounterChangeMessageExpConsumer : exp:counter message success handled!

The logs confirm that messages are still consumed normally while one of the Kafka brokers is down: the WARN lines show the lost broker, and the subsequent INFO lines show messages being received and handled.
