RocketMQ:深入理解Broker如何接收Producer生产消息请求?

wuchangjian2021-11-04 08:29:38编程学习

一、前言

今天原打算写一篇关于消息在Broker端如何存储的文档,刚梳理好整体文章脉络,想到一个问题:Broker端是如何接收生产消息请求的?印象中是通过Netty接收的,由于对具体的实现比较好奇,也就有了今天的故事。

二、源码分析

以下所有分析相关的源码注释请见GitHub中的release-4.8.0分支:https://github.com/Saint9768/rocketmq/tree/rocketmq-all-4.8.0
在这里插入图片描述

1、RemotingServer消息生产/消费服务启动

又要开始快乐的找入口环节了,我们先看看Broker作为服务端对外提供的服务RomotingServer。

RemotingServer是一个Netty服务端服务,它开始了10911端口用来处理Producer和Consumer的网络请求。

在前面我们介绍过RocketMQ中Broker的启动流程:RocketMQ:深度剖析Broker启动流程原理、源码。remotingServer就是在BrokerController#start()方法中启动的。
在这里插入图片描述
remotingServer是一个接口,点进去我们发现其有两个实现,分别是:NettyRemotingServerNettyRemotingClient
在这里插入图片描述
对于Producer和Consumer客户端而言,broker是作为服务端的存在,所以我们要看的是NettyRemotingServer#start()方法。
在这里插入图片描述
emmmm,这是一个标准的netty服务启动流程。对于netty相关的内容,我们这里仅需要关注pipeline上的channelHandler(处理读写请求的操作),在NettyRemotingServer中处理读写请求的ChannelHandler为serverHandler(NettyServerHandler);代码如下图所示:
在这里插入图片描述
NettyServerHandlerNettyRemotingServer的一个内部类;里面会调用当前外部类对象NettyRemotingServer父类NettyRemotingAbstractprocessMessageReceived()方法。
在这里插入图片描述
这里会判断RemotingCommand命令的类型是请求还是响应。对于消息生产而言,RemotingCommand的命令类型为请求,所以我们接着看一下processRequestCommand()方法。
在这里插入图片描述
首先根据命令的code从processorTable中获取相应的事件处理器和线程池,如果获取不到则采用默认的事件处理器(AdminBrokerProcessor);紧接着调用事件处理器处理请求,即核心处理请求的核心逻辑在processor#asyncProcessRequest()方法中。
在这里插入图片描述
点进去我们发现asyncProcessRequest#asyncProcessRequest是一个抽象类的方法;对应生产者发送消息的处理器是SendMessageProcessor
在这里插入图片描述
问题又来了?这个SendMessageProcessor是什么时候注册到processorTable中的?

2、Processor事件处理器注册

上面我们提到了一个问题:SendMessageProcessor是什么时候注册到processorTable中的?
processorTableNettyRemotingAbstract的成员变量,它里面的内容是在BrokerController初始化时注册的。BrokerController执行初始化的方法BrokerController#initialize()方法中或调用registerProcessor()方法,其会将一大堆的Processor注册到processorTable中,比如:pullMessageProcessor、SendMessageProcessor、ConsumeManageProcessor等。
在这里插入图片描述
在这里插入图片描述
然后我们发现并没有直接操作processorTable的地方,看这个remotingServer#registerProcessor()方法,注册处理器,感觉就是它了,我们再点进去看看:
在这里插入图片描述
其是个抽象方法,和上面一样,此时的Broker是作为服务端的存在,我们依旧看一下NettyRemotingServer是怎么做的。
在这里插入图片描述
在注册Processor时,会现获取相应的线程池,如果线程池为null,则使用公用的线程池;这里是一个线程池隔离的机制。最后把Processor放到processorTable,这个注册事件处理器的流程就完事了。

3、接收Producer生产消息请求

Producer发送消息到broker时,发送请求的code为SEND_MESSAGE。结合我们上面的分析:

  • 当消息过来时,NettyRemotingServer会使用NettyServerHandler这个ChannelHandler来处理请求;
  • 接着会调用到NettyRemotingAbstract#processRequestCommand()方法,根据请求命令的code获取对应的Processor来处理请求。
  • 从Processor的注册流程来看,处理SEND_MESSAGE 类型请求命令的Processor为SendMessageProcessor

在这里插入图片描述
没啥特别的,接着往下跟,进入到同名方法asyncProcessRequest()中。
在这里插入图片描述
由于我们请求的Code不是CONSUMER_SEND_MSG_BACK,所以会进入到Switch代码块的default中;接着我们进入到:asyncSendMessage()方法中,看一下生产单个消息是怎么做的?
在这里插入图片描述
这个方法是在准备要发送消息的数据,主要工作如下:

  1. 如果没有指定queue,就随机选一个queue;一般情况下也不会指定queue,除非顺序生产消息时。
  2. 构建MessageExtBrokerInner对象,给消息添加一些额外信息,比如:queuetopic事务消息标识、发送地址、发送时间等。
  3. 根据是否是事务消息,决定发送事务消息还是普通消息。

后面MessageStore登场,进行消息的持久化操作,这一部分内容,我们放在下一篇文章中探讨。

三、原理总结

Broker端接收Producer生产消息的流程如下:

  • 在Broker启动时,即BrokerController#start()方法中,初始化接收消息的Netty服务NettyRemotingServer。
  • NettyRemotingServerpipeline下的NettyServerHandler这个ChannelHandler负责处理消息生产请求;
  • 接着调用到NettyRemotingAbstractprocessMessageReceived()方法中根据请求的code调用对应的事件处理器处理请求。
  • 对于消息生产而言,最终会调用到SendMessageProcessor#asyncProcessRequest()方法处理消息生产请求。
  • 然后调用SendMessageProcessor#asyncSendMessage()方法封装消息、填充queue、topic信息,假如我们不指定queue进行消息生产,broker会随机选择一个queue 就是在这里做的;最后调用MessageStore#asyncPutMessage()方法持久化消息,最终将消息写到CommitLog中。

相关文章

算法修炼65、矩阵中的路径

算法修炼65、矩阵中的路径

题目描述:   请设计一个函数,用来判断在一个矩阵中是否存...

解决“指定的服务已经标记为删除”问题及“想删除指定服务器“

在注册DotNetWinService服务时,再使用 "sc de...

4.3 串口通信

4.3 串口通信 4.3.1 通信的概念 通信一词按照传统的理解就是信息的传输与交换...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。