从一个或者多个流中读取数据,仅返回ID大于调用者报告的最后接收ID的条目。此命令有一个阻塞选项,用于等待可用的项目,类似于BRPOP或者BZPOPMIN等等。

请注意,在阅读本页之前,如果你不了解Stream,我们推荐先阅读我们的Redis Streams介绍

非阻塞使用

如果未提供BLOCK选项,此命令是同步的,并可以认为与XRANGE有些相关:它将会返回流中的一系列项目,但与XRANGE相比它有两个基本差异(如果我们只考虑同步使用):

  • 如果我们想要从多个键同时读取,则可以使用多个流调用此命令。这是XREAD的一个关键特性,因为特别是在使用BLOCK进行阻塞时,能够通过单个连接监听多个键是一个至关重要的特性。
  • XRANGE返回一组ID中的项目,XREAD更适合用于从第一个条目(比我们到目前为止看到的任何其他条目都要大)开始使用流。因此,我们传递给XREAD的是,对于每个流,我们从该流接收的最后一个条目的ID。

例如,如果我有两个流mystreamwriters,并且我希望同时从这两个流中读取数据(从它们的第一个元素开始), 我可以像下面这样调用XREAD

请注意:我们在例子中使用了COUNT选项,因此对于每一个流,调用将返回每个流最多两个元素。

> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) "812"
            3) "event-id"
            4) "9"
            5) "user-id"
            6) "388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"

STREAMS选项是强制的,并且必须是最后一个选项,因为此选项以下列格式获取可变长度的参数:

STREAMS key_1 key_2 key_3 ... key_N ID_1 ID_2 ID_3 ... ID_N

所以我们以一组流的key开始,并在后面跟着所有关联的ID,表示我们从该流中获取的最后ID,以便调用仅为我们提供同一流中具有更大ID的条目。

例如,在上面的例子中,我们从流mystream中接收的最后项目的ID是1526999352406-0,而对于流writers,我们接收的最后项目的ID是1526985685298-0

要继续迭代这两个流,我将调用:

> XREAD COUNT 2 STREAMS mystream writers 1526999352406-0 1526985685298-0
1) 1) "mystream"
   2) 1) 1) 1526999626221-0
         2) 1) "duration"
            2) "911"
            3) "event-id"
            4) "7"
            5) "user-id"
            6) "9488232"
2) 1) "writers"
   2) 1) 1) 1526985691746-0
         2) 1) "name"
            2) "Toni"
            3) "surname"
            4) "Morris"
      2) 1) 1526985712947-0
         2) 1) "name"
            2) "Agatha"
            3) "surname"
            4) "Christie"

以此类推,最终,调用不再返回任何项目,只返回一个空数组,然后我们就知道我们的流中没有更多数据可以获取了(我们必须重试该操作,因此该命令也支持阻塞模式)。

不完全ID

使用不完整的ID是有效的,就像它对XRANGE一样有效。但是这里ID的序列号部分,如果缺少,将总是被解释为0,所以命令:

> XREAD COUNT 2 STREAMS mystream writers 0 0

完全等同于

> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0

阻塞数据

在同步形式中,只要有更多可用项,该命令就可以获取新数据。但是,有些时候,我们不得不等待数据生产者使用XADD向我们消费的流中推送新条目。为了避免使用固定或自适应间隔获取数据,如果命令根据指定的流和ID不能返回任何数据,则该命令能够阻塞,并且一旦请求的key之一接收了数据,就会自动解除阻塞。

重要的是需要理解这个命令是扇形分发到所有正在等待相同ID范围的客户端,因此每个消费者都将得到一份数据副本,这与使用阻塞列表pop操作时发生的情况不同。

为了阻塞,使用BLOCK选项,以及我们希望在超时前阻塞的毫秒数。通常Redis阻塞命令的超时时间单位是秒,但此命令拥有一个毫秒超时时间,虽然通常服务器的超时时间精度大概在0.1秒左右。这可以在某些用例中阻塞更短的时间,并且如果服务器内部结构随着时间的推移而改善,它的超时精度可能也会有提升。

当传递了BLOCK选项,但是在传递的流中没有任何流有数据返回,那么该命令是同步执行的,就像没有指定BLOCK选项一样

这是阻塞调用的例子,其中命令稍后将返回空回复(nil),因为超时时间已到但没有新数据到达:

> XREAD BLOCK 1000 STREAMS mystream 1526999626221-0
(nil)

特殊的ID$

有时阻塞我们只希望接收从我们阻塞的那一刻开始通过XADD添加到流的条目。在这种情况下,我们对已经添加条目的历史不感兴趣。对于这个用例,我们不得不检查流的最大元素ID,并使在XREAD命令行中使用这个ID。这不纯粹并且需要调用其他命令,因此可以使用特殊的ID$来表明我们只想要流中的新条目。

你应该仅在第一次调用XREAD时使用$,理解这一点非常重要。后面的ID你应该使用前一次报告的项目中最后一项的ID,否则你将会丢失所有添加到这中间的条目。

这是典型的XREAD调用(在想要仅消费新条目的消费者的第一次迭代)看起来的样子:

> XREAD BLOCK 5000 COUNT 100 STREAMS mystream $

一旦我们得到了一些回复,下一次调用会是像这样的:

> XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3

依此类推。

如何在单个流上阻止多个客户端

列表或有序集合上的阻塞列表操作有pop行为。基本上,元素将从列表或有序集合中删除,以便返回给客户端。 在这种场景下,你希望项目以公平的方式被消费,具体取决于客户端在给定key上阻止的时刻到达。通常在这种用例中,Redis使用FIFO语义。

但请注意,对于流这不是问题:当服务客户端时,不会从流中删除条目,因此只要XADD命令向流提供数据,就会提供给每个等待的客户端。

返回值

array-reply

该命令返回一个结果数组:返回数组的每个元素都是一个由两个元素组成的数组(键名和为该键报告的条目)。报告的条目是完整的流条目,具有ID以及所有字段和值的列表。返回的条目及其字段和值的顺序与使用XADD添加它们的顺序完全一致。

当使用BLOCK时,超时时将返回一个空回复(nil)。

为了更多地了解流的整体行为和语义,强烈建议阅读Redis Streams介绍



关于本文翻译者

网名:eson
github:helloeson
打赏他(备注rediscn)
微信