English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Communication de file d'attente Python : utilisation de RabbitMQ (explication pratique)

(一)、Préambule

Pourquoi introduire la file d'attente de messages ?

1.Dés耦程序

2.Améliorer les performances

3.Réduire la complexité des logiques des services multiples

(二)、操作 RabbitMQ avec Python

La configuration, l'installation et l'utilisation de base de RabbitMQ sont vues dans l'article précédent, et ne seront pas répétées.

Pour utiliser Python pour操作 RabbitMQ, vous devez installer le module pika, installez-le directement avec pip :

pip install pika

1.La conversation la plus simple entre le producteur et le consommateur côté RabbitMQ :

producer:

#Author :ywq
import pika
auth = pika.PlainCredentials('ywq', 'qwe') #enregistrer les informations d'authentification
connection = pika.BlockingConnection(pika.ConnectionParameters(
  '192.168.0.158',5672,'/',auth)) #se connecter à rabbit
channel = connection.channel() #créer le canal
channel.queue_declare(queue='hello') #déclarer la file
#n Dans RabbitMQ, un message ne peut jamais être envoyé directement à la file, il doit toujours passer par un échange.
channel.basic_publish(exchange='',
   routing_key='hello',
   body='Hello World!') #le contenu du corps est le message
print(" [x] Envoyé 'Hello World!'")
connection.close()

consumer:

#Author :ywq
import pika
auth=pika.PlainCredentials('ywq','qwe') #infos d'authentification
connection = pika.BlockingConnection(pika.ConnectionParameters(
  '192.168.0.158',5672,'/',auth)) #se connecter à rabbit
channel = connection.channel()  #créer un canal
channel.queue_declare(queue='hello') #déclarer la file
def callback(ch, method, properties, body):
 print(" [x] Reçu %r" % body)
channel.basic_consume(callback,
   queue='hello',
   no_ack=True)
print(' [*] En attente de messages. Pour quitter, appuyez sur CTRL+C')
channel.start_consuming()

Pendant le processus de transmission et de consommation des messages, vous pouvez consulter en temps réel les informations des messages de la file sur la page de gestion web de Rabbit.

2.La file de messages persistants, pour éviter que la panne ou d'autres situations imprévues ne causent la perte de la file de messages.

Le côté consommateur ne nécessite pas de modification, ajoutez deux propriétés dans le code du côté producteur pour rendre les messages et la file persistants, il est nécessaire de les activer tous deux pour éviter la perte de messages :

delivery_mode=2 #rendre msg persistant
durable=True

L'emplacement de l'insertion des propriétés est indiqué dans le code suivant (côté producteur) :

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
  '192.168.0.158',5672,'/',auth_info
 ))
channel=connection.channel()
channel.queue_declare(queue='test1#durable=True, rendre la file persistante
msg=''.join(sys.argv[1:]) or 'Hello'
channel.basic_publish(
 exchange='',
 routing_key='test1',
 body=msg,
 properties=pika.BasicProperties(
  delivery_mode=2 #rendre msg persistant
 )
)
print('Send done:',msg)
connection.close()

3.Distribution équitable

Dans le cas de plusieurs consumers, par défaut, Rabbit distribue les messages en rotation, mais certains consumers consomment plus rapidement que d'autres. Pour une utilisation des ressources plus équilibrée, une mécanisme de confirmation ack est introduit. Une fois que le consumer a consommé un message, il envoie un ack à Rabbit, et dès que le nombre de messages non ack dépasse le nombre autorisé, il ne s'envoie plus au consumer concerné, mais plutôt à d'autres consumers.

Le code côté producteur ne nécessite pas de modification, mais il faut insérer deux propriétés dans le code côté consumer :

channel.basic_qos(prefetch_count= *) #définir le nombre maximal de non_ack_count
channel.basic_ack(delivery_tag=deliver.delivery_tag) #envoyer ack à RabbitMQ

L'emplacement d'insertion des propriétés est indiqué dans le code suivant (consumer côté) :

#Author :ywq
import pika,time
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.queue_declare(queue='test2',durable=True)
def callback(chann,deliver,properties,body):
 print('Recv:',body)
 time.sleep(5)
 chann.basic_ack(delivery_tag=deliver.delivery_tag) #envoyer un ack à rabbit
channel.basic_qos(prefetch_count=1)
'''
Attention, no_ack=False, attention, ici le type no_ack ne fait que dire à Rabbit si ce consumer retourne un ack, si vous voulez retourner un ack, vous devez le définir dans le callback.
prefetch_count=1, le nombre de messages non ack dépassant1s'il y a des messages, ce consumer ne recevra plus de msg, cette configuration doit être écrite au-dessus de channel.basic_consume, sinon cela pourrait entraîner l'apparition de la situation non_ack.
'''
channel.basic_consume(
 callback,
 queue='test2'
)
channel.start_consuming()

Troisième partie : Publication de messages/Abonnement

Les différents modes ci-dessus consistent en l'envoi d'un message par le producteur, suivi d'une réception par le consumer, mais est-il possible d'implémenter un producteur qui envoie un message et plusieurs consumers associés qui reçoivent simultanément ? Bien sûr, Rabbit supporte la publication et l'abonnement aux messages, et supporte trois modes, réalisés par le composant transmetteur exchange.3Différents modes :

fanout: Toutes les queues liées à cet échange peuvent recevoir des messages, similaire à une diffusion en broadcast.

direct: La réception de messages est déterminée par le routingKey et l'échange, qui détermine la queue unique qui peut recevoir des messages, et ces messages sont envoyés aux consumers liés à cette queue, similaire à une diffusion en multicast.

topic : toutes les files d'attente bindées aux routingKey (à ce moment, cela peut être une expression) qui correspondent au routingKey peuvent recevoir des messages, similaire à la correspondance de liste d'adresse préfixée de la route.

1.fanout

côté publication (producteur) :

#Author :ywq
import pika,sys,time
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(exchange='hello',
    exchange_type='fanout'
    )
msg=''.join(sys.argv[1:]) ou 'Hello world %s' %time.time()
channel.basic_publish(
 exchange='hello',
 routing_key='',
 body=msg,
 properties=pika.BasicProperties(
 delivery_mode=2
 )
)
print('send done')
connection.close()

abonnement côté (consumer) :

#Author :ywq
import pika
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(
 exchange='hello',
 exchange_type='fanout'
)
random_num=channel.queue_declare(exclusive=True) #établir une file d'attente aléatoire avec rabbit, la file d'attente est supprimée et libérée immédiatement après la déconnexion du consumer
queue_name=random_num.method.queue
channel.basic_qos(prefetch_count=1)
channel.queue_bind(
 queue=queue_name,
 exchange='hello'
)
def callback(chann,deliver,properties,body):
 print('Recv:',body)
 chann.basic_ack(delivery_tag=deliver.delivery_tag) #envoyer un ack à rabbit
channel.basic_consume(
 callback,
 queue=queue_name,
)
channel.start_consuming()

realiser que le producteur envoie une fois, plusieurs consumers associés reçoivent.

Lors de l'utilisation du mode exchange :

1.producer ne déclare plus la file d'attente, mais déclare directement l'échange

2.consumer il doit toujours binder la file d'attente et spécifier l'échange pour recevoir le message

3.consumer il est préférable de créer une file d'attente aléatoire, libérer immédiatement après utilisation.

Le nom de la file d'attente aléatoire peut être détecté sous web :

2.direct

l'utilisation de l'exchange permet à la consumer de recevoir les messages de manière sélective. Les mots-clés de binding de la file d'attente, le producteur envoie les données selon les mots-clés à l'échange de messages, l'échange juge selon les mots-clés s'il doit envoyer les données à la file d'attente spécifique, et la consumer reçoit en conséquence. C'est-à-dire qu'il a été ajouté un routing key sur la base de fanout.

producer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(exchange='direct_log',
   exchange_type='direct',
   )
while True:
 route_key=input('Input routing key:')
 msg=''.join(sys.argv[1:]) or 'Hello'
 channel.basic_publish(
 exchange='direct_log',
 routing_key=route_key,
 body=msg,
 properties=pika.BasicProperties(
  delivery_mode=2
 )
 )
connection.close()

consumer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
))
channel=connection.channel()
channel.exchange_declare(
 exchange='direct_log',
 exchange_type='direct',
)
queue_num=channel.queue_declare(exclusive=True)
queue_name=queue_num.method.queue
route_key=input('Input routing key:')
channel.queue_bind(
 queue=queue_name,
 exchange='direct_log',
 routing_key=route_key
)
def callback(chann,deliver,property,body):
 print('Recv:[level:%s],[msg:%s]' %(route_key,body))
 chann.basic_ack(delivery_tag=deliver.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
 callback,
 queue=queue_name
)
channel.start_consuming()

Ouvrir plusieurs consumers en même temps, deux reçoivent notice, deux reçoivent warning, l'effet de exécution est le suivant :

3.topic

Par rapport à direct, topic peut réaliser un mode de travail de correspondance floue (spécifié au niveau du consumer), tant que le routing key contient les mots-clés spécifiés, le msg sera envoyé vers la queue liée.

rabbitmq règle de caractères génériques :

Le symbole ‘#’ correspond à un ou plusieurs mots, et le symbole ‘’ correspond à un mot. Par conséquent, ‘abc.#’ pourrait correspondre à ‘abc.m.n’, mais ‘abc.*‘' ne serait pas correspondant à ‘abc.m’.' Le point d'exclamation est utilisé comme séparateur. Lors de l'utilisation des caractères génériques, le point d'exclamation doit être utilisé comme séparateur.

producer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(exchange='topic_log',
   exchange_type='topic',
   )
while True:
 route_key=input('Input routing key:')
 msg=''.join(sys.argv[1:]) or 'Hello'
 channel.basic_publish(
 exchange='topic_log',
 routing_key=route_key,
 body=msg,
 properties=pika.BasicProperties(
  delivery_mode=2
 )
 )
connection.close()

consumer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
))
channel=connection.channel()
channel.exchange_declare(
 exchange='topic_log',
 exchange_type='topic'
)
queue_num=channel.queue_declare(exclusive=True)
queue_name=queue_num.method.queue
route_key=input('Input routing key:')
channel.queue_bind(
 queue=queue_name,
 exchange='topic_log',
 routing_key=route_key
)
def callback(chann,deliver,property,body):
 print('Recv:[type:%s],[msg:%s]' %(route_key,body))
 chann.basic_ack(delivery_tag=deliver.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
 callback,
 queue=queue_name
)
channel.start_consuming()

Effet de exécution :

Trois types de publish rabbitmq/Présentation simple du modèle subscribe terminée.

Cet article sur la communication de file d'attente Python : l'utilisation de rabbitMQ (explication d'exemple) est tout ce que l'éditeur partage avec vous. J'espère que cela vous servira de référence et que vous soutiendrez également le tutoriel à cri

Déclaration : le contenu de cet article est tiré d'Internet, propriété de l'auteur original, le contenu est contribué et téléchargé par les utilisateurs d'Internet, ce site ne détient pas de propriété, n'a pas été traité par l'éditeur humain et n'assume aucune responsabilité juridique. Si vous trouvez du contenu suspect de violation de droits d'auteur, veuillez envoyer un e-mail à : notice#oldtoolbag.com (veuillez remplacer # par @ lors de l'envoi d'un e-mail pour signaler une violation, et fournir des preuves pertinentes. Une fois vérifié, ce site supprimera immédiatement le contenu suspect de violation de droits d'auteur.)

Vous pourriez aimer