前回の記事では Airflow 検証環境にプリセットされている DAG を使って BashOperator
と PythonOperator
と BranchPythonOperator
を試した.今回は新しく DAG を作りつつ,気になる他のオペレーターを試す.
SlackAPIPostOperator を試す
Docker を使って構築した環境だと ./dags
ディレクトリに Python ファイルを置くと DAG として認識される.まずは Slack に通知をする SlackAPIPostOperator
を試したく ./dags/my_dag_slack.py
を以下のように作る.SlackAPIPostOperator
のパラメータは必要最小限にして以下を設定した.
task_id
: Airflow タスク名token
: Slack API Token(Airflow Variables を使う)text
: Slack 投稿文channel
: Slack Channel
import pendulum from airflow import DAG from airflow.models import Variable from airflow.operators.bash_operator import BashOperator from airflow.operators.slack_operator import SlackAPIPostOperator with DAG( 'my_dag_slack', start_date=pendulum.datetime(2022, 3, 10, tz='UTC'), schedule_interval='0 0 * * *' ) as dag: start = BashOperator( task_id='start', bash_command='echo "start"' ) notification = SlackAPIPostOperator( task_id='notification', token=Variable.get('slack_api_token'), text='notification DAG finish!', channel='#general' ) start >> notification
token
で今回新しく Airflow Variable を使う.Airflow 画面のメニューで Admin
→ Variables
と進むと値を登録できて,DAG では Variable.get()
で参照できる.DAG に設定値を直接書くのではなく,Airflow 側で管理できるのは便利そう.今回は Slack に Bots app をインストールして API Token を Airflow Variable に設定しておく.
実行すると Slack に通知できた.簡単!便利!
SlackAPIPostOperator
の詳細は以下のドキュメントに載っている.
SqsPublishOperator を試す
次に Amazon AWS Operators にある SqsPublishOperator
を試す.Amazon SQS Queue にメッセージを送信できる.他にも EcsOperator
など他のオペレーターもあり以下のドキュメントに載っている.事前にセットアップは済ませておく.
$ pip install 'apache-airflow[amazon]'
さっそく ./dags/my_dag_sqs.py
を以下のように作る.SqsPublishOperator
のパラメータは必要最小限にして以下を設定した.
task_id
: Airflow タスク名aws_conn_id
: 認証情報(Airflow Connection を使う)sqs_queue
: SQS Queue 名message_content
: SQS メッセージ
import pendulum from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.providers.amazon.aws.operators.sqs import SqsPublishOperator with DAG( 'my_dag_sqs', start_date=pendulum.datetime(2022, 3, 10, tz='UTC'), schedule_interval='0 0 * * *' ) as dag: start = BashOperator( task_id='start', bash_command='echo "start"' ) send_message_to_queue = SqsPublishOperator( task_id='send_message_to_queue', aws_conn_id='aws', sqs_queue='airflow-sandbox', message_content="{{ task_instance_key_str }}-{{ execution_date }}" ) start >> send_message_to_queue
aws_conn_id
で今回新しく Airflow Connection を使う.Airflow 画面のメニューで Admin
→ Connection
と進むと AWS などにアクセスするための認証情報を登録できる.今回は簡単に試すために一時的に作ったアクセスキーを設定したけど,以下のドキュメントを読むと Assume Role にも対応している.
実行すると SQS にメッセージを送信できた.以下に動作確認をしたメッセージを載せておく.簡単!便利!
my_dag_sqs__send_message_to_queue__20220311-2022-03-11T04:30:00.000000+00:00
SqsPublishOperator
の詳細は以下のドキュメントに載っている.
まとめ
今回は Airflow で新しく DAG を作って SlackAPIPostOperator
と SqsPublishOperator
を試した.Airflow のメリットとしてオペレーターの多さがあり一部を体験することができた.また今回新しく Variable と Connection を試せたのも良かった.