前回の記事では Airflow 検証環境にプリセットされている DAG を使って BashOperator と PythonOperator と BranchPythonOperator を試した.今回は新しく DAG を作りつつ,気になる他のオペレーターを試す.
SlackAPIPostOperator を試す
Docker を使って構築した環境だと ./dags ディレクトリに Python ファイルを置くと DAG として認識される.まずは Slack に通知をする SlackAPIPostOperator を試したく ./dags/my_dag_slack.py を以下のように作る.SlackAPIPostOperator のパラメータは必要最小限にして以下を設定した.
task_id: Airflow タスク名token: Slack API Token(Airflow Variables を使う)text: Slack 投稿文channel: Slack Channel
import pendulum from airflow import DAG from airflow.models import Variable from airflow.operators.bash_operator import BashOperator from airflow.operators.slack_operator import SlackAPIPostOperator with DAG( 'my_dag_slack', start_date=pendulum.datetime(2022, 3, 10, tz='UTC'), schedule_interval='0 0 * * *' ) as dag: start = BashOperator( task_id='start', bash_command='echo "start"' ) notification = SlackAPIPostOperator( task_id='notification', token=Variable.get('slack_api_token'), text='notification DAG finish!', channel='#general' ) start >> notification

token で今回新しく Airflow Variable を使う.Airflow 画面のメニューで Admin → Variables と進むと値を登録できて,DAG では Variable.get() で参照できる.DAG に設定値を直接書くのではなく,Airflow 側で管理できるのは便利そう.今回は Slack に Bots app をインストールして API Token を Airflow Variable に設定しておく.
実行すると Slack に通知できた.簡単!便利!

SlackAPIPostOperator の詳細は以下のドキュメントに載っている.
SqsPublishOperator を試す
次に Amazon AWS Operators にある SqsPublishOperator を試す.Amazon SQS Queue にメッセージを送信できる.他にも EcsOperator など他のオペレーターもあり以下のドキュメントに載っている.事前にセットアップは済ませておく.
$ pip install 'apache-airflow[amazon]'
さっそく ./dags/my_dag_sqs.py を以下のように作る.SqsPublishOperator のパラメータは必要最小限にして以下を設定した.
task_id: Airflow タスク名aws_conn_id: 認証情報(Airflow Connection を使う)sqs_queue: SQS Queue 名message_content: SQS メッセージ
import pendulum from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.providers.amazon.aws.operators.sqs import SqsPublishOperator with DAG( 'my_dag_sqs', start_date=pendulum.datetime(2022, 3, 10, tz='UTC'), schedule_interval='0 0 * * *' ) as dag: start = BashOperator( task_id='start', bash_command='echo "start"' ) send_message_to_queue = SqsPublishOperator( task_id='send_message_to_queue', aws_conn_id='aws', sqs_queue='airflow-sandbox', message_content="{{ task_instance_key_str }}-{{ execution_date }}" ) start >> send_message_to_queue

aws_conn_id で今回新しく Airflow Connection を使う.Airflow 画面のメニューで Admin → Connection と進むと AWS などにアクセスするための認証情報を登録できる.今回は簡単に試すために一時的に作ったアクセスキーを設定したけど,以下のドキュメントを読むと Assume Role にも対応している.
実行すると SQS にメッセージを送信できた.以下に動作確認をしたメッセージを載せておく.簡単!便利!
my_dag_sqs__send_message_to_queue__20220311-2022-03-11T04:30:00.000000+00:00
SqsPublishOperator の詳細は以下のドキュメントに載っている.
まとめ
今回は Airflow で新しく DAG を作って SlackAPIPostOperator と SqsPublishOperator を試した.Airflow のメリットとしてオペレーターの多さがあり一部を体験することができた.また今回新しく Variable と Connection を試せたのも良かった.
