본문 바로가기

AMQP/RabbitMQ

[RabbitMQ] 설치 및 운용 (Fast API)

 

프로젝트를 진행하면서 설치 및 운용 루틴을 기록해둔 글이다.

프로젝트에서 Python의 Pika 라이브러리를 사용하고 RabbitMQ는 Topic Exchange를 사용하였다.
또한, GCP환경에서 docker 서버를 띄우는 것임.

 

 

1. RabbiMQ 실행

GCP에서 docker로 RabbitMQ 서버를 띄우자
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

5672관리자 페이지인 15672 포트를 사용한다.
(listeners.tcp.default = 5672, management.tcp.port = 15672)


그리고 터미널에서 래빗엠큐 컨테이너의 bash를 실행시킨다.
docker exec -it rabbitmq bash​

레빗엠큐 더미널에서 rabbitmqadmin 커맨드라인 도구를 사용한다. rabbitmqadmin 커맨드라인 도구는 RabbitMQ 설치와 함께 설치된다.



bash 터미널에서 Topic Exchange를 만들려고 한다.

$ rabbitmqadmin declare exchange name=incoming type=topic
exchange declared

name=incoming은 exchange의 이름을 incoming으로 한 것이다.


그 다음은, Queue를 만들어야 한다.
서버에서 메세지를 생성하면 rabbitmq의 exchange로 들어오게 되고 rabbitmq내에서 exchange와 binding되어있는 queue로 전달하게 된다.

$ rabbitmqadmin declare queue name=hello
queue declared

$ rabbitmqadmin declare queue name=world
queue declared

topic exchange에 들어온 메세지들의 라벨이 hello.* 접두사로 시작하면 hello 큐에, world.* 접두사로 시작하면 world 큐에 들어가도록 하는 exchange이다. (* -> 와일드 카드)


마지막으로, exchange와 queue를 binding해주어야 한다.

$ rabbitmqadmin declare binding source="incoming" destination_type="queue" destination="hello" routing_key="hello.*"
binding declared

$ rabbitmqadmin declare binding source="incoming" destination_type="queue" destination="world" routing_key="world.*"
binding declared

rounting_key는 서버에서 메세지를 생성할 때 붙힌 라벨이 있을 것이다.
hello.* 이나 world.* 으로 구분해서 queue로 나누어 들어간다.

 

 

2. Pika를 이용한 RabbitMQ ( + Fast API )

서버와 RabbitMQ를 연동하기 위해서는 파이썬 PRC 클라이언트인 Pika를 사용해야한다. Pika는 Rabbitmqadmin과 거의 동일한 기능을 수행할 수 있다.
$ pip install pika​

https://pika.readthedocs.io/en/stable/



아래의 코드는 Publisher로 메세지를 서버에서 생성하는 코드이다.
# rabbitmq_publisher.py
import pika
from pika import BlockingConnection, BasicProperties


def message(exchange, topic, message):
    connection = BlockingConnection(
        pika.ConnectionParameters('34.64.142.109')
    )
    try:
        channel = connection.channel()
        props = BasicProperties(content_type='text/plain', delivery_mode=1)
        channel.basic_publish(exchange, topic, message, props)  # incoming exchange로 publish
    finally:
        connection.close()​


그 다음, 아래의 코드는 Consumer로 큐에서 메세지를 가져와 출력하는 코드이다.
import pika
from pika import BlockingConnection, BasicProperties


def on_message(channel, method_frame, header_frame, body):
    label = method_frame.routing_key
    print('-- 새 메시지 --')
    print('label : ', label)
    print('body : ', body)

    channel.basic_ack(delivery_tag=method_frame.delivery_tag)


connection = BlockingConnection(
    pika.ConnectionParameters('34.64.142.109')
)
channel = connection.channel()
channel.basic_consume(queue='hello', on_message_callback=on_message)

print("메시지 수신 대기 중")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()

connection.close()​

 

하지만, 서버 이벤트 발생시 로그를 기록하자고 쓰이는 기술들이기 때문에 이벤트 발생 함수에서 로그를 생성하는 코드만 삽입하면 logsatsh로 보내지고 kibana로 로그를 확인할 수 있다.

 

'AMQP > RabbitMQ' 카테고리의 다른 글

[RabbitMQ] RabbitMQ의 이해  (0) 2022.06.17
[RabbitMQ] RabbitMQ 설치하기. (Feat. Docker)  (0) 2022.06.11