Redash v9 で「ジョブキューライブラリ」として採用された RQ (Redis Queue) を試した.今までの Redash では Celery を使っていた.RQ は Redis を使って「ジョブ登録 (enqueue
)」と「ジョブ実行 (work
)」の機能をサポートする.ドキュメントを読むと,多くの機能が実装されているけど,今回は基本機能にフォーカスした.
検証環境
今回は Docker Compose で検証環境を構築した.役割としては,大きく4種類あり,以下に載せておく.簡単に構成図も描いておいた.
enqueue
コンテナ : ジョブ登録 (Python)queue
コンテナ : キュー (Redis)worker1
コンテナ : ジョブ実行 (Python)worker2
コンテナ : ジョブ実行 (Python)
なお,検証環境を再現する場合は GitHub の sandbox-rq
リポジトリを見てもらえればと!docker-compose.yml
や enqueue.py
や worker.py
など,サンプルコードも全てコミットしてある.
1.「queue」コンテナを起動する
最初に queue
コンテナを起動する.今回は RQ の Redis コマンドを確認するため,並行して queue
に接続して redis-cli monitor
を実行しておく.実行された Redis コマンドがストリーミングされるため,便利!
$ docker-compose up -d queue Starting sandbox-rq_queue_1 ... done
2.「enqueue」コンテナを起動する
次に RQ のドキュメントの通りに,サンプルジョブ job.py
を実装する.実装は簡単で count_words_at_url()
関数は requests
を使って与えられた URL の文字数を返す.
import requests def count_words_at_url(url): resp = requests.get(url) return len(resp.text.split())
今度はジョブを登録をするために enqueue.py
を実装する.同じくドキュメントのサンプルコードを参考にする.Redis に対するコネクションを作成したら enqueue()
関数を使ってジョブを登録する.引数には job.py
に実装した count_words_at_url()
関数と引数となる URL を渡す.今回は kakakakakku blog の URL にした.RQ は非同期処理を前提にするため,もし結果を取得する場合は,数秒待機してから job.result
で取得できる.
from rq import Queue from redis import Redis from job import count_words_at_url import time r = Redis(host='queue', port=6379, db=0) q = Queue(connection=r) job = q.enqueue(count_words_at_url, 'https://kakakakakku.hatenablog.com/') time.sleep(5) print(job.result)
動作確認をするために enqueue
コンテナを起動する.
$ docker-compose up -d enqueue Starting sandbox-rq_enqueue_1 ... done
redis-cli monitor
の結果から,RQ は rq:queue:default
キーにジョブをリスト型として RPUSH
していることを確認できた.そして rq:job:${UUID}
キーにはジョブ情報を HSET
していることを確認できた.実際に HGET
でキーの中身を見ると,以下のように enqueued_at
や status
や description
を取得できた.
127.0.0.1:6379> LRANGE rq:queue:default 0 -1 1) "18d1bb26-4948-45f9-8195-0de1c06c395e" 2) "d391221d-7c31-4822-98ae-1e25be4306f4" 3) "dacbcd76-af61-4b91-8d95-6c702bd4d3ba" 127.0.0.1:6379> HGET rq:job:18d1bb26-4948-45f9-8195-0de1c06c395e enqueued_at "2020-08-09T22:32:59.752159Z" 127.0.0.1:6379> HGET rq:job:18d1bb26-4948-45f9-8195-0de1c06c395e status "queued" 127.0.0.1:6379> HGET rq:job:18d1bb26-4948-45f9-8195-0de1c06c395e description "job.count_words_at_url('https://kakakakakku.hatenablog.com/')"
3.「worker」コンテナを起動する
最後にジョブ実行をするために,ドキュメントのサンプルコードを参考に worker.py
を実装する.ポイントは Redis に対するコネクションを作成したら work()
関数でジョブを実行する.なお,実行結果は直接 Redis に HSET
で更新される.
from redis import Redis import rq from rq import Connection, Worker r = Redis(host='queue', port=6379, db=0) with Connection(r): w = Worker(['default']) w.work()
なお,載せた worker.py
を見るとわかる通り,ジョブに依存せず,汎用的な実装になっている.具体的には job.py
をインポートせずに動く.仕組みとしては,RQ のドキュメントにも載っているけど,内部的には Python の pickle
モジュールを使って,関数自体をシリアライズしている.pickle
は今まで使ったことがなかった.
worker1
コンテナと worker2
コンテナを起動して,常駐させておく.
$ docker-compose up -d worker1 worker2 sandbox-rq_queue_1 is up-to-date Recreating sandbox-rq_worker2_1 ... done Recreating sandbox-rq_worker1_1 ... done
4. 動作確認をする
最後に全体を連携させながら動作確認をする.enqueue
コンテナに -d
オプションを付けずにフォアグラウンドで実行すると,5秒後に「5649文字」を取得できている.
$ docker-compose up enqueue sandbox-rq_queue_1 is up-to-date Starting sandbox-rq_enqueue_1 ... done Attaching to sandbox-rq_enqueue_1 enqueue_1 | 5649 sandbox-rq_enqueue_1 exited with code 0
まとめ
Redash v9 で「ジョブキューライブラリ」として採用された RQ (Redis Queue) の基本機能を試した.pickle
モジュールを使って,関数自体をシリアライズして「ジョブ登録 (enqueue
)」をしている仕組みなのは興味深かった.ドキュメントを読むと,RQ にはまだまだ他に機能があり,引き続き試していく.
- プライオリティ
- タイムアウト
- リトライ
- スケジューリング
- 監視
- etc
関連記事
RQ (Redis Queue) を使った「Redash v9.0.0-beta」の紹介記事は以下にある!