kakakakakku blog

Weekly Tech Blog: Keep on Learning!

RabbitMQ に入門してピョンピョンしてみた

近々 RabbitMQ を使う機会がありそうで,今まで使ったことがなかったので入門してみた.素振り重要!

操作する言語は慣れてる Ruby にした.Ruby 以外にも多くの言語でチュートリアルが用意されていてかなり充実してた.

チュートリアル

計6ステップある.

チュートリアルで使うコードは全て以下のリポジトリにあるので,取ってくることもできる.

github.com

チュートリアルのページも以下のリポジトリにあるので,もし試しながら誤りなどを見つけたらコミットチャンス!だと思う.

github.com

事前準備

brew でインストールしておく.

brew install rabbitmq

.zshrc${PATH}/usr/local/sbin を追加しておくと RabbitMQ コマンドの補完ができて便利!

➜ rabbitmq
rabbitmq-defaults  rabbitmq-plugins   rabbitmqadmin
rabbitmq-env       rabbitmq-server    rabbitmqctl

個人的メモ(学びなど)

チュートリアルを試しながら学んだことをメモっておこうと思う.

1. "Hello World!"

send.rb でメッセージを publish すると Queue に溜まった.

➜  ~  rabbitmqctl list_queues
Listing queues ...
hello   1

receive.rb でメッセージを subscribe すると Queue から削除された.

➜  ~  rabbitmqctl list_queues
Listing queues ...
hello   0

RabbitMQ 用語(基本的には ActiveMQ と Kafka でも同じだけど)を整理しておく!

  • A producer is a user application that sends messages.
  • A queue is a buffer that stores messages.
  • A consumer is a user application that receives messages.

2. Work queues

Round-robin dispatching

Consumer を Work queues として使う例を試した.

まず,ラウンドロビンの挙動を確認するために Work queues を3プロセス立ち上げて,以下のコマンドを叩いてみた.

ruby new_task.rb First message.
ruby new_task.rb Second message..
ruby new_task.rb Third message...
ruby new_task.rb Fourth message....
ruby new_task.rb Fifth message.....
# Worker_1
[x] Received 'First message.'
[x] Done
[x] Received 'Fourth message....'
[x] Done

# Worker_2
[x] Received 'Second message..'
[x] Done
[x] Received 'Fifth message.....'
[x] Done

# Worker_3
[x] Received 'Third message...'
[x] Done

ちゃんとラウンドロビンでメッセージが配送された!

どうやって実現してるんだろう?Producer からすると Worker 数は意識して無さそうなのになー.

Message acknowledgment

デフォルト設定だと Worker が死んでしまうとメッセージも一緒に無くなってしまうので,それを防ぐ場合は Consumer から Ack が返ってこなかったらメッセージを残すという設定をすることができて,それを Message acknowledgment と言う.

q.subscribe(:manual_ack => true, :block => true) do |delivery_info, properties, body|
  # (中略)
end

:manual_ack => true を明示するだけで設定できる.ちなみに :ack => true は既に DEPRECATION なので,使えるけど WARNING が出るので注意する.

Message durability

Ack で Worker の死を救ったとしても RabbitMQ 自体が落ちてしまったらメッセージをロストしてしまうため,Queue を永続化する durable という設定がある.基本的には durable を使うんじゃないかと思う.Producer と Consumer の双方で :durable => true を設定する必要がある.

さらに Queue だけではなくメッセージまで永続化する場合は publish するときに :persistent => true にする必要がある.基本的に true で良いと思う.

q = ch.queue("task_queue", :durable => true)
# (中略)
q.publish(msg, :persistent => true)

Fair dispatch

prefetch を指定すると Consumer で1度にフェッチするメッセージ数を決めることができる.メッセージのバランスによって一部の Worker の負荷が高くなってしまうような場合に任意の数を設定する.実際に使うのかな?

ch.prefetch(1)

3. Publish/Subscribe

ここで Producer / Queue / Consumer に続いて Exchange という概念が出てくる.Exchange はその名前の通り Producer から流れてきたメッセージを仲介して複数の Queue に配送する機能のことを指す.

Exchange には複数の種類がある.まず fanout を試す.

  • direct
  • topic
  • headers
  • fanout

fanout は「広がる」っていう感じの意味で,接続されている全ての Queue に Binding する.よって複数の Consumer で同じメッセージを受け取ることができる.

x = ch.fanout("logs")
q = ch.queue("", :exclusive => true)
q.bind(x)

複数のターミナルで receive_logs.rb を起動させた状態でメッセージを送信すると,全ての Consumer で同じメッセージを受け取ることができた.

ruby receive_logs.rb
 [*] Waiting for logs. To exit press CTRL+C
 [x] Hello World!

4. Routing

次に Exchange の direct を使って Routing Key にマッチしたメッセージを配送する.

Producer 側で routing_key を指定して publish して,Consumer 側でも routing_key を指定して Binding する.

x.publish(msg, :routing_key => severity)
ARGV.each do |severity|
  q.bind(x, :routing_key => severity)
end

実際にログの接頭辞にある INFO / WARN / ERROR の Routing Key として配送する Consumer を振り分けることができた.

5. Topics

次に Exchange の topic を使う.direct よりも柔軟に Routing Key の設定をすることができて,具体的には . で階層化したり,#正規表現的なパターンマッチをすることができる.

以下の4種類の Worker を起動して kern.critical を指定すると全ての Worker にメッセージが配送された.

ruby receive_logs_topic.rb "#"
ruby receive_logs_topic.rb "kern.*"
ruby receive_logs_topic.rb "*.critical"
ruby receive_logs_topic.rb "kern.*" "*.critical"
ruby emit_log_topic.rb "kern.critical" "A critical kernel error"

6. RPC

最後は RPC のサポートを試した.まぁ RPC は実際に使うときで良いかなと思って動作確認だけした.ちゃんとリモートサーバのフィボナッチ計算を呼び出すことができた.

ruby rpc_client.rb
 [x] Requesting fib(30)
 [.] Got 832040
ruby rpc_server.rb
 [x] Awaiting RPC requests
 [.] fib(30)

管理画面

デフォルトで用意されててメッセージングの状況を可視化できるのは重要だね!

f:id:kakku22:20150826000416p:plain

ActiveMQ とポート競合した

RabbitMQ が起動しなくて何だろうと思って調べてたら ActiveMQ をポート競合してた.

今は ActiveMQ をメインで使ってるので,気を付けないと!

➜  ~  rabbitmq-server

              RabbitMQ 3.5.3. Copyright (C) 2007-2014 GoPivotal, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
  ######  ##        /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
  ##########
              Starting broker...

BOOT FAILED
===========

Error description:
   {could_not_start,rabbitmq_stomp,
       {{shutdown,
            {failed_to_start_child,'rabbit_stomp_listener_sup_:::61613',
                {shutdown,
                    {failed_to_start_child,tcp_listener,
                        {cannot_listen,{0,0,0,0,0,0,0,0},61613,eaddrinuse}}}}},
        {rabbit_stomp,start,[normal,[]]}}}

Log files (may contain more information):
   /usr/local/var/log/rabbitmq/rabbit@localhost.log
   /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log

{"init terminating in do_boot",{could_not_start,rabbitmq_stomp,{{shutdown,{failed_to_start_child,'rabbit_stomp_listener_sup_:::61613',{shutdown,{failed_to_start_child,tcp_listener,{cannot_listen,{0,0,0,0,0,0,0,0},61613,eaddrinuse}}}}},{rabbit_stomp,start,[normal,[]]}}}}

Crash dump was written to: erl_crash.dump
init terminating in do_boot ()

まとめ

2,3時間でチュートリアルを試しただけだけど RabbitMQ に入門してみた!

チュートリアルが充実してて良かった.特に 2. 3. 4. 5. あたりが重要.

ActiveMQ と比べると柔軟にメッセージングが実現できそうだなという印象だった.最近流行ってる Kafka も入門して比較してみたいところ.

github.com

関連エントリー