kakakakakku blog

Weekly Tech Blog: Keep on Learning!

Casbin 認可ポリシーを DynamoDB に保存できるアダプター「python-dycasbin」を試した

Casbin で認可ポリシーを保存する1番簡単な選択肢は CSV ファイルだけど,アダプターを使うと認可ポリシーをデータベースで管理できる👌今回は PyCasbin で,ドキュメントに載っている DynamoDB Adapter (python-dycasbin) を使って,Casbin 認可ポリシーを Amazon DynamoDB に保存してみた.試したログをまとめておく❗️

casbin.org

また LoadPolicy()SavePolicy() など,アダプターインタフェースを実装すればカスタムアダプターも実装できる👀

casbin.org

👾 requirements.txt

今回は PyCasbin と python-dycasbin の最新バージョンを使う👌

casbin==1.36.2
python-dycasbin==0.4.1

pypi.org

pypi.org

👾 model.conf

そして,Casbin Model はドキュメントにも載っている ACL (Access Control List) をそのまま使う👌

[request_definition]
r = sub, obj, act

[policy_definition]
p = sub, obj, act

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = r.sub == p.sub && r.obj == p.obj && r.act == p.act

casbin.org

👾 add-policy.py

まず,python-dycasbin を使って Amazon DynamoDB テーブルに Casbin 認可ポリシーを登録する.今回はサンプルとして Amazon S3 を参考に s3:PutObject アクションを制御することにした.

  • sub (kakakakakku) は obj1 (arn:aws:s3:::sample-bucket-1/*) に対して act (s3:PutObject) できる
  • sub (kakakakakku) は obj2 (arn:aws:s3:::sample-bucket-2/*) に対して act (s3:PutObject) できる

また Amazon DynamoDB は LocalStack を使ってローカル環境でアクセスできるようにしておく✅

import casbin
from python_dycasbin import adapter

adapter = adapter.Adapter(endpoint_url='http://localhost:4566')

e = casbin.Enforcer('model.conf', adapter, True)

sub = 'kakakakakku'
obj1 = 'arn:aws:s3:::sample-bucket-1/*'
obj2 = 'arn:aws:s3:::sample-bucket-2/*'
act = 's3:PutObject'

e.add_policy(sub, obj1, act)
e.add_policy(sub, obj2, act)

add-policy.py を実行すると,Adapter 経由で自動的に Amazon DynamoDB テーブル(デフォルトだと casbin_rule)が構築される.Adapter クラスの初期化 (__init__) で毎回テーブルを構築しようと試みるところは微妙に感じるし,テーブル設定として WCU / RCU が 10 で固定されているのも厳しい気がした😇オンデマンドキャパシティを選択するオプションがあっても良さそう.

github.com

登録した Casbin 認可ポリシーは Amazon DynamoDB テーブルでは以下のように登録されていた.

$ awslocal dynamodb scan --table-name casbin_rule
{
    "Items": [
        {
            "ptype": {
                "S": "p"
            },
            "v0": {
                "S": "kakakakakku"
            },
            "v1": {
                "S": "arn:aws:s3:::sample-bucket-1/*"
            },
            "id": {
                "S": "bf571b8e16cef131aeac79c15a800421"
            },
            "v2": {
                "S": "s3:PutObject"
            }
        },
        {
            "ptype": {
                "S": "p"
            },
            "v0": {
                "S": "kakakakakku"
            },
            "v1": {
                "S": "arn:aws:s3:::sample-bucket-2/*"
            },
            "id": {
                "S": "46808bf495552ff34293bdb9f8f0a9b2"
            },
            "v2": {
                "S": "s3:PutObject"
            }
        }
    ],
    "Count": 2,
    "ScannedCount": 2,
    "ConsumedCapacity": null
}

登録されたデータ(アイテム)は LocalStack Resource Browser でも確認できた👌

👾 enforce.py

今度は arn:aws:s3:::sample-bucket-1/*arn:aws:s3:::sample-bucket-2/*arn:aws:s3:::sample-bucket-3/* に対する s3:PutObject アクションの許可を確認する.

import casbin
from python_dycasbin import adapter

adapter = adapter.Adapter(endpoint_url='http://localhost:4566')

e = casbin.Enforcer('model.conf', adapter, True)

sub = 'kakakakakku'
obj1 = 'arn:aws:s3:::sample-bucket-1/*'
obj2 = 'arn:aws:s3:::sample-bucket-2/*'
obj3 = 'arn:aws:s3:::sample-bucket-3/*'
act = 's3:PutObject'

e.enforce(sub, obj1, act)
e.enforce(sub, obj2, act)
e.enforce(sub, obj3, act)

期待通りに True, True, False と認可判断ができていた❗️

2024-06-21 19:00:00,000 Request: kakakakakku, arn:aws:s3:::sample-bucket-1/*, s3:PutObject ---> True
2024-06-21 19:00:00,000 Request: kakakakakku, arn:aws:s3:::sample-bucket-2/*, s3:PutObject ---> True
2024-06-21 19:00:00,000 Request: kakakakakku, arn:aws:s3:::sample-bucket-3/*, s3:PutObject ---> False

ちなみに python-dycasbin の実装を読むと,Amazon DynamoDB テーブルに対して Scan を実行していて気になる💨Amazon DynamoDB テーブルに GSI (Global Secondary Index) を追加して,Scan ではなく Query を使うように変更したプルリクエストも出ているようだった.

👾 remove-policy.py

最後は arn:aws:s3:::sample-bucket-2/* に対する Casbin 認可ポリシーを削除してみる.

import casbin
from python_dycasbin import adapter

adapter = adapter.Adapter(endpoint_url='http://localhost:4566')

e = casbin.Enforcer('model.conf', adapter, True)

sub = 'kakakakakku'
obj2 = 'arn:aws:s3:::sample-bucket-2/*'
act = 's3:PutObject'

e.remove_policy(sub, obj2, act)

もう一度 s3:PutObject アクションの許可を確認したところ,期待通りに True, False, False と認可判断ができていた❗️

2024-06-21 19:00:00,000 Request: kakakakakku, arn:aws:s3:::sample-bucket-1/*, s3:PutObject ---> True
2024-06-21 19:00:00,000 Request: kakakakakku, arn:aws:s3:::sample-bucket-2/*, s3:PutObject ---> False
2024-06-21 19:00:00,000 Request: kakakakakku, arn:aws:s3:::sample-bucket-3/*, s3:PutObject ---> False

Amazon DynamoDB テーブルのデータ(アイテム)も削除されていた👌

$ awslocal dynamodb scan --table-name casbin_rule
{
    "Items": [
        {
            "ptype": {
                "S": "p"
            },
            "v0": {
                "S": "kakakakakku"
            },
            "v1": {
                "S": "arn:aws:s3:::sample-bucket-1/*"
            },
            "id": {
                "S": "bf571b8e16cef131aeac79c15a800421"
            },
            "v2": {
                "S": "s3:PutObject"
            }
        }
    ],
    "Count": 1,
    "ScannedCount": 1,
    "ConsumedCapacity": null
}

Powertools for AWS Lambda (Python) の Validation で UUID フォーマットをバリデーションする

Powertools for AWS Lambda (Python) の Validation でプロパティの「UUID フォーマット」をチェックする場合は以下のようにスキーマを実装すると良さそう👌

{
    '$schema': 'http://json-schema.org/draft-07/schema',
    'type': 'object',
    'required': ['id'],
    'properties': {
        'id': {
            'type': 'string',
            'pattern': '^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$',
        },
    },
}

便利な Powertools for AWS Lambda (Python) の Validation に関しては前に紹介記事を書いた📝

kakakakakku.hatenablog.com

公式ドキュメントも参照📝

docs.powertools.aws.dev

検証環境

今回は AWS SAM を使って Amazon API Gateway (REST API) と AWS Lambda 関数を構築する.Amazon API Gateway に POST リクエストを送ってバリデーションロジックの確認をした❗️

動作確認

👾 template.yaml

AWSTemplateFormatVersion: 2010-09-09
Transform: AWS::Serverless-2016-10-31

Resources:
  Function:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: powertools-validation-uuid
      CodeUri: src/
      Handler: app.lambda_handler
      Runtime: python3.12
      Architectures:
        - x86_64
      Layers:
        - arn:aws:lambda:ap-northeast-1:017000801446:layer:AWSLambdaPowertoolsPythonV2:73
      Events:
        Api:
          Type: Api
          Properties:
            Path: /
            Method: POST

👾 app.py

今回はサンプルとしてパラメータが正常であれば 200 OK を返して,異常であれば 400 BAD_REQUEST を返すようにした.

import json
from http import HTTPStatus

from aws_lambda_powertools.utilities.validation import SchemaValidationError, envelopes, validate

SCHEMA = {
    '$schema': 'http://json-schema.org/draft-07/schema',
    'type': 'object',
    'required': ['id'],
    'properties': {
        'id': {
            'type': 'string',
            'pattern': '^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$',
        },
    },
}


def main(event):
    try:
        validate(event=event, schema=SCHEMA, envelope=envelopes.API_GATEWAY_REST)
    except SchemaValidationError as e:
        return {
            'statusCode': HTTPStatus.BAD_REQUEST,
            'body': json.dumps({'message': e.validation_message}),
        }

    return {
        'statusCode': HTTPStatus.OK,
        'body': json.dumps({'message': 'ok'}),
    }


def lambda_handler(event, context):
    return main(event)


if __name__ == '__main__':
    with open('./events/event.json', 'r') as f:
        event = json.load(f)

    response = main(event)
    print(response['body'])

動作確認

ok

event.json

{
    "id": "92380de4-cfe1-4b9c-9a67-2928dcba10e0"
}

UUID Generator で取得した UUID を id プロパティに指定すると ok になった🙆‍♂

$ curl -s -X POST --data @event.json ${ENDPOINT}
{"message": "ok"}

data.id must match pattern ^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$

event.json

{
    "id": "92380de4-cfe1-4b9c-9a67-2928dcba10e00"
}

UUID 以外のフォーマット(1文字余分に追加した)で id プロパティを指定するとバリデーションエラーになった🙅‍♂️

$ curl -s -X POST --data @event.json ${ENDPOINT}
{"message": "data.id must match pattern ^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"}

JSON Schema 2019-09

ちなみに JSON Schema 自体はビルトインフォーマットとして uuid をサポートしているけど,Powertools for AWS Lambda (Python) の Validation で 'format': 'uuid' とスキーマを実装すると Unknown format: uuid というエラーが出てしまう🔥

json-schema.org

実は uuid フォーマットは JSON Schema 2019-09(1つ前は JSON Schema draft-07)でサポートされていて,Powertools for AWS Lambda (Python) の Validation が依存している fastjsonschema (Fast JSON schema for Python)draft-04draft-06draft-07 のみをサポートしているという背景があって現状では使えなかった📝

horejsek.github.io

boto3 Config で Amazon S3 Transfer Acceleration エンドポイントを使えるようにする

Amazon S3 で Transfer Acceleration を有効化すると,エッジロケーションを活用してオブジェクトを高速にアップロード・ダウンロードできるようになる.そして bucketname.s3-accelerate.amazonaws.com というエンドポイントが追加される👀

aws.amazon.com

docs.aws.amazon.com

boto3 x Amazon S3 Transfer Acceleration

Python (boto3) で Amazon S3 Transfer Acceleration エンドポイントを使う場合は Boto3 Config の use_accelerate_endpoint で設定できる👌今回は検証も兼ねて Config 設定の動作確認をしてみた.

boto3.amazonaws.com

準備

自宅(東京)から動作確認をするため,今回は us-east-1(バージニア)に Amazon S3 バケットを作った.そして aws s3api put-bucket-accelerate-configuration コマンドで Amazon S3 Transfer Acceleration を有効化した.あと mkfile 50m 50mb コマンドで 50MB のダミーファイルを作っておく📁

$ aws s3 mb s3://transfer-acceleration-sandbox --region us-east-1
$ aws s3api put-bucket-accelerate-configuration --bucket transfer-acceleration-sandbox --accelerate-configuration Status=Enabled --region us-east-1

1. Config 設定なし

今回はエンドポイントを確認するため botocore のデバッグログを出力できるようにしておく👌

import time
import uuid

import boto3
import botocore

botocore.session.Session().set_debug_logger()

session = boto3.Session(profile_name='xxxxx', region_name='us-east-1')

s3_client = session.client(
    's3',
)

start = time.time()
s3_client.upload_file('50mb', 'transfer-acceleration-sandbox', f'50mb-{uuid.uuid4().hex}')
end = time.time()

print(f'{end - start} seconds')

実行すると s3.amazonaws.com エンドポイントにリクエストを送信していた🛜

Sending http request:
(中略)
url=https://transfer-acceleration-sandbox.s3.amazonaws.com/50mb-a3637f080158476fa0333dc838d700d7?uploadId=xxx
(中略)

計3回実行したアップロード時間は以下だった💡

  • 35.75226593017578 seconds
  • 36.11813402175903 seconds
  • 42.113038063049316 seconds

2. Config 設定あり

今度は Config で use_accelerate_endpoint を設定する❗️

import time
import uuid

import boto3
import botocore

botocore.session.Session().set_debug_logger()

session = boto3.Session(profile_name='xxxxx', region_name='us-east-1')

s3_client = session.client(
    's3',
    config=boto3.session.Config(
        s3={
            'use_accelerate_endpoint': True,
        },
    ),
)

start = time.time()
s3_client.upload_file('50mb', 'transfer-acceleration-sandbox', f'50mb-{uuid.uuid4().hex}')
end = time.time()

print(f'{end - start} seconds')

実行すると今度は s3-accelerate.amazonaws.com エンドポイントにリクエストを送信していた🛜

Sending http request:
(中略)
url=https://transfer-acceleration-sandbox.s3-accelerate.amazonaws.com/50mb-8b5a312964c94d5a8e803655e1aa6792?uploadId=xxx
(中略)

計3回実行したアップロード時間は以下だった💡速くなってる〜 \( 'ω')/

  • 14.759350061416626 seconds
  • 12.844246625900269 seconds
  • 13.640271663665771 seconds

LocalStack を使って CloudWatch Logs サブスクリプションフィルタをローカル環境で試す

Amazon CloudWatch Logs サブスクリプションフィルタを使ってログを AWS Lambda に流す構成を AWS アカウントにデプロイする前に LocalStack にデプロイして確認してみた❗

docs.aws.amazon.com

LocalStack は Amazon CloudWatch Logs サブスクリプションフィルタもサポートしている👌しかし注意点はあって,サブスクリプションフィルタに設定するフィルタパターン(JSON フィルタ・正規表現フィルタなど)は LocalStack Pro でサポートされている💡よって,今回はフィルタなし(すべてのログを流す)で試す.

docs.localstack.cloud

あくまでサンプルとして以下の構成を LocalStack を使ってローカル環境にデプロイする.

サンプルコード

👾 template.yaml(AWS SAM テンプレート)

今回は以下のように AWS SAM テンプレートを実装した📝AWS Lambda 関数は log-senderlog-receiver の2つを準備して,log-sender の Amazon CloudWatch Logs ロググループにサブスクリプションフィルタを設定してある👌

AWSTemplateFormatVersion: 2010-09-09
Transform: AWS::Serverless-2016-10-31

Resources:
  LogSenderFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: log-sender
      CodeUri: src/
      Handler: log-sender.lambda_handler
      Runtime: python3.12
      Architectures:
        - x86_64
  LogSenderLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /aws/lambda/log-sender
  LogSenderSubscriptionFilter:
    Type: AWS::Logs::SubscriptionFilter
    Properties:
      LogGroupName: !Ref LogSenderLogGroup
      FilterPattern: ""
      DestinationArn: !GetAtt LogReceiverFunction.Arn
  LogReceiverFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: log-receiver
      CodeUri: src/
      Handler: log-receiver.lambda_handler
      Runtime: python3.12
      Architectures:
        - x86_64

👾 log-sender.py

log-sender ではシンプルに {"id": "27a1cc30-b4c4-4192-9db9-d19962fe8f33", "message": "sample message"} のように UUID と固定メッセージを含んだ構造化ログ (JSON) を出力する👌

import json
import uuid


def main():
    print(
        json.dumps(
            {
                'id': str(uuid.uuid4()),
                'message': 'sample message',
            }
        )
    )


def lambda_handler(event, context):
    main()


if __name__ == '__main__':
    main()

👾 log-receiver.py

log-receiver はサブスクリプションフィルタから流れてきたログをそのままログに出力する.サブスクリプションフィルタ経由だとログは GZIP 圧縮と Base64 エンコードで変換された状態になるけど,今回は Powertools for AWS Lambda (Python) の Event Source Data Classes で CloudWatchLogsEventCloudWatchLogsDecodedData を使ってお手軽に実装した👏 便利〜 \( 'ω')/

docs.powertools.aws.dev

import json

from aws_lambda_powertools.utilities.data_classes import CloudWatchLogsEvent
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData


def main(event):
    event = CloudWatchLogsEvent(event)
    decompressed: CloudWatchLogsDecodedData = event.parse_logs_data()
    logs = decompressed.log_events
    for log in logs:
        print(log.message)


def lambda_handler(event, context):
    main(event)


if __name__ == '__main__':
    with open('./events/event.json', 'r') as f:
        event = json.load(f)

    main(event)

ちなみに @event_source(data_class=CloudWatchLogsEvent) のようにデコレータを使えば event = CloudWatchLogsEvent(event) という値の詰め直しは不要になるけど,Lambda コンテキストに依存していてローカル開発がしにくく採用しなかった.前に紹介した Powertools for AWS Lambda (Python) の Validation でも Lambda コンテキスト依存を避ける実装を紹介していたりする💡

kakakakakku.hatenablog.com

動作確認

samlocal コマンドでビルド・デプロイをして,awslocal コマンドで AWS Lambda 関数を実行した.

$ samlocal build
$ samlocal deploy

$ awslocal lambda invoke --function-name log-sender outfile

LocalStack の Amazon CloudWatch Logs を確認すると,期待通りに log-senderlog-receiver どちらにも同じログが出ていた👌

log-sender

{"id": "27a1cc30-b4c4-4192-9db9-d19962fe8f33", "message": "sample message"}

log-receiver

{"id": "27a1cc30-b4c4-4192-9db9-d19962fe8f33", "message": "sample message"}

localstack-utils: 単体テスト実行時に使い捨て可能な LocalStack を起動しよう

LocalStack から公式に提供されている localstack-utils を使うと,pytest など Python で単体テストを実行するときに一時的な(使い捨て可能な)LocalStack 環境を起動できる🌍

docs.localstack.cloud

ちなみに僕は普段仕事で testcontainers-pythonLocalStackContainer を使ってて最高に便利なんだけど,結果的に localstack-utils もほとんど同じように使うことができた👌

kakakakakku.hatenablog.com

現在最新は localstack-utils 1.0.1 だった.

github.com

pypi.org

サンプルコード

👾 app.py

まずは Amazon DynamoDB の Forum テーブルからアイテムを取得する search_forum() 関数を今回のテスト対象とする.Forum テーブルというのは Amazon DynamoDB のドキュメントに載っているサンプルでそのまま使うことにした.

そして,boto3 client は環境変数 ENV によって3種類作れるようにしてある👌

  • local: ローカル開発用 (LocalStack)
  • test: テスト用 (localstack-utils / LocalStack)
  • その他: 実際の AWS アカウント

ちなみに localstack-utils のデフォルト設定では 4566 ポートで LocalStack を起動する.それだとローカル開発用の LocalStack とポートが競合してしまうため,今回は 14566 ポートで起動することにした.

import os

import boto3

TABLE_NAME = 'Forum'

if os.environ['ENV'] == 'local':
    dynamodb = boto3.client('dynamodb', endpoint_url='http://localhost:4566')
elif os.environ['ENV'] == 'test':
    dynamodb = boto3.client('dynamodb', endpoint_url='http://localhost:14566')
else:
    dynamodb = boto3.client('dynamodb')


def search_forum(name):
    return dynamodb.get_item(
        TableName=TABLE_NAME,
        Key={
            'Name': {'S': name},
        },
    )

👾 test_app.py

pytest 実行時に呼び出すフィクスチャとして @pytest.fixture デコレータで _setup() 関数を実装した.localstack-utils には LocalStack を起動する startup_localstack() 関数と LocalStack を停止する stop_localstack() 関数が実装されているため,それを _setup() 関数内で実行している.14566 ポートで起動する設定もしてある👌

そして,Amazon DynamoDB テーブル Forum を作って,サンプルデータ(アイテム)を1つ登録している.最後にテストケースとしては search_forum() 関数を呼び出して「アイテムを取得できる場合」「アイテムを取得できない場合」を確認している✔️

import boto3
import pytest
from app import search_forum
from localstack_utils.localstack import startup_localstack, stop_localstack

TABLE_NAME = 'Forum'


@pytest.fixture(scope='module', autouse=True)
def _setup():
    startup_localstack(gateway_listen='0.0.0.0:14566')

    dynamodb = boto3.client('dynamodb', endpoint_url='http://localhost:14566')

    dynamodb.create_table(
        TableName=TABLE_NAME,
        KeySchema=[
            {
                'AttributeName': 'Name',
                'KeyType': 'HASH',
            }
        ],
        AttributeDefinitions=[
            {
                'AttributeName': 'Name',
                'AttributeType': 'S',
            }
        ],
        BillingMode='PAY_PER_REQUEST',
    )

    item = {
        'Name': {'S': 'Amazon DynamoDB'},
        'Category': {'S': 'Amazon Web Services'},
        'Threads': {'N': '2'},
        'Messages': {'N': '4'},
        'Views': {'N': '1000'},
    }

    dynamodb.put_item(TableName=TABLE_NAME, Item=item)

    yield

    stop_localstack()


def test_search_forum():
    item = search_forum('Amazon DynamoDB')
    assert item['Item']['Category']['S'] == 'Amazon Web Services'
    assert item['Item']['Views']['N'] == '1000'

    item = search_forum('Amazon S3')
    assert 'Item' not in item

動作確認

期待通り実行できた👏

$ ENV=test pytest -p no:warnings
=========================================================================================== test session starts ============================================================================================
(中略)
configfile: pyproject.toml
collected 1 item

tests/test_app.py .                                                                                                                                                                                  [100%]

============================================================================================ 1 passed in 4.97s =============================================================================================

詳しくはコード参照

localstack-utils にはドキュメントというドキュメントはなく,オプションなど詳しくは GitHub のコードを確認する必要がある.例えば今回は startup_localstack() 関数に gateway_listen を設定したけど,他にも image_name / tag / pro なども設定できる👌

github.com

まとめ

localstack-utils を使って Python で単体テストを実行するときに一時的な(使い捨て可能な)LocalStack 環境を活用しよう❗️