0%

RocketMQ学习

一、消息队列能用来干什么?

异步,解耦,削峰。

1. 异步:

比如在电商的业务场景下,下单成功后要调用短信模块,邮件模块提醒买家下单成功的信息,而下单和发短信,发邮件使用消息队列来异步执行可以提高响应速度,增强用户体验。

2. 解耦

如果不用消息队列的话,对不同功能模块的调用是要写死在业务代码里的,也就是调用方在调用逻辑还耦合了”调用哪一个服务”这一层逻辑在里面逻辑,日后增加功能或删掉功能的时候,修改业务代码成本会比较大… 而使用消息队列的topic在中间进行一层解耦就能减轻这种问题,发送方只负责发送消息,消费方只负责消费消息,服务的类型被抽象成topic,由消息队列来管理。

3. 削峰

如果上游请求规模超出了下游服务的负载的话,使用消息队列可以起到一个缓冲的作用,即将请求先存起来然后让下游服务尽自己所能的去消费,保护了下游服务系统。

二、 RocketMQ的架构组成

在这里插入图片描述

  • NameServer: 主要是对Broker进行管理,主要包括对消息的路由的管理。

被设计成几乎无状态的,可以横向扩展,节点之间相互之间无通信,通过部署多台机器来标记自己是一个伪集群

每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。

  • Broker:主要负责对消息的存储,消息的转发

Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer,顺带一提底层的通信和连接都是基于Netty实现的。

Broker负责消息存储,以Topic为纬度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。

  • Producer: 消息生产者,负责产生消息,一般由业务系统负责产生消息。

    RocketMQ 提供了三种方式发送消息:同步、异步和单向

同步发送:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。

异步发送:异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。

单向发送:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。

  • Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。

Consumer也由用户部署,支持PUSH和PULL两种消费模式,支持集群消费和广播消息,提供实时的消息订阅机制。

Pull:拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型。

Push:推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息。

三、消息的发送与消费

a. 消息的三种发送方式

1. 发送同步消息

特点:发送完阻塞,直到发送成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}

2. 发送异步消息

特点:消息发送完之后不会阻塞,传入一个回调方法,当消息发送成功时会对回调方法进行调用。 适合对响应时间比较敏感的业务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}

3. 单向发送消息

不关心发送结果,发送完也不管是否发送成功,可靠性不高

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);

}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}

b. 消息的类型

1. 顺序消息

就是按照消息的发送顺序来进行消费.

2. 延时消息

在指定时间之后发送一个消息。例如在 10s钟之后发送一个消息,使用限制:

1
2
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

3. 事务消息

在这里插入图片描述

消息的发送和提交

(1) 发送消息(half消息)。

(2) 服务端响应消息写入结果。

(3) 根据发送结果去执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

补偿流程(在Commit或Rollback的时候失败了,消息会一直处于unknown状态)

(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

(2) Producer收到回查消息,检查回查消息对应的本地事务的状态

(3) 根据本地事务状态,重新Commit或者Rollback

部分原理

半消息如何对消费端不可见

发送出去的半消息会把消息的Topic和Queue等属性给存储到消息中,然后再对这两个属性进行替换,然后这个半消息就被存到一个消费者不可见的Queue中了(QueueId是0)。 当进行二段提交的时候,会将预先存在该消息里的原本消息的topic和queue再替换回去,然后发送出去,就能被消费者看到了…

如何确定消息的最终状态

RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息是否状态已经确定(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。

这个Op消息也是被存储在一个单独的队列里面的,消费者不可见。

如何处理二阶段失败的消息

如果在二阶段提交的时候失败了,比如在Commit的时候出现了网络问题导致Commit失败,那么这条消息还是处于unknown状态,为了解决这一问题,rocketmq的使用的方法就是“回查”。

Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。

参考 RocketMQ事务消息实现分析