English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
(一)、Préambule
Pourquoi introduire la file d'attente de messages ?
1.Dés耦程序
2.Améliorer les performances
3.Réduire la complexité des logiques des services multiples
(二)、操作 RabbitMQ avec Python
La configuration, l'installation et l'utilisation de base de RabbitMQ sont vues dans l'article précédent, et ne seront pas répétées.
Pour utiliser Python pour操作 RabbitMQ, vous devez installer le module pika, installez-le directement avec pip :
pip install pika
1.La conversation la plus simple entre le producteur et le consommateur côté RabbitMQ :
producer:
#Author :ywq import pika auth = pika.PlainCredentials('ywq', 'qwe') #enregistrer les informations d'authentification connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth)) #se connecter à rabbit channel = connection.channel() #créer le canal channel.queue_declare(queue='hello') #déclarer la file #n Dans RabbitMQ, un message ne peut jamais être envoyé directement à la file, il doit toujours passer par un échange. channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') #le contenu du corps est le message print(" [x] Envoyé 'Hello World!'") connection.close()
consumer:
#Author :ywq import pika auth=pika.PlainCredentials('ywq','qwe') #infos d'authentification connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth)) #se connecter à rabbit channel = connection.channel() #créer un canal channel.queue_declare(queue='hello') #déclarer la file def callback(ch, method, properties, body): print(" [x] Reçu %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] En attente de messages. Pour quitter, appuyez sur CTRL+C') channel.start_consuming()
Pendant le processus de transmission et de consommation des messages, vous pouvez consulter en temps réel les informations des messages de la file sur la page de gestion web de Rabbit.
2.La file de messages persistants, pour éviter que la panne ou d'autres situations imprévues ne causent la perte de la file de messages.
Le côté consommateur ne nécessite pas de modification, ajoutez deux propriétés dans le code du côté producteur pour rendre les messages et la file persistants, il est nécessaire de les activer tous deux pour éviter la perte de messages :
delivery_mode=2 #rendre msg persistant durable=True
L'emplacement de l'insertion des propriétés est indiqué dans le code suivant (côté producteur) :
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info )) channel=connection.channel() channel.queue_declare(queue='test1#durable=True, rendre la file persistante msg=''.join(sys.argv[1:]) or 'Hello' channel.basic_publish( exchange='', routing_key='test1', body=msg, properties=pika.BasicProperties( delivery_mode=2 #rendre msg persistant ) ) print('Send done:',msg) connection.close()
3.Distribution équitable
Dans le cas de plusieurs consumers, par défaut, Rabbit distribue les messages en rotation, mais certains consumers consomment plus rapidement que d'autres. Pour une utilisation des ressources plus équilibrée, une mécanisme de confirmation ack est introduit. Une fois que le consumer a consommé un message, il envoie un ack à Rabbit, et dès que le nombre de messages non ack dépasse le nombre autorisé, il ne s'envoie plus au consumer concerné, mais plutôt à d'autres consumers.
Le code côté producteur ne nécessite pas de modification, mais il faut insérer deux propriétés dans le code côté consumer :
channel.basic_qos(prefetch_count= *) #définir le nombre maximal de non_ack_count channel.basic_ack(delivery_tag=deliver.delivery_tag) #envoyer ack à RabbitMQ
L'emplacement d'insertion des propriétés est indiqué dans le code suivant (consumer côté) :
#Author :ywq import pika,time auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.queue_declare(queue='test2',durable=True) def callback(chann,deliver,properties,body): print('Recv:',body) time.sleep(5) chann.basic_ack(delivery_tag=deliver.delivery_tag) #envoyer un ack à rabbit channel.basic_qos(prefetch_count=1) ''' Attention, no_ack=False, attention, ici le type no_ack ne fait que dire à Rabbit si ce consumer retourne un ack, si vous voulez retourner un ack, vous devez le définir dans le callback. prefetch_count=1, le nombre de messages non ack dépassant1s'il y a des messages, ce consumer ne recevra plus de msg, cette configuration doit être écrite au-dessus de channel.basic_consume, sinon cela pourrait entraîner l'apparition de la situation non_ack. ''' channel.basic_consume( callback, queue='test2' ) channel.start_consuming()
Troisième partie : Publication de messages/Abonnement
Les différents modes ci-dessus consistent en l'envoi d'un message par le producteur, suivi d'une réception par le consumer, mais est-il possible d'implémenter un producteur qui envoie un message et plusieurs consumers associés qui reçoivent simultanément ? Bien sûr, Rabbit supporte la publication et l'abonnement aux messages, et supporte trois modes, réalisés par le composant transmetteur exchange.3Différents modes :
fanout: Toutes les queues liées à cet échange peuvent recevoir des messages, similaire à une diffusion en broadcast.
direct: La réception de messages est déterminée par le routingKey et l'échange, qui détermine la queue unique qui peut recevoir des messages, et ces messages sont envoyés aux consumers liés à cette queue, similaire à une diffusion en multicast.
topic : toutes les files d'attente bindées aux routingKey (à ce moment, cela peut être une expression) qui correspondent au routingKey peuvent recevoir des messages, similaire à la correspondance de liste d'adresse préfixée de la route.
1.fanout
côté publication (producteur) :
#Author :ywq import pika,sys,time auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare(exchange='hello', exchange_type='fanout' ) msg=''.join(sys.argv[1:]) ou 'Hello world %s' %time.time() channel.basic_publish( exchange='hello', routing_key='', body=msg, properties=pika.BasicProperties( delivery_mode=2 ) ) print('send done') connection.close()
abonnement côté (consumer) :
#Author :ywq import pika auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare( exchange='hello', exchange_type='fanout' ) random_num=channel.queue_declare(exclusive=True) #établir une file d'attente aléatoire avec rabbit, la file d'attente est supprimée et libérée immédiatement après la déconnexion du consumer queue_name=random_num.method.queue channel.basic_qos(prefetch_count=1) channel.queue_bind( queue=queue_name, exchange='hello' ) def callback(chann,deliver,properties,body): print('Recv:',body) chann.basic_ack(delivery_tag=deliver.delivery_tag) #envoyer un ack à rabbit channel.basic_consume( callback, queue=queue_name, ) channel.start_consuming()
realiser que le producteur envoie une fois, plusieurs consumers associés reçoivent.
Lors de l'utilisation du mode exchange :
1.producer ne déclare plus la file d'attente, mais déclare directement l'échange
2.consumer il doit toujours binder la file d'attente et spécifier l'échange pour recevoir le message
3.consumer il est préférable de créer une file d'attente aléatoire, libérer immédiatement après utilisation.
Le nom de la file d'attente aléatoire peut être détecté sous web :
2.direct
l'utilisation de l'exchange permet à la consumer de recevoir les messages de manière sélective. Les mots-clés de binding de la file d'attente, le producteur envoie les données selon les mots-clés à l'échange de messages, l'échange juge selon les mots-clés s'il doit envoyer les données à la file d'attente spécifique, et la consumer reçoit en conséquence. C'est-à-dire qu'il a été ajouté un routing key sur la base de fanout.
producer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare(exchange='direct_log', exchange_type='direct', ) while True: route_key=input('Input routing key:') msg=''.join(sys.argv[1:]) or 'Hello' channel.basic_publish( exchange='direct_log', routing_key=route_key, body=msg, properties=pika.BasicProperties( delivery_mode=2 ) ) connection.close()
consumer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info )) channel=connection.channel() channel.exchange_declare( exchange='direct_log', exchange_type='direct', ) queue_num=channel.queue_declare(exclusive=True) queue_name=queue_num.method.queue route_key=input('Input routing key:') channel.queue_bind( queue=queue_name, exchange='direct_log', routing_key=route_key ) def callback(chann,deliver,property,body): print('Recv:[level:%s],[msg:%s]' %(route_key,body)) chann.basic_ack(delivery_tag=deliver.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume( callback, queue=queue_name ) channel.start_consuming()
Ouvrir plusieurs consumers en même temps, deux reçoivent notice, deux reçoivent warning, l'effet de exécution est le suivant :
3.topic
Par rapport à direct, topic peut réaliser un mode de travail de correspondance floue (spécifié au niveau du consumer), tant que le routing key contient les mots-clés spécifiés, le msg sera envoyé vers la queue liée.
rabbitmq règle de caractères génériques :
Le symbole ‘#’ correspond à un ou plusieurs mots, et le symbole ‘’ correspond à un mot. Par conséquent, ‘abc.#’ pourrait correspondre à ‘abc.m.n’, mais ‘abc.*‘' ne serait pas correspondant à ‘abc.m’.' Le point d'exclamation est utilisé comme séparateur. Lors de l'utilisation des caractères génériques, le point d'exclamation doit être utilisé comme séparateur.
producer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare(exchange='topic_log', exchange_type='topic', ) while True: route_key=input('Input routing key:') msg=''.join(sys.argv[1:]) or 'Hello' channel.basic_publish( exchange='topic_log', routing_key=route_key, body=msg, properties=pika.BasicProperties( delivery_mode=2 ) ) connection.close()
consumer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info )) channel=connection.channel() channel.exchange_declare( exchange='topic_log', exchange_type='topic' ) queue_num=channel.queue_declare(exclusive=True) queue_name=queue_num.method.queue route_key=input('Input routing key:') channel.queue_bind( queue=queue_name, exchange='topic_log', routing_key=route_key ) def callback(chann,deliver,property,body): print('Recv:[type:%s],[msg:%s]' %(route_key,body)) chann.basic_ack(delivery_tag=deliver.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume( callback, queue=queue_name ) channel.start_consuming()
Effet de exécution :
Trois types de publish rabbitmq/Présentation simple du modèle subscribe terminée.
Cet article sur la communication de file d'attente Python : l'utilisation de rabbitMQ (explication d'exemple) est tout ce que l'éditeur partage avec vous. J'espère que cela vous servira de référence et que vous soutiendrez également le tutoriel à cri
Déclaration : le contenu de cet article est tiré d'Internet, propriété de l'auteur original, le contenu est contribué et téléchargé par les utilisateurs d'Internet, ce site ne détient pas de propriété, n'a pas été traité par l'éditeur humain et n'assume aucune responsabilité juridique. Si vous trouvez du contenu suspect de violation de droits d'auteur, veuillez envoyer un e-mail à : notice#oldtoolbag.com (veuillez remplacer # par @ lors de l'envoi d'un e-mail pour signaler une violation, et fournir des preuves pertinentes. Une fois vérifié, ce site supprimera immédiatement le contenu suspect de violation de droits d'auteur.)