swastudy’s diary

AWSに関することは楽しみに書いてます。

Amazon SQSとLambdaを組み合わせたメッセージの効率的な処理

Amazon SQSキュー内のメッセージを処理する際に、Lambda 関数を使用することがよくあるパターンです。Lambdaがメッセージを複数回処理しないようにするには、二つの方法があります。

  • 関数レスポンスにバッチアイテムの失敗を含めるようにイベントソースマッピングを設定します。

  • Amazon SQS APIアクションのDeleteMessageを使用します。

それぞれのメリットとデメリットを確認しましょう。

それぞれのメリットとデメリットを確認しましょう。

Amazon SQS APIの DeleteMessage

Lambdaの処理でエラーメッセージをキューに残したまま、処理済みメッセージを SQS から正常に削除することが必要になります。2021年末まで、LambdaのコードでDeleteMessageを使って個別なメッセージを削除するかまたバッチ削除するのがよく見られるパターンです。しかし、この方法はSQSのAPIのコストがかかります。

イベントソースマッピングの設定

2021年末から、関数レスポンスにバッチアイテム(ReportBatchItemFailures)の失敗を含めるようにイベントソースマッピングを設定して、コストも削減できます。

失敗したバッチ内の正常に処理されたメッセージを再処理しないようにするために、失敗したメッセージのみを再び表示するようにイベントソースマッピングを設定できます。これを部分的なバッチレスポンスと呼びます。部分的なバッチレスポンスをオンにするには、イベントソースマッピングを設定するときに FunctionResponseTypes アクション用に ReportBatchItemFailures を指定します。そうすると、関数が部分的な成功を返すようになるため、レコードでの不必要な再試行回数を減らすことができます。

例えば、CDKで設定します。

new lambdaEventSources.SqsEventSource(this.queue, {
        //他の設定
        reportBatchItemFailures: true,
    });

Python を使用した Lambda での SQS バッチアイテム失敗のレポートします。

def lambda_handler(event, context):
    if event:
        batch_item_failures = []
        
    for record in event["Records"]:
        try:
            #メッセージ処理
        except Exception as e:
            batch_item_failures.append({"itemIdentifier": record['messageId']})

    return {"batchItemFailures": batch_item_failures}

参考

同じ SQS メッセージについて Lambda 関数が再試行されないようにする | AWS re:Post Amazon SQS での Lambda の使用 - AWS Lambda