English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Redis Stream

Redis Stream est une 50 nouvelle structure de données ajoutée.

Redis Stream est principalement utilisé pour la file d'attente de messages (MQ, Message Queue), Redis a une Redis publish/subscribe (pub/sub) permet de réaliser les fonctions de file d'attente de messages, mais il a un inconvénient : les messages ne peuvent pas être persistants, si une interruption de réseau, une panne de Redis se produit, les messages seront perdus.

En termes simples, le publish/subscribe (pub/sub) peut distribuer des messages, mais ne peut pas enregistrer les messages historiques.

Redis Stream offre la persistance des messages et la réplication principal/secondaire, permettant à tout client d'accéder aux données à tout moment et en mémorisant la position d'accès de chaque client, tout en garantissant que les messages ne sont pas perdus.

La structure de Redis Stream est présentée comme suit, elle contient une liste de messages qui relie tous les messages ajoutés, chaque message a un ID unique et un contenu correspondant :

Chaque Stream a un nom unique, c'est-à-dire la clé de Redis, qui est créée automatiquement lors de notre première utilisation de l'instruction xadd pour ajouter un message.

Analyse de l'image ci-dessus :

  • Consumer Group :Groupe de consommateurs, créé par la commande XGROUP CREATE, un groupe de consommateurs contient plusieurs consommateurs (Consumer).

  • last_delivered_id :Curseur, chaque groupe de consommateurs aura un curseur last_delivered_id, chaque fois qu'un consommateur lit un message, le curseur last_delivered_id avancera.

  • pending_ids :L'état variable du consommateur (Consumer) a pour fonction de maintenir les IDs non confirmés du consommateur. pending_ids enregistre les messages lus par le client, mais pas encore ack (Acknowledge character : caractère de confirmation).

Commandes liées à la file de messages :

  • XADD - Ajouter un message à la fin

  • XTRIM - Tronquer le flux pour limiter la longueur

  • XDEL - Supprimer des messages

  • XLEN - Obtenir le nombre d'éléments contenus dans le flux, c'est-à-dire la longueur des messages

  • XRANGE - Obtenir une liste de messages, qui filtrera automatiquement les messages supprimés

  • XREVRANGE -  Obtenir une liste de messages dans l'ordre décroissant

  • XREAD - Obtenir une liste de messages de manière bloquée ou non bloquée

Commandes liées au groupe de consommateurs :

  • XGROUP CREATE - Créer un groupe de consommateurs

  • XREADGROUP GROUP - Lire les messages dans le groupe de consommateurs

  • XACK - Marquer les messages comme "traités"

  • XGROUP SETID - Définir un nouveau dernier ID de message livré pour le groupe de consommateurs

  • XGROUP DELCONSUMER - Supprimer le consommateur

  • XGROUP DESTROY - Supprimer le groupe de consommateurs

  • XPENDING - Afficher les informations sur les messages en attente

  • XCLAIM - Transférer la propriété des messages

  • XINFO - Voir les informations sur le flux et le groupe de consommateurs

  • XINFO GROUPS - Imprimer les informations du groupe de consommateurs

  • XINFO STREAM - Imprimer les informations du flux

XADD

Utilisez XADD pour ajouter des messages à la file d'attente, si le fichier de queue spécifié n'existe pas, il est créé, la syntaxe de XADD est :

XADD clé ID champ valeur [champ valeur ...]
  • key :队列名称,如果不存在就创建

  • ID :id de message, nous utilisons * représente généré par redis, peut être personnalisé, mais il faut s'assurer de la progression croissante.

  • valeur champ :enregistrement.

redis> XADD mystream * nom Sara prénom O'Connor
"1601372323627-0"
redis> XADD mystream * champ1 valeur1 champ2 valeur2 champ3 valeur3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) ""1601372323627-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "O'Connor"
2) 1) ""1601372323627-1"
   2) 1) "champ"1"
      2) "valeur"1"
      3) "champ"2"
      4) "valeur"2"
      5) "champ"3"
      6) "valeur"3"
redis>

XTRIM

Utilisez XTRIM pour tronquer le flux, limiter la longueur, la syntaxe est :

XTRIM clé MAXLEN [~] count
  • key :nom du fichier de queue

  • MAXLEN :longueur

  • count :数量

127.0.0.1:6379> XADD mystream * champ1 A champ2 B champ3 C champ4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(entier) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) ""1601372434568-0"
   2) 1) "champ"1"
      2) "A"
      3) "champ"2"
      4) "B"
      5) "champ"3"
      6) "C"
      7) "champ"4"
      8) "D"
127.0.0.1:6379> 
redis>

XDEL

Utilisez XDEL pour supprimer des messages, la syntaxe est :

XDEL clé ID [ID ...]
  • key:nom du fichier de queue

  • ID :消息 ID

> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) ""1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) ""3"

XLEN

Utilisez XLEN pour obtenir le nombre d'éléments contenus dans le flux, c'est-à-dire la longueur des messages, la syntaxe est :

XLEN clé
  • key:nom du fichier de queue

redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3
redis>

XRANGE

使用 XRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XRANGE key start end [COUNT count]
  • key :队列名

  • start :开始值, - 表示最小值

  • end :结束值, + 表示最大值

  • count :数量

redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) ""1601372577811-0"
   2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) ""1601372577811-1"
   2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"
redis>

XREVRANGE

使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XREVRANGE key end start [COUNT count]
  • key :队列名

  • end :结束值, + 表示最大值

  • start :开始值, - 表示最小值

  • count :数量

redis> XADD writers * name Virginia surname Woolf
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
"1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) ""1601372731459-3"
   2) 1) "name"
      2) "Ngozi"
      3) "surname"
      4) "Adichie"
redis>

XREAD

使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • count :数量

  • milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式

  • key :队列名

  • id :消息 ID

# 从 Stream 头部读取两条消息
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) ""1532"
            3) "event"-id"
            4) ""5"
            5) "user"-id"
            6) ""7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) ""812"
            3) "event"-id"
            4) ""9"
            5) "user"-id"
            6) ""388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"

XGROUP CREATE

使用 XGROUP CREATE 创建消费者组,语法格式:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id]-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • key :队列名称,如果不存在就创建

  • groupname :组名。

  • $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

从头开始消费:

XGROUP CREATE mystream consumer-group-name 0-0

从尾部开始消费:

XGROUP CREATE mystream consumer-group-name $

XREADGROUP GROUP

使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:

XREADGROUP GROUP consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group :消费组名

  • consumer :消费者名。

  • count : 读取数量。

  • milliseconds : 阻塞毫秒数。

  • key : 队列名。

  • ID : 消息 ID。

XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >