目录

  1. 一、什么是RabbitMQ
  2. 二、 RabbitMQ简介
    1. 2.1 术语
    2. 2.2 基本特性
  3. 三、 官方Demo与思考
    1. 3.1 Hello World
      1. 3.1.1 操作
      2. 3.1.2 思考
    2. 3.2 工作队列
      1. 3.2.1 操作
      2. 3.2.2 思考
    3. 3.3 发布/订阅
      1. 3.3.1 操作
      2. 3.3.2 思考
    4. 3.4 路由
      1. 3.4.1 操作
      2. 3.4.2 思考
    5. 3.5 主题交换机
      1. 3.5.1 操作
      2. 3.5.2 思考
    6. 3.6 远程过程调用RPC
      1. 3.6.1 操作
      2. 3.6.2 思考
  4. 四、 思考总结
  5. 五、 场景模拟

一、什么是RabbitMQ

RabbitMQ是一个消息代理。

RabbitMQ基于AMQP协议用Erlang编写。

什么是AMQP?

  • AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。

简单来说: MQ是邮递过程,寄信人去邮局把信件放入邮箱,邮递员就会把信件投递到收件人

二、 RabbitMQ简介

2.1 术语

术语 类比
生产者Producing 寄信人,信件的来源
消息Message 信件
交换机Exchange 邮局,信件分发的场所
队列Queue 邮箱,存储邮件
绑定Binding 邮局信件和邮箱之间的关系,邮局信件按收件省市划分后归纳到不同邮箱
消费者Consuming 收件人,信件的归宿

1

2.2 基本特性

  • 消息只存储在Quene中

    • 只有邮箱,也就是Queue具有存储功能,Exchange不能存储消息
  • RoutingKey

    • RoutingKey就是消息的收件地址
    • RoutingKey可以直接写成XXX格式,也可以写成XXX.XXX.XXX格式,也可以为空
  • 交换机有4种类型

    • Direct 直连交换机
    • Fanout 扇形交换机
  • Topic 主题交换机
  • Headers 头交换机

三、 官方Demo与思考

3.1 Hello World

2

3.1.1 操作

生产者:

1
2
3
4
5
6
7
8
- 连接MQ
- 申明Queue
MQ.Queue.Name = "hello"
- 发送信息
MQ.Publish
Exchange = ""
RoutingKey = "hello"
Body = "hello wolrd"

消费者:

1
2
3
4
5
6
7
- 连接MQ
- 申明Queue
MQ.Queue.Name = "hello"
- 消费信息
MQ.Consume
Queue.Name= "hello"
Auto ack = true //注意自动ack

3.1.2 思考

  • 为何没有给Exchange命名?Exchange是什么类型?
  • 为何在生产者和消费者都申明了Queue?
  • RoutingKey和Queue.Name 有什么关系?

3.2 工作队列

3

3.2.1 操作

生产者:

1
2
3
4
5
6
7
8
9
10
- 连接MQ
- 申明Queue
MQ.Queue.Name = "task_queue"
MQ.Queue.Durable = true //注意Queue持久化
- 发送信息
MQ.Publish
Exchange = ""
RoutingKey = "task_queue"
Body = "hello wolrd"
delivery_mode = persistent or 2 //注意消息持久化

多个消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
- 连接MQ
- 申明Queue
MQ.Queue.Name = "task_queue"
MQ.Queue.Durable = true
- QoS
MQ.Qos
prefetch_count = 1 //注意QoS
- 消费信息
MQ.Consume
Queue.Name= "task_queue"
Auto ack = false //注意手动ack
- Sleep模拟处理事务
- 手动ack

3.2.2 思考

  • 多个消费者在Direct交换机,相同Queue下,如何接收消息?
  • 如何让Queue和消息持久化?
  • 可以直接给3.1中的hello队列赋予持久优设置吗?
  • 持久化有什么用?
  • 默认交换机是持久化的吗?
  • 手动ack和自动ack的区别?
  • 忘记ack怎么办?
  • QoS(Quality of Service)如何实现?

3.3 发布/订阅

4

3.3.1 操作

生产者:

1
2
3
4
5
6
7
8
9
10
- 连接MQ
- 申明Exchange
MQ.Exchange.Name = "logs"
MQ.Exchange.Type = "fanout"
MQ.Exchange.Durable = true
- 发送信息
MQ.Publish
Exchange = "logs"
RoutingKey = "" //注意RoutingKey
Body = "hello wolrd"

多个消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- 连接MQ
- 申明Exchange
MQ.Exchange.Name = "logs"
MQ.Exchange.Type = "fanout"
MQ.Exchange.Durable = true
- 申明临时Queue
MQ.Queue.exclusive= true //注意临时Queue
- 绑定
Binding //注意绑定
Exchange = "logs"
Queue.Name = MQ.Queue.Name
RoutingKey = ""
- 消费信息
MQ.Consume
Queue.Name = MQ.Queue.Name
Auto ack = false
- Sleep模拟处理事务
- 手动ack

3.3.2 思考

  • 扇形交换机的RoutingKey是如何配置的?
  • 扇形交换机的主要用途?
  • 什么是临时队列?
  • 临时队列与持久化可以同时设置吗?
  • 扇形交换机可以在生产者设置持久化的队列吗?

3.4 路由

5

6

3.4.1 操作

生产者:

1
2
3
4
5
6
7
8
9
10
- 连接MQ
- 申明Exchange
MQ.Exchange.Name = "direct_logs"
MQ.Exchange.Type = "direct" //注意Exchange类型
MQ.Exchange.Durable = true
- 发送信息
MQ.Publish
Exchange = "direct_logs"
RoutingKey = "info" or "error" or "warn" //注意RoutingKey
Body = "hello wolrd"

多个消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- 连接MQ
- 申明Exchange
MQ.Exchange.Name = "direct_logs"
MQ.Exchange.Type = "direct"
MQ.Exchange.Durable = true
- 申明临时Queue
MQ.Queue.exclusive= true
- 绑定
Binding
Exchange = "direct_logs"
Queue.Name = MQ.Queue.Name
RoutingKey = "info" or "error" or "warn"
- 消费信息
MQ.Consume
Queue.Name = MQ.Queue.Name
Auto ack = false
- Sleep模拟处理事务
- 手动ack

3.4.2 思考

  • 直连交换机多重绑定和扇形交换机有什么区别?
  • 直连交换机RoutingKey绑定有什么限制?

3.5 主题交换机

7

3.5.1 操作

生产者:

1
2
3
4
5
6
7
8
9
10
- 连接MQ
- 申明Exchange
MQ.Exchange.Name = "topic_logs"
MQ.Exchange.Type = "topic"
MQ.Exchange.Durable = true
- 发送信息
MQ.Publish
Exchange = "topic_logs"
RoutingKey = "this.abc" or "that.abc.xyz" or "abc" //注意RoutingKey写法
Body = "hello wolrd"

多个消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- 连接MQ
- 申明Exchange
MQ.Exchange.Name = "topic_logs"
MQ.Exchange.Type = "topic"
MQ.Exchange.Durable = true
- 申明临时Queue
MQ.Queue.exclusive= true
- 绑定
Binding
Exchange = "topic_logs"
Queue.Name = MQ.Queue.Name
RoutingKey = "*.abc" or "#.abc" or "*.abc.*" //注意binding写法
- 消费信息
MQ.Consume
Queue.Name = MQ.Queue.Name
Auto ack = false
- Sleep模拟处理事务
- 手动ack

3.5.2 思考

  • *# 的区别?
  • 绑定键为 * 的队列会取到一个路由键为空的消息吗?
  • a.*.#a.#的区别在哪儿?

3.6 远程过程调用RPC

8

3.6.1 操作

调用端(生产者):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- 调用本机某方法
调用另一个服务的接口
- 连接MQ
- 申明临时Queue
MQ.Queue.exclusive= true //注意临时Queue,用于接收回传信息
- 消费信息
MQ.Consume
Queue.Name= MQ.Queue.Name
Auto ack = true
- 发送信息
MQ.Publish
Exchange = ""
RoutingKey = "rpc_queue" //注意发送信息的队列,是服务端的队列,不是上面的临时队列
CorrelationId: corrId, //唯一id,用以与回调结果匹配
ReplyTo: q.Name, //回传地址,用以服务端将结果返回
Body = "123456" //传输id
- 接收结果
匹配CorrelationId 和 Server.CorrelationId
获得信息

服务端(消费者):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- 连接MQ
- 申明Queue
MQ.Queue.Name= "rpc_queue" //注意此队列和调用者的临时队列不同
- QoS
MQ.Qos
prefetch_count = 1 //注意QoS
- 消费信息
MQ.Consume
Queue.Name= MQ.Queue.Name
Auto ack = false
- 调用接口
查询id相关信息
- 发送信息
MQ.Publish
Exchange = ""
RoutingKey = Client.ReplyTo
CorrelationId = Client.CorrelationId
Body = id相关信息

3.6.2 思考

  • 上面例子中使用了临时队列和非持久化队列,会出现什么问题?
  • 如果服务器发生故障,并且抛出异常,应该被转发到客户端吗?
  • 当没有服务器运行时,客户端如何作出反映?

四、 思考总结

  • Q:为何没有给Exchange命名?Exchange是什么类型?

    • 没有命名的交换机是默认(匿名)交换机
    • 默认交换机是Direct类型
  • Q:为何在生产者和消费者都申明了Queue?

    • 声明交换机和队列只能生效一次
    • 由于生产者和消费者无法确定启动顺序,所以两处申明可以防止招不到交换机或队列造成消息丢失
  • Q:RoutingKey和Queue.Name 有什么关系?

    • 默认或者匿名交换机的消息将会根据指定的routing_key分发到指定的队列

  • Q:多个消费者在Direct交换机,相同Queue下,如何接收消息?

    • 轮询分发
  • Q:如何让Queue和消息持久化?

    • 对于交换机和队列,设置其Durable属性
    • 对于消息,设置其发送模式为Persistent(部分语言用编码2表示)
  • Q:可以直接给3.1中的hello队列赋予持久优设置吗?

    • 不可以,队列只可申明一次,改动属性会报异常
  • Q:持久化有什么用?

    • 持久化会使MQ服务退出后,以申明的交换机、队列不仍存在,队列内的数据仍存在
    • 但持久化不能保证所有的数据都不会丢失
  • Q:默认交换机是持久化的吗?

  • Q:手动ack和自动ack的区别?

    • 自动ack会在消费者收到消息时就自动发送确认
    • 手动ack需要消费者自己手动发送确认
    • 消息没有超时的概念
    • 当消息被RabbitMQ发送给消费者之后,马上就会在内存中移除
    • 自动ack场景下,当消费者执行一个费时任务时,MQ崩溃,会导致消息丢失
    • 手动ack场景下,当消费者执行一个费时任务时,MQ崩溃,消息会被重新推送
  • Q:忘记ack怎么办?

    • 忘记ack会让MQ不断重复发送信息,导致MQ内存增加
    • 可以在MQ中查询messages_unacknowledged字段,手动处理
  • Q:QoS(Quality of Service)如何实现?

    • 设置MQ的QoS相关配置
    • 设置prefetch_count = 1

  • Q:扇形交换机的RoutingKey是如何配置的?

    • 填写为空
  • Q:扇形交换机的主要用途?

    • 发送广播
  • Q:什么是临时队列?

    • 临时队列随机生成队列名称
    • 当与消费者断开连接的时候,这个队列应当被立即删除
  • Q:临时队列与持久化可以同时设置吗?

    • 不可以,消费者断开后随机队列就删除,所以一旦MQ退出,消费者就与MQ断开连接,随机队列就会删除,不能设置持久化
  • Q:扇形交换机可以在生产者设置持久化的队列吗?

    • 可以,以保证所有消息不丢失。

    • 但是要注意,一般扇形交换机每个消费者一般独占一条队列,如果多个消费者共用一条队列,会跟直连交换机一样进行轮询分发

    • 如果在生产者设置10个持久化队列,但有20个消费者,每个消费者独占一条队列,那么只有其中的10个可以获取全部信息,其他10个消费者无法使用

    • 如果在生产者设置10个持久化队列,但有20个消费者,每2个消费者共用一条队列,那么20个消费者将轮询消费信息,不能收到完整的全部的信息

    • 所以扇形交换机一般不按照上面的方式使用

  • Q:直连交换机多重绑定和扇形交换机有什么区别?

    • 扇形交换机不能通过RoutingKey过滤消息
  • Q:直连交换机RoutingKey多重绑定有什么限制?

    • 不能用通配符方式模糊过滤消息

  • Q: *# 的区别?

    • (星号) 用来表示一个单词
    • (井号) 用来表示任意数量(零个或多个)单词
  • Q:绑定键为 * 的队列会取到一个路由键为空的消息吗?

    • 不能,星号表示至少一个单词
  • Q:a.*.#a.#的区别在哪儿?

    • 前者不可以匹配 a
    • 后者可以匹配 a

  • Q:上面例子中使用了临时队列和非持久化队列,会出现什么问题?
    • 没启动服务端时,客户机消息丢失
    • 客户端发送给服务端后,客户端断开连接,会导致回传信息丢失
  • Q:如果服务器发生故障,并且抛出异常,应该被转发到客户端吗?
    • 一般来说不需要
  • Q:当没有服务器运行时,客户端如何作出反映?
    • 对于不重要的信息,采用临时队列,丢失消息即可
    • 对于重要信息,采用持久化队列,等待服务器处理,并根据实际情况定时处理(清除或消息补偿)

五、 场景模拟

  • 账号注册后一系列短信邮件通知
    • 使用扇形交换机,短信和邮件服务作为消费者申明临时队列
    • 使用Auto ack
  • 秒杀下单
    • 使用直连交换机,秒杀服务作为生产者,订单中心作为消费者,都需要申明持久化队列
    • 使用手动ack
  • 下单后支付并返回结果
    • 使用直连交换机,下单服务作为生产者,支付服务作为消费者,都需要申明持久化队列
    • 使用手动ack
    • 消息安全性要求很高,需要消息补偿机制
  • 用户下单(订单服务接收下单请求,异步调用仓库服务查询库存是否足够)
    • 使用RPC
    • 仓库服务申明异步调用队列 A
    • 订单服务申明回传队列 B
    • 订单服务将待查询的产品id和B的地址发送到A
    • 仓库服务从A中消费信息,处理订单服务请求,并把结果发送到B
    • 订单服务从B中消费信息,接收仓库服务的查询结果