欢迎来访我的博客。

消息队列之RabbitMQ-02_工作队列

其他 小张哥哥 23707浏览 167评论

前面文章中的一个简单队列中只有一个生产者,一个队列和一个消费者。下面我们床架一个工作队列,可以发送一些耗时的任务给多个工作者(worker)。
工作队列(Task Queues)是为了避免等待一些占用时间和资源的操作。当我们把任务(Task)当作消息发送到队列中,一个工作者(worker)就会取出任务然后进行处理,如果任务很多,就会有多个工作者共同分工完成这些任务。

rabbitmq_task_queue.png

在下面的例子中,我们发送一个“Hello World!”的字符串,并把这个字符串当作是一个复杂耗时的任务,其中使用time.sleep()函数来模拟耗时,根据输入的“.”的数量来觉得sleep的秒数。

我们修改以下send.py文件内容,并重命名为new_task.py文件:

import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='',
                            routing_key='hello',
                            body=message)
                      
print('[x] Sent %r' %message)
connection.close()

修改recieve.py文件内容,并重命名为worker.py文件:

import time
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print("[x] Recieved %r" %body)
    time.sleep(body.count('.'.encode))
    print('[x] Done')
    
channel.basic_consume(on_message_callback=callback,
                           queue='hello',
                           auto_ack=True)
                                       
print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

工作队列的好处是可以并行处理队列中的任务,可以有多个worker处理任务。
我们现在打开三个终端,其中一个运行new_task.py用来向队列中发送任务,另外两个运行worker.py用来处理队列中的任务。
两个打开worker的终端:
终端1:

(learn_rabbitmq) $ python worker.py 
[*] Waiting for messages. To exit press CTRL+C

终端2:

(learn_rabbitmq) $ python worker.py 
[*] Waiting for messages. To exit press CTRL+C

发布任务的终端:

(learn_rabbitmq) $ python new_task.py First message.
[x] Sent 'First message.'
(learn_rabbitmq) $ python new_task.py Secode message..
[x] Sent 'Secode message..'
(learn_rabbitmq) $ python new_task.py Third message...
[x] Sent 'Third message...'
(learn_rabbitmq) $ python new_task.py Fourth message....
[x] Sent 'Fourth message....'
(learn_rabbitmq) $ python new_task.py Fifth message....
[x] Sent 'Fifth message....'
(learn_rabbitmq) $

在发布任务之后,我们可以同步观察两个woker端的情况:
worker1:

(learn_rabbitmq) $ python worker.py 
[*] Waiting for messages. To exit press CTRL+C
[x] Recieved b'First message.'
[x] Done
[x] Recieved b'Third message...'
[x] Done
[x] Recieved b'Fifth message....'
[x] Done

worker2:

(learn_rabbitmq) $ python worker.py 
[*] Waiting for messages. To exit press CTRL+C
[x] Recieved b'Secode message..'
[x] Done
[x] Recieved b'Fourth message....'
[x] Done

从上面的情况可以看出,RabbitMQ会按照顺序把消息发送给每个消费者。发送消息的方式为轮询。

消息的确认
上面的代码中,消费者这端,我们添加了一个auto_ack的参数,表示worker完成一个任务后,就发送一个响应。然后RabbitMQ就会把该消息从队列中移除。

如果不添加该参数,RabbitMQ在发送了任务之后,不知道任务的处理情况,而且如果worker异常终止,则此时该消息就会丢失。如果我们不想丢失任何消息,如果一个worker终止了,我们就希望把该任务分配给其它的worker处理。这个就是RabbitMQ提供的消息响应机制。消费者完成任务之后,会发送一个ack响应,告诉RabbitMQ已经处理完该消息了。RabbitMQ知悉之后,就会从队列中删除该消息。如果消费者挂掉了,没有发送响应,RabbitMQ则会任务消息没有被处理,会把这条消息重新发送给其它的worker进行处理。这样就能保证消息不会丢失。

消息的持久化

按照以上代码,如果RabbitMQ退出或者因崩溃而退出的时候,RabbitMQ将会失去所有的队列和消息。如果我们向确保这些信息不要丢失,我们就需要把队列和消息进行持久化。

首先,为了不让队列消失,我们需要将队列声明为持久化(durable):

channel.queue_declare(queue='hello', durable=True)

如果我们一开始这么设置的话,那代码运行肯定没有问题,但是,由于我们之前已经定义了一个叫"hello"的队列了,RabbitMQ就无法使用不同的参数重新定义一个队列了,我们可以选择使用不同的名字,或者重启RabbitMQ。
如果修改名字的话,则注意必须在生产者和消费者端都要修改。

再者,我们还需要把我们的消息设置为持久化,将delivery_mode属性设置为2

channel.basic_publish(exchange='',
                                     routing_key='hello',
                                     body=message,
                                     properties=pika.BasicProperties(
                                          delivery_mode=2
                                      )
                                   )

公平调度

之前的代码,在运行时候,我们发现,两个worker中,处理奇数消息的比较繁忙,而处理偶数消息的则比较轻松。然而,RabbitMQ仍旧是轮询派发消息。

rabbitmq_task_queue_delivery_mode2.png


如果我们想要实现一个公平的调度,则可以使用basic_qos方法,设置prefetch_count=1。这样就是告诉RabbitMQ,在同一时刻不要发送超过1条消息给一个worker。应当等待该worker处理完上一个消息并作出响应之后,再给它分发下一条任务。

channel.basic_qos(prefetch_count=1)

最终我们的代码如下:
new_task.py:

import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2
                      ))

print('[x] Sent %r' %message)
connection.close()

worker.py

import time
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello', durable=True)
print('[*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print("[x] Recieved %r" %body)
    time.sleep(body.count('.'.encode()))
    print('[x] Done')

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_message_callback=callback,
                      queue='hello',
                      auto_ack=True)

channel.start_consuming()


转载请注明:禅思 » 消息队列之RabbitMQ-02_工作队列?

喜欢 (0) or 分享 (0)

我的个人微信公众号,欢迎关注

扫码或搜索:Python后端开发Django

Python后端开发Django

微信公众号 扫一扫关注

结交朋友、一起学习,一起进步。

科波之主

QQ号 386046154 立即加入

添加微信,进行技术交流

专注技术交流, 一同成长进步

我的微信号

如果您喜欢我的文章,感觉我的文章对您有帮助,请狠狠点击下面

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(167)个小伙伴在吐槽
    1. neurontin price neurontin without prescription - cheap neurontin
      Larryjek 2021-06-20 20:07:55 回复
      • mail pharmacy
        JaneBelty 2021-06-21 01:09:36 回复
        • amlodipine 10 mg amlodipine dosage side effects amlodipine amlodipine 10 mg norvasc amlodipine amlodipine 10mg tablets
          Davidnip 2021-06-21 07:06:06 回复
          • plaquenil nz
            JaneBelty 2021-06-21 07:46:13 回复
            • cialis 40 mg online
              AmyBelty 2021-06-21 10:59:16 回复
              • lipitor generic name warnings for lipitor atorvastatin warnings warnings for lipitor atorvastatin calcium 80mg atorvastatin calcium 40mg
                Thomasgrind 2021-06-21 11:22:50 回复
                • cialis brand
                  KimBelty 2021-06-21 14:26:26 回复
                  • chloroquine aralen phosphate - aralen where to buy
                    Larryjek 2021-06-21 15:22:31 回复
                    • losartan potassium losartan dose losartan potassium 25g losartan and alcohol losartan drug class losartan recall 2020
                      JamesunelE 2021-06-21 15:39:11 回复
                      • viagra 120 mg
                        EvaBelty 2021-06-21 16:12:04 回复
                        • meloxicam 15mg interactions for meloxicam mobic meloxicam meloxicam 10 meloxicam dosage meloxicam dose
                          FidelMon 2021-06-21 19:57:11 回复
                          • cialis paypal
                            EvaBelty 2021-06-21 20:54:34 回复
                            • metoprolol side effects metoprolol succinate er metoprolol medication metoprolol dosage metoprolol 25 mg metoprolol xl
                              RobertGen 2021-06-21 23:44:12 回复
                              • duloxetine dr 30mg duloxetine generic generic for cymbalta cymbalta reviews duloxetine generic duloxetine dosing
                                Robertdaulp 2021-06-22 07:47:09 回复
                                • ivermectin 200 mcg
                                  JaneBelty 2021-06-22 08:49:40 回复
                                  • alcohol and prednisone prednisone side effects prednisone 10mg cost prednisone 10mg prednisone 10mg cost prednisone 20mg tablet
                                    JesseBiz 2021-06-22 11:42:16 回复
                                    • buy doxycycline cheap odering doxycycline - doxycycline 50 mg
                                      Larryjek 2021-06-22 11:47:23 回复
                                      • cialis purchase
                                        EvaBelty 2021-06-22 12:16:33 回复
                                        • canadian viagra pharmacy genuine viagra canada viagra generic name viagra prescription viagra dosages viagra by phone
                                          ShawnFex 2021-06-22 15:47:05 回复
                                          • quick money
                                            Quick Loan 2021-06-22 17:11:34 回复
                                            • tadalafil citrate tadalafil cheap online cialis 10mg uk cialis 100mg dosage cialis india price cialis price nz
                                              WilliamMab 2021-06-22 20:15:19 回复
                                              • tadalafil online 10mg
                                                JaneBelty 2021-06-22 20:44:53 回复
                                                • viagra otc usa
                                                  EvaBelty 2021-06-22 22:05:12 回复
                                                  • price viagra uk
                                                    KimBelty 2021-06-22 23:12:58 回复
                                                    • vardenafil buy levitra information levitra vardenafil levitra dosage generic levitra dose levitra from usa
                                                      GeorgeInsug 2021-06-23 00:42:59 回复
                                                      • Thank you, I have just been searching for information about this subject for ages and yours is the best I have found out so far. But, what about the conclusion? Are you positive in regards to the source?
                                                        Jessefup 2021-06-23 03:20:32 回复
                                                        • amitriptyline 10mg what is amitriptyline amitriptyline tablets amitriptyline overdose amitriptyline elavil amitriptyline uses
                                                          DylanChoit 2021-06-23 05:55:09 回复
                                                          • tadalafil 10mg price
                                                            EvaBelty 2021-06-23 06:38:52 回复
                                                            • where can i buy neurontin from canada buy neurontin online - neurontin 600 mg cost
                                                              Larryjek 2021-06-23 08:35:04 回复
                                                              • duloxetine 60mg duloxetine cymbalta uses cymbalta weight gain duloxetine 60 mg cymbalta dosage
                                                                MartinOrarm 2021-06-23 10:58:52 回复
                                                                • loans fast
                                                                  A Payday Loan 2021-06-23 11:11:12 回复
                                                                  • warnings for hydrochlorothiazide losartan potassium hydrochlorothiazide hydrochlorothiazide price hydrochlorothiazide tablets chlorthalidone vs hydrochlorothiazide valsartan hydrochlorothiazide
                                                                    Scottelise 2021-06-23 15:01:25 回复
                                                                    • cialis cost canada
                                                                      EvaBelty 2021-06-23 18:55:54 回复
                                                                      • interactions for metformin metformin mechanism of action metformin diarrhea metformin er 1000 mg metformin hcl er side effects for metformin
                                                                        AnthonyBup 2021-06-23 18:57:38 回复
                                                                        • mirtazapine for sleep warnings for mirtazapine mirtazapine side effects bipolar what is mirtazapine prescribed for mirtazapine tablet mirtazapine side effects
                                                                          DeweyPor 2021-06-23 22:51:13 回复
                                                                          • viagra online store
                                                                            JaneBelty 2021-06-24 02:46:55 回复
                                                                            • buying viagra online viagra discount - when will viagra be generic
                                                                              Danielhaw 2021-06-24 05:26:41 回复
                                                                              • sildenafil purchase
                                                                                KimBelty 2021-06-24 06:30:43 回复
                                                                                • fast personal loan
                                                                                  Loans 2021-06-24 07:12:33 回复
                                                                                  • What's up, all is going perfectly here and ofcourse every one is sharing information, that's in fact excellent, keep up writing.
                                                                                    Jessefup 2021-06-24 09:55:49 回复
                                                                                    • wellbutrin dosage range wellbutrin dosage range wellbutrin and alcohol wellbutrin side effects bupropion withdrawal bupropion hcl
                                                                                      WilliamTic 2021-06-24 12:50:27 回复
                                                                                      • xenical 120mg price
                                                                                        KimBelty 2021-06-24 13:41:44 回复