Redis Stream 2 使用
概要: Redis Stream的启动与基本交互命令
创建时间: 2022.10.21 00:42:34
更新时间: 2023.08.16 22:25:51
本文介绍Redis Stream的启动与基本交互命令。
连接Redis
以Docker形式启动Redis
YAML |
---|
| version: "3.2"
services:
redis-server:
privileged: true
image: redislabs/redismod
container_name: lzwang-redis
restart: always
ports:
- "6379:6379"
volumes:
- ./data:/data
# - ./redis.conf:/usr/local/etc/redis/redis.conf
|
进入redis-cli
Bash |
---|
| docker exec -it lzwang-redis bash
redis-cli
|
XADD 新增消息
语法
Bash |
---|
| XADD key ID field value [field value ...]
|
示例
在 my-mq
队列中新增消息,如果不存在此队列则自动创建;消息ID为 *
代表由Redis自动生成;后面是一组 kv
值,添加成功后会输出消息ID
Bash |
---|
| XADD my-mq * name lzwang msg hello
XADD my-mq * name lzwang msg hello msg2 world
|
XREAD 读取消息
语法
Bash |
---|
| XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
|
示例
从 my-mq
队列头部读取两条消息
Bash |
---|
| XREAD COUNT 2 STREAMS my-mq 0-0
|

从 my-mq
队列尾部读取两条消息,只接收新消息,阻塞时长10秒
Bash |
---|
| XREAD COUNT 2 BLOCK 10000 STREAMS my-mq $
# 在另一个 redis-cli 中新增一条消息
XADD my-mq * name lzwang msg hello222
# 然后原 redis-cli 将会接收到新发的消息
|
XTRIM 修剪消息队列长度
语法
Bash |
---|
| XTRIM key MAXLEN [~] count
|
示例
首先看下当前的消息队列中的全部内容
Bash |
---|
| XREAD COUNT 5 STREAMS my-mq 0-0
|

现在使用 XTRIM 将队列消息修剪为2条,Redis将自动舍弃最旧的消息

可以看到队列仅剩2条消息,且最旧的消息被移除
XDEL 删除消息
语法
可以删除一条或多条消息
示例
删除 1651327113528-0
这条消息
Bash |
---|
| XDEL my-mq 1651327113528-0
|
XLEN 获取消息队列长度
语法
示例
获取 my-mq
的队列长度
XRANGE 获取一组正序范围内的消息
语法
Bash |
---|
| XRANGE key start end [COUNT count]
|
其中 start
和 end
表示消息 ID
开始值和结束值,其中 -
代表最早,+
代表最晚
示例
首先我们写入一组数据到Redis Stream
Bash |
---|
| XADD my-mq * name lzwang msg python
XADD my-mq * name lzwang msg cpp
XADD my-mq * name lzwang msg java
XADD my-mq * name lzwang msg go
|

示例1,获取从最早到最晚的3条消息

示例2,获取指定ID区间的2条消息
Bash |
---|
| XRANGE my-mq 1651328168626-0 1651328174052-0 COUNT 2
|

示例3,正序获取全部消息
XREVRANGE 获取一组逆序范围内的消息
此命令可以看作是XRANGE
的逆序版
语法
Bash |
---|
| XREVRANGE key end start [COUNT count]
|
需要注意的是, -
代表最早,+
代表最晚,如果需要获取逆序的全部消息,则先 +
后 -
示例
示例1,获取最晚的3条消息
Bash |
---|
| XREVRANGE my-mq + - COUNT 3
|

示例2,获取指定区间的2条消息
Bash |
---|
| XREVRANGE my-mq 1651328174052-0 1651328168626-0 COUNT 2
|

示例3,逆序获取全部消息
XGROUP 消费者组操作
语法
Bash |
---|
| XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
|
示例
示例1,创建一个名为 group1
的消费者组,从头开始消费 my-mq
消息队列
Bash |
---|
| XGROUP CREATE my-mq group1 0-0
|
示例2,在 group1
的消费者组中,创建消费者 consumer1A
consumer1B
Bash |
---|
| XGROUP CREATECONSUMER my-mq group1 consumer1A
XGROUP CREATECONSUMER my-mq group1 consumer1B
|
示例3,在 group1
的消费者组中,删除消费者 consumer1A
Bash |
---|
| XGROUP DELCONSUMER my-mq group1 consumer1A
|
XREADGROUP 读取消费者组中的消息
语法
Bash |
---|
| XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
|
示例
读取 group1
消费者组中 consumer1B
的3条消息,阻塞30秒
Bash |
---|
| XREADGROUP GROUP group1 consumer1B COUNT 3 BLOCK 30000 STREAMS my-mq >
XREADGROUP GROUP group1 consumer1B COUNT 3 BLOCK 30000 STREAMS my-mq >
|

注意,由于原 my-mq
队列中有5条消息,所以上面的1次命令无法消费完毕,第2次消费时,会先消费完队列的剩余消息,然后阻塞等待;此时外部手动往队列中新增了消息,第2次消费满3条消息,命令执行完毕
XPENDING 查看待确认消息
此命令用于查看待确认的消息,进而确保消费者确实消费了某条消息,防止丢失
语法
Bash |
---|
| XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
|
示例
示例1,查看 my-mq
队列中 group1
中所有消费者当前待确认的消息

其中2,3行分别表示消息的开始和结束ID
示例2,查看 my-mq
队列中 group1
中10条待确认的消息详情
Bash |
---|
| XPENDING my-mq group1 - + 10
|
XACK 确认已处理某条消息
此命令用于消费者确认消费了某条消息,并将某条消息从待确认的消息中移除
Bash |
---|
| XACK key group ID [ID ...]
|
示例
对 my-mq
队列中 group1
确认消费ID为 1651331276813-0
这条消息
Bash |
---|
| XACK my-mq group1 1651331276813-0
|
XINFO 查看队列、消费群组或消费者的信息
语法
Bash |
---|
| XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
|
示例
示例1,查看 my-mq
队列信息

示例2,查看 my-mq
队列中的全部消费者组信息

示例3,查看 my-mq
队列中的 group1
消费者组中的全部消费者信息
Bash |
---|
| XINFO CONSUMERS my-mq group1
|
参考