kakakakakku blog

Weekly Tech Blog: Keep on Learning!

Airflow : SlackAPIPostOperator と SqsPublishOperator に入門する

前回の記事では Airflow 検証環境にプリセットされている DAG を使って BashOperatorPythonOperatorBranchPythonOperator を試した.今回は新しく DAG を作りつつ,気になる他のオペレーターを試す.

kakakakakku.hatenablog.com

SlackAPIPostOperator を試す

Docker を使って構築した環境だと ./dags ディレクトリに Python ファイルを置くと DAG として認識される.まずは Slack に通知をする SlackAPIPostOperator を試したく ./dags/my_dag_slack.py を以下のように作る.SlackAPIPostOperator のパラメータは必要最小限にして以下を設定した.

  • task_id : Airflow タスク名
  • token : Slack API Token(Airflow Variables を使う)
  • text : Slack 投稿文
  • channel : Slack Channel
import pendulum

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.slack_operator import SlackAPIPostOperator

with DAG(
    'my_dag_slack',
    start_date=pendulum.datetime(2022, 3, 10, tz='UTC'),
    schedule_interval='0 0 * * *'
) as dag:
    start = BashOperator(
        task_id='start',
        bash_command='echo "start"'
    )

    notification = SlackAPIPostOperator(
        task_id='notification',
        token=Variable.get('slack_api_token'),
        text='notification DAG finish!',
        channel='#general'
    )

    start >> notification

f:id:kakku22:20220311132352p:plain

token で今回新しく Airflow Variable を使う.Airflow 画面のメニューで AdminVariables と進むと値を登録できて,DAG では Variable.get() で参照できる.DAG に設定値を直接書くのではなく,Airflow 側で管理できるのは便利そう.今回は Slack に Bots app をインストールして API TokenAirflow Variable に設定しておく.

slack.com

実行すると Slack に通知できた.簡単!便利!

f:id:kakku22:20220311132954p:plain

SlackAPIPostOperator の詳細は以下のドキュメントに載っている.

airflow.apache.org

SqsPublishOperator を試す

次に Amazon AWS Operators にある SqsPublishOperator を試す.Amazon SQS Queue にメッセージを送信できる.他にも EcsOperator など他のオペレーターもあり以下のドキュメントに載っている.事前にセットアップは済ませておく.

$ pip install 'apache-airflow[amazon]'

airflow.apache.org

さっそく ./dags/my_dag_sqs.py を以下のように作る.SqsPublishOperator のパラメータは必要最小限にして以下を設定した.

  • task_id : Airflow タスク名
  • aws_conn_id : 認証情報(Airflow Connection を使う)
  • sqs_queue : SQS Queue 名
  • message_content : SQS メッセージ
import pendulum

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.providers.amazon.aws.operators.sqs import SqsPublishOperator

with DAG(
    'my_dag_sqs',
    start_date=pendulum.datetime(2022, 3, 10, tz='UTC'),
    schedule_interval='0 0 * * *'
) as dag:
    start = BashOperator(
        task_id='start',
        bash_command='echo "start"'
    )

    send_message_to_queue = SqsPublishOperator(
        task_id='send_message_to_queue',
        aws_conn_id='aws',
        sqs_queue='airflow-sandbox',
        message_content="{{ task_instance_key_str }}-{{ execution_date }}"
    )

    start >> send_message_to_queue

f:id:kakku22:20220311132403p:plain

aws_conn_id で今回新しく Airflow Connection を使う.Airflow 画面のメニューで AdminConnection と進むと AWS などにアクセスするための認証情報を登録できる.今回は簡単に試すために一時的に作ったアクセスキーを設定したけど,以下のドキュメントを読むと Assume Role にも対応している.

airflow.apache.org

実行すると SQS にメッセージを送信できた.以下に動作確認をしたメッセージを載せておく.簡単!便利!

my_dag_sqs__send_message_to_queue__20220311-2022-03-11T04:30:00.000000+00:00

SqsPublishOperator の詳細は以下のドキュメントに載っている.

airflow.apache.org

まとめ

今回は Airflow で新しく DAG を作って SlackAPIPostOperatorSqsPublishOperator を試した.Airflow のメリットとしてオペレーターの多さがあり一部を体験することができた.また今回新しく VariableConnection を試せたのも良かった.

f:id:kakku22:20220311132226p:plain

Airflow 関連記事

無料で受験できる Python 新試験「PythonZen & PEP 8 検定試験」に合格した

2022年3月1日に β リリースになった「一般社団法人 Python エンジニア育成推進協会」の新試験「PythonZen & PEP 8 検定試験」に合格した💡現在まだ β だけど合格すると「本認定」になる.PythonZen (PEP 20 - The Zen of Python)PEP 8 から出題されるため,どのように Python で「良いコード(良いとは何だろう?)」を書くかというお作法の理解を確認できる.試験とは言え,受験料は「無料」だし自宅で「24時間いつでも」受験できるから気軽に挑戦してみると良いんじゃないかと✌️

www.pythonic-exam.com

プレスリリースを読んでいたら本試験をリリースした背景が載っていた.Python を知らない人が書いている書籍PythonZen や PEP 8 を知らない講師 と書いてあり,プログラミング教育界隈としては危機的状況に思える.本試験で全てを解決できるとは思わないけど,PythonZen や PEP 8 を意識するきっかけにはなるし価値はあると思う.本試験の普及を応援するためにもブログを書いている.

Python 需要の増加に伴い、Python の関連書籍やプログラミングスクールが増える一方、Python を知らない人が書いている書籍や PythonZen や PEP 8 を知らない講師が、不適切な Python 文法を教えてしまうケースが散見されています。

試験概要 🐍

試験概要は以下のサイトに載っている.申込みをしたらすぐにメールで試験 URL が送られてきて受験できる.場合によっては迷惑メールになるという注意点が載っていたけど Gmail で本当に迷惑メールに判定されていて最初気付かなかった😇

  • 試験名称 : PythonZen & PEP 8 検定試験
  • 概要 : PythonZen (The Zen of Python) と PEP 8 に関する知識を問う試験
  • 受験料 : 無料 ✌
  • 問題数 : 20 問(全て選択問題)
  • 合格ライン : 70 %
  • 試験方式:WBT (Web Based Training) → ビデオ監視など特別な要件はなく気軽に受験できる🌸
  • 出題範囲:PythonZen と PEP 8 より出題

pythonzen-pep8-exam.jp

PythonZen (PEP 20 - The Zen of Python) 🐍

PythonZen (PEP 20 - The Zen of Python) は以下のサイトに載っている.Python を書くときの「格言(心構え)」とも言える.個人的に好きなのは以下!なお Zen を意味している.

  • Readability counts.(読みやすさは重要)
  • Although never is often better than right now.(でも今すぐやるよりもやらない方が良いことも多い)

www.python.org

また Python インタプリタを起動して import this を実行すると PythonZen (PEP 20 - The Zen of Python) を表示できる!

>>> import this
The Zen of Python, by Tim Peters

Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one-- and preferably only one --obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than *right* now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea -- let's do more of those!

PEP 8 🐍

PEP 8 は以下のサイトに載っている.Python の「コーディング規約」としてよく使われていると思う.コードのお作法として良し悪しを宣言している部分もあるし,例えば,文字列に使う引用符である "' は推奨までは宣言せず「統一すること」と書いてあったりもする.最終的には PEP 8 をディスカッションのテーマにして,プロジェクトで決めていく必要がある.

www.python.org

個人的には import 順序の理解が曖昧だったり,Ruby に慣れていて無意識に { eggs: 2 } と書いてしまって不要な半角文字が入ってしまう場合があったりするけど,普段は pycodestylePylint を使って CI で検知している.

github.com

PEP 8 はサンプルコードも載っていて英語でも比較的読みやすいとは思うけど,日本語に翻訳するプロジェクトもあって,更新もされている.以下も合わせて参考にすると良いと思う.

pep8-ja.readthedocs.io

まとめ 🐍

2022年3月1日に β リリースになった「PythonZen & PEP 8 検定試験」に合格した💡ほとんど準備はせず,PythonZen (PEP 20 - The Zen of Python)PEP 8 をザッと1度読んですぐに受験した.1問間違えてしまって「95点」だった.間違えた問題と回答もメールで送られてくるので復習もしやすくて良かった.合格者番号は PEP-0-000233 だからそこそこ早く合格できたかも!(そもそも2日前まで新試験のリリース情報に気付いてなかったことが悔やまれる🌀)

もし PythonZen (PEP 20 - The Zen of Python)PEP 8 どちらも未経験の場合は,サンプルコードを書きながらドキュメントを1,2度読んでみると十分合格ラインには到達できる気がする💪

まとめると「PythonZen & PEP 8 検定試験」は無料で受験できる Python 新試験でおすすめ!特に「合格したから○○」という価値を求めるのではなく,PythonZen (PEP 20 - The Zen of Python)PEP 8 を学ぶことにより,Python でコードを書く前の準備運動になったり,プロジェクトメンバー間で共通認識を築いたりするきっかけとして有効だと思う!

関連試験 🐍

なお「一般社団法人 Python エンジニア育成推進協会」から提供されている「Python 3 エンジニア認定基礎試験」「Python 3 エンジニア認定データ分析試験」も過去に受験して合格している.詳しくは以下の記事を読んでもらえればと!

kakakakakku.hatenablog.com

kakakakakku.hatenablog.com

プロダクトのロードマップを公開できるサービス「Roadmap.show」を試した

プロダクトのロードマップを公開できるサービス「Roadmap.show」を試した!

サービス自体はとてもシンプルで,例えば「機能 A は開発中だよ!」とか「機能 B は予定してるけどまだ未着手だよ!」という状況をユーザーに伝えることができる.以下の YouTube を見るとすぐわかる!最近は GitHub public roadmap など,全ては公開せずとも一部のロードマップを公開するプロダクトも増えていると思う.

www.youtube.com

「Roadmap.show」の機能をザッとまとめてみた!基本的に無料で使えるけど,Premium プランに契約するとより大規模に使える.

  • 機能状況を伝えることができる (In Progress / Planned / Roadmap / Done)
  • In Progress の場合は「進捗度合い」を表示できる
  • PlannedRoadmap の場合は「お気に入り」を付けられる
  • 機能ごとに「表示 or 非表示」を変えられる
  • 「機能リクエスト」を送れる
  • カスタムドメインを設定できる
  • iframe で外部サイトに埋め込める
  • etc

roadmap.show

kakakakakku blog's Roadmap 📅

今回「Roadmap.show」を試すために kakakakakku blog の記事ロードマップを公開してみた!まず仮運用から開始するため,2022年3月時点で予定している記事ネタに限定して公開している.サイドバーにもリンクを載せておいた.

  • In Progress : 下書き中!
  • Planned : 予定してるけどまだ未着手!
  • Roadmap : 未来的に書くかも!記事リクエストあったら送ってもらえるとー🙏
  • Done : 公開した!(月末にリセットする予定)

実際に画面イメージを整理して以下に載せる.直感的に記事ロードマップを確認できて便利そう!

管理画面は以下のような画面イメージになる.機能リクエストは送ってもすぐ公開されるわけではなく,あくまで管理者側で受け入れた場合となるため,気軽に送れると思う!

まとめ

プロダクトのロードマップを公開できるサービス「Roadmap.show」を試した!

シンプルに使えてとても便利だけど,気になる点もあった.

  • 日本語を入力するときに Enter が勝手に発火してしまって入力するときにストレスを感じる
  • サイトの OGP 画像が 404 で表示されなくなっている
  • 管理画面でカードの順番を変えても反映されない(なんとなく文字列順に強制的にソートされている気がする?)
  • 各ロードマップサイトの title tag が Roadmap.show - Roadmap.show に固定されている
  • ロゴ形式の埋め込みウィジェットが欲しい

ってこれを「機能リクエスト」すれば良いんじゃん💡「Roadmap.show」自体のロードマップは以下に公開されている.

  • Roadmap's Roadmap(2022年5月時点で HTTP 502 Bad Gateway になってしまっている🔥)

以上!「Roadmap.show」の紹介でしたー!

roadmap.show

Airflow : PythonOperator と BranchPythonOperator に入門する

前回の記事では Airflow 検証環境を使って「Airflow Tutorial」を進めた.BashOperator を使った基本的な DAG を理解できるようになった!今回はプリセットされている他の DAG を使って BashOperatorPythonOperatorBranchPythonOperator を試す.

kakakakakku.hatenablog.com

BashOperator : example_bash_operator DAG

最初は「Airflow Tutorial」の復習も兼ねて BashOperator を試す.プリセットされている example_bash_operator DAG を使う.コードは以下にも載っている.

github.com

f:id:kakku22:20220301160118p:plain

DAG は Python で実装できるため,以下のように for 文と組み合わせて関連するタスクを複数作れるのは便利!命令的に記述できるメリットとも言える.以下の例は BashOperator を使って task_instance_key_str(タスクキー名 : {dag_id}__{task_id}__{ds_nodash}) を表示するタスクを3個 runme_0 / runme_1 / runme_2 作っている.

for i in range(3):
    task = BashOperator(
        task_id='runme_' + str(i),
        bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
    )
    task >> run_this

以下に実行ログを残しておく.task_instance_key_str が表示されている.

[2022-03-01, 16:00:00 ] {subprocess.py:89} INFO - example_bash_operator__runme_0__20220301
[2022-03-01, 16:00:00 ] {subprocess.py:89} INFO - example_bash_operator__runme_1__20220301
[2022-03-01, 16:00:00 ] {subprocess.py:89} INFO - example_bash_operator__runme_2__20220301

コードを読むと DummyOperator というまた違うオペレーターも出てくる.名前の通り,特に処理はせず,関連するタスクを終端するときなどに使う.詳しくは以下のドキュメントに載っている.今回は依存関係として1番最後に設定されている.

run_this_last = DummyOperator(
    task_id='run_this_last',
)

airflow.apache.org

実行して疑問に感じるのは this_will_skip で,Airflow は基本的に実行結果が 0 以外だと失敗するはずなのに,なぜ skip なのだろう?詳しくはドキュメントに載っていた.99 を返す(もしくは skip_exit_code と組み合わせて任意のコードを返す)と Airflow は失敗ではなく skip として判断する.調べながら仕様を理解できた!

this_will_skip = BashOperator(
    task_id='this_will_skip',
    bash_command='echo "hello world"; exit 99;',
    dag=dag,
)

PythonOperator : example_python_operator DAG

次に PythonOperator を試す.プリセットされている example_python_operator DAG を使う.コードは以下にも載っている.

github.com

f:id:kakku22:20220301160345p:plain

DAG は Python で実装するため,関数を実装したら直接タスクとして使える.最初は BashOperator のようにオブジェクトがなく戸惑った.task id はデコレータを使って指定する.以下の例は pprintprint を使って表示している.ds は前回の記事でも紹介した「実行する論理日付」kwargs は DAG 情報(コンテキスト情報)となる.

@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = print_context()

for 文と組み合わせるのは BashOperator と同じ.今回は Python の time モジュールを使って待機するタスクを5個 sleep_for_0 ~ sleep_for_4 作っている.待機時間 random_base は計算して引数に設定している.

for i in range(5):

    @task(task_id=f'sleep_for_{i}')
    def my_sleeping_function(random_base):
        """This is a function that will run within the DAG execution"""
        time.sleep(random_base)

    sleeping_task = my_sleeping_function(random_base=float(i) / 10)

    run_this >> sleeping_task

また Virtualenv を使って独立した Python 環境でタスクを実行することもできる.今回は shutil.whichVirtualenv の有無を確認している.他にも PythonVirtualenvOperator もある.Python に限定せずに環境を独立させるなら DockerOperator を使うのが良さそう.

if not shutil.which("virtualenv"):
    log.warning("The virtalenv_python example task requires virtualenv, please install it.")
else:
    # [START howto_operator_python_venv]
    @task.virtualenv(
        task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
    )
    def callable_virtualenv():
        """
        Example function that will be performed in a virtual environment.
        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        from time import sleep

        from colorama import Back, Fore, Style

        print(Fore.RED + 'some red text')
        print(Back.GREEN + 'and with a green background')
        print(Style.DIM + 'and in dim text')
        print(Style.RESET_ALL)
        for _ in range(10):
            print(Style.DIM + 'Please wait...', flush=True)
            sleep(10)
        print('Finished')

    virtualenv_task = callable_virtualenv()

PythonOperator のドキュメントは以下にある.

airflow.apache.org

BranchPythonOperator : example_branch_operator DAG

最後は BranchPythonOperator を試す.Airflow の DAG でどうやって条件分岐を実装するのか気になっていた.今回はプリセットされている example_branch_operator DAG を使う.コードは以下にも載っている.

github.com

f:id:kakku22:20220301162611p:plain

f:id:kakku22:20220301162622p:plain

今回は BranchPythonOperator を使って python_callable に条件分岐を実装した関数を指定する.今回は branch_a から branch_d まででランダムに1個選ぶという条件分岐になっていてわかりにくさもあるけど,ここに if/else を実装すれば条件分岐を実現できる.実行結果として branch_a を通るパターンと branch_c を通るパターンを載せた.他にも Airflow の Edge Labels という機能を使って依存関係(矢印)にラベルを付ける機能も学べた.

run_this_first = DummyOperator(
    task_id='run_this_first',
)

options = ['branch_a', 'branch_b', 'branch_c', 'branch_d']

branching = BranchPythonOperator(
    task_id='branching',
    python_callable=lambda: random.choice(options),
)
run_this_first >> branching

join = DummyOperator(
    task_id='join',
    trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

for option in options:
    t = DummyOperator(
        task_id=option,
    )

    dummy_follow = DummyOperator(
        task_id='follow_' + option,
    )

    # Label is optional here, but it can help identify more complex branches
    branching >> Label(option) >> t >> dummy_follow >> join

BranchPythonOperator を使った条件分岐の仕組みは以下のドキュメントにも載っている.

airflow.apache.org

まとめ

今回は Airflow にプリセットされている DAG を使って BashOperatorPythonOperatorBranchPythonOperator を試した.特に PythonOperator を使うことで BashOperator よりも自由度が高く多くの処理を実装できるようになった.次回もまた別のオペレーターを試していくぞー!

Airflow 関連記事

Airflow : Tutorial を使って Python で実装された DAG の基礎を学ぶ

前回の記事で構築した Airflow 検証環境を使って「Airflow Tutorial」を進めていく!今回紹介する「Airflow Tutorial」は本当によくできてて,Airflow の基本的な仕組みや操作を学べる.そして Python で実装された DAG も読めるようになる.どの項目も丁寧に書かれていて挫折しにくく工夫されていると思う.

kakakakakku.hatenablog.com

Airflow Tutorial

airflow.apache.org

検証環境に tutorial という DAG がプリセットされている.Airflow 画面で「Graph タブ」を見るとワークフローの流れをザッと理解できる.以下の3種類のタスクから構成されていて,依存関係があることも確認できる.

  • print_date
  • sleep
  • templated

f:id:kakku22:20220228002923p:plain

タスクの詳細は Airflow 画面で「Code タブ」を見ると確認できる.Airflow の DAG は Python で実装されてるからスラスラ読めるのかと思ったけど,全然そんなことはなかった.特に最初は読みにくく感じた.ドキュメントにも 最初は誰にとっても直感的ではないかも と書いてある.でも「Airflow Tutorial」を進めていくと最終的に読めるようになるから最初は細かいところは気にせずに進めて良いかと!

One thing to wrap your head around (it may not be very intuitive for everyone at first) is that this Airflow Python script is really just a configuration file specifying the DAG’s structure as code.

f:id:kakku22:20220228002946p:plain

なお tutorial を含めてプリセットされている DAG のコードは GitHub で確認できる.

github.com

DAG と BashOperator

まず重要なのは DAG オブジェクトで tutorial という DAG id を指定している.ワークフロー全体を意味するオブジェクトになる.パラメータは多くあるけど,例えば schedule_interval はワークフローの実行間隔を指定する.今回は1日間隔で実行する設定になっている.

with DAG(
    'tutorial',
    # (中略)
    schedule_interval=timedelta(days=1),
) as dag:

次に重要なのは DAG オブジェクトの中にネストする「タスク」で,今回は BashOperator という任意のスクリプトを実行する仕組みを使う.3種類あるタスクは全て BashOperator を使っている.ポイントは BashOperatorbash_command パラメータで date(時刻を表示する)sleep 5(5秒待つ) を設定している.単純にスクリプトを実行しているため簡単!

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
)

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
)

では t3bash_command に設定している templated_command って何?と疑問に思う.これは独自コマンドを定義していて,Airflow では Jinja テンプレートを使ってダイナミックに実装できる.以下は単純な例で for 記法を使って5回 ds : 実行する論理日付macros.ds_add(ds, 7) : 実行する論理日付の7日後 を繰り返し表示する実装になっている.

templated_command = dedent(
    """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)

dsmacros.ds_add など,テンプレートとして使える値とマクロはドキュメントに載っている.

airflow.apache.org

依存関係

現状では3種類あるタスク t1t2t3 は独立している.Airflow では以下のようにビットシフト記法 >> を使ってタスク間の依存関係を定義できる.直感的でわかりやすい!もしくはビットシフト記法 >> を使わずに以下のように set_downstream を使っても同じワークフローを定義できる.個人的にはビットシフト記法が好きかなぁー!

t1 >> [t2, t3]
# もしくは
t1.set_downstream([t2, t3])

ドキュメント

既に tutorial.py を多く読めるようになった!残るは以下の .doc 記法で DAG とタスクにドキュメントを設定できる.この記法は今まで知らなかった!Markdown もサポートしている.ワークフローは日々増えるだろうからドキュメントを残せるのは便利!

t1.doc_md = dedent(
    """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
)

dag.doc_md = __doc__
dag.doc_md = """
This is a documentation placed anywhere
"""

f:id:kakku22:20220228014811p:plain

テスト実行

やっとここで DAG を実行する.「Airflow Tutorial」では最初に /airflow.sh tasks test コマンドを使ってテスト実行をする.前回の記事でも使った ./airflow.sh dags trigger コマンドとの違いは,依存関係を気にせずタスク単体で実行できる点と実行ログは出力されるけど実行結果をデータベースに登録しない点となる.あくまでテスト用途になる.

$ ./airflow.sh tasks test tutorial print_date 2015-06-01
[2022-02-27 16:36:54,240] {subprocess.py:74} INFO - Running command: ['bash', '-c', 'date']
[2022-02-27 16:36:54,252] {subprocess.py:85} INFO - Output:
[2022-02-27 16:36:54,263] {subprocess.py:89} INFO - Sun Feb 27 16:36:54 UTC 2022
[2022-02-27 16:36:54,264] {subprocess.py:93} INFO - Command exited with return code 0

$ ./airflow.sh tasks test tutorial sleep 2015-06-01
[2022-02-27 16:37:20,533] {subprocess.py:74} INFO - Running command: ['bash', '-c', 'sleep 5']
[2022-02-27 16:37:20,545] {subprocess.py:85} INFO - Output:
[2022-02-27 16:37:25,553] {subprocess.py:93} INFO - Command exited with return code 0

$ ./airflow.sh tasks test tutorial templated 2015-06-01
[2022-02-27 16:37:42,015] {subprocess.py:74} INFO - Running command: ['bash', '-c', '\n\n    echo "2015-06-01"\n    echo "2015-06-08"\n\n    echo "2015-06-01"\n    echo "2015-06-08"\n\n    echo "2015-06-01"\n    echo "2015-06-08"\n\n    echo "2015-06-01"\n    echo "2015-06-08"\n\n    echo "2015-06-01"\n    echo "2015-06-08"\n']
[2022-02-27 16:37:42,030] {subprocess.py:85} INFO - Output:
[2022-02-27 16:37:42,033] {subprocess.py:89} INFO - 2015-06-01
[2022-02-27 16:37:42,033] {subprocess.py:89} INFO - 2015-06-08
[2022-02-27 16:37:42,034] {subprocess.py:89} INFO - 2015-06-01
[2022-02-27 16:37:42,034] {subprocess.py:89} INFO - 2015-06-08
[2022-02-27 16:37:42,034] {subprocess.py:89} INFO - 2015-06-01
[2022-02-27 16:37:42,034] {subprocess.py:89} INFO - 2015-06-08
[2022-02-27 16:37:42,034] {subprocess.py:89} INFO - 2015-06-01
[2022-02-27 16:37:42,034] {subprocess.py:89} INFO - 2015-06-08
[2022-02-27 16:37:42,034] {subprocess.py:89} INFO - 2015-06-01
[2022-02-27 16:37:42,035] {subprocess.py:89} INFO - 2015-06-08

バックフィル実行 (backfill)

次に過去日付を指定して実行する.「Airflow Tutorial」では backfill と書かれていた.Airflow の DAG は基本的には定期実行をするため,実行日付を意識する必要がある.今回は ./airflow.sh dags backfill コマンドを使って 2015-06-01 ~ 2015-06-07 を指定して実行する.Airflow 画面で「Calendar タブ」を見たら期待通り,過去日付で1週間分実行されていた.

$ ./airflow.sh dags backfill tutorial \
    --start-date 2015-06-01 \
    --end-date 2015-06-07

(中略)

[2022-02-27 16:41:35,370] {backfill_job.py:397} INFO - [backfill progress] | finished run 7 of 7 | tasks waiting: 0 | succeeded: 21 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2022-02-27 16:41:35,377] {backfill_job.py:851} INFO - Backfill done. Exiting.

f:id:kakku22:20220228014358p:plain

まとめ

前回の記事で構築した Airflow 検証環境を使って「Airflow Tutorial」を進めた.Python で実装された Airflow の DAG も読めるようになって楽しくなってきた!ちなみに「Airflow Tutorial」の後半に載っている ETL の例はまた別途試す予定!次回は BashOperator 以外のオペレーターとして PythonOperatorBranchPythonOperator に入門するぞー!

airflow.apache.org