Skip to content

RocketMQ面试宝典

1.为什么要使用消息队列呢?

消息队列(Message Queue, MQ)是一种非常重要的中间件技术,广泛应用于分布式系统中,以提高系统的可用性、解耦能力和异步通信效率。

①、解耦

生产者将消息放入队列,消费者从队列中取出消息,这样一来,生产者和消费者之间就不需要直接通信,生产者只管生产消息,消费者只管消费消息,这样就实现了解耦。

三分恶面渣逆袭:消息队列解耦

PmHub 中的任务审批,就用了 RocketMQ 来做解耦。

②、异步

系统可以将那些耗时的任务放在消息队列中异步处理,从而快速响应用户的请求。比如说,用户下单后,系统可以先返回一个下单成功的消息,然后将订单信息放入消息队列中,后台系统再去处理订单信息。

三分恶面渣逆袭:消息队列异步

③、削峰

削峰填谷是一种常见的技术手段,用于应对系统高并发请求的瞬时流量高峰,通过消息队列,可以将瞬时的高峰流量转化为持续的低流量,从而保护系统不会因为瞬时的高流量而崩溃。

三分恶面渣逆袭:消息队列削峰

如何用RocketMQ做削峰填谷的?

用户请求到达系统后,由生产者接收请求并将其转化为消息,发送到 RocketMQ 队列中。队列用来充当缓冲区,将大量请求按照顺序排队,这样就可以削减请求高峰时对后端服务的直接压力。

不仅如此,生产者通过异步方式发送消息,还可以快速响应用户请求。

消费者从 RocketMQ 队列中按照一定速率读取消息并进行处理。可以根据后端处理能力和当前负载情况动态调整消费者的消费速率,达到填谷的效果。

2.为什么要选择 RocketMQ?

四大消息队列对比

我们系统主要面向 C 端用户,有一定的并发量,对性能也有比较高的要求,所以选择了低延迟、吞吐量比较高,可用性比较好的 RocketMQ。

3.RocketMQ 有什么优缺点?

RocketMQ 优点:

  • 单机吞吐量:十万级
  • 可用性:非常高,分布式架构
  • 消息可靠性:经过参数优化配置,消息可以做到 0 丢失
  • 功能支持:MQ 功能较为完善,还是分布式的,扩展性好
  • 支持 10 亿级别的消息堆积,不会因为堆积导致性能下降
  • 源码是 Java,方便结合公司自己的业务二次开发
  • 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况
  • RoketMQ在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择RocketMQ

RocketMQ 缺点:

  • 支持的客户端语言不多,目前是 Java 及 c++,其中 c++不成熟
  • 没有在 MQ 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码

说说你对 RocketMQ 的理解?

牧小农:RocketMQ 的作用

RocketMQ 是阿里巴巴开源的一款分布式消息中间件,具有高吞吐量、低延迟和高可用性。其主要组件包括生产者、消费者、Broker、Topic 和队列。消息由生产者发送到 Broker,再根据路由规则存储到队列中,消费者从队列中拉取消息进行处理。适用于异步解耦和流量削峰等场景。

4.消息队列有哪些消息模型?

消息队列有两种模型:队列模型发布/订阅模型

  • 队列模型

这是最初的一种消息队列模型,对应着消息队列“发-存-收”的模型。生产者往某个队列里面发送消息,一个队列可以存储多个生产者的消息,一个队列也可以有多个消费者,但是消费者之间是竞争关系,也就是说每条消息只能被一个消费者消费。

队列模型

  • 发布/订阅模型

如果需要将一份消息数据分发给多个消费者,并且每个消费者都要求收到全量的消息。很显然,队列模型无法满足这个需求。解决的方式就是发布/订阅模型。

在发布 - 订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。

发布-订阅模型

它和 “队列模式” 的异同:生产者就是发布者,队列就是主题,消费者就是订阅者,无本质区别。唯一的不同点在于:一份消息数据是否可以被多次消费。

5.那 RocketMQ 的消息模型呢?

RocketMQ 使用的消息模型是标准的发布-订阅模型,在 RocketMQ 的术语表中,生产者、消费者和主题,与发布-订阅模型中的概念是完全一样的。

RocketMQ 本身的消息是由下面几部分组成:

RocketMQ消息的组成

  • Message

Message(消息)就是要传输的信息。

一条消息必须有一个主题(Topic),主题可以看做是你的信件要邮寄的地址。

一条消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务 Key 并在 Broker 上查找此消息以便在开发期间查找问题。

  • Topic

Topic(主题)可以看做消息的归类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。

Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有 0 个、1 个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。

一个 Topic 也可以被 0 个、1 个、多个消费者订阅。

  • Tag

Tag(标签)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag

标签有助于保持你的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。

  • Group

RocketMQ 中,订阅者的概念是通过消费组(Consumer Group)来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。

消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。默认情况,如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息。

  • Message Queue

Message Queue(消息队列),一个 Topic 下可以设置多个消息队列,Topic 包括多个 Message Queue ,如果一个 Consumer 需要获取 Topic 下所有的消息,就要遍历所有的 Message Queue。

RocketMQ 还有一些其它的 Queue——例如 ConsumerQueue。

  • Offset

在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要 RocketMQ 为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。

也可以这么说,Queue 是一个长度无限的数组,Offset 就是下标。

RocketMQ 的消息模型中,这些就是比较关键的概念了。画张图总结一下:

6.消息的消费模式了解吗?

消息消费模式有两种:Clustering(集群消费)和Broadcasting(广播消费)。

两种消费模式

默认情况下就是集群消费,这种模式下一个消费者组共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。

而广播消费消息会发给消费者组中的每一个消费者进行消费。

7.RoctetMQ 基本架构了解吗?

先看图,RocketMQ 的基本架构:

RocketMQ架构

RocketMQ 一共有四个部分组成:NameServer,Broker,Producer 生产者,Consumer 消费者,它们对应了:发现、发、存、收,为了保证高可用,一般每一部分都是集群部署的。

8.那能介绍一下这四部分吗?

类比一下我们生活的邮政系统——

邮政系统要正常运行,离不开下面这四个角色, 一是发信者,二 是收信者, 三是负责暂存传输的邮局, 四是负责协调各个地方邮局的管理机构。对应到 RocketMQ 中,这四个角色就是 Producer、 Consumer、 Broker 、NameServer。

RocketMQ类比邮政体系

NameServer

NameServer 是一个无状态的服务器,角色类似于 Kafka 使用的 Zookeeper,但比 Zookeeper 更轻量。

特点:

  • 每个 NameServer 结点之间是相互独立,彼此没有任何信息交互。
  • Nameserver 被设计成几乎是无状态的,通过部署多个结点来标识自己是一个伪集群,Producer 在发送消息前从 NameServer 中获取 Topic 的路由信息也就是发往哪个 Broker,Consumer 也会定时从 NameServer 获取 Topic 的路由信息,Broker 在启动时会向 NameServer 注册,并定时进行心跳连接,且定时同步维护的 Topic 到 NameServer。

功能主要有两个:

  • 1、和 Broker 结点保持长连接。
  • 2、维护 Topic 的路由信息。
Broker

消息存储和中转角色,负责存储和转发消息。

  • Broker 内部维护着一个个 Consumer Queue,用来存储消息的索引,真正存储消息的地方是 CommitLog(日志文件)。

RocketMQ存储-图片来源官网

  • 单个 Broker 与所有的 Nameserver 保持着长连接和心跳,并会定时将 Topic 信息同步到 NameServer,和 NameServer 的通信底层是通过 Netty 实现的。
Producer

消息生产者,业务端负责发送消息,由用户自行实现和分布式部署。

  • Producer由用户进行分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败。

  • RocketMQ 提供了三种方式发送消息:同步、异步和单向

  • 同步发送:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。

  • 异步发送:异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。

  • 单向发送:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。

Consumer

消息消费者,负责消费消息,一般是后台系统负责异步消费。

  • Consumer也由用户部署,支持 PUSH 和 PULL 两种消费模式,支持集群消费广播消费,提供实时的消息订阅机制
  • Pull:拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型。
  • Push:推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但其实从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息。

9.如何保证消息的可用性/可靠性/不丢失呢?

消息可能在哪些阶段丢失呢?可能会在这三个阶段发生丢失:生产阶段、存储阶段、消费阶段。

所以要从这三个阶段考虑:

消息传递三阶段

生产

在生产阶段,主要通过请求确认机制,来保证消息的可靠传递

  • 1、同步发送的时候,要注意处理响应结果和异常。如果返回响应 OK,表示消息成功发送到了 Broker,如果响应失败,或者发生其它异常,都应该重试。
  • 2、异步发送的时候,应该在回调方法里检查,如果发送失败或者异常,都应该进行重试。
  • 3、如果发生超时的情况,也可以通过查询日志的 API,来检查是否在 Broker 存储成功。
存储

存储阶段,可以通过配置可靠性优先的 Broker 参数来避免因为宕机丢消息,简单说就是可靠性优先的场景都应该使用同步。

  • 1、消息只要持久化到 CommitLog(日志文件)中,即使 Broker 宕机,未消费的消息也能重新恢复再消费。
  • 2、Broker 的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息一定存储在 pagecache 中(内存中),但是同步刷盘更可靠,它是 Producer 发送消息后等数据持久化到磁盘之后再返回响应给 Producer。

同步刷盘和异步刷盘-图片来源官网

  • 3、Broker 通过主从模式来保证高可用,Broker 支持 Master 和 Slave 同步复制、Master 和 Slave 异步复制模式,生产者的消息都是发送给 Master,但是消费既可以从 Master 消费,也可以从 Slave 消费。同步复制模式可以保证即使 Master 宕机,消息肯定在 Slave 中有备份,保证了消息不会丢失。
消费

从 Consumer 角度分析,如何保证消息被成功消费?

  • Consumer 保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。因为消息队列维护了消费的位置,逻辑执行失败了,没有确认,再去队列拉取消息,就还是之前的一条。

10.如何处理消息重复的问题呢?

RocketMQ 可以保证消息一定投递,且不丢失,但无法保证消息不重复消费。

因此,需要在业务端做好消息的幂等性处理,或者做消息去重。

三分恶面渣逆袭:幂等和去重

幂等性是指一个操作可以执行多次而不会产生副作用,即无论执行多少次,结果都是相同的。可以在业务逻辑中加入检查逻辑,确保同一消息多次消费不会产生副作用。

例如,在支付场景下,消费者消费扣款的消息,对一笔订单执行扣款操作,金额为100元。

如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果要保证只扣款一次,金额为100元。如果扣款操作是符合要求的,那么就可以认为整个消费过程实现了消息幂等。

消息去重,是指在消费者消费消息之前,先检查一下是否已经消费过这条消息,如果消费过了,就不再消费。

业务端可以通过一个专门的表来记录已经消费过的消息 ID,每次消费消息之前,先查询一下这个表,如果已经存在,就不再消费。

java
public void processMessage(String messageId, String message) {
    if (!isMessageProcessed(messageId)) {
        // 处理消息
        markMessageAsProcessed(messageId);
    }
}

private boolean isMessageProcessed(String messageId) {
    // 查询去重表,检查消息ID是否存在
}

private void markMessageAsProcessed(String messageId) {
    // 将消息ID插入去重表
}

如何保证消息的幂等性?

勇哥:消费幂等

首先,消息必须携带业务唯一标识,可以通过雪花算法生成全局唯一 ID。

java
Message msg = new Message(TOPIC /* Topic */,
             TAG /* Tag */,
               ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
             );
message.setKey("ORDERID_100"); // 订单编号
SendResult sendResult = producer.send(message);

其次,在消费者接收到消息后,判断 Redis 中是否存在该业务主键的标志位,若存在标志位,则认为消费成功,否则执行业务逻辑,执行完成后,在缓存中添加标志位。

java
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    try {
        for (MessageExt messageExt : msgs) {
           String bizKey = messageExt.getKeys(); // 唯一业务主键
           //1. 判断是否存在标志
           if(redisTemplate.hasKey(RedisKeyConstants.WAITING_SEND_LOCK + bizKey)) {
         			continue;
       		 }
         	 //2. 执行业务逻辑
           //TODO do business
           //3. 设置标志位
           redisTemplate.opsForValue().set(RedisKeyConstants.WAITING_SEND_LOCK + bizKey, "1", 72, TimeUnit.HOURS);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        logger.error("consumeMessage error: ", e);
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
}

然后,利用数据库的唯一索引来防止业务的重复插入。

sql
CREATE TABLE `t_order` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `order_id` varchar(64) NOT NULL COMMENT '订单编号',
  `order_name` varchar(64) NOT NULL COMMENT '订单名称',
  PRIMARY KEY (`id`),
  UNIQUE KEY `order_id` (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';

最后,在数据库表中使用版本号,通过乐观锁机制来保证幂等性。每次更新操作时检查版本号是否一致,只有一致时才执行更新并递增版本号。如果版本号不一致,则说明操作已被执行过,拒绝重复操作。

java
public void updateRecordWithOptimisticLock(int id, String newValue, int expectedVersion) {
    int updatedRows = jdbcTemplate.update(
        "UPDATE records SET value = ?, version = version + 1 WHERE id = ? AND version = ?",
        newValue, id, expectedVersion
    );
    if (updatedRows == 0) {
        throw new OptimisticLockingFailureException("Record has been modified by another transaction");
    }
}

或者悲观锁机制,通过数据库的锁机制来保证幂等性。

java
public void updateRecordWithPessimisticLock(int id) {
    jdbcTemplate.queryForObject("SELECT * FROM records WHERE id = ? FOR UPDATE", id);
    jdbcTemplate.update("UPDATE records SET value = ? WHERE id = ?", "newValue", id);
}

雪花算法了解吗?

雪花算法是由 Twitter 开发的一种分布式唯一 ID 生成算法。

雪花算法以 64 bit 来存储组成 ID 的4 个部分:

  1. 最高位占1 bit,始终为 0,表示正数。
  2. 中位占 41 bit,值为毫秒级时间戳;
  3. 中下位占 10 bit,机器 ID(包括数据中心 ID 和机器 ID),可以支持 1024 个节点。
  4. 末位占 12 bit,值为当前毫秒内生成的不同的自增序列,值的上限为 4096;

目前雪花算法的实现比较多,可以直接使用 Hutool 工具类库中的 IdUtil.getSnowflake() 方法来获取雪花 ID。

java
long id = IdUtil.getSnowflakeNextId();

11.怎么处理消息积压?

发生了消息积压,这时候就得想办法赶紧把积压的消息消费完,就得考虑提高消费能力,一般有两种办法:

消息积压处理

  • 消费者扩容:如果当前 Topic 的 Message Queue 的数量大于消费者数量,就可以对消费者进行扩容,增加消费者,来提高消费能力,尽快把积压的消息消费玩。
  • 消息迁移 Queue 扩容:如果当前 Topic 的 Message Queue 的数量小于或者等于消费者数量,这种情况,再扩容消费者就没什么用,就得考虑扩容 Message Queue。可以新建一个临时的 Topic,临时的 Topic 多设置一些 Message Queue,然后先用一些消费者把消费的数据丢到临时的 Topic,因为不用业务处理,只是转发一下消息,还是很快的。接下来用扩容的消费者去消费新的 Topic 里的数据,消费完了之后,恢复原状。

消息迁移扩容消费

12.顺序消息如何实现?

RocketMQ 实现顺序消息的关键在于保证消息生产和消费过程中严格的顺序控制,即确保同一业务的消息按顺序发送到同一个队列中,并由同一个消费者线程按顺序消费。

三分恶面渣逆袭:顺序消息

局部顺序消息如何实现?

局部顺序消息保证在某个逻辑分区或业务逻辑下的消息顺序,例如同一个订单或用户的消息按顺序消费,而不同订单或用户之间的顺序不做保证。

三分恶面渣逆袭:部分顺序消息

全局顺序消息如何实现?

全局顺序消息保证消息在整个系统范围内的严格顺序,即消息按照生产的顺序被消费。

可以将所有消息发送到一个单独的队列中,确保所有消息按生产顺序发送和消费。

三分恶面渣逆袭:全局顺序消息

13.如何实现消息过滤?

有两种方案:

  • 一种是在 Broker 端按照 Consumer 的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到 Consumer 端,缺点是加重了 Broker 的负担,实现起来相对复杂。
  • 另一种是在 Consumer 端过滤,比如按照消息设置的 tag 去重,这样的好处是实现起来简单,缺点是有大量无用的消息到达了 Consumer 端只能丢弃不处理。

一般采用 Cosumer 端过滤,如果希望提高吞吐量,可以采用 Broker 过滤。

对消息的过滤有三种方式:

消息过滤

  • 根据 Tag 过滤:这是最常见的一种,用起来高效简单
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
  • SQL 表达式过滤:SQL 表达式过滤更加灵活
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();
  • Filter Server 方式:最灵活,也是最复杂的一种方式,允许用户自定义函数进行过滤

14.延时消息了解吗?

电商的订单超时自动取消,就是一个典型的利用延时消息的例子,用户提交了一个订单,就可以发送一个延时消息,1h 后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

RocketMQ 是支持延时消息的,只需要在生产消息的时候设置消息的延时级别:

// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
    Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
    // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
    message.setDelayTimeLevel(3);
    // 发送消息
    producer.send(message);
}

但是目前 RocketMQ 支持的延时级别是有限的:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

RocketMQ 怎么实现延时消息的?

简单,八个字:临时存储+定时任务

Broker 收到延时消息了,会先发送到主题(SCHEDULE_TOPIC_XXXX)的相应时间段的 Message Queue 中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标 Topic 的队列中,然后消费者就可以正常消费这些消息。

延迟消息处理流程-图片来源见水印

15.怎么实现分布式消息事务的?半消息?

半消息:是指暂时还不能被 Consumer 消费的消息,Producer 成功发送到 Broker 端的消息,但是此消息被标记为 “暂不可投递” 状态,只有等 Producer 端执行完本地事务后经过二次确认了之后,Consumer 才能消费此条消息。

依赖半消息,可以实现分布式消息事务,其中的关键在于二次确认以及消息回查:

RocketMQ实现消息事务

  • 1、Producer 向 broker 发送半消息
  • 2、Producer 端收到响应,消息发送成功,此时消息是半消息,标记为 “不可投递” 状态,Consumer 消费不了。
  • 3、Producer 端执行本地事务。
  • 4、正常情况本地事务执行完成,Producer 向 Broker 发送 Commit/Rollback,如果是 Commit,Broker 端将半消息标记为正常消息,Consumer 可以消费,如果是 Rollback,Broker 丢弃此消息。
  • 5、异常情况,Broker 端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到 Producer 端查询半消息的执行情况。
  • 6、Producer 端查询本地事务的状态
  • 7、根据事务的状态提交 commit/rollback 到 broker 端。(5,6,7 是消息回查)
  • 8、消费者段消费到消息之后,执行本地事务,执行本地事务。

16.死信队列知道吗?

死信队列用于存储那些无法被正常处理的消息,这些消息被称为死信(Dead Letter)。

阿里云官方文档:死信队列

产生死信的原因是,消费者在处理消息时发生异常,且达到了最大重试次数。当消费失败的原因排查并解决后,可以重发这些死信消息,让消费者重新消费;如果暂时无法处理,为避免到期后死信消息被删除,可以先将死信消息导出并进行保存。

17.如何保证 RocketMQ 的高可用?

NameServer 因为是无状态,且不相互通信的,所以只要集群部署就可以保证高可用。

NameServer集群

RocketMQ 的高可用主要是在体现在 Broker 的读和写的高可用,Broker 的高可用是通过集群主从实现的。

Broker集群、主从示意图

Broker 可以配置两种角色:Master 和 Slave,Master 角色的 Broker 支持读和写,Slave 角色的 Broker 只支持读,Master 会向 Slave 同步消息。

也就是说 Producer 只能向 Master 角色的 Broker 写入消息,Cosumer 可以从 Master 和 Slave 角色的 Broker 读取消息。

Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者繁忙的时候, Consumer 的读请求会被自动切换到从 Slave。有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响 Consumer 读取消息,这就实现了读的高可用。

如何达到发送端写的高可用性呢?在创建 Topic 的时候,把 Topic 的多个 Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 brokerId 机器组成 Broker 组),这样当 Broker 组的 Master 不可用后,其他组 Master 仍然可用, Producer 仍然可以发送消息 RocketMQ 目前还不支持把 Slave 自动转成 Master ,如果机器资源不足,需要把 Slave 转成 Master ,则要手动停止 Slave 色的 Broker ,更改配置文件,用新的配置文件启动 Broker。

18.说一下 RocketMQ 的整体工作流程?

简单来说,RocketMQ 是一个分布式消息队列,也就是消息队列+分布式系统

作为消息队列,它是--的一个模型,对应的就是 Producer、Broker、Cosumer;作为分布式系统,它要有服务端、客户端、注册中心,对应的就是 Broker、Producer/Consumer、NameServer

所以我们看一下它主要的工作流程:RocketMQ 由 NameServer 注册中心集群、Producer 生产者集群、Consumer 消费者集群和若干 Broker(RocketMQ 进程)组成:

  1. Broker 在启动的时候去向所有的 NameServer 注册,并保持长连接,每 30s 发送一次心跳
  2. Producer 在发送消息的时候从 NameServer 获取 Broker 服务器地址,根据负载均衡算法选择一台服务器来发送消息
  3. Conusmer 消费消息的时候同样从 NameServer 获取 Broker 地址,然后主动拉取消息来消费

RocketMQ整体工作流程

19.为什么 RocketMQ 不使用 Zookeeper 作为注册中心呢?

Kafka 我们都知道采用 Zookeeper 作为注册中心——当然也开始逐渐去 Zookeeper,RocketMQ 不使用 Zookeeper 其实主要可能从这几方面来考虑:

  1. 基于可用性的考虑,根据 CAP 理论,同时最多只能满足两个点,而 Zookeeper 满足的是 CP,也就是说 Zookeeper 并不能保证服务的可用性,Zookeeper 在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。
  2. 基于性能的考虑,NameServer 本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而 Zookeeper 的写是不可扩展的,Zookeeper 要解决这个问题只能通过划分领域,划分多个 Zookeeper 集群来解决,首先操作起来太复杂,其次这样还是又违反了 CAP 中的 A 的设计,导致服务之间是不连通的。
  3. 持久化的机制来带的问题,ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。
  4. 消息发送应该弱依赖注册中心,而 RocketMQ 的设计理念也正是基于此,生产者在第一次发送消息的时候从 NameServer 获取到 Broker 地址后缓存到本地,如果 NameServer 整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。

20.Broker 是怎么保存数据的呢?

RocketMQ 主要的存储文件包括 CommitLog 文件、ConsumeQueue 文件、Indexfile 文件。

消息存储的整体的设计:

消息存储整体设计-来源官网

  • CommitLog:消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认 1G, 文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。

CommitLog 文件保存于${Rocket_Home}/store/commitlog 目录中,从图中我们可以明显看出来文件名的偏移量,每个文件默认 1G,写满后自动生成一个新的文件。

CommitLog

  • ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于 RocketMQ 是基于主题 topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件中根据 topic 检索消息是非常低效的。

Consumer 即可根据 ConsumeQueue 来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset,消息大小 size 和消息 Tag 的 HashCode 值。

ConsumeQueue 文件可以看成是基于 Topic 的 CommitLog 索引文件,故 ConsumeQueue 文件夹的组织方式如下:topic/queue/file 三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 ConsumeQueue 文件采取定长设计,每一个条目共 20 个字节,分别为 8 字节的 CommitLog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M;

Comsumer Queue

  • IndexFile:IndexFile(索引文件)提供了一种可以通过 key 或时间区间来查询消息的方法。Index 文件的存储位置是: {fileName},文件名 fileName 是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引,IndexFile 的底层存储设计为在文件系统中实现 HashMap 结构,故 RocketMQ 的索引文件其底层实现为 hash 索引。

IndexFile文件示意图

总结一下:RocketMQ 采用的是混合型的存储结构,即为 Broker 单个实例下所有的队列共用一个日志数据文件(即为 CommitLog)来存储。

RocketMQ 的混合型存储结构(多个 Topic 的消息实体内容都存储于一个 CommitLog 中)针对 Producer 和 Consumer 分别采用了数据和索引部分相分离的存储结构,Producer 发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 CommitLog 中。

只要消息被刷盘持久化至磁盘文件 CommitLog 中,那么 Producer 发送的消息就不会丢失。正因为如此,Consumer 也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker 允许等待 30s 的时间,只要这段时间内有新消息到达,将直接返回给消费端。

这里,RocketMQ 的具体做法是,使用 Broker 端的后台服务线程—ReputMessageService 不停地分发请求并异步构建 ConsumeQueue(逻辑消费队列)和 IndexFile(索引文件)数据。

21.说说 RocketMQ 怎么对文件进行读写的?

RocketMQ 对文件的读写巧妙地利用了操作系统的一些高效文件读写方式——PageCache顺序读写零拷贝

  • PageCache、顺序读取

在 RocketMQ 中,ConsumeQueue 逻辑消费队列存储的数据较少,并且是顺序读取,在 page cache 机制的预读取作用下,Consume Queue 文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于 CommitLog 消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统 IO 调度算法,比如设置调度算法为“Deadline”(此时块存储采用 SSD 的话),随机读的性能也会有所提升。

页缓存(PageCache)是 OS 对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于 OS 使用 PageCache 机制对读写访问操作进行了性能优化,将一部分的内存用作 PageCache。对于数据的写入,OS 会先写入至 Cache 内,随后通过异步的方式由 pdflush 内核线程将 Cache 内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中 PageCache 的情况,OS 从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。

  • 零拷贝

另外,RocketMQ 主要通过 MappedByteBuffer 对文件进行读写操作。其中,利用了 NIO 中的 FileChannel 模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种 Mmap 的方式减少了传统 IO,将磁盘文件数据在操作系统内核地址空间的缓冲区,和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故 RocketMQ 的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。

说说什么是零拷贝?

在操作系统中,使用传统的方式,数据需要经历几次拷贝,还要经历用户态/内核态切换。

传统文件传输示意图-来源《图解操作系统》

  1. 从磁盘复制数据到内核态内存;
  2. 从内核态内存复制到用户态内存;
  3. 然后从用户态内存复制到网络驱动的内核态内存;
  4. 最后是从网络驱动的内核态内存复制到网卡中进行传输。

所以,可以通过零拷贝的方式,减少用户态与内核态的上下文切换内存拷贝的次数,用来提升 I/O 的性能。零拷贝比较常见的实现方式是mmap,这种机制在 Java 中是通过 MappedByteBuffer 实现的。

mmap示意图-来源《图解操作系统》

22.消息刷盘怎么实现的呢?

RocketMQ 提供了两种刷盘策略:同步刷盘和异步刷盘

  • 同步刷盘:在消息达到 Broker 的内存之后,必须刷到 commitLog 日志文件中才算成功,然后返回 Producer 数据已经发送成功。
  • 异步刷盘:异步刷盘是指消息达到 Broker 内存后就返回 Producer 数据已经发送成功,会唤醒一个线程去将数据持久化到 CommitLog 日志文件中。

Broker 在消息的存取时直接操作的是内存(内存映射文件),这可以提供系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中。

刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘,如果是同步刷盘的话,在Broker把消息写到CommitLog映射区后,就会等待写入完成。

异步而言,只是唤醒对应的线程,不保证执行的时机,流程如图所示。

异步刷盘

23.能说下 RocketMQ 的负载均衡是如何实现的?

RocketMQ 中的负载均衡都在 Client 端完成,具体来说的话,主要可以分为 Producer 端发送消息时候的负载均衡和 Consumer 端订阅消息的负载均衡。

Producer 的负载均衡

Producer 端在发送消息的时候,会先根据 Topic 找到指定的 TopicPublishInfo,在获取了 TopicPublishInfo 路由信息后,RocketMQ 的客户端在默认方式下 selectOneMessageQueue()方法会从 TopicPublishInfo 中的 messageQueueList 中选择一个队列(MessageQueue)进行发送消息。具这里有一个 sendLatencyFaultEnable 开关变量,如果开启,在随机递增取模的基础上,再过滤掉 not available 的 Broker 代理。

所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的 latency 超过 550Lms,就退避 3000Lms;超过 1000L,就退避 60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance 机制是实现消息发送高可用的核心关键所在。

Consumer 的负载均衡

在 RocketMQ 中,Consumer 端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在 Push 模式只是对 pull 模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要 Consumer 端知道从 Broker 端的哪一个消息队列中去获取消息。因此,有必要在 Consumer 端来做负载均衡,即 Broker 端中多个 MessageQueue 分配给同一个 ConsumerGroup 中的哪些 Consumer 消费。

  1. Consumer 端的心跳包发送

在 Consumer 启动后,它就会通过定时任务不断地向 RocketMQ 集群中的所有 Broker 实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端 id 的值等信息)。Broker 端在收到 Consumer 的心跳消息后,会将它维护在 ConsumerManager 的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable 中,为之后做 Consumer 端的负载均衡提供可以依据的元数据信息。

  1. Consumer 端实现负载均衡的核心类—RebalanceImpl

在 Consumer 实例的启动流程中的启动 MQClientInstance 实例部分,会完成负载均衡服务线程—RebalanceService 的启动(每隔 20s 执行一次)。

通过查看源码可以发现,RebalanceService 线程的 run()方法最终调用的是 RebalanceImpl 类的 rebalanceByTopic()方法,这个方法是实现 Consumer 端负载均衡的核心。

rebalanceByTopic()方法会根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。这里主要来看下集群模式下的主要处理流程:

(1) 从 rebalanceImpl 实例的本地缓存变量—topicSubscribeInfoTable 中,获取该 Topic 主题下的消息消费队列集合(mqSet);

(2) 根据 topic 和 consumerGroup 为参数调用 mQClientFactory.findConsumerIdList()方法向 Broker 端发送通信请求,获取该消费组下消费者 Id 列表;

(3) 先对 Topic 下的消息消费队列、消费者 Id 排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列。这里的平均分配算法,类似于分页的算法,将所有 MessageQueue 排好序类似于记录,将所有消费端 Consumer 排好序类似页数,并求出每一页需要包含的平均 size 和每个页面记录的范围 range,最后遍历整个 range 而计算出当前 Consumer 端应该分配到的的 MessageQueue。

Cosumer分配

(4) 然后,调用 updateProcessQueueTableInRebalance()方法,具体的做法是,先将分配到的消息队列集合(mqSet)与 processQueueTable 做一个过滤比对。

  • 上图中 processQueueTable 标注的红色部分,表示与分配到的消息队列集合 mqSet 互不包含。将这些队列设置 Dropped 属性为 true,然后查看这些队列是否可以移除出 processQueueTable 缓存变量,这里具体执行 removeUnnecessaryMessageQueue()方法,即每隔 1s 查看是否可以获取当前消费处理队列的锁,拿到的话返回 true。如果等待 1s 后,仍然拿不到当前消费处理队列的锁则返回 false。如果返回 true,则从 processQueueTable 缓存变量中移除对应的 Entry;
  • 上图中 processQueueTable 的绿色部分,表示与分配到的消息队列集合 mqSet 的交集。判断该 ProcessQueue 是否已经过期了,在 Pull 模式的不用管,如果是 Push 模式的,设置 Dropped 属性为 true,并且调用 removeUnnecessaryMessageQueue()方法,像上面一样尝试移除 Entry;
  • 最后,为过滤后的消息队列集合(mqSet)中的每个 MessageQueue 创建一个 ProcessQueue 对象并存入 RebalanceImpl 的 processQueueTable 队列中(其中调用 RebalanceImpl 实例的 computePullFromWhere(MessageQueue mq)方法获取该 MessageQueue 对象的下一个进度消费值 offset,随后填充至接下来要创建的 pullRequest 对象属性中),并创建拉取请求对象—pullRequest 添加到拉取列表—pullRequestList 中,最后执行 dispatchPullRequest()方法,将 Pull 消息的请求对象 PullRequest 依次放入 PullMessageService 服务线程的阻塞队列 pullRequestQueue 中,待该服务线程取出后向 Broker 端发起 Pull 消息的请求。其中,可以重点对比下,RebalancePushImpl 和 RebalancePullImpl 两个实现类的 dispatchPullRequest()方法不同,RebalancePullImpl 类里面的该方法为空。

消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列。

24.RocketMQ 消息长轮询了解吗?

所谓的长轮询,就是 Consumer 拉取消息,如果对应的 Queue 如果没有数据,Broker 不会立即返回,而是把 PullReuqest hold 起来,等待 queue 有了消息后,或者长轮询阻塞时间到了,再重新处理该 queue 上的所有 PullRequest。

长轮询简单示意图

  • PullMessageProcessor#processRequest
//如果没有拉到数据
case ResponseCode.PULL_NOT_FOUND:
// broker 和 consumer 都允许 suspend,默认开启
if (brokerAllowSuspend && hasSuspendFlag) {
    long pollingTimeMills = suspendTimeoutMillisLong;
    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
        pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
    }

    String topic = requestHeader.getTopic();
    long offset = requestHeader.getQueueOffset();
    int queueId = requestHeader.getQueueId();
    //封装一个PullRequest
    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
    //把PullRequest挂起来
    this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
    response = null;
    break;
}

挂起的请求,有一个服务线程会不停地检查,看 queue 中是否有数据,或者超时。

  • PullRequestHoldService#run()
java
@Override
public void run() {
    log.info("{} service started"this.getServiceName());
    while (!this.isStopped()) {
        try {
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
else {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }

            long beginLockTimestamp = this.systemClock.now();
            //检查hold住的请求
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    log.info("{} service end"this.getServiceName());
}

没有什么使我停留——除了目的,纵然岸旁有玫瑰、有绿荫、有宁静的港湾,我是不系之舟。