一、背景
部门的开发同学最近在开发一个活动的过程中,需要关注大量的应用后台逻辑,捕捉各种事件的触发。在设计时打算采用Kafka消息队列进行业务逻辑的解耦,这样活动开发和后台开发同学的工作就分离开了。但是使用的同学不是很熟悉其原理,担心以下几个问题:
我什么业务场景下使用消息队列?
我发消息的时候,需要等ack吗?
我发了消息之后,消费者一定会收到吗?
申请腾讯云的Kafka实例后,各种参数怎么设置呀?
遇到各种故障时,我的消息会不会丢?
消费者侧会收到多条消息嘛?消费者svr重启后消息会丢失吗?
这些问题都很正常,在开始接触和使用时总会有这样或那样的问题。一般情况下,不做了解,使用各种默认的推荐值,也是可以work的。但是我们要优雅的提升自己的姿(知)势(识)。学习其背后的原理,至少在遇到一般的问题时,能够分析和处理问题,做到心中有数。
二、什么时候使用消息队列
简单来说,有3个关键词, 异步、削峰、解耦。可以理解为:
我做完了,后面的我不管了;
工作太多了,先放一放我慢慢处理;
怎么产生的我不管/怎么处理我不管。
以下图为例:
用户提交评论中, 写入数据库后,存在需要捕捉评论事件的多个逻辑步骤。如果在接口处理过程中,顺序的处理不同的步骤,非常繁琐。
我们可以批量的通知各个步骤(异步),无需返回直接处理当次的支付其他逻辑(解耦)。看起来就清爽多了。另外,消息队列也可以作为缓存暂存发出的消息,不再需要考虑调用各个步骤时时延逻辑的异常场景。 附注:本文以讲解Kafka中的可靠性设计为例,其它消息队列的选型暂不涉及。
三、Kafka基本概念
在回答文章前面的问题之前,需要简单介绍一下各种概念。 Kafka从拓扑上分有如下角色:
Consumer: 消费者,一般以API形式存在于各个业务svr中;
Producer: 生产者,一般以API形式存在于各个业务svr中;
Kafka broker: Kafka集群中的服务器,topic里的消息数据存在上面。
Producer采用发送push的方式将消息发到broker上,broker存储后。由consumer采用pull模式订阅并消费消息。
如图所示,Kafka从存储结构上,有如下角色:
Topic:Kafka处理的消息的逻辑大类集合,可以理解为表。写入不同的topic即写入不同的表。
Partition: Topic下的物理分组,1个topic可以分为多个partition, 每个partition是一个有序的队列(大文件)。Partition中每一条消息都有一个有序的offset。
Msg: 消息,通信的基本单位。每个msg在topic下的不同partiton仅有一份,在partition中有一个唯一的offset用于定位。
Replica: 副本,partition的数据冗余备份,用于实现分布式的数据可靠性,但引入了不同副本间的数据一致性问题,带来了一定的复杂度。
Leader/follower: replica的角色,leader replica 用来提供该partition的读写服务。Follower 不停的从leader侧同步写入的消息。它们之间的消息状态采用一致性策略来解决。
四、Kakfa 的存储格式
为了方便后文更好的理解broker上的消息状态一致性策略,需要再简单介绍一下消息的存储格式。
当Producer 发送一条消息到broker中, 会根据分配 partition 规则选择被存储到哪一个 partition, 如果 partition 规则设置的合理,消息会均匀的分布到不同的 partition 里,这样就实现了水平扩展。
Pruducer可以认为partition是一个大的串行文件,msg存储时被分配一个唯一的offset。Offset是一个逻辑意义上的偏移,用于区分每一条消息。
而partition本身作为文件,可以有多个多个副本replica(leader/follower)。多个replica分布在在不同的broker上。
如果要回答如何在broker之间保证存储的消息和状态不会丢失,就要回答broker之间的各个replica的消息状态一致性如何解决,包括producer已经提交了哪些消息,哪些消息已经落地,哪些消息在节点故障后不会丢失。
五、异步发送时的消息可靠性保证
回到文章开头提到的几个问题,在使用Kafka消息队列做异步发送时,如何保证消息的可靠性?这里可以分为3个部分讲解。
1. 生产者的可靠性保证
回答生产者的可靠性保证,即回答:
发消息之后有没有ack?
发消息收到ack后,是不是消息就不会丢失了?
而Kafka通过配置来指定producer生产者在发送消息时的ack策略:
- Request.required.acks=-1 (全量同步确认,强可靠性保证);
- Request.required.acks = 1(leader 确认收到, 默认);
Request.required.acks = 0 (不确认,但是吞吐量大)。
如果想实现Kafka配置为 CP(Consistency & Partition tolerance) 系统, 配置需要如下:
request.required.acks=-1min.insync.replicas = ${N/2 + 1}unclean.leader.election.enable = false
如图所示,在acks=-1 的情况下,新消息只有被ISR中的所有 follower(f1和f2, f3) 都从leader复制过去才会回ack, ack后,无论那种机器故障情况(全部或部分), 写入的msg4,都不会丢失, 消息状态满足一致性C 要求。
正常情况下,所有follower复制完成后,leader回producer ack。
异常情况下,如果当数据发送到 leader后部分副本(f1和f2同步), leader挂了?此时任何 follower 都有可能变成新的 leader, producer 端会得到返回异常,producer 端会重新发送数据,但这样数据可能会重复(但不会丢失), 暂不考虑数据重复的情况。 min.insync.replicas 参数用于保证当前集群中处于正常同步状态的副本follower数量,当实际值小于配置值时,集群停止服务。如果配置为 N/2+1, 即多一半的数量,则在满足此条件下,通过算法保证强一致性。当不满足配置数时,牺牲可用性即停服。 异常情况下,leader挂掉,此时需要重新从follower选举leader。可以为f2或者f3。
如果选举f3为新leader, 则可能会发生消息截断,因为f3还未同步msg4的数据。Kafka通过unclean.leader.election.enable来控制在这种情况下,是否可以选举f3为leader。旧版本中默认为true,在某个版本下已默认为false,避免这种情况下消息截断的出现。
通过ack和min.insync.replicas和unclean.leader.election.enable的配合,保证在Kafka配置为CP系统时,要么不工作,要么得到ack后,消息不会丢失且消息状态一致。
min.insync.replicas 参数默认值为1,即满足高可用性,只要有1台能工作即可。但此时可工作的broker状态不一定正确。
如果想实现Kafka配置为AP(Availability & Partition tolerance)系统:
request.required.acks=1min.insync.replicas = 1unclean.leader.election.enable = false当配置为acks=1 时,即leader接收消息后回ack,这时会出现消息丢失的问题:如果 leader接受到了 第4 条消息,此时还没有同步到 follower中,leader机器挂了,其中一个follower被选为 leader, 则 第 4 条消息丢失了。
当然这个也需要unclean.leader.election.enable参数配置为false来配合。但是leader回ack的情况下,follower未同步的概率会大大提升。
通过producer策略的配置和Kafka集群通用参数的配置,可以针对自己的业务系统特点来进行合理的参数配置,在通讯性能和消息可靠性下寻得某种平衡。 2. Broker的可靠性保证
消息通过producer发送到broker之后,还会遇到很多问题:
Partition leader 写入成功,follower什么时候同步?
Leader写入成功,消费者什么时候能读到这条消息?
Leader写入成功后,leader重启,重启后消息状态还正常嘛?
Leader重启,如何选举新的leader?
这些问题集中在:消息落到broker后,集群通过何种机制来保证不同副本间的消息状态一致性。
Kafka通过分区的多副本策略来解决消息的备份问题。通过HW和LEO的标识,来对应ISR和OSR的概念,用于类比共识性算法解决数据同步一致性的问题。 分区多副本即前文提到的Partition 的replica(副本) 分布在跟 partition 不相同的机器上, 通过数据冗余保证故障自动转移。而不同副本的状态形成了ISR和OSR的概念。
ISR : leader 副本保持一定同步的 follower 副本, 包括 leader 副本自己,叫 In Sync Replica;
AR: 所有副本 (replicas) 统称为 assigned replicas, 即 AR;
OSR: follower 同 leader 同步数据有一些延迟的节点。
ISR是Kafka的同步策略中独有的概念,区别于raft等共识性算法。Raft要求集群中要求N/2+1台正常,其在这种条件下通过复杂的算法保证选举出的新leader符合一致性状态。
而kafka的ISR同步策略,通过ISR列表的可伸缩性和HW&LEO更新,一定程度上解决了消息一致性和吞吐性能之间的平衡。 ISR通过HW和LEO的概念表示消息的同步状态:
HW: Highwatermark, 俗称高水位,它表示了一个特定的消息偏移量(offset), 在一个parttion中consumer只能拉取这个 offset 之前的消息(此 offset 跟 consumer offset 不是一个概念) ;
LEO: LogEndOffset, 日志末端偏移量, 用来表示当前日志文件中下一条写入消息的offset;
leader HW: 该Partititon所有副本的LEO最小值;
follower HW: min(follower自身LEO 和 leader HW);
Leader HW = 所有副本LEO最小值;
- Follower HW = min(follower 自身 LEO 和 leader HW);
Leader不仅保存了自己的HW & LEO,还保存了远端副本的HW & LEO。简单来说,每个副本都有HW和LEO的存储,而leader不但保存自己的HW和LEO, 还保存了每个远端副本的LEO,用于在自身的HW更新时计算值。
可以看出由于LEO远端存储的特性,其实会导致副本真实的LEO和leader存储的LEO有短暂的数值差异,这会带来一些问题,下文也会展开讲述。
HW和LEO的更新策略如下:
一次完整的写请求的HW / LEO更新流程如下图所示:
(1)初始状态 Leader 所有的 HW&LEO都为0, follower 与 leader 建立连接,follower fetch leader, follower 所有 HW & LEO 都为0
(2)Follower 第一次 fetch
Producer 发来一条消息到 leader, 此时 leader 的 LEO=1, follower 带着自己的 HW&LEO(都为0) 开始 fetch, leader的 HW=min(all follower LEO)=0, leader 记录follower的LEO=0;follower 拉取到一条消息,带着消息和leader的 HW(0)&LEO(1)返回自身更新自己的LEO=1, 更新自己的HW=min(follower 自身 LEO(1) 和 leader HW(0))=0
(3)Follower 第二次fetch
Follower带着自己的 HW(0)&LEO(1) 去请求leader .此时leader 的HW更新为1,leader 保存的follower的 LEO更新为1,带着leader 的 HW(1)&LEO(1)返回自身,更新自己的 HW&LEO
此时回到刚才提到的问题,这种HW和LEO更新策略有个很明显的问题,即follower的HW更新需要follower的2轮fetch中的leader返回才能更新,而Leader的HW已更新。
在这之间,如果follower和leader的节点发生故障,则follower的HW和leader的HW会处于不一致状态,带来比较多的一致性问题。比如如下场景:
Leader更新完分区HW后,follower HW还未更新,此时follower重启;
Follower重启后,LEO设置为之前的follower HW值(0), 此时发生消息截断(临时状态);
Follower重新同步leader, 此时leader宕机,则不选举则不可用;
Follower被选举为leader, 则msg 1 永久丢失了。
在Kafka配置为AP系统的情况下,由于min.insync.replicas为1,这种重启后follower发生截断发生的概率会大大提升, 而在多个副本存在的情况下,情况可能还会更加糟糕。
而kafka新版本为了解决这个HW&LEO的同步机制更新缺陷,引入了Epoch的概念。
Leader epoch 分两部分组成:
Epoch : 版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。
Leader epoch(1, 120) 说明这个leader 的版本号为1,版本的起始位置是 第120条消息开始的 Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中。
当 Leader 副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目,否则就不做更新。
这样,每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移,以避免数据丢失和不一致的情况。
示意图如下:
Kafka通过ISR的同步机制及优化策略,用 HW & LEO的方式很好的确保了数据不丢失以及吞吐率。而ISR的管理最终都会反馈到Zookeeper上,其实现和leader的选举策略不再赘述。
六、Consumer 的可靠性策略
Consumer的可靠性策略集中在consumer的投递语义上,即:
何时消费,消费到什么?
消费是否会丢?
消费是否会重复?
这些语义场景,可以通过Kafka消费者的部分参数进行配置,简单来说有以下3种场景:
1. AutoCommit(at most once, commit后挂,实际会丢)
enable.auto.commit = trueauto.commit.interval.ms
配置如上的consumer收到消息就返回正确给 brocker, 但是如果业务逻辑没有走完中断了,实际上这个消息没有消费成功。
这种场景适用于可靠性要求不高的业务。其中auto.commit.interval.ms代表了自动提交的间隔。比如设置为1s提交1次,那么在1s内的故障重启,会从当前消费offset进行重新消费时,1s内未提交但是已经消费的msg, 会被重新消费到。
2. 手动Commit(at least once, commit前挂,就会重复, 重启还会丢)
enable.auto.commit = false
配置为手动提交的场景下,业务开发者需要在消费消息到消息业务逻辑处理整个流程完成后进行手动提交。
如果在流程未处理结束时发生重启,则之前消费到未提交的消息会重新消费到,即消息显然会投递多次。此处应用与业务逻辑明显实现了幂等的场景下使用。
特别应关注到在golang中sarama库的几个参数的配置:
sarama.offset.initial (oldest, newest)offsets.retention.minutes
intitial = oldest代表消费可以访问到的topic里的最早的消息,大于commit的位置,但是小于HW。同时也受到broker上消息保留时间的影响和位移保留时间的影响。不能保证一定能消费到topic起始位置的消息。
如果设置为newest则代表访问commit位置的下一条消息。如果发生consumer重启且autocommit没有设置为false, 则之前的消息会发生丢失,再也消费不到了。在业务环境特别不稳定或非持久化consumer实例的场景下,应特别注意。
一般情况下, offsets.retention.minutes为1440s。
3. Exactly once(很难,需要msg持久化和commit是原子的) 消息投递且仅投递一次的语义是很难实现的。首先要消费消息并且提交保证不会重复投递,其次提交前要完成整体的业务逻辑关于消息的处理。
在Kafka本身没有提供此场景语义接口的情况下,这几乎是不可能有效实现的。一般的解决方案,也是进行原子性的消息存储,业务逻辑异步慢慢的从存储中取出消息进行处理。