一张图进阶 RocketMQ

数据库 投稿 43800 0 评论

一张图进阶 RocketMQ

前    言

“一张图”系列旨在通过“一张图”系统性的解析一个板块的知识点:

  • 三此君向来不喜欢零零散散的知识点,通过一张图将零散的知识点连接起来,能够让我们对一个板块有更深入、更系统的理解。

  • 同时本系列尽可能的精炼,希望能够让大家花 20%的时间,快速理解这个板块下 80% 的内容。

  • 为了叙述的方便,绘图的时候将整个系列分为许多小的模块,讲解的时候也是按照模块循序渐进的。

  • 一张图解析 RocketMQ 原图

  • 一张图解析 RocketMQ 是会深入到源码层面,但是文中不会粘贴源码。三此君在看源码的时候写了很多备注,可以降低大家看源码的难度,需要的同学自行到三此君的仓库中 Fork:rocketmq release-4.3.0

  • 整体架构:会从大家熟悉的“生产者-消费者模式”逐步推出 RocketMQ 完整架构,只需要记住一张完整的架构图即可。

  • 元数据管理:我把 RocketMQ 集群的元数据整理成一张图,方便大家直观的了解都有哪些元数据,各有什么用。

  • 消息收发示例:通过 Docker 部署 RocketMQ,并用简单的示例串起 RocketMQ 消息收发流程。

整体架构

什么是消息队列?顾名思义,首先得有一个队列,这个队列用来存储消息。那有了消息队列就得有人往里面放,有人往里面取。有没有似曾相识燕归来的感 jio,这莫非就是连小学生都知道的,经典的“生产者-消费者模式”?接下来我们就来看看它里面穿了什么?

  • 生产者线程:生产产品,并把产品放到队列里。

  • 消费者线程:从队列里面获取产品,并消费。

这意味着什么呢,生产者和消费者之间实现解藕和异步。这就厉害了,因为我们生活中很多都是异步的。比如最近新冠疫情卷土重来,我点的外卖只能送到小区门口的外卖队列里面,而我只能去外卖队列里面取外卖,然后一顿狼吞虎咽。

相同类型的消息称为一个 Topic。同时,骑手不可能只有一个,点外卖的也不会只有我一个人,于是就有了生产者组和消费者组。

队列集群。

路由信息,这些信息需要有一个管理的地方,它告诉生产者,某这个 Topic 的消息可以发给哪些队列,同时告诉消费者你需要的消息可以从哪些队列里面取。RocketMQ 为这些路由信息的设置了管理员 NameServer,当然 NameServer 也可以有很多个,组成 NameServer 集群。

生产者(Producer),消费者(Consumer),NameServer 以及队列本身(Broker)。Broker 是代理的意思,负责队列的存取等操作,我们可以把 Broker 理解为队列本身。

    NameServer 起来后监听 端口,等待 Broker、Producer、Consumer 连上来,NameServer 是集群元数据管理中心。

  • 我们可以同时部署多个 Master 和多个 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master。Master 与 Slave 的需要有相同的 BrokerName,不同的 BrokerId 。BrokerId 为 0 表示 Master,非 0 表示 Slave,但只有 BrokerId=1 的从服务器才会参与消息的读负载。(可以暂时忽略 Broker 的主从角色)

  • Producer:Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,采用轮询策略从选择一个队列,然后与队列所在的 Broker 建立长连接,并向 Broker 发消息。

  • 我们刚刚提到骑手不止一个,取外卖快递的也不止我一个,所以会有生产者组合消费者组的概念。这里需要补充说明一下,消息分为集群消息和广播消息:

    • 只能有一个消费者实例消费。例如,同样是外卖 Topic,一份外卖,我们整个小区也只有一个人消费,就是集群消费。

    • 所有消费者实例都会消费。例如,如果是因为疫情,政府发放食品,那我们小区每个人都会消费,就是广播消费。

元数据管理

  • :Key 是 Topic 的名称,它存储了所有 Topic 的属性信息。Value 是个 QueueData 列表,长度等于这个 Topic 数据存储的 Master Broker 的个数,QueueData 里存储着 Broker 的名称、读写 queue 的数量、同步标识等。

  • :这个结构存储着一个 BrokerName 对应的属性信息,包括所属的 Cluster 名称,一个 Master Broker 和多个 Slave Broker 的地址信息

  • :存储的是集群中 Cluster 的信息,就是一个 Cluster 名称对应一个由 BrokerName 组成的集合。

  • :Key 是 BrokerAddr 对应着一台机器,BrokerLiveTable 存储的内容是这台 Broker 机器的实时状态,包括上次更新状态的时间戳,NameServer 会定期检查这个时间戳,超时没有更新就认为这个 Broker 无效了,将其从 Broker 列表里清除。

  • :Key 是 Broker 的地址,Value 是和这个 Broker 关联的多个 FilterServer 的地址。Filter Server 是过滤服务器,是 RocketMQ 的一种服务端过滤方式,一个 Broker 可以有一个或多个 Filter Server。

其他角色会主动向 NameServer 上报状态,根据上报消息里的请求码做相应的处理,更新存储的对应信息。

  • Broker 接到创建 Topic 的请求后向 NameServer 发送注册信息,NameServer 收到注册信息后首先更新 Broker 信息,然后对每个 Master 角色的 Broker,创建一个 QueueData 对象。如果是新建 Topic,就是添加 QueueData 对象;如果是修改 Topic,就是把旧的 QueueData 删除,加入新的 QueueData。

  • Broker 向 NameServer 发送的心跳会更新时间戳,NameServer 每 10 秒检查一次检查时间戳,检查到时间戳超过 2 分钟则认为 Broker 已失效,便会触发清理逻辑。

  • 连接断开的事件也会触发状态更新,当 NameServer 和 Broker 的长连接断掉以后,onChannelDestroy 函数会被调用,把这个 Broker 的信息清理出去。

  • Producer/Consumer 启动之后会和 NameServer 建立长连接,定时从 NameServer 获取路由信息保存到本地。消息的发送与拉取都会用到上面的数据。

消息收发示例

RocketMQ 部署

刚刚我们了解 RocketMQ 整体架构,那怎么样通过 RocketMQ 收发消息呢?需要先通过 Docker 部署一套 RocketMQ:

MacOS Docker 安装/Windows Docker 安装 进行安装。然后,通过 docker-compose 部署 RocketMQ:

  • 克隆

  • docker-middleware

  • 仓库,打开 RocketMQ 目录;

  • 修改中的参数为本机 IP;

  • 进入文件所在路径,执行命令即可;

RocketMQ 已经部署好了,接下来先来看一个简单的消息收发示例,可以说是 RocketMQ 的 "Hello World"。

消息发送

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        // 创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("Topic1","Tag", "Key",
                                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); 
        // 发送消息到一个Broker
        SendResult sendResult = producer.send(msg);
      	// 通过sendResult返回消息是否成功送达
        System.out.printf("%s%n", sendResult);
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}
  • 首先,实例化一个生产者,并告诉它 NameServer 的地址,这样生产者才能从 NameServer 获取路由信息。

  • 然后  得做一些初始化(这是很关键的步骤),它要和 NameServer 通信,要先建立通信连接等。

  • 已经准备好了,那得准备好要发的内容,把 "Hello World" 发送到 Topic1。

  • 内容准备好,那   就可以把消息发送出去了。 怎么知道 Broker 地址呢?他就会去 NameServer 获取路由信息,得到 Broker 的地址是 localhost:10909,然后通过网络通信将消息发送给 Broker。

  • 生产者发送的消息通过网络传输给 Broker,Broker 需要对消息按照一定的结构进行存储。存储完成之后,把存储结果告知生产者。

消息接收

public class Consumer {

	public static void main(String[] args) throws InterruptedException, MQClientException {
    	// 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    	// 设置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");
    	// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        erbconsumerijun.subscribe("sancijun", "*");
    	// 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
              List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
	}
}
  • 首先,实例化一个消费者,告诉它 NameServer 的地址,这样消费者才能从 NameServer 获取路由信息。

  • 然后这个消费者需要知道自己可以消费哪些 Topic 的消息,也就是每个消费者需要订阅一个或多个 Topic。

  • 消费者也需要做一些初始化,业务本身并没有理会怎么从 Broker 拉取消息,这些都是消费者默默无闻的奉献。所以,我们需要启动消费者,消费者会从 NameServer 拉取路由信息,并不断从 Broker 拉取消息。拉取回来的消息提供给业务定义的 MessageListener。

  • 消息拉取回来后,消费这需要怎么处理呢?每个消费者都不一样(业务本身决定),由我们业务定义的 MessageListener 处理。处理完之后,消费者也需要确认收货,就是告诉 Broker 消费成功了。

以上就是本文的全部内容,本文没有堆砌太多无意义的概念,没有讲什么削峰解耦,异步通信。这些内容网上也很多,看了和没看没什么两样。最后的最后,看懂的点赞,没看懂的收藏,顺便在分享给你的小伙伴。还没有关注的朋友记得关注,入股不亏。

参考文献

    RocketMQ 官方文档

  • RocketMQ 源码

  • 李伟. RocketMQ分布式消息中间件:核心原理与最佳实践. 电子工业出版社, 2020-08。

转载请注明出处

编程笔记 » 一张图进阶 RocketMQ

赞同 (65) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽