一丶前言


《Rocketmq学习3——消息发送原理源码浅析》
中,我们学习了消息发送的要点:

  • 本地缓存+rpc 请求namesever + 定时刷新,topic路由信息
  • 负载均衡的选择一个Broker进行发送,还支持【故障转移(即支持规避短时间内发送失败的broker)】
  • 基于netty实现的rpc进行消息发送

这一篇我们将学习,消息是如何持久化在broker上的

二丶概述

消息存储的流程如下:

  1. 发送消息: 生产者(Producer)发送消息到 Broker。
  2. 消息存储:Broker 接收到消息后,将消息存储在消息存储文件中,通常是 CommitLog 文件。 RocketMQ 使用了内存映射文件(MappedByteBuffer)来提高文件的读写速度,它可以将文件直接映射到虚拟内存,减少了文件 I/O 操作。
  3. 写入磁盘:RocketMQ 使用了顺序写的方式将消息写入到 CommitLog,这是因为顺序写磁盘的速度远快于随机写。
  4. 索引文件更新:为了提高查询效率,消息会被索引,索引信息存储在 ConsumerQueue 和 IndexFile 中。ConsumerQueue 存储了消息在 CommitLog 中的偏移量,而 IndexFile 存储了关键字到消息偏移量的映射。这一步和broker处理消息发送请求是异步的,由后台线程定时处理。
  5. 数据刷盘:RocketMQ 提供两种消息刷盘方式:
    1. 同步刷盘和异步刷盘。同步刷盘会在消息确实写入磁盘后再向生产者确认消息发送成功,
    2. 异步刷盘则在写入操作系统 PageCache 后就确认,依靠操作系统异步将数据刷写到磁盘。
  6. HA 机制:为了保证数据的高可用性,RocketMQ 还提供了主从同步机制,从服务器可以从主服务器上复制数据,确保在主服务器宕机时,从服务器可以接管消息服务。

三丶broker是接收消息发送请求

broker在启动的时候,会启动
BrokerController
,BrokerController会触发
remotingServer
的启动。remotingServer基于netty实现,其中关联了RequestCode(rocketmq协议中使用一个int表明请求类型)和对应的请求处理的processor。

image-20240127194137820

其中
SEND_MESSAGE
对应的processor——SendMessageProcessor。

在broker启动时,会触发基于netty的服务端启动,其中注册的
NettyServerHandler
实现了ChannelInboundHandler,在数据客户端数据到达的时候会先经由解码器
ByteToMessageDecoder(rocketmq根据自己的协议实现了解码器——NettyDecoder)
,解码后将调用到如下的NettyServerHandler!

image-20240127194340690

其中会根据请求类型,获取到对应的Processor,消息发送一般最后由SendMessageProcessor处理

四丶rocketmq基于netty实现的远程服务处理请求的流程

image-20240127195406001

SendMessageProcessor接收到请求的时候,不是立马在当前线程进行处理,而是将封装成一个任务,提交到业务线程池。

在提交之前,还是会进行当前broker是否关闭中,是否拒绝请求的判断。

如下是处理请求的大致流程

image-20240127195943463

可看到绿色部分才是真正处理请求的部分,处理后将响应写到netty的channel中,实习响应!

五丶SendMessageProcessor 处理请求大致流程

image-20240127200425984

rocketmq留了一堆扩展的钩子,最终在sendMessage方法中进行一系列的校验,包装消息为MessageExtBrokerInner,然后进行消息存储流程,源码如下

image-20240127200917150

消息存储最后交给MessageStore,调用
asyncPutMessage
进行异步存储消息,也就是说业务处理线程并没有一直阻塞到消息存储完毕,而是提交后就释放了

看到这里你可能会疑问,那么同步消息发送者岂不是收不到响应,同步消息消费者还会block住么?

还是会的,因为

image-20240127201240829

只有在MessageStore异步存储完消息后,才会回调doResponse写回响应!

这样做的目的在于将业务处理Executor,和消息存储Executor进行解耦

六丶消息持久化

image-20240127201715189

可看到最终使用CommitLog进行消息存储

1.消息持久化前置流程

image-20240127203240696

如上主要是进行一些校验,其中有两层锁

  • topic + queue锁


    image-20240127203404747

    topicQueueKey
    由topic和queueId构成,因为一个broker上可有多个topic,一个topic可具备多个messgeQuque,这里使用hash实现锁粒度的细化,那么queueId是在哪里生成的?

    image-20240127203718470

    如上是在SendMessageProcessor中,如果指定了queueId那么使用指定的queueId,反之随机产生一个。

  • 文件锁


    image-20240127204401170

    rocketmq具备两个实现:


    • 一个基于AQS ReentrantLock
    • 一个基于cas自旋

    image-20240127210000504

    高并发情况浪费大量cpu,低并发情况下减少内核态用户态切换

2.消息持久化

image-20240127204034033

2.1 MappedFile文件创建

image-20240127211511727

这里会构建出两个文件路径,这意味着会一次性创建两个文件,可看到文件名称是偏移量的大小——比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推

下面我们看看文件创建的源码:

image-20240127212505934

文件创建并不是由当前线程进行的,而是将请求提交到
requestTable
中,然后等待指定时间。

然后再背后存在一个线程,不断从队列中拿任务进行处理

image-20240127213432976

可以看到MappedFile支持SPI机制,但是这里的代码让人作呕

如果开启的了堆外内存缓冲,那么会使用:
new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool())
创建DefaultMappedFile

否则使用
new DefaultMappedFile(req.getFilePath(), req.getFileSize())
创建DefaultMappedFile。

  1. 不适应堆外内存缓冲

    image-20240127214936029

    使用fileChannel.map创建mappedByteBuffer

  2. 使用堆外写缓冲

    image-20240127214656833

    会从TransientStorePool中获取一个ByteBuffer


    image-20240127215656728

    这里堆外缓冲是TransientStorePool初始化时申请的


    image-20240127215810637

2.2 文件预热

至此完成了文件的创建,rocktmq还会进行文件的预热:

image-20240127220925880

预热的过程其实就是每隔4K写入0值,这样做的好处是:

提高文件的访问效率,尤其是在使用内存映射(Memory Mapped File,MMF)技术时。内存映射文件技术能将文件直接映射到操作系统的虚拟内存中,进而可以像访问内存一样访问这些文件,这样可以显著提高文件I/O的效率。

预热(mappedFile)的过程,主要是提前将文件内容加载到物理内存中,确保在实际使用这些文件时,能够避免或减少磁盘I/O带来的延迟。因为当进程首次访问内存映射文件中的某个部分时,如果这部分数据还没有加载到物理内存中,操作系统需要从磁盘中读取数据到物理内存,这个过程称为缺页中断(page fault)。缺页中断会导致一定的延迟。

进行预热主要是通过以下几种方式:

  1. 触摸内存
    : 遍历映射文件的每一页并写入少量数据(例如0),这样可以确保操作系统将这些页加载到物理内存中。
  2. mlock
    : 在某些系统中,可以使用
    mlock
    或类似的调用来锁定内存的特定区域,确保这些区域常驻内存,不会被操作系统交换到磁盘上(swap out)。

2.3 消息写入

image-20240127221738628

写入的时候会获取ByteBuffer,如下:如果具备写堆外内存缓冲,那么使用堆外内存,反之使用mmap生成的byteBuffer

image-20240127221918476

最终就是将消息按照消息格式put到ByteBuffer中

2.4 消息刷盘

当消息写入到ByteBuffer后,会进行持久化 和高可用同步副本

image-20240127222606577

这里我们看下刷盘的源码

image-20240127222927230

可以看到根据是否由堆外写缓冲和刷盘方式,会使用不同的service进行wakeup实现刷盘:

  • 堆外写缓冲(WriteBuffer)(只有异步刷盘模式才可以开启)

    在RocketMQ中,
    MappedFile
    类代表一个内存映射文件,可以在构造时选择是否启用“堆外写缓冲”(transientStorePoolEnable)。如果启用,RocketMQ会创建一个堆外内存池
    TransientStorePool
    ,用于临时存储即将写入文件的数据。

    写入过程分为两步:


    1. 写入堆外内存
      :生产者发送的消息首先被写入到堆外内存池中的一个缓冲区,这个缓冲区对应一个
      ByteBuffer
    2. 提交到
      MappedFile

      :随后,数据会从堆外内存缓冲区“提交”(commit)到
      MappedByteBuffer
      。在RocketMQ中,commit操作实际上是将堆外内存中的数据复制到内存映射文件的
      MappedByteBuffer
      中。

RocketMQ通过这种方式实现了一种内存双写的机制:先写入堆外内存,然后再提交到内存映射文件中。这样做可以利用堆外内存池做一层缓冲,提高写入效率,同时减少JVM垃圾回收的压力。

  • 刷盘方式(Flush)
    1. 同步刷盘(SYNC_FLUSH):每次消息写入
      MappedByteBuffer
      之后,同步调用
      MappedByteBuffer.force()
      方法,将数据强制刷写到磁盘。同步刷盘提供了较高的数据安全性,但会牺牲一些性能。
    2. 异步刷盘
      (ASYNC_FLUSH):消息写入
      MappedByteBuffer
      之后,并不立即刷写到磁盘,而是由后台线程(如
      FlushRealTimeService
      )定期调用
      MappedByteBuffer.force()
      方法进行刷盘。异步刷盘牺牲了部分数据安全性,但提高了性能。

在RocketMQ中,刷盘策略可以根据数据的重要性和对性能的要求来选择。如果数据安 全性要求极高,可以选择同步刷盘;如果追求高吞吐量,可以选择异步刷盘。

2.4.1 同步刷盘

同步刷盘,rocketmq的消息可以设置是否等待消息存储完成,如下

image-20240127223124880

  1. 如果设置了等待刷盘成功,那么会向GroupCommitService中提交刷盘请求,然后返回对应future
  2. 如果没有,那么唤醒刷盘线程,然后返回

GroupCommitService
是为同步刷盘模式设计的,它允许在将消息持久化到磁盘之前暂停生产者的发送操作。这是为了确保在任何时候发生故障时消息不会丢失。

当一个生产者请求将消息同步刷盘到磁盘时,它会创建一个
GroupCommitRequest
。这个请求包含了刷盘所需的信息,如期望刷盘的偏移量。然后,生产者线程将这个请求提交给
GroupCommitService
并等待。

GroupCommitService
内部维护了一个请求队列。这个队列是线程安全的,生产者通过
putRequest
方法将请求添加到队列。添加请求后,生产者线程调用
CountDownLatch.await()
方法等待。

GroupCommitService
的主循环会检查队列中是否有请求。如果有,它会将这些请求从队列中移除并进行处理。处理包括将
CommitLog
中的相关数据刷盘到磁盘。一旦完成,它将调用每个
GroupCommitRequest

wakeupCustomer
方法,该方法将减少
CountDownLatch
的计数,从而允许等待的生产者线程继续执行。

如下是GroupCommitService处理刷盘,和异步刷盘的不同在于其会设置刷盘future状态,从而让等待刷盘的线程被唤醒

image-20240127230314918

2.4.3 异步刷盘

image-20240127230504184

异步刷盘针对是否开启了堆外写缓冲会调用不同的Service

  1. 开启了堆外写缓冲:使用CommitRealTimeService


    GroupCommitService
    不同,
    CommitRealTimeService
    是为异步刷盘模式设计的,它不会在每次消息追加到
    CommitLog
    后暂停生产者线程。相反,它根据预设的时间间隔或消息积累量定期刷盘。
    image-20240127232525422

    image-20240127231221246

    如下是刷盘的源码:

    image-20240127231542363

  2. 没开启堆外写缓冲:使用FlushRealTimeService,其会调用flush直接进行刷新

七丶高可用

让我们回到CommitLog#asyncPutMessage方法,可以看到下面有一个高可用的处理(needHandleHA)

image-20240127232746537

那什么是否需要刷新到其他副本昵?

image-20240127233145012

Message必须setWaitStoreMsgOK(true),且消息存储表明需要副本,并且角色是SYNC_MASTER

那么高可用如何实现的?

image-20240127233747416

下面是RocketMq高可用机制:

RocketMQ 通过其 HAService(高可用性服务)实现了主从同步复制,确保了消息的高可用性。它的工作原理是在主Broker(Master)上的CommitLog更新之后,这些更新会被复制到一个或多个从Broker(Slave)上。这样,即使主Broker发生故障,从Broker也可以接管工作,保证消息服务的可用性。

HAService 主要包括两个组件:Master端的
HAService
和 Slave端的
HAConnection

1.Master 端

HAService
主要负责管理与从Broker的连接,并将CommitLog的更新推送到所有连接的从Broker上。

  1. 连接建立:
    HAService
    在Master上监听一个特定的端口。从Broker通过这个端口与Master建立连接。

  2. 封装连接:
    HAService
    管理所有的从Broker连接。每当有新的从Broker连接到Master时,它都会创建一个新的
    HAConnection

  3. 数据同步: Master上的CommitLog更新后,
    HAService
    会将这些数据通过
    HAConnection
    发送到从Broker。数据的发送依赖于从Broker的拉取请求,即从Broker告诉Master它已经接收了哪些数据,并请求后续的数据。

  4. 资源清理:如果从Broker断开连接或出现错误,
    HAService
    会关闭对应的
    HAConnection
    并清理资源。

2. Slave 端

从Broker端的
HAConnection
主要负责与Master保持通信,获取数据更新,并将这些更新写入本地的CommitLog。

  1. 连接建立: 从Broker启动时,它会尝试与Master建立
    HAConnection

  2. 请求数据并同步: 从Broker会定期发送已确认的数据偏移量给Master,并请求新的数据。收到Master的数据后,从Broker会将数据写入自己的CommitLog。

  3. 反馈同步进度:从Broker在成功将数据写入CommitLog后,会更新已确认的数据偏移量,并准备发送回Master以获取更多数据。

通过这种方式,RocketMQ的HAService确保了消息数据在Master和Slave之间实时同步,即使在Master出现故障的情况下,也能保证服务的高可用性。

需要注意的是,这种主从同步机制虽然提供了高可用性,但它可能会对消息的发送性能产生一定影响,因为Master需要在将消息存储到本地CommitLog并且同步到从Broker之后才能向生产者发送确认响应。此外,如果从Broker落后于Master太多,也有可能影响整体的同步效率。

为了确保数据的强一致性,RocketMQ通常建议至少部署一个Master和一个Slave,并在Broker配置中设置
brokerRole

SYNC_MASTER
。这样,只有当数据成功复制到至少一个Slave时,Master才会对消息发送者确认成功。这确保了即使Master发生故障,消息也不会丢失,因为至少有一个Slave拥有完整的数据副本。

八丶总结

感觉rocketmq代码写的很垃圾,但是功能还是实现了的。其落盘+副本同步,再很多其他中间件中也是适用的

1.顺序写

CommitLog 的写入被视为磁盘的顺序写,主要是因为 RocketMQ 采用了顺序向 CommitLog 文件追加消息的方式进行数据记录。消息生产者产生的消息按照接受的顺序依次追加到 CommitLog 文件的尾部,而不是随机分散地写到文件的不同位置。

顺序写为什么比随机写快:
1.机械硬盘(HDD)的顺序写性能通常远高于随机写,因为顺序写不需要硬盘头移动去查找不同的写入位置,而是连续在同一轨道上写入数据,大大减少了寻址时间。即使在固态硬盘(SSD)中,顺序写也有一些优势,因为 SSD 的写入操作涉及擦除以前的数据块然后再写入新数据。顺序写可以减少数据移动和合并操作,提高了 SSD 的写入效率。
2.文件系统一般也对顺序写进行了优化,能够更好地利用缓存和预取策略,提高写入效率。
3.顺序写有助于减少 I/O 操作的开销,增加了操作的预测性,使操作系统和硬件能够对写入进行优化。

2.mmap内存映射文件

内存映射文件(Memory-Mapped File,简称 mmap)

  • 内存映射:mmap 通过将磁盘上的文件映射到虚拟内存的方式,使得应用程序可以像访问内存一样直接读写文件区域,避免了传统的文件I/O操作(read/write)中用户空间和内核空间之间上下文切换的开销。
  • 操作系统缓存:mmap 的数据可以被操作系统自动地缓存,提高了数据访问速度。操作系统会负责将修改过的内存数据同步回文件,减少了显式的读写操作

3.文件预热

预热(mappedFile)的过程,主要是提前将文件内容加载到物理内存中,确保在实际使用这些文件时,能够避免或减少磁盘I/O带来的延迟。因为当进程首次访问内存映射文件中的某个部分时,如果这部分数据还没有加载到物理内存中,操作系统需要从磁盘中读取数据到物理内存,这个过程称为缺页中断(page fault)。缺页中断会导致一定的延迟。

4.堆外内存写缓冲

rocketmq支持开启堆外写缓冲,优先写到DirectByteBuffer中,然后使用FileChannel#write刷新到pageCache,这样做好处是可以将多个消息先聚合到堆外byteBuffer然后一次性写入到page'Cache,减少系统调用。

5.HA机制

同步到副本,避免master宕机时消息丢失。

弊端是如果写master成功,同步副本失败,消息生产者maybe重试,导致消息重复,以及同步副本带来的延迟降低了系统的吞吐量。

标签: none

添加新评论