深刻解析Apache Pulsar系列(一):客户端消息确认

2021年11月21日 阅读数:13
这篇文章主要向大家介绍深刻解析Apache Pulsar系列(一):客户端消息确认,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

解析 Apache Pulsar —— 客户端消息确认

做者介绍:并发

腾讯云中间件专家工程师异步

Apache Pulsar PMC,《深刻解析Apache Pulsar》做者。微服务

目前专一于中间件领域,在消息队列和微服务方向具备丰富的经验。高并发

负责 CKafka、TDMQ的设计与开发工做,目前致力于打造稳定、高效和可扩展的基础组件与服务。性能

导语

在 Apache Pulsar 中,为了不消息的重复投递,消费者进行消息确认是很是重要的一步。当一条消息被消费者消费后,须要消费者发送一个Ack请求给Broker,Broker才会认为这条消息被真正消费掉。被标记为已经消费的消息,后续不会再次重复投递给消费者。在这篇文章中,咱们会介绍Pulsar中消息确认的模式,以及正常消息确认在Broker侧是如何实现的。学习

1 确认消息的模式

在了解Pulsar消息确认模式以前,咱们须要先了解一些前置知识 —— Pulsar中的订阅以及游标(Cursor)。Pulsar中有多种消费模式,如:Share、Key_share、Failover等等,不管用户使用哪一种消费模式都会建立一个订阅。订阅分为持久订阅和非持久订阅,对于持久订阅,Broker上会有一个持久化的Cursor,即Cursor的元数据被记录在ZooKeeper。Cursor以订阅(或称为消费组)为单位,保存了当前订阅已经消费到哪一个位置了。由于不一样消费者使用的订阅模式不一样,能够进行的ack行为也不同。整体来讲能够分为如下几种Ack场景:设计

(1)单条消息确认(Acknowledge)

和其余的一些消息系统不一样,Pulsar支持一个Partition被多个消费者消费。假设消息一、二、3发送给了Consumer-A,消息四、五、6发送给了Consumer-B,而Consumer-B又消费的比较快,先Ack了消息4,此时Cursor中会单独记录消息4为已Ack状态。若是其余消息都被消费,但没有被Ack,而且两个消费者都下线或Ack超时,则Broker会只推送消息一、二、三、五、6,已经被Ack的消息4不会被再次推送。code

(2)累积消息确认(AcknowledgeCumulative)

假设Consumer接受到了消息一、二、三、四、5,为了提高Ack的性能,Consumer能够不分别Ack 5条消息,只须要调用AcknowledgeCumulative,而后把消息5传入,Broker会把消息5以及以前的消息所有标记为已Ack。中间件

(3)批消息中的单个消息确认(Acknowledge)

这种消息确认模式,调用的接口和单条消息的确认同样,可是这个能力须要Broker开启配置项AcknowledgmentAtBatchIndexLevelEnabled。当开启后,Pulsar能够支持只Ack一个Batch里面的某些消息。假设Consumer拿到了一个批消息,里面有消息一、二、3,若是不开启这个选项,咱们只能消费整个Batch再Ack,不然Broker会以批为单位从新所有投递一次。前面介绍的选项开启以后,咱们能够经过Acknowledge方法来确认批消息中的单条消息。对象

(4)否认应答(NegativeAcknowledge)

客户端发送一个RedeliverUnacknowledgedMessages命令给Broker,明确告知Broker,当前Consumer没法消费这条消息,消息将会被从新投递。

并非全部的订阅模式下都能用上述这些ack行为,例如:Shared或者Key_shared模式下就不支持累积消息确认(AcknowledgeCumulative)。由于在Shared或者Key_Shared模式下,前面的消息不必定是被当前Consumer消费的,若是使用AcknowledgeCumulative,会把别人的消息也一块儿确认掉。订阅模式与消息确认之间的关系以下所示:

订阅模式 单条Ack 累积Ack 批量消息中单个Ack 否认Ack
Exclusive 支持 支持 支持 不支持
Shared 支持 不支持 支持 支持
Failover 支持 支持 支持 不支持
Key_Shared 支持 不支持 支持 支持

2 Acknowledge与AcknowledgeCumulative的实现

Acknowledge与AcknowledgeCumulative接口不会直接发送消息确认请求给Broker,而是把请求转交给AcknowledgmentsGroupingTracker处理。这是咱们要介绍的Consumer里的第一个Tracker,它只是一个接口,接口下有两个实现,一个是持久化订阅的实现,另外一个是非持久化订阅的实现。因为非持久化订阅的Tracker实现都是空,即不作任何操做,所以咱们只介绍持久化订阅的实现——PersistentAcknowledgmentsGroupingTracker。

在Pulsar中,为了保证消息确认的性能,并避免Broker接收到很是高并发的Ack请求,Tracker中默认支持批量确认,即便是单条消息的确认,也会先进入队列,而后再一批发往Broker。咱们在建立Consumer时能够设置参数AcknowledgementGroupTimeMicros,若是设置为0,则Consumer每次都会当即发送确认请求。全部的单条确认(individualAck)请求会先放入一个名为PendingIndividualAcks的Set,默认是每100ms或者堆积的确认请求超过1000,则发送一批确认请求。

消息确认的请求最终都是异步发送出去,若是Consumer设置了须要回执(Receipt),则会返回一个CompletableFuture,成功或失败都能经过Future感知到。默认都是不须要回执的,此时直接返回一个已经完成的CompletableFuture。

对于Batch消息中的单条确认(IndividualBatchAck),用一个名为PendingIndividualBatchIndexAcks的Map进行保存,而不是普通单条消息的Set。这个Map的Key是Batch消息的MessageId,Value是一个BitSet,记录这批消息里哪些须要Ack。使用BitSet能大幅下降保存消息Id的能存占用,1KB能记录8192个消息是否被确认。因为BitSet保存的内容都是0和1,所以能够很方便地保存在堆外,BitSet对象也作了池化,能够循环使用,不须要每次都建立新的,对内存很是友好。

以下图所示,只用了8位,就表示了Batch里面8条消息的Ack状况,下图表示EntryId为0、二、五、六、7的Entry都被确认了,确认的位置会被置为1:

对于累计确认(CumulativeAck)实现方式就更简单了,Tracker中只保存最新的确认位置点便可。例如,如今Tracker中保存的CumulativeAck位置为5:10,表明该订阅已经消费到LedgerId=5,EntryId=10的这条消息上了。后续又ack了一个5:20,则直接替换前面的5:10为5:20便可。

最后就是Tracker的Flush,全部的确认最终都须要经过触发flush方法发送到Broker,不管是哪一种确认,Flush时建立的都是同一个命令并发送给Broker,不过传参中带的AckType会不同。

3 NegativeAcknowledge的实现

否认应答和其余消息确认同样,不会当即请求Broker,而是把请求转交给NegativeAcksTracker进行处理。Tracker中记录着每条消息以及须要延迟的时间。Tracker复用了PulsarClient的时间轮,默认是33ms左右一个时间刻度进行检查,默认延迟时间是1分钟,抽取出已经到期的消息并触发从新投递。Tracker主要存在的意义是为了合并请求。另外若是延迟时间还没到,消息会暂存在内存,若是业务侧有大量的消息须要延迟消费,仍是建议使用ReconsumeLater接口。NegativeAck惟一的好处是,不须要每条消息都指定时间,能够全局设置延迟时间。

4 未确认消息的处理

若是消费者获取到消息后一直不Ack会怎么样?这要分两种状况,第一种是业务侧已经调用了Receive方法,或者已经回调了正在异步等待的消费者,此时消息的引用会被保存进UnAckedMessageTracker,这是Consumer里的第三个Tracker。UnAckedMessageTracker中维护了一个时间轮,时间轮的刻度根据AckTimeoutTickDurationInMs这两个参数生成,每一个刻度时间=AckTimeout / TickDurationInMs。新追踪的消息会放入最后一个刻度,每次调度都会移除队列头第一个刻度,并新增一个刻度放入队列尾,保证刻度总数不变。每次调度,队列头刻度里的消息将会被清理,UnAckedMessageTracker会自动把这些消息作重投递。

重投递就是客户端发送一个RedeliverUnacknowledgedMessages命令给Broker。每一条推送给消费者可是未Ack的消息,在Broker侧都会有一个集合来记录(PengdingAck),这是用来避免重复投递的。触发重投递后,Broker会把对应的消息从这个集合里移除,而后这些消息就能够再次被消费了。注意,当重投递时,若是消费者不是Share模式是没法重投递单条消息的,只能把这个消费者全部已经接收可是未Ack的消息所有从新投递。下图是一个时间轮的简单示例:

另一种状况就是消费者作了预拉取,可是还没调用过任何Receive方法,此时消息会一直堆积在本地队列。预拉取是客户端SDK的默认行为,会预先拉取消息到本地,咱们能够在建立消费者时经过ReceiveQueueSize参数来控制预拉取消息的数量。Broker侧会把这些已经推送到Consumer本地的消息记录为PendingAck,而且这些消息也不会再投递给别的消费者,且不会Ack超时,除非当前Consumer被关闭,消息才会被从新投递。Broker侧有一个RedeliveryTracker接口,暂时的实现是内存追踪(InMemoryRedeliveryTracker)。这个Tracker会记录消息到底被从新投递了多少次,每条消息推送给消费者时,会先从Tracker的哈希表中查询一下重投递的次数,和消息一并推送给消费者。

由上面的逻辑咱们能够知道,建立消费者时设置的ReceiveQueueSize真的要慎重,避免大量的消息堆积在某一个Consumer的本地预拉取队列,而其余Consumer又没有消息可消费。PulsarClient上能够设置启用ConsumerStatsRecorder,启用后,消费者会在固定间隔会打印出当前消费者的metrics信息,例如:本地消息堆积量、接受的消息数等,方便业务排查性能问题。

尾声

Pulsar中的设计细节很是多,因为篇幅有限,做者会整理一系列的文章进行技术分享,敬请期待。若是各位但愿系统性地学习Pulsar,能够购买做者出版的新书《深刻解析Apache Pulsar》。