kakakakakku blog

Weekly Tech Blog: Keep on Learning!

Airflow : Tutorial を使って Python で実装された DAG の基礎を学ぶ

前回の記事で構築した Airflow 検証環境を使って「Airflow Tutorial」を進めていく!今回紹介する「Airflow Tutorial」は本当によくできてて,Airflow の基本的な仕組みや操作を学べる.そして Python で実装された DAG も読めるようになる.どの項目も丁寧に書かれていて挫折しにくく工夫されていると思う.

kakakakakku.hatenablog.com

Airflow Tutorial

airflow.apache.org

検証環境に tutorial という DAG がプリセットされている.Airflow 画面で「Graph タブ」を見るとワークフローの流れをザッと理解できる.以下の3種類のタスクから構成されていて,依存関係があることも確認できる.

  • print_date
  • sleep
  • templated

f:id:kakku22:20220228002923p:plain

タスクの詳細は Airflow 画面で「Code タブ」を見ると確認できる.Airflow の DAG は Python で実装されてるからスラスラ読めるのかと思ったけど,全然そんなことはなかった.特に最初は読みにくく感じた.ドキュメントにも 最初は誰にとっても直感的ではないかも と書いてある.でも「Airflow Tutorial」を進めていくと最終的に読めるようになるから最初は細かいところは気にせずに進めて良いかと!

One thing to wrap your head around (it may not be very intuitive for everyone at first) is that this Airflow Python script is really just a configuration file specifying the DAG’s structure as code.

f:id:kakku22:20220228002946p:plain

なお tutorial を含めてプリセットされている DAG のコードは GitHub で確認できる.

github.com

DAG と BashOperator

まず重要なのは DAG オブジェクトで tutorial という DAG id を指定している.ワークフロー全体を意味するオブジェクトになる.パラメータは多くあるけど,例えば schedule_interval はワークフローの実行間隔を指定する.今回は1日間隔で実行する設定になっている.

with DAG(
    'tutorial',
    # (中略)
    schedule_interval=timedelta(days=1),
) as dag:

次に重要なのは DAG オブジェクトの中にネストする「タスク」で,今回は BashOperator という任意のスクリプトを実行する仕組みを使う.3種類あるタスクは全て BashOperator を使っている.ポイントは BashOperatorbash_command パラメータで date(時刻を表示する)sleep 5(5秒待つ) を設定している.単純にスクリプトを実行しているため簡単!

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
)

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
)

では t3bash_command に設定している templated_command って何?と疑問に思う.これは独自コマンドを定義していて,Airflow では Jinja テンプレートを使ってダイナミックに実装できる.以下は単純な例で for 記法を使って5回 ds : 実行する論理日付macros.ds_add(ds, 7) : 実行する論理日付の7日後 を繰り返し表示する実装になっている.

templated_command = dedent(
    """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)

dsmacros.ds_add など,テンプレートとして使える値とマクロはドキュメントに載っている.

airflow.apache.org

依存関係

現状では3種類あるタスク t1t2t3 は独立している.Airflow では以下のようにビットシフト記法 >> を使ってタスク間の依存関係を定義できる.直感的でわかりやすい!もしくはビットシフト記法 >> を使わずに以下のように set_downstream を使っても同じワークフローを定義できる.個人的にはビットシフト記法が好きかなぁー!

t1 >> [t2, t3]
# もしくは
t1.set_downstream([t2, t3])

ドキュメント

既に tutorial.py を多く読めるようになった!残るは以下の .doc 記法で DAG とタスクにドキュメントを設定できる.この記法は今まで知らなかった!Markdown もサポートしている.ワークフローは日々増えるだろうからドキュメントを残せるのは便利!

t1.doc_md = dedent(
    """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
)

dag.doc_md = __doc__
dag.doc_md = """
This is a documentation placed anywhere
"""

f:id:kakku22:20220228014811p:plain

テスト実行

やっとここで DAG を実行する.「Airflow Tutorial」では最初に /airflow.sh tasks test コマンドを使ってテスト実行をする.前回の記事でも使った ./airflow.sh dags trigger コマンドとの違いは,依存関係を気にせずタスク単体で実行できる点と実行ログは出力されるけど実行結果をデータベースに登録しない点となる.あくまでテスト用途になる.

$ ./airflow.sh tasks test tutorial print_date 2015-06-01
[2022-02-27 16:36:54,240] {subprocess.py:74} INFO - Running command: ['bash', '-c', 'date']
[2022-02-27 16:36:54,252] {subprocess.py:85} INFO - Output:
[2022-02-27 16:36:54,263] {subprocess.py:89} INFO - Sun Feb 27 16:36:54 UTC 2022
[2022-02-27 16:36:54,264] {subprocess.py:93} INFO - Command exited with return code 0

$ ./airflow.sh tasks test tutorial sleep 2015-06-01
[2022-02-27 16:37:20,533] {subprocess.py:74} INFO - Running command: ['bash', '-c', 'sleep 5']
[2022-02-27 16:37:20,545] {subprocess.py:85} INFO - Output:
[2022-02-27 16:37:25,553] {subprocess.py:93} INFO - Command exited with return code 0

$ ./airflow.sh tasks test tutorial templated 2015-06-01
[2022-02-27 16:37:42,015] {subprocess.py:74} INFO - Running command: ['bash', '-c', '\n\n    echo "2015-06-01"\n    echo "2015-06-08"\n\n    echo "2015-06-01"\n    echo "2015-06-08"\n\n    echo "2015-06-01"\n    echo "2015-06-08"\n\n    echo "2015-06-01"\n    echo "2015-06-08"\n\n    echo "2015-06-01"\n    echo "2015-06-08"\n']
[2022-02-27 16:37:42,030] {subprocess.py:85} INFO - Output:
[2022-02-27 16:37:42,033] {subprocess.py:89} INFO - 2015-06-01
[2022-02-27 16:37:42,033] {subprocess.py:89} INFO - 2015-06-08
[2022-02-27 16:37:42,034] {subprocess.py:89} INFO - 2015-06-01
[2022-02-27 16:37:42,034] {subprocess.py:89} INFO - 2015-06-08
[2022-02-27 16:37:42,034] {subprocess.py:89} INFO - 2015-06-01
[2022-02-27 16:37:42,034] {subprocess.py:89} INFO - 2015-06-08
[2022-02-27 16:37:42,034] {subprocess.py:89} INFO - 2015-06-01
[2022-02-27 16:37:42,034] {subprocess.py:89} INFO - 2015-06-08
[2022-02-27 16:37:42,034] {subprocess.py:89} INFO - 2015-06-01
[2022-02-27 16:37:42,035] {subprocess.py:89} INFO - 2015-06-08

バックフィル実行 (backfill)

次に過去日付を指定して実行する.「Airflow Tutorial」では backfill と書かれていた.Airflow の DAG は基本的には定期実行をするため,実行日付を意識する必要がある.今回は ./airflow.sh dags backfill コマンドを使って 2015-06-01 ~ 2015-06-07 を指定して実行する.Airflow 画面で「Calendar タブ」を見たら期待通り,過去日付で1週間分実行されていた.

$ ./airflow.sh dags backfill tutorial \
    --start-date 2015-06-01 \
    --end-date 2015-06-07

(中略)

[2022-02-27 16:41:35,370] {backfill_job.py:397} INFO - [backfill progress] | finished run 7 of 7 | tasks waiting: 0 | succeeded: 21 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2022-02-27 16:41:35,377] {backfill_job.py:851} INFO - Backfill done. Exiting.

f:id:kakku22:20220228014358p:plain

まとめ

前回の記事で構築した Airflow 検証環境を使って「Airflow Tutorial」を進めた.Python で実装された Airflow の DAG も読めるようになって楽しくなってきた!ちなみに「Airflow Tutorial」の後半に載っている ETL の例はまた別途試す予定!次回は BashOperator 以外のオペレーターとして PythonOperatorBranchPythonOperator に入門するぞー!

airflow.apache.org