消息的推与拉
这一节我们来一起看一下 MQ 消息中的推和拉两种模式。
推
缺点是如果消费者处理不过来,就会造成大量问题。
拉
缺点是实时性相对较差。
拉取策略实现
push 策略
package com.github.houbb.mq.consumer.core;
/**
* 推送消费策略
*
* @author binbin.hou
* @since 1.0.0
*/
public class MqConsumerPush extends Thread implements IMqConsumer {
@Override
public void run() {
// 启动服务端
log.info("MQ 消费者开始启动服务端 groupName: {}, brokerAddress: {}",
groupName, brokerAddress);
//1. 参数校验
this.paramCheck();
try {
//0. 配置信息
//1. 初始化
//2. 连接到服务端
//3. 标识为可用
//4. 添加钩子函数
//5. 启动完成以后的事件
this.afterInit();
log.info("MQ 消费者启动完成");
} catch (Exception e) {
log.error("MQ 消费者启动异常", e);
throw new MqException(ConsumerRespCode.RPC_INIT_FAILED);
}
}
/**
* 初始化完成以后
*/
protected void afterInit() {
}
// 其他方法
/**
* 获取消费策略类型
* @return 类型
* @since 0.0.9
*/
protected String getConsumerType() {
return ConsumerTypeConst.PUSH;
}
}
我们在 push 中预留了一个 方法,便于子类重载。
pull 策略
消费者实现
package com.github.houbb.mq.consumer.core;
/**
* 拉取消费策略
*
* @author binbin.hou
* @since 0.0.9
*/
public class MqConsumerPull extends MqConsumerPush {
private static final Log log = LogFactory.getLog(MqConsumerPull.class);
/**
* 拉取定时任务
*
* @since 0.0.9
*/
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
/**
* 单次拉取大小
* @since 0.0.9
*/
private int size = 10;
/**
* 初始化延迟毫秒数
* @since 0.0.9
*/
private int pullInitDelaySeconds = 5;
/**
* 拉取周期
* @since 0.0.9
*/
private int pullPeriodSeconds = 5;
/**
* 订阅列表
* @since 0.0.9
*/
private final List<MqTopicTagDto> subscribeList = new ArrayList<>();
// 设置
@Override
protected String getConsumerType() {
return ConsumerTypeConst.PULL;
}
@Override
public synchronized void subscribe(String topicName, String tagRegex) {
MqTopicTagDto tagDto = buildMqTopicTagDto(topicName, tagRegex);
if(!subscribeList.contains(tagDto)) {
subscribeList.add(tagDto);
}
}
@Override
public void unSubscribe(String topicName, String tagRegex) {
MqTopicTagDto tagDto = buildMqTopicTagDto(topicName, tagRegex);
subscribeList.remove(tagDto);
}
private MqTopicTagDto buildMqTopicTagDto(String topicName, String tagRegex) {
MqTopicTagDto dto = new MqTopicTagDto();
dto.setTagRegex(tagRegex);
dto.setTopicName(topicName);
return dto;
}
}
订阅相关
pull 策略可以把订阅/取消订阅放在本地,避免与服务端的交互。
定时拉取
/**
* 初始化拉取消息
* @since 0.0.6
*/
@Override
public void afterInit() {
//5S 发一次心跳
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if(CollectionUtil.isEmpty(subscribeList)) {
log.warn("订阅列表为空,忽略处理。");
return;
}
for(MqTopicTagDto tagDto : subscribeList) {
final String topicName = tagDto.getTopicName();
final String tagRegex = tagDto.getTagRegex();
MqConsumerPullResp resp = consumerBrokerService.pull(topicName, tagRegex, size);
if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
List<MqMessage> mqMessageList = resp.getList();
if(CollectionUtil.isNotEmpty(mqMessageList)) {
for(MqMessage mqMessage : mqMessageList) {
IMqConsumerListenerContext context = new MqConsumerListenerContext();
mqListenerService.consumer(mqMessage, context);
}
}
} else {
log.error("拉取消息失败: {}", JSON.toJSON(resp));
}
}
}
}, pullInitDelaySeconds, pullPeriodSeconds, TimeUnit.SECONDS);
}
应用启动时,指定时间定时拉取消息并进行消费处理。
public MqConsumerPullResp pull(String topicName, String tagRegex, int fetchSize) {
MqConsumerPullReq req = new MqConsumerPullReq();
req.setSize(fetchSize);
req.setGroupName(groupName);
req.setTagRegex(tagRegex);
req.setTopicName(topicName);
final String traceId = IdHelper.uuid32();
req.setTraceId(traceId);
req.setMethodType(MethodType.C_MESSAGE_PULL);
Channel channel = getChannel(null);
return this.callServer(channel, req, MqConsumerPullResp.class);
}
Borker 相关
消息分发
// 消费者主动 pull
if(MethodType.C_MESSAGE_PULL.equals(methodType)) {
MqConsumerPullReq req = JSON.parseObject(json, MqConsumerPullReq.class);
return mqBrokerPersist.pull(req, channel);
}
实现
mqBrokerPersist 是一个接口,此处演示基于本地实现的,后续会实现基于数据库的持久化。
@Override
public MqConsumerPullResp pull(MqConsumerPullReq pullReq, Channel channel) {
//1. 拉取匹配的信息
//2. 状态更新为代理中
//3. 如何更新对应的消费状态呢?
// 获取状态为 W 的订单
final int fetchSize = pullReq.getSize();
final String topic = pullReq.getTopicName();
final String tagRegex = pullReq.getTagRegex();
List<MqMessage> resultList = new ArrayList<>(fetchSize);
List<MqMessagePersistPut> putList = map.get(topic);
// 性能比较差
if(CollectionUtil.isNotEmpty(putList)) {
for(MqMessagePersistPut put : putList) {
final String status = put.getMessageStatus();
if(!MessageStatusConst.WAIT_CONSUMER.equals(status)) {
continue;
}
final MqMessage mqMessage = put.getMqMessage();
List<String> tagList = mqMessage.getTags();
if(InnerRegexUtils.hasMatch(tagList, tagRegex)) {
// 设置为处理中
// TODO: 消息的最终状态什么时候更新呢?
// 可以给 broker 一个 ACK
put.setMessageStatus(MessageStatusConst.PROCESS_CONSUMER);
resultList.add(mqMessage);
}
if(resultList.size() >= fetchSize) {
break;
}
}
}
MqConsumerPullResp resp = new MqConsumerPullResp();
resp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
resp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
resp.setList(resultList);
return resp;
}
我们遍历找到匹配的消息,将其状态更新为中间状态。