近々 RabbitMQ を使う機会がありそうで,今まで使ったことがなかったので入門してみた.素振り重要!
操作する言語は慣れてる Ruby にした.Ruby 以外にも多くの言語でチュートリアルが用意されていてかなり充実してた.
チュートリアル
計6ステップある.
- RabbitMQ - RabbitMQ tutorial - "Hello World!"
- RabbitMQ - RabbitMQ tutorial - Work Queues
- RabbitMQ - RabbitMQ tutorial - Publish/Subscribe
- RabbitMQ - RabbitMQ tutorial - Routing
- RabbitMQ - RabbitMQ tutorial - Topics
- RabbitMQ - RabbitMQ tutorial - Remote procedure call (RPC)
チュートリアルで使うコードは全て以下のリポジトリにあるので,取ってくることもできる.
チュートリアルのページも以下のリポジトリにあるので,もし試しながら誤りなどを見つけたらコミットチャンス!だと思う.
事前準備
brew でインストールしておく.
brew install rabbitmq
.zshrc で ${PATH} に /usr/local/sbin を追加しておくと RabbitMQ コマンドの補完ができて便利!
➜ rabbitmq rabbitmq-defaults rabbitmq-plugins rabbitmqadmin rabbitmq-env rabbitmq-server rabbitmqctl
個人的メモ(学びなど)
チュートリアルを試しながら学んだことをメモっておこうと思う.
1. "Hello World!"
send.rb でメッセージを publish すると Queue に溜まった.
➜ ~ rabbitmqctl list_queues
Listing queues ...
hello 1
receive.rb でメッセージを subscribe すると Queue から削除された.
➜ ~ rabbitmqctl list_queues
Listing queues ...
hello 0
RabbitMQ 用語(基本的には ActiveMQ と Kafka でも同じだけど)を整理しておく!
- A producer is a user application that sends messages.
- A queue is a buffer that stores messages.
- A consumer is a user application that receives messages.
2. Work queues
Round-robin dispatching
Consumer を Work queues として使う例を試した.
まず,ラウンドロビンの挙動を確認するために Work queues を3プロセス立ち上げて,以下のコマンドを叩いてみた.
ruby new_task.rb First message. ruby new_task.rb Second message.. ruby new_task.rb Third message... ruby new_task.rb Fourth message.... ruby new_task.rb Fifth message.....
# Worker_1 [x] Received 'First message.' [x] Done [x] Received 'Fourth message....' [x] Done # Worker_2 [x] Received 'Second message..' [x] Done [x] Received 'Fifth message.....' [x] Done # Worker_3 [x] Received 'Third message...' [x] Done
ちゃんとラウンドロビンでメッセージが配送された!
どうやって実現してるんだろう?Producer からすると Worker 数は意識して無さそうなのになー.
Message acknowledgment
デフォルト設定だと Worker が死んでしまうとメッセージも一緒に無くなってしまうので,それを防ぐ場合は Consumer から Ack が返ってこなかったらメッセージを残すという設定をすることができて,それを Message acknowledgment と言う.
q.subscribe(:manual_ack => true, :block => true) do |delivery_info, properties, body| # (中略) end
:manual_ack => true を明示するだけで設定できる.ちなみに :ack => true は既に DEPRECATION なので,使えるけど WARNING が出るので注意する.
Message durability
Ack で Worker の死を救ったとしても RabbitMQ 自体が落ちてしまったらメッセージをロストしてしまうため,Queue を永続化する durable という設定がある.基本的には durable を使うんじゃないかと思う.Producer と Consumer の双方で :durable => true を設定する必要がある.
さらに Queue だけではなくメッセージまで永続化する場合は publish するときに :persistent => true にする必要がある.基本的に true で良いと思う.
q = ch.queue("task_queue", :durable => true) # (中略) q.publish(msg, :persistent => true)
Fair dispatch
prefetch を指定すると Consumer で1度にフェッチするメッセージ数を決めることができる.メッセージのバランスによって一部の Worker の負荷が高くなってしまうような場合に任意の数を設定する.実際に使うのかな?
ch.prefetch(1)
3. Publish/Subscribe
ここで Producer / Queue / Consumer に続いて Exchange という概念が出てくる.Exchange はその名前の通り Producer から流れてきたメッセージを仲介して複数の Queue に配送する機能のことを指す.
Exchange には複数の種類がある.まず fanout を試す.
- direct
- topic
- headers
- fanout
fanout は「広がる」っていう感じの意味で,接続されている全ての Queue に Binding する.よって複数の Consumer で同じメッセージを受け取ることができる.
x = ch.fanout("logs") q = ch.queue("", :exclusive => true) q.bind(x)
複数のターミナルで receive_logs.rb を起動させた状態でメッセージを送信すると,全ての Consumer で同じメッセージを受け取ることができた.
ruby receive_logs.rb [*] Waiting for logs. To exit press CTRL+C [x] Hello World!
4. Routing
次に Exchange の direct を使って Routing Key にマッチしたメッセージを配送する.
Producer 側で routing_key を指定して publish して,Consumer 側でも routing_key を指定して Binding する.
x.publish(msg, :routing_key => severity)
ARGV.each do |severity| q.bind(x, :routing_key => severity) end
実際にログの接頭辞にある INFO / WARN / ERROR の Routing Key として配送する Consumer を振り分けることができた.
5. Topics
次に Exchange の topic を使う.direct よりも柔軟に Routing Key の設定をすることができて,具体的には . で階層化したり,# で正規表現的なパターンマッチをすることができる.
以下の4種類の Worker を起動して kern.critical を指定すると全ての Worker にメッセージが配送された.
ruby receive_logs_topic.rb "#" ruby receive_logs_topic.rb "kern.*" ruby receive_logs_topic.rb "*.critical" ruby receive_logs_topic.rb "kern.*" "*.critical"
ruby emit_log_topic.rb "kern.critical" "A critical kernel error"
6. RPC
最後は RPC のサポートを試した.まぁ RPC は実際に使うときで良いかなと思って動作確認だけした.ちゃんとリモートサーバのフィボナッチ計算を呼び出すことができた.
ruby rpc_client.rb [x] Requesting fib(30) [.] Got 832040
ruby rpc_server.rb [x] Awaiting RPC requests [.] fib(30)
管理画面
デフォルトで用意されててメッセージングの状況を可視化できるのは重要だね!

ActiveMQ とポート競合した
RabbitMQ が起動しなくて何だろうと思って調べてたら ActiveMQ をポート競合してた.
今は ActiveMQ をメインで使ってるので,気を付けないと!
➜ ~ rabbitmq-server
RabbitMQ 3.5.3. Copyright (C) 2007-2014 GoPivotal, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
###### ## /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
##########
Starting broker...
BOOT FAILED
===========
Error description:
{could_not_start,rabbitmq_stomp,
{{shutdown,
{failed_to_start_child,'rabbit_stomp_listener_sup_:::61613',
{shutdown,
{failed_to_start_child,tcp_listener,
{cannot_listen,{0,0,0,0,0,0,0,0},61613,eaddrinuse}}}}},
{rabbit_stomp,start,[normal,[]]}}}
Log files (may contain more information):
/usr/local/var/log/rabbitmq/rabbit@localhost.log
/usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
{"init terminating in do_boot",{could_not_start,rabbitmq_stomp,{{shutdown,{failed_to_start_child,'rabbit_stomp_listener_sup_:::61613',{shutdown,{failed_to_start_child,tcp_listener,{cannot_listen,{0,0,0,0,0,0,0,0},61613,eaddrinuse}}}}},{rabbit_stomp,start,[normal,[]]}}}}
Crash dump was written to: erl_crash.dump
init terminating in do_boot ()
まとめ
2,3時間でチュートリアルを試しただけだけど RabbitMQ に入門してみた!
チュートリアルが充実してて良かった.特に 2. 3. 4. 5. あたりが重要.
ActiveMQ と比べると柔軟にメッセージングが実現できそうだなという印象だった.最近流行ってる Kafka も入門して比較してみたいところ.