消息队列那么多,为何建议深刻了解下RabbitMQ?

2021年11月25日 阅读数:1
这篇文章主要向大家介绍消息队列那么多,为何建议深刻了解下RabbitMQ?,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

image-20211010214319029

你为啥要在项目中选择xxx消息中间件?html

提起消息队列,也许你的脑海里会不自觉地蹦出好多概念:JMS、Kafka、RocketMQ、AMQP、RabbitMQ、ActiveMQ、Pulsar、Redis Stream...若是你的项目中刚好用到了其中的一个消息中间件,那么你出去面试或者与同事交流技术的时候,对方很大几率会问你:为啥要选择xxx消息中间件?java

若是你恰好只了解你正在用的消息中间件,那么你只能回答:由于只会xxx...面试

image-20211006183323991

不...这绝对不是你想要的结局!在对方准备看你的笑话以前,你也许能够主动发起还击,把全部的框架的诞生背景、优缺点,适用场景等都说一遍,从概念到原理,从特性到源码。在说完了以后,为了避免让对方感受到尴尬,你应该故意停顿片刻,暗示对方本身不想再聊这个话题了,好让对方有喘息的机会,让他趁早切换话题,给他一个台阶下。算法

image-20211006183625852

为了让本身能有如此实力,你务必要对这些常见的消息中间件有比较深刻的了解。数据库

咱们先来看看这些技术的发展史。编程

MQ技术发展史:数组

以下图所示:缓存

image-20211010220759711

  • 操做系统中的消息队列:在操做系统里面,咱们能够经过消息队列实现两个或多个进程/线程之间的异步通讯,发送方和接收方不须要同时与消息队列交互。放置在队列中的消息会一直存储,直到接收者取回它们。消息队列对能够在单个消息中传输的数据大小和队列可以存储的消息数量上限有隐式或显式限制;
  • TIB:1983年,一位来自孟买的工程师Vivek Ranadive提出了一个问题:为何没有通用的软件总线--一种通讯系统,能够将信息从一个应用程序传递到另外一个应用程序呢?最终,Vivek Ranadive创办了Teknekron公司,在1985年实现了第一个消息中间件:Teknekron的The Information Bus(TIB);
  • MQSeries:TIB受到了企业的欢迎,同时这也被IBM看在眼里,因而他们决定研发本身的消息队列软件。最终,在1993年,IBM推出了面向消息中间件的产品,MQSeries,2002年改名为WebSphereMQ,2014年改名为IBM MQ;
  • MSMQ:这么好的发财机会怎么能错过呢,因而微软也加入了竞争队伍,并在1997年发布了自家的消息中间件产品:MSMQ;
  • JMS:这些巨头推出的消息中间件价格昂贵,通常只应用于大型组织机构。而且,因为商业壁垒,MQ厂商们只关注于应用互通的问题,而不会去考虑建立标准来实现不一样的MQ产品之间的互通。为了打破这个壁垒,因而JMS诞生了;
    • JMS,即Java消息服务(Java Message Service),是由Sun公司早期提出的消息标准,为Java提供统一的消息操做,是Java平台中关于面向消息中间件的接口;
    • JMS是一种与厂商无关的API,相似于JDBC(Java Database Connectivity),用来访问消息系统和收发消息的编程API;
    • 不过JMS毕竟是在真实的消息中间件API上面作了一层适配,各个消息中间件的实现仍旧是没有一个规范,最终会暴露出问题,使得程序更加混乱与脆弱。此刻,咱们须要的是一种消息通讯标准;
  • AMQP:在2004至2006年,摩根大通在着手设计AMQP,最终,与其余公司(Cisco, Red Hat, iMatix等)成立了AMQP工做组,愈来愈多的公司参与进来,最终在2006年制定了AMQP的公开标准,由此,AMQP登上了历史的舞台,你们能够基于此标准来实现消息中间件,不受任何开发语言、产品等的条件限制;
  • RabbitMQ:RabbitMQ最初就是一个实现了AMQP的消息中间件,本文咱们会详细介绍这个家伙;
  • Kafka: Kafka是一种分布式流式系统,被设计为可以做为一个统一平台来处理大型公司可能拥有的全部实时数据馈送。为此,它必须具备高吞吐量才能支持大容量事件流,例如实时日志聚合;
    • RabbitMQ是基于队列和交换器的消息中间件,而Kafka是使用分区事务日志来实现存储层的分布式流式系统
    • Kafka不存在队列,而是按照主题存储记录集,而且为每一个主题维护一个消息分区日志;
    • Kafka中消费者本身维护消息的消费偏移量,支持持久订阅和临时订阅(重启后丢失偏移);
    • Kafka中的消息是按照预设的时间进行持久化的,而不是根据消费状态;
    • Kafka的设计之初就考虑到了高性能,经过如下方式实现:
      • 利用分区实现并行处理;
      • 使用磁盘顺序写,以及充分利用页缓存;
      • 零拷贝技术;
      • 批处理技术,数据压缩等;
  • RocketMQ:随着阿里巴巴的电商业务不断发展,须要一款更高性能的消息中间件,RocketMQ就是这个业务背景的产物。RocketMQ是一个分布式消息中间件,具备低延迟、高性能和可靠性、万亿级别的容量和灵活的可扩展性,它是阿里巴巴于2012年开源的第三代分布式消息中间件。RocketMQ经历了多年双十一的洗礼,在可用性、可靠性以及稳定性等方面都有出色的表现。值得一提的是,RocketMQ最初就是借鉴了Kafka进行改造开发而来的,因此熟悉Kafka的朋友,会发现RocketMQ的原理和Kafka有不少类似之处;
  • Pulsar:在Yahoo,为了追求大集群多租户、稳定可靠的 IO 服务质量、百万级 Topic、跨地域复制等需求,Pulsar 应运而生,以弥补Kafka在这方面的不足,Pulsar的优势:
    • 应用场景:Pulsar 对用户来讲提供了统一的消息模型,能够知足各类MQ;
    • 架构优点:有存储计算分离的云原生架构的优点,使用BookKeeper做为Pulsar的存储层。在 Broker 层不存储任何数据,具备更高的可用性、更灵活的扩容和管理,避免数据的 reblance 和 catch-up;
    • 社区活跃度:Pulsar 用户和贡献者数量也在快速增长...

Pub/Sub模式以及第一个消息中间件诞生的故事安全

1985年在高盛,Ranadive 找到了他的第一个客户,并肯定了他的软件总线要解决的问题:金融交易。服务器

当时,一个交易员的摊位挤满了不一样的终端,用于提供交易员完成工做所需的每种类型的信息。 Teknekron 看到了替换全部这些终端及其应用程序的机会:经过Ranadive软件总线取而代之,只需保留一个工做站便可,其显示程序能够做为消费者插入Teknekron软件总线,并容许交易者“订阅”其想要查看的信息。 Pub/Sub 诞生了,世界上第一个现代消息队列软件也诞生了:Teknekron的The Information Bus(TIB)。

而RabbitMQ做为传统的消息中间件,被大量应用于各类古老的项目,你第一个要拿下的就是它了,本文将带您从如下各个方面了解RabbitMQ相关知识:

  • 什么是AMQP?
  • 常见的交换机类型有哪些?
  • 如何实现消息的持久化?
  • RabbitMQ的链接复用有啥优点?
  • RabbitMQ的消息ACK机制是如何实现的?
  • RabbitMQ消息持久化机制性能如何?
  • 如何避免消费过载的问题?
  • 如何提升手动ACK签收的效率?
  • 何时须要让消息重回队列?
  • 如何保证消息的顺序消费?
  • 如何实现可靠的消息投递?

关于其余的消息中间件,我会在下篇文章中继续给你们分享。

RabbitMQ是一种使用Erlang语言编写的开源的消息中间件,最初实现了AMQP(高级消息队列协议),后来经过插件架构进行了扩展,支持STOMP(面向流文本的消息传递协议)、MQTT(MQ遥控传输)等协议。

详细关于RabbitMQ支持的消息协议,参考官网:Which protocols does RabbitMQ support?[1]

更多内容欢迎关注公众号Java架构杂谈,或者个人博客IT宅itzhai.com

1. RabbitMQ优点

RabbitMQ支持多种客户端,如Python、Java、.NET、C、Ruby等,在易用性、扩展性、高可用性等方面表现都不错,而且能够与SpringAMQP完美整合,API丰富易用。

RabbitMQ程序健壮、稳定、易用,跨平台、支持多种语言,管理界面简单易用,功能全面,文档相对比较齐全,社区活跃。

2. AMQP简介

AMQP,全称为:Advanced Message Queuing Protocol,高级消息队列协议,是面向消息中间件的开放标准的二进制应用层协议。AMQP的核心特性是:面向消息、排队、路由(包括点对点和发布订阅)、可靠性和安全性。这些功能使其很是适合在应用程序之间传递业务消息,AMQP还能够用做物联网IoT协议。

目前,AMQP 1.0已经被批准为国际标准,具体规范文档,能够进一步阅读:OASIS Advanced Message Queuing Protocol (AMQP) Version 1.0[2]

而RabbitMQ最初是为了支持AMQP 0-9-1[1:1]而开发的,所以,该协议是RabbitMQ Broker支持的核心协议。

下面咱们就简要介绍下AMQP 0-9-1协议[3]。这部份内容,概念会比较多,稍微有点枯燥,可是能够说RabbitMQ就是按照这个协议去实现的,因此熟悉这个协议很重要。

完整的AMQP文档能够从这里下载:AMQP Working Group 0-9-1[4]

2.1 AMQP模型概述

2.1.1 AMQP 0-9-1

AMQP 0-9-1 是一个二进制协议,定义了很是强大的消息传递语义。对于客户端来讲,这是一个相对容易实现的协议,所以有大量客户端库可用于许多不一样的编程语言和环境。

AMQP 0-9-1一般划分为两层:

image-20211010221214823

  • 功能层(Functional Layer):定义了一组命令(按功能作不一样的分类),提供给应用程序,用于支撑消息相关的工做;
  • 传输层(Transport Layer):传输层将这些方法从应用程序传送到服务器并返回,并处理通道多路复用、成帧、内容编码、心跳、数据表示和错误处理。

能够在不改变协议的应用程序相关功能的状况下用任意的传输协议来替换传输层,也能够将传输层用于不一样的高级协议。

2.1.2 AMQP 0-9-1模型简介

以下图,消息Broker(代理)从消息发布者(发布消息的应用程序,也称为生产者)接收消息并将它们路由到消费者(处理消息的应用程序)。

因为AMQP是一个网络协议,所以,生产者消费者代理均可以部署在不一样的机器上。AMQP模型以下图所示:

image-20211010221148759

消息发布到交换机(exchanges)(一般将其比做邮局或邮箱),而后使用称为绑定(Bindings)的规则将消息副本分发到队列(queues)。而后代理(brokers)要么将消息传递(deliver)给订阅队列的消费者(consumers),要么消费者主动按需从队列中获取(fetch)/拉取(pull)消息。

消息元数据:发布消息的时候,发布者能够指定各类消息元数据(消息属性),其中一些元数据可能由代理使用,其他的元数据仅由接收消息的应用程序使用。

消息确认:因为网络是不可靠的,而且应用程序可能没法正确处理消息,所以AMQP 0-9-1模型有一个消息确认的概念:当消息传递给消费者时,消费者会自动或者由开发人员在应用程序中手动指定通知代理Broker,代理只会在收到消息(或消息组)的通知时从队列中彻底删除该消息。

死信队列:在某些状况下,例如,当消息没法路由时,消息可能会返回给发布者、或者丢弃掉、或者将其放入所谓的死信队列(若是代理扩展支持),发布者经过使用某些参数来选择如何处理此类状况。

队列(queues)/交换机(exchanges)和绑定(bindings)统称为AMQP实体。

2.1.3 AMQP 0-9-1 是一个可编程的协议

AMQP 0-9-1是一种可编程的协议,由于AMQP 0-9-1实体路由方案由应用程序自己定义,而不是代理管理员。所以AMQP制定了一些规定来实现这些协议操做:

  • 声明队列和交换机;
  • 定义他们之间的绑定;
  • 订阅队列等。

这为应用程序开发人员提供了很大的自由,但也要求他们了解潜在的定义冲突。在实践中,定义冲突不多见,一般表示为配置错误。

应用程序声明它们须要的AMQP 0-9-1实体,定义必要的路由方案,并在不须要使用它们时进行删除。

2.2 交换机(Exchanges)和交换机类型

交换机是发送消息的AMQP 0-9-1实体。交换机收到一条消息,并将其路由到零个或者多个队列中。咳咳,Java架构杂谈提醒你们,不要联想到了网络的交换机(Network switch),只是中文名称同样而已。

使用的路由算法取决于交换机类型和称为绑定的规则。如下是AMQP 0-9-1 Broker提供的四种交换机类型:

交换类型 默认的预约义名称
直连交换机(Direct exchange) 空字符串和amq.direct
扇形交换机(Fanout exchange) amq.fanout
主题交换机(Topic exchange) amq.topic
头信息交换机(Headers exchange) amq.match 和RabbitMQ中的 amq.headers

除了交换类型以外,交换机还声明了许多属性,关键属性有:

  • Name,交换机的名称
  • 持久性,保证交换机在Broker重启后仍然存在,若是没有指定持久,那么交换机在Broker重启后就不存在了,须要从新声明,并不是全部场景和用例都要求交换机是持久的;
  • 自动删除,当最后一个队列解除绑定时,交换机被删除;
  • 参数,可选,由插件和特定于代理的功能使用。

2.2.1 默认交换机

默认交换机是由Broker预先声明的匿名直连交换机。使用默认交换机的时候,每一个新建队列都会自动绑定到默认交换机上,绑定的路由键与队列名称相同,默认交换机看起来能够将消息直接传递到队列。

2.2.2 直连交换机

交换机根据消息路由键(router_key)将消息传递到队列,消息将会投递到与路由键名称和队列名称相同的队列上。直接交换机是消息单播路由的理想选择(尽管它们也能够用于多播路由)。

直连交换机以下图所示:

image-20211010221330224
  • 一个队列N使用路由键 K 绑定到交换机;
  • 当具备路由键 M 的新消息到达直连交换机时,若是 K = M,则交换机将其路由到队列N。

如上图,具备路由键"itzhai.com"的消息达到交换机以后,则会路由到Queue1中。

直连交换机一般用于以循环的方式在多个消费者之间分配任务,也就是说,消息的负载均衡是发生在消费者之间而不是队列之间

2.2.3 扇形交换机

扇形交换机将消息路由到绑定到它的全部队列,而且忽略路由键。也就是说,当新消息发布到该交换机时,该消息的副本将投递到全部绑定该交换机的队列。扇形交换机是消息广播路由的理想选择

扇形交换机以下图所示:

image-20211010221412597

使用扇形交换机的案例都很是类似:

  • 大型多人在线游戏(MMO)能够将其用于排行榜更新或其余全局事件;
  • 体育新闻网站可使用扇形交换机向客户端近乎实时的分发比分信息;
  • 分布式系统使用它来广播各类状态和配置更新;
  • 群聊可使用它在参与者之间分发消息(AMQP没有内置presence的概念,所以XMPP可能会是更好的选择)。

2.2.4 主题交换机

主题交换机根据消息路由键和和用于将队列绑定到交换机的模式匹配字符串之间的匹配将消息路由到一个或者多个队列。

也就是说经过消息的路由键去匹配到绑定到交换机的路由键匹配字符串,若是匹配上了,就进行投递消息。

routing key模糊匹配的通配符以下:

  • *:用于匹配一个单词,好比 itzhai.com.*,能够匹配:itzhai.com.aitzhai.com.b
  • #:用于匹配0个或者多个单词,好比 itzhai.com.#,能够匹配:itzhai.com.aitzhai.com.a.b

routing key经过.分隔字符串。

主题交换机以下图所示:

image-20211010221438018

当生产者发送的routing_key=itzhai.com的时候,会把消息路由投递到Queue1和Queue2。

当生产者发送的routing_key=www.itzhai.com的时候,会把消息路由投递到Queue3。

当须要有针对性的选择多个接收消息的消费者或者应用的时候,主题交换机均可以被列入考虑的范围。常见的使用场景:

  • 后台任务由多个工做线程完成,每一个工做线程负责处理某些特定的任务,这个时候能够经过主题交换机订阅感兴趣的消息;
  • 分发特定地理位置的信息,每一个工做线程只订阅感兴趣的地理位置的信息;
  • ...

2.2.5 头交换机

头交换机不依赖路由键的匹配规则来路由消息,而是根据发送消息内容中的请求头属性进行匹配。

头交换机相似于直连交换机,可是直连交换机的路由键必须是一个字符串,而请求头信息则没有这个约束,它们甚至能够是整数或者字典。所以能够用做路由键没必要是字符串的直连交换。

绑定一个队列到头交换机上的时候,会同时绑定多个用于匹配的头信息。

投递消息的时候,能够携带一个x-match参数,指定是否要求必须全部的头信息都匹配(all)才能进行投递,仍是只要匹配任何一个就能够了(any)。

注意:以字符串x-打头的头属性,不会做为匹配项。

2.3 队列(Queues)

AMQP 0-9-1 中的队列与其余消息队列和任务队列系统中的队列相似,用于存储即将被消费的消息。通常地,队列与交换机共享一些属性,但队列也有一些特定的属性:

  • Name:队列名称;
  • Durable:队列持久化,队列在Broker重启以后是否继续存在;
  • Exclusive:队列是否仅由一个链接使用,若是是,在链接关闭的时候,队列将被删除;
  • Auto-delete:当最后一个消费者取消订阅的时候,当即删除;
  • Arguments:可选,一些特定的插件和Broker功能使用,例如实现消息的TTL,队列长度限制等。

关于队列的声明:

在使用队列以前,必须先声明它。声明队列的时候,若是队列尚不存在,则将建立一个队列;若是队列已存在,而且属性与声明中的属性相同,则不用从新建立一个;若是现有队列属性与声明的队列属性不一样,将会引起406(PRECONDITION_FAILED)的通道级异常。

2.3.1 队列名称

应用程序能够设置队列名称,若是设置为空字符串,Broker会为它们生成一个惟一的队列名称,在队列声明响应体中一块儿返回给客户端。队列名称为255个字节之内的UTF-8字符。

amq开头的队列名称,保留给Broker内部使用,若是尝试使用此类名称声明一个队列将致使通道级别异常:403(ACCESS_REFUSED)

2.3.2 队列持久化

持久化的队列的元数据会存储在磁盘上,当Broker重启以后,队列依然存在。没有被持久化的队列称为暂存队列。发布的消息也有一样的区分,也就是说,持久化的队列并不会使得路由到它的消息也具备持久性,须要手动把消息也标记为持久化才能保证消息的持久性。

2.4 绑定(Bindings)

绑定是交换机将消息路由到队列的规则。为了让交换机可以正确的把消息投递到对应的队列,须要把交换机和队列经过路由键绑定起来,路由键就像是一个过滤器,决定了消息是否能够投递给消息队列。

若是一条消息不能被路由到任何队列(例如,由于它被发布到的交换机没有绑定),它要么被丢弃,要么返回给发布者,这取决于发布者设置的消息属性。

2.5 消费者

若是消息只是存储在队列里没有被消费,是没有什么实际做用的。在AMQP 0-9-1中,有两种途径能够进行消息的消费:

  • 订阅消息队列,以将消息投递给应用(push API),这是推荐的作法;
  • 应用根据须要主动的轮训获取消息(pull API),这种方式很是低效,在大多数状况下应该避免。

若是应用程序对某一个特定队列的消息感兴趣,则能够注册一个消费者,对队列进行订阅。每一个队列能够有多个消费者,固然也能够注册一个独享的消费者,这个时候其余消费者会被排除在外。

每一个消费者(订阅)都有一个称为消费者标签的字符串类型的标识符,能够用它来退订消息。

2.5.1 消息确认

消费者应用程序可能偶尔没法处理单个消息或有时会崩溃,另外网络问题也有可能致使问题。这就提出了一个问题:Broker什么时候应该从队列中删除消息?AMQP 0-9-1 规范中约定让消费者对此进行控制,有两种确认模式:

  • 自动确认模式:在Broker向应用程序发送消息以后(使用basic.deliver或basic.get-ok方法),将消息从消息队列中删除;
  • 显示确认模式:在应用程序向broker发回确认以后(使用basic.ack方法),将消息从消息队列中删除。

在显示模式下,应用程序选择什么时候发送确认消息。若是消费者在没有发送确认的状况下就挂掉了,那么Broker会将其从新投递给另外一个消费者,若是此时没有可用的消费者,那么Broker将等到至少有一个消费者注册到该队列时,再尝试从新投递消息。

另外,若是应用程序崩溃(当链接关闭时 AMQP Broker会感知到这一点),而且AMQP Broker在预期的时间内未收到消息确认,则消息将从新入队,若是此时有其余消费者,可能当即传递给另外一个消费者。为此,咱们的消费者作好业务的幂等处理也是很是重要的

2.5.2 拒绝消息

当消费者接收到消息以后,可能处理成功或者失败。应用程序能够经过拒绝消息向Broker代表消息处理失败了(或者当时没法完成)。拒绝消息的时候,应用程序能够要求Broker丢弃消息或者从新入队。

当队列中只有一个消费者的时候,请确保您不会经过不断地拒绝消息和从新入队致使消息在同一个消费者身上无限循环的状况发生。

在AMQP中,basic.reject方法用来执行拒绝消息的操做。

2.5.3 预取消息

在多个消费者共享一个队列的状况,可以制定每一个消费者在发送下一个ack以前能够一次性接收多少条消息,这是很是有用的特性。这能够在试图批量发布消息的时候,起到简单的负载均衡和提升消息吞吐量的做用。

请注意:RabbitMQ仅支持通道级预取计数,不支持基于链接或者大小的预取。

2.6 消息属性和有效负载

AMQP 0-9-1模型中的消息是具备属性的,有些属性很是常见,以致于AMQP 0-9-1明肯定义了它们,例如:

  • Content type 内容类型
  • Content encoding 内容编码
  • Routing key 路由键
  • Delivery mode (persistent or not) 投递模式,是否持久化
  • Message priority 消息优先级
  • Message publishing timestamp 消息发布的时间戳
  • Expiration period 消息有效期
  • Publisher application id 发布消息的应用id

有些属性是被AMQP的Broker所使用的,可是大多数是开放给接收它们的应用程序用的。有些属性是可选的,称为消息头(headers),它们相似于HTTP协议的X-Headers,消息属性须要在消息被发布时定义

消息体:AMQP消息除了属性以外,还包括一个有效载荷Payload(消息实际携带的数据),AMQP Broker视其为一个透明的字节数组来对待。Broker不会修改payload。消息可能只包含属性而没有payload。payload一般使用JSON、Thrift、Protocol Buffers和MessagePack等序列化格式来序列化成结构化的数据,以便进行发布,协议对等方一般使用Content typeContent encoding字段来传达此信息。

消息持久化:消息能够做为持久性发布,这使得Broker将他们持久化到磁盘。若是服务器重启以后,系统能够确保接收到的持久化消息不会丢失。简单的将消息发布到持久化的交换机或者被路由到持久化的队列中,是不会让消息也持久化的,消息是否持久化彻底取决于消息自己的持久模式。将消息发布为持久性会影响性能,就像数据存储同样,持久性以必定的性能成本做为代价。

2.7 AMQP 0-9-1 方法

AMQP 0-9-1中定义了许多操做方法,详细参考:AMQP 0-9-1参考[5]

不少方法都有对应的响应方法,有些甚至有不止一种可能的响应,如basic.get,响应可能为:get-ok或者get-empty

以下是声明一个交换机和响应成功的方法:

image-20211010221514688

2.8 链接(Connections)

AMQP 0-9-1 链接一般是长链接,AMQP 0-9-1 是一种使用TCP提供可靠投递的应用层协议。AMQP 0-9-1链接使用身份认证机制并提供TLS (SSL)保护。当应用程序再也不须要链接到Broker时,它应该优雅地关闭其 AMQP 0-9-1 链接,而不是忽然关闭底层 TCP 链接。

2.9 通道(Channels)

某些应用程序须要同时开启链接到Broker,可是,同时保持许多TCP链接是不可取的,这样会消耗系统资源而且使得配置防火墙更加困难。

AMQP 0-9-1经过通道复用技术经过通道的形式实如今一个TCP链接上面支持多个链接(虚拟的连接)。同一个TCP链接中有多个通道,通道之间的通讯是彻底隔离的。客户端的每一个协议操做都携带了一个通道ID,代理和客户端都是用它来肯定该操做所走的通道。

image-20211010221549040

通道仅存在于TCP链接上下文中,一旦TCP链接关闭,其上全部通道也跟着关闭。

通常的,咱们会给每一个线程打开一个新的通道进行通讯。

2.10 虚拟主机

为了让单个代理能够托管多个隔离的环境(用户组、交换机、队列等),AMQP中提供了虚拟主机,这相似于许多流行的Web服务器使用的虚拟主机。协议客户端在链接协商期间须要指定想要使用的虚拟主机。

2.11. AMQP Client架构

推荐的AQMP Client架构须由下面多个抽象层组成:

image-20211010221733418
  • 成帧层:此层接收AMQP协议方法,并按某种语言格式(结构、类等)来序列化成线级帧,成帧层能够根据AMQP协议规范来实现;
  • 链接管理层:此层用于读写AMQP帧,并管理全部链接、会话逻辑。在此层中,咱们能够封装开启链接和会话、错误处理、内容传输和接收数据的所有逻辑;
  • API层:此层暴露了应用程序工做的特定API。API层可能会反映一些现有的标准,或暴露高层AMQP的方法。API层自己多是由多个层组成的,如构建于AMQP方法API之上的高级API;
  • IO层:此外,一般还会有一些I/O层,这此能够是很是简单的同步套接字读取和写入或复杂的异步多线程IO。

AMQP就介绍到这里了,接下来Java架构杂谈带你们详细看看RabbitMQ。

3. RabbitMQ架构

RabbitMQ的总体架构以下图所示:

image-20211010222005614

Broker:Broker中按虚拟主机(virtual host)划分,每一个虚拟主机下面有本身的交换机(exchange)和消息队列(queue),以及交换机和队列的绑定routing_key(有些人会把这个key称为binding_key);

生产端:通常地,同一个客户端(client)里面的每一个生产者(producer)建立一个专门的通道(channel),复用同一个TCP链接(connection),每一个生产者能够往Broker发布消息,发布消息的时候,需指定虚拟主机,以及虚拟主机上的交换机,而且消息须要带上routing_key;

消费端:通常地,同一个客户端(client)里面的每一个消费者(consumer)建立一个专门的通道(channel),复用同一个TCP链接,每一个消费者指定一个消息队列进行消费。同一个消息队列,能够有多个消费者共同消费,但消息队列里面的同一条消息,只会由一个消费者消费,多个消费者至关于给消息队列作了负载均衡。

针对默认交换机直连交换机主题交换机生产端带入的routing_key交换机与队列之间绑定的routing_key(binding_key)进行匹配,匹配上了,就把消息投递给对应的消息队列。

针对扇形交换机,直接把消息投递给全部与扇形交换机绑定的队列。

rabbitmqctl是管理RabbitMQ服务器节点的主要命令行工具,相关完成命令介绍参考:rabbitmqctl(8)[6]

4. RabbitMQ特性

4.1 消息ACK机制[7]

ACK (Acknowledge character),便是确认字符,消息的接收方须要告诉发送方已确认接收消息,这是实现可靠消息投递的必备特性。

MQ系统中,涉及到ACK的流程以下图所示:

image-20211010222138210

4.1.1 生产端ACK之Confirm消息机制

如上图所示:

  • Producer发布消息到Broker
  • Broker将消息落地;
  • Broker发送ack给Producer

若是Producer没有收到ack,那么能够重发消息,直到收到ack为止。为了不无限的给Broker投递消息,应该设置一个重试上限,并记录下发送失败的消息。在这个过程当中,MQ Server可能会收到重复消息。

在RabbitMQ中,生产端的ACK经过ConfirmListener机制来实现:

image-20211010222210899

在channel中开启确认模式confirmSelect(),而后在channel中添加监听,用来监听Broker返回的应答。

Broker什么时候给生产端发送ACK?

对于不可路由的消息,一旦交换机验证消息不会路由到任何队列,Broker将发出ack,若是开启了Return消息机制(下一小节讲解),那么Broker会先发送basic.return消息给客户端,再发送basic.ack消息。示例代码以下:

String message = "Hello itzhai.com....";
// Confirm消息机制
channel.addConfirmListener(new TestConfirmListener());

// Return消息机制
channel.addReturnListener(new TestRetrunListener());
// 错误的路由键,但交换机的名称正确
String errorRoutingKey = "itzhai.com.test1";
boolean mandatory = true;

channel.basicPublish(exchangeName, errorRoutingKey, mandatory, basicProperties, message.getBytes());

执行以上代码,生产者将依次收到basic.return(Return消息),basic.ack(Confirm消息)。

image-20211010222236372

对于可路由的消息,当全部队列都接收到消息的以后,Broker向生产端发送ACK。若是路由到的是持久队列,而且是持久消息,那么这个ACK就意味着消息持久化到了磁盘。

也就是说,路由到持久队列持久消息的ACK将在将消息持久化到磁盘后发送

RabbitMQ消息持久化的性能如何?

RabbitMQ持久化消息的刷盘策略:为了尽量减小fsync(2)的调用次数,RabbitMQ在间隔一段时间(几百毫秒)或者在队列空闲的时候将消息分批保存到磁盘中。

这就意味着,在正常的负载下,生产端接收Broker的ACK时延可达几百毫秒。为了提升吞吐量,强烈建议生产端应用程序异步处理ACK,或者批量发布消息,并等待ACK。

4.1.2 生产端ACK之Return消息机制

Return消息机制用于处理一些不可路由的消息。发送消息的时候,若是指定的routing_key路由不到队列,这个时候就能够经过ReturnListener监听这种异常状况。

4.1.3 消费端ACK

image-20211010222138210

如上图所示:

  • 消息服务器将消息投递给消费者;
  • 消费者消费消息,并向消息服务器发送ack;
  • 消息服务器收到消费者的ack以后,将已落地的消息删除掉。

当Broker一直没有收到消费端的ACK,则会重发消息,这个过程通常采用指数退避策略,时间间隔按指数增加。

Rabbit中的消费端ACK

在RabbitMQ中,消费端的ACK能够是自动的,或者手动的。

手动ACK签收

经过如下方法关闭自动ack签收(入参autoAck设置为false):

Channel.java
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

而后自定义一个支持ack的Consumer:

public class TestAckConsumer extends DefaultConsumer {

    ...

    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body)
        throws IOException
    {
        try{
            ...
            // 成功消费的ack
            boolean multiple = false;
            channel.basicAck(envelope.getDeliveryTag(), multiple);
        }catch (Exception e) {
            // 未成功消费的ack,设置为不重回队列,即马上删除消息
            boolean multiple = false;
            boolean requeue = false;
            channel.basicNack(envelope.getDeliveryTag(), multiple, requeue);
        }
    }
}

channel中有三种ack相关的方法:

  • basic.ack:用于确定确认,指示RabbitMQ消息已经处理成功能够丢弃消息了;
  • basic.nack:用于否认确认,指示RabbitMQ消息未处理成功,能够经过参数指定是否须要丢弃消息仍是重回队列
  • basic.reject:用于否认确认,指示RabbitMQ消息未处理成功,能够经过参数指定是否须要丢弃消息仍是重回队列

basic.nack与basic.reject的区别就是,basic.nack支持批量手动确认,basic.nack是RabbitMQ对AMQP 0-9-1协议的扩展。

自动ACK签收

使用自动确认模式,消息在发送以后就马上被标记为投递消费成功。若是消费者的TCP链接或者通道在真正投递成功以前就关闭了,那么Broker发送的消息将会丢失。自动确认模式是以下降消息投递的可靠性来换取更高的消费端吞吐量(只要消费端处理速度可以跟上)

如何避免消费过载的问题(消费端限流)?

使用自动模式能够提升吞吐量,可是前提是消费端要可以处理得过来,若是处理不过来,就会在消费端的内存中积压消息,直至把内存耗尽。所以,自动确认模式仅推荐用于可以以稳定的速度高效地处理消息的消费者。

为了不消费过载问题,咱们通常使用手动确认模式,配合通道预取限制一块儿使用:

// 每条消息的大小限制,0表示不限制
int prefetchSize = 0;  
// MQ Server每次推送的消息的最大条数,0表示不限制
int prefetchCount = 1;  
// true 表示配置应用于整个通道,false表示只应用于消费级别
boolean global = false;  
channel.basicQos(prefetchSize, prefetchCount, global);

// 队列名称
String queueName = "com.itzhai.queue";  
// 设置为手动确认模式
boolean autoAck = false;
// 消费者对象实例
Consumer consumer = new ItzhaiTestConsumer(channel);
channel.basicConsume(queueName, autoAck, consumer);

如何提升手动ACK签收的效率

若是不须要严格控制发送消费端ACK的时间,即,只要消费者成功接收到消息,无论有没有消费成功,都容许进行ACK回复,那么就能够经过批量ACK签收的功能更来提升签收的消效率。作法以下:

// 手动签收模式
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
             // 注意,这里设置为批量签收
             boolean mutiple = true;
             // 签收deliveryTag以及deliveryTag以前的全部消息
             channel.basicAck(deliveryTag, mutiple);
         }
     });

这样执行basicAck,deliveryTag以及deliveryTag以前的全部消息都将会被签收。

image-20211010222327194

何时须要让消息重回队列?

有时候消费者太繁忙致使没法当即处理接收到的消息,可是其余实例可能能够处理。这种状况,就能够拒绝消息,而且让消息重回队列。

另外,可使用channel.basicNack方法一次拒绝或者从新排队多条消息:

// 指定批量拒绝策略
boolean multiple = true;
// 指定拒绝以后从新入队
boolean requeue = true;
channel.basicNack(envelope.getDeliveryTag(), multiple, requeue);

极端状况下,若是全部消费者由于暂时没法处理接收的消息,会致使消息不断的循环重回入队,致使消耗网络带宽和CPU资源。为了不这种状况,能够跟踪重回队列的消息数量,决定是否须要永久拒绝消息(丢弃消息)仍是延迟重回队列的时间。

4.2 消息的顺序性可以获得保证吗?

通常状况下,在单个通道上发布的消息,Rabbit会按照消息发布的相通顺序向生产端发送ACK消息,但也不是绝对的。发布ACK的确切时刻取决于消息的传递模式(持久化或瞬时),以及消息路由到的队列的属性。也就是说,不一样的消息在不一样的时间准备好进行确认,确认消息能够以不一样的顺序达到。因此,应用程序尽量不要依赖于消息的顺序性。

4.3 消息处理的幂等性如何处理?

不管是生产端仍是消费端的ACK,都有可能由于网络或者程序问题致使ACK消息没有及时送达,这个时候会致使重复的消息投递。如何保证消费同一条消息的状况下不影响业务,这就须要保证消息处理的幂等性。

也就是说,针对同一条消息,不管消费者消费多少次,产生的效果始终应该跟消费一次的保持一致,而且返回的ACK结果也是一致的。

经常使用的实现消息处理幂等性的方法:

  • 每条消息生成惟一ID,消费端根据惟一ID判断是否已经消费过,若是消费过,则直接返回消费成功的ACK。
    • 针对入库的业务操做能够经过数据库的惟一索引来实现避免重复业务数据入库;
    • 针对修改数据类的操做,能够先判断数据是否已是目标状态了,若是是目标状态,直接返回再进行更新。
  • 针对并发的场景,咱们须要给业务消费程序添加分布式锁,避免并发执行致使触发业务重复处理。

4.4 死信队列[8]

若是消息队列中的消息没有被正常消费掉,那么该消息就会成为一个死信(Dead Letter),这条消息能够被从新发送到另外一个交换机上,后面这个交换机就是死信交换机(DLX),死信交换机绑定的队列就是死信队列。在如下状况下致使的消息未被正常消费,均会使消息变为死信:

  • 消费者使用basic.reject或者basic.nack来拒绝消息,同时设置requeue参数为false,表示消息不须要重回队列;
  • 消息设置了TTL,而且过时了,或者队列设置了消息的过时时间x-message-ttl
  • 因为消息队列超过了长度限制致使消息被丢弃了。

死信队列也是一个正常的交换机,它能够是任何常见的交换机类型,与常规交换机声明没有区别。

DLX能够有客户端使用队列参数(arguments)进行定义,或者在服务器中使用策略(policy)进行定义,在policy和arguments都定义了的状况下,arguments中指定的那个会否决policy中指定的那个。

经过policy启用死信队列:

rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues

经过arguments启用死信队列:

// 声明一个交换机,做为死信交换机
channel.exchangeDeclare("some.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);

5. 持久化消息就意味着消息的可靠性吗?如何实现可靠性投递?

消息可靠性须要考虑生产端投递消息的可靠性以及保证消费端最终成功地消费消息。

虽然经过生产端的ACK机制,能够确保消息成功的投递到了RabbitMQ中,保证投递的消息不丢失。可是若是生产端不知道消费者究竟有没有成功的消费了消息,那也就没法实现可靠性投递了。

而生产端投递消息的过程当中,一般会涉及到生产端的事务提交,要保证消息跟随事务提交而发送,也是须要考虑的问题。

如何实现可靠投递呢?这里留给你们思考,关键设计要点:

  • 是否要发消息跟随生产端事务一块儿保存到发送日志表,提交事务以后马上向消息队列投递一次消息;
    • 生产端发送日志表消息状态:1 发送中,2 Broker签收成功,3 Broker签收失败,4 消费端签收成功,5 消费端签收失败
  • 使用消息队列模拟RPC调用,在消费者成功处理消息以后,向生产者投递成功消费的消息,以便让生产端知道消息已经处理成功了;
  • 定时任务定时扫描生产端发送日志表,对于超过固定时间以内,还未处理成功的消息,进行重试投递,重试可使用指数退避策略,并设置投递上限次数。若是达到上限次数还未成功,则预警人工介入排查;
  • 消费端必定要作好幂等处理,避免重复消费致使业务异常。

提示的还不够具体?我再上一张图:

image-20211010222410703

有更好的方案的朋友,欢迎在评论区留言交流,也许你就是评论区最靓的仔。

6. RabbitMQ更多使用场景

经过给消息设置TTL,超时时候放入死信队列进行处理,能够实现延迟队列,固然,RabbitMQ也有专门的延迟队列插件可使用;

另外,也可使用RabbitMQ模拟RPC调用,参考上一节实现消息可靠性投递的例子。

更多的使用场景欢迎你们进行补充。

关于更多消息中间件的文章,欢迎关注Java架构杂谈,或者个人博客IT宅(itzhai.com),我会持续的输出相关内容。


我精心整理了一份Redis宝典给你们,涵盖了Redis的方方面面,面试官懂的里面有,面试官不懂的里面也有,有了它,不怕面试官连环问,就怕面试官一上来就问你Redis的Redo Log是干啥的?毕竟这种问题我也不会。

image-20211007142531823

Java架构杂谈公众号发送Redis关键字获取pdf文件:

image-20211010220323135

本文做者: arthinking

博客连接: https://www.itzhai.com/articles/rabbitmq-advanced-tutorial.html

消息队列那么多,为何建议深刻了解下RabbitMQ?

版权声明: 版权归做者全部,未经许可不得转载,侵权必究!联系做者请加公众号。

References


  1. Which protocols does RabbitMQ support?. Retrieved from https://www.rabbitmq.com/protocols.html ↩︎ ↩︎

  2. OASIS Advanced Message Queuing Protocol (AMQP) Version 1.0. Retrieved from http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-complete-v1.0-os.pdf ↩︎

  3. AMQP 0-9-1 Model Explained. Retrieved from https://www.rabbitmq.com/tutorials/amqp-concepts.html ↩︎

  4. AMQP Working Group 0-9-1. Retrieved from https://www.amqp.org/specification/0-9-1/amqp-org-download ↩︎

  5. AMQP 0-9-1 Complete Reference Guide. Retrieved from https://www.rabbitmq.com/amqp-0-9-1-reference.html ↩︎

  6. rabbitmqctl(8). Retrieved from https://www.rabbitmq.com/rabbitmqctl.8.html ↩︎

  7. Consumer Acknowledgements and Publisher Confirms. Retrieved from https://www.rabbitmq.com/confirms.html ↩︎

  8. Dead Letter Exchanges. Retrieved from https://www.rabbitmq.com/dlx.html ↩︎