Iawen's Blog

我喜欢这样自由的随手涂鸦, 因为我喜欢风......

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的, 而聚类和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。 9

主要特性:

  • 可伸缩性: 集群服务
  • 消息持久化: 从内存持久化消息到硬盘, 再从硬盘加载到内存

项目地址: https://github.com/rabbitmq/rabbitmq-server

1. RabbitMQ 的安装与基本使用

1.1 安装

yum install -y epel-release
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

yum install -y erlang socat

wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.9.11/rabbitmq-server-3.9.11-1.el8.noarch.rpm
rpm -ivh rabbitmq-server-3.9.11-1.el8.noarch.rpm

1.2 配置RabbitMQ

关于RabbitMQ的配置, 可以下载RabbitMQ的配置文件模板到/etc/rabbitmq/rabbitmq.config, 然后按照需求更改即可。 关于每个配置项的具体作用, 可以参考官方文档

1.2.1 用户名和密码

rabbitmq默认创建的用户guest, 密码也是guest, 这个用户默认只能是本机访问, localhost或者127.0.0.1, 从外部访问需要添加的配置, 将配置文件中的loopback_users列表置为空即可. 关于新添加的用户, 直接就可以从远程访问的, 如果想让新添加的用户只能本地访问, 可以将用户名添加到上面的列表, 如只允许admin用户本机访问:

// 开启guest 远程访问,  禁用admin 远程访问
[{rabbit, [{loopback_users, ["admin"]}]}].

1.2.2 RabbitMQ 用户级别:

  • administrator 可以登录控制台、查看所有信息、可以对rabbitmq进行管理
  • monitoring 监控者 登录控制台, 查看所有信息
  • policymaker 策略制定者 登录控制台,指定策略
  • managment 普通管理员 登录控制台
rabbitmqctl add_user iawen admin123
rabbitmqctl set_user_tags iawen administrator

1.2.3 开启web管理接口

如果只从命令行操作RabbitMQ, 多少有点不方便。幸好RabbitMQ自带了web管理界面, 只需要启动插件便可以使用。

rabbitmq-plugins enable rabbitmq_management
# 然后通过浏览器访问 http://localhost:15672

2. AMQP

高级消息队列协议即Advanced Message Queuing Protocol(AMQP)是面向消息中间件提供的开放的应用层协议, 其设计目标是对于消息的排序、路由(包括点对点和订阅-发布)、保持可靠性、保证安全性。AMQP规范了消息传递方和接收方的行为, 以使消息在不同的提供商之间实现互操作性, 就像SMTP, HTTP, FTP等协议可以创建交互系统一样。与先前的中间件标准(如Java消息服务)不同的是, JMS在特定的API接口层面和实现行为上进行了统一, 而高级消息队列协议则关注于各种消息如何以字节流的形式进行传递。因此, 使用了符合协议实现的任意应用程序之间可以保持对消息的创建、传递。

高级消息队列协议是一种二进制应用层协议, 用于应对广泛的面向消息应用程序的支持。协议提供了消息流控制, 保证的一个消息对象的传递过程, 如至多一次、保证多次、仅有一次等, 和基于SASL和TLS的身份验证和消息加密.

高级消息队列协议对于实现有如下规定

  • 类型系统
  • 对称的异步消息传递
  • 标准的、可扩展的消息格式
  • 标准的、可扩展的消息存储池 0

2.1 Base Properties

1

3. 核心特性

3.1 队列

3.1.1 五种队列模式

  • 简单队列: 生产者将消息发送到队列, 消费者从队列中获取消息。
  • Work模式: 一个生产者、2个消费者.一个消息只能被一个消费者获取
  • 订阅模式: 1个生产者, 多个消费者, 每一个消费者都有自己的一个队列.生产者没有将消息直接发送到队列, 而是发送到了交换机
  • 路由模式: 需要将一个队列绑定到交换机上, 要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”, 则只有被标记为“dog”的消息才被转发, 不会转发dog.puppy, 也不会转发dog.guard, 只会转发dog。
  • 通配符模式: Topic Exchange

3.1.2 队列保留参数:

参数名 目的
x-dead-letter-exchange 该参数值为一个(死信)交换机的名称,当队列中的消息的生存期到了,或者因长度限制被丢弃时,消息会被推送到(绑定到)这台交换机(的队列中),而不是直接丢掉.
x-dead-letter-routing-key 用于死信消息的可选路由键
x-expires 队列多长时间(毫秒)没有被使用(访问)就会被删除.换个说法就是,当队列在指定的时间内没有被使用(访问)就会被删除.
x-max-length 队列可以容纳的消息的最大条数,超过这个条数,队列头部的消息将会被丢弃
x-max-length-bytes 队列可以容纳的消息的最大字节数,超过这个字节数,队列头部的消息将会被丢弃.
x-message-tt 以亳秒为单位的消息过期时间, 队列级别执行
x-max-priority 启用最大优先级值为255 (RabliitMQ3.5.0及更高版本)的队列优先排序功能
x-match 使用headers exchange时, 值: all or any
x-queue-mode lazy, 设置队列为懒人模式.该模式下的队列会先将交换机推送过来的消息(尽可能多的)保存在磁盘上,以减少内存的占用.当消费者开始消费的时候才加载到内存中;如果没有设置懒人模式,队列则会直接利用内存缓存,以最快的速度传递消息.
x-queue-master-locator
x-ha-policy 创建HA队列时, 指定跨节点实现HA的模式
x-ha-nodes HA队列分布的节点
x-overflow 队列中的消息溢出时,如何处理这些消息.要么丢弃队列头部的消息,要么拒绝接收后面生产者发送过来的所有消息: drop-head or reject-publish

6

3.1.3 死信队列

当RabbitMQ出现死信, 可能会导致业务逻辑错误, 比如下订单后修改库存操作, 在下单后因为某种原因, 发送的消息未被签收, 这时库存数据会出现不一致。有死信队列之后我们就可以监听死信队列, 来处理业务逻辑。

  • 声明队列, 添加参数x-dead-letter-exchange
  • 死信队列, 是一个普通的Exchange和queue, 需要设置死信Exchange和queue, 并进行绑定

3.1.4 备用队列

当消息经过交换器准备路由给队列的时候, 发现没有对应的队列可以投递信息, 在rabbitmq中会默认丢弃消息, 如果我们想要监测哪些消息被投递到没有对应的队列, 我们可以用备用交换器来实现。 大概原理如下, 如下图所示, 消息发送给交换器, 交换器发现没有可路由的队列, 于是消息发给备用交换器, 备用交换器再发给队列2, 由队列2的消费者来处理消息。 8

交换器的定义, 需要一个参数, 可以通过参数的方式, 来指定备用交换器。参数的key是alternate-exchange,value是交换器的名称。通常备用交换器的类型是fanout

3.2 四种Exchange类型

RabbitMQ常用的Exchange Type有fanout、direct、topic、headers这四种, 下面分别进行介绍。

3.2.1 Direct 处理器

这是一个完整的匹配。当需要投递的消息有一个确定的目标(或者多个目标)时, direct 交换器就能派上用场。任何绑定在交换器上的队列, 只要它的路由键和发布消息时的一致, 它就能收到消息。如果一个队列绑定到该交换机上要求路由键为 “green”, 则只有路由键为“green”的消息才被转发, 不会转发路由键为"red", 只会转发路由键为"green"。对于direct 交换器来说, RabbitMQ 在检查绑定时会比较字符串是否相等。 此时不允许使用任意类型的模式匹配 2

3.2.2 Topic 处理器

通过采用句点分隔的形式, 队列可以通过使用基于通配符的模式匹配的方式来绑定到路由键上。通过使用星号(*)和井号(#)字符, 你可以在同一时刻匹配路由键的特定部分, 甚至是多个部分。星号将会匹配路由键中下一个句点前的所有字符 , 而井号键将会匹配接下来所有的字符, 包括句点。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词, 符号“*”只能匹配一个词。 3

3.2.3 Fanout 处理器

不处理路由键, 你只需要简单的将队列绑定到交换机上。一个发送到该类型交换机的消息都会被广播到与该交换机绑定的所有队列上。 4

3.2.4 Headers 处理器

不处理路由键, 而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对; 当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配; 如果完全匹配则消息会路由到该队列, 否则不会路由到该队列。headers属性是一个键值对, 可以是Hashtable, 键值对的值可以是任何类型。而fanout, direct, topic 的路由键都需要要字符串形式的 5

3.2.5 consistent-hashing exchange 一致性哈希交换器

rabbitmq-plugins enable rabbitmq_consistent_hash_exchange
# type: x-consistent-hash

# 必须在声 明交换器时传入 hash-header 值

一致性哈希交换器(consistent-hashing exchange)插件随 RabbitMQ 一同发布, 它将数据分发给绑定的队列上。它可以为用于接收消息的队列做负载均衡。你可以在集群中用该插件来将消息分发到不同物理服务器上的队列中去, 或者分发到那些只有单个消费者的队列中 去。相比 RabbitMQ 将消息分发至单个队列的多个消费者来说, 它提供了潜在的更为快速的吞吐量。当使用数据库或者其他系统以消费者的身份直接集成至 RabbitMQ 上时, 一致性哈希交换器提供了扩展数据的能力而无须编写中间件。

在为消息选择目的地时午, 你无法施加影响来确保消息的均匀分布。一致性哈希交换器不会轮询( round-robin)消息, 而是基于路由键或者消息属性中 header-type 值的哈希值来做出明确的路由。 7

在将队列绑定至一致性哈希交换器时, 需要输入队列的权重(字符串类型)用于哈希算法。

如果你想要在多个队列之间对消息进行负载均衡, 而又不想使用一致性晗希方法的话, 那么请参考 John Brisbin 的随机交换器 https://github.com/jbrisbin/random-exchange。这种交换器并不使用路由键来将消息分发至队列当中去, 而是使用随机数字生成的方式。

4. 高级

4.1 MQTT 与 RabbitMQ

首先开启rabbitmq服务的mqtt插件:

rabbitmq-plugins enable rabbitmq_mqtt

当通过 MQTT 连接 RabbitMQ 来订阅消息时, RabbitMQ 将创建新的队列。队列名称将采用 mqtt-subscriber-[NAME]qos[N]的格式。 其中 [NAME]是唯一的客户端名称, [N]是客户端连接设置的 QoS 等级。举例来说, 一个名为 mqtt-subscriber-facebookqosO 的队列, 代表订阅者名称为 facebook, 并且QoS 设置为了 0 ….

订阅者能够使用类似于 AMQP 中 topic 和交换器间的路由键绑定的方式, 通过字符串匹配或者模式匹配绑定到 topic 上。井号键(#)在 AMQP 和 MQTT 中可以用于多层匹配。不过对于 MQTT 客户端发送消息来说, 加号符号(+)是用于路由键的单层匹配, 而非星号(*)。举例来说, 假设将图片消息通过 MQTT 发布至名为 image/new/profileimage/new/gallery 的 topic 上的话, 订阅至 image/# 的消费者将能收到所有的图片消息, 订阅至 image/new/+ 的消费者将能收到所有新的图片消息, 而订阅至 image/new/profile 的消费者只能接收到新的资料图片消息. 9

4.2 STOMP 和 RabbitMQ

首先开启rabbitmq服务的stomp插件:

rabbitmq-plugins enable rabbitmq_stomp

STOMP 专门设计用于基于流的处理, STOMP 帧是 UTF-8 文本, 由命令和命令对应的载荷组成, 并以 null(OxOO)字节结束。不同于 AMQP 和 MQTT 协议, STOMP 是可读的, 而且不需要二进制位封装信息来定义 STOMP 消息帧和内容。

  • 通过 STOMP 发送消息和通过 MQTT 或者 AMQP 发送消息类似。想要直接向队列发送消息的话, 采用格式为/queue/<queue-narne>的字符串作为目的地
  • 通过来用/exchange/<exchange-name>/<routing-key> 格式的方式, 我们可以在 RabbitMQ 的 STOMP 插件中使用路由键向交换器发送消息 10

4.3 RabbitMQ 与数据库集成

4.4 节点管理

 erl -sname test
Erlang/OTP 24 [erts-12.1.5] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1]

Eshell V12.1.5  (abort with ^G)
(test@localhost)1> node().
test@localhost
(test@localhost)2> net_adm:names().
{ok,[{"rabbit",25672},{"test",33985}]}
(test@localhost)3> net_adm:ping('rabbit@localhost').
pang
(test@localhost)4> nodes().
[]
(test@localhost)5> q().
ok
(test@localhost)6>

5. RabbitMQ 集群部署

5.1 集群部署

# RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit rabbitmq-server -detached

# 需要先禁用所有插件
$ RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit2 rabbitmq-server -detached
$ RABBITMQ_NODE_PORT=5674 RABBITMQ_NODENAME=rabbit3 rabbitmq-server -detached 

rabbitmqctl -n rabbit2@localhost stop_app
rabbitmqctl -n rabbit2@localhost reset 
rabbitmqctl -n rabbit2@localhost join_cluster rabbit@localhost
rabbitmqctl -n rabbit2@localhost start_app

# 对第3个节点重复上面的操作

11

5.2 主备模式, 也称warren(兔子窝) 模式

warren 是指一对主/备独立服务器, 并前置一台负载均衡器来处理故障转移。也就是一个主/备方案, 主节点提供读写, 备用节点不提供读写。如果主节点挂了, 就切换到备用节点, 原来的备用节点升级为主节点提供读写服务, 当原来的主节点恢复运行后, 原来的主节点就变成备用节点, 和 activeMQ 利用 zookeeper 做主/备一样, 也可以一主多备。 12

5.4 镜像队列

RabbitMQ的镜像队列同时支持publisher confirm和事务两种机制。在事务机制中, 只有当前事务在全部镜像queue中执行之后, 客户端才会收到Tx.CommitOk的消息。同样的, 在publisher confirm机制中, 向publisher进行当前message确认的前提是该message被全部镜像所接受了。

镜像队列的配置通过添加policy完成, policy添加的命令为:

rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

# 对队列名称以“queue_”开头的所有队列进行镜像, 并在集群的两个节点上完成进行, policy的设置命令为: 
rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue "^queue_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
ch.QueueDeclare("ha.msg", true, false, false, false, map[string]interface{}{"x-ha-policy": "all"})

可以将镜像队列视为拥有一个隐藏的fanout 交换器, 它指示着信道将消息分发到队列的从拷贝上。

5.5 跨集群模式下的联合插件

作为核心 RabbitMQ 发行版的一部分, 联合插件提供了 灵活的方式透明地实现节点间和集群间的消息中 继。这一插件的功能可以分为以下两个主要组件: 联合交换器和联合队列。

联合交换器允许发往上游节点交换器的消息被透明地发送至下游节点中相同名称的交换器上。 而联合队列则允许下游节点扮演上游节点中共享队列的消费者角色, 为多个下游节点提供了轮询(round-robin)消费消息的能力。需要注意的是如果rabbitmq本身是一个集群, 那么集群中所有的rabbitmq服务都必须开启federation插件功能。

rabbitmq-plugins enable rabbitmq_federation rabbitmq_federation_management

5.6 shovel

远程模式可以实现双活的一种模式, 简称 shovel 模式, 所谓的 shovel 就是把消息进行不同数据中心的复制工作, 可以跨地域的让两个 MQ 集群互联, 远距离通信和复制。 Shovel 就是我们可以把消息进行数据中心的复制工作, 我们可以跨地域的让两个 MQ 集群互联。

生产者发布一个消息到MQ1服务器的交换器, 交换器路由到队列1中, 然后通过Shovel复制到MQ2服务器的交换器上, 最终到达队列3中。 13