前 言
“一张图”系列旨在通过“一张图”系统性的解析一个板块的知识点:
三此君向来不喜欢零零散散的知识点,通过一张图将零散的知识点连接起来,能够让我们对一个板块有更深入、更系统的理解。
同时本系列尽可能的精炼,希望能够让大家花 20%的时间,快速理解这个板块下 80% 的内容。
为了叙述的方便,绘图的时候将整个系列分为许多小的模块,讲解的时候也是按照模块循序渐进的。
一张图解析 RocketMQ 原图
一张图解析 RocketMQ 是会深入到源码层面,但是文中不会粘贴源码。三此君在看源码的时候写了很多备注,可以降低大家看源码的难度,需要的同学自行到三此君的仓库中 Fork:rocketmq release-4.3.0
整体架构:会从大家熟悉的“生产者-消费者模式”逐步推出 RocketMQ 完整架构,只需要记住一张完整的架构图即可。
元数据管理:我把 RocketMQ 集群的元数据整理成一张图,方便大家直观的了解都有哪些元数据,各有什么用。
消息收发示例:通过 Docker 部署 RocketMQ,并用简单的示例串起 RocketMQ 消息收发流程。
整体架构
什么是消息队列?顾名思义,首先得有一个队列,这个队列用来存储消息。那有了消息队列就得有人往里面放,有人往里面取。有没有似曾相识燕归来的感 jio,这莫非就是连小学生都知道的,经典的“生产者-消费者模式”?接下来我们就来看看它里面穿了什么?
生产者线程:生产产品,并把产品放到队列里。
消费者线程:从队列里面获取产品,并消费。
这意味着什么呢,生产者和消费者之间实现解藕和异步。这就厉害了,因为我们生活中很多都是异步的。比如最近新冠疫情卷土重来,我点的外卖只能送到小区门口的外卖队列里面,而我只能去外卖队列里面取外卖,然后一顿狼吞虎咽。
相同类型的消息称为一个 Topic。同时,骑手不可能只有一个,点外卖的也不会只有我一个人,于是就有了生产者组和消费者组。
队列集群。
路由信息,这些信息需要有一个管理的地方,它告诉生产者,某这个 Topic 的消息可以发给哪些队列,同时告诉消费者你需要的消息可以从哪些队列里面取。RocketMQ 为这些路由信息的设置了管理员 NameServer,当然 NameServer 也可以有很多个,组成 NameServer 集群。
生产者(Producer),消费者(Consumer),NameServer 以及队列本身(Broker)。Broker 是代理的意思,负责队列的存取等操作,我们可以把 Broker 理解为队列本身。
我们可以同时部署多个 Master 和多个 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master。Master 与 Slave 的需要有相同的 BrokerName,不同的 BrokerId 。BrokerId 为 0 表示 Master,非 0 表示 Slave,但只有 BrokerId=1 的从服务器才会参与消息的读负载。(可以暂时忽略 Broker 的主从角色)
Producer:Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,采用轮询策略从选择一个队列,然后与队列所在的 Broker 建立长连接,并向 Broker 发消息。
我们刚刚提到骑手不止一个,取外卖快递的也不止我一个,所以会有生产者组合消费者组的概念。这里需要补充说明一下,消息分为集群消息和广播消息:
只能有一个消费者实例消费。例如,同样是外卖 Topic,一份外卖,我们整个小区也只有一个人消费,就是集群消费。
所有消费者实例都会消费。例如,如果是因为疫情,政府发放食品,那我们小区每个人都会消费,就是广播消费。
NameServer 起来后监听 端口,等待 Broker、Producer、Consumer 连上来,NameServer 是集群元数据管理中心。
元数据管理
:Key 是 Topic 的名称,它存储了所有 Topic 的属性信息。Value 是个 QueueData 列表,长度等于这个 Topic 数据存储的 Master Broker 的个数,QueueData 里存储着 Broker 的名称、读写 queue 的数量、同步标识等。
:这个结构存储着一个 BrokerName 对应的属性信息,包括所属的 Cluster 名称,一个 Master Broker 和多个 Slave Broker 的地址信息
:存储的是集群中 Cluster 的信息,就是一个 Cluster 名称对应一个由 BrokerName 组成的集合。
:Key 是 BrokerAddr 对应着一台机器,BrokerLiveTable 存储的内容是这台 Broker 机器的实时状态,包括上次更新状态的时间戳,NameServer 会定期检查这个时间戳,超时没有更新就认为这个 Broker 无效了,将其从 Broker 列表里清除。
:Key 是 Broker 的地址,Value 是和这个 Broker 关联的多个 FilterServer 的地址。Filter Server 是过滤服务器,是 RocketMQ 的一种服务端过滤方式,一个 Broker 可以有一个或多个 Filter Server。
其他角色会主动向 NameServer 上报状态,根据上报消息里的请求码做相应的处理,更新存储的对应信息。
Broker 接到创建 Topic 的请求后向 NameServer 发送注册信息,NameServer 收到注册信息后首先更新 Broker 信息,然后对每个 Master 角色的 Broker,创建一个 QueueData 对象。如果是新建 Topic,就是添加 QueueData 对象;如果是修改 Topic,就是把旧的 QueueData 删除,加入新的 QueueData。
Broker 向 NameServer 发送的心跳会更新时间戳,NameServer 每 10 秒检查一次检查时间戳,检查到时间戳超过 2 分钟则认为 Broker 已失效,便会触发清理逻辑。
连接断开的事件也会触发状态更新,当 NameServer 和 Broker 的长连接断掉以后,onChannelDestroy 函数会被调用,把这个 Broker 的信息清理出去。
Producer/Consumer 启动之后会和 NameServer 建立长连接,定时从 NameServer 获取路由信息保存到本地。消息的发送与拉取都会用到上面的数据。
消息收发示例
RocketMQ 部署
刚刚我们了解 RocketMQ 整体架构,那怎么样通过 RocketMQ 收发消息呢?需要先通过 Docker 部署一套 RocketMQ:
MacOS Docker 安装/Windows Docker 安装 进行安装。然后,通过 docker-compose 部署 RocketMQ:
克隆
docker-middleware
仓库,打开 RocketMQ 目录;
修改中的参数为本机 IP;
进入文件所在路径,执行命令即可;
RocketMQ 已经部署好了,接下来先来看一个简单的消息收发示例,可以说是 RocketMQ 的 "Hello World"。
消息发送
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("Topic1","Tag", "Key",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
首先,实例化一个生产者,并告诉它 NameServer 的地址,这样生产者才能从 NameServer 获取路由信息。
然后 得做一些初始化(这是很关键的步骤),它要和 NameServer 通信,要先建立通信连接等。
已经准备好了,那得准备好要发的内容,把 "Hello World" 发送到 Topic1。
内容准备好,那 就可以把消息发送出去了。 怎么知道 Broker 地址呢?他就会去 NameServer 获取路由信息,得到 Broker 的地址是 localhost:10909,然后通过网络通信将消息发送给 Broker。
生产者发送的消息通过网络传输给 Broker,Broker 需要对消息按照一定的结构进行存储。存储完成之后,把存储结果告知生产者。
消息接收
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
erbconsumerijun.subscribe("sancijun", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
}
}
首先,实例化一个消费者,告诉它 NameServer 的地址,这样消费者才能从 NameServer 获取路由信息。
然后这个消费者需要知道自己可以消费哪些 Topic 的消息,也就是每个消费者需要订阅一个或多个 Topic。
消费者也需要做一些初始化,业务本身并没有理会怎么从 Broker 拉取消息,这些都是消费者默默无闻的奉献。所以,我们需要启动消费者,消费者会从 NameServer 拉取路由信息,并不断从 Broker 拉取消息。拉取回来的消息提供给业务定义的 MessageListener。
消息拉取回来后,消费这需要怎么处理呢?每个消费者都不一样(业务本身决定),由我们业务定义的 MessageListener 处理。处理完之后,消费者也需要确认收货,就是告诉 Broker 消费成功了。
以上就是本文的全部内容,本文没有堆砌太多无意义的概念,没有讲什么削峰解耦,异步通信。这些内容网上也很多,看了和没看没什么两样。最后的最后,看懂的点赞,没看懂的收藏,顺便在分享给你的小伙伴。还没有关注的朋友记得关注,入股不亏。
参考文献
RocketMQ 源码
李伟. RocketMQ分布式消息中间件:核心原理与最佳实践. 电子工业出版社, 2020-08。
RocketMQ 官方文档
转载请注明出处