Skip to main content

基本概念


1 消息模型(Message Model)

RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 topic 的消息,每个 topic 的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个 topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。

2 消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。

3 消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

4 主题(topic

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。

5 代理服务器(Broker Server)

消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

6 名字服务(Name Server)

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。

7 拉取式消费(Pull Consumer)

Consumer 消费的一种类型,应用通常主动调用 Consumer 的拉消息方法从 Broker 服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

8 推动式消费(Push Consumer)

Consumer 消费的一种类型,应用不需要主动调用 Consumer 的拉消息方法,在底层已经封装了拉取的调用逻辑,在用户层面看来是 broker 把消息推送过来的,其实底层还是 consumer 去 broker 主动拉取消息。

9 生产者组(Producer Group)

同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

10 消费者组(Consumer Group)

同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的 topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

11 集群消费(Clustering)

集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。

12 广播消费(Broadcasting)

广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

13 普通顺序消息(Normal Ordered Message)

普通顺序消费模式下,消费者通过同一个消息队列( topic 分区,称作 Message Queue) 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

14 严格顺序消息(Strictly Ordered Message)

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

15 消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ 中每个消息拥有唯一的 Message ID,且可以携带具有业务标识的 Key。系统提供了通过 Message ID 和 Key 查询消息的功能。

16 标签(Tag)

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。

特性(features)


1 订阅与发布

消息的发布是指某个生产者向某个 topic 发送消息;消息的订阅是指某个消费者关注了某个 topic 中带有某些 tag 的消息,进而从该 topic 消费数据。

2 消息顺序

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ 可以严格的保证消息有序。

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个 topic 下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序 对于指定的一个 topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
  • 分区顺序 对于指定的一个 topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

3 消息过滤

RocketMQ 的消费者可以根据 Tag 进行消息过滤,也支持自定义属性过滤。消息过滤目前是在 Broker 端实现的,优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂。

4 消息可靠性

RocketMQ 支持消息的高可靠,影响消息可靠性的几种情况:

  1. Broker 非正常关闭
  2. Broker 异常 Crash
  3. OS Crash
  4. 机器掉电,但是能立即恢复供电情况
  5. 机器无法开机(可能是 cpu、主板、内存等关键设备损坏)
  6. 磁盘设备损坏

1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ 在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。

5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ 在这两种情况下,通过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与 Money 相关的应用。注:RocketMQ 从 3.0 版本开始支持同步双写。

5 至少一次

至少一次(At least Once)指每个消息必须投递一次。Consumer 先 Pull 消息到本地,消费完成后,才向服务器返回 ack,如果没有消费一定不会 ack 消息,所以 RocketMQ 可以很好的支持此特性。

6 回溯消费

回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。

7 事务消息

RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

8 定时消息

定时消息(延迟队列)是指消息发送到 broker 后,不会立即被消费,等待特定时间投递给真正的 topic。 broker 有配置项 messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18 个 level。可以配置自定义 messageDelayLevel。注意,messageDelayLevel 是 broker 的属性,不属于某个 topic。发消息时,设置 delayLevel 等级即可:msg.setDelayLevel(level)。level 有以下三种情况:

 level == 0,消息为非延迟消息
1<=level<=maxLevel,消息延迟特定时间,例如 level==1,延迟 1s
level > maxLevel,则 level== maxLevel,例如 level==20,延迟 2h

定时消息会暂存在名为 SCHEDULE*topic_XXXX 的 topic 中,并根据 delayTimeLevel 存入特定的 queue,queueId = delayTimeLevel – 1,即一个 queue 只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker 会调度地消费 SCHEDULE*topic_XXXX,将消息写入真实的 topic

需要注意的是,定时消息会在第一次写入和调度写入真实 topic 时都会计数,因此发送数量、tps 都会变高。

9 消息重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以认为有以下几种情况:

  • 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过 10 秒后再重试。
  • 由于依赖的下游应用服务不可用,例如 db 连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 sleep 30s,再消费下一条消息,这样可以减轻 Broker 重试消息的压力。

RocketMQ 会为每个消费组都设置一个 topic 名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个 topic 的重试队列是针对消费组,而不是针对每个 topic 设置的),用于暂时保存因为各种异常而导致 Consumer 端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 topic 名称为“SCHEDULE_topic_XXXX”的延迟队列中,后台定时任务按照对应的时间进行 Delay 后重新保存至“%RETRY%+consumerGroup”的重试队列中。

10 消息重投

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway 没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在 RocketMQ 中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer 负载变化也会导致重复消息。如下方法可以设置消息重试策略:

  • retryTimesWhenSendFailed:同步发送失败重投次数,默认为 2,因此生产者会最多尝试发送 retryTimesWhenSendFailed + 1 次。不会选择上次失败的 broker,尝试向其他 broker 发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投。
  • retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他 broker,仅在同一个 broker 上做重试,不保证消息不丢。
  • retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或 slave 不可用(返回状态非 SEND_OK),是否尝试发送到其他 broker,默认 false。十分重要消息可以开启。

11 流量控制

生产者流控,因为 broker 处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。

生产者流控:

  • commitLog 文件被锁时间超过 osPageCacheBusyTimeOutMills 时,参数默认为 1000ms,返回流控。
  • 如果开启 transientStorePoolEnable == true,且 broker 为异步刷盘的主机,且 transientStorePool 中资源不足,拒绝当前 send 请求,返回流控。
  • broker 每隔 10ms 检查 send 请求队列头部请求的等待时间,如果超过 waitTimeMillsInSendQueue,默认 200ms,拒绝当前 send 请求,返回流控。
  • broker 通过拒绝 send 请求方式实现流量控制。

注意,生产者流控,不会尝试消息重投。

消费者流控:

  • 消费者本地缓存消息数超过 pullThresholdForQueue 时,默认 1000。
  • 消费者本地缓存消息大小超过 pullThresholdSizeForQueue 时,默认 100MB。
  • 消费者本地缓存消息跨度超过 consumeConcurrentlyMaxSpan 时,默认 2000。

消费者流控的结果是降低拉取频率。

12 死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

应用场景

1、异步解耦

2、削峰填谷

诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,削峰填谷是解决该问题的最佳方式;

  1. 超高流量脉冲处理能力

    MQ 超高性能的消息处理能力可以承接流量脉冲而不被击垮,在确保系统可用性同时,因快速有效的请求响应而提升用户的体验;

  2. 海量消息堆积能力

    确保下游业务在安全水位内平滑稳定的运行,避免超高流量的冲击;

  3. 合理的成本控制

    通过削弱填谷可控制下游业务系统的集群规模,从而降低投入成本;

3、顺序消息

细数日常中需要保证顺序的应用场景非常多,比如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与 FIFO 原理类似,MQ 提供的顺序消息即保证消息的先进先出;

  1. 严格保序

    与部分开源产品不同,无论是服务上下线、扩缩容,网络不稳定等情况下,MQ 始终保证消息的有序;

  2. 高性能&可扩展

    支持全局顺序与分区顺序,分别满足不同的业务需求,如证券交易系统中相同股别采用全局顺序,交易系统的不同订单之间采用分区顺序;分区顺序在严格保序的同时,通过分区动态扩展能力提高整体的并发与扩展能力;

4、分布式事务

阿里巴巴的交易系统、支付红包等场景需要确保数据的最终一致性,大量引入 MQ 的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性;

  1. 传统事务

    多个系统或者应用组件之间的业务处理会耦合到一个大事务中,响应时间长,业务链路长从而影响系统的整体性能和可用性,甚至引起系统崩溃;

  2. 分布式事务

    将核心链路业务与可异步化处理的分支链路进行拆分,将大事务拆分成小事务,减少系统间的交互,既高效又可靠;MQ 的可靠传输与多副本技术在确保消息不丢,At-Least-Once 特性确保数据的最终一致性;

5、大数据分析

6、分布模式缓存同步

架构


1 技术架构

image-20221022114032060

RocketMQ 架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以 push 推,pull 拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

  • NameServer:NameServer 是一个非常简单的 topic 路由注册中心,其角色类似 Dubbo 中的 zookeeper,支持 Broker 的动态注册与发现。主要包括两个功能:Broker 管理,NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活;路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。然后 Producer 和 Consumer 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。NameServer 通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker 是向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,Broker 仍然可以向其它 NameServer 同步其路由信息,Producer 和 Consumer 仍然可以动态感知 Broker 的路由的信息。

  • BrokerServer:Broker 主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker 包含了以下几个重要子模块。

    1. Remoting Module:整个 Broker 的实体,负责处理来自 Client 端的请求。
    2. Client Manager:负责管理客户端(Producer/Consumer)和维护 Consumer 的 topic 订阅信息。
    3. Store Service:提供方便简单的 API 接口处理消息存储到物理硬盘和查询功能。
    4. HA Service:高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。
    5. Index Service:根据特定的 Message key 对投递到 Broker 的消息进行索引服务,以提供消息的快速查询。

image-20221022114042576

2 部署架构

image-20221022114055250

RocketMQ 网络部署特点

  • NameServer 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

  • Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 topic 信息到所有 NameServer。 注意:当前 RocketMQ 版本在部署架构上支持一 Master 多 Slave,但只有 BrokerId=1 的从服务器才会参与消息的读负载。

  • Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 topic 路由信息,并向提供 topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。

  • Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 topic 路由信息,并向提供 topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,消费者在向 Master 拉取消息时,Master 服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读 I/O),以及从服务器是否可读等因素建议下一次是从 Master 还是 Slave 拉取。

结合部署架构图,描述集群工作流程:

  • 启动 NameServer,NameServer 起来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。
  • Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息(IP+端口等)以及存储所有 topic 信息。注册成功后,NameServer 集群中就有 topic 跟 Broker 的映射关系。
  • 收发消息前,先创建 topic,创建 topic 时需要指定该 topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 topic
  • Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
  • Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

来源于其他解释

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以 push 推,pull 拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
  • NameServer:NameServer 是一个非常简单的 topic 路由注册中心,其角色类似 Dubbo 中的 zookeeper,支持 Broker 的动态注册与发现。主要包括两个功能:Broker 管理,NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活;路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。然后 Producer 和 Consumer 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。NameServer 通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker 是向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,Broker 仍然可以向其它 NameServer 同步其路由信息,Producer 和 Consumer 仍然可以动态感知 Broker 的路由的信息。
  • BrokerServer:Broker 主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker 包含了以下几个重要子模块。
    1. Remoting Module:整个 Broker 的实体,负责处理来自 Client 端的请求。
    2. Client Manager:负责管理客户端(Producer/Consumer)和维护 Consumer 的 topic 订阅信息。
    3. Store Service:提供方便简单的 API 接口处理消息存储到物理硬盘和查询功能。
    4. HA Service:高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。
    5. Index Service:根据特定的 Message key 对投递到 Broker 的消息进行索引服务,以提供消息的快速查询。

设计


1 消息存储

image-20221022114451231

消息存储是 RocketMQ 中最为复杂和最为重要的一部分,本节将分别从 RocketMQ 的消息存储整体架构、PageCache 与 Mmap 内存映射以及 RocketMQ 中两种不同的刷盘方式三方面来分别展开叙述。

1.1 消息存储整体架构

消息存储架构图中主要有下面三个跟消息存储相关的文件构成。

(1) CommitLog:消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认 1G, 文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

(2) ConsumeQueue:消息消费索引,引入的目的主要是提高消息消费的性能。由于 RocketMQ 是基于主题 topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件,根据 topic 检索消息是非常低效的。Consumer 可根据 ConsumeQueue 来查找待消费的消息。其中,ConsumeQueue 作为消费消息的索引,保存了指定 topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset,消息大小 size 和消息 Tag 的 HashCode 值。consumequeue 文件可以看成是基于 topic 的 commitlog 索引文件,故 consumequeue 文件夹的组织方式如下:topic/queue/file 三层组织结构,具体存储路径为:$HOME/store/consumequeue/topic/queueId/fileName。同样 consumequeue 文件采取定长设计,每一个条目共 20 个字节,分别为 8 字节的 commitlog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M;

(3) IndexFile:IndexFile(索引文件)提供了一种可以通过 key 或时间区间来查询消息的方法。Index 文件的存储位置是:$HOME/store/index/fileName,文件名 fileName 是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引,IndexFile 的底层存储设计为在文件系统中实现 HashMap 结构,故 RocketMQ 的索引文件其底层实现为 hash 索引。

在上面的 RocketMQ 的消息存储整体架构图中可以看出,RocketMQ 采用的是混合型的存储结构,即为 Broker 单个实例下所有的队列共用一个日志数据文件(即为 CommitLog)来存储。RocketMQ 的混合型存储结构(多个 topic 的消息实体内容都存储于一个 CommitLog 中)针对 Producer 和 Consumer 分别采用了数据和索引部分相分离的存储结构,Producer 发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘持久化,保存至 CommitLog 中。只要消息被刷盘持久化至磁盘文件 CommitLog 中,那么 Producer 发送的消息就不会丢失。正因为如此,Consumer 也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker 允许等待 30s 的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ 的具体做法是,使用 Broker 端的后台服务线程—ReputMessageService 不停地分发请求并异步构建 ConsumeQueue(逻辑消费队列)和 IndexFile(索引文件)数据。

1.2 页缓存与内存映射

页缓存(PageCache)是 OS 对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于 OS 使用 PageCache 机制对读写访问操作进行了性能优化,将一部分的内存用作 PageCache。对于数据的写入,OS 会先写入至 Cache 内,随后通过异步的方式由 pdflush 内核线程将 Cache 内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中 PageCache 的情况,OS 从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。

在 RocketMQ 中,ConsumeQueue 逻辑消费队列存储的数据较少,并且是顺序读取,在 page cache 机制的预读取作用下,Consume Queue 文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于 CommitLog 消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统 IO 调度算法,比如设置调度算法为“Deadline”(此时块存储采用 SSD 的话),随机读的性能也会有所提升。

另外,RocketMQ 主要通过 MappedByteBuffer 对文件进行读写操作。其中,利用了 NIO 中的 FileChannel 模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种 Mmap 的方式减少了传统 IO 将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故 RocketMQ 的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。

1.3 消息刷盘

image-20221022114507434

(1) 同步刷盘:如上图所示,只有在消息真正持久化至磁盘后 RocketMQ 的 Broker 端才会真正返回给 Producer 端一个成功的 ACK 响应。同步刷盘对 MQ 消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。

(2) 异步刷盘:能够充分利用 OS 的 PageCache 的优势,只要消息写入 PageCache 即可将成功的 ACK 返回给 Producer 端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了 MQ 的性能和吞吐量。

2 通信机制

RocketMQ 消息队列集群主要包括 NameServer、Broker(Master/Slave)、Producer、Consumer4 个角色,基本通讯流程如下:

(1) Broker 启动后需要完成一次将自己注册至 NameServer 的操作;随后每隔 30s 时间定时向 NameServer 上报 topic 路由信息。

(2) 消息生产者 Producer 作为客户端发送消息时候,需要根据消息的 topic 从本地缓存的 topicPublishInfoTable 获取路由信息。如果没有则更新路由信息会从 NameServer 上重新拉取,同时 Producer 会默认每隔 30s 向 NameServer 拉取一次路由信息。

(3) 消息生产者 Producer 根据 2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker 作为消息的接收者接收消息并落盘存储。

(4) 消息消费者 Consumer 根据 2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。

从上面 1)~3)中可以看出在消息生产者,Broker 和 NameServer 之间都会发生通信(这里只说了 MQ 的部分通信),因此如何设计一个良好的网络通信模块在 MQ 中至关重要,它将决定 RocketMQ 集群整体的消息传输能力与最终的性能。

rocketmq-remoting 模块是 RocketMQ 消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如 rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ 消息队列自定义了通信协议并在 Netty 的基础之上扩展了通信模块。

2.1 协议设计与编解码

在 Client 和 Server 之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义 RocketMQ 的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在 RocketMQ 中,RemotingCommand 这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。

Header 字段类型Request 说明Response 说明
codeint请求操作码,应答方根据不同的请求码进行不同的业务处理应答响应码。0 表示成功,非 0 则表示各种错误
languageLanguageCode请求方实现的语言应答方实现的语言
versionint请求方程序的版本应答方程序的版本
opaqueint相当于 requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应应答不做修改直接返回
flagint区分是普通 RPC 还是 onewayRPC 的标志区分是普通 RPC 还是 onewayRPC 的标志
remarkString传输自定义文本信息传输自定义文本信息
extFieldsHashMap<String, String>请求自定义扩展信息响应自定义扩展信息

image-20221022114532756

可见传输内容主要可以分为以下 4 部分:

(1) 消息长度:总长度,四个字节存储,占用一个 int 类型;

(2) 序列化类型&消息头长度:同样占用一个 int 类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;

(3) 消息头数据:经过序列化后的消息头数据;

(4) 消息主体数据:消息主体的二进制字节数据内容;

2.3 消息的通信方式和流程

在 RocketMQ 消息队列中支持通信的方式主要有同步(sync)、异步(async)、单向(oneway) 三种。其中“单向”通信模式相对简单,一般用在发送心跳包场景下,无需关注其 Response。这里,主要介绍 RocketMQ 的异步通信流程。

img

2.4 Reactor 多线程设计

RocketMQ 的 RPC 通信采用 Netty 组件作为底层通信库,同样也遵循了 Reactor 多线程模型,同时又在这之上做了一些扩展和优化。

img

从上面的框图中可以大致了解 RocketMQ 中 NettyRemotingServer 的 Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为上面的 1)负责监听 TCP 网络连接请求,建立好连接,创建 SocketChannel,并注册到 selector 上。RocketMQ 的源码中会自动根据 OS 的类型选择 NIO 和 Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给 Worker 线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为 3),在真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理,这些工作交给 defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为 8)去做。而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor,然后封装成 task 任务后,提交给对应的业务 processor 处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。

线程数线程名线程具体说明
1NettyBoss_%dReactor 主线程
NNettyServerEPOLLSelector*%d*%dReactor 线程池
M1NettyServerCodecThread_%dWorker 线程池
M2RemotingExecutorThread_%d业务 processor 处理线程池

3 消息过滤

RocketMQ 分布式消息队列的消息过滤方式有别于其它 MQ 中间件,是在 Consumer 端订阅消息时再做消息过滤的。RocketMQ 这么做是在于其 Producer 端写入消息和 Consumer 端订阅消息采用分离存储的机制来实现的,Consumer 端订阅消息是需要通过 ConsumeQueue 这个消息消费的逻辑队列拿到一个索引,然后再从 CommitLog 里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。其 ConsumeQueue 的存储结构如下,可以看到其中有 8 个字节存储的 Message Tag 的哈希值,基于 Tag 的消息过滤正是基于这个字段值的。

img

主要支持如下 2 种的过滤方式 (1) Tag 过滤方式:Consumer 端在订阅消息时除了指定 topic 还可以指定 TAG,如果一个消息有多个 TAG,可以用||分隔。其中,Consumer 端会将这个订阅请求构建成一个 SubscriptionData,发送一个 Pull 消息的请求给 Broker 端。Broker 端从 RocketMQ 的文件存储层—Store 读取数据之前,会用这些数据先构建一个 MessageFilter,然后传给 Store。Store 从 ConsumeQueue 读取到一条记录后,会用它记录的消息 tag hash 值去做过滤,由于在服务端只是根据 hashcode 进行判断,无法精确对 tag 原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始 tag 字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。

(2) SQL92 的过滤方式:这种方式的大致做法和上面的 Tag 过滤方式一样,只是在 Store 层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由 rocketmq-filter 模块负责的。每次过滤都去执行 SQL 表达式会影响效率,所以 RocketMQ 使用了 BloomFilter 避免了每次都去执行。SQL92 的表达式上下文为消息的属性。

4 负载均衡

RocketMQ 中的负载均衡都在 Client 端完成,具体来说的话,主要可以分为 Producer 端发送消息时候的负载均衡和 Consumer 端订阅消息的负载均衡。

4.1 Producer 的负载均衡

Producer 端在发送消息的时候,会先根据 topic 找到指定的 topicPublishInfo,在获取了 topicPublishInfo 路由信息后,RocketMQ 的客户端在默认方式下 selectOneMessageQueue()方法会从 topicPublishInfo 中的 messageQueueList 中选择一个队列(MessageQueue)进行发送消息。具体的容错策略均在 MQFaultStrategy 这个类中定义。这里有一个 sendLatencyFaultEnable 开关变量,如果开启,在随机递增取模的基础上,再过滤掉 not available 的 Broker 代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的 latency 超过 550L ms,就退避 30000L ms;超过 1000L,就退避 60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance 机制是实现消息发送高可用的核心关键所在。

4.2 Consumer 的负载均衡

在 RocketMQ 中,Consumer 端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在 Push 模式只是对 pull 模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要 Consumer 端知道从 Broker 端的哪一个消息队列中去获取消息。因此,有必要在 Consumer 端来做负载均衡,即 Broker 端中多个 MessageQueue 分配给同一个 ConsumerGroup 中的哪些 Consumer 消费。

1、Consumer 端的心跳包发送

在 Consumer 启动后,它就会通过定时任务不断地向 RocketMQ 集群中的所有 Broker 实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端 id 的值等信息)。Broker 端在收到 Consumer 的心跳消息后,会将它维护在 ConsumerManager 的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable 中,为之后做 Consumer 端的负载均衡提供可以依据的元数据信息。

2、Consumer 端实现负载均衡的核心类—RebalanceImpl

在 Consumer 实例的启动流程中的启动 MQClientInstance 实例部分,会完成负载均衡服务线程—RebalanceService 的启动(每隔 20s 执行一次)。通过查看源码可以发现,RebalanceService 线程的 run()方法最终调用的是 RebalanceImpl 类的 rebalanceBytopic()方法,该方法是实现 Consumer 端负载均衡的核心。这里,rebalanceBytopic()方法会根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。这里主要来看下集群模式下的主要处理流程:

(1) 从 rebalanceImpl 实例的本地缓存变量—topicSubscribeInfoTable 中,获取该 topic 主题下的消息消费队列集合(mqSet);

(2) 根据 topic 和 consumerGroup 为参数调用 mQClientFactory.findConsumerIdList()方法向 Broker 端发送获取该消费组下消费者 Id 列表的 RPC 通信请求(Broker 端基于前面 Consumer 端上报的心跳包数据而构建的 consumerTable 做出响应返回,业务请求码:GET_CONSUMER_LIST_BY_GROUP);

(3) 先对 topic 下的消息消费队列、消费者 Id 排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列。这里的平均分配算法,类似于分页的算法,将所有 MessageQueue 排好序类似于记录,将所有消费端 Consumer 排好序类似页数,并求出每一页需要包含的平均 size 和每个页面记录的范围 range,最后遍历整个 range 而计算出当前 Consumer 端应该分配到的记录(这里即为:MessageQueue)。

img

(4) 然后,调用 updateProcessQueueTableInRebalance()方法,具体的做法是,先将分配到的消息队列集合(mqSet)与 processQueueTable 做一个过滤比对。

img

  • 上图中 processQueueTable 标注的红色部分,表示与分配到的消息队列集合 mqSet 互不包含。将这些队列设置 Dropped 属性为 true,然后查看这些队列是否可以移除出 processQueueTable 缓存变量,这里具体执行 removeUnnecessaryMessageQueue()方法,即每隔 1s 查看是否可以获取当前消费处理队列的锁,拿到的话返回 true。如果等待 1s 后,仍然拿不到当前消费处理队列的锁则返回 false。如果返回 true,则从 processQueueTable 缓存变量中移除对应的 Entry;

  • 上图中 processQueueTable 的绿色部分,表示与分配到的消息队列集合 mqSet 的交集。判断该 ProcessQueue 是否已经过期了,在 Pull 模式的不用管,如果是 Push 模式的,设置 Dropped 属性为 true,并且调用 removeUnnecessaryMessageQueue()方法,像上面一样尝试移除 Entry;

最后,为过滤后的消息队列集合(mqSet)中的每个 MessageQueue 创建一个 ProcessQueue 对象并存入 RebalanceImpl 的 processQueueTable 队列中(其中调用 RebalanceImpl 实例的 computePullFromWhere(MessageQueue mq)方法获取该 MessageQueue 对象的下一个进度消费值 offset,随后填充至接下来要创建的 pullRequest 对象属性中),并创建拉取请求对象—pullRequest 添加到拉取列表—pullRequestList 中,最后执行 dispatchPullRequest()方法,将 Pull 消息的请求对象 PullRequest 依次放入 PullMessageService 服务线程的阻塞队列 pullRequestQueue 中,待该服务线程取出后向 Broker 端发起 Pull 消息的请求。

消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列。

5 事务消息

Apache RocketMQ 在 4.3.0 版中已经支持分布式事务消息,这里 RocketMQ 采用了 2PC 的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。

img

5.1 RocketMQ 事务消息流程概要

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

1.事务消息发送及提交:

(1) 发送消息(half 消息)。

(2) 服务端响应消息写入结果。

(3) 根据发送结果执行本地事务(如果写入失败,此时 half 消息对业务不可见,本地逻辑不执行)。

(4) 根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见)

2.补偿流程:

(1) 对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”

(2) Producer 收到回查消息,检查回查消息对应的本地事务的状态

(3) 根据本地事务状态,重新 Commit 或者 Rollback

其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。

5.2 RocketMQ 事务消息设计

1.事务消息在一阶段对用户不可见

在 RocketMQ 事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ 事务消息的做法是:如果消息是 half 消息,将备份原消息的主题与消息消费队列,然后改变主题为 RMQSYS_TRANS_HALFtopic。由于消费组未订阅该主题,故消费端无法消费 half 类型的消息,然后 RocketMQ 会开启一个定时任务,从 topic 为 RMQSYS_TRANS_HALFtopic 中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。

在 RocketMQ 中,消息在服务端的存储结构如下,每条消息都会有对应的索引信息,Consumer 通过 ConsumeQueue 这个二级索引来读取消息实体内容,其流程如下:

img

RocketMQ 的具体实现策略是:写入的如果事务消息,对消息的 topic 和 Queue 等属性进行替换,同时将原来的 topic 和 Queue 信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费。其实改变消息主题是 RocketMQ 的常用“套路”,回想一下延时消息的实现机制。

2.Commit 和 Rollback 操作以及 Op 消息的引入

在完成一阶段写入一条对用户不可见的消息后,二阶段如果是 Commit 操作,则需要让消息对用户可见;如果是 Rollback 则需要撤销一阶段的消息。先说 Rollback 的情况。对于 Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上 RocketMQ 也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending 状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。RocketMQ 事务消息方案中引入了 Op 消息的概念,用 Op 消息标识事务消息已经确定的状态(Commit 或者 Rollback)。如果一条事务消息没有对应的 Op 消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入 Op 消息后,事务消息无论是 Commit 或者 Rollback 都会记录一个 Op 操作。Commit 相对于 Rollback 只是在写入 Op 消息前创建 Half 消息的索引。

3.Op 消息的存储和对应关系

RocketMQ 将 Op 消息写入到全局一个特定的 topic 中通过源码中的方法—TransactionalMessageUtil.buildOptopic();这个 topic 是一个内部的 topic(像 Half 消息的 topic 一样),不会被用户消费。Op 消息的内容为对应的 Half 消息的存储的 Offset,这样通过 Op 消息能索引到 Half 消息进行后续的回查操作。

img

4.Half 消息的索引构建

在执行二阶段 Commit 操作时,需要构建出 Half 消息的索引。一阶段的 Half 消息由于是写到一个特殊的 topic,所以二阶段构建索引时需要读取出 Half 消息,并将 topic 和 Queue 替换成真正的目标的 topic 和 Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。所以 RocketMQ 事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。

5.如何处理二阶段失败的消息?

如果在 RocketMQ 事务消息的二阶段过程中失败了,例如在做 Commit 操作时,出现网络问题导致 Commit 失败,那么需要通过一定的策略使这条消息最终被 Commit。RocketMQ 采用了一种补偿机制,称为“回查”。Broker 端对未确定状态的消息发起回查,将消息发送到对应的 Producer 端(同一个 Group 的 Producer),由 Producer 根据消息来检查本地事务的状态,进而执行 Commit 或者 Rollback。Broker 端通过对比 Half 消息和 Op 消息进行事务消息的回查并且推进 CheckPoint(记录那些事务消息的状态是确定的)。

值得注意的是,rocketmq 并不会无休止的的信息事务状态回查,默认回查 15 次,如果 15 次回查还是无法得知事务状态,rocketmq 默认回滚该消息。

6 消息查询

RocketMQ 支持按照下面两种维度(“按照 Message Id 查询消息”、“按照 Message Key 查询消息”)进行消息查询。

6.1 按照 MessageId 查询消息

RocketMQ 中的 MessageId 的长度总共有 16 字节,其中包含了消息存储主机地址(IP 地址和端口),消息 Commit Log offset。“按照 MessageId 查询消息”在 RocketMQ 中具体做法是:Client 端从 MessageId 中解析出 Broker 的地址(IP 地址和端口)和 Commit Log 的偏移地址后封装成一个 RPC 请求后通过 Remoting 通信层发送(业务请求码:VIEW_MESSAGE_BY_ID)。Broker 端走的是 QueryMessageProcessor,读取消息的过程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的记录并解析成一个完整的消息返回。

6.2 按照 Message Key 查询消息

“按照 Message Key 查询消息”,主要是基于 RocketMQ 的 IndexFile 索引文件来实现的。RocketMQ 的索引文件逻辑结构,类似 JDK 中 HashMap 的实现。索引文件的具体结构如下:

img

IndexFile 索引文件为用户提供通过“按照 Message Key 查询消息”的消息索引查询服务,IndexFile 文件的存储位置是:$HOME\store\index$fileName,文件名 fileName 是以创建时的时间戳命名的,文件大小是固定的,等于 40+500W*4+2000W*20= 420000040 个字节大小。如果消息的 properties 中设置了 UNIQ_KEY 这个属性,就用 topic + “#” + UNIQ_KEY 的 value 作为 key 来做写入操作。如果消息设置了 KEYS 属性(多个 KEY 以空格分隔),也会用 topic + “#” + KEY 来做索引。

其中的索引数据包含了 Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共 20 Byte。NextIndex offset 即前面读出来的 slotValue,如果有 hash 冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。Timestamp 记录的是消息 storeTimestamp 之间的差,并不是一个绝对的时间。整个 Index File 的结构如图,40 Byte 的 Header 用于保存一些总的统计信息,4*500W 的 Slot Table 并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。20*2000W 是真正的索引数据,即一个 Index File 可以保存 2000W 个索引。

“按照 Message Key 查询消息”的方式,RocketMQ 的具体做法是,主要通过 Broker 端的 QueryMessageProcessor 业务处理器来查询,读取消息的过程就是用 topic 和 key 找到 IndexFile 索引文件中的一条记录,根据其中的 commitLog offset 从 CommitLog 文件中读取消息的实体内容。

领域模型

通信方式

  1. 同步 RPC 远程调用

image-20221024151445924

同步 RPC 调用模型下,不同系统之间直接进行调用通信,每个请求直接从调用方发送到被调用方,然后要求被调用方立即返回响应结果给调用方,以确定本次调用结果是否成功。

注意 此处的同步并不代表 RPC 的编程接口方式,RPC 也可以有异步非阻塞调用的编程方式,但本质上仍然是需要在指定时间内得到目标端的直接响应。

  1. 异步中间件代理

image-20221024151543339

异步消息通信模式下,各子系统之间无需强耦合直接连接,调用方只需要将请求转化成异步事件(消息)发送给中间代理,发送成功即可认为该异步链路调用完成,剩下的工作中间代理会负责将事件可靠通知到下游的调用系统,确保任务执行完成。该中间代理一般就是消息中间件。

异步通信的优势:

  • 系统拓扑简单由于调用方和被调用方统一和中间代理通信,系统是星型结构,易于维护和管理。

  • 上下游耦合性弱上下游系统之间弱耦合,结构更灵活,由中间代理负责缓冲和异步恢复。 上下游系统间可以独立升级和变更,不会互相影响。

  • 容量削峰填谷基于消息的中间代理往往具备很强的流量缓冲和整形能力,业务流量高峰到来时不会击垮下游。

消息传输模型

  1. 点对点

image-20221024152223813

点对点模型也叫队列模型,具有如下特点:

  • 消费匿名:消息上下游沟通的唯一的身份就是队列,下游消费者从队列获取消息无法申明独立身份。
  • 一对一通信:基于消费匿名特点,下游消费者即使有多个,但都没有自己独立的身份,因此共享队列中的消息,每一条消息都只会被唯一一个消费者处理。因此点对点模型只能实现一对一通信。
  1. 发布-订阅

image-20221024152339004

发布订阅模型特点:

  • 消费独立:相比队列模型的匿名消费方式,发布订阅模型中消费方都会具备的身份,一般叫做订阅组(订阅关系),不同订阅组之间相互独立不会相互影响。
  • 一对多通信:基于独立身份的设计,同一个主题内的消息可以被多个订阅组处理,每个订阅组都可以拿到全量消息。因此发布订阅模型可以实现一对多通信。

传输模型对比

点对点模型和发布订阅模型各有优势,点对点模型更为简单,而发布订阅模型的扩展性更高。 RocketMQ 使用的传输模型为发布订阅模型,因此也具有发布订阅模型的特点。

消息发送重试和流控机制

重试流程

生产者在初始化时设置消息发送最大重试次数,当出现上述触发条件的场景时,生产者客户端会按照设置的重试次数一直重试发送消息,直到消息发送成功或达到最大重试次数重试结束,并在最后一次重试失败后返回调用错误响应。

  • 同步发送:调用线程会一直阻塞,直到某次重试成功或最终重试失败,抛出错误码和异常。
  • 异步发送:调用线程不会阻塞,但调用结果会通过异常事件或者成功事件返回。

重试间隔

  • 除服务端返回系统流控错误场景,其他触发条件触发重试后,均会立即进行重试,无等待间隔。
  • 若由于服务端返回流控错误触发重试,系统会按照指数退避策略进行延迟重试。指数退避算法通过以下参数控制重试行为:
    • INITIAL_BACKOFF: 第一次失败重试前后需等待多久,默认值:1 秒。
    • MULTIPLIER :指数退避因子,即退避倍率,默认值:1.6。
    • JITTER :随机抖动因子,默认值:0.2。
    • MAX_BACKOFF :等待间隔时间上限,默认值:120 秒
    • MIN_CONNECT_TIMEOUT :最短重试间隔,默认值:20 秒。

消息存储和清理机制

触发条件

Apache RocketMQ 的消息流控触发条件如下:

  • 存储压力大:消费者分组的初始消费位点为当前队列的最大消费位点。若某些场景例如业务上新等需要回溯到指定时刻前开始消费,此时队列的存储压力会瞬间飙升,触发消息流控。
  • 服务端请求任务排队溢出:若消费者消费能力不足,导致队列中有大量堆积消息,当堆积消息超过一定数量后会触发消息流控,减少下游消费系统压力。

流控行为

当系统触发消息发送流控时,客户端会收到系统限流错误和异常,错误码信息如下:

  • reply-code:530
  • reply-text:TOO_MANY_REQUESTS

客户端收到系统流控错误码后,会根据指数退避策略进行消息发送重试。

处理建议

  • 如何避免触发消息流控:触发限流的根本原因是系统容量或水位过高,您可以利用可观测性功能监控系统水位容量等,保证底层资源充足,避免触发流控机制。
  • 突发消息流控处理:如果因为突发原因触发消息流控,且客户端内置的重试流程执行失败,则建议业务方将请求调用临时替换到其他系统进行应急处理。

RocketMQ 消息类型

共三种消息类型

  • 普通消息
  • 顺序消息
  • 事务消息

参考文章https://blog.csdn.net/qq_31155349/article/details/108761005

基于 RocketMQ5.x 版本

主题

单一主题只收发一种类型的消息

​ RocketMq 设计的原则是通过 topic 隔离业务,不同的业务逻辑应使用不同的 topic

队列

主要作用:

  • ​ FIFO,消息在队列中的位置通过位点(Offset)来标记管理
  • ​ RocketMQ 可以从任意的位点读取任意数量的消息。RabbitMQ、ActiveMQ 没有

RocketMQ 默认提供消息的可靠存储,所有发送成功的消息都会被持久化到队列中。

RocketMQ 队列模型和 Kafka 的分区(Partition)模型类似。在消息队列 RocketMQ 版消息收发模型中,队列属于主题的一部分,虽然所有的消息资源以主题粒度管理,但实际的操作实现是面向队列。例如,生产者指定某个主题,向主题内发送消息,但实际消息发送到该主题下的某个队列中。

注意:队列的数量应当遵循少用够用的原则,避免队列过多导致其他问题。比如会出现集群的元数据膨胀,或者客户端压力过大的问题

消息

  • 单挑消息不应该传输超大负载

    单个原子消息事件的数据大小需要严格控制,如果单条消息过大容易造成网络传输层压力,不利于异常重试和流量控制。

    生产环境中如果需要传输超大负载,建议按照固定大小做报文拆分,或者结合文件存储等方法进行传输。

  • 消息中转时做好不可变设计

    5.x 版本消息都是只读的视图,所以不用关心

    5.x 之前的消息不可变性没有约束,如果需要中专,需要重新初始化消息

    Message m = Consumer.receive();
    Message m2= MessageBuilder.buildFrom(m);
    Producer.send(m2);

生产者

  • 同一个线程中尽量不要有太多的生产者

  • 不要频繁的创建和销毁生产者,类似数据库的连接频繁的创建销毁会在服务端产生大量短连接请求,严重影响系统性能。

    Producer p = ProducerBuilder.build();
    for (int i =0;i<n;i++)
    {
    Message m= MessageBuilder.build();
    p.send(m);
    }
    p.shutdown();

消费者

消费者的类型分为PushConsumer类型、SimpleConsumer类型

要求同一分组下的所有消费者的行为保持一致,包括投递顺序、*消息重试策略

5.x版本,客户端无需关心消费行为,因为消费者的行为是从关联的消费者分组统一获取的

同样的也是不建议同一线程有大量的消费者,不建议频繁的创建个销毁消费者

订阅关系

通过订阅关系,可以控制消息过滤规则和消费状态

  • 消息过滤规则:控制消费者选择主题内的符合条件的消息进行消费
  • 消费状态:RocketMQ 默认提供订阅关系持久化的能力,消费者分组在服务端注册订阅关系后,当消费者离线再次上线后,可以获取里先前的消费进度,并继续消费

过滤的类型:TAGSQL92

TAG

Tag 标签设置

  • Tag 由生产者发送消息时设置,每条消息允许设置一个 Tag 标签。
  • Tag 使用可见字符,建议长度不超过 128 字符。

Tag 标签过滤规则

Tag 标签过滤为精准字符串匹配,过滤规则设置格式如下:

  • 单 Tag 匹配:过滤表达式为目标 Tag。表示只有消息标签为指定目标 Tag 的消息符合匹配条件,会被发送给消费者。
  • 多 Tag 匹配:多个 Tag 之间为或的关系,不同 Tag 间使用两个竖线(||)隔开。例如,Tag1||Tag2||Tag3,表示标签为 Tag1 或 Tag2 或 Tag3 的消息都满足匹配条件,都会被发送给消费者进行消费。
  • 全部匹配:使用星号(*)作为全匹配表达式。表示主题下的所有消息都将被发送给消费者进行消费。

示例

  1. 生产者发送消息,设置 TAG
Message message = messageBuilder.set`topic`("`topic`")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
//该示例表示消息的Tag设置为“TagA”。
.setTag("TagA")
//消息体。
.setBody("messageBody".getBytes())
.build();

  1. 消费者消费消息 匹配单个标签
String `topic` = "Your `topic`";
//只订阅消息标签为“TagA”的消息。
FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG);
pushConsumer.subscribe(`topic`, filterExpression);
  1. 订阅消息,匹配多个 Tag 标签。
String `topic` = "Your `topic`";
//只订阅消息标签为“TagA”、“TagB”或“TagC”的消息。
FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG);
pushConsumer.subscribe(`topic`, filterExpression);
  1. 订阅消息,匹配 topic 中的所有消息,不进行过滤。
String `topic` = "Your `topic`";
//使用Tag标签过滤消息,订阅所有消息。
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
pushConsumer.subscribe(`topic`, filterExpression);

SQL92 过滤

image

由于 SQL 属性过滤是生产者定义消息属性,消费者设置 SQL 过滤条件,因此过滤条件的计算结果具有不确定性,服务端的处理方式如下:

  • 异常情况处理:如果过滤条件的表达式计算抛异常,消息默认被过滤,不会被投递给消费者。例如比较数字和非数字类型的值。
  • 空值情况处理:如果过滤条件的表达式计算值为 null 或不是布尔类型(true 和 false),则消息默认被过滤,不会被投递给消费者。例如发送消息时未定义某个属性,在订阅时过滤条件中直接使用该属性,则过滤条件的表达式计算结果为 null。
  • 数值类型不符处理:如果消息自定义属性为浮点型,但过滤条件中使用整数进行判断,则消息默认被过滤,不会被投递给消费者。

示例

  1. 生产者发送消息
Message message = messageBuilder.set`topic`("`topic`")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
//该示例表示消息的Tag设置为“messageTag”。
.setTag("messageTag")
//消息也可以设置自定义的分类属性,例如环境标签、地域、逻辑分支。
//该示例表示为消息自定义一个属性,该属性为地域,属性值为杭州。
.addProperty("Region", "Hangzhou")
//消息体。
.setBody("messageBody".getBytes())
.build();
  1. 消费者消费根据单个自定义属性匹配消息
String `topic` = "`topic`";
//只订阅地域属性为杭州的消息。
FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND Region='Hangzhou'", FilterExpressionType.SQL92);
simpleConsumer.subscribe(`topic`, filterExpression);
  1. 订阅消息,同时根据多个自定义属性匹配消息
String `topic` = "`topic`";
//只订阅地域属性为杭州且价格属性大于30的消息。
FilterExpression filterExpression = new FilterExpression("Region IS NOT NULL AND price IS NOT NULL AND Region = 'Hangzhou' AND price > 30", FilterExpressionType.SQL92);
simpleConsumer.subscribe(`topic`, filterExpression);
  1. 订阅消息,匹配 topic 中的所有消息,不进行过滤。
String `topic` = "`topic`";
//订阅所有消息。
FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
simpleConsumer.subscribe(`topic`, filterExpression);

普通消息

普通消息一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。

//普通消息发送。
MessageBuilder messageBuilder = new MessageBuilder();
Message message = messageBuilder.set`topic`("`topic`")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//消息体。
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}

//消费示例一:使用PushConsumer消费普通消息,只需要在消费监听器中处理即可。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
//根据消费结果返回状态。
return ConsumeResult.SUCCESS;
}
};

//消费示例二:使用SimpleConsumer消费普通消息,主动获取消息进行消费处理并提交消费结果。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}

应当为消息自定义索引键,方便问题的追踪和排查

顺序消息

发送和接收的顺序保持一致

如何保证数据的顺序性

生产的顺序性必须是单一的生产者进行串行发送

image-20221112170909507

  • 相同消息组的消息按照先后顺序被存储在同一个队列。
  • 不同消息组的消息可以混合在同一个队列中,且不保证连续。

消息组 1 和消息组 4 的消息混合存储在队列 1 中,消息队列 RocketMQ 版保证消息组 1 中的消息 G1-M1、G1-M2、G1-M3 是按发送顺序存储,且消息组 4 的消息 G4-M1、G4-M2 也是按顺序存储,但消息组 1 和消息组 4 中的消息不涉及顺序关系。

消费的顺序性:必须要保证投递顺序有限重试

​ 投递顺序:生产者顺序投递后消费者应当按照一接收一应答的语义处理消息,消费者类型为 PushConsumer 时,消息队列 RocketMQ 版保证消息按照存储顺序一条一条投递给消费者,若消费者类型为 SimpleConsumer,则消费者有可能一次拉取多 条消息。此时,消息消费的顺序性需要由业务方自行保证

​ 有限重试:设置最大重试次数,如果达到最大重试次数还没有消费成功,那么将跳过这条消息去消费下一条消息

  • 消息消费失败或消费超时,会触发服务端重试逻辑,重试消息属于新的消息,原消息的生命周期已结束。
  • 顺序消息消费失败进行消费重试时,为保障消息的顺序性,后续消息不可被消费,必须等待前面的消息消费完成后才能被处理。

顺序消息仅支持使用MessageTypeFIFO的主题,即顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致。

示例

和普通消息发送相比,顺序消息发送必须要设置消息组。

        //顺序消息发送。
MessageBuilder messageBuilder = null;
Message message = messageBuilder.set`topic`("`topic`")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//设置顺序消息的排序分组,该分组尽量保持离散,避免热点排序分组。
.setMessageGroup("fifoGroup001")
//消息体。
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
//消费顺序消息时,需要确保当前消费者分组是顺序投递模式,否则仍然按并发乱序投递。
//消费示例一:使用PushConsumer消费顺序消息,只需要在消费监听器处理即可。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
//根据消费结果返回状态。
return ConsumeResult.SUCCESS;
}
};
//消费示例二:使用SimpleConsumer消费顺序消息,主动获取消息进行消费处理并提交消费结果。
//需要注意的是,同一个MessageGroup的消息,如果前序消息没有消费完成,再次调用Receive是获取不到后续消息的。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}

定时、延时消息

一般可以应用在分布式任务调度,订单超时取消等场景

定时消息仅支持在MessageTypeDelay的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致。

示例

        //定时/延时消息发送
MessageBuilder messageBuilder = null;
//以下示例表示:延迟时间为10分钟之后的Unix时间戳。
Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = messageBuilder.set`topic`("`topic`")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
.setDeliveryTimestamp(deliverTimeStamp)
//消息体
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}

//消费示例一:使用PushConsumer消费定时消息,只需要在消费监听器处理即可。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView.getDeliveryTimestamp());
//根据消费结果返回状态。
return ConsumeResult.SUCCESS;
}
};

//消费示例二:使用SimpleConsumer消费定时消息,主动获取消息进行消费处理并提交消费结果。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}

不要在相同的定时时刻内发送大量消息,可能会损失定时精度

事务消息

在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

其他

1、如何处理二阶段失败的消息--回查

如果在 RocketMQ 事务消息的二阶段过程中失败了,例如在做 Commit 操作时,出现网络问题导致 Commit 失败,那么需要通过一定的策略使这条消息最终被 Commit。RocketMQ 采用了一种补偿机制,称为“回查”。Broker 端对未确定状态的消息发起回查,将消息发送到对应的 Producer 端(同一个 Group 的 Producer),由 Producer 根据消息来检查本地事务的状态,进而执行 Commit 或者 Rollback。Broker 端通过对比 Half 消息和 Op 消息进行事务消息的回查并且推进 CheckPoint(记录那些事务消息的状态是确定的)。

值得注意的是,rocketmq 并不会无休止的的信息事务状态回查,默认回查 15 次,如果 15 次回查还是无法得知事务状态,rocketmq 默认回滚该消息。

2、rabbitMq、rocketmq、kafaka 对比

Kafka Kafka 是 LinkedIn 开源的分布式发布-订阅消息系统,目前归属于 Apache 顶级项目。Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8 版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。

RabbitMQ RabbitMQ 是使用 Erlang 语言开发的开源消息队列系统,基于 AMQP 协议来实现。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP 协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

RocketMQ RocketMQ 是阿里开源的消息中间件,它是纯 Java 开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ 思路起源于 Kafka,但并不是 Kafka 的一个 Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog 分发等场景。

3、Rocketmq 和 Kafka 区别

(1) 适用场景

Kafka 适合日志处理;

RocketMQ 适合业务处理。

**结论:**平手,根据具体业务定夺。

(2) 性能

Kafka 单机写入 TPS 号称在百万条/秒;

RocketMQ 大约在 10 万条/秒。

**结论:**追求性能的话,Kafka 单机性能更高。

(3) 可靠性

RocketMQ 支持异步/同步刷盘;异步/同步 Replication;

Kafka 使用异步刷盘方式,异步 Replication。

**结论:**RocketMQ 所支持的同步方式提升了数据的可靠性。

(4) 实时性

均支持 pull 长轮询,RocketMQ 消息实时性更好

**结论:**RocketMQ 胜出。

(5) 支持的队列数

Kafka 单机超过 64 个队列/分区,消息发送性能降低严重;

RocketMQ 单机支持最高 5 万个队列,性能稳定

**结论:**长远来看,RocketMQ 胜出,这也是适合业务处理的原因之一

(6) 消息顺序性

Kafka 某些配置下,支持消息顺序,但是一台 Broker 宕机后,就会产生消息乱序;

RocketMQ 支持严格的消息顺序,在顺序消息场景下,一台 Broker 宕机后,

发送消息会失败,但是不会乱序;

**结论:**RocketMQ 胜出

(7)消费失败重试机制

Kafka 消费失败支持重试,KafkaProducer 通过设定参数retries

RocketMQ 消费失败支持定时重试,每次重试间隔时间顺延。

(8)定时/延时消息

Kafka 不支持定时消息;可以使用 DelayQueue 实现。

RocketMQ 支持定时消息

(9)分布式事务消息

Kafka 不支持分布式事务消息;

阿里云 ONS 支持分布式定时消息,未来开源版本的 RocketMQ 也有计划支持分布式事务消息

(10)消息查询机制

Kafka 不支持消息查询

RocketMQ 支持根据 Message Id 查询消息,也支持根据消息内容查询消息

(11)消息回溯

Kafka 理论上可以按照 Offset 来回溯消息

RocketMQ 支持按照时间来回溯消息,精度毫秒,例如从一天之前的某时某分某秒开始重新消费消息

4.其他

另外认为 kafka 是用于日志传输,所以不适合系统的业务事件是个更大的误区,Kafka 本身在最早实现时的确是为了传输日志,但后来经过多年发展,其适用范围早不限于日志,并且很多采取 Kafka 的公司并非用它来处理日志,kafka 背后的 Confluence 公司提供了很多基于 kafka 来简化系统实现的例子。

4、幂等性问题

幂等性 幂等

参考文档

快速开始 | RocketMQ (apache.org)

docs/cn · Apache/RocketMQ - 码云 - 开源中国 (gitee.com)

RocketMQ5.0 四大核心特性。 - 教程文章 - 时代 Java,与您同行! (nowjava.com)