🐈

Kinesis Data Firehoseで受け取ったレコードをLambdaでフィルタリングする

2024/02/18に公開

Kinesis Data Firehoseにおいて、データソースから受け取ったレコードをフィルタリングしたい場合があると思います。

CloudWatch Logsのサブスクリプションフィルターを使っている場合には、あらかじめ送信するデータのみFirehoseに渡されるとは思いますが、今回はFirehoseに設定するデータ変換Lambdaにおいてフィルタリングする方法を紹介します。

※情報にたどり着くまでに時間がかかったため、備忘も込めて・・・。。

結論

データ変換用のLambdaで受け取った各レコードのパラメータのうち、resultを編集してreturnする。

Ok :Firehoseの送信先に送信する
Dropped:Firehoseの送信先に送信しない(=データをフィルタ可能)

{
    "recordId": "<recordId from the Lambda input>",
    "result": "Ok",
    "data": "<Base64 encoded Transformed data>"
},
{
    "recordId": "<recordId from the Lambda input>",
    "result": "Dropped",
    "data": "<Base64 encoded Transformed data>"
}

以下、公式ドキュメント。

レコードのデータ変換のステータス。指定できる値は次のとおりです: Ok (レコードが正常に変換された)、Dropped (レコードが処理ロジックによって意図的に削除された)、ProcessingFailed (レコードを変換できなかった)。

https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/data-transformation.html#data-transformation-status-model

データ変換用Lambdaで受け取る/返すレコードについて

クラスメソッドさんの以下の記事が大変参考になりました。
https://dev.classmethod.jp/articles/memorandum-kinesis-firehose-lambda-convert/#toc-2

  • 基本的に使用するのは、recordsの中身
    • 特にrecordIdはresponseで使用する
  • recordsの各要素のdataは各データをBase64エンコードしたもの

使用例 -AWS WAFのログをフィルタする-

今回はAWS WAFのログについて、BLOCKした際のログのみをFirehoseの送信先に送信するようにLambdaを実装します。

WAFのログについて

AWS WAFのログですが、ログフィールドでWAFによるアクションの判別が可能です。
BLOCKの場合はactionフィールドに「BLOCK」と記録されます。
※COUNTなどを判別したい場合はnonTerminatingMatchingRulesの中のactionを参照する必要があります。この辺りは別途書ければ書きたいと思います。
詳細は公式ドキュメントを参照ください。
https://docs.aws.amazon.com/ja_jp/waf/latest/developerguide/logging-fields.html

また、WAFのログの例についても公式ドキュメントを参照ください。
https://docs.aws.amazon.com/ja_jp/waf/latest/developerguide/logging-examples.html

データ変換用のLambdaについて

Lambdaは「Process records sent to an Amazon Data Firehose streams」というブループリントを使用します。

修正前のコード(ブループリント)は以下の通りです。

import base64

print('Loading function')

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        print(record['recordId'])
        payload = base64.b64decode(record['data']).decode('utf-8')

        # Do custom processing on the payload here

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}

ブループリントを元に、AWS WAFのログのうち、actionに記録されているWAFアクションを判断してフィルタする処理を加えます。
実装したコードは以下の通りです。
check_waf_actionという関数を定義し、AWS WAFのログアクションを判断、resultのパラメータを返しています。

import base64
import json

print('Loading function')

# WAFのアクションを確認する関数
def check_waf_action(payload):
    # dict型に変換
    waf_log = json.loads(payload)
    # actionを判定
    # BLOCKの場合はOk => 送信
    # それ以外はDropped => 非送信
    if(waf_log['action'] == 'BLOCK'):
        return 'Ok'
    return 'Dropped'

def lambda_handler(event, context):
    output = []

    for record in event['records']:
        print(record['recordId'])
        payload = base64.b64decode(record['data']).decode('utf-8')

        # Do custom processing on the payload here
        # check_waf_action関数を呼び出す
        result = check_waf_action(payload)

        output_record = {
            'recordId': record['recordId'],
            'result': result,
            'data': record['data']
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}

おわりに

Kinesis Data Firehoseに設定できるデータ変換Lambdaにおいて、レコードをフィルタリングする処理についてまとめました。

本記事がどなたかの参考になれば幸いです。

参考

https://docs.aws.amazon.com/ja_jp/firehose/latest/dev/data-transformation.html#data-transformation-status-model
https://dev.classmethod.jp/articles/memorandum-kinesis-firehose-lambda-convert/
https://docs.aws.amazon.com/ja_jp/waf/latest/developerguide/logging-fields.html
https://docs.aws.amazon.com/ja_jp/waf/latest/developerguide/logging-examples.html

Discussion