kakakakakku blog

Weekly Tech Blog: Keep on Learning!

Airflow : SlackAPIPostOperator と SqsPublishOperator に入門する

前回の記事では Airflow 検証環境にプリセットされている DAG を使って BashOperatorPythonOperatorBranchPythonOperator を試した.今回は新しく DAG を作りつつ,気になる他のオペレーターを試す.

kakakakakku.hatenablog.com

SlackAPIPostOperator を試す

Docker を使って構築した環境だと ./dags ディレクトリに Python ファイルを置くと DAG として認識される.まずは Slack に通知をする SlackAPIPostOperator を試したく ./dags/my_dag_slack.py を以下のように作る.SlackAPIPostOperator のパラメータは必要最小限にして以下を設定した.

  • task_id : Airflow タスク名
  • token : Slack API Token(Airflow Variables を使う)
  • text : Slack 投稿文
  • channel : Slack Channel
import pendulum

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.slack_operator import SlackAPIPostOperator

with DAG(
    'my_dag_slack',
    start_date=pendulum.datetime(2022, 3, 10, tz='UTC'),
    schedule_interval='0 0 * * *'
) as dag:
    start = BashOperator(
        task_id='start',
        bash_command='echo "start"'
    )

    notification = SlackAPIPostOperator(
        task_id='notification',
        token=Variable.get('slack_api_token'),
        text='notification DAG finish!',
        channel='#general'
    )

    start >> notification

f:id:kakku22:20220311132352p:plain

token で今回新しく Airflow Variable を使う.Airflow 画面のメニューで AdminVariables と進むと値を登録できて,DAG では Variable.get() で参照できる.DAG に設定値を直接書くのではなく,Airflow 側で管理できるのは便利そう.今回は Slack に Bots app をインストールして API TokenAirflow Variable に設定しておく.

slack.com

実行すると Slack に通知できた.簡単!便利!

f:id:kakku22:20220311132954p:plain

SlackAPIPostOperator の詳細は以下のドキュメントに載っている.

airflow.apache.org

SqsPublishOperator を試す

次に Amazon AWS Operators にある SqsPublishOperator を試す.Amazon SQS Queue にメッセージを送信できる.他にも EcsOperator など他のオペレーターもあり以下のドキュメントに載っている.事前にセットアップは済ませておく.

$ pip install 'apache-airflow[amazon]'

airflow.apache.org

さっそく ./dags/my_dag_sqs.py を以下のように作る.SqsPublishOperator のパラメータは必要最小限にして以下を設定した.

  • task_id : Airflow タスク名
  • aws_conn_id : 認証情報(Airflow Connection を使う)
  • sqs_queue : SQS Queue 名
  • message_content : SQS メッセージ
import pendulum

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.providers.amazon.aws.operators.sqs import SqsPublishOperator

with DAG(
    'my_dag_sqs',
    start_date=pendulum.datetime(2022, 3, 10, tz='UTC'),
    schedule_interval='0 0 * * *'
) as dag:
    start = BashOperator(
        task_id='start',
        bash_command='echo "start"'
    )

    send_message_to_queue = SqsPublishOperator(
        task_id='send_message_to_queue',
        aws_conn_id='aws',
        sqs_queue='airflow-sandbox',
        message_content="{{ task_instance_key_str }}-{{ execution_date }}"
    )

    start >> send_message_to_queue

f:id:kakku22:20220311132403p:plain

aws_conn_id で今回新しく Airflow Connection を使う.Airflow 画面のメニューで AdminConnection と進むと AWS などにアクセスするための認証情報を登録できる.今回は簡単に試すために一時的に作ったアクセスキーを設定したけど,以下のドキュメントを読むと Assume Role にも対応している.

airflow.apache.org

実行すると SQS にメッセージを送信できた.以下に動作確認をしたメッセージを載せておく.簡単!便利!

my_dag_sqs__send_message_to_queue__20220311-2022-03-11T04:30:00.000000+00:00

SqsPublishOperator の詳細は以下のドキュメントに載っている.

airflow.apache.org

まとめ

今回は Airflow で新しく DAG を作って SlackAPIPostOperatorSqsPublishOperator を試した.Airflow のメリットとしてオペレーターの多さがあり一部を体験することができた.また今回新しく VariableConnection を試せたのも良かった.

f:id:kakku22:20220311132226p:plain

Airflow 関連記事