kakakakakku blog

Weekly Tech Blog: Keep on Learning!

AWS CDK で Amazon EventBridge Pipes(SQS ソース・Lambda エンリッチメント・ECS ターゲット)を設定する

AWS CDK で Amazon EventBridge Pipes を設定する場合,AWS CloudFormation に沿った L1 ConstructCfnPipeを使う必要がある.今回はソース(Amazon SQS キュー)・エンリッチメント(AWS Lambda 関数)・ターゲット(Amazon ECS タスク)で Amazon EventBridge Pipes を構成する👌

docs.aws.amazon.com

AWS CDK で試す

L1 Construct なので AWS CloudFormation の AWS::Pipes::Pipe のドキュメントも参考にしながら進めた.

docs.aws.amazon.com

👾 sandbox-cdk-sqs-pipes-ecs-stack.ts

実装を簡単にするため,以下のリソースは別途作ったリソースを参照している💡また実行する Amazon ECS タスクはサンプルとして hello-world を使っている🐳

  • AWS Lambda 関数
  • Amazon VPC
  • Amazon ECS クラスタ
import {
  Duration,
  Stack,
  StackProps,
  aws_ec2,
  aws_ecs,
  aws_iam,
  aws_lambda,
  aws_logs,
  aws_pipes,
  aws_sqs,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';

export class SandboxCdkSqsPipesEcsStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    const queue = new aws_sqs.Queue(this, 'SandboxCdkQueue', {
      queueName: 'sandbox-cdk-sqs-pipes-ecs-queue',
      visibilityTimeout: Duration.seconds(30),
      receiveMessageWaitTime: Duration.seconds(20),
    });

    const lambda = aws_lambda.Function.fromFunctionArn(this, 'SandboxCdkLambdaFunctionHello', 'arn:aws:lambda:ap-northeast-1:111111111111:function:xxx');

    const vpc = aws_ec2.Vpc.fromLookup(this, 'SandboxCdkVpc', {
      vpcId: 'vpc-00000000000000000',
    });

    const ecsCluster = aws_ecs.Cluster.fromClusterArn(this, 'SandboxCdkEcsCluster', 'arn:aws:ecs:ap-northeast-1:111111111111:cluster/xxx');

    const ecsTaskDefinition = new aws_ecs.FargateTaskDefinition(this, 'SandboxCdkEcsTaskDefinition', {
      cpu: 256,
      memoryLimitMiB: 512,
    });

    const logGroup = new aws_logs.LogGroup(this, 'SandboxCdkLogGroup', {
      logGroupName: 'sandbox-cdk-sqs-pipes-ecs-log-group',
      retention: aws_logs.RetentionDays.ONE_WEEK,
    });

    ecsTaskDefinition.addContainer('defaultContainer', {
      image: aws_ecs.ContainerImage.fromRegistry('hello-world'),
      logging: aws_ecs.LogDriver.awsLogs({
        logGroup: logGroup,
        streamPrefix: '/hello-world/',
      })
    });

    const pipeRole = new aws_iam.Role(this, 'SandboxCdkPipesRole', {
      roleName: 'sandbox-cdk-sqs-pipes-ecs-role',
      assumedBy: new aws_iam.ServicePrincipal('pipes.amazonaws.com')
    });

    queue.grantConsumeMessages(pipeRole)
    ecsTaskDefinition.grantRun(pipeRole)
    lambda.grantInvoke(pipeRole)

    new aws_pipes.CfnPipe(this, 'SandboxCdkPipes', {
      name: 'sandbox-cdk-sqs-pipes-ecs-pipes',
      roleArn: pipeRole.roleArn,
      source: queue.queueArn,
      enrichment: lambda.functionArn,
      target: ecsCluster.clusterArn,
      targetParameters: {
        ecsTaskParameters: {
          launchType: aws_ecs.LaunchType.FARGATE,
          taskDefinitionArn: ecsTaskDefinition.taskDefinitionArn,
          taskCount: 1,
          networkConfiguration: {
            awsvpcConfiguration: {
              subnets: vpc.selectSubnets({ subnetType: aws_ec2.SubnetType.PUBLIC }).subnetIds,
              assignPublicIp: 'ENABLED',
            },
          },
        }
      },
    });
  }
}

結果

以下のように期待通り Amazon EventBridge Pipes を設定できた❗️また動作確認として Amazon SQS キューにメッセージを送信すると,最終的に Amazon ECS タスクが実行された👌

Amazon EventBridge Pipes 画面

ポイント(ソース・エンリッチメント・ターゲット)

Amazon SQS キューを Amazon EventBridge Pipes の「ソース」に設定する場合は source に ARN を設定する.細かくパラメータを設定する場合は SourceParameterssqsQueueParameters を使う.

docs.aws.amazon.com

AWS Lambda 関数を Amazon EventBridge Pipes の「エンリッチメント(強化)」に設定する場合は enrichment に ARN を設定する.細かくパラメータを設定する場合は enrichmentParameters を使う.

docs.aws.amazon.com

Amazon ECS タスクを Amazon EventBridge Pipes の「ターゲット」に設定する場合は target に ARN を設定する.細かくパラメータを設定する場合は targetParametersecsTaskParameters を使う.ecsTaskParameters の設定項目は多いけど,今回は必要最低限で launchType / taskDefinitionArn / taskCount / networkConfiguration のみ設定した.

docs.aws.amazon.com

ポイント(実行ロール)

Amazon EventBridge Pipes の実行ロールに Amazon SQS キューの読み取り権限・AWS Lambda 関数の実行権限・Amazon ECS タスクの実行権限を与える必要がある.最小権限を考えるとポリシーの設定が大変だけど grantConsumeMessages(grantee) / grantRun(grantee) / grantInvoke(grantee) を使えば簡単に設定できる.AWS CDK の grant は本当に便利だ〜💡

docs.aws.amazon.com

docs.aws.amazon.com

docs.aws.amazon.com