近々 RabbitMQ を使う機会がありそうで,今まで使ったことがなかったので入門してみた.素振り重要!
操作する言語は慣れてる Ruby にした.Ruby 以外にも多くの言語でチュートリアルが用意されていてかなり充実してた.
チュートリアル
計6ステップある.
- RabbitMQ - RabbitMQ tutorial - "Hello World!"
- RabbitMQ - RabbitMQ tutorial - Work Queues
- RabbitMQ - RabbitMQ tutorial - Publish/Subscribe
- RabbitMQ - RabbitMQ tutorial - Routing
- RabbitMQ - RabbitMQ tutorial - Topics
- RabbitMQ - RabbitMQ tutorial - Remote procedure call (RPC)
チュートリアルで使うコードは全て以下のリポジトリにあるので,取ってくることもできる.
チュートリアルのページも以下のリポジトリにあるので,もし試しながら誤りなどを見つけたらコミットチャンス!だと思う.
事前準備
brew でインストールしておく.
brew install rabbitmq
.zshrc
で ${PATH}
に /usr/local/sbin
を追加しておくと RabbitMQ コマンドの補完ができて便利!
➜ rabbitmq rabbitmq-defaults rabbitmq-plugins rabbitmqadmin rabbitmq-env rabbitmq-server rabbitmqctl
個人的メモ(学びなど)
チュートリアルを試しながら学んだことをメモっておこうと思う.
1. "Hello World!"
send.rb
でメッセージを publish すると Queue に溜まった.
➜ ~ rabbitmqctl list_queues
Listing queues ...
hello 1
receive.rb
でメッセージを subscribe すると Queue から削除された.
➜ ~ rabbitmqctl list_queues
Listing queues ...
hello 0
RabbitMQ 用語(基本的には ActiveMQ と Kafka でも同じだけど)を整理しておく!
- A producer is a user application that sends messages.
- A queue is a buffer that stores messages.
- A consumer is a user application that receives messages.
2. Work queues
Round-robin dispatching
Consumer を Work queues として使う例を試した.
まず,ラウンドロビンの挙動を確認するために Work queues を3プロセス立ち上げて,以下のコマンドを叩いてみた.
ruby new_task.rb First message. ruby new_task.rb Second message.. ruby new_task.rb Third message... ruby new_task.rb Fourth message.... ruby new_task.rb Fifth message.....
# Worker_1 [x] Received 'First message.' [x] Done [x] Received 'Fourth message....' [x] Done # Worker_2 [x] Received 'Second message..' [x] Done [x] Received 'Fifth message.....' [x] Done # Worker_3 [x] Received 'Third message...' [x] Done
ちゃんとラウンドロビンでメッセージが配送された!
どうやって実現してるんだろう?Producer からすると Worker 数は意識して無さそうなのになー.
Message acknowledgment
デフォルト設定だと Worker が死んでしまうとメッセージも一緒に無くなってしまうので,それを防ぐ場合は Consumer から Ack が返ってこなかったらメッセージを残すという設定をすることができて,それを Message acknowledgment
と言う.
q.subscribe(:manual_ack => true, :block => true) do |delivery_info, properties, body| # (中略) end
:manual_ack => true
を明示するだけで設定できる.ちなみに :ack => true
は既に DEPRECATION なので,使えるけど WARNING が出るので注意する.
Message durability
Ack で Worker の死を救ったとしても RabbitMQ 自体が落ちてしまったらメッセージをロストしてしまうため,Queue を永続化する durable
という設定がある.基本的には durable
を使うんじゃないかと思う.Producer と Consumer の双方で :durable => true
を設定する必要がある.
さらに Queue だけではなくメッセージまで永続化する場合は publish するときに :persistent => true
にする必要がある.基本的に true
で良いと思う.
q = ch.queue("task_queue", :durable => true) # (中略) q.publish(msg, :persistent => true)
Fair dispatch
prefetch
を指定すると Consumer で1度にフェッチするメッセージ数を決めることができる.メッセージのバランスによって一部の Worker の負荷が高くなってしまうような場合に任意の数を設定する.実際に使うのかな?
ch.prefetch(1)
3. Publish/Subscribe
ここで Producer / Queue / Consumer に続いて Exchange という概念が出てくる.Exchange はその名前の通り Producer から流れてきたメッセージを仲介して複数の Queue に配送する機能のことを指す.
Exchange には複数の種類がある.まず fanout
を試す.
- direct
- topic
- headers
- fanout
fanout
は「広がる」っていう感じの意味で,接続されている全ての Queue に Binding する.よって複数の Consumer で同じメッセージを受け取ることができる.
x = ch.fanout("logs") q = ch.queue("", :exclusive => true) q.bind(x)
複数のターミナルで receive_logs.rb
を起動させた状態でメッセージを送信すると,全ての Consumer で同じメッセージを受け取ることができた.
ruby receive_logs.rb [*] Waiting for logs. To exit press CTRL+C [x] Hello World!
4. Routing
次に Exchange の direct
を使って Routing Key にマッチしたメッセージを配送する.
Producer 側で routing_key
を指定して publish して,Consumer 側でも routing_key
を指定して Binding する.
x.publish(msg, :routing_key => severity)
ARGV.each do |severity| q.bind(x, :routing_key => severity) end
実際にログの接頭辞にある INFO / WARN / ERROR の Routing Key として配送する Consumer を振り分けることができた.
5. Topics
次に Exchange の topic
を使う.direct
よりも柔軟に Routing Key の設定をすることができて,具体的には .
で階層化したり,#
で正規表現的なパターンマッチをすることができる.
以下の4種類の Worker を起動して kern.critical
を指定すると全ての Worker にメッセージが配送された.
ruby receive_logs_topic.rb "#" ruby receive_logs_topic.rb "kern.*" ruby receive_logs_topic.rb "*.critical" ruby receive_logs_topic.rb "kern.*" "*.critical"
ruby emit_log_topic.rb "kern.critical" "A critical kernel error"
6. RPC
最後は RPC のサポートを試した.まぁ RPC は実際に使うときで良いかなと思って動作確認だけした.ちゃんとリモートサーバのフィボナッチ計算を呼び出すことができた.
ruby rpc_client.rb [x] Requesting fib(30) [.] Got 832040
ruby rpc_server.rb [x] Awaiting RPC requests [.] fib(30)
管理画面
デフォルトで用意されててメッセージングの状況を可視化できるのは重要だね!
ActiveMQ とポート競合した
RabbitMQ が起動しなくて何だろうと思って調べてたら ActiveMQ をポート競合してた.
今は ActiveMQ をメインで使ってるので,気を付けないと!
➜ ~ rabbitmq-server RabbitMQ 3.5.3. Copyright (C) 2007-2014 GoPivotal, Inc. ## ## Licensed under the MPL. See http://www.rabbitmq.com/ ## ## ########## Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log ###### ## /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log ########## Starting broker... BOOT FAILED =========== Error description: {could_not_start,rabbitmq_stomp, {{shutdown, {failed_to_start_child,'rabbit_stomp_listener_sup_:::61613', {shutdown, {failed_to_start_child,tcp_listener, {cannot_listen,{0,0,0,0,0,0,0,0},61613,eaddrinuse}}}}}, {rabbit_stomp,start,[normal,[]]}}} Log files (may contain more information): /usr/local/var/log/rabbitmq/rabbit@localhost.log /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log {"init terminating in do_boot",{could_not_start,rabbitmq_stomp,{{shutdown,{failed_to_start_child,'rabbit_stomp_listener_sup_:::61613',{shutdown,{failed_to_start_child,tcp_listener,{cannot_listen,{0,0,0,0,0,0,0,0},61613,eaddrinuse}}}}},{rabbit_stomp,start,[normal,[]]}}}} Crash dump was written to: erl_crash.dump init terminating in do_boot ()
まとめ
2,3時間でチュートリアルを試しただけだけど RabbitMQ に入門してみた!
チュートリアルが充実してて良かった.特に 2. 3. 4. 5. あたりが重要.
ActiveMQ と比べると柔軟にメッセージングが実現できそうだなという印象だった.最近流行ってる Kafka も入門して比較してみたいところ.