在前面的发布订阅模式中,发送者发布的消息可以广播给多个接收者。比如前面的日志系统中,日志消息可以发送给一个接收方在屏幕显示,同时发送给另一个接收方将日志消息保存到文件内。
假如现在我们需要实现这样的需求:日志信息有日志的重要程度进行区分,我们希望把比较严重的日志写入到文件中,而且同时把所有的日志都显示在屏幕上。
现在我们就需要交换机的另外一种模式,并搭配routing key实现。 在此,我们可以把这个参数称为绑定键(bindging key),我们可以通过以下方式创建一个带绑定键的绑定:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='balck')
绑定
绑定(binding)是值交换机(exchange)和队列(queue)的关系。可以理解为:这个队列(queue)对这个交换机(exchange)的消息感兴趣。我们绑定的时候可以带上routing_key参数。在我们前面的发布订阅模式中,扇形交换机(fanout exchange)会忽略这个值。现在在我们这种模式下,routing_key将发挥自己的作用。
直连交换机(direct exchange)
现在我们来改写以下我们的日志系统,让它把那些比较严重的日志写入到磁盘,其余的信息不写入磁盘。此时,我们使用扇形交换机(fanout exchange)就不好实现了。 在这里,我们使用直连交换机(direct exchange)来实现。这个过程很简单,交换机会对bingding key和routing key进行精确的匹配,从而确定消息应该发送到哪个队列。 如下图:
在此情况下,我们可以看到直连交换机X对两个队列进行了绑定。第一个队列只用orange作为绑定键,第二个队列和两个绑定键绑定,分别是black和green。 这样的话,当路由键为orange的消息发布到交换机的时候,就会被交换机路由到队列Q1。路由键为black和green的消息就会被路由到Q2。而所有的其它消息都会被丢弃。
多个绑定(multiple bindings)
如下图,多个队列使用相同的绑定键和合法的。我们可以添加X和Q1的绑定,使用black绑定键。这样一来,直连交换机就和扇形交换机的行为一样,可以将消息广播到所有匹配的队列中。带有black路由键的消息会同时发送到Q1和Q2。
发送日志
现在我们发送消息到一个直连交换机,把日志级别当作路由键。这样接收日志的脚本就可以根据严重级别来选择它想要处理的日志。我们先看看发送日志: 创建一个交换机(exchange):
channel.exchange_declare(exchange='direct_logs', type='direct')
然后,我们发送一则消息:
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
其中的severity的值是info, waring, error中的一个。
订阅 接收消息的方式和之前产不多,这里的区别就是我们会为我们感兴趣的每个严重级别分别创建一个新的绑定。
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
完整代码
完整的流程图如下:
emit_log_direct.py
import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') servity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or "Hello World!" channel.basic_publish(exchange='direct_logs', routing_key=servity, body=message) print('[x] Sent %r' %message) connection.close()
receive_logs_direct.py
import sys import time import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare(queue='', exclusive=True) severities = sys.argv[1:] if not severities: print(sys.stderr, 'Usage: %s [info] [Warning] [Error]' %(sys.argv[0],)) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=result.method.queue, routing_key=severity) print('[*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print("[x] %r" %body) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_message_callback=callback, queue=result.method.queue, auto_ack=True) channel.start_consuming()
下面来运行我们的程序。 我们只希望把比较严重级别的日志保存到日志文件中,我们可以在命令行中输入:
(learn_rabbitmq) $ python receive_logs_direct.py warning error > logs_from_rabbit.log
我们同时希望所有的日志信息都输出到屏幕中,我们可以在一个新的命令行中输入:
(learn_rabbitmq) $ python receive_logs_direct.py warning info error
我们在另外一个命令行中来监控info和warning级别的日志:
(learn_rabbitmq) $ python receive_logs_direct.py info warning
现在我们在另外的命令行中使用emit_log_direct.py文件发送日志信息:
(learn_rabbitmq) $ python emit_log_direct.py info this line is info log [x] Sent 'this line is info log' (learn_rabbitmq) $ python emit_log_direct.py info this line is info log [x] Sent 'this line is info log' (learn_rabbitmq) $ python emit_log_direct.py error this line is error log [x] Sent 'this line is error log' (learn_rabbitmq) $ python emit_log_direct.py warning this line is warning log [x] Sent 'this line is warning log' (learn_rabbitmq) $ ls emit_log_direct.py logs_from_rabbit.log receive_logs_direct.py (learn_rabbitmq) $ code logs_from_rabbit.log (learn_rabbitmq) $
除了写入日志文件的终端没有打印信息之外,其余的其它终端打印信息如下:
(learn_rabbitmq) $ python receive_logs_direct.py warning info error [*] Waiting for messages. To exit press CTRL+C [x] b'this line is info log' [x] b'this line is info log' [x] b'this line is error log' [x] b'this line is warning log'
上面和我们要求的一样,打印了所有级别的日志信息。
(learn_rabbitmq) $ python receive_logs_direct.py info warning [*] Waiting for messages. To exit press CTRL+C [x] b'this line is info log' [x] b'this line is warning log'
上面也和我们要求的一样,只打印了info和warning级别的信息。
转载请注明:禅思 » 消息队列之RabbitMQ-04_直连交换机?