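While looking into Redis as a message queue recently, I also took a look at how RabbitMQ implements message queues. Below is a summary of three RabbitMQ exchange modes: fanout, direct, and topic.
Instantiating the MQ connection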
base.py:
```python
import pika

# Broker address and credentials used throughout this post
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(
    pika.ConnectionParameters("192.168.0.102", 5672, "/", credentials)
)
channel = connection.channel()
```
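fanout mode
A fanout exchange delivers every message to all queues bound to it, ignoring the routing key; each consumer then reads from its own queue. This is essentially broadcast, or publish/subscribe.
Binding, done on the receiving side: channel.queue_bind(exchange="logs", queue=queue_name)
Code: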
publisher.py:
```python
from base import channel, connection

channel.exchange_declare(exchange="logs", exchange_type="fanout")

message = "hello fanout"
# A fanout exchange ignores the routing key, so it is left empty
channel.basic_publish(
    exchange="logs",
    routing_key="",
    body=message
)
connection.close()
```
consumer.py:
```python
from base import channel, connection

channel.exchange_declare(exchange="logs", exchange_type="fanout")

# Exclusive queue with a server-generated name, removed when this connection closes.
# pika 1.0+ requires the name argument: queue_declare(queue="", exclusive=True)
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange="logs", queue=queue_name)


def callback(ch, method, properties, body):
    print("body:%s" % body)


# pika 0.x call style; pika 1.0+ expects
# basic_consume(queue=queue_name, on_message_callback=callback)
channel.basic_consume(
    callback,
    queue=queue_name
)

channel.start_consuming()
```
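The broadcast behaviour can be sketched without a broker. The following is a toy in-memory model, not the pika API: the class `FanoutExchange` and the queue names `queue_a` and `queue_b` are made up for illustration. It shows that every bound queue gets its own copy of a message, that the routing key plays no role, and that a message published before any queue is bound reaches nobody.

```python
class FanoutExchange:
    """Toy in-memory stand-in for a fanout exchange (hypothetical, not pika)."""

    def __init__(self):
        # queue name -> list of message bodies delivered to that queue
        self.queues = {}

    def bind(self, queue_name):
        self.queues.setdefault(queue_name, [])

    def publish(self, body, routing_key=""):
        # The routing key is accepted but ignored, as in fanout mode
        for messages in self.queues.values():
            messages.append(body)


exchange = FanoutExchange()
exchange.publish("too early")  # no queue is bound yet, so this is dropped
exchange.bind("queue_a")
exchange.bind("queue_b")
exchange.publish("hello fanout", routing_key="ignored")
print(exchange.queues)
```

This is also why, in the examples in this post, the consumers need to be running before the publisher sends anything.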
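direct mode
In direct mode the publisher sends each message with a routing key, and each queue is bound to the exchange with one or more binding keys. If the routing key exactly equals one of a queue's binding keys, the message is delivered to that queue, and the corresponding consumer reads it from there.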
publisher.py:
```python
from base import channel, connection

channel.exchange_declare(exchange="direct_test", exchange_type="direct")

message = "hello"

# Only queues bound with the key "info" will receive this message
channel.basic_publish(
    exchange="direct_test",
    routing_key="info",
    body=message
)
connection.close()
```
consumer01.py:
```python
from base import channel, connection

channel.exchange_declare(exchange="direct_test", exchange_type="direct")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# Bind this consumer's queue with the key "info"
channel.queue_bind(
    exchange="direct_test",
    queue=queue_name,
    routing_key="info"
)


def callback(ch, method, properties, body):
    print("body:%s" % body)


channel.basic_consume(
    callback,
    queue=queue_name
)

channel.start_consuming()
```
consumer02.py:
```python
from base import channel, connection

channel.exchange_declare(exchange="direct_test", exchange_type="direct")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# Bind this consumer's queue with the key "error"
channel.queue_bind(
    exchange="direct_test",
    queue=queue_name,
    routing_key="error"
)


def callback(ch, method, properties, body):
    print("body:%s" % body)


channel.basic_consume(
    callback,
    queue=queue_name
)

channel.start_consuming()
```
consumer03.py:
```python
from base import channel, connection

channel.exchange_declare(exchange="direct_test", exchange_type="direct")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# One queue can be bound with several keys
key_list = ["info", "warning"]
for key in key_list:
    channel.queue_bind(
        exchange="direct_test",
        queue=queue_name,
        routing_key=key
    )


def callback(ch, method, properties, body):
    print("body:%s" % body)


channel.basic_consume(
    callback,
    queue=queue_name
)

channel.start_consuming()
```
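Run:
```shell
# Start the consumers first (one terminal each); a message published
# before any queue is bound to the exchange is dropped
python consumer01.py
python consumer02.py
python consumer03.py
python publisher.py
```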
The output is:
```
consumer01.py: body:b'hello'
consumer02.py: nothing received
consumer03.py: body:b'hello'
```
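The routing decision behind this result can be sketched in plain Python, without a broker. The helper `route_direct` and the `bindings` dictionary are hypothetical names used only for illustration; they are not part of pika or RabbitMQ. The binding keys are the ones used by the three consumers above.

```python
def route_direct(bindings, routing_key):
    """Return the queues that have a binding key equal to routing_key."""
    return sorted(
        queue for queue, keys in bindings.items() if routing_key in keys
    )


# Binding keys of the three consumers in this section
bindings = {
    "consumer01": {"info"},
    "consumer02": {"error"},
    "consumer03": {"info", "warning"},
}

print(route_direct(bindings, "info"))   # ['consumer01', 'consumer03']
print(route_direct(bindings, "error"))  # ['consumer02']
```

A message with the routing key "info" reaches consumer01 and consumer03 but not consumer02, which matches the output above.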
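topic mode
Topic mode is a little harder to grasp. My understanding is as follows:
The publisher sends each message with a routing key, and each queue is bound with one or more binding keys. Here the binding keys are patterns of dot-separated words, in which * stands for exactly one word and # for zero or more words. If the routing key matches any of a queue's binding keys, the exchange delivers the message to that queue, and the consumer reads it from there. In effect, topic mode is direct mode extended with pattern matching.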
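To make the matching rule concrete, here is a minimal sketch of how a binding key could be compared with a routing key. The function `topic_matches` is a hypothetical helper written for this post, not RabbitMQ's own implementation; it assumes keys are dot-separated words, that `*` matches exactly one word, and that `#` matches zero or more words.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches the binding_key pattern."""

    def match(pattern, words):
        if not pattern:
            # Pattern used up: it is a match only if no words are left
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == "#":
            # '#' may absorb zero or more words
            return any(match(rest, words[i:]) for i in range(len(words) + 1))
        if not words:
            return False
        if head == "*" or head == words[0]:
            # '*' matches any single word; otherwise words must be equal
            return match(rest, words[1:])
        return False

    return match(binding_key.split("."), routing_key.split("."))


print(topic_matches("*.error", "mysql.error"))  # True
print(topic_matches("mysql.*", "mysql.error"))  # True
print(topic_matches("*.error", "mysql.info"))   # False
print(topic_matches("mysql.#", "mysql"))        # True
```

The recursive form keeps the sketch short; it is meant to illustrate the rule rather than to be efficient.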
On the publishing side, the routing key is set when the message is sent:
```python
channel.basic_publish(
    exchange="topic_test",
    routing_key=routing_key,
    body=message
)
```
On the receiving side, the binding key is set when the queue is bound:
```python
channel.queue_bind(
    exchange="topic_test",
    queue=queue_name,
    routing_key=binding_key
)
```
publisher.py:
```python
import sys

from base import channel, connection

channel.exchange_declare(exchange="topic_test", exchange_type="topic")

# Message text comes from the command line, with a default fallback
message = " ".join(sys.argv[1:]) or "hello topic"

channel.basic_publish(
    exchange="topic_test",
    routing_key="mysql.error",
    body=message
)
connection.close()
```
consumer01.py:
```python
from base import channel, connection

channel.exchange_declare(exchange="topic_test", exchange_type="topic")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# Matches any two-word routing key whose second word is "error"
channel.queue_bind(
    exchange="topic_test",
    queue=queue_name,
    routing_key="*.error"
)


def callback(ch, method, properties, body):
    print("body:%s" % body)


channel.basic_consume(
    callback,
    queue=queue_name,
    no_ack=True
)

channel.start_consuming()
```
consumer02.py:
```python
from base import channel, connection

channel.exchange_declare(exchange="topic_test", exchange_type="topic")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# Matches any two-word routing key whose first word is "mysql"
channel.queue_bind(
    exchange="topic_test",
    queue=queue_name,
    routing_key="mysql.*"
)


def callback(ch, method, properties, body):
    print("body:%s" % body)


channel.basic_consume(
    callback,
    queue=queue_name,
    no_ack=True
)

channel.start_consuming()
```
Run:
```shell
# Start the consumers first (one terminal each), then publish
python consumer01.py
python consumer02.py
python publisher.py "this is a topic test"
```
Output:
```
consumer01.py: body:b'this is a topic test'
consumer02.py: body:b'this is a topic test'
```
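This shows that both consumers received the message, because the routing key "mysql.error" matches both of their binding keys.
Now change the routing_key in publisher.py to "mysql.info"
and run again: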
```shell
# Start the consumers first (one terminal each), then publish
python consumer01.py
python consumer02.py
python publisher.py "this is a topic test"
```
Result:
```
consumer01.py: nothing received
consumer02.py: body:b'this is a topic test'
```
This example should make it clear how topic mode works.
That concludes this introduction to RabbitMQ's three exchange modes.