血と汗となみだを流す

個人の思ったこと、やったことの吐き出し口です。

Amazon S3 の PUT イベントを EventBridge -> Kinesis Data Firehose -> OpenSearch Service に入れてみる

Amazon S3 にオブジェクトを配置したときに発生する PUT イベントを OpenSearch Service に入れてみます。

Architecture

Amazon EventBridge の[ルール]-[イベントパターン]は以下です。

{
  "source": ["aws.s3"],
  "detail-type": ["Object Access Tier Changed", "Object ACL Updated", "Object Created", "Object Deleted", "Object Restore Completed", "Object Restore Expired", "Object Restore Initiated", "Object Storage Class Changed", "Object Tags Added", "Object Tags Deleted"],
  "detail": {
    "bucket": {
      "name": ["(バケット名)"]
    }
  }
}

ターゲットを Kinesis Data Firehose に指定します。
Kinesis Data Firehose のターゲットは OpenSearch Service を指定します。設定はすべてデフォルトです。
index 名は「kinesis」にしています。

S3 にオブジェクトを配置 (Object Created) したときに、OpenSearch Service に格納された PUT イベントデータは以下の形式でした。

{
        "_index" : "kinesis",
        "_id" : "49641493248204684799282029079614073971816170911334137858.0",
        "_score" : 1.0,
        "_source" : {
          "version" : "0",
          "id" : "062098df-c30a-c277-4d09-b16931dad59d",
          "detail-type" : "Object Created",
          "source" : "aws.s3",
          "account" : "999999999999",
          "time" : "2023-06-07T15:00:50Z",
          "region" : "us-east-1",
          "resources" : [
            "arn:aws:s3:::(バケット名)"
          ],
          "detail" : {
            "version" : "0",
            "bucket" : {
              "name" : "(バケット名)"
            },
            "object" : {
              "key" : "logs/yomple6.log",
              "size" : 202,
              "etag" : "3bde6fa2a35bca388769f10379523b26",
              "sequencer" : "0064809BA2121BBCA8"
            },
            "request-id" : "0FQ6PZCKNVCH78XD",
            "requester" : "999999999999",
            "source-ip-address" : "xxx.xxx.xxx.xxx",
            "reason" : "PutObject"
          }
        }
      }

ちなみに、PUT イベントから Lambda 関数をトリガーし出力した内容は以下でした。

{
  "Records": [{
        "eventVersion": "2.1",
        "eventSource": "aws:s3",
        "awsRegion": "us-east-1",
        "eventTime": "2023-06-07T02:15:25.358Z",
        "eventName": "ObjectCreated:Put",
        "userIdentity": {
            "principalId": "AWS:Axxxxxxxxxx"
        },
        "requestParameters": {
            "sourceIPAddress": "xxx.xxx.xxx.xxx"
        },
        "responseElements": {
            "x-amz-request-id": "ZN37J4JFE5YSXBKX",
            "x-amz-id-2": "QU+jgk1+c6TKKu9F3G2XDJL4VLhOgvV+Dg8ay7aMeEjKDN2lHfrR5TNeqDvqFgr/Mbm2AN8gEH4C89nPydRPSo2DR3qU0pZs"
        },
        "s3": {
            "s3SchemaVersion": "1.0",
            "configurationId": "a7c88a0e-e501-4910-b624-9312d2c6e5bf",
            "bucket": {
                "name": "(バケット名)",
                "ownerIdentity": {
                    "principalId": "A2YKLWBP9TAV5B"
                },
                "arn": "arn:aws:s3:::(バケット名)"
            },
            "object": {
                "key": "logs/sample.log",
                "size": 131,
                "eTag": "afa66f2c812defe614f69845dbea43a0",
                "sequencer": "00647FE83D4E98C147"
            }
        }
    }]
}

出力した Lambda (Python) の中身は以下です。

import json
from logging import getLogger, INFO

logger = getLogger(__name__)
logger.setLevel(INFO)

def lambda_handler(event, context):
    print("============ logger.info の出力 ============")
    logger.info(json.dumps(event))

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "hello world",
        }),
    }

イベントデータの中身がどのようなデータ形式になるかの参考になればと思います。

プライバシーポリシー