前面文章中的一个简单队列中只有一个生产者,一个队列和一个消费者。下面我们床架一个工作队列,可以发送一些耗时的任务给多个工作者(worker)。
工作队列(Task Queues)是为了避免等待一些占用时间和资源的操作。当我们把任务(Task)当作消息发送到队列中,一个工作者(worker)就会取出任务然后进行处理,如果任务很多,就会有多个工作者共同分工完成这些任务。
在下面的例子中,我们发送一个“Hello World!”的字符串,并把这个字符串当作是一个复杂耗时的任务,其中使用time.sleep()函数来模拟耗时,根据输入的“.”的数量来觉得sleep的秒数。
我们修改以下send.py文件内容,并重命名为new_task.py文件:
import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.queue_declare(queue='hello') message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print('[x] Sent %r' %message) connection.close()
修改recieve.py文件内容,并重命名为worker.py文件:
import time import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel()channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print("[x] Recieved %r" %body) time.sleep(body.count('.'.encode)) print('[x] Done') channel.basic_consume(on_message_callback=callback, queue='hello', auto_ack=True) print('[*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
工作队列的好处是可以并行处理队列中的任务,可以有多个worker处理任务。
我们现在打开三个终端,其中一个运行new_task.py用来向队列中发送任务,另外两个运行worker.py用来处理队列中的任务。
两个打开worker的终端:
终端1:
(learn_rabbitmq) $ python worker.py [*] Waiting for messages. To exit press CTRL+C
终端2:
(learn_rabbitmq) $ python worker.py [*] Waiting for messages. To exit press CTRL+C
发布任务的终端:
(learn_rabbitmq) $ python new_task.py First message. [x] Sent 'First message.' (learn_rabbitmq) $ python new_task.py Secode message.. [x] Sent 'Secode message..' (learn_rabbitmq) $ python new_task.py Third message... [x] Sent 'Third message...' (learn_rabbitmq) $ python new_task.py Fourth message.... [x] Sent 'Fourth message....' (learn_rabbitmq) $ python new_task.py Fifth message.... [x] Sent 'Fifth message....' (learn_rabbitmq) $
在发布任务之后,我们可以同步观察两个woker端的情况:
worker1:
(learn_rabbitmq) $ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Recieved b'First message.' [x] Done [x] Recieved b'Third message...' [x] Done [x] Recieved b'Fifth message....' [x] Done
worker2:
(learn_rabbitmq) $ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Recieved b'Secode message..' [x] Done [x] Recieved b'Fourth message....' [x] Done
从上面的情况可以看出,RabbitMQ会按照顺序把消息发送给每个消费者。发送消息的方式为轮询。
消息的确认
上面的代码中,消费者这端,我们添加了一个auto_ack的参数,表示worker完成一个任务后,就发送一个响应。然后RabbitMQ就会把该消息从队列中移除。
如果不添加该参数,RabbitMQ在发送了任务之后,不知道任务的处理情况,而且如果worker异常终止,则此时该消息就会丢失。如果我们不想丢失任何消息,如果一个worker终止了,我们就希望把该任务分配给其它的worker处理。这个就是RabbitMQ提供的消息响应机制。消费者完成任务之后,会发送一个ack响应,告诉RabbitMQ已经处理完该消息了。RabbitMQ知悉之后,就会从队列中删除该消息。如果消费者挂掉了,没有发送响应,RabbitMQ则会任务消息没有被处理,会把这条消息重新发送给其它的worker进行处理。这样就能保证消息不会丢失。
消息的持久化
按照以上代码,如果RabbitMQ退出或者因崩溃而退出的时候,RabbitMQ将会失去所有的队列和消息。如果我们向确保这些信息不要丢失,我们就需要把队列和消息进行持久化。
首先,为了不让队列消失,我们需要将队列声明为持久化(durable):
channel.queue_declare(queue='hello', durable=True)
如果我们一开始这么设置的话,那代码运行肯定没有问题,但是,由于我们之前已经定义了一个叫"hello"的队列了,RabbitMQ就无法使用不同的参数重新定义一个队列了,我们可以选择使用不同的名字,或者重启RabbitMQ。
如果修改名字的话,则注意必须在生产者和消费者端都要修改。
再者,我们还需要把我们的消息设置为持久化,将delivery_mode属性设置为2
channel.basic_publish(exchange='', routing_key='hello', body=message, properties=pika.BasicProperties( delivery_mode=2 ) )
公平调度
之前的代码,在运行时候,我们发现,两个worker中,处理奇数消息的比较繁忙,而处理偶数消息的则比较轻松。然而,RabbitMQ仍旧是轮询派发消息。
如果我们想要实现一个公平的调度,则可以使用basic_qos方法,设置prefetch_count=1。这样就是告诉RabbitMQ,在同一时刻不要发送超过1条消息给一个worker。应当等待该worker处理完上一个消息并作出响应之后,再给它分发下一条任务。
channel.basic_qos(prefetch_count=1)
最终我们的代码如下:
new_task.py:
import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message, properties=pika.BasicProperties( delivery_mode=2 )) print('[x] Sent %r' %message) connection.close()
import time import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello', durable=True) print('[*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print("[x] Recieved %r" %body) time.sleep(body.count('.'.encode())) print('[x] Done') channel.basic_qos(prefetch_count=1) channel.basic_consume(on_message_callback=callback, queue='hello', auto_ack=True) channel.start_consuming()
转载请注明:禅思 » 消息队列之RabbitMQ-02_工作队列?