rocketmq-spring : 实战与源码解析一网打尽

科技资讯 投稿 7600 0 评论

rocketmq-spring : 实战与源码解析一网打尽

这篇文章会介绍 Spring Boot 项目使用 rocketmq-spring SDK 实现消息收发的操作流程,同时笔者会从开发者的角度解读 SDK 的设计逻辑

1 SDK 简介

项目地址:

rocketmq-spring 的本质是一个 Spring Boot starter

约定大于配置”(Convention over configuration)这一理念来快速地开发、测试、运行和部署 Spring 应用,并能通过简单地与各种启动器(如 spring-boot-web-starter)结合,让应用直接以命令行的方式运行,不需再部署到独立容器中。

引入 starter 的依赖定义,在配置文件中编写约定的配置即可。

1、引入依赖

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.3</version>
</dependency>

2、约定配置

2 生产者

首先我们添加依赖后,进行如下三个步骤:

1、配置文件中配置如下

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
      group: platform-sms-server-group
    # access-key: myaccesskey
    # secret-key: mysecretkey
  topic: sms-common-topic

生产者配置非常简单,主要配置名字服务地址生产者组

需要发送消息的类中注入 RcoketMQTemplate

@Autowired
private RocketMQTemplate rocketMQTemplate;

@Value("${rocketmq.topic}"
private String smsTopic;

3、发送消息,消息体可以是自定义对象,也可以是 Message 对象

    同步发送 syncSend
  1. 异步发送 asyncSend
  2. 顺序发送 syncSendOrderly
  3. oneway发送 sendOneWay

下面的代码展示如何同步发送消息。

String destination = StringUtils.isBlank(tags ? topic : topic + ":" + tags;
SendResult sendResult =
         rocketMQTemplate.syncSend(
            destination, 
            MessageBuilder.withPayload(messageContent.
            setHeader(MessageConst.PROPERTY_KEYS, uniqueId.
            build(
          ;
if (sendResult != null {
    if (sendResult.getSendStatus( == SendStatus.SEND_OK {
       // send message success,do something 
    }
}

syncSend 方法的第一个参数是发送的目标,格式是:topic + ":" + tags

spring-message 规范的 message 对象,而 MessageBuilder 是一个工具类,方法链式调用创建消息对象。

3 消费者

1、配置文件中配置如下

rocketmq:
  name-server: 127.0.0.1:9876
  consumer1:
    group: platform-sms-worker-common-group
    topic: sms-common-topic

2、实现消息监听器

@Component
@RocketMQMessageListener(
    consumerGroup = "${rocketmq.consumer1.group}",  //消费组
    topic = "${rocketmq.consumer1.topic}"  					//主题

public class SmsMessageCommonConsumer implements RocketMQListener<String> {
    public void onMessage(String message {
       System.out.println("普通短信:" + message;
    }
}

消费者实现类也可以实现 RocketMQListener<MessageExt>, 在 onMessage 方法里通过 RocketMQ 原生消息对象 MessageExt 获取更详细的消息数据

public void onMessage(MessageExt message {
    try {
        String body = new String(message.getBody(, "UTF-8";
        logger.info("普通短信:" + message;
    } catch (Exception e {
        logger.error("common onMessage error:", e;
    }
}

4 源码概览

1、rocketmq-spring-boot-parent

2、rocketmq-spring-boot

3、rocketmq-spring-boot-starter

4、rocketmq-spring-boot-samples

5 starter 实现

我们重点分析下 rocketmq-spring-boot 模块的核心源码:


1、定义 Spring 自身的依赖包和 RocketMQ 的依赖包 ;

2、定义spring.factories 文件

定义自动加载类,文件内容是:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

spring boot 会根据文件中配置的自动化配置类来自动初始化相关的 Bean、Component 或 Service。

3、实现自动加载类

▍生产者发送模板类:RocketMQTemplate

两个重点需要强调:

▍自定义消费者类

后置处理器,然后调用 ListenerContainerConfiguration 类的 registerContainer 方法 。

消费消息的逻辑,同时满足 RocketMQListener 泛化接口支持不同参数,比如 String 、MessageExt 、自定义对象 。

rocketMQListener.onMessage(doConvertMessage(messageExt;

6 写到最后

开源项目 rocketmq-spring 有很多值得学习的地方,我们可以从如下四个层面逐层进阶:

学会如何使用 :参考 rocketmq-spring-boot-samples 模块的示例代码,学会如何发送和接收消息,快速编码;

模块设计:学习项目的模块分层 (父模块、SDK 模块、核心实现模块、示例代码模块);

starter 设计思路 :定义自动配置文件 spring.factories 、设计配置属性类 、在 RocketMQ client 的基础上实现优雅的封装、深入理解 RocketMQ 源码等;

举一反三:当我们理解了 rocketmq-spring 的源码,我们可以尝试模仿该项目写一个简单的 spring boot starter。


点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

编程笔记 » rocketmq-spring : 实战与源码解析一网打尽

赞同 (42) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽