AWS CDK で Amazon SQS x Amazon EventBridge Pipes x AWS Step Functions の構成を設定する流れは前にまとめた📝
前にまとめた設定では Amazon SQS キューに登録したメッセージをデフォルト設定のまま Amazon EventBridge Pipes 経由で AWS Step Functions に流しているけど,実際に使ってみると Amazon SQS のメッセージ形式のまま AWS Step Functions に流れてくるため,AWS Step Functions 側でインプットの取り回しがしにくく微妙に使いにくいことに気付く💨
具体例
例えば以下の JSON を Amazon SQS キューに登録する.
{ "key1": "value1", "key2": "value2", "key3": "value3" }
AWS Step Functions の入力としては以下のような JSON が流れてくる(一部の値は書き換えた).AWS Step Functions 側では body
に含まれている JSON のみで十分ということも多いと思う💡
[ { "messageId": "0f6ddca6-8fc2-45e3-8c51-226eca45dbb0", "receiptHandle": "xxxxx", "body": "{\n \"key1\": \"value1\",\n \"key2\": \"value2\",\n \"key3\": \"value3\"\n}", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1709209227935", "SenderId": "xxxxx", "ApproximateFirstReceiveTimestamp": "1709209227937" }, "messageAttributes": {}, "md5OfBody": "b4fc128c9cb169639ef7083a9a4d78dd", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:ap-northeast-1:000000000000:sandbox-cdk-sqs-pipes-stepfunctions-queue", "awsRegion": "ap-northeast-1" } ]
ターゲット入力トランスフォーマーを使う
そんなときは Amazon EventBridge Pipes で「ターゲット入力トランスフォーマー (Target Input Transformer)」を設定すれば OK👌今回は Amazon SQS キューに登録したメッセージの body
のみ AWS Step Functions に流したいため,以下のように設定する❗️
マネジメントコンソールなら
{ "body": <$.body> }
AWS CDK なら
CfnPipe
の inputTemplate
に設定する.AWS CDK コード全体は AWS CDK で Amazon EventBridge Pipes(SQS ソース・Step Functions ターゲット)を設定する - kakakakakku blog 参照📝
new aws_pipes.CfnPipe(this, 'SandboxCdkPipes', { name: 'sandbox-cdk-sqs-pipes-stepfunctions-pipes', roleArn: pipeRole.roleArn, source: queue.queueArn, target: stateMachine.stateMachineArn, targetParameters: { inputTemplate: '{ "body": <$.body> }', stepFunctionStateMachineParameters: { invocationType: 'FIRE_AND_FORGET', } } });
実行結果
「ターゲット入力トランスフォーマー」を設定してもう1度 Amazon SQS キューに同じ JSON を登録すると,以下のように body
に含まれている JSON のみ AWS Step Functions に流せるようになった \( 'ω')/
[ { "body": { "key1": "value1", "key2": "value2", "key3": "value3" } } ]
ちなみに AWS Step Functions 側で JSON 配列になっているのは Amazon EventBridge Pipes の仕様で,ドキュメントには Lambda または Step Functions エンリッチメントまたはターゲットの場合、バッチサイズが 1 であっても、バッチは JSON 配列としてターゲットに配信されます。
と書いてある.これはハマりポイントの1つかも💨