프로젝트를 진행하면서 설치 및 운용 루틴을 기록해둔 글이다.
프로젝트에서 Python의 Pika 라이브러리를 사용하고 RabbitMQ는 Topic Exchange를 사용하였다.
또한, GCP환경에서 docker 서버를 띄우는 것임.
1. RabbiMQ 실행
GCP에서 docker로 RabbitMQ 서버를 띄우자
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
5672와 관리자 페이지인 15672 포트를 사용한다.
(listeners.tcp.default = 5672, management.tcp.port = 15672)
그리고 터미널에서 래빗엠큐 컨테이너의 bash를 실행시킨다.
docker exec -it rabbitmq bash
레빗엠큐 더미널에서 rabbitmqadmin 커맨드라인 도구를 사용한다. rabbitmqadmin 커맨드라인 도구는 RabbitMQ 설치와 함께 설치된다.
bash 터미널에서 Topic Exchange를 만들려고 한다.$ rabbitmqadmin declare exchange name=incoming type=topic exchange declared
name=incoming은 exchange의 이름을 incoming으로 한 것이다.
그 다음은, Queue를 만들어야 한다.
서버에서 메세지를 생성하면 rabbitmq의 exchange로 들어오게 되고 rabbitmq내에서 exchange와 binding되어있는 queue로 전달하게 된다.$ rabbitmqadmin declare queue name=hello queue declared $ rabbitmqadmin declare queue name=world queue declared
topic exchange에 들어온 메세지들의 라벨이 hello.* 접두사로 시작하면 hello 큐에, world.* 접두사로 시작하면 world 큐에 들어가도록 하는 exchange이다. (* -> 와일드 카드)
마지막으로, exchange와 queue를 binding해주어야 한다.$ rabbitmqadmin declare binding source="incoming" destination_type="queue" destination="hello" routing_key="hello.*" binding declared $ rabbitmqadmin declare binding source="incoming" destination_type="queue" destination="world" routing_key="world.*" binding declared
rounting_key는 서버에서 메세지를 생성할 때 붙힌 라벨이 있을 것이다.
hello.* 이나 world.* 으로 구분해서 queue로 나누어 들어간다.
2. Pika를 이용한 RabbitMQ ( + Fast API )
서버와 RabbitMQ를 연동하기 위해서는 파이썬 PRC 클라이언트인 Pika를 사용해야한다. Pika는 Rabbitmqadmin과 거의 동일한 기능을 수행할 수 있다.
$ pip install pika
https://pika.readthedocs.io/en/stable/
아래의 코드는 Publisher로 메세지를 서버에서 생성하는 코드이다.
# rabbitmq_publisher.py import pika from pika import BlockingConnection, BasicProperties def message(exchange, topic, message): connection = BlockingConnection( pika.ConnectionParameters('34.64.142.109') ) try: channel = connection.channel() props = BasicProperties(content_type='text/plain', delivery_mode=1) channel.basic_publish(exchange, topic, message, props) # incoming exchange로 publish finally: connection.close()
그 다음, 아래의 코드는 Consumer로 큐에서 메세지를 가져와 출력하는 코드이다.
import pika from pika import BlockingConnection, BasicProperties def on_message(channel, method_frame, header_frame, body): label = method_frame.routing_key print('-- 새 메시지 --') print('label : ', label) print('body : ', body) channel.basic_ack(delivery_tag=method_frame.delivery_tag) connection = BlockingConnection( pika.ConnectionParameters('34.64.142.109') ) channel = connection.channel() channel.basic_consume(queue='hello', on_message_callback=on_message) print("메시지 수신 대기 중") try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() connection.close()
하지만, 서버 이벤트 발생시 로그를 기록하자고 쓰이는 기술들이기 때문에 이벤트 발생 함수에서 로그를 생성하는 코드만 삽입하면 logsatsh로 보내지고 kibana로 로그를 확인할 수 있다.
'AMQP > RabbitMQ' 카테고리의 다른 글
[RabbitMQ] RabbitMQ의 이해 (0) | 2022.06.17 |
---|---|
[RabbitMQ] RabbitMQ 설치하기. (Feat. Docker) (0) | 2022.06.11 |