在 python/pika 中消费多个队列

Consume multiple queues in python / pika(在 python/pika 中消费多个队列)
本文介绍了在 python/pika 中消费多个队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着跟版网的小编来一起学习吧!

问题描述

我正在尝试创建一个订阅多个队列的消费者,然后在消息到达时对其进行处理.

I am trying to create a consumer that would subscribe to multiple queues, and then process messages as they arrive.

问题是,当第一个队列中已经存在一些数据时,它会消耗第一个队列,而永远不会去消耗第二个队列.但是,当第一个队列为空时,它确实会转到下一个队列,然后同时消耗两个队列.

The problem is that when there is some data already present in the first queue, it consumes the first queue and never goes to consume the second queue. However, when the first queue is empty, it does go to the next queue, and then consumes both queues simultaneously.

我首先实现了线程,但想避开它,当 pika 库为我完成它时没有太多复杂性.以下是我的代码:

I had first implemented threading but want to steer clear of it, when pika library does it for me without much complexity. Below is my code:

import pika

mq_connection = pika.BlockingConnection(pika.ConnectionParameters('x.x.x.x'))
mq_channel = mq_connection.channel()
mq_channel.basic_qos(prefetch_count=1)


def callback(ch, method, properties, body):
    print body
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)

mq_channel.basic_consume(callback, queue='queue1', consumer_tag="ctag1.0")
mq_channel.basic_consume(callback, queue='queue2', consumer_tag="ctag2.0")
mq_channel.start_consuming()

推荐答案

一种可能的解决方案是使用非阻塞连接并消费消息.

One possible solution is to use non blocking connection and consume messages.

import pika


def callback(channel, method, properties, body):
    print(body)
    channel.basic_ack(delivery_tag=method.delivery_tag)


def on_open(connection):
    connection.channel(on_channel_open)


def on_channel_open(channel):
    channel.basic_consume(callback, queue='queue1')
    channel.basic_consume(callback, queue='queue2')


parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.SelectConnection(parameters=parameters,
                                   on_open_callback=on_open)

try:
    connection.ioloop.start()
except KeyboardInterrupt:
    connection.close()

这将连接到多个队列并相应地使用消息.

This will connect to multiple queues and will consume messages accordingly.

这篇关于在 python/pika 中消费多个队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持跟版网!

本站部分内容来源互联网,如果有图片或者内容侵犯了您的权益,请联系我们,我们会在确认后第一时间进行删除!

相关文档推荐

groupby multiple coords along a single dimension in xarray(在xarray中按单个维度的多个坐标分组)
Group by and Sum in Pandas without losing columns(Pandas中的GROUP BY AND SUM不丢失列)
Group by + New Column + Grab value former row based on conditionals(GROUP BY+新列+基于条件的前一行抓取值)
Groupby and interpolate in Pandas(PANDA中的Groupby算法和插值算法)
Pandas - Group Rows based on a column and replace NaN with non-null values(PANAS-基于列对行进行分组,并将NaN替换为非空值)
Grouping pandas DataFrame by 10 minute intervals(按10分钟间隔对 pandas 数据帧进行分组)