kakakakakku blog

Weekly Tech Blog: Keep on Learning!

Airflow : PythonOperator と BranchPythonOperator に入門する

前回の記事では Airflow 検証環境を使って「Airflow Tutorial」を進めた.BashOperator を使った基本的な DAG を理解できるようになった!今回はプリセットされている他の DAG を使って BashOperatorPythonOperatorBranchPythonOperator を試す.

kakakakakku.hatenablog.com

BashOperator : example_bash_operator DAG

最初は「Airflow Tutorial」の復習も兼ねて BashOperator を試す.プリセットされている example_bash_operator DAG を使う.コードは以下にも載っている.

github.com

f:id:kakku22:20220301160118p:plain

DAG は Python で実装できるため,以下のように for 文と組み合わせて関連するタスクを複数作れるのは便利!命令的に記述できるメリットとも言える.以下の例は BashOperator を使って task_instance_key_str(タスクキー名 : {dag_id}__{task_id}__{ds_nodash}) を表示するタスクを3個 runme_0 / runme_1 / runme_2 作っている.

for i in range(3):
    task = BashOperator(
        task_id='runme_' + str(i),
        bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
    )
    task >> run_this

以下に実行ログを残しておく.task_instance_key_str が表示されている.

[2022-03-01, 16:00:00 ] {subprocess.py:89} INFO - example_bash_operator__runme_0__20220301
[2022-03-01, 16:00:00 ] {subprocess.py:89} INFO - example_bash_operator__runme_1__20220301
[2022-03-01, 16:00:00 ] {subprocess.py:89} INFO - example_bash_operator__runme_2__20220301

コードを読むと DummyOperator というまた違うオペレーターも出てくる.名前の通り,特に処理はせず,関連するタスクを終端するときなどに使う.詳しくは以下のドキュメントに載っている.今回は依存関係として1番最後に設定されている.

run_this_last = DummyOperator(
    task_id='run_this_last',
)

airflow.apache.org

実行して疑問に感じるのは this_will_skip で,Airflow は基本的に実行結果が 0 以外だと失敗するはずなのに,なぜ skip なのだろう?詳しくはドキュメントに載っていた.99 を返す(もしくは skip_exit_code と組み合わせて任意のコードを返す)と Airflow は失敗ではなく skip として判断する.調べながら仕様を理解できた!

this_will_skip = BashOperator(
    task_id='this_will_skip',
    bash_command='echo "hello world"; exit 99;',
    dag=dag,
)

PythonOperator : example_python_operator DAG

次に PythonOperator を試す.プリセットされている example_python_operator DAG を使う.コードは以下にも載っている.

github.com

f:id:kakku22:20220301160345p:plain

DAG は Python で実装するため,関数を実装したら直接タスクとして使える.最初は BashOperator のようにオブジェクトがなく戸惑った.task id はデコレータを使って指定する.以下の例は pprintprint を使って表示している.ds は前回の記事でも紹介した「実行する論理日付」kwargs は DAG 情報(コンテキスト情報)となる.

@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = print_context()

for 文と組み合わせるのは BashOperator と同じ.今回は Python の time モジュールを使って待機するタスクを5個 sleep_for_0 ~ sleep_for_4 作っている.待機時間 random_base は計算して引数に設定している.

for i in range(5):

    @task(task_id=f'sleep_for_{i}')
    def my_sleeping_function(random_base):
        """This is a function that will run within the DAG execution"""
        time.sleep(random_base)

    sleeping_task = my_sleeping_function(random_base=float(i) / 10)

    run_this >> sleeping_task

また Virtualenv を使って独立した Python 環境でタスクを実行することもできる.今回は shutil.whichVirtualenv の有無を確認している.他にも PythonVirtualenvOperator もある.Python に限定せずに環境を独立させるなら DockerOperator を使うのが良さそう.

if not shutil.which("virtualenv"):
    log.warning("The virtalenv_python example task requires virtualenv, please install it.")
else:
    # [START howto_operator_python_venv]
    @task.virtualenv(
        task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
    )
    def callable_virtualenv():
        """
        Example function that will be performed in a virtual environment.
        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        from time import sleep

        from colorama import Back, Fore, Style

        print(Fore.RED + 'some red text')
        print(Back.GREEN + 'and with a green background')
        print(Style.DIM + 'and in dim text')
        print(Style.RESET_ALL)
        for _ in range(10):
            print(Style.DIM + 'Please wait...', flush=True)
            sleep(10)
        print('Finished')

    virtualenv_task = callable_virtualenv()

PythonOperator のドキュメントは以下にある.

airflow.apache.org

BranchPythonOperator : example_branch_operator DAG

最後は BranchPythonOperator を試す.Airflow の DAG でどうやって条件分岐を実装するのか気になっていた.今回はプリセットされている example_branch_operator DAG を使う.コードは以下にも載っている.

github.com

f:id:kakku22:20220301162611p:plain

f:id:kakku22:20220301162622p:plain

今回は BranchPythonOperator を使って python_callable に条件分岐を実装した関数を指定する.今回は branch_a から branch_d まででランダムに1個選ぶという条件分岐になっていてわかりにくさもあるけど,ここに if/else を実装すれば条件分岐を実現できる.実行結果として branch_a を通るパターンと branch_c を通るパターンを載せた.他にも Airflow の Edge Labels という機能を使って依存関係(矢印)にラベルを付ける機能も学べた.

run_this_first = DummyOperator(
    task_id='run_this_first',
)

options = ['branch_a', 'branch_b', 'branch_c', 'branch_d']

branching = BranchPythonOperator(
    task_id='branching',
    python_callable=lambda: random.choice(options),
)
run_this_first >> branching

join = DummyOperator(
    task_id='join',
    trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

for option in options:
    t = DummyOperator(
        task_id=option,
    )

    dummy_follow = DummyOperator(
        task_id='follow_' + option,
    )

    # Label is optional here, but it can help identify more complex branches
    branching >> Label(option) >> t >> dummy_follow >> join

BranchPythonOperator を使った条件分岐の仕組みは以下のドキュメントにも載っている.

airflow.apache.org

まとめ

今回は Airflow にプリセットされている DAG を使って BashOperatorPythonOperatorBranchPythonOperator を試した.特に PythonOperator を使うことで BashOperator よりも自由度が高く多くの処理を実装できるようになった.次回もまた別のオペレーターを試していくぞー!

Airflow 関連記事