kakakakakku blog

Weekly Tech Blog: Keep on Learning!

Redash v9 で採用されたジョブキューライブラリ RQ (Redis Queue) の基本機能を試した

Redash v9「ジョブキューライブラリ」として採用された RQ (Redis Queue) を試した.今までの Redash では Celery を使っていた.RQRedis を使って「ジョブ登録 (enqueue)」「ジョブ実行 (work)」の機能をサポートする.ドキュメントを読むと,多くの機能が実装されているけど,今回は基本機能にフォーカスした.

github.com

検証環境

今回は Docker Compose で検証環境を構築した.役割としては,大きく4種類あり,以下に載せておく.簡単に構成図も描いておいた.

  • enqueue コンテナ : ジョブ登録 (Python)
  • queue コンテナ : キュー (Redis)
  • worker1 コンテナ : ジョブ実行 (Python)
  • worker2 コンテナ : ジョブ実行 (Python)

f:id:kakku22:20200810083901p:plain

なお,検証環境を再現する場合は GitHub の sandbox-rq リポジトリを見てもらえればと!docker-compose.ymlenqueue.pyworker.py など,サンプルコードも全てコミットしてある.

github.com

1.「queue」コンテナを起動する

最初に queue コンテナを起動する.今回は RQ の Redis コマンドを確認するため,並行して queue に接続して redis-cli monitor を実行しておく.実行された Redis コマンドがストリーミングされるため,便利!

$ docker-compose up -d queue
Starting sandbox-rq_queue_1 ... done

2.「enqueue」コンテナを起動する

次に RQ のドキュメントの通りに,サンプルジョブ job.py を実装する.実装は簡単で count_words_at_url() 関数は requests を使って与えられた URL の文字数を返す.

import requests

def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())

python-rq.org

今度はジョブを登録をするために enqueue.py を実装する.同じくドキュメントのサンプルコードを参考にする.Redis に対するコネクションを作成したら enqueue() 関数を使ってジョブを登録する.引数には job.py に実装した count_words_at_url() 関数と引数となる URL を渡す.今回は kakakakakku blog の URL にした.RQ は非同期処理を前提にするため,もし結果を取得する場合は,数秒待機してから job.result で取得できる.

from rq import Queue
from redis import Redis
from job import count_words_at_url
import time

r = Redis(host='queue', port=6379, db=0)
q = Queue(connection=r)

job = q.enqueue(count_words_at_url, 'https://kakakakakku.hatenablog.com/')

time.sleep(5)
print(job.result)

動作確認をするために enqueue コンテナを起動する.

$ docker-compose up -d enqueue
Starting sandbox-rq_enqueue_1 ... done

redis-cli monitor の結果から,RQrq:queue:default キーにジョブをリスト型として RPUSH していることを確認できた.そして rq:job:${UUID} キーにはジョブ情報を HSET していることを確認できた.実際に HGET でキーの中身を見ると,以下のように enqueued_atstatusdescription を取得できた.

127.0.0.1:6379> LRANGE rq:queue:default 0 -1
1) "18d1bb26-4948-45f9-8195-0de1c06c395e"
2) "d391221d-7c31-4822-98ae-1e25be4306f4"
3) "dacbcd76-af61-4b91-8d95-6c702bd4d3ba"

127.0.0.1:6379> HGET rq:job:18d1bb26-4948-45f9-8195-0de1c06c395e enqueued_at
"2020-08-09T22:32:59.752159Z"

127.0.0.1:6379> HGET rq:job:18d1bb26-4948-45f9-8195-0de1c06c395e status
"queued"

127.0.0.1:6379> HGET rq:job:18d1bb26-4948-45f9-8195-0de1c06c395e description
"job.count_words_at_url('https://kakakakakku.hatenablog.com/')"

3.「worker」コンテナを起動する

最後にジョブ実行をするために,ドキュメントのサンプルコードを参考に worker.py を実装する.ポイントは Redis に対するコネクションを作成したら work() 関数でジョブを実行する.なお,実行結果は直接 Redis に HSET で更新される.

from redis import Redis
import rq
from rq import Connection, Worker

r = Redis(host='queue', port=6379, db=0)

with Connection(r):
    w = Worker(['default'])
    w.work()

なお,載せた worker.py を見るとわかる通り,ジョブに依存せず,汎用的な実装になっている.具体的には job.py をインポートせずに動く.仕組みとしては,RQ のドキュメントにも載っているけど,内部的には Python の pickle モジュールを使って,関数自体をシリアライズしている.pickle は今まで使ったことがなかった.

docs.python.org

worker1 コンテナと worker2 コンテナを起動して,常駐させておく.

$ docker-compose up -d worker1 worker2
sandbox-rq_queue_1 is up-to-date
Recreating sandbox-rq_worker2_1 ... done
Recreating sandbox-rq_worker1_1 ... done

4. 動作確認をする

最後に全体を連携させながら動作確認をする.enqueue コンテナに -d オプションを付けずにフォアグラウンドで実行すると,5秒後に「5649文字」を取得できている.

$ docker-compose up enqueue
sandbox-rq_queue_1 is up-to-date
Starting sandbox-rq_enqueue_1 ... done
Attaching to sandbox-rq_enqueue_1
enqueue_1  | 5649
sandbox-rq_enqueue_1 exited with code 0

まとめ

Redash v9「ジョブキューライブラリ」として採用された RQ (Redis Queue) の基本機能を試した.pickle モジュールを使って,関数自体をシリアライズして「ジョブ登録 (enqueue)」をしている仕組みなのは興味深かった.ドキュメントを読むと,RQ にはまだまだ他に機能があり,引き続き試していく.

  • プライオリティ
  • タイムアウト
  • リトライ
  • スケジューリング
  • 監視
  • etc

関連記事

RQ (Redis Queue) を使った「Redash v9.0.0-beta」の紹介記事は以下にある!

kakakakakku.hatenablog.com