Redis Stream 简介
Stream从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。下文称Stream为队列
Stream 出现原因:
Stream的出现是为了给Redis提供完善的消息队列功能
- PUB/SUB,订阅/发布模式
- 基于List的 LPUSH+BRPOP 的实现
- 基于有序集合的实现
类型 | 优点 | 缺点 |
---|---|---|
List | 支持阻塞式的获取消息 | 没有消息多播功能,没有ACK机制,无法重复消费等等 |
Pub/Sub | 支持消息多播 | 消息无法持久化,只管发送,如果出现网络断开、Redis宕机等,消息就直接没了,自然也没有ACK机制。 |
Sorted Set | 支持延时消息 | 不支持阻塞式获取消息、不允许重复消费、不支持分组。 |
发布订阅模式
Redis 发布订阅 (pub/sub 是一种消息通信模式:发送者 (pub 发送消息,订阅者 (sub 接收消息。
订阅者首先订阅channel
psubscribe news
publish news "hello world"
致命缺点
:
Redis的Pub/Sub为什么被抛弃?
最主要的原因是它无法持久化,没有实现持久化机制的Pub/Sub,无法做到消息的不丢失,在客户端宕机或者Redis服务宕机的情况下,都会导致消息丢失。Stream
Redis 5.0发布的Stream相比Pub/Sub模块,Stream支持消息持久化,结合集群使其成为了一个比较可靠的消息队列。
提供了消息多播的功能,同一个消息可被分发给多个单消费者和消费者组
提供了对于消费者和消费者组的阻塞、非阻塞的获取消息的功能
- 消费者组实现同组多个消费者并行但不重复消费消息的能力,提升消费能力;
- 消费者组能够记住最新消费的信息,保证消息连续消费;
- 消费者组提供了ACK确认机制,保证消息被成功消费,不丢失;
Stream本质上是Redis中的key,相关指令根据可以分为两类,分别是消息队列相关指令,消费组相关指令。
指令名称 | 指令作用 |
---|---|
XADD | 添加消息到队列末尾 |
XTRIM | 限制Stream的长度,如果已经超长会进行截取 |
XDEL | 删除消息 |
XLEN | 获取Stream中的消息长度 |
XRANGE | 获取消息列表(可以指定范围),忽略删除的消息 |
XREVRANGE | 和XRANGE相比区别在于反向获取,ID从大到小 |
XREAD | 获取消息(阻塞/非阻塞),返回大于指定ID的消息 |
消费者相关指令:
指令名称 | 指令作用 |
---|---|
XGROUP CREATE | 创建消费者组 |
XREADGROUP | 读取消费者组中的消息 |
XACK | ack消息,消息被标记为“已处理” |
XGROUP SETID | 设置消费者组最后递送消息的ID |
XGROUP DELCONSUMER | 删除消费者组 |
XPENDING | 打印待处理消息的详细信息 |
XCLAIM | 转移消息的∂归属权(长期未被处理/无法处理的消息,转交给其他消费者组进行处理) |
XINFO | 打印Stream\Consumer\Group的详细信息 |
XINFO GROUPS | 打印消费者组的详细信息 |
XINFO STREAM | 打印Stream的详细信息 |
消息队列操作
XADD
添加的消息是一个和多个键值对。XADD也是唯一可以向队列中添加数据的 Redis 命令。
XADD key ID field value [field value ...]
- key:队列名称,如果不存在就创建
- ID:消息id,使用*表示由redis生成。可以自定义,但是要自己保证递增性
- field value:记录,当前消息内容,由一个或多个key-value构成
命令使用:
创建两条消息,分别是(name=tom, age=22,(height=180, use=iphone
127.0.0.1:6379> xadd mystream * name tom age 22
"1674984765438-0"
127.0.0.1:6379> xadd mystream * height 180 use iphone
"1674985213802-0"
创建消息时会生成一个序号,支持自定义序号和自动生成序号。*
表示自动生成序号
XLEN
语法格式:
XLEN key
命令使用:
127.0.0.1:6379> xlen mystream
(integer 2
XDEL
使用XDEL删除消息。语法格式:
XDEL key ID [ID ...]
XDEL删除消息的指令,并不会从内存上删除消息,它只是给消息打上标记位,下次通过XRANGE指令忽略这些消息
XRANGE
XRANGE key start end [COUNT count]
- key:队列名
- start:开始值,-表示最小值
- end:结束值,+表示最大值
- count:数量
命令使用:
不指定count默认查询所有
127.0.0.1:6379> xrange mystream - +
1 1 "1674984765438-0"
2 1 "name"
2 "tom"
3 "age"
4 "22"
2 1 "1674985213802-0"
2 1 "height"
2 "180"
3 "use"
4 "iphone"
127.0.0.1:6379>
XREAD
XREAD命令提供读取队列消息的能力,返回大于指定ID的消息。
XREAD常用于用于迭代队列的消息,所以传递给 XREAD 的通常是上一次从该队列接收到的最后一个消息的ID。
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- count:用于限定获取的消息数量
- BLOCK milliseconds:用于设置XREAD为阻塞模式以及阻塞的时长,单位毫秒,默认为非阻塞模式
- ID:设置开始读取的消息ID,使用0表示从第一条消息开始。
消息队列ID是单调递增的,所以通过设置起点,可以向后读取。
在阻塞模式中,可以使用$
,表示最新的消息ID, block 0表示永久阻塞。(非阻塞模式下$无意义)。
命令使用:
非阻塞读取
从第一条消息开始
127.0.0.1:6379> xread streams mystream 0
1 1 "mystream"
2 1 1 "1674984765438-0"
2 1 "name"
2 "tom"
3 "age"
4 "22"
2 1 "1674985213802-0"
2 1 "height"
2 "180"
3 "use"
4 "iphone"
127.0.0.1:6379>
阻塞读取
127.0.0.1:6379> xread block 10000 streams mystream $ (nil (10.04s 127.0.0.1:6379>
阻塞模式读,阻塞时长为10s。如果10s内未读取到消息则退出阻塞。另开一个终端向队列中写入一条消息,阻塞读的终端就能接收到消息。
消费者操作
XGROUP CREATE
从资源结构上说消费者从属于一个消费组
- 一个队列可以拥有多个消费组。不同消费组之间读取队列互不干扰
语法格式:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
- key:队列名称,如果不存在就创建
- groupname:组名
- id: $表示从尾部开始消费,只接受新消息,当前Stream消息会全部忽略
命令使用:
127.0.0.1:6379> XGROUP CREATE mystream mqGroup 0
OK
XREADGROUP
读取队列的消息。在读取消息时需要指定消费者,只需要指定名字,不用预先创建。
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]
[NOACK] STREAMS key [key ...] id [id ...]
- group:消费组名
- consumer:消费者名
- count:读取数量
- BLOCK milliseconds:阻塞读以及阻塞毫秒数。默认非阻塞。和XREAD类似
- key:队列名
- id:消息ID。ID可以填写特殊符号
>
,表示未被组内消费的起始消息
命令使用:
创建消费者consumerA和consumerB,各读取一条消息
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mystream >
1 1 "mystream"
2 1 1 "1674984765438-0"
2 1 "name"
2 "tom"
3 "age"
4 "22"
127.0.0.1:6379> XREADGROUP group mqGroup consumerB count 1 streams mystream >
1 1 "mystream"
2 1 1 "1674985213802-0"
2 1 "height"
2 "180"
3 "use"
4 "iphone"
可以进行组内消费的基本原理是,STREAM类型会为每个组记录一个最后读取的消息ID(last_delivered_id),这样在组内消费时,就可以从这个值后面开始读取,保证不重复消费。
XPENDING
为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,Stream 设计了 Pending 列表,用于记录读取但并未确认完毕的消息。
语法格式:
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
- key:队列名
- group: 消费组名
- start:开始值,-表示最小值
- end:结束值,+表示最大值
- count:数量
命令使用:
127.0.0.1:6379> xlen mystream
(integer 3
127.0.0.1:6379> xpending mystream mqGroup
1 (integer 2 # 2个已读取但未处理的消息
2 "1674984765438-0" # 起始ID
3 "1674985213802-0" # 结束ID
4 1 1 "consumerA" # 消费者A有1个
2 "1"
2 1 "consumerB" # 消费者B有1个
2 "1"
队列中一共三条信息,有两条被消费但未处理完毕,也就是上面XREADGROUP消费的两条。一个是消费者consumerA,另一个是consumerB。
127.0.0.1:6379> xpending mystream mqGroup - + 10
1 1 "1674984765438-0"
2 "consumerA"
3 (integer 12110001
4 (integer 1
2 1 "1674985213802-0"
2 "consumerB"
3 (integer 89140701
4 (integer 1
XACK
对于已读取未处理的消息,使用命令 XACK 完成告知消息处理完成
XACK 命令确认消费的信息,一旦信息被确认处理,就表示信息被完善处理。
XACK key group id [id ...]
- key: stream 名
- group:消费组
- id:消息ID
命令使用:
1674985213802-0
127.0.0.1:6379> XACK mystream mqGroup 1674985213802-0
(integer 1
127.0.0.1:6379>
XCLAIM
某个消费者读取了消息但没有处理,这时消费者宕机或重启等就会导致该消息失踪。那么就需要该消息转移给其他的消费者处理,就是消息转移。XCLAIM来实现消息转移的操作。
XCLAIM key group consumer min-idle-time id [id ...] [IDLE ms]
[TIME unix-time-milliseconds] [RETRYCOUNT count] [FORCE] [JUSTID]
[LASTID id]
- key: 队列名称
- group :消费组
- consumer:消费组里的消费者
- min-idle-time 最小时间。空闲时间大于min-idle-time的消息才会被转移成功
- id:消息的ID
转移除了要指定ID外,还需要指定min-idle-time,min-idle-time是最小空闲时间,该值要小于消息的空闲时间,这个参数是为了保证是多于多长时间的消息未处理的才被转移。比如超过24小时的处于pending未xack的消息要进行转移
同时min-idle-time还有一个功能是能够避免两个消费者同时转移一条消息。被转移的消息的IDLE会被重置为0。假设两个消费者都以2min来转移,第一个成功之后IDLE被重置为0,第二个消费者就会因为min-idle-time大与空闲时间而是失败。
目前未确认的消息
127.0.0.1:6379> xpending mystream mqGroup - + 10
1 1 "1674984765438-0"
2 "consumerA"
3 (integer 12145595
4 (integer 1
id: 1674984765438-0
空闲时间:12145595,单位ms
读取次数:1
127.0.0.1:6379> XCLAIM mystream mqGroup consumerB 3600000 1674984765438-0
1 1 "1674984765438-0"
2 1 "name"
2 "tom"
3 "age"
4 "22"
查看未确认的消息
消息已经从consumerA转移给consumerB,IDLE重置,读取次数加1。转移之后就可以继续处理这条消息。
127.0.0.1:6379> xpending mystream mqGroup - + 10
1 1 "1674984765438-0"
2 "consumerB"
3 (integer 5729 # 注意IDLE,被重置了
4 (integer 2 # 注意,读取次数也累加了1次
通常转移操作的完整流程是:
- 先用xpending命令找出所有未确认的消息
- 再用xclaim命令转移所有未确认消息
XAUTOCLAIM,可以将xpending查找未确认消息和xclaim转移消息合并成一个操作。
XINFO
查看队列信息
127.0.0.1:6379> xinfo stream mystream
1 "length"
2 (integer 3
3 "radix-tree-keys"
4 (integer 1
5 "radix-tree-nodes"
6 (integer 2
7 "groups"
8 (integer 1
9 "last-generated-id"
10 "1674985995856-0"
11 "first-entry"
12 1 "1674984765438-0"
2 1 "name"
2 "tom"
3 "age"
4 "22"
13 "last-entry"
14 1 "1674985995856-0"
2 1 "name"
2 "jack"
消费组信息
127.0.0.1:6379> xinfo groups mystream
1 1 "name"
2 "mqGroup"
3 "consumers"
4 (integer 2
5 "pending"
6 (integer 1
7 "last-delivered-id"
8 "1674985213802-0"
消费者组成员信息
127.0.0.1:6379> xinfo consumers mystream mqGroup
1 1 "name"
2 "consumerA"
3 "pending"
4 (integer 0
5 "idle"
6 (integer 12904777
2 1 "name"
2 "consumerB"
3 "pending"
4 (integer 1
5 "idle"
6 (integer 696457
127.0.0.1:6379>
项目中中Stream的使用
项目中部分web请求的处理是异步处理,web服务调用底层模块异步执行。当底层模块处理完成后需要保存结果并通知web服务,所以使用Stream作为保存的载体。
Stream 的生产和消费
生产
向队列中写消息
def batch_xadd(self, name: str, payloads: List[Dict] -> None:
pipe = self._redis.pipeline(
for payload in payloads:
pipe.xadd(name, payload
pipe.execute(
消费
定时任务间隔10s从队列中读消息while True: _, payloads = await self._conn.xautoclaim( self.stream_name, self.group_name, self.consumer_name, min_idle_time id_ = last_id if check_backlog else ">" for _, messages in await self._conn.xreadgroup( groupname=self.group_name, consumername=self.consumer_name, streams={self.stream_name: id_}, block=block_timeout, : ... last_id = messages[-1][0] payloads += messages # 处理队列中取出的消息,耗时操作 successful_ids = await f_processor(payloads if successful_ids: await self._conn.xack(self.stream_name, self.group_name, *successful_ids
Stream和专业消息队列对比
专业的消息队列包括:
RabbitMQ
- RocketMQ
- Kafka
消息不丢
- 消息可堆积
下面从这两个方面来对比Stream和专业消息队列。
消息不丢
生产者
:消息发送失败或发送超时,这两种情况会导致数据丢失,可以使用重试来解决。不依赖消息中间件,需要业务实现。消费者:消费者存在读取消息未处理完就异常宕机了,消费者要还能重新读取消息。Stream和其他消息中间件都能做到。
队列中间件:中间件要保证数据不丢失。 Redis 在以下 2 个场景下,都会导致数据丢失:
AOF 持久化配置为每秒写盘,Redis 宕机时会存在丢失最后1秒数据的可能
- 主从复制的集群,主从切换时,从库还未同步完成主库发来的数据,就被提成主库,也存在丢失数据的可能。
专业队列如何解决数据丢失问题:
RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时一般是部署一个集群。生产者在发布消息时,队列中间件通常会写「多个节点」,以此保证消息冗余。这样一来,即便其中一个节点挂了,集群也能的数据不丢失。消息积压
所以,Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。
总结
综上可以看到,把 Redis 当作队列来使用时,始终面临两个问题:
Redis 本身可能会丢数据
- 面对消息积压,Redis 内存资源紧张
优缺点
优点:
使用成本低。几乎每一个项目都会使用Redis,用Stream做消息队列就不需要额外再引入中间件,减少系统复杂性,运维成本,硬件资源。
缺点:
Redis 的数据都存储在内存中,内存持续增长超过机器内存上限,就会面临 OOM 的风险
- Stream 作为Redis的一种数据结构,Redis 在持久化或主从切换时有丢失数据的风险,所以Stream也有丢失消息的风险
- 所有的消息会一直保存在Stream中,没有删除机制。要么定时清除,那么设置队列的长度自动丢弃先入列消息
使用场景
适用
适用业务场景:
- 场景足够简单
- 对于数据丢失不敏感
- 消息积压概率比较小
基于redis的高性能和使用内存的机制使得其的性能优于大部分消息队列。在小规模场景会有更出色的表现。
不适用
不适用业务场景:
- 对于数据丢失非常敏感,如订单系统
- 写入量非常大,并发请求大
- 消息积压时会占用很多的内存资源,消息数据量大
题外话
技术选型出了技术本身之外还要考虑公司团队能否匹配技术。如果在一个大公司,公司本身就有优秀的运维团队,那么使用这些中间件肯定没问题,因为有足够优秀的人能 hold 住这些中间件,公司也会投入人力和时间在这个方向上。
实际案例讨论
同一个大型项目中子项目的互相调用。TMS调用ATS获取数据集
- 丢失数据不敏感
- 业务场景简单
- 消息积压概率比较小
参考:
https://zhuanlan.zhihu.com/p/60501638
https://redis.io/commands/xclaim/
https://liziba.blog.csdn.net/article/details/120320018
https://juejin.cn/post/6962423461071290375#heading-2