跳转至

Redis Stream 2 使用

概要: Redis Stream的启动与基本交互命令

创建时间: 2022.10.21 00:42:34

更新时间: 2023.08.16 22:25:51

本文介绍Redis Stream的启动与基本交互命令。

连接Redis

以Docker形式启动Redis

YAML
version: "3.2"

services:
  redis-server:
    privileged: true
    image: redislabs/redismod
    container_name: lzwang-redis
    restart: always
    ports:
      - "6379:6379"
    volumes:
      - ./data:/data
      # - ./redis.conf:/usr/local/etc/redis/redis.conf

进入redis-cli

Bash
docker exec -it lzwang-redis bash
redis-cli
image.png

XADD 新增消息

语法

Bash
XADD key ID field value [field value ...]

示例

my-mq 队列中新增消息,如果不存在此队列则自动创建;消息ID为 * 代表由Redis自动生成;后面是一组 kv 值,添加成功后会输出消息ID

Bash
XADD my-mq * name lzwang msg hello
XADD my-mq * name lzwang msg hello msg2 world
image.png

XREAD 读取消息

语法

Bash
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

示例

my-mq 队列头部读取两条消息

Bash
XREAD COUNT 2 STREAMS my-mq 0-0
image.png
my-mq 队列尾部读取两条消息,只接收新消息,阻塞时长10秒
Bash
1
2
3
4
5
XREAD COUNT 2 BLOCK 10000 STREAMS my-mq $

# 在另一个 redis-cli 中新增一条消息
XADD my-mq * name lzwang msg hello222
# 然后原 redis-cli 将会接收到新发的消息
image.png

XTRIM 修剪消息队列长度

语法

Bash
XTRIM key MAXLEN [~] count

示例

首先看下当前的消息队列中的全部内容

Bash
XREAD COUNT 5 STREAMS my-mq 0-0
image.png
现在使用 XTRIM 将队列消息修剪为2条,Redis将自动舍弃最旧的消息
Bash
XTRIM my-mq MAXLEN 2
image.png
可以看到队列仅剩2条消息,且最旧的消息被移除

XDEL 删除消息

语法

Bash
XDEL key ID [ID ...]
可以删除一条或多条消息

示例

删除 1651327113528-0 这条消息

Bash
XDEL my-mq 1651327113528-0
image.png

XLEN 获取消息队列长度

语法

Bash
XLEN key

示例

获取 my-mq 的队列长度

Bash
XLEN my-mq
image.png

XRANGE 获取一组正序范围内的消息

语法

Bash
XRANGE key start end [COUNT count]
其中 startend 表示消息 ID 开始值和结束值,其中 - 代表最早,+ 代表最晚

示例

首先我们写入一组数据到Redis Stream

Bash
1
2
3
4
XADD my-mq * name lzwang msg python
XADD my-mq * name lzwang msg cpp
XADD my-mq * name lzwang msg java
XADD my-mq * name lzwang msg go
image.png
示例1,获取从最早到最晚的3条消息
Bash
XRANGE my-mq - + COUNT 3
image.png
示例2,获取指定ID区间的2条消息
Bash
XRANGE my-mq 1651328168626-0 1651328174052-0 COUNT 2
image.png
示例3,正序获取全部消息
Bash
XRANGE my-mq - +
image.png

XREVRANGE 获取一组逆序范围内的消息

此命令可以看作是XRANGE的逆序版

语法

Bash
XREVRANGE key end start [COUNT count]
需要注意的是, - 代表最早,+ 代表最晚,如果需要获取逆序的全部消息,则先 +-

示例

示例1,获取最晚的3条消息

Bash
XREVRANGE my-mq + - COUNT 3
image.png
示例2,获取指定区间的2条消息
Bash
XREVRANGE my-mq 1651328174052-0 1651328168626-0 COUNT 2
image.png
示例3,逆序获取全部消息
Bash
XREVRANGE my-mq + -
image.png

XGROUP 消费者组操作

语法

Bash
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

示例

示例1,创建一个名为 group1 的消费者组,从头开始消费 my-mq 消息队列

Bash
XGROUP CREATE my-mq group1 0-0
示例2,在 group1 的消费者组中,创建消费者 consumer1A consumer1B
Bash
XGROUP CREATECONSUMER my-mq group1 consumer1A
XGROUP CREATECONSUMER my-mq group1 consumer1B
示例3,在 group1 的消费者组中,删除消费者 consumer1A
Bash
XGROUP DELCONSUMER my-mq group1 consumer1A
image.png

XREADGROUP 读取消费者组中的消息

语法

Bash
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

示例

读取 group1 消费者组中 consumer1B 的3条消息,阻塞30秒

Bash
XREADGROUP GROUP group1 consumer1B COUNT 3 BLOCK 30000 STREAMS my-mq >
XREADGROUP GROUP group1 consumer1B COUNT 3 BLOCK 30000 STREAMS my-mq >
image.png
注意,由于原 my-mq 队列中有5条消息,所以上面的1次命令无法消费完毕,第2次消费时,会先消费完队列的剩余消息,然后阻塞等待;此时外部手动往队列中新增了消息,第2次消费满3条消息,命令执行完毕

XPENDING 查看待确认消息

此命令用于查看待确认的消息,进而确保消费者确实消费了某条消息,防止丢失

语法

Bash
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

示例

示例1,查看 my-mq 队列中 group1 中所有消费者当前待确认的消息

Bash
XPENDING my-mq group1
image.png
其中2,3行分别表示消息的开始和结束ID
示例2,查看 my-mq 队列中 group1 中10条待确认的消息详情
Bash
XPENDING my-mq group1 - + 10
image.png

XACK 确认已处理某条消息

此命令用于消费者确认消费了某条消息,并将某条消息从待确认的消息中移除

语法 XACK | Redis

Bash
XACK key group ID [ID ...]

示例

my-mq 队列中 group1 确认消费ID为 1651331276813-0 这条消息

Bash
XACK my-mq group1 1651331276813-0
image.png

XINFO 查看队列、消费群组或消费者的信息

语法

Bash
XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

示例

示例1,查看 my-mq 队列信息

Bash
XINFO STREAM my-mq
image.png
示例2,查看 my-mq 队列中的全部消费者组信息
Bash
XINFO GROUPS my-mq
image.png
示例3,查看 my-mq 队列中的 group1 消费者组中的全部消费者信息
Bash
XINFO CONSUMERS my-mq group1
image.png

参考