欢迎来访我的博客。

消息队列之RabbitMQ-03_发布订阅

其他 小张哥哥 3737浏览 41评论

前面文章中的工作队列,生产者生产的一个消息通过消息队列最终分发给一个消费者消费。
在本文中,我们来看以下一个不同形式的消息队列,即“发布/订阅”模式。
在这种模式下,一个生产者发出的一个消息,经过消息队列最终分发给多个消费者。

我们通过做一个简单的日志系统。日志系统包括两个程序。一个程序负责发送日志消息,另一个程序负责获取消息内容并输出。
在日志系统中,所有正在运行的接收程序都会接收消息,我们用其中过一个接收者把日志写入硬盘,另一个接收者把日志输出到屏幕上。


交换机
在前面的例子中,我们都把exchange赋值为空字符串,使用默认的。现在我们需要用到该交换机了。

RabbitMQ消息模型的核心理念是:发布者不会直接发送消息给队列,甚至都不知道消息是否已经被投递给队列。发布者只需要把消息发送给一个交换机(exchange)。交换机从发布者乙方接收消息,另一边把消息推送到队列。交换机知道如何处理它收到的消息,是推送到指定的队列还是多个队列,或者直接忽略该消息。交换机是通过其类型(exchange type)来定义的。

publisher_subscriber_exchange.png

交换机的类型有几种,直连交换机(direct),主题交换机(topic), 头交换机(heanders)和扇形交换机(fanout).

我们现在的发布订阅模式则需要使用fanout交换机。
我们现在定义一个fanout类型的交换机,名字命名为logs:

channel.exchange_declare(exchange='logs',
                         type='fanout')

扇形交换机就是把消息发送给它所知道的所有队列。

我们可以使用rabbitmqctl列出服务器上的所有交换器
$ sudo rabbitmqctl list_exchanges
logs fanout
amq.direct direct
amq.topic topic
amq.fanout fanout
amq.headers headers
...done.

匿名交换器
前面我们对交换机一无所知,但仍然能够发送消息到队列中。因为我们使用了命名为空字符串("")默认的交换机。
exchange参数就是交换机的名称。空字符串代表默认或者匿名交换机:消息将会根据指定的routing_key分发到指定的队列

现在我们就可以将消息发到一个带名字的交换机上了

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

临时队列
前面的文章中,我们需要生产者和消费者共享队列,所以将队列名字命名为相同。
但是现在,对于我们的日志系统,我们打算接收所有的日志消息,而且,我们只关心最新的消息而不是旧的。在此,我们需要做以下两件事情。
首先,当我们连上RabbitMQ的时候,我们需要一个全新的,空的队列。我们可以手动创建一个随机的队列名,或者让服务器为我们选择一个随机的队列名。我们只需要在调用queue_declare方法的时候,不提供queue参数就可以了。

result = channel.queue_declare()

这种情况下,我们可以通过result.method.queue获取随机生成的队列名。

然后,当消费者断开连接的时候,这个队列应当被立即删除。exclusive标识符即可达到此目的。

result = channel.queue_declare(exclusive=True)

绑定
现在,我们已经创建了一个扇形交换机(fanout)和一个队列。现在我们需要告诉交换机如何发送消息给我们的队列。我们通过绑定交换机和队列来达到这个目的。

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

现在logs交换机就会把消息添加到我们的队列中了。

publisher_subscriber_binding.png

完整代码
publisher_subscriber.png

发布日志消息和之前的生产者没有太大区别。其中最重要的一点就是把消息发送给logs交换机而不是匿名交换机,在发送的时候我们需要提供routing_key参数,但是在扇形交换机情况下,该值会被忽略。
改写后的emit_log.py文件如下:

import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='logs',
                      routing_key='hello',
                      body=message)

print('[x] Sent %r' %message)
connection.close()

在上面代码中,在连接RabbitMQ成功之后,我们声明了一个交换机,然后将消息绑定到了交换机上。如果我们没有将交换机绑定,则此时没有消费者监听,消息会被忽略。

recieve_log.py

import time
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

result = channel.queue_declare(exclusive=True)

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

print('[*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print("[x] %r" %body)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_message_callback=callback,
                      queue=result.method.queue,
                      auto_ack=True)

channel.start_consuming()

如果我们需要把日志保存到文件中,我们可以在命令行中输入:

(learn_rabbitmq) $ python receive_logs.py > logs_from_rabbit.log

我们再打开一个新的终端,用于在屏幕中查看日志信息。

(learn_rabbitmq) $ python receive_logs.py

然后我们在另一个终端中发送日志信息。

(learn_rabbitmq) $ python emit_log.py


我们可以使用rabbitmqctl list_bindings来确认创建的队列已经被绑定。

$ sudo rabbitmqctl list_bindings
[sudo] password for zcw:
Listing bindings
......
exchange	0e8e00a732014c8d9193a1f324c6fc37	queue 0e8e00a732014c8d9193a1f324c6fc37	[]
exchange	3becf532429449fdaeb71ddcd0634c41	queue 3becf532429449fdaeb71ddcd0634c41	[]
exchange	5a3ae673963145f98d2383a5abde6d7b	queue 5a3ae673963145f98d2383a5abde6d7b	[]
......


转载请注明:禅思 » 消息队列之RabbitMQ-03_发布订阅?

喜欢 (1) or 分享 (0)

我的个人微信公众号,欢迎关注

扫码或搜索:Python后端开发Django

Python后端开发Django

微信公众号 扫一扫关注

结交朋友、一起学习,一起进步。

科波之主

QQ号 386046154 立即加入

添加微信,进行技术交流

专注技术交流, 一同成长进步

我的微信号

如果您喜欢我的文章,感觉我的文章对您有帮助,请狠狠点击下面

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(41)个小伙伴在吐槽
    1. loans in chicago
      Online Loan 2021-06-20 22:23:45 回复
      • amlodipine 10 mg amlodipine and simvastatin amlodipine use amlodipine generic norvasc side effects amlodipine olmesartan
        Davidnip 2021-06-21 07:15:38 回复
        • sildenafil medicine
          AmyBelty 2021-06-21 08:30:52 回复
          • order amoxicillin 500mg amoxicillin generic - amoxicillin azithromycin
            Larryjek 2021-06-21 09:48:56 回复
            • losartan 100mg losartan and alcohol losartan and alcohol losartan potassium 100mg interactions for losartan losartan dosage
              JamesunelE 2021-06-21 15:48:36 回复
              • meloxicam meloxicam 15 mg meloxicam warnings meloxicam side effects meloxicam dose interactions for meloxicam
                FidelMon 2021-06-21 20:05:34 回复
                • metoprolol succinate er metoprolol tartrate 50mg metoprolol generic name metoprolol succinate er metoprolol er succinate metoprolol tartrate 50mg
                  RobertGen 2021-06-21 23:53:16 回复
                  • Hello there! This is my first comment here so I just wanted to give a quick shout out and tell you I genuinely enjoy reading through your articles. Can you suggest any other blogs/websites/forums that go over the same topics? Many thanks!
                    beğeni satın al 2021-06-22 00:57:46 回复
                    • amoxicillin online no prescription amoxil - buy amoxicillin 250mg
                      Larryjek 2021-06-22 05:48:33 回复
                      • cymbalta uses cymbalta for pain duloxetine withdrawal duloxetine withdrawal symptoms duloxetine dr cymbalta weight gain
                        Robertdaulp 2021-06-22 07:56:03 回复
                        • prednisone dosage prednisone uses prednisone for cats prednisone half life generic prednisone 20mg prednisone withdrawal remedies
                          JesseBiz 2021-06-22 11:51:18 回复
                          • [url=http://badcreditpdloans.com/]loans for homes[/url]
                            Best Payday Loan 2021-06-22 13:31:44 回复
                            • Hello there! I know this is kinda off topic but I was wondering which blog platform are you using for this site? I'm getting fed up of Wordpress because I've had problems with hackers and I'm looking at options for another platform. I would be awesome if you could point me in the direction of a good platform.
                              TAKipçi Hilesi 2021-06-22 15:20:20 回复
                              • viagra meaning pfizer viagra pills viagra pill cutter mexican viagra ebay viagra pills generic for viagra
                                ShawnFex 2021-06-22 15:56:45 回复
                                • cialis substitute cialis dose cialis peak effect cialis pharmacy uk cialis online purchase cialis 10mg generic
                                  WilliamMab 2021-06-22 20:25:35 回复
                                  • Way cool! Some extremely valid points! I appreciate you writing this post and also the rest of the site is also very good.
                                    php SheLL 2021-06-22 20:56:48 回复
                                    • levitra lowest price levitra price australia canadian pharmacy levitra levitra daily use levitra nz levitra cvs
                                      GeorgeInsug 2021-06-23 00:53:05 回复
                                      • generic aralen aralen medication - order aralen online
                                        Larryjek 2021-06-23 02:31:44 回复
                                        • If you wish for to take a good deal from this piece of writing then you have to apply these strategies to your won website.
                                          takipçi hilesi 2021-06-23 06:10:39 回复
                                          • amitriptyline uses amitriptyline 50 mg what is amitriptyline amitriptyline 10mg tablets amitriptyline 10mg tablets amitriptyline reviews
                                            DylanChoit 2021-06-23 07:05:58 回复
                                            • duloxetine class interactions for cymbalta cymbalta drug class duloxetine dosing duloxetine 20 mg cymbalta uses
                                              MartinOrarm 2021-06-23 11:07:45 回复
                                              • metformin benefits urgent news about metformin metformin xr metformin moa side effects for metformin metformin side effects
                                                AnthonyBup 2021-06-23 19:06:17 回复
                                                • advair disk
                                                  AmyBelty 2021-06-23 19:25:17 回复
                                                  • mirtazapine uses mirtazapine 30mg what is mirtazapine mirtazapine appetite stimulant mirtazapine mirtazapine appetite stimulant
                                                    DeweyPor 2021-06-23 22:59:51 回复
                                                    • order viagra online viagra - generic viagra walmart
                                                      Danielhaw 2021-06-23 23:54:25 回复
                                                      • wellbutrin weight loss wellbutrin weight loss wellbutrin and methamphetamine bupropion xl 150mg wellbutrin for adhd wellbutrin and methamphetamine
                                                        WilliamTic 2021-06-24 12:58:49 回复