一、背景
最近在看redis
这方面的知识,发现在redis5
中产生了一种新的数据类型Stream
,它和kafka
的设计有些类似,可以当作一个简单的消息队列来使用。
二、redis中Stream类型的特点
- 支持持久化。streams能持久化存储数据,不同于pub/sub机制和list 消息被消费后就会被删除,streams消费过的数据会被持久化的保存在历史中。
- 支持消息的多播、分组消费。 这一点跟 pub/sub有些类似。
- 支持消息的有序性,可以通过消息的ID来控制消息的排序。
- 支持消费者组。streams 允许同一消费组内的消费者竞争消息,并提供了一系列机制允许消费者查看自己的历史消费消息。并允许监控streams的消费者组信息,消费者组内消费者信息,也可以监控streams内消息的状态。
三、Stream的结构
解释:
1. 消费者组
:Consumer Group,即使用XGROUP CREATE
命令创建的,一个消费者组中可以存在多个消费者,这些消费者之间是竞争关系。
1. 同一条消息,只能被这个消费者组中的某个消费者获取。
2. 多个消费者之间是相互独立的,互不干扰。
2. 消费者
: Consumer 消费消息。
3. last_delivered_id
: 这个id保证了在同一个消费者组中,一个消息只能被一个消费者获取。每当消费者组的某个消费者读取到了这个消息后,这个last_delivered_id的值会往后移动一位,保证消费者不会读取到重复的消息。
4. pending_ids
:记录了消费者读取到的消息id列表,但是这些消息可能还没有处理,如果认为某个消息处理,需要调用ack
命令。这样就确保了某个消息一定会被执行一次。
5. 消息内容:
是一个键值对
的格式。
6. Stream 中 消息的 ID:
默认情况下,ID使用 *
,redis可以自动生成一个,格式为 时间戳-序列号
,也可以自己指定,一般使用默认生成的即可,且后生成的id号要比之前生成的大。
四、Stream的命令
1、XADD 往Stream末尾添加消息
1、命令格式
xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
2、举例
xadd 命令 返回的是数据的id, xx-yy (xx指的是毫秒数,yy指的是在这个毫秒内的第几条消息)
1、向流中增加一条数据,
127.0.0.1:6379> xadd stream-key * username zhangsan # 向stream-key这个流中增加一个 username 是zhangsan的数据 *表示自动生成id
"1635999858912-0" # 返回的是ID
127.0.0.1:6379> keys *
1) "stream-key" # 可以看到stream自动创建了
127.0.0.1:6379>
2、向流中增加数据,不自动创建流
127.0.0.1:6379> xadd not-exists-stream nomkstream * username lisi # 因为指定了nomkstream参数,而not-exists-stream之前不存在,所以加入失败
(nil)
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379>
3、手动指定ID的值
127.0.0.1:6379> xadd stream-key 1-1 username lisi # 此处id的值是自己传递的1-1,而不是使用*自动生成
"1-1" # 返回的是id的值
127.0.0.1:6379>
4、设置一个固定大小的Stream
1、精确指定Stream的大小
指定指定Stream的大小比模糊指定Stream的大小会稍微多少消耗一些性能。
2、模糊指定Stream的大小
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * first first
"1636001034141-0"
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * second second
"1636001044506-0"
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * third third
"1636001057846-0"
127.0.0.1:6379> xinfo stream stream-key
1) "length"
2) (integer) 3
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1636001057846-0"
9) "groups"
10) (integer) 0
11) "first-entry"
12) 1) "1636001034141-0"
2) 1) "first"
2) "first"
13) "last-entry"
14) 1) "1636001057846-0"
2) 1) "third"
2) "third"
127.0.0.1:6379>
~
模糊指定流的大小,可以看到指定的是1,实际上已经到了3.
2、XRANGE查看Stream中的消息
1、命令格式
xrange key start end [COUNT count]
2、准备数据
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> xadd stream-key * username zhangsan
QUEUED
127.0.0.1:6379(TX)> xadd stream-key * username lisi
QUEUED
127.0.0.1:6379(TX)> exec
1) "1636003481706-0"
2) "1636003481706-1"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636003499055-0"
127.0.0.1:6379>
使用redis的事务操作,获取到同一毫秒产生的多条数据,时间戳一样,序列号不一样
3、举例
1、获取所有的数据(-
和+
的使用)
127.0.0.1:6379> xrange stream-key - +
1) 1) "1636003481706-0"
2) 1) "username"
2) "zhangsan"
2) 1) "1636003481706-1"
2) 1) "username"
2) "lisi"
3) 1) "1636003499055-0"
2) 1) "username"
2) "wangwu"
127.0.0.1:6379>
-:
表示最小id的值
+:
表示最大id的值
2、获取指定id范围内的数据,闭区间
127.0.0.1:6379> xrange stream-key 1636003481706-1 1636003499055-0
1) 1) "1636003481706-1"
2) 1) "username"
2) "lisi"
2) 1) "1636003499055-0"
2) 1) "username"
2) "wangwu"
127.0.0.1:6379>
3、获取指定id范围内的数据,开区间
127.0.0.1:6379> xrange stream-key (1636003481706-0 (1636003499055-0
1) 1) "1636003481706-1"
2) 1) "username"
2) "lisi"
127.0.0.1:6379>
(:
表示开区间
4、获取某个毫秒后所有的数据
127.0.0.1:6379> xrange stream-key 1636003481706 +
1) 1) "1636003481706-0"
2) 1) "username"
2) "zhangsan"
2) 1) "1636003481706-1"
2) 1) "username"
2) "lisi"
3) 1) "1636003499055-0"
2) 1) "username"
2) "wangwu"
127.0.0.1:6379>
直接写毫秒
不写后面的序列号即可。
5、获取单条数据
127.0.0.1:6379> xrange stream-key 1636003499055-0 1636003499055-0
1) 1) "1636003499055-0"
2) 1) "username"
2) "wangwu"
127.0.0.1:6379>
start
和end
的值写的一样即可获取单挑数据。
6、获取固定条数的数据
127.0.0.1:6379> xrange stream-key - + count 1
1) 1) "1636003481706-0"
2) 1) "username"
2) "zhangsan"
127.0.0.1:6379>
使用 count
进行限制
3、XREVRANGE反向查看Stream中的消息
XREVRANGE key end start [COUNT count]
使用方式和XRANGE
类似,略。
4、XDEL删除消息
1、命令格式
xdel key ID [ID ...]
2、准备数据
127.0.0.1:6379> xadd stream-key * username zhangsan
"1636004176924-0"
127.0.0.1:6379> xadd stream-key * username lisi
"1636004183638-0"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636004189211-0"
127.0.0.1:6379>
3、举例
需求: 往Stream中加入3条消息,然后删除第2条消息
127.0.0.1:6379> xdel stream-key 1636004183638-0
(integer) 1 # 返回的是删除记录的数量
127.0.0.1:6379> xrang stream -key - +
127.0.0.1:6379> xrange stream-key - +
1) 1) "1636004176924-0"
2) 1) "username"
2) "zhangsan"
2) 1) "1636004189211-0"
2) 1) "username"
2) "wangwu"
127.0.0.1:6379>
注意:
需要注意的是,我们从Stream中删除一个消息,这个消息并不是被真正的删除了,而是被标记为删除,这个时候这个消息还是占据着内容空间的。如果所有Stream中所有的消息都被标记删除,这个时候才会回收内存空间。但是这个Stream并不会被删除。
6、XLEN查看Stream中元素的长度
1、命令格式
xlen key
2、举例
查看Stream中元素的长度
127.0.0.1:6379> xadd stream-key * username zhangsan
"1636004690578-0"
127.0.0.1:6379> xlen stream-key
(integer) 1
127.0.0.1:6379> xlen not-exists-stream-key
(integer) 0
127.0.0.1:6379>
注意:
如果xlen
后方的key
不存在则返回0,否则返回元素的个数。
7、XTRIM对Stream中的元素进行修剪
1、命令格式
xtrim key MAXLEN|MINID [=|~] threshold [LIMIT count]
2、准备数据
127.0.0.1:6379> xadd stream-key * username zhangsan
"1636009745401-0"
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> xadd stream-key * username lisi
QUEUED
127.0.0.1:6379(TX)> xadd stream-key * username wangwu
QUEUED
127.0.0.1:6379(TX)> exec
1) "1636009763955-0"
2) "1636009763955-1"
127.0.0.1:6379> xadd stream-key * username zhaoliu
"1636009769625-0"
127.0.0.1:6379>
3、举例
1、maxlen精确限制
127.0.0.1:6379> xtrim stream-key maxlen 2 # 保留最后的2个消息
(integer) 2
127.0.0.1:6379> xrange stream-key - + # 可以看到之前加入的2个消息被删除了
1) 1) "1636009763955-1"
2) 1) "username"
2) "wangwu"
2) 1) "1636009769625-0"
2) 1) "username"
2) "zhaoliu"
127.0.0.1:6379>
上方的意思是,保留stream-key
这个Stream中最后的2个消息。
2、minid模糊限制
minid 是删除比这个id小的数据,本地测试的时候没有测试出来
,略。
8、XREAD独立消费消息
XREAD
只是读取消息,读取完之后并不会删除消息。 使用XREAD
读取消息,是完全独立与消费者组的,多个客户端可以同时读取消息。
1、命令格式
xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
2、准备数据
127.0.0.1:6379> xadd stream-key * username zhangsan
"1636011801365-0"
127.0.0.1:6379> xadd stream-key * username lisi
"1636011806261-0"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636011810905-0"
127.0.0.1:6379>
3、举例
1、获取用户名是wangwu的数据
127.0.0.1:6379> xread streams stream-key 1636011806261-0 # 此处写的是lisi的id,即读取到的数据需要是 > 1636011806261-0
1) 1) "stream-key"
2) 1) 1) "1636011810905-0"
2) 1) "username"
2) "wangwu"
2、获取2条数据
127.0.0.1:6379> xread count 2 streams stream-key 0-0
1) 1) "stream-key"
2) 1) 1) "1636011801365-0"
2) 1) "username"
2) "zhangsan"
2) 1) "1636011806261-0"
2) 1) "username"
2) "lisi"
127.0.0.1:6379>
count
限制单次读取最后的消息,因为当前读取可能没有这么多。
3、非阻塞读取Stream对尾的数据
即读取队列尾的下一个消息,在非阻塞模式下始终是nil
127.0.0.1:6379> xread streams stream-key $
(nil)
4、阻塞读取Stream对尾的数据
注意:
$
表示读取队列最新进来的一个消息,不是Stream的最后一个消息。是xread block
执行后,再次使用xadd
添加消息后,xread block
才会返回。block 0
表示永久阻塞,当消息到来时,才接触阻塞。block 1000
表示阻塞1000ms,如果1000ms还没有消息到来,则返回nil
xread进行顺序消费
当使用xread进行顺序消息时,需要记住返回的消息id,同时下次调用xread时,需要将上次返回的消息id传递进去。xread
读取消息,完全无视消费组,此时Stream
就可以理解为一个普通的list。
4、对比pub/sub
XREAD允许我们从某一结点开始从streams中读取数据,它的语法为XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
,我们在这里主要将的是通过XREAD
来订阅到达streams新的数据。这种操作可能跟REDIS中原有的pub/sub
机制或者阻塞队列
的概念有些类似,都是等待一个key然后获取到新的数据,但是跟这两种有着本质的差别:
- streams跟
pub/sub
和阻塞队列
允许多个客户端一起等待数据,默认情况下,streams会把消息推送给所有等待streams数据的客户端,这个能力跟pub/sub
有点类似,但是streams也允许把消息通过竞争机制推送给其中的一个客户端(这种模式需要用到消费者组的概念,会在后边讲到)。 pub/sub
的消息是fire and forget并且从不存储,你只可以订阅到在你订阅时间之后产生的消息,并且消息只会推送给客户端一次,不能查看历史记录。以及使用阻塞队列
时,当客户端收到消息时,这个元素会从队列中弹出,换句话说,不能查看某个消费者消费消息的历史。而在streams中所有的消息会被无限期的加入到streams中(消息可以被显式的删除并且存在淘汰机制),客户端需要记住收到的最后一条消息,用于获取到节点之后的新消息。- Streams 消费者组提供了一种Pub/Sub或者阻塞列表都不能实现的控制级别,同一个Stream不同的群组,显式地确认已经处理的项目,检查待处理的项目的能力,申明未处理的消息,以及每个消费者拥有连贯历史可见性,单个客户端只能查看自己过去的消息历史记录。
从streams中读取数据127.0.0.1:6379> XREAD COUNT 2 STREAMS mystream 0 1) 1) “mystream” 2) 1) 1) “1574835253335-0” 2) 1) “name” 2) “bob” 3) “age” 4) “23” 2) 1) “1574925508730-0” 2) 1) “name” 2) “dwj” 3) “age” 4) “18” 127.0.0.1:6379>
同list结构一样,streams也提供了阻塞读取的命令XREAD BLOCK 0 STREAMS mystream
在上边的命令中指定了BLOCK选项,超时时间为0毫秒(意味着永不会过期)。此外,这个地方使用了特殊的id
$
,这个特殊的id代表着当前streams中最大的id,这就意味着你只会读取streams中在你监听时间以后的消息。有点类似于Unix的tail -f
。另外XREAD可以同时监听多个流中的数据。
9、消费者组相关操作
如果我们想要的不是多个客户端处理相同的消息,而是多个客户端从streams中获取到不同的消息进行处理。也就是我们常用的生产者-消费者模型。假如想象我们具有两个生产者p1,p2,三个消费者c1,c2,c3以及7个商品。我们想按照下面的效果进行处理
p1 =>item1 => c1
p2 =>item2 => c2
p1 =>item3 => c3
p2 =>item4 => c1
p1 =>item5 => c2
p2 =>item6 => c3
p1 =>item7 => c1
为了解决这种场景,redis使用了一个名为消费者的概念,有点类似于kafka,但只是表现上。消费者组就像是一个伪消费者,它从流内读取数据,然后分发给组内的消费者,并记录该消费者组消费了哪些数据,处理了那些数据,并提供了一系列功能。
- 每条消息都提供给不同的消费者,因此不可能将相同的消息传递给多个消费者。
- 消费者在消费者组中通过名称来识别,该名称是实施消费者的客户必须选择的区分大小写的字符串。这意味着即便断开连接过后,消费者组仍然保留了所有的状态,因为客户端会重新申请成为相同的消费者。 然而,这也意味着由客户端提供唯一的标识符。
- 每一个消费者组都有一个第一个ID永远不会被消费的概念,这样一来,当消费者请求新消息时,它能提供以前从未传递过的消息。
- 消费消息需要使用特定的命令进行显式确认,表示:这条消息已经被正确处理了,所以可以从消费者组中逐出。
- 消费者组跟踪所有当前所有待处理的消息,也就是,消息被传递到消费者组的一些消费者,但是还没有被确认为已处理。由于这个特性,当访问一个Stream的历史消息的时候,每个消费者将只能看到传递给它的消息。
它的模型类似于如下
| consumer_group_name: mygroup |
| consumer_group_stream: somekey |
| last_delivered_id: 1292309234234-92 |
| |
| consumers: |
| "consumer-1" with pending messages |
| 1292309234234-4 |
| 1292309234232-8 |
| "consumer-42" with pending messages |
| ... (and so forth) |
从上边的模型中我们可以看出消费者组记录处理的最后一条消息,将消息分发给不同的消费者,每个消费者只能看到自己的消息。如果把消费者组看做streams的辅助数据结构,我们可以看出一个streams可以拥有多个消费者组,一个消费者组内可以拥有多个消费者。实际上,一个streams允许客户端使用XREAD读取的同时另一个客户端通过消费者群组读取数据。
1、消费者组命令
2、准备数据
1、创建Stream的名称是 stream-key
2、创建2个消息,aa和bb
127.0.0.1:6379> xadd stream-key * aa aa
"1636362619125-0"
127.0.0.1:6379> xadd stream-key * bb bb
"1636362623191-0"
3、创建消费者组
1、创建一个从头开始消费的消费者组
xgroup create stream-key(Stream 名) g1(消费者组名) 0-0(表示从头开始消费)
2、创建一个从Stream最新的一个消息消费的消费者组
xgroup create stream-key g2 $
$
表示从最后一个元素消费,不包括Stream中的最后一个元素,即消费最新的消息。
4、创建一个从某个消息之后消费的消费者组
xgroup create stream-key g3 1636362619125-0 #1636362619125-0 这个是上方aa消息的id的值
1636362619125-0
某个消息的具体的ID,这个g3
消费者组中的消息都是大于>
这个id的消息。
3、从消费者中读取消息
127.0.0.1:6379> xreadgroup group g1(消费组名) c1(消费者名,自动创建) count 3(读取3条) streams stream-key(Stream 名) >(从该消费者组中还未分配给另外的消费者的消息开始读取)
1) 1) "stream-key"
2) 1) 1) "1636362619125-0"
2) 1) "aa"
2) "aa"
2) 1) "1636362623191-0"
2) 1) "bb"
2) "bb"
127.0.0.1:6379> xreadgroup group g2 c1 count 3 streams stream-key >
(nil) # 返回 nil 是因为 g2消费组是从最新的一条信息开始读取(创建消费者组时使用了$),需要在另外的窗口执行`xadd`命令,才可以再次读取到消息
127.0.0.1:6379> xreadgroup group g3 c1 count 3 streams stream-key > #只读取到一条消息是因为,在创建消费者组时,指定了aa消息的id,bb消息的id大于aa,所以读取出来了。
1) 1) "stream-key"
2) 1) 1) "1636362623191-0"
2) 1) "bb"
2) "bb"
127.0.0.1:6379>
4、读取消费者的pending消息
127.0.0.1:6379> xgroup create stream-key g4 0-0
OK
127.0.0.1:6379> xinfo consumers stream-key g1
1) 1) "name"
2) "c1"
3) "pending"
4) (integer) 2
5) "idle"
6) (integer) 88792
127.0.0.1:6379> xinfo consumers stream-key g4
(empty array)
127.0.0.1:6379> xreadgroup group g1 c1 count 1 streams stream-key 1636362619125-0
1) 1) "stream-key"
2) 1) 1) "1636362623191-0"
2) 1) "bb"
2) "bb"
127.0.0.1:6379> xreadgroup group g4 c1 count 1 block 0 streams stream-key 1636362619125-0
1) 1) "stream-key"
2) (empty array)
127.0.0.1:6379>
5、转移消费者的消息
127.0.0.1:6379> xpending stream-key g1 - + 10 c1
1) 1) "1636362619125-0"
2) "c1"
3) (integer) 2686183
4) (integer) 1
2) 1) "1636362623191-0"
2) "c1"
3) (integer) 102274
4) (integer) 7
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
(empty array)
127.0.0.1:6379> xclaim stream-key g1 c2 102274 1636362623191-0
1) 1) "1636362623191-0"
2) 1) "bb"
2) "bb"
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
1) 1) "1636362623191-0"
2) "c2"
3) (integer) 17616
4) (integer) 8
127.0.0.1:6379>
也可以通过xautoclaim
来实现。
6、一些监控命令
1、查看消费组中消费者的pending消息
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
1) 1) "1636362623191-0"
2) "c2"
3) (integer) 1247680
4) (integer) 8
127.0.0.1:6379>
2、查看消费组中的消费者信息
127.0.0.1:6379> xinfo consumers stream-key g1
1) 1) "name"
2) "c1"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 1474864
2) 1) "name"
2) "c2"
3) "pending"
4) (integer) 1
5) "idle"
6) (integer) 1290069
127.0.0.1:6379>
3、查看消费组信息
127.0.0.1:6379> xinfo groups stream-key
1) 1) "name"
2) "g1"
3) "consumers"
4) (integer) 2
5) "pending"
6) (integer) 2
7) "last-delivered-id"
8) "1636362623191-0"
2) 1) "name"
2) "g2"
3) "consumers"
......
4、查看Stream信息
127.0.0.1:6379> xinfo stream stream-key
1) "length"
2) (integer) 2
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1636362623191-0"
9) "groups"
10) (integer) 4
11) "first-entry"
12) 1) "1636362619125-0"
2) 1) "aa"
2) "aa"
13) "last-entry"
14) 1) "1636362623191-0"
2) 1) "bb"
2) "bb"
127.0.0.1:6379>
7、有几件事需要记住:
消费者是在他们第一次被提及的时候自动创建的,不需要显式创建。 即使使用XREADGROUP,你也可以同时从多个key中读取,但是要让其工作,你需要给每一个Stream创建一个名称相同的消费者组。这并不是一个常见的需求,但是需要说明的是,这个功能在技术上是可以实现的。 XREADGROUP命令是一个写命令,因为当它从Stream中读取消息时,消费者组被修改了,所以这个命令只能在master节点调用。
五、与kafka partitions的区别
Consumer groups in Redis streams may resemble in some way Kafka ™ partitioning-based consumer groups, however note that Redis streams are, in practical terms, very different. The partitions are only logical and the messages are just put into a single Redis key, so the way the different clients are served is based on who is ready to process new messages, and not from which partition clients are reading. For instance, if the consumer C3 at some point fails permanently, Redis will continue to serve C1 and C2 all the new messages arriving, as if now there are only two logical partitions.
Redis streams 的消费组和Kafka的分区消费组有点像,然而Redis的streams是一个消息队列,它的消息是按照时间顺序排列的,而Kafka是一个分区的消息队列,它的消息是按照分区的顺序排列的,实际上区别很大。分区只是一个概念,消息是放在一个单一的Redis key中,不同的客户端是根据谁有新的消息来处理的,而不是根据客户端的分区来处理的。比如C3某个时候失败了,Redis会继续服务C1和C2,因为它们都有新的消息来了,而不是根据它们的分区来处理的。
Similarly, if a given consumer is much faster at processing messages than the other consumers, this consumer will receive proportionally more messages in the same unit of time. This is possible since Redis tracks all the unacknowledged messages explicitly, and remembers who received which message and the ID of the first message never delivered to any consumer.
如果某个消费者比其他消费者更快地处理消息,那么这个消费者将比其他消费者接收到的消息比例更高。这是可能的,因为Redis记录了所有未确认的消息,并记住了每个消费者接收到的消息和从没有消费者接收到过第一个消息的ID,。
However, this also means that in Redis if you really want to partition messages in the same stream into multiple Redis instances, you have to use multiple keys and some sharding system such as Redis Cluster or some other application-specific sharding system. A single Redis stream is not automatically partitioned to multiple instances.
然而,这也意味这如果你真的想要将一个消息队列分区到多个Redis实例,你需要使用多个key,并且使用一个分区算法,类似Redis集群或者一个其他的应用程序特定的分区算法。一个Redis stream不会自动分区到多个实例。
We could say that schematically the following is true:
- If you use 1 stream -> 1 consumer, you are processing messages in order.
- If you use N streams with N consumers, so that only a given consumer hits a subset of the N streams, you can scale the above model of 1 stream -> 1 consumer.
- If you use 1 stream -> N consumers, you are load balancing to N consumers, however in that case, messages about the same logical item may be consumed out of order, because a given consumer may process message 3 faster than another consumer is processing message 4.
大致效果如下: - 如果1个stream -> 1个消费者,消费消息顺序处理。 - 如果用N个streams和N个消费者,那么只有一个消费者会访问N个streams,你可以把上面的模型缩小为1个stream -> 1个消费者。 - 如果1个stream -> N个消费者,那么你可以负载均衡到N个消费者,但是在这种情况下,消息可能不会按顺序执行,因为一个消费者可能处理消息3比另一个消费者处理消息4慢。
So basically Kafka partitions are more similar to using N different Redis keys, while Redis consumer groups are a server-side load balancing system of messages from a given stream to N different consumers.
所以Kafka分区和使用N个不同的Redis keys更加相似,而Redis消费组是一个服务器端的消息负载均衡系统,从一个消息队列分区到N个不同的消费者。
六、设置streams上限
如果从streams可以查看到历史记录,我们可能会有疑惑,如果streams无限期的加入内存会不会够用,一旦消息数量达到上限,将消息永久删除或者持久化到数据库都是有必要的,redis也提供了诸如此类场景的支持。
一种方法是我们使用XADD
的时候指定streams的最大长度,XADD mystream MAXLEN ~ 1000
其中的数值前可以加上~
标识不需要精确的将长度保持在1000,比1000多一些也可以接受。如果不使用该标识,性能会差一些。另一种方法是使用XTRIM
,该命令也是使用MAXLEN
选项,> XTRIM mystream MAXLEN ~ 10
七、一些特殊的id
前面提到了在streams API里边存在一些特殊的id。
首先是-
和+
,这两个ID在XRANGE
命令中使用,分别代表最小的id和最大的id。-
代表0-1
,+
代表18446744073709551615-18446744073709551615
,从使用上方便了很多。在XPENDING
等范围查询中都可以使用。
$
代表streams中当前存在的最大的id,在XREAD
和XGROUP
中代表只获取新到的消息。需要注意的是$
跟+
的含义并不一致。
还有一个特殊的id是>
,这个id只能够在XREADGROUP
命令中使用,意味着在这个消费者群组中,从来没有分配给其他的消费者,所以总是使用>
作为群组中的last delivered ID
。
八、持久化,复制和消息安全性
与redis的其它数据结构一样,streams会异步复制到从节点,并持久化到AOF和RDB文件中,并且消费者群组的状态也会按照此机制进行持久化。 需要注意的几点是:
- 如果消息的持久化以及状态很重要,则AOF必须使用强fsync配合(AOF记录每一条更改redis数据的命令,有很多种持久化机制,在这个地方要用到的是
appendfsync always
这样会严重降低Redis的速度) - 默认情况下,异步复制不能保证从节点的数据与主节点保持一致,在故障转移以后可能会丢失一些内容,这跟从节点从主节点接受数据的能力有关。
WAIT
命令可以用于强制将更改传输到一组从节点上。虽然这使得数据不太可能会丢失,但是redis的Sentinel和cluster在进行故障转移的时候不一定会使用具有最新数据的从节点,在一些特殊故障下,反而会使用缺少一些数据的从节点。 因此在使用redis streams和消费者群组在设计程序的时候,确保了解你的应用程序在故障期间的应对策略,并进行相应地配置,评估它对你的程序是否足够安全。
九、从streams中删除数据
删除streams中的数据使用XDEL
命令,其语法为XDEL key ID [ID ...]
,需要注意的是在当前的实现中,在宏节点完全为空之前,内存并没有真正回收,所以你不应该滥用这个特性。
十、streams的性能
streams的不阻塞命令,比如XRANGE
或者不使用BLOCK选项的XREAD
和XREADGROUP
跟redis普通命令一致,所以没有必要讨论。如果有兴趣的话可以在redis的文档中查看到对应命令的时间复杂度。streams命令的速度在一定范围内跟set
是一致的,XADD
命令的速度非常快,在一个普通的机器上,一秒钟可以插入50w~100w条数据。
我们感兴趣的是在消费者群组的阻塞场景下,从通过XADD
命令向streams中插入一条数据,到消费者通过群组读取到这条消息的性能。
为了测试消息从产生到消费间的延迟,我们使用ruby程序进行测试,将消息的产生时间作为消息的一个字段,然后把消息推送到streams中,客户端收到消息后使用当前时间跟生产时间进行对比,从而计算出消息的延迟时间。这个程序未进行性能优化,运行在一个双核的机器上,同时redis也运行在这台机器上,以此来模拟不是理想条件下的场景。消息每秒钟产生1w条,群组内有10个消费者消费数据。测试结果如下:
Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%
99.9%的请求的延迟小于等于2毫秒,而且异常值非常接近平均值。另外需要注意的两点:
- 消费者每次处理1w条消息,这样增加了一些延迟,这样做是为了消费速度较慢的消费者能够保持保持消息流。
- 用来做测试的系统相比于现在的系统非常慢。
五、参考文档
1、https://redis.io/topics/streams-intro
2、https://www.runoob.com/redis/redis-stream.html
3、https://zhuanlan.zhihu.com/p/95483316