Cloud

RabbitMQ 토픽

arisu1000 2015. 3. 17. 09:00
특정 주제에 대한 메세지를 주고받는 기능.
topic exchange로 보내지는 메세지는 반드시 routing_key를 가지고 있어야함. routing_key 크기는 255 바이트.

바인딩 키에 특수문자 사용가능.
- * 하나의 단어를 의미
- # 없거나 하나이상의 단어를 의미





예제소스
emit_log_topic.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()

receive_logs_topic.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
sys.exit(1)

for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
queue=queue_name,
no_ack=True)

channel.start_consuming()


테스트방법
각 패턴별 메세지 수신자 실행.
python receive_logs_topic.py "#"
python receive_logs_topic.py "kern.*"
python receive_logs_topic.py "*.critical"
python receive_logs_topic.py "kern.*" "*.critical"

메세지 발행
python emit_log_topic.py "kern.critical" "A critical kernel error"


참고