ワークフローを管理するソフトウェア「Apache Airflow」に入門する.今までは本当にザッと試した程度の経験しかなく,テックブログに記事を書いたことすらなかった.幅広くある機能を試しながら学習ログを記事にしていく.今回は Airflow の検証環境を構築して画面と CLI と REST API で操作するところまでまとめていく💪
なお Airflow の原則と特徴は公式サイトに載っている.
- Principles(原則)
- Scalable(スケーラブル)
- Dynamic(ダイナミック)
- Extensible(拡張可能)
- Elegant(エレガント)
- Features(特徴)
- Pure Python(標準の Pyhon 実装)
- Useful UI(便利な UI)
- Robust Integrations(堅牢な統合)
- Easy to Use(使いやすさ)
- Open Source(オープンソース)
Docker を使う 🐳
検証環境を構築する方法として,ドキュメントには「Python (pip)」と「Docker」の2種類が載っている.今回は環境をそこまで汚さずに使えて,個人的にも慣れている Docker を使う.基本的にはドキュメントの通りに進めればよく,解説も入っている.今回使う macOS など,環境ごとの注意点も載っていて親切だと感じた.
まず,公開されている Docker Compose の docker-compose.yaml
をダウンロードする.検証した時点での最新となる Airflow 2.2.4 を使えるようになっていた.
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.4/docker-compose.yaml'
そして準備コマンドを実行する.Airflow コンテナにマウントする「3種類」のディレクトリを作ったり,Airflow のアカウントを作るための初期化処理を docker-compose up airflow-init
コマンドで実行したりする.
./dags
(DAG ファイル)./logs
(ログ)./plugins
(カスタムプラグイン)
$ mkdir -p ./dags ./logs ./plugins $ echo -e "AIRFLOW_UID=$(id -u)" > .env $ docker-compose up airflow-init
準備をしたらさっそく docker-compose up
コマンドを実行して Airflow を起動する.
$ docker-compose up
http://localhost:8080/
にアクセスするとすぐに Airflow 画面にアクセスできる.初期ユーザーとして airflow/airflow
でログインすると,DAG(有向非巡回グラフ = ワークフロー)の一覧を確認できる.ワークフローの詳細を確認したり,実行したりできる.簡単!
Airflow コンポーネント 🗝
ここで少し立ち止まって Airflow コンポーネントを確認する.まず,Docker Compose で「7種類」のコンテナが起動されている.
$ docker ps --format 'table {{.Names}}' | sort NAMES airflow-airflow-scheduler-1 airflow-airflow-triggerer-1 airflow-airflow-webserver-1 airflow-airflow-worker-1 airflow-flower-1 airflow-postgres-1 airflow-redis-1
コンテナの簡単な説明はドキュメントに載っている.比較的コンポーネントは多いと思う.
airflow-scheduler
: タスクの実行をスケジュールするairflow-triggerer
: タスクの実行をトリガーするairflow-webserver
: ウェブサーバー(アプリケーションは Flask で実装されている)airflow-worker
:airflow-scheduler
によってスケジュールされたタスクを実行する(Celery で実装されている)airflow-flower
:airflow-worker
を監視する(Flower で実装されていてhttp://localhost:5555/
でアクセスできる)postgres
: データベースサーバーredis
:airflow-scheduler
からairflow-worker
にメッセージを受け渡す
また関連する Airflow アーキテクチャは以下のドキュメントに載っている.
CLI と REST API 👾
Airflow の良さとして「画面で操作できること」はあるけど,それ以外に CLI や REST API での操作もできる.
1. CLI
まず CLI を試す.Airflow CLI を使うと Airflow を管理したり,DAG を実行したり,多くの操作ができる.ドキュメントには以下のサブコマンドが載っていて本当に多かった!
- celery cheat-sheet
- config
- connections
- dags
- db
- info
- jobs
- kerberos
- kubernetes
- plugins
- pools
- providers
- roles
- rotate-fernet-key
- scheduler
- standalone
- sync-perm
- tasks
- triggerer
- users
- variables
- version
- webserver
今回は環境構築のドキュメントに載っている通り airflow.sh
をダウンロードして使う.とは言え,スクリプトの内部では docker-compose run --rm airflow-cli
を実行しているため,あくまでラッパーという立ち位置になる.
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.4/airflow.sh' $ chmod +x airflow.sh
version
サブコマンドを実行すると Airflow のバージョンを確認できる.
$ ./airflow.sh version 2.2.4
users create
サブコマンドを実行するとユーザーを追加するなど Airflow の管理操作も実行できる.
$ ./airflow.sh users create \ --username kakakakakku \ --firstname Yoshida \ --lastname Yoshiaki \ --role Admin \ --email kakakakakku@example.com User "kakakakakku" created with role "Admin"
ユーザー一覧は画面でも確認できる.
dags trigger
サブコマンドを実行すると指定した DAG をトリガーできる.合わせて DAG の初期状態として「一時停止 (Paused)」になっている場合は dags unpause
サブコマンドで再開できる.
$ ./airflow.sh dags trigger example_bash_operator $ ./airflow.sh dags unpause example_bash_operator
画面も1番上に表示されていた example_bash_operator
DAG を実行できた.右上に success
と表示されている.
2. REST API
次に REST API を試す.REST API も CLI と同様に多くの操作をサポートしている.以下のように多くある.特に API を使って他のアプリケーションと簡単に連携できるのは便利だと思う.
- Config
- Connection
- DAG
- DAGRun
- EventLog
- ImportError
- Monitoring
- Pool
- Provider
- TaskInstance
- Variable
- XCom
- Plugin
- Role
- Permission
- User
API 情報はドキュメントに載っているし,検証環境で http://localhost:8080/api/v1/ui/
にアクセスすると Swagger UI も実装されているため,簡単に API の動作確認ができる.
例えば /api/v1/dags (GET)
API を実行すると DAG の一覧を確認できる.以下は画面で1番上に表示されていた example_bash_operator
DAG の情報を表示している.なお REST API は Basic 認証を必要とするため,curl
コマンドに --user
オプションを指定している.
$ curl -s http://localhost:8080/api/v1/dags --user 'airflow:airflow' | jq -r '.dags[0]' { "dag_id": "example_bash_operator", "description": null, "file_token": "xxx", "fileloc": "/home/airflow/.local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py", "is_active": true, "is_paused": false, "is_subdag": false, "owners": [ "airflow" ], "root_dag_id": null, "schedule_interval": { "__type": "CronExpression", "value": "0 0 * * *" }, "tags": [ { "name": "example2" }, { "name": "example" } ] }
まとめ
「Apache Airflow」に入門するためにドキュメントを参照しながら Docker を使って検証環境を構築した.また Airflow コンポーネントを確認したり,画面と CLI と REST API を使って Airflow の操作を試した.次は「Airflow Tutorial」を使って DAG を実行していくぞー!